diff --git a/library.pyx b/library.pyx index aa2b1b2..e4762df 100644 --- a/library.pyx +++ b/library.pyx @@ -27,7 +27,7 @@ ctypedef public struct Network: Dense* layers; -cdef public char * greeting(): +cdef public char *greeting(): return f'The value is {3**3**3}'.encode('utf-8') @@ -52,6 +52,7 @@ cdef public void step_net( cdef size_t in_dim = net.geometry[0] cdef size_t out_dim = net.geometry[-1] batch = np.asarray(batch_data) + # print(np.argmax(batch[:, in_dim:], axis=1), flush=True) net.step(batch[:, :in_dim], batch[:, in_dim:], opt) @@ -60,9 +61,17 @@ cdef public float eval_net(Network* c_net): return net.evaluate(X_test, y_test, 'cls') -cdef public void mnist_batch(float* batch, size_t bs): - idx = np.random.choice(len(X_train), bs, replace=False) - arr = np.concatenate([X_train[idx], y_train[idx]], axis=1) +cdef public void mnist_batch(float* batch, size_t bs, int part, int total): + if total == 0: + X_pool, y_pool = X_train, y_train + else: + partsize = len(X_train) // total + X_pool = X_train[part*partsize:(part+1)*partsize] + y_pool = y_train[part*partsize:(part+1)*partsize] + + idx = np.random.choice(len(X_pool), bs, replace=True) + arr = np.concatenate([X_pool[idx], y_pool[idx]], axis=1) + assert arr.flags['C_CONTIGUOUS'] memcpy(batch, PyArray_DATA(arr), arr.size*sizeof(float)) @@ -76,6 +85,8 @@ cdef public void create_c_network(Network* c_net): c_net.layers[i].shape[1] = d1 c_net.layers[i].W = malloc(sizeof(float) * d0 * d1) c_net.layers[i].b = malloc(sizeof(float) * d1) + assert l.W.flags['C_CONTIGUOUS'] + assert l.b.flags['C_CONTIGUOUS'] memcpy(c_net.layers[i].W, PyArray_DATA(l.W), sizeof(float) * d0 * d1) memcpy(c_net.layers[i].b, PyArray_DATA(l.b), sizeof(float) * d1) c_net.layers[i].ownmem = 1 @@ -101,7 +112,7 @@ cdef object wrap_c_network(Network* c_net): """Create a thin wrapper not owning the memory.""" net = create_network(init=False) for i, l in enumerate(net.layers): - d0, d1 = l.W.shape + d0, d1 = c_net.layers[i].shape[0], c_net.layers[i].shape[1] l.W = np.asarray(c_net.layers[i].W) l.b = np.asarray(c_net.layers[i].b) return net @@ -114,7 +125,7 @@ def inspect_array(a): def create_network(init=True): - return mn.Network((784, 10), mn.relu, mn.sigmoid, mn.bin_x_entropy, + return mn.Network((784, 30, 10), mn.relu, mn.sigmoid, mn.bin_x_entropy, initialize=init) diff --git a/main.c b/main.c index 9b56ad3..57dd5ab 100644 --- a/main.c +++ b/main.c @@ -4,10 +4,6 @@ #include #include -#define P_READER 0 -#define P_MASTER 1 -#define P_SLAVE 2 - #define TAG_IDGAF 0 #define TAG_BATCH 1 #define TAG_NETWK 2 @@ -15,23 +11,25 @@ #define TAG_READY 4 #define COMM 500 -#define ITER 40 +#define ITER 120 #define BS 50 -#define FSPC 0.2 +#define FSPC 0.4 -#define sid(s) s + P_SLAVE - -#define s_in_slaves(w) (size_t s = 0; s < w - P_SLAVE; s++) -#define i_in_range(x) (size_t i = 0; i < x; i++) +#define in_range(i, x) (size_t (i) = 0; (i) < (x); (i)++) // I am honestly VERY sorry for this but power corrupts even the best of us +#define INFO_PRINTF(fmt, ...) \ + do { fprintf(stderr, fmt, __VA_ARGS__); } while(0) +#define INFO_PRINTLN(what) \ + do { fprintf(stderr, "%s\n", what); } while(0) + + typedef enum{ DATA, SLAVE, MASTER } Role; - typedef struct IntQueue IntQueue; struct IntQueue { int head; @@ -67,6 +65,67 @@ int queue_full(IntQueue *q) { return ((q->tail + 1) % q->size) == q->head; } +int number_of_nodes() { + int n; + MPI_Comm_size(MPI_COMM_WORLD, &n); + return n; +} + +int number_of_masters() { + return 1; +} + +int number_of_readers() { + return 1; +} + +int number_of_slaves() { + return number_of_nodes() - number_of_masters() - number_of_readers(); +} + +int my_id() { + int i; + MPI_Comm_rank(MPI_COMM_WORLD, &i); + return i; +} + +int master_id(int m) { + return m; +} + +int reader_id(int r) { + return r + number_of_masters(); +} + +int slave_id(int s) { + return s + number_of_masters() + number_of_readers(); +} + +Role map_node() { + int node; + MPI_Comm_rank(MPI_COMM_WORLD, &node); + if (node >= reader_id(0) && node <= reader_id(number_of_readers()-1)) { + return DATA; + } + if (node >= master_id(0) && node <= master_id(number_of_masters()-1)) { + return MASTER; + } + if (node >= slave_id(0) && node <= slave_id(number_of_slaves()-1)) { + return SLAVE; + } + exit(1); // this is bad +} + +int rid(int id, Role what) { + int z; + switch (what) { + case DATA: z = reader_id(0); break; + case SLAVE: z = slave_id(0); break; + case MASTER: z = master_id(0); break; + } + return id - z; +} + void data_reader() { // Reads some data and converts it to a float array printf("Start reader\n"); @@ -77,7 +136,7 @@ void data_reader() { while (1) { MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - mnist_batch(batch, BS); + mnist_batch(batch, BS, rid(s, SLAVE), number_of_slaves()); MPI_Send(batch, batch_numel, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD); } free(batch); @@ -86,7 +145,7 @@ void data_reader() { void send_weights(const Network* c_net, int dest, int tag) { // This assumes that the receiving end has a fully initialized network // Of the same arch as `c_net` - for i_in_range(c_net->n_layers) { + for in_range(i, c_net->n_layers) { long d0 = c_net->layers[i].shape[0]; long d1 = c_net->layers[i].shape[1]; MPI_Send(c_net->layers[i].W, d0 * d1, MPI_FLOAT, dest, tag, @@ -99,7 +158,7 @@ void send_weights(const Network* c_net, int dest, int tag) { void recv_weights(const Network* c_net, int src, int tag) { // This assumes that the sender is going to send stuff that is going // To fit exactly into the c_net - for i_in_range(c_net->n_layers) { + for in_range(i, c_net->n_layers) { long d0 = c_net->layers[i].shape[0]; long d1 = c_net->layers[i].shape[1]; MPI_Recv(c_net->layers[i].W, d0 * d1, MPI_FLOAT, src, tag, @@ -114,7 +173,7 @@ void send_network(const Network* c_net, int dest, int tag) { // It's best to receive with `recv_network` size_t n_layers = c_net->n_layers; MPI_Send(&n_layers, 1, MPI_LONG, dest, tag, MPI_COMM_WORLD); - for i_in_range(c_net->n_layers) { + for in_range(i, c_net->n_layers) { long d0 = c_net->layers[i].shape[0]; long d1 = c_net->layers[i].shape[1]; MPI_Send(c_net->layers[i].shape, 2, MPI_LONG, dest, tag, @@ -131,7 +190,7 @@ void recv_network(Network* c_net, int src, int tag) { MPI_Recv(&c_net->n_layers, 1, MPI_LONG, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); c_net->layers = malloc(sizeof(Dense) * c_net->n_layers); - for i_in_range(c_net->n_layers) { + for in_range(i, c_net->n_layers) { MPI_Recv(&c_net->layers[i].shape, 2, MPI_LONG, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); long d0 = c_net->layers[i].shape[0]; @@ -148,7 +207,7 @@ void recv_network(Network* c_net, int src, int tag) { void free_network_contents(Network* c_net) { // Cleans up the net - for i_in_range(c_net->n_layers) { + for in_range(i, c_net->n_layers) { if (c_net->layers[i].ownmem) { free(c_net->layers[i].b); free(c_net->layers[i].W); @@ -160,8 +219,12 @@ void free_network_contents(Network* c_net) { } } -// Receives weight updates and trains, sends learned weights back to master void slave_node() { + // 0. Announce readiness? + // 1. Receive weights from master ([ ] has to know its master) + // 2. Request batch from reader ([ ] has to choose a reader) + // 3. Do computations + // 4. Send weights back to master printf("Start slave\n"); int me; @@ -172,67 +235,62 @@ void slave_node() { Network net; create_c_network(&net); - for i_in_range(COMM) { - MPI_Send(&me, 1, MPI_INT, P_MASTER, TAG_READY, MPI_COMM_WORLD); - recv_weights(&net, P_MASTER, TAG_NETWK); - for (int k = 0; k < ITER; k++) { - MPI_Send(&me, 1, MPI_INT, P_READER, TAG_READY, MPI_COMM_WORLD); - MPI_Recv(batch, batch_numel, MPI_FLOAT, P_READER, TAG_BATCH, + for in_range(i, COMM) { + // INFO_PRINTF("%d announcing itself\n", my_id()); + MPI_Send(&me, 1, MPI_INT, master_id(0), TAG_READY, MPI_COMM_WORLD); + // INFO_PRINTF("%d waitng for weights from %d\n", my_id(), master_id(0)); + recv_weights(&net, master_id(0), TAG_WEIGH); + // INFO_PRINTF("%d an answer!\n", my_id()); + for in_range(k, ITER) { + MPI_Send(&me, 1, MPI_INT, reader_id(0), TAG_READY, MPI_COMM_WORLD); + MPI_Recv(batch, batch_numel, MPI_FLOAT, reader_id(0), TAG_BATCH, MPI_COMM_WORLD, MPI_STATUS_IGNORE); step_net(&net, batch, BS); } - printf("Net: %f\n", eval_net(&net)); - send_weights(&net, P_MASTER, TAG_WEIGH); + printf("%d net: %f\n", my_id(), eval_net(&net)); + send_weights(&net, master_id(0), TAG_WEIGH); } free_network_contents(&net); free(batch); } void master_node() { - // Stores most up-to-date model, sends it to slaves for training - // First do it synchronously - // Need a "slave registry" - printf("Start master\n"); + // 0. Initialize model - int world_size; - MPI_Comm_size(MPI_COMM_WORLD, &world_size); + // 1. Send it to some slaves for processing (synchronous) + // 2. Receive weights back (synchronous) + // 3. Average the weights + + printf("Start master\n"); Network frank; create_c_network(&frank); - // It's better to have more memory than needed - // Than less memory than needed - // Kong Fuzi - Network* nets = malloc(sizeof(Network) * world_size); - for s_in_slaves(world_size) create_c_network(nets + s); + int spr = number_of_slaves() * FSPC; // Slaves per round + int s; - IntQueue slave_queue; - queue_from_size(&slave_queue, world_size - P_SLAVE); + Network *nets = malloc(sizeof(Network) * spr); + int *handles = malloc(sizeof(int) * spr); - for i_in_range(COMM) { - for s_in_slaves(world_size) { - send_weights(&frank, sid(s), TAG_WEIGH); + for in_range(i, spr) create_c_network(nets + i); + for in_range(i, COMM) { + + for in_range(k, spr) { + MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD, + MPI_STATUS_IGNORE); + send_weights(&frank, s, TAG_WEIGH); + handles[k] = s; } - for s_in_slaves(world_size) { - recv_weights(nets + s, sid(s), TAG_WEIGH); + for in_range(k, spr) { + recv_weights(nets + k, handles[k], TAG_WEIGH); } - combo_c_net(&frank, nets, world_size - P_SLAVE); + combo_c_net(&frank, nets, spr); printf("Frank: %f\n", eval_net(&frank)); } free_network_contents(&frank); free(nets); } -Role map_node() { - int node; - MPI_Comm_rank(MPI_COMM_WORLD, &node); - if (node == P_READER) return DATA; - if (node == P_MASTER) return MASTER; - if (node >= P_SLAVE) return SLAVE; - - exit(1); // this is bad -} - int main (int argc, const char **argv) { MPI_Init(NULL, NULL);