for some reason I dockerized it and it works

.gitignore (vendored): 4 lines changed

@@ -2,8 +2,8 @@
 .DS_Store
 run
 compile_commands.json
-cfg.json
 build/
 trained/
 __pycache__/
-data_*/
+data
+.dockerignore

Dockerfile (new file): 9 lines

@@ -0,0 +1,9 @@
from ubuntu:18.04

RUN apt -y update && \
    apt -y install build-essential pkg-config ninja-build python3 python3-pip python3-dev mpich
RUN pip3 install meson numpy tensorflow flask cython
RUN mkdir /workspace
COPY bridge.pyx library.py server.py meson.build main.c /workspace/
RUN cd /workspace && meson build && cd build && ninja
WORKDIR /workspace
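
The commit doesn't show how the image is meant to be used, so here is a rough
sketch of a build-and-run cycle that fits this Dockerfile; the image tag, the
mount paths and the process count are illustrative guesses, not taken from the
repo:

```sh
# Build the image from the project root (the tag "fedavg_mpi" is arbitrary).
docker build -t fedavg_mpi .

# Run the MPI job inside the container: mount a data/ directory (see the
# README below for its contents) and a directory with the training text
# files; one training file needs at least 4*1 + 2 = 6 processes.
docker run --rm \
    -v "$PWD/data:/workspace/data" \
    -v "$PWD/corpus:/corpus" \
    fedavg_mpi \
    mpiexec -n 6 ./build/fedavg_mpi /corpus/textfile1
```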

README.md (new file): 108 lines

@@ -0,0 +1,108 @@
# Implementation of Federated Averaging with MPI, Keras and Cython

(_for educational purposes_)

## What's it doing?

The system implemented in this project learns word embeddings with the CBOW
approach and, furthermore, tries to do it in a distributed fashion. There are
two flavors of distribution present here:

1. Reading tokens (words) from a source (a text file for now), filtering and
   looking up vocabulary indices for words, windowing and batching are all
   implemented in separate processes and form an *input pipeline*.
2. Neural network training is done in parallel across several nodes
   (*learners*), with the learned weights periodically gathered, averaged and
   distributed by the central node, a.k.a. the *dispatcher*.

In this framework each learner can have its own input pipeline, all learners
can tap a single input pipeline, or something in between can also work. It's
not possible in the current version for one learner to tap more than one
pipeline, though.

## How to make this work

### Requirements

* A recent UNIX-y system
* A recent GCC (the default macOS clang also seems to work)
* MPICH 3
* Python 3.6 with dev headers and libraries (e.g. `python3-dev` on Ubuntu)
* Meson and ninja for building
* TensorFlow 1.14
* flask
* Cython

### Compiling

Compilation is supposed to be as simple as this (run in the project root):

```sh
meson build && cd build && ninja
```

If this fails, then either fix it yourself or let me know, I guess.

### Running

Now, this isn't without some quirks (due to this being a course project and
all). First, you have to run *FROM THE PROJECT ROOT* using the following
command (don't run it yet, as there are more instructions coming):

```sh
mpiexec -n NUM_PROC ./build/fedavg_mpi /path/to/training/data/textfile{1,2,3}
```

This program **expects a couple of things**:

First, **in the project root** create a directory `data` and put the following
files in there:

- `vocab.txt` -- a whitespace-separated list of words for which the embeddings
  will be learned. The words can only contain lowercase alphabetic ASCII chars
  (you can try lowercase UTF-8 and see what happens, but no guarantees here).
- `test.txt` -- a testing dataset with context windows of size 5, one line per
  window. The central (so third) word in the context window will be used as the
  target and the surrounding words as the source. The same requirements apply
  here as for the vocabulary, and furthermore only words present in `vocab.txt`
  are allowed in `test.txt`. This file will be used to track the loss of the
  network during training. An example of the `test.txt` format:

```
the quick brown fox jumped
over a lazy dog padword
```

There also needs to be a file `cfg.json` **in the project root** containing the
following fields (an example follows the list):

* `"data"`: `some_name` -- the name of the directory in which you put
  `vocab.txt` and `test.txt`;
* `"bpe"`: the number of independent learner SGD iterations per communication
  round;
* `"bs"`: the batch size (the number of context windows in a batch);
* `"target"`: the float value for the loss that you want to achieve; once the
  network reaches this loss, it will stop training, save the embeddings and
  exit.
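
For illustration, a `cfg.json` with the fields above could look roughly like
this (the values are made up; note also that `library.py` in this commit reads
`cfg.json` from the `data` directory and additionally interpolates a `"name"`
field into the saved embedding and checkpoint filenames):

```
{
    "name": "example_run",
    "data": "data",
    "bpe": 10,
    "bs": 32,
    "target": 1.5
}
```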

Then, for each training data file passed as an argument (these can reside
wherever you want them to), an input pipeline will be constructed in the
program. There are 3 nodes in the input pipeline (tokenizer, filter, batcher).
Then there's this rule that one learner isn't allowed to tap more than one
pipeline, so each pipeline will need at least one learner. There also needs to
be a dispatcher process and a visualizer process.

**TLDR:** The formula for the number of processes that you need to request from
`mpiexec -n` looks like this:

```
NUM_PROCS >= 4*num_data_files + 2
```

There is also a convenient (well, somewhat) formula to determine how many
learners you will get depending on the arguments you passed:

```
learners = NUM_PROCS - 2 - 3*num_data_files
```
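
So, for example, with two training data files you need at least
`4*2 + 2 = 10` processes, and requesting a few more just gives you extra
learners:

```
mpiexec -n 12  ->  learners = 12 - 2 - 3*2 = 4
```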

The good thing is, the program will complain if it doesn't like the numbers you
passed it and tell you how to fix it.

bridge.pyx

@@ -47,11 +47,6 @@ cdef public void serve():
     srv.serve()


-cdef public void bump_count():
-    eprint(f'bumping count from {srv.counter} to {srv.counter + 1}')
-    srv.counter += 1
-
-
 cdef public size_t getwin():
     return nn.WIN

@@ -72,10 +67,6 @@ cdef public float gettarget():
     return nn.CFG['target']


-cdef public float getflpc():
-    return nn.CFG['flpc']
-
-
 cdef public int get_tokens(WordList* wl, const char *filename):
     fnu = filename.decode('utf-8')
     if fnu not in tokenizers:

library.py: 38 lines changed

@@ -1,9 +1,8 @@
 import os
 import json
+from sys import stderr
 os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

-from mynet import onehot
-

 WIN = 2
 EMB = 32
@@ -11,21 +10,27 @@ EMB = 32
 HERE = os.path.abspath(os.path.dirname(__file__))


-def read_cfg():
-    with open(os.path.join(HERE, 'cfg.json')) as f:
-        return json.load(f)
-
-
-CFG = read_cfg()
-DATA = os.path.join(HERE, CFG['data'])
+DATA = os.path.join(HERE, 'data')
 RESULTS = os.path.join(HERE, 'trained')
 CORPUS = os.path.join(DATA, 'corpus.txt')
 VOCAB = os.path.join(DATA, 'vocab.txt')
 TEST = os.path.join(DATA, 'test.txt')


+if not os.path.exists(RESULTS):
+    os.mkdir(RESULTS)
+
+
+def read_cfg():
+    with open(os.path.join(DATA, 'cfg.json'), encoding='utf-8') as f:
+        return json.load(f)
+
+
+CFG = read_cfg()
+
+
 def read_vocab_list():
-    with open(VOCAB) as f:
+    with open(VOCAB, encoding='utf-8') as f:
         return f.read().split()

@@ -41,6 +46,13 @@ def word_tokenize(s: str):
     return l.split()


+def onehot(a, nc=10):
+    import numpy as np
+    oh = np.zeros((len(a), nc), dtype=np.float32)
+    oh[np.arange(len(a)), a.flatten().astype(np.int)] = 1
+    return oh
+
+
 def create_test_dataset():
     import numpy as np
     test_dataset = np.vectorize(vocab.get)(np.genfromtxt(TEST, dtype=str))
@@ -89,7 +101,7 @@ def eval_network(net):


 def token_generator(filename):
-    with open(filename) as f:
+    with open(filename, encoding='utf-8') as f:
         for l in f:
             if not l.isspace():
                 tok = word_tokenize(l)
@@ -103,8 +115,8 @@ def get_embeddings(net):

 def save_embeddings(emb):
     import numpy as np
-    np.savetxt(os.path.join(RESULTS, f'embeddings_{CFG["data"]}.csv'), emb)
+    np.savetxt(os.path.join(RESULTS, f'embeddings_{CFG["name"]}.csv'), emb)


 def ckpt_network(net):
-    net.save_weights(os.path.join(RESULTS, f'model_ckpt_{CFG["data"]}.h5'))
+    net.save_weights(os.path.join(RESULTS, f'model_ckpt_{CFG["name"]}.h5'))
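
The `onehot` helper added above (replacing the old `from mynet import onehot`)
one-hot encodes an array of vocabulary indices into a `(len(a), nc)` float32
matrix; roughly:

```
>>> import numpy as np
>>> onehot(np.array([1, 3]), nc=4)
array([[0., 1., 0., 0.],
       [0., 0., 0., 1.]], dtype=float32)
```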

main.c: 7 lines changed

@@ -368,14 +368,13 @@ void dispatcher() {
     size_t bs = getbs();
     size_t bpe = getbpe();
     float target = gettarget();
-    float flpc = getflpc();

     PyObject* frank = create_network();
     WeightList wl;
     init_weightlist_like(&wl, frank);
     update_weightlist(&wl, frank);

-    int lpr = number_of(LEARNER) * flpc; // Learners per round
+    int lpr = number_of(LEARNER);
     WeightList *wls = malloc(sizeof(WeightList) * lpr);
     for in_range(i, lpr) {
         init_weightlist_like(wls + i, frank);
@@ -445,10 +444,6 @@ void dispatcher() {
 void visualizer() {
     INFO_PRINTF("Starting visualizer %d\n", getpid());
     serve();
-    while (1) {
-        sleep(1);
-        bump_count();
-    }
 }

 int main (int argc, const char **argv) {

meson.build: 15 lines changed

@@ -1,10 +1,21 @@
 project('fedavg_mpi', 'c')
-add_global_arguments('-Wno-unused-command-line-argument', language: 'c')
 add_project_arguments(
   '-DNPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION',
   language: 'c'
 )

+compiler = meson.get_compiler('c')
+if compiler.has_argument('-Wno-unused-command-line-argument')
+  add_global_arguments('-Wno-unused-command-line-argument', language: 'c')
+endif
+
+if compiler.has_link_argument('-Wl,-w')
+  add_link_args = ['-Wl,-w']
+else
+  add_link_args = []
+endif
+
 mpi = dependency('mpi')
 python = dependency('python3')
 numpy_header = include_directories(run_command(
@@ -21,4 +32,4 @@ executable('fedavg_mpi',
   'main.c', bridge,
   dependencies: [mpi, python],
   include_directories: numpy_header,
-  link_args: '-Wl,-w')
+  link_args: add_link_args)