side-quest successful, go eat mom's spaghetti

the side-quest was to flatten the pipeline so that the only
branching is allowed after batcher
This commit is contained in:
2019-12-11 11:19:56 -08:00
parent 32742059c7
commit 136e5938c3

28
main.c
View File

@@ -149,6 +149,7 @@ void ssend_word(Word* w, int dest) {
} }
int recv_word(Word* w, int src) { int recv_word(Word* w, int src) {
// WAT is going on here I have no idea
long len; long len;
MPI_Status stat; MPI_Status stat;
MPI_Recv(&len, 1, MPI_LONG, src, TAG_STLEN, MPI_COMM_WORLD, MPI_Recv(&len, 1, MPI_LONG, src, TAG_STLEN, MPI_COMM_WORLD,
@@ -196,38 +197,33 @@ void tokenizer(const char* source) {
void filterer() { void filterer() {
INFO_PRINTF("Starting filterer %d\n", getpid()); INFO_PRINTF("Starting filterer %d\n", getpid());
int rid = role_id_from_mpi_id(FILTERER, my_mpi_id());
Word w = {0, NULL}; Word w = {0, NULL};
const size_t num_streams = number_of(TOKENIZER);
const size_t entry_size = 2 * WIN + 1; const size_t entry_size = 2 * WIN + 1;
const size_t bufsize = num_streams * entry_size; const size_t bufsize = entry_size;
long* buffer = malloc(bufsize * sizeof(long)); long* buffer = malloc(bufsize * sizeof(long));
size_t* have = calloc(num_streams, sizeof(size_t)); size_t have = 0;
int src = 0; // WLOG
while (1) { while (1) {
int stream_offs; while (have != entry_size) { // TODO FLATTEN PIPELINE
while (have[src] != entry_size) { // TODO FLATTEN PIPELINE recv_word(&w, mpi_id_from_role_id(TOKENIZER, rid));
src = recv_word(&w, MPI_ANY_SOURCE);
if (!strlen(w.data)) break; if (!strlen(w.data)) break;
src = role_id_from_mpi_id(TOKENIZER, src); buffer[have] = vocab_idx_of(&w);
stream_offs = src*entry_size; if (buffer[have] != -1) have++;
buffer[stream_offs + have[src]] = vocab_idx_of(&w);
if (buffer[stream_offs + have[src]] != -1) have[src]++;
} }
if (!strlen(w.data)) break; if (!strlen(w.data)) break;
have[src] = 0; have = 0;
MPI_Send(buffer + stream_offs, entry_size, MPI_LONG, MPI_Send(buffer, entry_size, MPI_LONG,
mpi_id_from_role_id(BATCHER, 0), mpi_id_from_role_id(BATCHER, rid),
TAG_IWORD, MPI_COMM_WORLD); TAG_IWORD, MPI_COMM_WORLD);
} }
free_word(&w); free_word(&w);
free(buffer); free(buffer);
free(have);
INFO_PRINTF("Finishing filterer %d\n", getpid()); INFO_PRINTF("Finishing filterer %d\n", getpid());
} }