From 79e8b9874b503c6346bec6ccfd65070e87d25a5e Mon Sep 17 00:00:00 2001
From: Pavel Lutskov
Date: Fri, 13 Dec 2019 17:35:06 -0800
Subject: [PATCH] for some reason I dockerized it and it works

---
 .gitignore  |   4 +-
 Dockerfile  |   9 +++++
 README.md   | 108 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 bridge.pyx  |   9 -----
 library.py  |  38 +++++++++++-------
 main.c      |   7 +---
 meson.build |  15 +++++++-
 7 files changed, 158 insertions(+), 32 deletions(-)
 create mode 100644 Dockerfile
 create mode 100644 README.md

diff --git a/.gitignore b/.gitignore
index 5d8f7d5..349876d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,8 +2,8 @@
 .DS_Store
 run
 compile_commands.json
-cfg.json
 build/
 trained/
 __pycache__/
-data_*/
+data
+.dockerignore
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..d018af6
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,9 @@
+FROM ubuntu:18.04
+
+RUN apt-get -y update && \
+    apt-get -y install build-essential pkg-config ninja-build python3 python3-pip python3-dev mpich
+RUN pip3 install meson numpy tensorflow==1.14.0 flask cython
+RUN mkdir /workspace
+COPY bridge.pyx library.py server.py meson.build main.c /workspace/
+RUN cd /workspace && meson build && cd build && ninja
+WORKDIR /workspace
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..7c8a454
--- /dev/null
+++ b/README.md
@@ -0,0 +1,108 @@
+# Implementation of Federated Averaging with MPI, Keras and Cython
+
+(_for educational purposes_)
+
+## What's it doing?
+
+The system implemented in this project learns word embeddings with the CBOW
+approach and, furthermore, tries to do so in a distributed fashion. There are
+two flavors of distribution present here:
+
+1. Reading tokens (words) from a source (a text file for now), filtering
+   them, looking up their vocabulary indices, and windowing and batching are
+   all implemented as separate processes that together form an *input
+   pipeline*.
+2. Neural network training is done in parallel across several nodes
+   (*learners*), with the learned weights periodically gathered, averaged and
+   redistributed by the central node, a.k.a. the *dispatcher*.
+
+In this framework each learner can have its own input pipeline, all learners
+can tap a single input pipeline, or anything in between. In the current
+version, however, a single learner cannot tap more than one pipeline.
+
+## How to make this work
+
+### Requirements
+
+* A recent UNIX-y system
+* A recent GCC (the default macOS clang also seems to work)
+* MPICH 3
+* Python 3.6 with dev headers and libraries (e.g. `python3-dev` on Ubuntu)
+* Meson and ninja for building
+* TensorFlow 1.14
+* flask
+* Cython
+
+Alternatively, you can sidestep most of these requirements with the provided
+Dockerfile (see "Running in Docker" at the end of this README).
+
+### Compiling
+
+Compilation should be as simple as (run from the project root):
+
+```sh
+meson build && cd build && ninja
+```
+
+If this fails, then either fix it yourself or let me know, I guess.
+
+### Running
+
+Now, this isn't without some quirks (due to this being a course project and
+all). You have to run the program *FROM THE PROJECT ROOT* using the following
+command (don't run it yet; more instructions follow):
+
+```sh
+mpiexec -n NUM_PROC ./build/fedavg_mpi /path/to/training/data/textfile{1,2,3}
+```
+
+This program **expects a couple of things**:
+
+First, **in the project root** create a directory `data` and put the
+following three files in it:
+
+- `vocab.txt` -- a whitespace-separated list of words for which the embeddings
+  will be learned. The words may only contain lowercase alphabetic ASCII
+  characters (you can try lowercase UTF-8 and see what happens, but no
+  guarantees here).
+- `test.txt` -- a testing dataset with context windows of size 5, one line per
+  window. The central (i.e. third) word in each context window is used as the
+  target and the surrounding words as the source. The same requirements apply
+  here as for the vocabulary; furthermore, only words present in `vocab.txt`
+  are allowed in `test.txt`. This file is used to track the loss of the
+  network during training. An example of the `test.txt` format:
+
+```
+the quick brown fox jumped
+over a lazy dog padword
+```
+
+- `cfg.json` -- a configuration file (an example is shown at the end of this
+  README) containing the following fields:
+  * `"name"`: a label used in the names of the output files (the trained
+    embeddings end up in `embeddings_<name>.csv`);
+  * `"bpe"`: the number of independent learner SGD iterations per
+    communication round;
+  * `"bs"`: the batch size (the number of context windows in a batch);
+  * `"target"`: the loss value you want to achieve; once the network reaches
+    this loss, it stops training, saves the embeddings and exits.
+
+Then, for each training data file passed as an argument (these can reside
+wherever you want them to), an input pipeline is constructed in the program.
+Each input pipeline consists of 3 nodes (tokenizer, filter, batcher), and
+since one learner isn't allowed to tap more than one pipeline, each pipeline
+also needs at least one learner. On top of that, there need to be one
+dispatcher process and one visualizer process.
+
+**TLDR:** The formula for the number of processes that you need to request
+from `mpiexec -n` looks like this:
+
+```
+NUM_PROCS >= 4*num_data_files + 2
+```
+
+There is also a convenient (well, somewhat) formula to determine how many
+learners you will get depending on the arguments you passed:
+
+```
+learners = NUM_PROCS - 2 - 3*num_data_files
+```
+
+The good thing is, the program will complain if it doesn't like the numbers
+you passed it and will tell you how to fix them.
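+
+## Example `cfg.json`
+
+For reference, here is a minimal `data/cfg.json` with all four fields. The
+values are purely illustrative, not tuned recommendations:
+
+```json
+{
+    "name": "demo",
+    "bpe": 10,
+    "bs": 64,
+    "target": 0.5
+}
+```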
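+
+## Running in Docker
+
+The Dockerfile builds the project under `/workspace` inside an Ubuntu 18.04
+image. A rough sketch of how a containerized run might look (the image tag
+and training file name are placeholders; note that you still have to mount
+your `data` directory into `/workspace`):
+
+```sh
+docker build -t fedavg_mpi .
+docker run --rm -v "$PWD/data:/workspace/data" fedavg_mpi \
+    mpiexec -n 6 ./build/fedavg_mpi data/textfile1
+```
+
+With one training file, `-n 6` satisfies `NUM_PROCS >= 4*1 + 2`: three
+pipeline nodes, one learner, the dispatcher and the visualizer.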
diff --git a/bridge.pyx b/bridge.pyx
index b01e7a8..eb50114 100644
--- a/bridge.pyx
+++ b/bridge.pyx
@@ -47,11 +47,6 @@ cdef public void serve():
     srv.serve()
 
 
-cdef public void bump_count():
-    eprint(f'bumping count from {srv.counter} to {srv.counter + 1}')
-    srv.counter += 1
-
-
 cdef public size_t getwin():
     return nn.WIN
 
@@ -72,10 +67,6 @@ cdef public float gettarget():
     return nn.CFG['target']
 
 
-cdef public float getflpc():
-    return nn.CFG['flpc']
-
-
 cdef public int get_tokens(WordList* wl, const char *filename):
     fnu = filename.decode('utf-8')
     if fnu not in tokenizers:
diff --git a/library.py b/library.py
index 56b6a8f..96447ea 100644
--- a/library.py
+++ b/library.py
@@ -1,9 +1,8 @@
 import os
 import json
+from sys import stderr
 
 os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
 
-from mynet import onehot
-
 WIN = 2
 EMB = 32
@@ -11,21 +10,27 @@ EMB = 32
 
 HERE = os.path.abspath(os.path.dirname(__file__))
 
 
-def read_cfg():
-    with open(os.path.join(HERE, 'cfg.json')) as f:
-        return json.load(f)
-
-
-CFG = read_cfg()
-DATA = os.path.join(HERE, CFG['data'])
+DATA = os.path.join(HERE, 'data')
 RESULTS = os.path.join(HERE, 'trained')
 CORPUS = os.path.join(DATA, 'corpus.txt')
 VOCAB = os.path.join(DATA, 'vocab.txt')
 TEST = os.path.join(DATA, 'test.txt')
 
+if not os.path.exists(RESULTS):
+    os.mkdir(RESULTS)
+
+
+def read_cfg():
+    with open(os.path.join(DATA, 'cfg.json'), encoding='utf-8') as f:
+        return json.load(f)
+
+
+CFG = read_cfg()
+
+
 def read_vocab_list():
-    with open(VOCAB) as f:
+    with open(VOCAB, encoding='utf-8') as f:
         return f.read().split()
@@ -41,6 +46,13 @@ def word_tokenize(s: str):
     return l.split()
 
 
+def onehot(a, nc=10):
+    import numpy as np
+    oh = np.zeros((len(a), nc), dtype=np.float32)
+    oh[np.arange(len(a)), a.flatten().astype(int)] = 1
+    return oh
+
+
 def create_test_dataset():
     import numpy as np
     test_dataset = np.vectorize(vocab.get)(np.genfromtxt(TEST, dtype=str))
@@ -89,7 +101,7 @@ def eval_network(net):
 
 
 def token_generator(filename):
-    with open(filename) as f:
+    with open(filename, encoding='utf-8') as f:
         for l in f:
             if not l.isspace():
                 tok = word_tokenize(l)
@@ -103,8 +115,8 @@ def get_embeddings(net):
 
 def save_embeddings(emb):
     import numpy as np
-    np.savetxt(os.path.join(RESULTS, f'embeddings_{CFG["data"]}.csv'), emb)
+    np.savetxt(os.path.join(RESULTS, f'embeddings_{CFG["name"]}.csv'), emb)
 
 
 def ckpt_network(net):
-    net.save_weights(os.path.join(RESULTS, f'model_ckpt_{CFG["data"]}.h5'))
+    net.save_weights(os.path.join(RESULTS, f'model_ckpt_{CFG["name"]}.h5'))
diff --git a/main.c b/main.c
index 0c50d31..e2f7556 100644
--- a/main.c
+++ b/main.c
@@ -368,14 +368,13 @@ void dispatcher() {
     size_t bs = getbs();
     size_t bpe = getbpe();
     float target = gettarget();
-    float flpc = getflpc();
 
     PyObject* frank = create_network();
     WeightList wl;
     init_weightlist_like(&wl, frank);
     update_weightlist(&wl, frank);
 
-    int lpr = number_of(LEARNER) * flpc;  // Learners per round
+    int lpr = number_of(LEARNER);  // Learners per round
     WeightList *wls = malloc(sizeof(WeightList) * lpr);
     for in_range(i, lpr) {
         init_weightlist_like(wls + i, frank);
@@ -445,10 +444,6 @@ void dispatcher() {
 void visualizer() {
     INFO_PRINTF("Starting visualizer %d\n", getpid());
     serve();
-    while (1) {
-        sleep(1);
-        bump_count();
-    }
 }
 
 int main (int argc, const char **argv) {
diff --git a/meson.build b/meson.build
index ab9d386..5c086ab 100644
--- a/meson.build
+++ b/meson.build
@@ -1,10 +1,21 @@
 project('fedavg_mpi', 'c')
-add_global_arguments('-Wno-unused-command-line-argument', language: 'c')
+
 add_project_arguments(
     '-DNPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION',
     language: 'c'
 )
 
+compiler = meson.get_compiler('c')
+if compiler.has_argument('-Wno-unused-command-line-argument')
+    add_global_arguments('-Wno-unused-command-line-argument', language: 'c')
+endif
+
+if compiler.has_link_argument('-Wl,-w')
+    extra_link_args = ['-Wl,-w']
+else
+    extra_link_args = []
+endif
+
 mpi = dependency('mpi')
 python = dependency('python3')
 numpy_header = include_directories(run_command(
@@ -21,4 +32,4 @@
 executable('fedavg_mpi', 'main.c', bridge,
     dependencies: [mpi, python],
     include_directories: numpy_header,
-    link_args: '-Wl,-w')
+    link_args: extra_link_args)
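
A note on the dispatcher change above: with `flpc` gone, every learner now
participates in every communication round (`lpr = number_of(LEARNER)`), which
is the plain, unweighted form of federated averaging. For anyone reading
along, here is a minimal NumPy sketch of what one averaging round computes;
the names below are illustrative and not taken from the actual code:

```python
import numpy as np

def fedavg_round(learner_weights):
    """Element-wise mean of per-learner weights, layer by layer.

    learner_weights: one entry per learner, each a list of np.ndarrays
    shaped like the layers of the shared model.
    """
    n = len(learner_weights)
    # zip(*...) groups the i-th layer of every learner together.
    return [sum(layers) / n for layers in zip(*learner_weights)]

# Two toy "learners", each holding a weight matrix and a bias vector.
w1 = [np.ones((3, 2)), np.zeros(2)]
w2 = [3 * np.ones((3, 2)), np.ones(2)]
avg = fedavg_round([w1, w2])
print(avg[0])  # every entry is 2.0
print(avg[1])  # every entry is 0.5
```

The dispatcher then redistributes the averaged weights to the learners for
the next round.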