\documentclass{article} \usepackage[letterpaper, margin=1in]{geometry} \usepackage[colorlinks]{hyperref} \usepackage{graphicx} \usepackage{listings} \lstset{basicstyle=\ttfamily} \renewcommand{\floatpagefraction}{.8} \title{Distributed Natural Language Processing with MPI and Python} \author{Pavel Lutskov for CPSC 521 @ UBC} \begin{document} \maketitle \section{Introduction} Natural language processing (NLP) is a field of computer science with applications such as digital assistants and machine translation. A typical NLP application consists of several data processing stages that form a pipeline and may be executed in parallel. Furthermore, individual pipeline stages involving complex, data-intensive NLP algorithms, such as word embedding computation, may also benefit from parallelization. Finally, the abundance of textual data distributed over the Internet motivates implementing NLP algorithms in a distributed fashion. One of the established frameworks for distributed computing is MPI~\cite{mpich}, a message-passing library for the C language. However, because of the complexity of NLP algorithms, it is infeasible to implement them from scratch in C. Therefore, the idea of this project was to interface the existing Python libraries for NLP and machine learning with C code and to leverage the MPI library for parallelization and distribution of the computations. The possible milestones of the project were initially identified as follows: \begin{itemize} \item Investigating the possibility of passing data between C and Python and calling simple Python routines from C. \item Writing pipeline stages in C with the help of the NLTK~\cite{nltk} framework. \item Parallelizing individual stages with MPI. \item Implementing a data-intensive algorithm with parallel stage execution (e.g.\@ large-scale word embedding computation). \item Benchmarking the parallelized implementation against a sequential Python program. \end{itemize} However, early on it became apparent that the Python \verb|multiprocessing| module, which is used internally by NLTK, causes various conflicts when the Python interpreter is embedded into a C application. For this reason, NLTK had to be abandoned, and the focus of the project shifted towards the distributed Deep Learning-based computation of word embeddings with the help of the TensorFlow~\cite{tensorflow} framework. \section{Architecture Overview} The system implemented during the work on this project computes word embeddings for a given vocabulary based on a user-supplied text corpus, using the CBOW approach proposed~\cite{cbow-skip-gram} by Mikolov et al.\@ in 2013. This approach involves training a neural network on unstructured textual data to perform a proxy task; in the case of CBOW, the task is to predict a word from the words surrounding it. The resulting embedding matrix is the weight matrix of the first layer of the trained neural network. Before being supplied to the neural network, the text data has to pass through several preprocessing stages. These stages, as implemented in this project, form an \textit{input pipeline}, which is depicted in \autoref{fig:pipeline}. First, the pipeline node called \textit{Tokenizer} reads a character stream from a text file. This node is responsible for replacing every character that is not an ASCII alphabetic character with whitespace, normalizing the stream by lowercasing the remaining alphabetic characters, and finally splitting the stream into tokens (words) and passing the words one by one to the next pipeline stage.
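For illustration, the character-level transformation performed in this stage can be sketched in a few lines of C. This is a hypothetical helper, not the project's actual code (the project implements the tokenization logic on the Python side, as described in the next section); it assumes the characters are processed in an in-memory buffer:
\begin{lstlisting}
#include <ctype.h>

/* Hypothetical sketch of the Tokenizer's normalization: every
 * character that is not an ASCII letter becomes whitespace, and
 * the remaining letters are lowercased, so that a later split on
 * whitespace yields the tokens. */
static void normalize(char* buf) {
    for (char* p = buf; *p != '\0'; p++) {
        if (isalpha((unsigned char)*p))
            *p = (char)tolower((unsigned char)*p);
        else
            *p = ' ';
    }
}
\end{lstlisting}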
\begin{figure} \centering \includegraphics[width=0.7\linewidth]{fig/input_pipeline.pdf} \caption{An Input Pipeline in the System} \label{fig:pipeline} \end{figure} The next pipeline stage is filtering, for which the \textit{Filter} node is responsible. When computing word embeddings using the CBOW model, only words that are present in the training vocabulary can be used. Furthermore, the neural network doesn't accept raw text as input, but requires the words to be encoded as integer indices corresponding to the words' positions in the vocabulary. Finally, the CBOW network doesn't process individual words, but operates on \textit{context windows} of word indices. Therefore, the task of the \textit{Filter} node is to remove from the pipeline all words that are not in the training vocabulary, to replace the remaining words with their integer indices, and, finally, to assemble the indices into context windows. As soon as a context window is filled, it is sent down the pipeline for training batch assembly. The system implemented in this project uses a context window of size 5. In the final stage of the input pipeline, the node called \textit{Batcher} accumulates the context windows into batches, which can then be requested by a \textit{Learner} node, which contains the neural network and performs the actual training. The other dimension of parallelism employed in this system is distributed neural network training. In this project, an approach proposed~\cite{fedavg} in 2016 by McMahan et al.\@ is used. The idea is to distribute a copy of a neural network to a number of independent workers, which then separately perform several training iterations, possibly on their own independent training data. The learned neural network weights are then collected from the workers, a new model is computed by taking the arithmetic average of the gathered weights, and this neural network is redistributed to the workers for a new training round. The assumption behind this architecture is that each worker will individually only need to perform a fraction of the training iterations for the combined model to achieve the desired performance, compared to the case where a single neural network is trained sequentially. In the presented system, there is one central node, called the \textit{Dispatcher}, that is responsible for storing the model weights, distributing the weights to the \textit{Learner} nodes, which perform the actual training, and collecting the weights at the end of a training round and computing their average. \autoref{fig:modes} demonstrates that the system allows each \textit{Learner} to have its own input pipeline, a single input pipeline to be shared among all Learners, or some intermediate configuration. However, it is not currently possible for one Learner to access more than one input pipeline. \begin{figure}[h] \centering \includegraphics[width=\linewidth]{fig/modes.pdf} \caption{Possible Pipeline Configurations} \label{fig:modes} \end{figure} \section{Implementation Details} \subsection{Overview} The application logic for the project is split across three files: \verb|main.c|, \verb|bridge.pyx| and \verb|library.py|. In the \verb|main.c| file, the overall system architecture is defined, the communication between nodes is implemented with the help of the MPI library, and, finally, the current execution state, such as the current model weights, is stored and managed.
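To make the role assignment concrete, the following is a minimal sketch of how MPI ranks could be mapped to the node roles described above. The role functions and the exact mapping are assumptions for illustration, not the project's actual code:
\begin{lstlisting}
#include <mpi.h>
#include <stdio.h>

/* Stub role functions standing in for the real ones in main.c. */
static void dispatcher(void)            { puts("dispatcher"); }
static void tokenizer(const char* path) { printf("tok: %s\n", path); }
static void filter(void)                { puts("filter"); }
static void batcher(void)               { puts("batcher"); }
static void learner(void)               { puts("learner"); }

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    int n = argc - 1;          /* one input pipeline per text file */
    if (rank == 0)             dispatcher();
    else if (rank <= n)        tokenizer(argv[rank]);
    else if (rank <= 2 * n)    filter();
    else if (rank <= 3 * n)    batcher();
    else                       learner(); /* all remaining ranks */
    MPI_Finalize();
    return 0;
}
\end{lstlisting}
A mapping of this shape is consistent with the process-count formula given in \autoref{ssec:running}: one Dispatcher, three pipeline nodes per text file, and at least one Learner per pipeline.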
This project was tested with MPICH 3.3~\cite{mpich}, a library implementing the MPI standard. The neural network training algorithms, as well as the algorithms for stream tokenization and filtering, are implemented in the \verb|library.py| file. This file targets Python 3.6 and uses the libraries NumPy~\cite{numpy} 1.16 for general numerical computations and TensorFlow 1.14 for Deep Learning, as well as several Python standard library facilities. Finally, the file \verb|bridge.pyx| provides interface functions for the C code to access the Python functionality, thus creating a bridge between the algorithms and the system aspects. In a \verb|.pyx| file, C and Python code can be mixed rather freely, with occasional use of some special syntax. This file is translated by the Cython~\cite{cython} framework into the \verb|bridge.c| and \verb|bridge.h| files. The \verb|bridge.c| is then used as a compilation unit for the final executable, and the \verb|bridge.h| is included into \verb|main.c| as a header file. In order for the compilation to succeed, the compiler needs to be pointed towards the Python header files and, since NumPy code is used in \verb|bridge.pyx|, towards the NumPy header files. Furthermore, the application needs to be linked against the Python dynamic libraries, which results in the Python interpreter being embedded into the final executable. In order to simplify the compilation process and to make the codebase more portable, the Meson~\cite{meson} build system was used in this project. \subsection{Running the Application} \label{ssec:running} To run this system, you will need the following software: \begin{itemize} \item A recent macOS or Linux; \item A recent compiler, \textit{GCC} or \textit{clang}; \item \textit{MPICH} 3; \item \textit{Python} $\geq3.6$ with headers and libraries (e.g.\@ on Ubuntu you need to install \verb|python3-dev|); \item \textit{Meson}, \textit{Cython} and \textit{ninja} for building; \item \textit{TensorFlow} 1.14 and \textit{NumPy} 1.16. \end{itemize} The application can then be built from the repository root by issuing the following command: \begin{lstlisting} meson build && (cd build && ninja) \end{lstlisting} The program then expects to be run from the repository root and to find a directory named \verb|config| there. This directory has to contain the following three files: \begin{itemize} \item \verb|vocab.txt| --- This file contains the vocabulary words for which the embeddings will be learned. The words need to be separated by whitespace or newlines and may only contain lowercase alphabetic ASCII characters. \item \verb|test.txt| --- This file contains the testing dataset of context windows, based on which the training performance of the network will be tracked. A context window of size 5 is used in the project, so this file has to contain 5 whitespace-separated words per line. The third word in each line is the target word, and the other words are the surrounding context. Only words that are present in \verb|vocab.txt| are allowed here. \item \verb|cfg.json| --- This file contains several key--value pairs for the configuration of the training procedure: \begin{itemize} \item \verb|"data_name"| --- The name of the dataset that is used to train the network; can be an alphanumeric string of your choice. \item \verb|"bpe"| --- Batches per Epoch, the number of independent iterations each Learner will perform before sending the weights back to the Dispatcher.
\item \verb|"bs"| --- Batch Size, the number of context windows in a training batch. \item \verb|"target"| --- The targeted value of the neural network loss function evaluated on the testing dataset. As soon as this value is reached, the program will stop training and exit. \end{itemize} \end{itemize} Once these files have been created, the program can be run from the repository root by issuing the following command: \begin{lstlisting} mpiexec -n NUM_PROC ./build/fedavg_mpi /path/to/dataset/text{1,2,3} \end{lstlisting} For each text file passed as an argument, the system will create an input pipeline, consisting of 3 nodes (Tokenizer, Filter, Batcher). Furthermore, each pipeline needs at least one Learner. There also needs to be one Dispatcher node for the whole application. Therefore, the formula for the minimum number of processes to be requested from \verb|mpiexec| looks like the following: \begin{lstlisting} NUM_PROC >= (4 * num_text_files) + 1 \end{lstlisting} To figure out how many Learners will be created, the following formula can be used: \begin{lstlisting} num_learners = NUM_PROC - 1 - (3 * num_text_files) \end{lstlisting} During running, the program will create the folder \verb|trained| in the repository root, if it doesn't already exist, and will save there after each training round the weights of the neural network in form of an HDF5 file, and also separately the embedding matrix, which is a whitespace separated CSV file with rows representing the embedding vectors and having the same order as the words in the \verb|config/vocab.txt| file. The embedding vectors are hard-coded to have 32 dimensions. \subsection{Component Implementation} \paragraph{Configuration Reading} The files in the \verb|config/| directory are read by the \verb|library.py| module on start-up, and the vocabulary, the test dataset and the parameters of training are stored as global module objects. The \verb|bridge.pyx| then imports the \verb|library.py| module and defines several C public API functions for the \verb|main.c| code to access the configuration parameters, or to perform a word index lookup or evaluate a neural network based on the test dataset. \paragraph{Tokenizer} A Tokenizer node is implemented in the \verb|tokenizer| function in the \verb|main.c| file, which receives as an argument the path to a text file, from which the tokens will be read. It then calls a function \verb|get_tokens(WordList* wl, const char* filename)|, defined in the \verb|bridge.pyx| file. The \verb|WordList| structure is a dynamically growable list of \verb|Word| structs, that records the number of \verb|Word|s in the list as well as the memory available for storing the \verb|Word|s. A \verb|Word| structure is a wrapper around the C \verb|char*|, keeping track of the memory allocated to the pointer. The function \verb|get_tokens| consults a global dictionary contained in \verb|bridge.pyx| that keeps track of the file names for which a token generator already exists. If the generator for the file was not yet created, or if it is already empty, then a new generator is created, by calling the \verb|token_generator(filename)| function, defined in \verb|library.py|, which returns the generator that yields a list of tokens from a line in the file, line by line. A list of words is then queried from the generator, and the \verb|WordList| structure is populated with the words from the list, expanding the memory allocated to it if needed. 
The \verb|tokenizer| function then sends the \verb|Word|s from the \verb|WordList| one by one to the Filter node, and as soon as all words are sent, it calls \verb|get_tokens| again. In the current implementation the Tokenizer will loop over the input data until it receives a signal from the Dispatcher to stop. After this, it will send an empty \verb|Word| down the pipeline to inform the Filter and the Batcher to stop too. \paragraph{Filter} A Filter node, implemented in the \verb|filter| function in \verb|main.c|, receives the \verb|Word|s one by one from the Tokenizer and looks up their indices in the vocabulary by calling the \verb|vocab_idx_of(Word* w)| function defined in \verb|bridge.pyx|. That function performs a dictionary lookup for the word, based on the \verb|config/vocab.txt| file, and returns its index on success or \verb|-1| if the word is not known. The Filter will assemble the indices in a \verb|long* window| variable until enough words are received to send the context window to the Batcher. If a word received from the Tokenizer is empty, the Filter sets the first element in the context window to \verb|-1| and sends the window to the Batcher for termination. \paragraph{Batcher} A Batcher is a rather simple pure C routine that first assembles the context windows into a batch, simultaneously converting \verb|long| into \verb|float|, and then waits for some Learner to announce itself. Once it receives a signal from a Learner, it responds with a batch and starts assembling the next batch. Since this node may receive signals from both the Filter and the Learners, it may also need to receive termination signals from both in order to avoid waiting for a signal from a finished process. Therefore, if the first element of the window received from the Filter is \verb|-1|, or if a Learner sends \verb|-1| when announcing itself, the Batcher will terminate immediately. \paragraph{Learner} A Learner, implemented in the \verb|learner| function in \verb|main.c|, first creates a TensorFlow neural network object and stores the network as a \verb|PyObject*|. It also initializes a C \verb|WeightList| struct to store the network weights and to serve as a buffer for communication with the Dispatcher. It then waits for the Dispatcher to announce a new training round, after which the Dispatcher sends the weights and the Learner receives them into the \verb|WeightList| struct. Since a \verb|WeightList| has a rather complex structure, a pair of functions, \verb|send_weights| and \verb|recv_weights|, is used for communicating the weights. Then, the Learner will use the \verb|WeightList| to set the neural network weights by employing the \verb|set_net_weights| function defined in \verb|bridge.pyx|. This is one of the cases where it is particularly convenient to use Cython, since raw C memory pointers can be easily converted to NumPy arrays, which can then be used directly to set the weights of a TensorFlow network. Then, the Learner will perform a number of training iterations, specified by the \verb|"bpe"| key in the \verb|config/cfg.json| file. For each iteration, the Learner will send its MPI rank to its designated Batcher and will receive a batch in the form of a \verb|float*|. This \verb|float*|, together with the \verb|PyObject*| network object, can be passed to the \verb|step_net| Cython function to perform one step of training. This function, again, leverages the ease of converting C data into NumPy arrays in Cython.
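Schematically, a Learner's inner training loop might look as follows. This is a simplified sketch: the message tags, buffer sizes and exact signatures of the bridge functions are assumptions for illustration, not the project's actual code:
\begin{lstlisting}
#include <Python.h>
#include <mpi.h>
#include "bridge.h" /* step_net, update_weightlist (Cython-made) */

/* Sketch of one Learner training round; tags, sizes and
 * signatures are illustrative assumptions. */
void learner_round(PyObject* net, WeightList* wl, float* batch,
                   int batch_elems, int bpe, int batcher, int me) {
    for (int i = 0; i < bpe; i++) {
        /* Announce ourselves to the Batcher... */
        MPI_Send(&me, 1, MPI_INT, batcher, 0, MPI_COMM_WORLD);
        /* ...receive one training batch... */
        MPI_Recv(batch, batch_elems, MPI_FLOAT, batcher, 0,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        /* ...and perform one training step through the bridge. */
        step_net(net, batch);
    }
    update_weightlist(net, wl); /* copy weights back into C */
}
\end{lstlisting}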
Finally, after all iterations, the weights of the network are written to the \verb|WeightList| by the Cython routine \verb|update_weightlist|, and the \verb|WeightList| is sent back to the Dispatcher, after which the Learner waits for the signal to start the next training round. If it instead receives a signal to stop training, it will send a \verb|-1| to its designated Batcher and terminate. \paragraph{Dispatcher} The Dispatcher also initializes a neural network and a \verb|WeightList| structure using the same procedure as the Learner. This network serves as the single source of truth for the whole application. For each training round the Dispatcher will send out the \verb|WeightList| to the Learners and, upon receiving all the \verb|WeightList|s back from the Learners, will compute their arithmetic element-wise average (with $K$ Learners, $w \leftarrow \frac{1}{K}\sum_{k=1}^{K} w_k$ for each weight tensor $w$) and store it in its own \verb|WeightList| structure, using the function \verb|combo_weights| from \verb|bridge.pyx|. This updated \verb|WeightList| will also be assigned to the Dispatcher's network, after which the loss of the network will be evaluated on the testing dataset from \verb|config/test.txt|. After each iteration the network weights and the embedding matrix will be saved, as described in \autoref{ssec:running}. These iterations will continue until the loss is below the \verb|"target"| defined in \verb|config/cfg.json|. Once that happens, instead of the signal to start a new training round, the Dispatcher will send a \verb|-1| to all Tokenizers and Learners, so that all pipelines can be properly halted. After this, the Dispatcher will compute and print some run statistics and exit. \section{Evaluation} The main focus of the evaluation was to determine whether executing several neural network training nodes in parallel can speed up the training process. The employed approach was to define a \textit{target loss} that the network has to achieve and then to measure \textit{the number of context windows} that each Learner node has to process and, secondarily, the time it takes for the system to reach the target. The motivation behind this approach is that although the total number of training windows consumed by the system is the number of windows per Learner times the number of Learners, the Learners process their windows in parallel; thus the longest computation path is only as long as the number of windows that each Learner processes, which makes it a reasonable approximation of parallel performance. Moreover, the tests have shown that the Learners dominate the running time (the pipeline with a single Learner could process around 45 batches/s, but over 500 batches/s when the call to the training function in the Learner was commented out); therefore, the number of context windows processed by the Learners is the most important parameter for the overall performance. It would also be possible to count the processed batches instead of the context windows; however, it may be interesting to compare the influence of the number of context windows in a batch (i.e.\@ the \textit{batch size}) on the training performance, since e.g.\@ increasing the batch size might actually reduce the amount of data needed for training.
The wall time was only used as a secondary measure, since due to time constraints and software incompatibilities it was not possible to launch the system on the computing cluster, and the tests had to be performed on a laptop with a modest dual-core 1.3 GHz CPU. This means that using more than 2 Learner nodes would essentially result in a sequential simulation of the parallel processing, thus yielding no improvements in processing time. The evaluations were performed on two datasets. The first one is the book ``Moby Dick'' by Herman Melville (approx.\@ 200k words), obtained from Project Gutenberg~\cite{gutenberg} using the API provided by the NLTK toolkit. The vocabulary used for this dataset consists of all words from the book excluding English stop words, as defined by NLTK. The test set for this dataset consists of 1000 context windows randomly selected from the book. The other dataset is a part of a recent English Wikipedia dump~\cite{wikidump} (approx.\@ 90M words), which was transformed into plain text using the WikiExtractor~\cite{wikiextractor} tool. For this dataset the vocabulary is the list of the 10000 most frequently used English words, obtained from~\cite{10k-words}, also excluding the stop words. As test data, 5000 context windows were randomly sampled from the dump file. The test configurations were: \begin{itemize} \item a single pipeline with 1, 2, 4, 8 or 12 Learners (up to 16 total processes); \item individual pipelines for 1, 2, 4 or 8 Learners, each reading a separate part of a dataset (up to 33 total processes). \end{itemize} For the smaller of the two datasets the target was set to \verb|8.4|, and it can be observed in \autoref{fig:datasets} that modest speedups can be achieved by employing up to 8 Learners, with the system maxing out at a 2.4x speed-up. Furthermore, a \mbox{2 Learner -- 2 Pipeline} configuration training independently on two different halves of the book never even reaches the target. A possible explanation for this is that ``Moby Dick'' is too small a book for multiple Learners to have sufficient data to train on. For the larger dataset with the target set to \verb|8.3|, however, the results were more promising, as can be seen in \autoref{fig:datasets} and \autoref{fig:speedups}. Using 2 Learners instead of 1 resulted in a nearly linear reduction of both the amount of data consumed by each Learner (1.95x) and the time to target (1.94x). This result also validates the use of the number of context windows consumed by each Learner as a proxy for system performance, since scaling within the number of available cores results in an almost perfect correlation between the amount of data per Learner and the wall time. Going from 2 to 4 Learners decreases the amount of data per Learner by another 2x, with the wall time remaining roughly the same, demonstrating that the laptop's cores are exhausted at this point. Further increasing the number of Learner nodes results in observable but sub-linear speedups, with the 12-Learner system using 7x less data per Learner to achieve the target loss of \verb|8.3|. This decrease in gains can probably be attributed to deficiencies of the neural network model being used, and thus, to achieve further speed-ups, the network architecture and training hyperparameters have to be investigated in more depth.
Furthermore, the loss plots suggest that for longer training runs the difference between configurations with different numbers of Learners should still be observable; however, due to time and hardware constraints it was not possible to investigate the speed-ups achieved in longer-running trials in more detail. Finally, as can be observed in \autoref{fig:datasets} and \autoref{fig:speedups}, the systems with individual pipelines and independent input data for each Learner initially perform and scale worse than the single-pipeline systems. However, in the later stages of training the effect of using multiple pipelines becomes more positive, e.g.\@ the \mbox{4 Learner -- 4 Pipeline} system almost catches up with the \mbox{12 Learner -- 1 Pipeline} system. Since input pipelines are computationally cheap, and it is often more practical not to store the data as one big file but rather to have it split across multiple nodes, this mode of operation should be investigated further and possibly preferred for large-scale training. As a last note, the learned embeddings themselves were not of high importance for the evaluation, since it is known that obtaining high-quality embeddings requires a much larger amount of data (a dataset of \mbox{$>$ 100B words}) and much more computation time than was feasible within this project. However, the learned embeddings were evaluated empirically, and it was found that even with relatively short training runs the networks could capture some meaningful relationships between the vocabulary words. \begin{figure} \centering \includegraphics[width=\linewidth]{fig/datasets.pdf} \caption{Validation Loss Against the Amount of Data per Learner} \label{fig:datasets} \end{figure} \begin{figure} \centering \includegraphics[width=\linewidth]{fig/speedups.pdf} \caption{Scalability} \label{fig:speedups} \end{figure} \section{Conclusion and Future Work} Let us briefly summarize the main accomplishments of this project. First, the resulting system demonstrates the power of Cython as a tool for incorporating Python code into C applications. This aspect of Cython is often overlooked, as it is mostly used in the reverse direction --- accelerating Python with embedded C code. The use of Cython makes it possible to write independent, idiomatic code in both the C and Python parts of the application and to connect the two parts seamlessly. The drawbacks of this approach are that the full Python interpreter still gets embedded into the C application and, furthermore, that some parts of Python, such as the \verb|multiprocessing| module, fail when embedded into a C application, which precludes the use of some Python libraries, like NLTK, that rely on \verb|multiprocessing| internally. Another major accomplishment is the creation of a modular distributed Deep Learning architecture for a basic NLP task, which can be further extended to higher-level problems, like word prediction or sentiment analysis. Furthermore, the results of the tests show that there can be significant improvements in terms of training times if the training is performed on multiple nodes in parallel, even with independent data on each node. The directions for future improvements can be identified as follows. First, the system currently uses the CPU for neural network training, which is inefficient. Therefore, it might be interesting to investigate whether MPI can be used to distribute the system across a cluster of GPU-equipped nodes.
Furthermore, the architecture of the neural network probably requires some fine-tuning to achieve better scalability, as reported in~\cite{fedavg}. It would also be interesting to investigate finer-grained parallelism with FG-MPI~\cite{fg-mpi}, especially for the input pipeline, since the pipeline nodes are too lightweight for each of them to occupy a separate process, and coroutine-based parallelism might therefore be a better fit in this case. Finally, an interesting direction would be to split the neural networks themselves across multiple nodes, with one neural network layer occupying one node (e.g.\@ as in~\cite{syngrad}), which might distribute the computational load across the nodes more evenly. \bibliographystyle{IEEEtran} \bibliography{IEEEabrv, references} \end{document}