diff --git a/.gitignore b/.gitignore index 0c6c49b..37226fc 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ compile_commands.json build/ cythoned/ __pycache__/ +data* diff --git a/bridge.pyx b/bridge.pyx index 953cc1b..9a55ffb 100644 --- a/bridge.pyx +++ b/bridge.pyx @@ -67,16 +67,24 @@ cdef public long vocab_idx_of(Word* w): return -1 -cdef public void f_idx_list_to_print(float* f_idxs, size_t num): - idxs = np.asarray(f_idxs).astype(np.int) +cdef public void _dbg_idx_list_to_print(long* f_idxs, size_t num): + idxs = np.asarray(f_idxs) cdef str pyuni = ' '.join(nn.inv_vocab[i] for i in idxs) - print(pyuni) + eprint(pyuni) -cdef public void debug_print(object o): +cdef public void _dbg_print(object o): eprint(o) +cdef public void _dbg_print_cbow_batch( + object net, float* batch, size_t bs +): + X_np, y_np = cbow_batch(net, batch, bs) + eprint(X_np) + eprint(y_np) + + cdef public void randidx(int* idx, size_t l, size_t how_much): i_np = np.random.choice(l, how_much, replace=False).astype(np.intc) memcpy(idx, PyArray_DATA(i_np), how_much * sizeof(int)) diff --git a/main.c b/main.c index 2646b98..977bed8 100644 --- a/main.c +++ b/main.c @@ -16,12 +16,12 @@ #define TAG_IWORD 8 #define TAG_INSTR 9 -#define COMM 50 +#define COMM 500 #define ITER 50 -#define BS 32 +#define BS 64 #define EMB 20 #define WIN 2 -#define FLPC 0.8 +#define FLPC 1 #define in_range(i, x) (size_t (i) = 0; (i) < (x); (i)++) // I am honestly VERY sorry for this but power corrupts even the best of us @@ -60,7 +60,7 @@ size_t number_of(Role what) { case TOKENIZER: if (g_argc < 2) { INFO_PRINTLN("NOT ENOUGH INPUTS!"); - exit(1); + MPI_Abort(MPI_COMM_WORLD, 1); } return g_argc - 1; case FILTERER: @@ -91,7 +91,7 @@ int role_id_from_mpi_id(Role role, int mid) { int rid = mid - z; if (rid >= number_of(role) || rid < 0) { INFO_PRINTF("%d is not a %d\n", mid, role); - exit(1); + MPI_Abort(MPI_COMM_WORLD, 1); } return rid; } @@ -104,7 +104,19 @@ Role map_node() { base += number_of(r); } INFO_PRINTF("Something went wrong for node %d\n", node); - exit(1); // this is bad + MPI_Abort(MPI_COMM_WORLD, 1); // this is bad +} + +void announce_ready(int dest) { + int me = my_mpi_id(); + MPI_Send(&me, 1, MPI_INT, dest, TAG_READY, MPI_COMM_WORLD); +} + +int wait_for_ready() { + int ready; + MPI_Recv(&ready, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD, + MPI_STATUS_IGNORE); + return ready; } void free_word(Word* w) { @@ -132,50 +144,58 @@ int recv_word(Word* w, int src) { long len; MPI_Status stat; MPI_Recv(&len, 1, MPI_LONG, src, TAG_STLEN, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); + &stat); + int the_src = stat.MPI_SOURCE; if (w->mem < len + 1) { w->mem = len + 1; w->data = realloc(w->data, sizeof(char) * w->mem); } - MPI_Recv(w->data, len + 1, MPI_CHAR, src, TAG_SWORD, MPI_COMM_WORLD, - &stat); - return stat.MPI_SOURCE; + MPI_Recv(w->data, len + 1, MPI_CHAR, the_src, TAG_SWORD, MPI_COMM_WORLD, + MPI_STATUS_IGNORE); + return the_src; } void tokenizer(const char* source) { + INFO_PRINTF("Starting tokenizer %d\n", getpid()); WordList wl = {0, 0, NULL}; while (get_tokens(&wl, source)) { for in_range(i, wl.n_words) { + // int tok = wait_for_ready(); send_word(&wl.words[i], mpi_id_from_role_id(FILTERER, 0)); } } - Word terminator = {1, ""}; - send_word(&terminator, mpi_id_from_role_id(FILTERER, 0)); - free_wordlist(&wl); } void filterer() { + INFO_PRINTF("Starting filterer %d\n", getpid()); Word w = {0, NULL}; - const size_t bufsize = 2 * WIN + 1; - long* idx = malloc(bufsize * sizeof(long)); - size_t have = 0; + const size_t num_streams = number_of(TOKENIZER); + const size_t entry_size = 2 * WIN + 1; + const size_t bufsize = num_streams * entry_size; + + long* buffer = malloc(bufsize * sizeof(long)); + size_t* have = calloc(num_streams, sizeof(size_t)); + + int src = 0; // WLOG while (1) { - while (have < bufsize) { - recv_word(&w, role_id_from_mpi_id(TOKENIZER, 0)); - if (!strlen(w.data)) break; - idx[have] = vocab_idx_of(&w); - if (idx[have] != -1) have++; + int stream_offs; + while (have[src] != entry_size) { + // src = rand() % num_streams; + // announce_ready(role_id_from_mpi_id(TOKENIZER, src)); + src = recv_word(&w, MPI_ANY_SOURCE); + src = role_id_from_mpi_id(TOKENIZER, src); + stream_offs = src*entry_size; + buffer[stream_offs + have[src]] = vocab_idx_of(&w); + if (buffer[stream_offs + have[src]] != -1) have[src]++; } - if (!strlen(w.data)) break; - have = 0; - MPI_Send(idx, bufsize, MPI_LONG, mpi_id_from_role_id(BATCHER, 0), + have[src] = 0; + MPI_Send(buffer + stream_offs, entry_size, MPI_LONG, + mpi_id_from_role_id(BATCHER, 0), TAG_IWORD, MPI_COMM_WORLD); } - idx[0] = -1; - MPI_Send(idx, bufsize, MPI_LONG, mpi_id_from_role_id(BATCHER, 0), - TAG_IWORD, MPI_COMM_WORLD); free_word(&w); - free(idx); + free(buffer); + free(have); } void batcher() { @@ -190,17 +210,15 @@ void batcher() { MPI_Recv(l_wid, entry_size, MPI_LONG, mpi_id_from_role_id(FILTERER, 0), TAG_IWORD, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - if (l_wid[0] == -1) break; for in_range(c, entry_size) { batch[r*entry_size + c] = (float)l_wid[c]; } } - if (l_wid[0] == -1) break; - INFO_PRINT("."); + printf("."); MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Send(batch, bufsize, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD); - INFO_PRINTLN("!"); + printf("!\n"); } free(l_wid); free(batch); @@ -242,7 +260,7 @@ void recv_weights(WeightList* wl, int src) { } void learner() { - INFO_PRINTF("Starting slave %d\n", getpid()); + INFO_PRINTF("Starting learner %d\n", getpid()); int me = my_mpi_id(); PyObject* net = create_network(WIN, EMB); @@ -265,7 +283,6 @@ void learner() { MPI_STATUS_IGNORE); step_net(net, batch, BS); } - // printf("%d net: %f\n", my_mpi_id(), eval_net(net)); update_weightlist(&wl, net); send_weights(&wl, mpi_id_from_role_id(DISPATCHER, 0)); } @@ -275,6 +292,7 @@ void learner() { } void dispatcher() { + INFO_PRINTF("Starting dispatcher %d\n", getpid()); PyObject* frank = create_network(WIN, EMB); create_test_dataset(WIN); WeightList wl; @@ -302,7 +320,7 @@ void dispatcher() { } combo_weights(&wl, wls, lpr); set_net_weights(frank, &wl); - // printf("Frank: %f\n", eval_net(frank)); + // INFO_PRINTF("Frank: %f\n", eval_net(frank)); } Py_DECREF(frank); free_weightlist(&wl);