the pipeline kinda works, you can even demo it
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -6,3 +6,4 @@ compile_commands.json
|
||||
build/
|
||||
cythoned/
|
||||
__pycache__/
|
||||
data*
|
||||
|
||||
16
bridge.pyx
16
bridge.pyx
@@ -67,16 +67,24 @@ cdef public long vocab_idx_of(Word* w):
|
||||
return -1
|
||||
|
||||
|
||||
cdef public void f_idx_list_to_print(float* f_idxs, size_t num):
|
||||
idxs = np.asarray(<float[:num]>f_idxs).astype(np.int)
|
||||
cdef public void _dbg_idx_list_to_print(long* f_idxs, size_t num):
|
||||
idxs = np.asarray(<long[:num]>f_idxs)
|
||||
cdef str pyuni = ' '.join(nn.inv_vocab[i] for i in idxs)
|
||||
print(pyuni)
|
||||
eprint(pyuni)
|
||||
|
||||
|
||||
cdef public void debug_print(object o):
|
||||
cdef public void _dbg_print(object o):
|
||||
eprint(o)
|
||||
|
||||
|
||||
cdef public void _dbg_print_cbow_batch(
|
||||
object net, float* batch, size_t bs
|
||||
):
|
||||
X_np, y_np = cbow_batch(net, batch, bs)
|
||||
eprint(X_np)
|
||||
eprint(y_np)
|
||||
|
||||
|
||||
cdef public void randidx(int* idx, size_t l, size_t how_much):
|
||||
i_np = np.random.choice(l, how_much, replace=False).astype(np.intc)
|
||||
memcpy(idx, PyArray_DATA(i_np), how_much * sizeof(int))
|
||||
|
||||
88
main.c
88
main.c
@@ -16,12 +16,12 @@
|
||||
#define TAG_IWORD 8
|
||||
#define TAG_INSTR 9
|
||||
|
||||
#define COMM 50
|
||||
#define COMM 500
|
||||
#define ITER 50
|
||||
#define BS 32
|
||||
#define BS 64
|
||||
#define EMB 20
|
||||
#define WIN 2
|
||||
#define FLPC 0.8
|
||||
#define FLPC 1
|
||||
|
||||
#define in_range(i, x) (size_t (i) = 0; (i) < (x); (i)++)
|
||||
// I am honestly VERY sorry for this but power corrupts even the best of us
|
||||
@@ -60,7 +60,7 @@ size_t number_of(Role what) {
|
||||
case TOKENIZER:
|
||||
if (g_argc < 2) {
|
||||
INFO_PRINTLN("NOT ENOUGH INPUTS!");
|
||||
exit(1);
|
||||
MPI_Abort(MPI_COMM_WORLD, 1);
|
||||
}
|
||||
return g_argc - 1;
|
||||
case FILTERER:
|
||||
@@ -91,7 +91,7 @@ int role_id_from_mpi_id(Role role, int mid) {
|
||||
int rid = mid - z;
|
||||
if (rid >= number_of(role) || rid < 0) {
|
||||
INFO_PRINTF("%d is not a %d\n", mid, role);
|
||||
exit(1);
|
||||
MPI_Abort(MPI_COMM_WORLD, 1);
|
||||
}
|
||||
return rid;
|
||||
}
|
||||
@@ -104,7 +104,19 @@ Role map_node() {
|
||||
base += number_of(r);
|
||||
}
|
||||
INFO_PRINTF("Something went wrong for node %d\n", node);
|
||||
exit(1); // this is bad
|
||||
MPI_Abort(MPI_COMM_WORLD, 1); // this is bad
|
||||
}
|
||||
|
||||
void announce_ready(int dest) {
|
||||
int me = my_mpi_id();
|
||||
MPI_Send(&me, 1, MPI_INT, dest, TAG_READY, MPI_COMM_WORLD);
|
||||
}
|
||||
|
||||
int wait_for_ready() {
|
||||
int ready;
|
||||
MPI_Recv(&ready, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD,
|
||||
MPI_STATUS_IGNORE);
|
||||
return ready;
|
||||
}
|
||||
|
||||
void free_word(Word* w) {
|
||||
@@ -132,50 +144,58 @@ int recv_word(Word* w, int src) {
|
||||
long len;
|
||||
MPI_Status stat;
|
||||
MPI_Recv(&len, 1, MPI_LONG, src, TAG_STLEN, MPI_COMM_WORLD,
|
||||
MPI_STATUS_IGNORE);
|
||||
&stat);
|
||||
int the_src = stat.MPI_SOURCE;
|
||||
if (w->mem < len + 1) {
|
||||
w->mem = len + 1;
|
||||
w->data = realloc(w->data, sizeof(char) * w->mem);
|
||||
}
|
||||
MPI_Recv(w->data, len + 1, MPI_CHAR, src, TAG_SWORD, MPI_COMM_WORLD,
|
||||
&stat);
|
||||
return stat.MPI_SOURCE;
|
||||
MPI_Recv(w->data, len + 1, MPI_CHAR, the_src, TAG_SWORD, MPI_COMM_WORLD,
|
||||
MPI_STATUS_IGNORE);
|
||||
return the_src;
|
||||
}
|
||||
|
||||
void tokenizer(const char* source) {
|
||||
INFO_PRINTF("Starting tokenizer %d\n", getpid());
|
||||
WordList wl = {0, 0, NULL};
|
||||
while (get_tokens(&wl, source)) {
|
||||
for in_range(i, wl.n_words) {
|
||||
// int tok = wait_for_ready();
|
||||
send_word(&wl.words[i], mpi_id_from_role_id(FILTERER, 0));
|
||||
}
|
||||
}
|
||||
Word terminator = {1, ""};
|
||||
send_word(&terminator, mpi_id_from_role_id(FILTERER, 0));
|
||||
free_wordlist(&wl);
|
||||
}
|
||||
|
||||
void filterer() {
|
||||
INFO_PRINTF("Starting filterer %d\n", getpid());
|
||||
Word w = {0, NULL};
|
||||
const size_t bufsize = 2 * WIN + 1;
|
||||
long* idx = malloc(bufsize * sizeof(long));
|
||||
size_t have = 0;
|
||||
const size_t num_streams = number_of(TOKENIZER);
|
||||
const size_t entry_size = 2 * WIN + 1;
|
||||
const size_t bufsize = num_streams * entry_size;
|
||||
|
||||
long* buffer = malloc(bufsize * sizeof(long));
|
||||
size_t* have = calloc(num_streams, sizeof(size_t));
|
||||
|
||||
int src = 0; // WLOG
|
||||
while (1) {
|
||||
while (have < bufsize) {
|
||||
recv_word(&w, role_id_from_mpi_id(TOKENIZER, 0));
|
||||
if (!strlen(w.data)) break;
|
||||
idx[have] = vocab_idx_of(&w);
|
||||
if (idx[have] != -1) have++;
|
||||
int stream_offs;
|
||||
while (have[src] != entry_size) {
|
||||
// src = rand() % num_streams;
|
||||
// announce_ready(role_id_from_mpi_id(TOKENIZER, src));
|
||||
src = recv_word(&w, MPI_ANY_SOURCE);
|
||||
src = role_id_from_mpi_id(TOKENIZER, src);
|
||||
stream_offs = src*entry_size;
|
||||
buffer[stream_offs + have[src]] = vocab_idx_of(&w);
|
||||
if (buffer[stream_offs + have[src]] != -1) have[src]++;
|
||||
}
|
||||
if (!strlen(w.data)) break;
|
||||
have = 0;
|
||||
MPI_Send(idx, bufsize, MPI_LONG, mpi_id_from_role_id(BATCHER, 0),
|
||||
have[src] = 0;
|
||||
MPI_Send(buffer + stream_offs, entry_size, MPI_LONG,
|
||||
mpi_id_from_role_id(BATCHER, 0),
|
||||
TAG_IWORD, MPI_COMM_WORLD);
|
||||
}
|
||||
idx[0] = -1;
|
||||
MPI_Send(idx, bufsize, MPI_LONG, mpi_id_from_role_id(BATCHER, 0),
|
||||
TAG_IWORD, MPI_COMM_WORLD);
|
||||
free_word(&w);
|
||||
free(idx);
|
||||
free(buffer);
|
||||
free(have);
|
||||
}
|
||||
|
||||
void batcher() {
|
||||
@@ -190,17 +210,15 @@ void batcher() {
|
||||
MPI_Recv(l_wid, entry_size, MPI_LONG,
|
||||
mpi_id_from_role_id(FILTERER, 0),
|
||||
TAG_IWORD, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
||||
if (l_wid[0] == -1) break;
|
||||
for in_range(c, entry_size) {
|
||||
batch[r*entry_size + c] = (float)l_wid[c];
|
||||
}
|
||||
}
|
||||
if (l_wid[0] == -1) break;
|
||||
INFO_PRINT(".");
|
||||
printf(".");
|
||||
MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD,
|
||||
MPI_STATUS_IGNORE);
|
||||
MPI_Send(batch, bufsize, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD);
|
||||
INFO_PRINTLN("!");
|
||||
printf("!\n");
|
||||
}
|
||||
free(l_wid);
|
||||
free(batch);
|
||||
@@ -242,7 +260,7 @@ void recv_weights(WeightList* wl, int src) {
|
||||
}
|
||||
|
||||
void learner() {
|
||||
INFO_PRINTF("Starting slave %d\n", getpid());
|
||||
INFO_PRINTF("Starting learner %d\n", getpid());
|
||||
int me = my_mpi_id();
|
||||
|
||||
PyObject* net = create_network(WIN, EMB);
|
||||
@@ -265,7 +283,6 @@ void learner() {
|
||||
MPI_STATUS_IGNORE);
|
||||
step_net(net, batch, BS);
|
||||
}
|
||||
// printf("%d net: %f\n", my_mpi_id(), eval_net(net));
|
||||
update_weightlist(&wl, net);
|
||||
send_weights(&wl, mpi_id_from_role_id(DISPATCHER, 0));
|
||||
}
|
||||
@@ -275,6 +292,7 @@ void learner() {
|
||||
}
|
||||
|
||||
void dispatcher() {
|
||||
INFO_PRINTF("Starting dispatcher %d\n", getpid());
|
||||
PyObject* frank = create_network(WIN, EMB);
|
||||
create_test_dataset(WIN);
|
||||
WeightList wl;
|
||||
@@ -302,7 +320,7 @@ void dispatcher() {
|
||||
}
|
||||
combo_weights(&wl, wls, lpr);
|
||||
set_net_weights(frank, &wl);
|
||||
// printf("Frank: %f\n", eval_net(frank));
|
||||
// INFO_PRINTF("Frank: %f\n", eval_net(frank));
|
||||
}
|
||||
Py_DECREF(frank);
|
||||
free_weightlist(&wl);
|
||||
|
||||
Reference in New Issue
Block a user