diff --git a/main.c b/main.c
index f3828b9..13b7b29 100644
--- a/main.c
+++ b/main.c
@@ -13,11 +13,11 @@
 #define TAG_BREAK 5
 #define TAG_STLEN 6
 #define TAG_SWORD 7
-#define TAG_IWORD 8
+#define TAG_IWIND 8
 #define TAG_INSTR 9
 #define TAG_TERMT 10
 
-#define COMM 1
+#define COMM 25
 #define ITER 690
 #define BS 32
 #define EMB 32
@@ -97,6 +97,10 @@ int role_id_from_mpi_id(Role role, int mid) {
   return rid;
 }
 
+int my_role_id(Role role) {
+  return role_id_from_mpi_id(role, my_mpi_id());
+}
+
 Role map_node() {
   int node = my_mpi_id();
   size_t base = 0;
@@ -164,9 +168,19 @@ int recv_word(Word* w, int src) {
   return the_src;
 }
 
+void send_window(long* window, size_t winsize, int dest) {
+  MPI_Send(window, winsize, MPI_LONG, dest, TAG_IWIND, MPI_COMM_WORLD);
+}
+
+void recv_window(long* window, size_t winsize, int src) {
+  MPI_Recv(window, winsize, MPI_LONG, src, TAG_IWIND, MPI_COMM_WORLD,
+           MPI_STATUS_IGNORE);
+}
+
 void tokenizer(const char* source) {
   INFO_PRINTF("Starting tokenizer %d\n", getpid());
-  int rid = role_id_from_mpi_id(TOKENIZER, my_mpi_id());
+  int rid = my_role_id(TOKENIZER);
+  int next = mpi_id_from_role_id(FILTERER, rid);
 
   WordList wl = {0, 0, NULL};
   size_t sync_ctr = 0;
@@ -181,75 +195,90 @@ void tokenizer(const char* source) {
   while (!stop && get_tokens(&wl, source)) {
     for in_range(i, wl.n_words) {
       if (sync_ctr == 10000) {
-        ssend_word(&wl.words[i], mpi_id_from_role_id(FILTERER, rid));
+        ssend_word(&wl.words[i], next);
         sync_ctr = 0;
       } else {
-        send_word(&wl.words[i], mpi_id_from_role_id(FILTERER, rid));
+        send_word(&wl.words[i], next);
       }
       sync_ctr++;
     }
     MPI_Test(&stop_req, &stop, MPI_STATUS_IGNORE);
   }
 
   free_wordlist(&wl);
-  send_word(&terminator, mpi_id_from_role_id(FILTERER, rid));
+  send_word(&terminator, next);
   INFO_PRINTF("Finishing tokenizer %d\n", getpid());
 }
 
 void filterer() {
   INFO_PRINTF("Starting filterer %d\n", getpid());
-  int rid = role_id_from_mpi_id(FILTERER, my_mpi_id());
+  int rid = my_role_id(FILTERER);
+  int prev = mpi_id_from_role_id(TOKENIZER, rid);
+  int next = mpi_id_from_role_id(BATCHER, rid);
   Word w = {0, NULL};
-  const size_t entry_size = 2 * WIN + 1;
-  const size_t bufsize = entry_size;
-  long* buffer = malloc(bufsize * sizeof(long));
+  const size_t window_size = 2 * WIN + 1;
+  long* window = malloc(window_size * sizeof(long));
   size_t have = 0;
 
   while (1) {
-    while (have != entry_size) {  // TODO FLATTEN PIPELINE
-      recv_word(&w, mpi_id_from_role_id(TOKENIZER, rid));
+    while (have != window_size) {
+      recv_word(&w, prev);
       if (!strlen(w.data)) break;
-      buffer[have] = vocab_idx_of(&w);
-      if (buffer[have] != -1) have++;
+      window[have] = vocab_idx_of(&w);
+      if (window[have] != -1) have++;
     }
     if (!strlen(w.data)) break;
     have = 0;
-    MPI_Send(buffer, entry_size, MPI_LONG,
-             mpi_id_from_role_id(BATCHER, rid),
-             TAG_IWORD, MPI_COMM_WORLD);
+    send_window(window, window_size, next);
   }
+  window[0] = -1;
+  send_window(window, window_size, next);
 
   free_word(&w);
-  free(buffer);
+  free(window);
   INFO_PRINTF("Finishing filterer %d\n", getpid());
 }
 
 void batcher() {
-  int s = 0;
-  const size_t entry_size = 2 * WIN + 1;
-  const size_t bufsize = BS * entry_size;
+  INFO_PRINTF("Starting batcher %d\n", getpid());
+  int rid = my_role_id(BATCHER);
+  int prev = mpi_id_from_role_id(FILTERER, rid);
+
+  int learner_mpi_id = 0;
+  const size_t window_size = 2 * WIN + 1;
+  const size_t bufsize = BS * window_size;
   float* batch = malloc(bufsize * sizeof(float));
-  long* l_wid = malloc(entry_size * sizeof(long));
+  long* l_wid = malloc(window_size * sizeof(long));
 
   while (1) {
     for in_range(r, BS) {
-      MPI_Recv(l_wid, entry_size, MPI_LONG,
-               mpi_id_from_role_id(FILTERER, 0),
-               TAG_IWORD, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-      for in_range(c, entry_size) {
-        batch[r*entry_size + c] = (float)l_wid[c];
+      recv_window(l_wid, window_size, prev);
+
+      if (l_wid[0] == -1) break;
+
+      for in_range(c, window_size) {
+        batch[r*window_size + c] = (float)l_wid[c];
       }
     }
-    MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD,
+
+    if (l_wid[0] == -1) break;
+
+    MPI_Recv(&learner_mpi_id, 1, MPI_INT, MPI_ANY_SOURCE,
+             TAG_READY, MPI_COMM_WORLD,
             MPI_STATUS_IGNORE);
-    MPI_Send(batch, bufsize, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD);
+
+    if (learner_mpi_id == -1) break;
+
+    MPI_Send(batch, bufsize, MPI_FLOAT, learner_mpi_id, TAG_BATCH,
+             MPI_COMM_WORLD);
     printf("!\n");
   }
 
   free(l_wid);
   free(batch);
+  INFO_PRINTF("Finishing batcher %d\n", getpid());
 }
 
 void free_weightlist(WeightList* wl) {
@@ -290,37 +319,47 @@ void recv_weights(WeightList* wl, int src) {
 void learner() {
   INFO_PRINTF("Starting learner %d\n", getpid());
   int me = my_mpi_id();
+  int batcher = mpi_id_from_role_id(BATCHER, 0);
+  int dispatcher = mpi_id_from_role_id(DISPATCHER, 0);
   PyObject* net = create_network(WIN, EMB);
   create_test_dataset(WIN);
 
   WeightList wl;
   init_weightlist_like(&wl, net);
-  size_t entry_size = (2*WIN + 1);
-  size_t bufsize = BS * entry_size;
+  size_t window_size = (2*WIN + 1);
+  size_t bufsize = BS * window_size;
   float* batch = malloc(bufsize * sizeof(float));
 
-  for in_range(i, COMM) {
-    recv_weights(&wl, mpi_id_from_role_id(DISPATCHER, 0));
+  int go;
+  MPI_Recv(&go, 1, MPI_INT, dispatcher, TAG_INSTR, MPI_COMM_WORLD,
+           MPI_STATUS_IGNORE);
+
+  while (go != -1) {
+    recv_weights(&wl, dispatcher);
     set_net_weights(net, &wl);
     for in_range(k, ITER) {
-      MPI_Send(&me, 1, MPI_INT, mpi_id_from_role_id(BATCHER, 0),
-               TAG_READY, MPI_COMM_WORLD);
-      MPI_Recv(batch, bufsize, MPI_FLOAT,
-               mpi_id_from_role_id(BATCHER, 0), TAG_BATCH, MPI_COMM_WORLD,
-               MPI_STATUS_IGNORE);
+      MPI_Send(&me, 1, MPI_INT, batcher, TAG_READY, MPI_COMM_WORLD);
+      MPI_Recv(batch, bufsize, MPI_FLOAT, batcher, TAG_BATCH,
+               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
       step_net(net, batch, BS);
     }
     update_weightlist(&wl, net);
-    send_weights(&wl, mpi_id_from_role_id(DISPATCHER, 0));
+    send_weights(&wl, dispatcher);
+    MPI_Recv(&go, 1, MPI_INT, dispatcher, TAG_INSTR, MPI_COMM_WORLD,
+             MPI_STATUS_IGNORE);
   }
+  MPI_Send(&go, 1, MPI_INT, batcher, TAG_READY, MPI_COMM_WORLD);
 
   Py_DECREF(net);
   free_weightlist(&wl);
   free(batch);
+  INFO_PRINTF("Finishing learner %d\n", getpid());
 }
 
 void dispatcher() {
   INFO_PRINTF("Starting dispatcher %d\n", getpid());
+  int go = 1;
+
   PyObject* frank = create_network(WIN, EMB);
   create_test_dataset(WIN);
   WeightList wl;
@@ -340,13 +379,14 @@ void dispatcher() {
   time_t start = time(NULL);
   for in_range(i, COMM) {
     randidx(round, number_of(LEARNER), lpr);
-
     for in_range(k, lpr) {
-      // INFO_PRINTF(" %5d", round[k]);
-      send_weights(&wl, mpi_id_from_role_id(LEARNER, round[k]));
+      // Instruct learners to learn
+      int lrnr_mpi_id = mpi_id_from_role_id(LEARNER, round[k]);
+      MPI_Send(&go, 1, MPI_INT, lrnr_mpi_id, TAG_INSTR, MPI_COMM_WORLD);
+      send_weights(&wl, lrnr_mpi_id);
     }
-    // INFO_PRINTLN("");
     for in_range(k, lpr) {
+      // Collect the results
       recv_weights(wls + k, mpi_id_from_role_id(LEARNER, round[k]));
     }
     combo_weights(&wl, wls, lpr);
@@ -356,9 +396,15 @@ void dispatcher() {
     INFO_PRINTF("Round %ld, validation loss %f\n", i, crt_loss);
   }
 
-  int stop = 1;
-  MPI_Send(&stop, 1, MPI_INT, mpi_id_from_role_id(TOKENIZER, 0), TAG_TERMT,
-           MPI_COMM_WORLD);
+  go = -1;
+  for in_range(t, number_of(TOKENIZER)) {
+    MPI_Send(&go, 1, MPI_INT, mpi_id_from_role_id(TOKENIZER, t),
+             TAG_TERMT, MPI_COMM_WORLD);
+  }
+  for in_range(l, number_of(LEARNER)) {
+    MPI_Send(&go, 1, MPI_INT, mpi_id_from_role_id(LEARNER, l),
+             TAG_INSTR, MPI_COMM_WORLD);
+  }
 
   time_t finish = time(NULL);
   float delta_t = finish - start;
@@ -372,6 +418,7 @@ void dispatcher() {
   for in_range(i, lpr) free_weightlist(wls + i);
   free(wls);
   free(round);
+  INFO_PRINTF("Finishing dispatcher %d\n", getpid());
 }
 
 void visualizer() {
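Note on the shutdown path this patch introduces: the filterer ends its stream by sending one final window whose first element is -1 (real vocabulary indices are never negative), and the batcher checks l_wid[0] for that sentinel before using a window. Below is a minimal, self-contained sketch of the same sentinel idea between two ranks; WINDOW_SIZE, the tag value, and the file name are stand-ins local to this sketch, not definitions taken from main.c.

/* sentinel_demo.c: illustrative only, not part of the patch. */
#include <mpi.h>
#include <stdio.h>

#define WINDOW_SIZE 5  /* the patch uses 2 * WIN + 1 */
#define TAG_IWIND 8

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  long window[WINDOW_SIZE];
  if (rank == 0) {
    /* Producer: a few real windows, then the -1 sentinel. */
    for (long i = 0; i < 3; i++) {
      for (long c = 0; c < WINDOW_SIZE; c++) window[c] = i * 10 + c;
      MPI_Send(window, WINDOW_SIZE, MPI_LONG, 1, TAG_IWIND, MPI_COMM_WORLD);
    }
    window[0] = -1;
    MPI_Send(window, WINDOW_SIZE, MPI_LONG, 1, TAG_IWIND, MPI_COMM_WORLD);
  } else if (rank == 1) {
    /* Consumer: drain windows until the sentinel arrives, the same
     * check batcher() performs on l_wid[0]. */
    while (1) {
      MPI_Recv(window, WINDOW_SIZE, MPI_LONG, 0, TAG_IWIND, MPI_COMM_WORLD,
               MPI_STATUS_IGNORE);
      if (window[0] == -1) break;
      printf("rank 1 received window starting with %ld\n", window[0]);
    }
  }

  MPI_Finalize();
  return 0;
}

Compile with mpicc and run with two ranks (e.g. mpirun -np 2 ./sentinel_demo); rank 1 prints three windows and exits cleanly when the sentinel arrives.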
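The READY handshake between learners and the batcher follows the same pattern: a worker announces itself by sending its rank, the server replies to whichever rank arrives (MPI_ANY_SOURCE), and -1 in place of a rank means "stop serving". One caveat on the sketch below: in the patch the batcher exits on the first -1 because the dispatcher has already instructed every learner to stop, whereas this standalone version counts one sentinel per worker so it cannot deadlock on its own. Tag values and REQUESTS are local assumptions of the sketch.

/* ready_demo.c: illustrative only, not part of the patch. */
#include <mpi.h>

#define TAG_READY 3
#define TAG_BATCH 4
#define REQUESTS 4

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  float payload = 0.0f;
  if (rank == 0) {
    /* Server: reply to whichever worker is ready; count -1 sentinels
     * until every worker has finished. */
    int ready, done = 0;
    while (done < size - 1) {
      MPI_Recv(&ready, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY,
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      if (ready == -1) { done++; continue; }
      payload += 1.0f;
      MPI_Send(&payload, 1, MPI_FLOAT, ready, TAG_BATCH, MPI_COMM_WORLD);
    }
  } else {
    /* Worker: announce readiness by rank, receive a batch, repeat;
     * then send the -1 sentinel in place of a rank. */
    int sentinel = -1;
    for (int k = 0; k < REQUESTS; k++) {
      MPI_Send(&rank, 1, MPI_INT, 0, TAG_READY, MPI_COMM_WORLD);
      MPI_Recv(&payload, 1, MPI_FLOAT, 0, TAG_BATCH, MPI_COMM_WORLD,
               MPI_STATUS_IGNORE);
    }
    MPI_Send(&sentinel, 1, MPI_INT, 0, TAG_READY, MPI_COMM_WORLD);
  }

  MPI_Finalize();
  return 0;
}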