set up synchronous comm for many slaves
and also abusing the c preprocessor
This commit is contained in:
@@ -4,7 +4,7 @@ project(fedavg_mpi)
|
|||||||
find_package(MPI REQUIRED)
|
find_package(MPI REQUIRED)
|
||||||
find_package(Python3 COMPONENTS Development NumPy)
|
find_package(Python3 COMPONENTS Development NumPy)
|
||||||
|
|
||||||
add_executable(${PROJECT_NAME} main.c library.c)
|
add_executable(${PROJECT_NAME} main.c cythoned/library.c)
|
||||||
|
|
||||||
target_include_directories(${PROJECT_NAME} PRIVATE ${Python3_INCLUDE_DIRS})
|
target_include_directories(${PROJECT_NAME} PRIVATE ${Python3_INCLUDE_DIRS})
|
||||||
target_include_directories(${PROJECT_NAME} PRIVATE ${Python3_NumPy_INCLUDE_DIRS})
|
target_include_directories(${PROJECT_NAME} PRIVATE ${Python3_NumPy_INCLUDE_DIRS})
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ cdef public void create_c_network(Network* c_net):
|
|||||||
c_net.layers[i].ownmem = 1
|
c_net.layers[i].ownmem = 1
|
||||||
|
|
||||||
|
|
||||||
cdef public void frankenstein(Network* c_frank, Network* c_nets,
|
cdef public void combo_c_net(Network* c_frank, Network* c_nets,
|
||||||
size_t num_nets):
|
size_t num_nets):
|
||||||
"""ONE-LINER HOW BOUT THAT HUH."""
|
"""ONE-LINER HOW BOUT THAT HUH."""
|
||||||
combo_net(
|
combo_net(
|
||||||
|
|||||||
115
main.c
115
main.c
@@ -1,16 +1,27 @@
|
|||||||
#include "library.h"
|
#include "cythoned/library.h"
|
||||||
|
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <mpi.h>
|
#include <mpi.h>
|
||||||
|
|
||||||
#define P_READER 0
|
#define P_READER 0
|
||||||
#define P_SLAVE 1
|
#define P_MASTER 1
|
||||||
#define P_MASTER 2
|
#define P_SLAVE 2
|
||||||
|
|
||||||
|
#define TAG_IDGAF 0
|
||||||
|
#define TAG_BATCH 1
|
||||||
|
#define TAG_NETWK 2
|
||||||
|
#define TAG_WEIGH 2
|
||||||
|
|
||||||
#define COMM 500
|
#define COMM 500
|
||||||
#define ITER 32
|
#define ITER 40
|
||||||
#define BS 32
|
#define BS 50
|
||||||
|
|
||||||
|
#define sid(s) s + P_SLAVE
|
||||||
|
|
||||||
|
#define s_in_slaves(w) (size_t s = 0; s < w - P_SLAVE; s++)
|
||||||
|
#define i_in_range(x) (size_t i = 0; i < x; i++)
|
||||||
|
// I am honestly VERY sorry for this but power corrupts even the best of us
|
||||||
|
|
||||||
typedef enum{
|
typedef enum{
|
||||||
DATA,
|
DATA,
|
||||||
@@ -23,19 +34,53 @@ void data_reader() {
|
|||||||
printf("Start reader\n");
|
printf("Start reader\n");
|
||||||
size_t batch_numel = (784 + 10) * BS;
|
size_t batch_numel = (784 + 10) * BS;
|
||||||
float* batch = malloc(batch_numel * sizeof(float));
|
float* batch = malloc(batch_numel * sizeof(float));
|
||||||
|
int s = 0;
|
||||||
|
int num_slaves;
|
||||||
|
MPI_Comm_size(MPI_COMM_WORLD, &num_slaves);
|
||||||
|
num_slaves -= P_SLAVE;
|
||||||
while (1) {
|
while (1) {
|
||||||
mnist_batch(batch, BS);
|
mnist_batch(batch, BS);
|
||||||
MPI_Send(batch, batch_numel, MPI_FLOAT, P_SLAVE, 0, MPI_COMM_WORLD);
|
MPI_Send(batch, batch_numel, MPI_FLOAT, sid(s),
|
||||||
|
TAG_BATCH, MPI_COMM_WORLD);
|
||||||
|
s = (s + 1) % num_slaves;
|
||||||
}
|
}
|
||||||
free(batch);
|
free(batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void send_weights(const Network* c_net, int dest, int tag) {
|
||||||
|
// This assumes that the receiving end has a fully initialized network
|
||||||
|
// Of the same arch as `c_net`
|
||||||
|
for i_in_range(c_net->n_layers) {
|
||||||
|
long d0 = c_net->layers[i].shape[0];
|
||||||
|
long d1 = c_net->layers[i].shape[1];
|
||||||
|
MPI_Send(c_net->layers[i].W, d0 * d1, MPI_FLOAT, dest, tag,
|
||||||
|
MPI_COMM_WORLD);
|
||||||
|
MPI_Send(c_net->layers[i].b, d1, MPI_FLOAT, dest, tag,
|
||||||
|
MPI_COMM_WORLD);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void recv_weights(const Network* c_net, int src, int tag) {
|
||||||
|
// This assumes that the sender is going to send stuff that is going
|
||||||
|
// To fit exactly into the c_net
|
||||||
|
for i_in_range(c_net->n_layers) {
|
||||||
|
long d0 = c_net->layers[i].shape[0];
|
||||||
|
long d1 = c_net->layers[i].shape[1];
|
||||||
|
MPI_Recv(c_net->layers[i].W, d0 * d1, MPI_FLOAT, src, tag,
|
||||||
|
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
||||||
|
MPI_Recv(c_net->layers[i].b, d1, MPI_FLOAT, src, tag,
|
||||||
|
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void send_network(const Network* c_net, int dest, int tag) {
|
void send_network(const Network* c_net, int dest, int tag) {
|
||||||
// Send a network to the expecting destination
|
// Send a network to the expecting destination
|
||||||
// It's best to receive with `recv_network`
|
// It's best to receive with `recv_network`
|
||||||
size_t n_layers = c_net->n_layers;
|
size_t n_layers = c_net->n_layers;
|
||||||
MPI_Send(&n_layers, 1, MPI_LONG, dest, tag, MPI_COMM_WORLD);
|
MPI_Send(&n_layers, 1, MPI_LONG, dest, tag, MPI_COMM_WORLD);
|
||||||
for (size_t i = 0; i < n_layers; i++) {
|
for i_in_range(c_net->n_layers) {
|
||||||
long d0 = c_net->layers[i].shape[0];
|
long d0 = c_net->layers[i].shape[0];
|
||||||
long d1 = c_net->layers[i].shape[1];
|
long d1 = c_net->layers[i].shape[1];
|
||||||
MPI_Send(c_net->layers[i].shape, 2, MPI_LONG, dest, tag,
|
MPI_Send(c_net->layers[i].shape, 2, MPI_LONG, dest, tag,
|
||||||
@@ -48,11 +93,11 @@ void send_network(const Network* c_net, int dest, int tag) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void recv_network(Network* c_net, int src, int tag) {
|
void recv_network(Network* c_net, int src, int tag) {
|
||||||
// Creates a new network at c_net (all pointers will be lost so beware)
|
// c_net HAS TO BE a fresh empty Network struct
|
||||||
MPI_Recv(&c_net->n_layers, 1, MPI_LONG, src, tag, MPI_COMM_WORLD,
|
MPI_Recv(&c_net->n_layers, 1, MPI_LONG, src, tag, MPI_COMM_WORLD,
|
||||||
MPI_STATUS_IGNORE);
|
MPI_STATUS_IGNORE);
|
||||||
c_net->layers = malloc(sizeof(Dense) * c_net->n_layers);
|
c_net->layers = malloc(sizeof(Dense) * c_net->n_layers);
|
||||||
for (size_t i = 0; i < c_net->n_layers; i++) {
|
for i_in_range(c_net->n_layers) {
|
||||||
MPI_Recv(&c_net->layers[i].shape, 2, MPI_LONG, src, tag,
|
MPI_Recv(&c_net->layers[i].shape, 2, MPI_LONG, src, tag,
|
||||||
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
||||||
long d0 = c_net->layers[i].shape[0];
|
long d0 = c_net->layers[i].shape[0];
|
||||||
@@ -69,57 +114,69 @@ void recv_network(Network* c_net, int src, int tag) {
|
|||||||
|
|
||||||
void free_network_contents(Network* c_net) {
|
void free_network_contents(Network* c_net) {
|
||||||
// Cleans up the net
|
// Cleans up the net
|
||||||
for (size_t i = 0; i < c_net->n_layers; i++) {
|
for i_in_range(c_net->n_layers) {
|
||||||
if (c_net->layers[i].ownmem) {
|
if (c_net->layers[i].ownmem) {
|
||||||
free(c_net->layers[i].b);
|
free(c_net->layers[i].b);
|
||||||
free(c_net->layers[i].W);
|
free(c_net->layers[i].W);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (c_net->layers != NULL) {
|
||||||
free(c_net->layers);
|
free(c_net->layers);
|
||||||
c_net->layers = NULL; // So that you don't get any ideas
|
c_net->layers = NULL; // So that you don't get any ideas
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Receives weight updates and trains, sends learned weights back to master
|
// Receives weight updates and trains, sends learned weights back to master
|
||||||
void slave_node() {
|
void slave_node() {
|
||||||
printf("Start slave\n");
|
printf("Start slave\n");
|
||||||
Network net;
|
|
||||||
create_c_network(&net);
|
|
||||||
|
|
||||||
size_t batch_numel = (784 + 10) * BS;
|
size_t batch_numel = (784 + 10) * BS;
|
||||||
float* batch = malloc(batch_numel * sizeof(float));
|
float* batch = malloc(batch_numel * sizeof(float));
|
||||||
|
Network net;
|
||||||
|
create_c_network(&net);
|
||||||
|
|
||||||
for (int i = 0; i < COMM; i++) {
|
for i_in_range(COMM) {
|
||||||
char go;
|
recv_weights(&net, P_MASTER, TAG_NETWK);
|
||||||
MPI_Recv(&go, 1, MPI_CHAR, P_MASTER, MPI_ANY_TAG, MPI_COMM_WORLD,
|
|
||||||
MPI_STATUS_IGNORE);
|
|
||||||
for (int k = 0; k < ITER; k++) {
|
for (int k = 0; k < ITER; k++) {
|
||||||
MPI_Recv(batch, batch_numel, MPI_FLOAT, P_READER, MPI_ANY_TAG,
|
MPI_Recv(batch, batch_numel, MPI_FLOAT, P_READER, TAG_BATCH,
|
||||||
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
||||||
step_net(&net, batch, BS);
|
step_net(&net, batch, BS);
|
||||||
}
|
}
|
||||||
printf("Net: %f\n", eval_net(&net));
|
printf("Net: %f\n", eval_net(&net));
|
||||||
send_network(&net, P_MASTER, 0);
|
send_weights(&net, P_MASTER, TAG_WEIGH);
|
||||||
}
|
}
|
||||||
|
|
||||||
free(batch);
|
|
||||||
free_network_contents(&net);
|
free_network_contents(&net);
|
||||||
|
free(batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stores most up-to-date model, sends it to slaves for training
|
|
||||||
void master_node() {
|
void master_node() {
|
||||||
|
// Stores most up-to-date model, sends it to slaves for training
|
||||||
|
// First do it synchronously
|
||||||
|
// Need a "slave registry"
|
||||||
|
int world_size;
|
||||||
|
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
|
||||||
printf("Start master\n");
|
printf("Start master\n");
|
||||||
Network frank;
|
Network frank;
|
||||||
create_c_network(&frank);
|
create_c_network(&frank);
|
||||||
for (int i = 0; i < COMM; i++) {
|
|
||||||
char go;
|
// It's better to have more memory than needed
|
||||||
MPI_Send(&go, 1, MPI_CHAR, P_SLAVE, 0, MPI_COMM_WORLD);
|
// Than less memory than needed
|
||||||
Network net;
|
// Kong Fuzi
|
||||||
recv_network(&net, P_SLAVE, MPI_ANY_TAG);
|
Network* nets = malloc(sizeof(Network) * world_size);
|
||||||
frankenstein(&frank, &net, 1);
|
for s_in_slaves(world_size) create_c_network(nets + s);
|
||||||
free_network_contents(&net);
|
|
||||||
|
for i_in_range(COMM) {
|
||||||
|
for s_in_slaves(world_size) {
|
||||||
|
send_weights(&frank, sid(s), TAG_WEIGH);
|
||||||
|
}
|
||||||
|
for s_in_slaves(world_size) {
|
||||||
|
recv_weights(nets + s, sid(s), TAG_WEIGH);
|
||||||
|
}
|
||||||
|
combo_c_net(&frank, nets, world_size - P_SLAVE);
|
||||||
printf("Frank: %f\n", eval_net(&frank));
|
printf("Frank: %f\n", eval_net(&frank));
|
||||||
}
|
}
|
||||||
free_network_contents(&frank);
|
free_network_contents(&frank);
|
||||||
|
free(nets);
|
||||||
}
|
}
|
||||||
|
|
||||||
Role map_node() {
|
Role map_node() {
|
||||||
@@ -127,7 +184,7 @@ Role map_node() {
|
|||||||
MPI_Comm_rank(MPI_COMM_WORLD, &node);
|
MPI_Comm_rank(MPI_COMM_WORLD, &node);
|
||||||
if (node == P_READER) return DATA;
|
if (node == P_READER) return DATA;
|
||||||
if (node == P_MASTER) return MASTER;
|
if (node == P_MASTER) return MASTER;
|
||||||
if (node == P_SLAVE) return SLAVE;
|
if (node >= P_SLAVE) return SLAVE;
|
||||||
|
|
||||||
exit(1); // this is bad
|
exit(1); // this is bad
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user