From 7043b65532f5200d17d12e178979d2d9db83f0bd Mon Sep 17 00:00:00 2001 From: Pavel Lutskov Date: Wed, 11 Dec 2019 10:31:16 -0800 Subject: [PATCH] this is the baseline for evaluation --- bridge.pyx | 8 +++++--- library.py | 31 +++++++---------------------- main.c | 58 +++++++++++++++++++++++++++++++++++++----------------- 3 files changed, 52 insertions(+), 45 deletions(-) diff --git a/bridge.pyx b/bridge.pyx index e4dbc75..09a9120 100644 --- a/bridge.pyx +++ b/bridge.pyx @@ -97,7 +97,9 @@ cdef public void randidx(int* idx, size_t l, size_t how_much): cdef public object create_network(int win, int embed): try: - return nn.create_cbow_network(win, embed) + net = nn.create_cbow_network(win, embed) + eprint(net) + return net except Exception as e: eprint(e) @@ -169,7 +171,7 @@ cdef tuple cbow_batch( ): win = net.input_shape[1] // 2 batch_np = np.asarray(batch) - X_np = np.concatenate([batch_np[:, :win], batch_np[:, win+1:]], axis=1) + X_np = batch_np[:, [*range(win), *range(win+1, win+win+1)]] y_np = nn.onehot(batch_np[:, win], nc=len(nn.vocab)) return X_np, y_np @@ -192,7 +194,7 @@ cdef void words_into_wordlist(WordList* wl, list words): wl.words = realloc(wl.words, wl.mem * sizeof(Word)) for i in range(old, wl.mem): wl.words[i].mem = 0 - wl.words[i].data = 0 + wl.words[i].data = NULL wl.n_words = len(words) for i, w in enumerate(words): diff --git a/library.py b/library.py index c49eaad..11ca407 100644 --- a/library.py +++ b/library.py @@ -2,10 +2,9 @@ import os os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' import numpy as np -import flask import tensorflow as tf tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) # STFU! -from nltk.tokenize import word_tokenize as wt +tf.random.set_random_seed(42) from mynet import onehot @@ -14,6 +13,7 @@ HERE = os.path.abspath(os.path.dirname(__file__)) DATA = os.path.join(HERE, 'data') CORPUS = os.path.join(DATA, 'corpus.txt') VOCAB = os.path.join(DATA, 'vocab.txt') +TEST = os.path.join(DATA, 'test.txt') vocab = { w: i for i, w in enumerate(open(VOCAB).read().splitlines(keepends=False)) @@ -21,35 +21,18 @@ vocab = { inv_vocab = sorted(vocab, key=vocab.get) -app = flask.Flask(__name__) - - -@app.route('/') -def webfront(): - return 'Hello world!' - - def word_tokenize(s: str): l = ''.join(c.lower() if c.isalpha() else ' ' for c in s) return l.split() def create_test_dataset(win): - S = 1000 - with open(CORPUS) as f: - ds = np.array([vocab[w] for w in word_tokenize(f.read()) - if w in vocab]) - idx = np.random.choice(np.arange(win, len(ds) - win), S) - return ( - # X - np.stack([ - np.concatenate([ds[i-win:i], ds[i+1:i+win+1]]) - for i in idx - ], axis=0).astype(np.float32), + test_dataset = np.vectorize(vocab.get)(np.genfromtxt(TEST, dtype=str)) + assert test_dataset.shape[1] == 2*win + 1 + X_test = test_dataset[:, [*range(0, win), *range(win+1, win+win+1)]] + y_test = onehot(test_dataset[:, win], nc=len(vocab)) + return X_test, y_test - #y - onehot(ds[idx], nc=len(vocab)) - ) def create_mnist_network(): model = tf.keras.models.Sequential([ diff --git a/main.c b/main.c index b85a76f..54a3472 100644 --- a/main.c +++ b/main.c @@ -16,14 +16,14 @@ #define TAG_IWORD 8 #define TAG_INSTR 9 -#define COMM 500 -#define ITER 50 -#define BS 64 -#define EMB 20 +#define COMM 25 +#define ITER 690 +#define BS 32 +#define EMB 32 #define WIN 2 #define FLPC 1 -#define in_range(i, x) (size_t (i) = 0; (i) < (x); (i)++) +#define in_range(i, x) (size_t i = 0; i < (x); i++) // I am honestly VERY sorry for this but power corrupts even the best of us #define INFO_PRINTF(fmt, ...) \ @@ -33,7 +33,7 @@ #define INFO_PRINT(what) \ do { fprintf(stderr, "%s", what); } while(0) -int g_argc = 1; +int g_argc; // sorry! typedef enum{ TOKENIZER, @@ -76,7 +76,7 @@ size_t number_of(Role what) { - number_of(DISPATCHER) - number_of(VISUALIZER); case VISUALIZER: - return 1; + return 0; case DISPATCHER: return 1; } @@ -109,6 +109,7 @@ Role map_node() { } INFO_PRINTF("Something went wrong for node %d\n", node); MPI_Abort(MPI_COMM_WORLD, 1); // this is bad + return -1; // Not going to happen anyway (i hope) } void announce_ready(int dest) { @@ -144,6 +145,12 @@ void send_word(Word* w, int dest) { MPI_Send(w->data, len + 1, MPI_CHAR, dest, TAG_SWORD, MPI_COMM_WORLD); } +void ssend_word(Word* w, int dest) { + long len = strlen(w->data); + MPI_Ssend(&len, 1, MPI_LONG, dest, TAG_STLEN, MPI_COMM_WORLD); + MPI_Ssend(w->data, len + 1, MPI_CHAR, dest, TAG_SWORD, MPI_COMM_WORLD); +} + int recv_word(Word* w, int src) { long len; MPI_Status stat; @@ -162,10 +169,16 @@ int recv_word(Word* w, int src) { void tokenizer(const char* source) { INFO_PRINTF("Starting tokenizer %d\n", getpid()); WordList wl = {0, 0, NULL}; + size_t sync_ctr = 0; while (get_tokens(&wl, source)) { for in_range(i, wl.n_words) { - // int tok = wait_for_ready(); - send_word(&wl.words[i], mpi_id_from_role_id(FILTERER, 0)); + if (sync_ctr == 1000) { + ssend_word(&wl.words[i], mpi_id_from_role_id(FILTERER, 0)); + sync_ctr = 0; + } else { + send_word(&wl.words[i], mpi_id_from_role_id(FILTERER, 0)); + } + sync_ctr++; } } } @@ -184,8 +197,6 @@ void filterer() { while (1) { int stream_offs; while (have[src] != entry_size) { - // src = rand() % num_streams; - // announce_ready(role_id_from_mpi_id(TOKENIZER, src)); src = recv_word(&w, MPI_ANY_SOURCE); src = role_id_from_mpi_id(TOKENIZER, src); stream_offs = src*entry_size; @@ -218,7 +229,6 @@ void batcher() { batch[r*entry_size + c] = (float)l_wid[c]; } } - printf("."); MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Send(batch, bufsize, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD); @@ -304,13 +314,16 @@ void dispatcher() { update_weightlist(&wl, frank); int lpr = number_of(LEARNER) * FLPC; // Learners per round - WeightList *wls = malloc(sizeof(WeightList) * lpr); - int *round = malloc(sizeof(int) * lpr); - for in_range(i, lpr) { init_weightlist_like(wls + i, frank); } + int *round = malloc(sizeof(int) * lpr); + + float first_loss = eval_net(frank); + float crt_loss = first_loss; + float min_loss = crt_loss; + time_t start = time(NULL); for in_range(i, COMM) { randidx(round, number_of(LEARNER), lpr); @@ -324,8 +337,17 @@ void dispatcher() { } combo_weights(&wl, wls, lpr); set_net_weights(frank, &wl); - INFO_PRINTF("Frank: %f\n", eval_net(frank)); + crt_loss = eval_net(frank); + min_loss = crt_loss < min_loss ? crt_loss : min_loss; + INFO_PRINTF("Round %ld, validation loss %f\n", i, crt_loss); } + time_t finish = time(NULL); + float delta_t = finish - start; + float delta_l = first_loss - eval_net(frank); + INFO_PRINTF( + "Laptop MPI sgd consecutive_batch W%d E%d BS%d R%d bpe%d LPR%d," + "%f,%f,%f\n", WIN, EMB, BS, COMM, ITER, lpr, + delta_l / COMM, delta_l / delta_t, min_loss); Py_DECREF(frank); free_weightlist(&wl); for in_range(i, lpr) free_weightlist(wls + i); @@ -338,10 +360,11 @@ void visualizer() { serve(); } - int main (int argc, const char **argv) { MPI_Init(NULL, NULL); + g_argc = argc; + // Cython Boilerplate PyImport_AppendInittab("bridge", PyInit_bridge); Py_Initialize(); @@ -350,7 +373,6 @@ int main (int argc, const char **argv) { // Actual Code int role_id; - g_argc = argc; switch (map_node()) { case TOKENIZER: role_id = role_id_from_mpi_id(TOKENIZER, my_mpi_id());