Added batcher and even made it twice as fast as it was a minute ago.
This commit is contained in:
2019-11-30 22:40:23 -08:00
parent 101248965c
commit 7409eca38b
3 changed files with 38 additions and 22 deletions

View File

@@ -58,9 +58,9 @@ cdef public int get_tokens(WordList* wl, const char *filename):
cdef public long vocab_idx_of(Word* w): cdef public long vocab_idx_of(Word* w):
word = w.data.decode('utf-8') word = w.data.decode('utf-8')
if word.lower() in nn.vocab: try:
return nn.vocab.index(word.lower()) return nn.vocab.index(word)
else: except ValueError:
return -1 return -1

View File

@@ -9,7 +9,7 @@ from mynet import load_mnist, onehot
def word_tokenize(s: str): def word_tokenize(s: str):
l = ''.join(c if c.isalpha() else ' ' for c in s) l = ''.join(c.lower() if c.isalpha() else ' ' for c in s)
return l.split() return l.split()
@@ -47,7 +47,7 @@ def create_cbow_network(win, vocab, embed):
def token_generator(filename): def token_generator(filename):
with open(filename) as f: with open(filename) as f:
for l in f.readlines(500): for l in f.readlines():
if not l.isspace(): if not l.isspace():
tok = word_tokenize(l) tok = word_tokenize(l)
if tok: if tok:

50
main.c
View File

@@ -17,7 +17,7 @@
#define COMM 100 #define COMM 100
#define ITER 20 #define ITER 20
#define BS 50 #define BS 20
#define EMB 20 #define EMB 20
#define WIN 2 #define WIN 2
#define FSPC 1 #define FSPC 1
@@ -156,41 +156,54 @@ void tokenizer(const char* source) {
void filterer() { void filterer() {
Word w = {0, NULL}; Word w = {0, NULL};
long idx;
while (1) { while (1) {
recv_word(&w, role_id_from_mpi_id(TOKENIZER, 0)); recv_word(&w, role_id_from_mpi_id(TOKENIZER, 0));
if (!strlen(w.data)) { if (!strlen(w.data)) {
break; break;
} }
INFO_PRINTF("%s: ", w.data); // INFO_PRINTF("%s: ", w.data);
long idx = vocab_idx_of(&w); idx = vocab_idx_of(&w);
INFO_PRINTF("%ld\n", idx); // INFO_PRINTF("%ld\n", idx);
// if (idx != -1) { if (idx != -1) {
// MPI_Send(&idx, 1, MPI_LONG, mpi_id_from_role_id(BATCHER, 0), MPI_Send(&idx, 1, MPI_LONG, mpi_id_from_role_id(BATCHER, 0),
// TAG_IWORD, MPI_COMM_WORLD); TAG_IWORD, MPI_COMM_WORLD);
// } }
} }
idx = -1;
MPI_Send(&idx, 1, MPI_LONG, mpi_id_from_role_id(BATCHER, 0),
TAG_IWORD, MPI_COMM_WORLD);
free_word(&w); free_word(&w);
} }
void batcher() { void batcher() {
// Reads some data and converts it to a float array // Reads some data and converts it to a float array
INFO_PRINTF("Starting batcher %d\n", getpid()); // INFO_PRINTF("Starting batcher %d\n", getpid());
int s = 0; // int s = 0;
const size_t n_words = BS + WIN + WIN; const size_t n_words = BS + WIN + WIN;
float* f_widx = malloc(n_words * sizeof(float)); float* f_widx = malloc(n_words * sizeof(float));
long l_wid = 0;
while (l_wid != -1) {
while (s != -1) {
for in_range(i, n_words) { for in_range(i, n_words) {
long l_wid; MPI_Recv(&l_wid, 1, MPI_LONG, mpi_id_from_role_id(FILTERER, 0),
MPI_Recv(&l_wid, 1, MPI_LONG, role_id_from_mpi_id(FILTERER, 0),
TAG_IWORD, MPI_COMM_WORLD, MPI_STATUS_IGNORE); TAG_IWORD, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if (l_wid == -1) break;
f_widx[i] = (float)l_wid; f_widx[i] = (float)l_wid;
} }
MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD, if (l_wid == -1) break;
MPI_STATUS_IGNORE);
if (s != -1) { for in_range(i, n_words) {
MPI_Send(f_widx, n_words, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD); INFO_PRINTF("%5.0f ", f_widx[i]);
} }
INFO_PRINTLN("");
// MPI_Recv(&s, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY, MPI_COMM_WORLD,
// MPI_STATUS_IGNORE);
// if (s != -1) {
// MPI_Send(f_widx, n_words, MPI_FLOAT, s, TAG_BATCH, MPI_COMM_WORLD);
// }
} }
free(f_widx); free(f_widx);
} }
@@ -333,6 +346,9 @@ int main (int argc, const char **argv) {
case FILTERER: case FILTERER:
filterer(); filterer();
break; break;
case BATCHER:
batcher();
break;
default: default:
INFO_PRINTLN("DYING HORRIBLY!"); INFO_PRINTLN("DYING HORRIBLY!");
// case SLAVE: slave_node(); break; // case SLAVE: slave_node(); break;