From b2a4681fb20b0adc33b532e1ed4b501dd94e2ae0 Mon Sep 17 00:00:00 2001
From: Pavel Lutskov
Date: Sun, 1 Dec 2019 17:49:09 -0800
Subject: [PATCH] implemented some kind of dispatching

---
 bridge.pyx |  13 +++---
 main.c     | 119 ++++++++++++++++++++++++++---------------------------
 2 files changed, 67 insertions(+), 65 deletions(-)

diff --git a/bridge.pyx b/bridge.pyx
index 4b964b0..953cc1b 100644
--- a/bridge.pyx
+++ b/bridge.pyx
@@ -52,7 +52,9 @@ cdef public int get_tokens(WordList* wl, const char *filename):
     try:
         words = next(g)
     except StopIteration:
-        return 0
+        tokenizers[fnu] = nn.token_generator(fnu)
+        g = tokenizers[fnu]
+        words = next(g)
     words_into_wordlist(wl, words)
     return 1
 
@@ -69,16 +71,17 @@ cdef public void f_idx_list_to_print(float* f_idxs, size_t num):
     idxs = np.asarray(f_idxs).astype(np.int)
     cdef str pyuni = ' '.join(nn.inv_vocab[i] for i in idxs)
     print(pyuni)
-    # cdef bytes b = pyuni.encode('utf-8')
-    # cdef char* retval = malloc((len(b) + 1) * sizeof(char))
-    # retval[len(b)] = 0
-    # return retval


 cdef public void debug_print(object o):
     eprint(o)


+cdef public void randidx(int* idx, size_t l, size_t how_much):
+    i_np = np.random.choice(l, how_much, replace=False).astype(np.intc)
+    memcpy(idx, PyArray_DATA(i_np), how_much * sizeof(int))
+
+
 cdef public object create_network(int win, int embed):
     try:
         return nn.create_cbow_network(win, embed)
diff --git a/main.c b/main.c
index ae6c4c4..2646b98 100644
--- a/main.c
+++ b/main.c
@@ -14,13 +14,14 @@
 #define TAG_STLEN 6
 #define TAG_SWORD 7
 #define TAG_IWORD 8
+#define TAG_INSTR 9

-#define COMM 10
-#define ITER 100
+#define COMM 50
+#define ITER 50
 #define BS 32
 #define EMB 20
 #define WIN 2
-#define FSPC 1
+#define FLPC 0.8

 #define in_range(i, x) (size_t (i) = 0; (i) < (x); (i)++)
 // I am honestly VERY sorry for this but power corrupts even the best of us
@@ -29,15 +30,17 @@
     do { fprintf(stderr, fmt, __VA_ARGS__); } while(0)
 #define INFO_PRINTLN(what) \
     do { fprintf(stderr, "%s\n", what); } while(0)
+#define INFO_PRINT(what) \
+    do { fprintf(stderr, "%s", what); } while(0)

-// char_stream -> tokenize -> word_strem -> filter + batch -> slave network
+int g_argc = 1;

 typedef enum{
     TOKENIZER,
     FILTERER,
     BATCHER,
     LEARNER,
-    MASTER
+    DISPATCHER
 } Role;

 int world_size() {
@@ -55,7 +58,11 @@ int my_mpi_id() {
 size_t number_of(Role what) {
     switch (what) {
         case TOKENIZER:
-            return 1;
+            if (g_argc < 2) {
+                INFO_PRINTLN("NOT ENOUGH INPUTS!");
+                exit(1);
+            }
+            return g_argc - 1;
         case FILTERER:
             return 1;
         case BATCHER:
@@ -65,10 +72,9 @@ size_t number_of(Role what) {
                    - number_of(TOKENIZER)
                    - number_of(FILTERER)
                    - number_of(BATCHER)
-                   - number_of(MASTER);
-        case MASTER:
-            return 0;
-#warning "set to real number of masters!"
+                   - number_of(DISPATCHER);
+        case DISPATCHER:
+            return 1;
     }
 }

@@ -93,10 +99,11 @@ int role_id_from_mpi_id(Role role, int mid) {
 Role map_node() {
     int node = my_mpi_id();
     size_t base = 0;
-    for (Role r = TOKENIZER; r <= MASTER; r++) {
+    for (Role r = TOKENIZER; r <= DISPATCHER; r++) {
         if (node < number_of(r) + base) return r;
         base += number_of(r);
     }
+    INFO_PRINTF("Something went wrong for node %d\n", node);
     exit(1); // this is bad
 }

@@ -121,8 +128,9 @@ void send_word(Word* w, int dest) {
     MPI_Send(w->data, len + 1, MPI_CHAR, dest, TAG_SWORD, MPI_COMM_WORLD);
 }

-void recv_word(Word* w, int src) {
+int recv_word(Word* w, int src) {
     long len;
+    MPI_Status stat;
     MPI_Recv(&len, 1, MPI_LONG, src, TAG_STLEN, MPI_COMM_WORLD,
              MPI_STATUS_IGNORE);
     if (w->mem < len + 1) {
@@ -130,7 +138,8 @@
         w->data = realloc(w->data, sizeof(char) * w->mem);
     }
     MPI_Recv(w->data, len + 1, MPI_CHAR, src, TAG_SWORD, MPI_COMM_WORLD,
-             MPI_STATUS_IGNORE);
+             &stat);
+    return stat.MPI_SOURCE;
 }

 void tokenizer(const char* source) {
@@ -170,8 +179,6 @@ void filterer() {
 }

 void batcher() {
-    // Reads some data and converts it to a float array
-    // INFO_PRINTF("Starting batcher %d\n", getpid());
     int s = 0;
     const size_t entry_size = 2 * WIN + 1;
     const size_t bufsize = BS * entry_size;
@@ -189,10 +196,11 @@ void batcher() {
             }
         }
         if (l_wid[0] == -1) break;
-
+        INFO_PRINT(".");
         MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD,
                  MPI_STATUS_IGNORE);
         MPI_Send(batch, bufsize, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD);
+        INFO_PRINTLN("!");
     }
     free(l_wid);
     free(batch);
@@ -206,7 +214,7 @@ void free_weightlist(WeightList* wl) {
     free(wl->weights);
 }

-void send_weights(const WeightList* wl, int dest, int tag) {
+void send_weights(const WeightList* wl, int dest) {
     // This assumes that the receiving end knows exactly
     // the number of elements being sent and has memory ready
     // for it.
@@ -215,11 +223,12 @@
         for in_range(k, wl->weights[i].dims) {
             n_el *= wl->weights[i].shape[k];
         }
-        MPI_Send(wl->weights[i].W, n_el, MPI_FLOAT, dest, tag, MPI_COMM_WORLD);
+        MPI_Send(wl->weights[i].W, n_el, MPI_FLOAT, dest,
+                 TAG_WEIGH, MPI_COMM_WORLD);
     }
 }

-void recv_weights(WeightList* wl, int src, int tag) {
+void recv_weights(WeightList* wl, int src) {
     // This assumes that the sender sends stuff that is going
     // to fit into memory in correct order too.
     for in_range(i, wl->n_weights) {
@@ -227,17 +236,12 @@
         for in_range(d, wl->weights[i].dims) {
             n_el *= wl->weights[i].shape[d];
         }
-        MPI_Recv(wl->weights[i].W, n_el, MPI_FLOAT, src, tag, MPI_COMM_WORLD,
-                 MPI_STATUS_IGNORE);
+        MPI_Recv(wl->weights[i].W, n_el, MPI_FLOAT, src,
+                 TAG_WEIGH, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
     }
 }

 void learner() {
-    // 0. Announce readiness?
-    // 1. Receive weights from master ([ ] has to know its master)
-    // 2. Request batch from reader ([ ] has to choose a reader)
-    // 3. Do computations
-    // 4. Send weights back to master
     INFO_PRINTF("Starting slave %d\n", getpid());
     int me = my_mpi_id();

@@ -251,6 +255,8 @@
     float* batch = malloc(bufsize * sizeof(float));

     for in_range(i, COMM) {
+        recv_weights(&wl, mpi_id_from_role_id(DISPATCHER, 0));
+        set_net_weights(net, &wl);
         for in_range(k, ITER) {
             MPI_Send(&me, 1, MPI_INT, mpi_id_from_role_id(BATCHER, 0),
                      TAG_READY, MPI_COMM_WORLD);
@@ -259,61 +265,50 @@ void learner() {
                      MPI_STATUS_IGNORE);
             step_net(net, batch, BS);
         }
-        printf("%d net: %f\n", my_mpi_id(), eval_net(net));
+        // printf("%d net: %f\n", my_mpi_id(), eval_net(net));
         update_weightlist(&wl, net);
+        send_weights(&wl, mpi_id_from_role_id(DISPATCHER, 0));
     }
     Py_DECREF(net);
     free_weightlist(&wl);
+    free(batch);
 }

-void master_node() {
-    // 0. Initialize model
-
-    // 1. Send it to some slaves for processing (synchronous)
-    // 2. Receive weights back (synchronous)
-    // 3. Average the weights
-
-
+void dispatcher() {
     PyObject* frank = create_network(WIN, EMB);
+    create_test_dataset(WIN);
     WeightList wl;
     init_weightlist_like(&wl, frank);
     update_weightlist(&wl, frank);

-    int spr = number_of(LEARNER) * FSPC; // Slaves per round
-    int s;
+    int lpr = number_of(LEARNER) * FLPC; // Learners per round

-    WeightList *wls = malloc(sizeof(WeightList) * spr);
-    int *handles = malloc(sizeof(int) * spr);
+    WeightList *wls = malloc(sizeof(WeightList) * lpr);
+    int *round = malloc(sizeof(int) * lpr);

-    for in_range(i, spr) {
+    for in_range(i, lpr) {
         init_weightlist_like(wls + i, frank);
     }

     for in_range(i, COMM) {
+        randidx(round, number_of(LEARNER), lpr);
-        for in_range(k, spr) {
-            MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD,
-                     MPI_STATUS_IGNORE);
-            send_weights(&wl, s, TAG_WEIGH);
-            handles[k] = s;
+        for in_range(k, lpr) {
+            // INFO_PRINTF(" %5d", round[k]);
+            send_weights(&wl, mpi_id_from_role_id(LEARNER, round[k]));
         }
-        for in_range(k, spr) {
-            recv_weights(wls + k, handles[k], TAG_WEIGH);
+        // INFO_PRINTLN("");
+        for in_range(k, lpr) {
+            recv_weights(wls + k, mpi_id_from_role_id(LEARNER, round[k]));
         }
-        combo_weights(&wl, wls, spr);
+        combo_weights(&wl, wls, lpr);
         set_net_weights(frank, &wl);
-        printf("Frank: %f\n", eval_net(frank));
+        // printf("Frank: %f\n", eval_net(frank));
     }
     Py_DECREF(frank);
     free_weightlist(&wl);
-    for in_range(i, spr) free_weightlist(wls + i);
+    for in_range(i, lpr) free_weightlist(wls + i);
     free(wls);
-    // if (role_id_from_mpi_id(my_mpi_id(), MASTER) == 0) {
-    //     for in_range(r, number_of(BATCHER)) {
-    //         int stop = -1;
-    //         MPI_Send(&stop, 1, MPI_INT, reader_id(r), TAG_READY,
-    //                  MPI_COMM_WORLD);
-    //     }
-    // }
+    free(round);
 }

 int main (int argc, const char **argv) {
@@ -326,9 +321,12 @@
     PyObject* bridge_module = PyImport_ImportModule("bridge");

     // Actual Code
+    int role_id;
+    g_argc = argc;
     switch (map_node()) {
         case TOKENIZER:
-            tokenizer(argv[1]);
+            role_id = role_id_from_mpi_id(TOKENIZER, my_mpi_id());
+            tokenizer(argv[role_id + 1]);
             break;
         case FILTERER:
             filterer();
@@ -339,10 +337,11 @@
         case LEARNER:
             learner();
             break;
+        case DISPATCHER:
+            dispatcher();
+            break;
         default:
             INFO_PRINTLN("DYING HORRIBLY!");
-            // case SLAVE: slave_node(); break;
-            // case MASTER: master_node(); break;
     }

     // Finalizing Boilerplate
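
Note on the averaging step: each round, dispatcher() samples number_of(LEARNER) * FLPC learners with randidx(), sends them the current weights, collects their updated WeightLists, and then relies on combo_weights() to fold those lists back into one. combo_weights() is not part of this diff, so the following is only a minimal sketch of what such an element-wise average could look like, assuming the Weight/WeightList layout implied by send_weights()/recv_weights() (a flat float buffer W, a dims count, and a shape array). The struct definitions and helper names below are illustrative stand-ins, not the project's actual code.

#include <stddef.h>
#include <stdio.h>

/* Illustrative stand-ins: only the fields that send_weights()/recv_weights()
 * actually touch (W, dims, shape, weights, n_weights) are assumed here. */
typedef struct {
    float*  W;      /* flat parameter buffer */
    size_t  dims;   /* number of dimensions */
    size_t* shape;  /* extent of each dimension */
} Weight;

typedef struct {
    Weight* weights;
    size_t  n_weights;
} WeightList;

/* Number of floats in one Weight: the product of its shape. */
static size_t weight_numel(const Weight* w) {
    size_t n = 1;
    for (size_t d = 0; d < w->dims; d++) n *= w->shape[d];
    return n;
}

/* Element-wise average of `count` weight lists into `dst`. All lists are
 * assumed to share shapes, which the dispatcher arranges by creating them
 * with init_weightlist_like(). */
static void average_weightlists(WeightList* dst, const WeightList* src, size_t count) {
    for (size_t i = 0; i < dst->n_weights; i++) {
        size_t n = weight_numel(&dst->weights[i]);
        for (size_t j = 0; j < n; j++) {
            float acc = 0.0f;
            for (size_t k = 0; k < count; k++)
                acc += src[k].weights[i].W[j];
            dst->weights[i].W[j] = acc / (float)count;
        }
    }
}

int main(void) {
    /* Two toy "learner" results, each holding a single 1x2 tensor. */
    size_t shape[2] = {1, 2};
    float a[2] = {1.0f, 3.0f}, b[2] = {3.0f, 5.0f}, out[2] = {0.0f, 0.0f};
    Weight wa = {a, 2, shape}, wb = {b, 2, shape}, wo = {out, 2, shape};
    WeightList src[2] = {{&wa, 1}, {&wb, 1}};
    WeightList dst = {&wo, 1};

    average_weightlists(&dst, src, 2);
    printf("averaged: %.1f %.1f\n", out[0], out[1]);  /* expect 2.0 4.0 */
    return 0;
}

Averaging every contribution with equal weight matches the synchronous round structure in dispatcher(); if the real combo_weights() weights contributions differently (for example by number of batches seen), the inner loop would accumulate a weighted sum instead.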