reached some mildly amusing result

of slaves having "independent" data sources and master still
training through them
2019-11-27 17:37:40 -08:00
parent 5af90383f0
commit c4a21b6be8
2 changed files with 130 additions and 61 deletions


@@ -27,7 +27,7 @@ ctypedef public struct Network:
     Dense* layers;

-cdef public char * greeting():
+cdef public char *greeting():
     return f'The value is {3**3**3}'.encode('utf-8')
@@ -52,6 +52,7 @@ cdef public void step_net(
     cdef size_t in_dim = net.geometry[0]
     cdef size_t out_dim = net.geometry[-1]
     batch = np.asarray(<float[:batch_size,:in_dim+out_dim]>batch_data)
+    # print(np.argmax(batch[:, in_dim:], axis=1), flush=True)
     net.step(batch[:, :in_dim], batch[:, in_dim:], opt)
@@ -60,9 +61,17 @@ cdef public float eval_net(Network* c_net):
     return net.evaluate(X_test, y_test, 'cls')

-cdef public void mnist_batch(float* batch, size_t bs):
-    idx = np.random.choice(len(X_train), bs, replace=False)
-    arr = np.concatenate([X_train[idx], y_train[idx]], axis=1)
+cdef public void mnist_batch(float* batch, size_t bs, int part, int total):
+    if total == 0:
+        X_pool, y_pool = X_train, y_train
+    else:
+        partsize = len(X_train) // total
+        X_pool = X_train[part*partsize:(part+1)*partsize]
+        y_pool = y_train[part*partsize:(part+1)*partsize]
+    idx = np.random.choice(len(X_pool), bs, replace=True)
+    arr = np.concatenate([X_pool[idx], y_pool[idx]], axis=1)
+    assert arr.flags['C_CONTIGUOUS']
     memcpy(batch, <float*>PyArray_DATA(arr), arr.size*sizeof(float))
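The new `part`/`total` arguments are what gives each slave its "independent" data source: slave `part` samples only from its own contiguous shard of `X_train`, with `total == 0` falling back to the whole set. Note also the switch from `replace=False` to `replace=True`, presumably so a batch can always be drawn even from a small shard. A minimal sketch of the same shard arithmetic on the C side (the `shard_bounds` helper is hypothetical, not part of the commit):

#include <stdio.h>

/* Hypothetical helper mirroring the Cython logic: split `n` samples
 * into `total` equal shards and return the half-open row range for
 * shard `part`. Since partsize = n / total truncates, any remainder
 * rows at the end of the set are simply never sampled. */
static void shard_bounds(size_t n, int part, int total,
                         size_t *lo, size_t *hi) {
    if (total == 0) { *lo = 0; *hi = n; return; }
    size_t partsize = n / total;
    *lo = (size_t)part * partsize;
    *hi = *lo + partsize;
}

int main(void) {
    size_t lo, hi;
    shard_bounds(60000, 2, 4, &lo, &hi);  /* MNIST train set, slave 2 of 4 */
    printf("[%zu, %zu)\n", lo, hi);       /* prints [30000, 45000) */
    return 0;
}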
@@ -76,6 +85,8 @@ cdef public void create_c_network(Network* c_net):
         c_net.layers[i].shape[1] = d1
         c_net.layers[i].W = <float*>malloc(sizeof(float) * d0 * d1)
         c_net.layers[i].b = <float*>malloc(sizeof(float) * d1)
+        assert l.W.flags['C_CONTIGUOUS']
+        assert l.b.flags['C_CONTIGUOUS']
         memcpy(c_net.layers[i].W, PyArray_DATA(l.W), sizeof(float) * d0 * d1)
         memcpy(c_net.layers[i].b, PyArray_DATA(l.b), sizeof(float) * d1)
         c_net.layers[i].ownmem = 1
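The new asserts guard the flat `memcpy`: copying `sizeof(float) * d0 * d1` bytes straight out of `PyArray_DATA` is only correct when the array is one C-contiguous block. A quick illustration of the row-major layout that assumption relies on (sketch, not from the commit):

#include <assert.h>
#include <string.h>

/* In a C-contiguous r x c matrix, element (i, j) lives at offset
 * i*c + j, so the whole matrix is one flat run of r*c floats and a
 * single memcpy moves it. A transposed or sliced view has different
 * strides, which is exactly what the asserts rule out. */
int main(void) {
    float m[3][4], flat[12];
    for (int i = 0; i < 3; i++)
        for (int j = 0; j < 4; j++)
            m[i][j] = (float)(i * 4 + j);
    memcpy(flat, m, sizeof m);            /* one copy, no per-row loop */
    assert(flat[2 * 4 + 3] == m[2][3]);   /* offset arithmetic matches */
    return 0;
}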
@@ -101,7 +112,7 @@ cdef object wrap_c_network(Network* c_net):
     """Create a thin wrapper not owning the memory."""
     net = create_network(init=False)
     for i, l in enumerate(net.layers):
-        d0, d1 = l.W.shape
+        d0, d1 = c_net.layers[i].shape[0], c_net.layers[i].shape[1]
         l.W = np.asarray(<float[:d0,:d1]>c_net.layers[i].W)
         l.b = np.asarray(<float[:d1]>c_net.layers[i].b)
     return net
@@ -114,7 +125,7 @@ def inspect_array(a):
 def create_network(init=True):
-    return mn.Network((784, 10), mn.relu, mn.sigmoid, mn.bin_x_entropy,
+    return mn.Network((784, 30, 10), mn.relu, mn.sigmoid, mn.bin_x_entropy,
                       initialize=init)
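The geometry change from `(784, 10)` to `(784, 30, 10)` inserts a 30-unit hidden layer. For a sense of scale, a quick parameter count under both geometries (a sketch: the layer sizes come from the diff, the counting helper is mine and assumes plain dense layers with biases):

#include <stdio.h>

/* Dense-net parameter count: layer i contributes a
 * dims[i] x dims[i+1] weight matrix plus dims[i+1] biases. */
static long params(const long *dims, int n) {
    long total = 0;
    for (int i = 0; i + 1 < n; i++)
        total += dims[i] * dims[i + 1] + dims[i + 1];
    return total;
}

int main(void) {
    long old_geo[] = {784, 10};
    long new_geo[] = {784, 30, 10};
    printf("old: %ld\n", params(old_geo, 2));  /* 7850  */
    printf("new: %ld\n", params(new_geo, 3));  /* 23860 */
    return 0;
}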

main.c

@@ -4,10 +4,6 @@
 #include <stdlib.h>
 #include <mpi.h>

-#define P_READER 0
-#define P_MASTER 1
-#define P_SLAVE 2
-
 #define TAG_IDGAF 0
 #define TAG_BATCH 1
 #define TAG_NETWK 2
@@ -15,23 +11,25 @@
 #define TAG_READY 4

 #define COMM 500
-#define ITER 40
+#define ITER 120
 #define BS 50
-#define FSPC 0.2
+#define FSPC 0.4

-#define sid(s) s + P_SLAVE
-#define s_in_slaves(w) (size_t s = 0; s < w - P_SLAVE; s++)
-#define i_in_range(x) (size_t i = 0; i < x; i++)
+#define in_range(i, x) (size_t (i) = 0; (i) < (x); (i)++)
 // I am honestly VERY sorry for this but power corrupts even the best of us
+#define INFO_PRINTF(fmt, ...) \
+    do { fprintf(stderr, fmt, __VA_ARGS__); } while(0)
+#define INFO_PRINTLN(what) \
+    do { fprintf(stderr, "%s\n", what); } while(0)

 typedef enum{
     DATA,
     SLAVE,
     MASTER
 } Role;

 typedef struct IntQueue IntQueue;
 struct IntQueue {
     int head;
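The three one-off loop macros collapse into a single `in_range(i, x)` that takes the loop variable as an argument. It expands to a plain C99 `for` header, so it composes with braces and single statements as usual; a minimal standalone check:

#include <stdio.h>

/* Copy of the commit's macro: `for in_range(i, n)` expands to
 * `for (size_t (i) = 0; (i) < (n); (i)++)`. The parenthesized
 * declarator `size_t (i)` is legal C, if unusual. */
#define in_range(i, x) (size_t (i) = 0; (i) < (x); (i)++)

int main(void) {
    for in_range(i, 3)
        printf("%zu ", i);   /* prints: 0 1 2 */
    printf("\n");
    return 0;
}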
@@ -67,6 +65,67 @@ int queue_full(IntQueue *q) {
     return ((q->tail + 1) % q->size) == q->head;
 }

+int number_of_nodes() {
+    int n;
+    MPI_Comm_size(MPI_COMM_WORLD, &n);
+    return n;
+}
+
+int number_of_masters() {
+    return 1;
+}
+
+int number_of_readers() {
+    return 1;
+}
+
+int number_of_slaves() {
+    return number_of_nodes() - number_of_masters() - number_of_readers();
+}
+
+int my_id() {
+    int i;
+    MPI_Comm_rank(MPI_COMM_WORLD, &i);
+    return i;
+}
+
+int master_id(int m) {
+    return m;
+}
+
+int reader_id(int r) {
+    return r + number_of_masters();
+}
+
+int slave_id(int s) {
+    return s + number_of_masters() + number_of_readers();
+}
+
+Role map_node() {
+    int node;
+    MPI_Comm_rank(MPI_COMM_WORLD, &node);
+    if (node >= reader_id(0) && node <= reader_id(number_of_readers()-1)) {
+        return DATA;
+    }
+    if (node >= master_id(0) && node <= master_id(number_of_masters()-1)) {
+        return MASTER;
+    }
+    if (node >= slave_id(0) && node <= slave_id(number_of_slaves()-1)) {
+        return SLAVE;
+    }
+    exit(1); // this is bad
+}
+
+int rid(int id, Role what) {
+    int z;
+    switch (what) {
+    case DATA: z = reader_id(0); break;
+    case SLAVE: z = slave_id(0); break;
+    case MASTER: z = master_id(0); break;
+    }
+    return id - z;
+}
+
 void data_reader() {
     // Reads some data and converts it to a float array
     printf("Start reader\n");
@@ -77,7 +136,7 @@ void data_reader() {
     while (1) {
         MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD,
                  MPI_STATUS_IGNORE);
-        mnist_batch(batch, BS);
+        mnist_batch(batch, BS, rid(s, SLAVE), number_of_slaves());
         MPI_Send(batch, batch_numel, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD);
     }
     free(batch);
@@ -86,7 +145,7 @@ void data_reader() {
 void send_weights(const Network* c_net, int dest, int tag) {
     // This assumes that the receiving end has a fully initialized network
     // Of the same arch as `c_net`
-    for i_in_range(c_net->n_layers) {
+    for in_range(i, c_net->n_layers) {
         long d0 = c_net->layers[i].shape[0];
         long d1 = c_net->layers[i].shape[1];
         MPI_Send(c_net->layers[i].W, d0 * d1, MPI_FLOAT, dest, tag,
@@ -99,7 +158,7 @@ void send_weights(const Network* c_net, int dest, int tag) {
 void recv_weights(const Network* c_net, int src, int tag) {
     // This assumes that the sender is going to send stuff that is going
     // To fit exactly into the c_net
-    for i_in_range(c_net->n_layers) {
+    for in_range(i, c_net->n_layers) {
         long d0 = c_net->layers[i].shape[0];
         long d1 = c_net->layers[i].shape[1];
         MPI_Recv(c_net->layers[i].W, d0 * d1, MPI_FLOAT, src, tag,
@@ -114,7 +173,7 @@ void send_network(const Network* c_net, int dest, int tag) {
     // It's best to receive with `recv_network`
     size_t n_layers = c_net->n_layers;
     MPI_Send(&n_layers, 1, MPI_LONG, dest, tag, MPI_COMM_WORLD);
-    for i_in_range(c_net->n_layers) {
+    for in_range(i, c_net->n_layers) {
         long d0 = c_net->layers[i].shape[0];
         long d1 = c_net->layers[i].shape[1];
         MPI_Send(c_net->layers[i].shape, 2, MPI_LONG, dest, tag,
@@ -131,7 +190,7 @@ void recv_network(Network* c_net, int src, int tag) {
     MPI_Recv(&c_net->n_layers, 1, MPI_LONG, src, tag, MPI_COMM_WORLD,
              MPI_STATUS_IGNORE);
     c_net->layers = malloc(sizeof(Dense) * c_net->n_layers);
-    for i_in_range(c_net->n_layers) {
+    for in_range(i, c_net->n_layers) {
         MPI_Recv(&c_net->layers[i].shape, 2, MPI_LONG, src, tag,
                  MPI_COMM_WORLD, MPI_STATUS_IGNORE);
         long d0 = c_net->layers[i].shape[0];
@@ -148,7 +207,7 @@ void recv_network(Network* c_net, int src, int tag) {
 void free_network_contents(Network* c_net) {
     // Cleans up the net
-    for i_in_range(c_net->n_layers) {
+    for in_range(i, c_net->n_layers) {
         if (c_net->layers[i].ownmem) {
             free(c_net->layers[i].b);
             free(c_net->layers[i].W);
@@ -160,8 +219,12 @@ void free_network_contents(Network* c_net) {
         }
     }
 }

-// Receives weight updates and trains, sends learned weights back to master
 void slave_node() {
+    // 0. Announce readiness?
+    // 1. Receive weights from master ([ ] has to know its master)
+    // 2. Request batch from reader ([ ] has to choose a reader)
+    // 3. Do computations
+    // 4. Send weights back to master
     printf("Start slave\n");
     int me;
@@ -172,67 +235,62 @@ void slave_node() {
     Network net;
     create_c_network(&net);

-    for i_in_range(COMM) {
-        MPI_Send(&me, 1, MPI_INT, P_MASTER, TAG_READY, MPI_COMM_WORLD);
-        recv_weights(&net, P_MASTER, TAG_NETWK);
-        for (int k = 0; k < ITER; k++) {
-            MPI_Send(&me, 1, MPI_INT, P_READER, TAG_READY, MPI_COMM_WORLD);
-            MPI_Recv(batch, batch_numel, MPI_FLOAT, P_READER, TAG_BATCH,
+    for in_range(i, COMM) {
+        // INFO_PRINTF("%d announcing itself\n", my_id());
+        MPI_Send(&me, 1, MPI_INT, master_id(0), TAG_READY, MPI_COMM_WORLD);
+        // INFO_PRINTF("%d waiting for weights from %d\n", my_id(), master_id(0));
+        recv_weights(&net, master_id(0), TAG_WEIGH);
+        // INFO_PRINTF("%d an answer!\n", my_id());
+        for in_range(k, ITER) {
+            MPI_Send(&me, 1, MPI_INT, reader_id(0), TAG_READY, MPI_COMM_WORLD);
+            MPI_Recv(batch, batch_numel, MPI_FLOAT, reader_id(0), TAG_BATCH,
                      MPI_COMM_WORLD, MPI_STATUS_IGNORE);
             step_net(&net, batch, BS);
         }
-        printf("Net: %f\n", eval_net(&net));
-        send_weights(&net, P_MASTER, TAG_WEIGH);
+        printf("%d net: %f\n", my_id(), eval_net(&net));
+        send_weights(&net, master_id(0), TAG_WEIGH);
     }
     free_network_contents(&net);
     free(batch);
 }

 void master_node() {
-    // Stores most up-to-date model, sends it to slaves for training
-    // First do it synchronously
-    // Need a "slave registry"
+    // 0. Initialize model
+    // 1. Send it to some slaves for processing (synchronous)
+    // 2. Receive weights back (synchronous)
+    // 3. Average the weights
     printf("Start master\n");
-    int world_size;
-    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

     Network frank;
     create_c_network(&frank);

-    // It's better to have more memory than needed
-    // Than less memory than needed
-    // Kong Fuzi
-    Network* nets = malloc(sizeof(Network) * world_size);
-    for s_in_slaves(world_size) create_c_network(nets + s);
-    IntQueue slave_queue;
-    queue_from_size(&slave_queue, world_size - P_SLAVE);
+    int spr = number_of_slaves() * FSPC; // Slaves per round
+    int s;
+    Network *nets = malloc(sizeof(Network) * spr);
+    int *handles = malloc(sizeof(int) * spr);
+    for in_range(i, spr) create_c_network(nets + i);

-    for i_in_range(COMM) {
-        for s_in_slaves(world_size) {
-            send_weights(&frank, sid(s), TAG_WEIGH);
+    for in_range(i, COMM) {
+        for in_range(k, spr) {
+            MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD,
+                     MPI_STATUS_IGNORE);
+            send_weights(&frank, s, TAG_WEIGH);
+            handles[k] = s;
         }
-        for s_in_slaves(world_size) {
-            recv_weights(nets + s, sid(s), TAG_WEIGH);
+        for in_range(k, spr) {
+            recv_weights(nets + k, handles[k], TAG_WEIGH);
         }
-        combo_c_net(&frank, nets, world_size - P_SLAVE);
+        combo_c_net(&frank, nets, spr);
         printf("Frank: %f\n", eval_net(&frank));
     }
     free_network_contents(&frank);
     free(nets);
 }

-Role map_node() {
-    int node;
-    MPI_Comm_rank(MPI_COMM_WORLD, &node);
-    if (node == P_READER) return DATA;
-    if (node == P_MASTER) return MASTER;
-    if (node >= P_SLAVE) return SLAVE;
-    exit(1); // this is bad
-}
-
 int main (int argc, const char **argv) {
     MPI_Init(NULL, NULL);
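One thing to keep in mind about the new round size: `spr = number_of_slaves() * FSPC` truncates toward zero in the `int` conversion, so with `FSPC` at 0.4 the master serves fewer slaves per round than the fraction suggests, and with one or two slaves it serves none at all. A quick table of the truncation (a sketch, with FSPC inlined as a literal):

#include <stdio.h>

/* spr = number_of_slaves() * FSPC, truncated by the int cast.
 * With FSPC 0.4: 1..8 slaves -> 0 0 1 1 2 2 2 3 per round; below
 * three slaves spr is 0 and the master's loops never run. */
int main(void) {
    for (int slaves = 1; slaves <= 8; slaves++)
        printf("%d slaves -> %d per round\n", slaves, (int)(slaves * 0.4));
    return 0;
}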