commit a80a3de4faa7648ebd6ee76195dc196f0640ab90 Author: Pavel Lutskov Date: Sun Nov 24 22:14:26 2019 -0800 it started working so commit it diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..45ad3b5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +.*.sw? +DS_Store +library.c +library.h +run +compile_commands.json +build/ diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..4d121f5 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,19 @@ +cmake_minimum_required(VERSION 3.5) +project(fedavg_mpi) + +find_package(MPI REQUIRED) +find_package(Python3 COMPONENTS Development NumPy) + +add_executable(${PROJECT_NAME} main.c library.c) + +target_include_directories(${PROJECT_NAME} PRIVATE ${Python3_INCLUDE_DIRS}) +target_include_directories(${PROJECT_NAME} PRIVATE ${Python3_NumPy_INCLUDE_DIRS}) +target_include_directories(${PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) + +target_link_libraries(${PROJECT_NAME} ${Python3_LIBRARIES}) +target_link_libraries(${PROJECT_NAME} ${MPI_C_LIBRARIES}) + +target_compile_options(${PROJECT_NAME} PRIVATE -Wall -g -std=c99) +add_compile_definitions(NPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION) + +set(CMAKE_EXPORT_COMPILE_COMMANDS on) diff --git a/library.pyx b/library.pyx new file mode 100644 index 0000000..6ba4858 --- /dev/null +++ b/library.pyx @@ -0,0 +1,72 @@ +cimport numpy as np +import numpy as np +import mynet as mn + + +ctr = [] +X_train, y_train, X_test, y_test = mn.load_mnist() + + +cdef public char * greeting(): + return f'The value is {3**3**3}'.encode('utf-8') + + +cdef public void debug_print(object o): + print(o.flags) + # print(o) + + +cdef public np.ndarray[np.float32_t, ndim=2, mode='c'] dot( + np.ndarray[np.float32_t, ndim=2, mode='c'] x, + np.ndarray[np.float32_t, ndim=2, mode='c'] y +): + return x @ y + + +cdef public np.ndarray[np.float32_t, ndim=2, mode='c'] predict( + object net, + np.ndarray[np.float32_t, ndim=2, mode='c'] X +): + try: + return net(X) + except Exception as e: + print(e) + + +cdef public object create_network(): + return mn.Network((784, 10), mn.relu, mn.sigmoid, mn.bin_x_entropy) + + +cdef public object combo_net(list nets): + return mn.combo_net(nets) + + +cdef public object make_like(object neta, object netb): + netb.be_like(neta) + + +cdef public void step_net( + object net, + np.ndarray[np.float32_t, ndim=2, mode='c'] batch +): + opt = mn.SGDOptimizer(lr=0.1) + net.step(batch[:, :784], batch[:, 784:], opt) + + +cdef public float eval_net( + object net +): + return net.evaluate(X_test, y_test, 'cls') + + +cdef public np.ndarray[np.float32_t, ndim=2, mode='c'] mnist_batch( + Py_ssize_t bs +): + idx = np.random.choice(len(X_train), bs, replace=False) + arr = np.concatenate([X_train[idx], y_train[idx]], axis=1) + return arr + +cdef public float arrsum( + np.ndarray[np.float32_t, ndim=2, mode='c'] a +): + return np.sum(a) diff --git a/main.c b/main.c new file mode 100644 index 0000000..631f73c --- /dev/null +++ b/main.c @@ -0,0 +1,89 @@ +#include +#include +#include + +#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION +#include + +#include "library.h" + +#define P_READER 0 +#define P_SLAVE 1 +#define P_MASTER 2 + +#define COMM 100 +#define ITER 20 +#define BS 50 + +// Reads some data and converts it to 2D float array +void data_reader() { + while (1) { + PyArrayObject* batch = mnist_batch(10); + + long* shape = PyArray_SHAPE(batch); + MPI_Send(shape, 2, MPI_LONG, P_SLAVE, 0, MPI_COMM_WORLD); + MPI_Send(PyArray_DATA(batch), PyArray_SIZE(batch), MPI_FLOAT, + P_SLAVE, 0, MPI_COMM_WORLD); + Py_DECREF(batch); + } +} + +// Receives weight updates and trains, sends learned weights back to master +void slave_node() { + PyObject* net = create_network(); + for (int i = 0; i < COMM; i++) { + char go; + MPI_Recv(&go, 1, MPI_CHAR, P_MASTER, MPI_ANY_TAG, MPI_COMM_WORLD, + MPI_STATUS_IGNORE); + for (int k = 0; k < ITER; k++) { + long shape[2]; + MPI_Recv(shape, 2, MPI_LONG, P_READER, MPI_ANY_TAG, MPI_COMM_WORLD, + MPI_STATUS_IGNORE); + long size = shape[0] * shape[1]; + float* data = malloc(shape[0] * shape[1] * sizeof(float)); + MPI_Recv(data, size, MPI_FLOAT, P_READER, MPI_ANY_TAG, + MPI_COMM_WORLD, MPI_STATUS_IGNORE); + PyArrayObject* batch = PyArray_SimpleNewFromData( + 2, shape, NPY_FLOAT32, data); + step_net(net, batch); + } + printf("%f\n", eval_net(net)); + } +} + +// Stores most up-to-date model, sends it to slaves for training +void master_node() { + for (int i = 0; i < COMM; i++) { + char go; + MPI_Send(&go, 1, MPI_CHAR, P_SLAVE, 0, MPI_COMM_WORLD); + } +} + +int main (int argc, const char **argv) { + int node; + MPI_Init(NULL, NULL); + MPI_Comm_rank(MPI_COMM_WORLD, &node); + + // Cython Boilerplate + PyImport_AppendInittab("library", PyInit_library); + Py_Initialize(); + import_array(); + PyRun_SimpleString("import sys\nsys.path.insert(0,'')"); + PyObject* library_module = PyImport_ImportModule("library"); + + // Actual Code + if (node == 0) { + data_reader(); + } + else if (node == 1) { + slave_node(); + } + else if (node == 2) { + master_node(); + } + + // Cython Finalizing Boilerplate + Py_DECREF(library_module); + Py_Finalize(); + MPI_Finalize(); +}