From 966bbc904c36d8344b562b3cf64ebb0dec141cb7 Mon Sep 17 00:00:00 2001
From: Pavel Lutskov
Date: Thu, 12 Dec 2019 20:02:03 -0800
Subject: [PATCH] Faster startup via lazy imports; train to a target loss

Defer the numpy and tensorflow imports in library.py into the
functions that need them, so a process that never builds a network no
longer pays the TensorFlow import cost at startup. Also train until
the validation loss drops below TARGET instead of for a fixed COMM
rounds, report wall-clock statistics at the end, name the pipeline
endpoint variables after the roles they talk to, require 4 procs per
pipeline plus 1, ignore data_*/ instead of data/, and drop the
compile_commands.json copy step from meson.build.
---
 .gitignore  |  2 +-
 library.py  | 14 +++++++++-----
 main.c      | 43 +++++++++++++++++++++++++------------------
 meson.build |  4 ----
 4 files changed, 35 insertions(+), 28 deletions(-)

diff --git a/.gitignore b/.gitignore
index 1f25dbf..0e41c6f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,4 +6,4 @@ compile_commands.json
 build/
 cythoned/
 __pycache__/
-data/
+data_*/
diff --git a/library.py b/library.py
index c4b0485..f0f5b32 100644
--- a/library.py
+++ b/library.py
@@ -1,11 +1,6 @@
 import os
 os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
 
-import numpy as np
-import tensorflow as tf
-tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) # STFU!
-tf.random.set_random_seed(42)
-
 from mynet import onehot
 
 
@@ -27,6 +22,7 @@ def word_tokenize(s: str):
 
 
 def create_test_dataset(win):
+    import numpy as np
     test_dataset = np.vectorize(vocab.get)(np.genfromtxt(TEST, dtype=str))
     assert test_dataset.shape[1] == 2*win + 1
     X_test = test_dataset[:, [*range(0, win), *range(win+1, win+win+1)]]
@@ -35,6 +31,10 @@ def create_test_dataset(win):
 
 
 def create_mnist_network():
+    import tensorflow as tf
+    tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) # STFU!
+    tf.random.set_random_seed(42)
+
     model = tf.keras.models.Sequential([
         tf.keras.layers.Dense(30, input_shape=(784,), activation='relu'),
         tf.keras.layers.Dense(10, activation='softmax')
@@ -45,6 +45,10 @@ def create_mnist_network():
 
 
 def create_cbow_network(win, embed):
+    import tensorflow as tf
+    tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) # STFU!
+    tf.random.set_random_seed(42)
+
     ctxt = tf.keras.layers.Input(shape=[2*win])
     ed = tf.keras.layers.Embedding(len(vocab), embed, input_length=2*win)(ctxt)
     cbow = tf.keras.layers.Lambda(lambda x: tf.reduce_mean(x, axis=1))(ed)
diff --git a/main.c b/main.c
index 97ef736..513bd59 100644
--- a/main.c
+++ b/main.c
@@ -17,8 +17,9 @@
 #define TAG_INSTR 9
 #define TAG_TERMT 10
 
-#define COMM 25
-#define ITER 100
+#define COMM 50
+#define ITER 250
+#define TARGET 8.40
 #define BS 32
 #define EMB 32
 #define WIN 2
@@ -213,8 +214,8 @@ void tokenizer(const char* source) {
 void filterer() {
     INFO_PRINTF("Starting filterer %d\n", getpid());
     int rid = my_role_id(FILTERER);
-    int prev = mpi_id_from_role_id(TOKENIZER, rid);
-    int next = mpi_id_from_role_id(BATCHER, rid);
+    int tokenizer = mpi_id_from_role_id(TOKENIZER, rid);
+    int batcher = mpi_id_from_role_id(BATCHER, rid);
 
     Word w = {0, NULL};
     const size_t window_size = 2 * WIN + 1;
@@ -223,7 +224,7 @@ void filterer() {
 
     while (1) {
         while (have != window_size) {
-            recv_word(&w, prev);
+            recv_word(&w, tokenizer);
 
             if (!strlen(w.data))
                 break;
@@ -234,10 +235,10 @@ void filterer() {
         if (!strlen(w.data))
             break;
         have = 0;
-        send_window(window, window_size, next);
+        send_window(window, window_size, batcher);
     }
     window[0] = -1;
-    send_window(window, window_size, next);
+    send_window(window, window_size, batcher);
     free_word(&w);
     free(window);
     INFO_PRINTF("Finishing filterer %d\n", getpid());
@@ -246,7 +247,7 @@
 void batcher() {
     INFO_PRINTF("Starting batcher %d\n", getpid());
     int rid = my_role_id(BATCHER);
-    int prev = mpi_id_from_role_id(FILTERER, rid);
+    int filterer = mpi_id_from_role_id(FILTERER, rid);
     int learner_mpi_id = 0;
 
     const size_t window_size = 2 * WIN + 1;
@@ -256,7 +257,7 @@
 
     while (1) {
         for in_range(r, BS) {
-            recv_window(l_wid, window_size, prev);
+            recv_window(l_wid, window_size, filterer);
 
             if (l_wid[0] == -1)
                 break;
@@ -381,7 +382,8 @@ void dispatcher() {
     float crt_loss = first_loss;
     float min_loss = crt_loss;
     time_t start = time(NULL);
-    for in_range(i, COMM) {
+    size_t rounds = 0;
+    while (crt_loss > TARGET) {
         randidx(round, number_of(LEARNER), lpr);
         for in_range(k, lpr) {
             // Instruct learners to learn
@@ -397,8 +399,10 @@
         set_net_weights(frank, &wl);
         crt_loss = eval_net(frank);
         min_loss = crt_loss < min_loss ? crt_loss : min_loss;
-        INFO_PRINTF("Round %ld, validation loss %f\n", i, crt_loss);
+        INFO_PRINTF("Round %zu, validation loss %f\n", rounds, crt_loss);
+        rounds++;
     }
+    time_t finish = time(NULL);
 
     go = -1;
     for in_range(t, number_of(TOKENIZER)) {
@@ -410,14 +414,17 @@
                  TAG_INSTR, MPI_COMM_WORLD);
     }
 
-    time_t finish = time(NULL);
     float delta_t = finish - start;
     float delta_l = first_loss - crt_loss;
     INFO_PRINTF(
-        "Laptop MPI adam consecutive_batch W%d E%d "
-        "BS%d R%d bpe%d LPR%d pp%d,"
-        "%f,%f,%f\n", WIN, EMB, BS, COMM, ITER, lpr, g_argc - 1,
-        delta_l / COMM, delta_l / delta_t, min_loss);
+        "Laptop MPI adam consecutive_batch "
+        "W%d E%d BS%d bpe%d LPR%d pp%lu,"
+        "%f,%f,%f,%f,"
+        "%zu,%.0f,%zu\n",
+        WIN, EMB, BS, ITER, lpr, number_of(TOKENIZER),
+        delta_l / rounds, delta_l / delta_t, min_loss, TARGET,
+        rounds, delta_t, BS * ITER * rounds
+    );
     Py_DECREF(frank);
     free_weightlist(&wl);
     for in_range(i, lpr) free_weightlist(wls + i);
@@ -440,9 +447,9 @@ int main (int argc, const char **argv) {
         MPI_Abort(MPI_COMM_WORLD, 1);
     }
     int pipelines = argc - 1;
-    int min_nodes = 3 * pipelines + 2;
+    int min_nodes = 4 * pipelines + 1;
     if (world_size() < min_nodes) {
-        INFO_PRINTF("You requested %d pipelines "
+        INFO_PRINTF("You requested %d pipeline(s) "
                     "but only provided %d procs "
                     "(%d required)\n",
                     pipelines, world_size(), min_nodes);
diff --git a/meson.build b/meson.build
index 7e2ad46..ab9d386 100644
--- a/meson.build
+++ b/meson.build
@@ -22,7 +22,3 @@ executable('fedavg_mpi',
            dependencies: [mpi, python],
            include_directories: numpy_header,
            link_args: '-Wl,-w')
-
-run_command('cp',
-            meson.current_build_dir() + 'compile_commands.json',
-            meson.current_source_dir())
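
Note (aside, not part of the patch): a quick way to sanity-check the
lazy-import claim is to time the module import against the first
network construction. This is a hypothetical harness; it assumes
library.py imports cleanly on its own, i.e. mynet and the vocabulary/
TEST files are reachable from the working directory:

    # time_imports.py -- hypothetical check: importing library should now
    # be cheap, and the TensorFlow cost paid only on first use.
    import time

    t0 = time.time()
    import library                        # no numpy/tensorflow at module scope
    print('import library:       %.2fs' % (time.time() - t0))

    t0 = time.time()
    net = library.create_mnist_network()  # triggers the deferred 'import tensorflow'
    print('first network build:  %.2fs' % (time.time() - t0))

    t0 = time.time()
    net = library.create_mnist_network()  # tensorflow now cached in sys.modules
    print('second network build: %.2fs' % (time.time() - t0))

The second build should be much faster than the first, since Python
caches imported modules in sys.modules; only the Keras model
construction itself remains.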