From 9727499f4df3e33c48e96dbc984e8187f4419efe Mon Sep 17 00:00:00 2001 From: Pavel Lutskov Date: Tue, 26 Nov 2019 21:20:40 -0800 Subject: [PATCH] set up synchronous comm for many slaves and also abusing the c preprocessor --- CMakeLists.txt | 2 +- library.pyx | 2 +- main.c | 119 ++++++++++++++++++++++++++++++++++++------------- 3 files changed, 90 insertions(+), 33 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4d121f5..5623e48 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,7 @@ project(fedavg_mpi) find_package(MPI REQUIRED) find_package(Python3 COMPONENTS Development NumPy) -add_executable(${PROJECT_NAME} main.c library.c) +add_executable(${PROJECT_NAME} main.c cythoned/library.c) target_include_directories(${PROJECT_NAME} PRIVATE ${Python3_INCLUDE_DIRS}) target_include_directories(${PROJECT_NAME} PRIVATE ${Python3_NumPy_INCLUDE_DIRS}) diff --git a/library.pyx b/library.pyx index 9087aba..aa2b1b2 100644 --- a/library.pyx +++ b/library.pyx @@ -81,7 +81,7 @@ cdef public void create_c_network(Network* c_net): c_net.layers[i].ownmem = 1 -cdef public void frankenstein(Network* c_frank, Network* c_nets, +cdef public void combo_c_net(Network* c_frank, Network* c_nets, size_t num_nets): """ONE-LINER HOW BOUT THAT HUH.""" combo_net( diff --git a/main.c b/main.c index 65f4bcc..9cf7f3c 100644 --- a/main.c +++ b/main.c @@ -1,16 +1,27 @@ -#include "library.h" +#include "cythoned/library.h" #include #include #include #define P_READER 0 -#define P_SLAVE 1 -#define P_MASTER 2 +#define P_MASTER 1 +#define P_SLAVE 2 + +#define TAG_IDGAF 0 +#define TAG_BATCH 1 +#define TAG_NETWK 2 +#define TAG_WEIGH 2 #define COMM 500 -#define ITER 32 -#define BS 32 +#define ITER 40 +#define BS 50 + +#define sid(s) s + P_SLAVE + +#define s_in_slaves(w) (size_t s = 0; s < w - P_SLAVE; s++) +#define i_in_range(x) (size_t i = 0; i < x; i++) +// I am honestly VERY sorry for this but power corrupts even the best of us typedef enum{ DATA, @@ -23,19 +34,53 @@ void data_reader() { printf("Start reader\n"); size_t batch_numel = (784 + 10) * BS; float* batch = malloc(batch_numel * sizeof(float)); + int s = 0; + int num_slaves; + MPI_Comm_size(MPI_COMM_WORLD, &num_slaves); + num_slaves -= P_SLAVE; while (1) { mnist_batch(batch, BS); - MPI_Send(batch, batch_numel, MPI_FLOAT, P_SLAVE, 0, MPI_COMM_WORLD); + MPI_Send(batch, batch_numel, MPI_FLOAT, sid(s), + TAG_BATCH, MPI_COMM_WORLD); + s = (s + 1) % num_slaves; } free(batch); } +void send_weights(const Network* c_net, int dest, int tag) { + // This assumes that the receiving end has a fully initialized network + // Of the same arch as `c_net` + for i_in_range(c_net->n_layers) { + long d0 = c_net->layers[i].shape[0]; + long d1 = c_net->layers[i].shape[1]; + MPI_Send(c_net->layers[i].W, d0 * d1, MPI_FLOAT, dest, tag, + MPI_COMM_WORLD); + MPI_Send(c_net->layers[i].b, d1, MPI_FLOAT, dest, tag, + MPI_COMM_WORLD); + } +} + +void recv_weights(const Network* c_net, int src, int tag) { + // This assumes that the sender is going to send stuff that is going + // To fit exactly into the c_net + for i_in_range(c_net->n_layers) { + long d0 = c_net->layers[i].shape[0]; + long d1 = c_net->layers[i].shape[1]; + MPI_Recv(c_net->layers[i].W, d0 * d1, MPI_FLOAT, src, tag, + MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(c_net->layers[i].b, d1, MPI_FLOAT, src, tag, + MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } +} + + + void send_network(const Network* c_net, int dest, int tag) { // Send a network to the expecting destination // It's best to receive with `recv_network` size_t n_layers = c_net->n_layers; MPI_Send(&n_layers, 1, MPI_LONG, dest, tag, MPI_COMM_WORLD); - for (size_t i = 0; i < n_layers; i++) { + for i_in_range(c_net->n_layers) { long d0 = c_net->layers[i].shape[0]; long d1 = c_net->layers[i].shape[1]; MPI_Send(c_net->layers[i].shape, 2, MPI_LONG, dest, tag, @@ -48,11 +93,11 @@ void send_network(const Network* c_net, int dest, int tag) { } void recv_network(Network* c_net, int src, int tag) { - // Creates a new network at c_net (all pointers will be lost so beware) + // c_net HAS TO BE a fresh empty Network struct MPI_Recv(&c_net->n_layers, 1, MPI_LONG, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); c_net->layers = malloc(sizeof(Dense) * c_net->n_layers); - for (size_t i = 0; i < c_net->n_layers; i++) { + for i_in_range(c_net->n_layers) { MPI_Recv(&c_net->layers[i].shape, 2, MPI_LONG, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); long d0 = c_net->layers[i].shape[0]; @@ -69,57 +114,69 @@ void recv_network(Network* c_net, int src, int tag) { void free_network_contents(Network* c_net) { // Cleans up the net - for (size_t i = 0; i < c_net->n_layers; i++) { + for i_in_range(c_net->n_layers) { if (c_net->layers[i].ownmem) { free(c_net->layers[i].b); free(c_net->layers[i].W); } } - free(c_net->layers); - c_net->layers = NULL; // So that you don't get any ideas + if (c_net->layers != NULL) { + free(c_net->layers); + c_net->layers = NULL; // So that you don't get any ideas + } } // Receives weight updates and trains, sends learned weights back to master void slave_node() { printf("Start slave\n"); - Network net; - create_c_network(&net); size_t batch_numel = (784 + 10) * BS; float* batch = malloc(batch_numel * sizeof(float)); + Network net; + create_c_network(&net); - for (int i = 0; i < COMM; i++) { - char go; - MPI_Recv(&go, 1, MPI_CHAR, P_MASTER, MPI_ANY_TAG, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); + for i_in_range(COMM) { + recv_weights(&net, P_MASTER, TAG_NETWK); for (int k = 0; k < ITER; k++) { - MPI_Recv(batch, batch_numel, MPI_FLOAT, P_READER, MPI_ANY_TAG, + MPI_Recv(batch, batch_numel, MPI_FLOAT, P_READER, TAG_BATCH, MPI_COMM_WORLD, MPI_STATUS_IGNORE); step_net(&net, batch, BS); } printf("Net: %f\n", eval_net(&net)); - send_network(&net, P_MASTER, 0); + send_weights(&net, P_MASTER, TAG_WEIGH); } - - free(batch); free_network_contents(&net); + free(batch); } -// Stores most up-to-date model, sends it to slaves for training void master_node() { + // Stores most up-to-date model, sends it to slaves for training + // First do it synchronously + // Need a "slave registry" + int world_size; + MPI_Comm_size(MPI_COMM_WORLD, &world_size); printf("Start master\n"); Network frank; create_c_network(&frank); - for (int i = 0; i < COMM; i++) { - char go; - MPI_Send(&go, 1, MPI_CHAR, P_SLAVE, 0, MPI_COMM_WORLD); - Network net; - recv_network(&net, P_SLAVE, MPI_ANY_TAG); - frankenstein(&frank, &net, 1); - free_network_contents(&net); + + // It's better to have more memory than needed + // Than less memory than needed + // Kong Fuzi + Network* nets = malloc(sizeof(Network) * world_size); + for s_in_slaves(world_size) create_c_network(nets + s); + + for i_in_range(COMM) { + for s_in_slaves(world_size) { + send_weights(&frank, sid(s), TAG_WEIGH); + } + for s_in_slaves(world_size) { + recv_weights(nets + s, sid(s), TAG_WEIGH); + } + combo_c_net(&frank, nets, world_size - P_SLAVE); printf("Frank: %f\n", eval_net(&frank)); } free_network_contents(&frank); + free(nets); } Role map_node() { @@ -127,7 +184,7 @@ Role map_node() { MPI_Comm_rank(MPI_COMM_WORLD, &node); if (node == P_READER) return DATA; if (node == P_MASTER) return MASTER; - if (node == P_SLAVE) return SLAVE; + if (node >= P_SLAVE) return SLAVE; exit(1); // this is bad }