diff --git a/.gitignore b/.gitignore
index 45ad3b5..75e5137 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ library.h
 run
 compile_commands.json
 build/
+__pycache__/
diff --git a/library.pyx b/library.pyx
index e4762df..92a02bc 100644
--- a/library.pyx
+++ b/library.pyx
@@ -1,30 +1,30 @@
 cimport numpy as np
 import numpy as np
-import mynet as mn
+
+from sys import stderr
 
 from libc.stdlib cimport malloc
 from libc.string cimport memcpy
 
+import nn
 
-ctr = []
-X_train, y_train, X_test, y_test = mn.load_mnist()
-opt = mn.SGDOptimizer(lr=0.1)
+
+X_train, y_train, X_test, y_test = nn.load_mnist()
 
 cdef extern from "numpy/arrayobject.h":
     void *PyArray_DATA(np.ndarray arr)
 
-ctypedef public struct Dense:
-    long[2] shape
-    int ownmem
+ctypedef public struct Weight:
+    size_t dims
+    long* shape
     float* W
-    float* b
 
-ctypedef public struct Network:
-    size_t n_layers;
-    Dense* layers;
+ctypedef public struct WeightList:
+    size_t n_weights;
+    Weight* weights;
 
 
 cdef public char *greeting():
@@ -35,33 +35,31 @@ cdef public void debug_print(object o):
     print(o)
 
 
-cdef public void predict(
-    Network* net,
-    float* X,
-    size_t batch_size
-):
-    pass
+cdef public object create_network():
+    return nn.create_mnist_network()
+
+
+cdef public void set_net_weights(object net, WeightList* wl):
+    net.set_weights(wrap_weight_list(wl))
 
 
 cdef public void step_net(
-    Network* c_net,
-    float* batch_data,
-    size_t batch_size
+    object net, float* X, float* y, size_t batch_size
 ):
-    net = wrap_c_network(c_net)
-    cdef size_t in_dim = net.geometry[0]
-    cdef size_t out_dim = net.geometry[-1]
-    batch = np.asarray(batch_data)
-    # print(np.argmax(batch[:, in_dim:], axis=1), flush=True)
-    net.step(batch[:, :in_dim], batch[:, in_dim:], opt)
+    in_shape = (batch_size,) + net.layers[0].input_shape[1:]
+    out_shape = (batch_size,) + net.layers[-1].output_shape[1:]
+    cdef size_t x_numel = np.prod(in_shape)
+    cdef size_t y_numel = np.prod(out_shape)
+
+    # Typed memoryview casts; np.asarray() cannot wrap a bare float*.
+    # X_batch/y_batch, so the module-level X_train/y_train stay unshadowed.
+    X_batch = np.asarray(<float[:x_numel]> X).reshape(in_shape)
+    y_batch = np.asarray(<float[:y_numel]> y).reshape(out_shape)
+
+    net.train_on_batch(X_batch, y_batch)
 
 
-cdef public float eval_net(Network* c_net):
-    net = wrap_c_network(c_net)
-    return net.evaluate(X_test, y_test, 'cls')
+cdef public float eval_net(object net):
+    return net.evaluate(X_test, y_test, verbose=False)[1]
 
 
-cdef public void mnist_batch(float* batch, size_t bs, int part, int total):
+cdef public void mnist_batch(float* X, float* y, size_t bs,
+                             int part, int total):
     if total == 0:
         X_pool, y_pool = X_train, y_train
     else:
@@ -70,71 +68,66 @@ cdef public void mnist_batch(float* batch, size_t bs, int part, int total):
         y_pool = y_train[part*partsize:(part+1)*partsize]
 
     idx = np.random.choice(len(X_pool), bs, replace=True)
-    arr = np.concatenate([X_pool[idx], y_pool[idx]], axis=1)
-    assert arr.flags['C_CONTIGUOUS']
-    memcpy(batch, PyArray_DATA(arr), arr.size*sizeof(float))
+
+    X_r = X_pool[idx]
+    y_r = y_pool[idx]
+
+    assert X_r.flags['C_CONTIGUOUS']
+    assert y_r.flags['C_CONTIGUOUS']
+    memcpy(X, PyArray_DATA(X_r), X_r.size * sizeof(float))
+    memcpy(y, PyArray_DATA(y_r), y_r.size * sizeof(float))
 
 
-cdef public void create_c_network(Network* c_net):
-    net = create_network()
-    c_net.n_layers = len(net.layers)
-    c_net.layers = malloc(sizeof(Dense) * c_net.n_layers)
-    for i, l in enumerate(net.layers):
-        d0, d1 = l.W.shape
-        c_net.layers[i].shape[0] = d0
-        c_net.layers[i].shape[1] = d1
-        c_net.layers[i].W = malloc(sizeof(float) * d0 * d1)
-        c_net.layers[i].b = malloc(sizeof(float) * d1)
-        assert l.W.flags['C_CONTIGUOUS']
-        assert l.b.flags['C_CONTIGUOUS']
-        memcpy(c_net.layers[i].W, PyArray_DATA(l.W), sizeof(float) * d0 * d1)
-        memcpy(c_net.layers[i].b, PyArray_DATA(l.b), sizeof(float) * d1)
-        c_net.layers[i].ownmem = 1
+cdef public void init_weightlist_like(WeightList* wl, object net):
+    weights = net.get_weights()
+    wl.n_weights = len(weights)
+    wl.weights = malloc(sizeof(Weight) * wl.n_weights)
+    for i, w in enumerate(weights):
+        sh = np.asarray(w.shape, dtype=long)
+        wl.weights[i].dims = sh.size
+        wl.weights[i].shape = malloc(sizeof(long) * sh.size)
+        wl.weights[i].W = malloc(sizeof(float) * w.size)
+
+        assert sh.flags['C_CONTIGUOUS']
+        memcpy(wl.weights[i].shape, PyArray_DATA(sh),
+               sh.size * sizeof(long))
 
 
-cdef public void combo_c_net(Network* c_frank, Network* c_nets,
-                             size_t num_nets):
-    """ONE-LINER HOW BOUT THAT HUH."""
-    combo_net(
-        wrap_c_network(c_frank),
-        [wrap_c_network(&c_nets[i]) for i in range(num_nets)]
-    )
+cdef public void update_weightlist(WeightList* wl, object net):
+    weights = net.get_weights()
+    for i, w in enumerate(weights):
+        w = w.astype(np.float32)
+
+        assert w.flags['C_CONTIGUOUS']
+        memcpy(wl.weights[i].W, PyArray_DATA(w),
+               w.size * sizeof(float))
 
 
-cdef public void be_like(Network* c_dst, Network* c_src):
-    """Conveniently transform one C network into another."""
-    dst = wrap_c_network(c_dst)
-    src = wrap_c_network(c_src)
-    dst.be_like(src)
+cdef public void combo_weights(
+    WeightList* wl_frank, WeightList* wls, size_t num_lists
+):
+    """Not a one-liner anymore :/"""
+    alpha = 1. / num_lists
+    frank = wrap_weight_list(wl_frank)
+    for w in frank:
+        w[:] = 0
+    for i in range(num_lists):
+        for wf, ww in zip(frank, wrap_weight_list(&wls[i])):
+            wf += alpha * ww
 
 
-cdef object wrap_c_network(Network* c_net):
-    """Create a thin wrapper not owning the memory."""
-    net = create_network(init=False)
-    for i, l in enumerate(net.layers):
-        d0, d1 = c_net.layers[i].shape[0], c_net.layers[i].shape[1]
-        l.W = np.asarray(c_net.layers[i].W)
-        l.b = np.asarray(c_net.layers[i].b)
-    return net
+cdef list wrap_weight_list(WeightList* wl):
+    """Wrap the raw buffers as numpy views; no copies, no ownership."""
+    weights = []
+    cdef size_t w_numel
+    for i in range(wl.n_weights):
+        w_shape = tuple(wl.weights[i].shape[k]
+                        for k in range(wl.weights[i].dims))
+        w_numel = np.prod(w_shape)
+        weights.append(
+            # Memoryview cast again: a raw float* is not a Python buffer.
+            np.asarray(<float[:w_numel]> wl.weights[i].W).reshape(w_shape)
+        )
+    return weights
 
 
 def inspect_array(a):
     print(a.flags, flush=True)
     print(a.dtype, flush=True)
     print(a.sum(), flush=True)
-
-
-def create_network(init=True):
-    return mn.Network((784, 30, 10), mn.relu, mn.sigmoid, mn.bin_x_entropy,
-                      initialize=init)
-
-
-def combo_net(net, nets, alpha=None):
-    tot = len(nets)
-    if alpha is None:
-        alpha = [1 / tot] * tot
-    for l in net.layers:
-        l.set_weights(np.zeros_like(t) for t in l.trainables())
-    for n, a in zip(nets, alpha):
-        for la, lb in zip(n.layers, net.layers):
-            lb.update(t * a for t in la.trainables())
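Away from the pointer juggling, what the WeightList glue above does amounts to a few lines of numpy. A sketch for intuition only (wrap and combo are made-up names, not repo code): rebuild array views from flat float32 buffers plus their shapes, then average them uniformly, which is the combo_weights arithmetic.

import numpy as np

def wrap(buffers, shapes):
    # one live view per weight tensor, like wrap_weight_list returns
    return [b.reshape(s) for b, s in zip(buffers, shapes)]

def combo(frank, weight_lists):
    # uniform FedAvg: frank <- mean over the collected weight lists
    alpha = 1.0 / len(weight_lists)
    for j, wf in enumerate(frank):
        wf[:] = 0
        for wl in weight_lists:
            wf += alpha * wl[j]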
diff --git a/main.c b/main.c
index 57dd5ab..3eb8d78 100644
--- a/main.c
+++ b/main.c
@@ -9,11 +9,12 @@
 #define TAG_NETWK 2
 #define TAG_WEIGH 3
 #define TAG_READY 4
+#define TAG_BREAK 5
 
-#define COMM 500
-#define ITER 120
+#define COMM 100
+#define ITER 20
 #define BS 50
-#define FSPC 0.4
+#define FSPC 1
 
 #define in_range(i, x) (size_t (i) = 0; (i) < (x); (i)++)
 // I am honestly VERY sorry for this but power corrupts even the best of us
@@ -126,96 +127,60 @@ int rid(int id, Role what) {
     return id - z;
 }
 
+void free_weightlist(WeightList* wl) {
+    for in_range(i, wl->n_weights) {
+        free(wl->weights[i].shape);
+        free(wl->weights[i].W);
+    }
+    free(wl->weights);
+}
+
 void data_reader() {
     // Reads some data and converts it to a float array
-    printf("Start reader\n");
-    size_t batch_numel = (784 + 10) * BS;
-    float* batch = malloc(batch_numel * sizeof(float));
+    INFO_PRINTF("Starting reader %d\n", getpid());
+
+    size_t X_numel = 784 * BS;
+    size_t y_numel = 10 * BS;
+    float* X = malloc(X_numel * sizeof(float));
+    float* y = malloc(y_numel * sizeof(float));
     int s = 0;
-    while (1) {
+    while (s != -1) {
         MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD,
                  MPI_STATUS_IGNORE);
-        mnist_batch(batch, BS, rid(s, SLAVE), number_of_slaves());
-        MPI_Send(batch, batch_numel, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD);
+        if (s != -1) {
+            mnist_batch(X, y, BS, rid(s, SLAVE), number_of_slaves());
+            MPI_Send(X, X_numel, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD);
+            MPI_Send(y, y_numel, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD);
+        }
     }
-    free(batch);
+    free(X);
+    free(y);
 }
 
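For reference, the part/total pair the reader hands to mnist_batch shards MNIST across slaves, so slave rid(s, SLAVE) only ever samples its own slice. The same rule as a standalone numpy function (a sketch; partsize is assumed to be len(X_train) // total, computed in the unchanged context above this hunk):

import numpy as np

def shard_sample(X_train, y_train, bs, part, total):
    if total == 0:
        X_pool, y_pool = X_train, y_train           # no sharding at all
    else:
        partsize = len(X_train) // total            # assumed split rule
        X_pool = X_train[part*partsize:(part+1)*partsize]
        y_pool = y_train[part*partsize:(part+1)*partsize]
    idx = np.random.choice(len(X_pool), bs, replace=True)
    return X_pool[idx], y_pool[idx]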
INFO_PRINTF("Starting reader %d\n", getpid()); + + size_t X_numel = 784 * BS; + size_t y_numel = 10 * BS; + float* X = malloc(X_numel * sizeof(float)); + float* y = malloc(y_numel * sizeof(float)); int s = 0; - while (1) { + while (s != -1) { MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - mnist_batch(batch, BS, rid(s, SLAVE), number_of_slaves()); - MPI_Send(batch, batch_numel, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD); - } - free(batch); -} - -void send_weights(const Network* c_net, int dest, int tag) { - // This assumes that the receiving end has a fully initialized network - // Of the same arch as `c_net` - for in_range(i, c_net->n_layers) { - long d0 = c_net->layers[i].shape[0]; - long d1 = c_net->layers[i].shape[1]; - MPI_Send(c_net->layers[i].W, d0 * d1, MPI_FLOAT, dest, tag, - MPI_COMM_WORLD); - MPI_Send(c_net->layers[i].b, d1, MPI_FLOAT, dest, tag, - MPI_COMM_WORLD); - } -} - -void recv_weights(const Network* c_net, int src, int tag) { - // This assumes that the sender is going to send stuff that is going - // To fit exactly into the c_net - for in_range(i, c_net->n_layers) { - long d0 = c_net->layers[i].shape[0]; - long d1 = c_net->layers[i].shape[1]; - MPI_Recv(c_net->layers[i].W, d0 * d1, MPI_FLOAT, src, tag, - MPI_COMM_WORLD, MPI_STATUS_IGNORE); - MPI_Recv(c_net->layers[i].b, d1, MPI_FLOAT, src, tag, - MPI_COMM_WORLD, MPI_STATUS_IGNORE); - } -} - -void send_network(const Network* c_net, int dest, int tag) { - // Send a network to the expecting destination - // It's best to receive with `recv_network` - size_t n_layers = c_net->n_layers; - MPI_Send(&n_layers, 1, MPI_LONG, dest, tag, MPI_COMM_WORLD); - for in_range(i, c_net->n_layers) { - long d0 = c_net->layers[i].shape[0]; - long d1 = c_net->layers[i].shape[1]; - MPI_Send(c_net->layers[i].shape, 2, MPI_LONG, dest, tag, - MPI_COMM_WORLD); - MPI_Send(c_net->layers[i].W, d0 * d1, MPI_FLOAT, dest, tag, - MPI_COMM_WORLD); - MPI_Send(c_net->layers[i].b, d1, MPI_FLOAT, dest, tag, - MPI_COMM_WORLD); - } -} - -void recv_network(Network* c_net, int src, int tag) { - // c_net HAS TO BE a fresh empty Network struct - MPI_Recv(&c_net->n_layers, 1, MPI_LONG, src, tag, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); - c_net->layers = malloc(sizeof(Dense) * c_net->n_layers); - for in_range(i, c_net->n_layers) { - MPI_Recv(&c_net->layers[i].shape, 2, MPI_LONG, src, tag, - MPI_COMM_WORLD, MPI_STATUS_IGNORE); - long d0 = c_net->layers[i].shape[0]; - long d1 = c_net->layers[i].shape[1]; - c_net->layers[i].ownmem = 1; - c_net->layers[i].W = malloc(sizeof(float) * d0 * d1); - c_net->layers[i].b = malloc(sizeof(float) * d1); - MPI_Recv(c_net->layers[i].W, d0 * d1, MPI_FLOAT, src, tag, - MPI_COMM_WORLD, MPI_STATUS_IGNORE); - MPI_Recv(c_net->layers[i].b, d1, MPI_FLOAT, src, tag, - MPI_COMM_WORLD, MPI_STATUS_IGNORE); - } -} - -void free_network_contents(Network* c_net) { - // Cleans up the net - for in_range(i, c_net->n_layers) { - if (c_net->layers[i].ownmem) { - free(c_net->layers[i].b); - free(c_net->layers[i].W); + if (s != -1) { + mnist_batch(X, y, BS, rid(s, SLAVE), number_of_slaves()); + MPI_Send(X, X_numel, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD); + MPI_Send(y, y_numel, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD); } } - if (c_net->layers != NULL) { - free(c_net->layers); - c_net->layers = NULL; // So that you don't get any ideas + free(X); + free(y); +} + +void send_weights(const WeightList* wl, int dest, int tag) { + // This assumes that the receiving end knows exactly + // the number of elements being sent and has 
@@ -225,33 +190,38 @@ void slave_node() {
     // 2. Request batch from reader ([ ] has to choose a reader)
     // 3. Do computations
     // 4. Send weights back to master
-    printf("Start slave\n");
+    INFO_PRINTF("Starting slave %d\n", getpid());
     int me;
     MPI_Comm_rank(MPI_COMM_WORLD, &me);
-    size_t batch_numel = (784 + 10) * BS;
-    float* batch = malloc(batch_numel * sizeof(float));
-    Network net;
-    create_c_network(&net);
+    size_t X_numel = 784 * BS;
+    size_t y_numel = 10 * BS;
+    float* X = malloc(X_numel * sizeof(float));
+    float* y = malloc(y_numel * sizeof(float));
+
+    PyObject* net = create_network();
+    WeightList wl;
+    init_weightlist_like(&wl, net);
 
     for in_range(i, COMM) {
-        // INFO_PRINTF("%d announcing itself\n", my_id());
         MPI_Send(&me, 1, MPI_INT, master_id(0), TAG_READY, MPI_COMM_WORLD);
-        // INFO_PRINTF("%d waitng for weights from %d\n", my_id(), master_id(0));
-        recv_weights(&net, master_id(0), TAG_WEIGH);
-        // INFO_PRINTF("%d an answer!\n", my_id());
+        recv_weights(&wl, master_id(0), TAG_WEIGH);
+        set_net_weights(net, &wl);
         for in_range(k, ITER) {
             MPI_Send(&me, 1, MPI_INT, reader_id(0), TAG_READY, MPI_COMM_WORLD);
-            MPI_Recv(batch, batch_numel, MPI_FLOAT, reader_id(0), TAG_BATCH,
+            MPI_Recv(X, X_numel, MPI_FLOAT, reader_id(0), TAG_BATCH,
                      MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-            step_net(&net, batch, BS);
+            MPI_Recv(y, y_numel, MPI_FLOAT, reader_id(0), TAG_BATCH,
+                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+            step_net(net, X, y, BS);
         }
-        printf("%d net: %f\n", my_id(), eval_net(&net));
-        send_weights(&net, master_id(0), TAG_WEIGH);
+        printf("%d net: %f\n", my_id(), eval_net(net));
+        update_weightlist(&wl, net);
+        send_weights(&wl, master_id(0), TAG_WEIGH);
     }
-    free_network_contents(&net);
-    free(batch);
+    Py_DECREF(net);
+    free_weightlist(&wl);
+    free(X);
+    free(y);
 }
 
 void master_node() {
@@ -261,34 +231,47 @@
     // 2. Receive weights back (synchronous)
     // 3. Average the weights
-    printf("Start master\n");
-    Network frank;
-    create_c_network(&frank);
+    PyObject* frank = create_network();
+    WeightList wl;
+    init_weightlist_like(&wl, frank);
+    update_weightlist(&wl, frank);
 
     int spr = number_of_slaves() * FSPC; // Slaves per round
     int s;
-    Network *nets = malloc(sizeof(Network) * spr);
+    WeightList *wls = malloc(sizeof(WeightList) * spr);
     int *handles = malloc(sizeof(int) * spr);
-    for in_range(i, spr) create_c_network(nets + i);
+    for in_range(i, spr) {
+        init_weightlist_like(wls + i, frank);
+    }
 
     for in_range(i, COMM) {
         for in_range(k, spr) {
             MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD,
                      MPI_STATUS_IGNORE);
-            send_weights(&frank, s, TAG_WEIGH);
+            send_weights(&wl, s, TAG_WEIGH);
             handles[k] = s;
         }
         for in_range(k, spr) {
-            recv_weights(nets + k, handles[k], TAG_WEIGH);
+            recv_weights(wls + k, handles[k], TAG_WEIGH);
         }
-        combo_c_net(&frank, nets, spr);
-        printf("Frank: %f\n", eval_net(&frank));
+        combo_weights(&wl, wls, spr);
+        set_net_weights(frank, &wl);
+        printf("Frank: %f\n", eval_net(frank));
     }
-    free_network_contents(&frank);
-    free(nets);
+    Py_DECREF(frank);
+    free_weightlist(&wl);
+    for in_range(i, spr) free_weightlist(wls + i);
+    free(wls);
+    free(handles);
+
+    if (rid(my_id(), MASTER) == 0) {
+        // Stop the readers, which would otherwise block on TAG_READY forever
+        for in_range(r, number_of_readers()) {
+            int stop = -1;
+            MPI_Send(&stop, 1, MPI_INT, reader_id(r), TAG_READY,
+                     MPI_COMM_WORLD);
+        }
+    }
 }
 
 int main (int argc, const char **argv) {
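Taken together, one COMM iteration of master_node/slave_node is a plain FedAvg round. A single-process sketch of that loop (hypothetical code, no MPI; every pseudo-slave samples the full training set instead of its shard):

import numpy as np
import nn

ITER, BS = 20, 50

def fedavg_round(frank, slaves, X_train, y_train):
    start = frank.get_weights()
    collected = []
    for net in slaves:
        net.set_weights(start)                   # master -> slave weights
        for _ in range(ITER):                    # local training steps
            idx = np.random.choice(len(X_train), BS, replace=True)
            net.train_on_batch(X_train[idx], y_train[idx])
        collected.append(net.get_weights())      # slave -> master weights
    # uniform average, what combo_weights does on the C side
    frank.set_weights([np.mean(ws, axis=0) for ws in zip(*collected)])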
diff --git a/meson.build b/meson.build
index b0b3306..5df3f49 100644
--- a/meson.build
+++ b/meson.build
@@ -1,4 +1,5 @@
 project('fedavg_mpi', 'c')
+add_global_arguments('-Wno-unused-command-line-argument', language: 'c')
 add_project_arguments(
     '-DNPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION',
     language: 'c'
@@ -13,5 +14,6 @@ numpy_header = include_directories(run_command(
 executable(
     'fedavg_mpi', 'main.c', 'cythoned/library.c',
     dependencies: [mpi, python],
-    include_directories: numpy_header
+    include_directories: numpy_header,
+    link_args: '-Wl,-w'
 )
diff --git a/nn.py b/nn.py
new file mode 100644
index 0000000..b70201b
--- /dev/null
+++ b/nn.py
@@ -0,0 +1,16 @@
+import os
+os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
+
+import tensorflow as tf
+tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)  # STFU!
+from mynet import load_mnist
+
+
+def create_mnist_network():
+    model = tf.keras.models.Sequential([
+        tf.keras.layers.Dense(30, input_shape=(784,), activation='relu'),
+        tf.keras.layers.Dense(10, activation='softmax')
+    ])
+    model.compile(loss='categorical_crossentropy', optimizer='sgd',
+                  metrics=['accuracy'])
+    return model
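nn.py is also handy for a standalone smoke test; the shapes below mirror the 784- and 10-wide buffers the C side allocates. Random data, so the metrics mean nothing — this only confirms the train/evaluate round-trip that step_net and eval_net rely on:

import numpy as np
import nn

model = nn.create_mnist_network()
X = np.random.rand(50, 784).astype(np.float32)
y = np.eye(10, dtype=np.float32)[np.random.randint(10, size=50)]
model.train_on_batch(X, y)
loss, acc = model.evaluate(X, y, verbose=False)  # eval_net keeps index [1], the accuracy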