termination seems to be implemented okayish

This commit is contained in:
2019-12-11 15:29:59 -08:00
parent 136e5938c3
commit 2e5042f0e3

139
main.c
View File

@@ -13,11 +13,11 @@
#define TAG_BREAK 5
#define TAG_STLEN 6
#define TAG_SWORD 7
#define TAG_IWORD 8
#define TAG_IWIND 8
#define TAG_INSTR 9
#define TAG_TERMT 10
#define COMM 1
#define COMM 25
#define ITER 690
#define BS 32
#define EMB 32
@@ -97,6 +97,10 @@ int role_id_from_mpi_id(Role role, int mid) {
return rid;
}
int my_role_id(Role role) {
return role_id_from_mpi_id(role, my_mpi_id());
}
Role map_node() {
int node = my_mpi_id();
size_t base = 0;
@@ -164,9 +168,19 @@ int recv_word(Word* w, int src) {
return the_src;
}
void send_window(long* window, size_t winsize, int dest) {
MPI_Send(window, winsize, MPI_LONG, dest, TAG_IWIND, MPI_COMM_WORLD);
}
void recv_window(long* window, size_t winsize, int src) {
MPI_Recv(window, winsize, MPI_LONG, src, TAG_IWIND, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
void tokenizer(const char* source) {
INFO_PRINTF("Starting tokenizer %d\n", getpid());
int rid = role_id_from_mpi_id(TOKENIZER, my_mpi_id());
int rid = my_role_id(TOKENIZER);
int next = mpi_id_from_role_id(FILTERER, rid);
WordList wl = {0, 0, NULL};
size_t sync_ctr = 0;
@@ -181,75 +195,90 @@ void tokenizer(const char* source) {
while (!stop && get_tokens(&wl, source)) {
for in_range(i, wl.n_words) {
if (sync_ctr == 10000) {
ssend_word(&wl.words[i], mpi_id_from_role_id(FILTERER, rid));
ssend_word(&wl.words[i], next);
sync_ctr = 0;
} else {
send_word(&wl.words[i], mpi_id_from_role_id(FILTERER, rid));
send_word(&wl.words[i], next);
}
sync_ctr++;
}
MPI_Test(&stop_req, &stop, MPI_STATUS_IGNORE);
}
free_wordlist(&wl);
send_word(&terminator, mpi_id_from_role_id(FILTERER, rid));
send_word(&terminator, next);
INFO_PRINTF("Finishing tokenizer %d\n", getpid());
}
void filterer() {
INFO_PRINTF("Starting filterer %d\n", getpid());
int rid = role_id_from_mpi_id(FILTERER, my_mpi_id());
int rid = my_role_id(FILTERER);
int prev = mpi_id_from_role_id(TOKENIZER, rid);
int next = mpi_id_from_role_id(BATCHER, rid);
Word w = {0, NULL};
const size_t entry_size = 2 * WIN + 1;
const size_t bufsize = entry_size;
long* buffer = malloc(bufsize * sizeof(long));
const size_t window_size = 2 * WIN + 1;
long* window = malloc(window_size * sizeof(long));
size_t have = 0;
while (1) {
while (have != entry_size) { // TODO FLATTEN PIPELINE
recv_word(&w, mpi_id_from_role_id(TOKENIZER, rid));
while (have != window_size) {
recv_word(&w, prev);
if (!strlen(w.data)) break;
buffer[have] = vocab_idx_of(&w);
if (buffer[have] != -1) have++;
window[have] = vocab_idx_of(&w);
if (window[have] != -1) have++;
}
if (!strlen(w.data)) break;
have = 0;
MPI_Send(buffer, entry_size, MPI_LONG,
mpi_id_from_role_id(BATCHER, rid),
TAG_IWORD, MPI_COMM_WORLD);
send_window(window, window_size, next);
}
window[0] = -1;
send_window(window, window_size, next);
free_word(&w);
free(buffer);
free(window);
INFO_PRINTF("Finishing filterer %d\n", getpid());
}
void batcher() {
int s = 0;
const size_t entry_size = 2 * WIN + 1;
const size_t bufsize = BS * entry_size;
INFO_PRINTF("Starting batcher %d\n", getpid());
int rid = my_role_id(BATCHER);
int prev = mpi_id_from_role_id(FILTERER, rid);
int learner_mpi_id = 0;
const size_t window_size = 2 * WIN + 1;
const size_t bufsize = BS * window_size;
float* batch = malloc(bufsize * sizeof(float));
long* l_wid = malloc(entry_size * sizeof(long));
long* l_wid = malloc(window_size * sizeof(long));
while (1) {
for in_range(r, BS) {
MPI_Recv(l_wid, entry_size, MPI_LONG,
mpi_id_from_role_id(FILTERER, 0),
TAG_IWORD, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for in_range(c, entry_size) {
batch[r*entry_size + c] = (float)l_wid[c];
recv_window(l_wid, window_size, prev);
if (l_wid[0] == -1) break;
for in_range(c, window_size) {
batch[r*window_size + c] = (float)l_wid[c];
}
}
MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD,
if (l_wid[0] == -1) break;
MPI_Recv(&learner_mpi_id, 1, MPI_INT, MPI_ANY_SOURCE,
TAG_READY, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
MPI_Send(batch, bufsize, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD);
if (learner_mpi_id == -1) break;
MPI_Send(batch, bufsize, MPI_FLOAT, learner_mpi_id, TAG_BATCH,
MPI_COMM_WORLD);
printf("!\n");
}
free(l_wid);
free(batch);
INFO_PRINTF("Finishing batcher %d\n", getpid());
}
void free_weightlist(WeightList* wl) {
@@ -290,37 +319,47 @@ void recv_weights(WeightList* wl, int src) {
void learner() {
INFO_PRINTF("Starting learner %d\n", getpid());
int me = my_mpi_id();
int batcher = mpi_id_from_role_id(BATCHER, 0);
int dispatcher = mpi_id_from_role_id(DISPATCHER, 0);
PyObject* net = create_network(WIN, EMB);
create_test_dataset(WIN);
WeightList wl;
init_weightlist_like(&wl, net);
size_t entry_size = (2*WIN + 1);
size_t bufsize = BS * entry_size;
size_t window_size = (2*WIN + 1);
size_t bufsize = BS * window_size;
float* batch = malloc(bufsize * sizeof(float));
for in_range(i, COMM) {
recv_weights(&wl, mpi_id_from_role_id(DISPATCHER, 0));
int go;
MPI_Recv(&go, 1, MPI_INT, dispatcher, TAG_INSTR, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
while (go != -1) {
recv_weights(&wl, dispatcher);
set_net_weights(net, &wl);
for in_range(k, ITER) {
MPI_Send(&me, 1, MPI_INT, mpi_id_from_role_id(BATCHER, 0),
TAG_READY, MPI_COMM_WORLD);
MPI_Recv(batch, bufsize, MPI_FLOAT,
mpi_id_from_role_id(BATCHER, 0), TAG_BATCH, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
MPI_Send(&me, 1, MPI_INT, batcher, TAG_READY, MPI_COMM_WORLD);
MPI_Recv(batch, bufsize, MPI_FLOAT, batcher, TAG_BATCH,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
step_net(net, batch, BS);
}
update_weightlist(&wl, net);
send_weights(&wl, mpi_id_from_role_id(DISPATCHER, 0));
send_weights(&wl, dispatcher);
MPI_Recv(&go, 1, MPI_INT, dispatcher, TAG_INSTR, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
MPI_Send(&go, 1, MPI_INT, batcher, TAG_READY, MPI_COMM_WORLD);
Py_DECREF(net);
free_weightlist(&wl);
free(batch);
INFO_PRINTF("Finishing learner %d\n", getpid());
}
void dispatcher() {
INFO_PRINTF("Starting dispatcher %d\n", getpid());
int go = 1;
PyObject* frank = create_network(WIN, EMB);
create_test_dataset(WIN);
WeightList wl;
@@ -340,13 +379,14 @@ void dispatcher() {
time_t start = time(NULL);
for in_range(i, COMM) {
randidx(round, number_of(LEARNER), lpr);
for in_range(k, lpr) {
// INFO_PRINTF(" %5d", round[k]);
send_weights(&wl, mpi_id_from_role_id(LEARNER, round[k]));
// Instruct learners to learn
int lrnr_mpi_id = mpi_id_from_role_id(LEARNER, round[k]);
MPI_Send(&go, 1, MPI_INT, lrnr_mpi_id, TAG_INSTR, MPI_COMM_WORLD);
send_weights(&wl, lrnr_mpi_id);
}
// INFO_PRINTLN("");
for in_range(k, lpr) {
// Collect the results
recv_weights(wls + k, mpi_id_from_role_id(LEARNER, round[k]));
}
combo_weights(&wl, wls, lpr);
@@ -356,9 +396,15 @@ void dispatcher() {
INFO_PRINTF("Round %ld, validation loss %f\n", i, crt_loss);
}
int stop = 1;
MPI_Send(&stop, 1, MPI_INT, mpi_id_from_role_id(TOKENIZER, 0), TAG_TERMT,
MPI_COMM_WORLD);
go = -1;
for in_range(t, number_of(TOKENIZER)) {
MPI_Send(&go, 1, MPI_INT, mpi_id_from_role_id(TOKENIZER, t),
TAG_TERMT, MPI_COMM_WORLD);
}
for in_range(l, number_of(LEARNER)) {
MPI_Send(&go, 1, MPI_INT, mpi_id_from_role_id(LEARNER, l),
TAG_INSTR, MPI_COMM_WORLD);
}
time_t finish = time(NULL);
float delta_t = finish - start;
@@ -372,6 +418,7 @@ void dispatcher() {
for in_range(i, lpr) free_weightlist(wls + i);
free(wls);
free(round);
INFO_PRINTF("Finishing dispatcher %d\n", getpid());
}
void visualizer() {