\documentclass{article}
\usepackage[letterpaper, margin=1in]{geometry}
\usepackage[colorlinks]{hyperref}
\usepackage{graphicx}
\usepackage{listings}
\lstset{basicstyle=\ttfamily}
\renewcommand{\floatpagefraction}{.8}
\title{Distributed Natural Language Processing with MPI and Python}
\author{Pavel Lutskov for CPSC 521 @ UBC}
\begin{document}
\maketitle
\section{Introduction}
Natural language processing (NLP) is a field of computer science with
applications such as digital assistants and machine translation. A typical NLP
application consists of several data processing stages forming a pipeline, and
these stages may be executed in parallel. Furthermore, individual
pipeline stages involving complex data intensive NLP algorithms, such as word
embedding calculation, may also benefit from parallelization. Finally, the
abundance of the textual data distributed over the Internet motivates
implementation of NLP algorithms in a distributed fashion. One of the
established frameworks for distributed computing is the MPI~\cite{mpich}
library for the C language. However, because of the complexity of the NLP
algorithms, it is infeasible to implement them in C. Therefore, the idea of
this project was to interface the existing Python libraries for NLP and machine
learning with C code and to leverage the MPI library for parallelization and
distribution of computations. The possible milestones of the project were
initially identified as follows:
\begin{itemize}
\item Investigating the possibility of passing data and calling simple Python
routines from C.
\item Writing pipeline stages in C with the help of the NLTK~\cite{nltk} framework.
\item Parallelizing individual stages with MPI.
\item Implementing a data intensive algorithm with parallel stage execution
(e.g. large scale word embedding computation).
\item Benchmarking the parallelized implementation against a sequential
Python program.
\end{itemize}
However, early on it became apparent that the Python \verb|multiprocessing|
module, which is used internally by NLTK, causes failures when the Python
interpreter is embedded into a C application. For this reason,
NLTK had to be abandoned, and the focus of the project was shifted towards the
distributed Deep Learning-based computation of word embeddings with the help of
the TensorFlow~\cite{tensorflow} framework.
\section{Architecture Overview}
The system implemented during the work on this project computes word embeddings
for a given vocabulary based on a user-supplied text corpus using the CBOW
approach proposed~\cite{cbow-skip-gram} by Mikolov et al.\@ in 2013. This
approach involves training a neural network on unstructured textual data to
perform a proxy task: predicting a word from its surrounding context. The
resulting embedding matrix is the weight matrix of the first layer of the
trained neural network.
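For illustration, a CBOW-style network of this kind can be sketched with the
\verb|tf.keras| API as follows; the pooling, optimizer and loss below are
assumptions made for this sketch, as the report only fixes the embedding
dimension (32) and the context window size (5, i.e.\@ four context words):
\begin{lstlisting}
import tensorflow as tf

def make_cbow_net(vocab_size, embedding_dim=32, context_size=4):
    # Four context word indices in, a distribution over the vocabulary out.
    context = tf.keras.Input(shape=(context_size,))
    embedded = tf.keras.layers.Embedding(vocab_size, embedding_dim)(context)
    pooled = tf.keras.layers.GlobalAveragePooling1D()(embedded)
    probs = tf.keras.layers.Dense(vocab_size, activation='softmax')(pooled)
    net = tf.keras.Model(context, probs)
    net.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
    return net

# The embedding matrix is the first layer's weight matrix:
# net.layers[1].get_weights()[0] has shape (vocab_size, embedding_dim).
\end{lstlisting}
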
The text data, before being supplied to the neural network, has to pass several
preprocessing stages. These stages, as implemented in this project, form an
\textit{input pipeline}, which is depicted in \autoref{fig:pipeline}. First,
the pipeline node called \textit{Tokenizer} reads a character stream from a
text file. This node is responsible for replacing all non-alphabetic and
non-ASCII characters in the stream with whitespace, normalizing the stream by
setting all remaining alphabetic characters to lowercase, and finally splitting
the stream into tokens (words) and passing the words one-by-one to the next
pipeline stage.
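In Python terms, the normalization performed by the Tokenizer can be expressed
roughly as follows (a sketch, not the project's exact code):
\begin{lstlisting}
import re

def normalize(line):
    # Replace every character that is not an ASCII letter (this also
    # covers all non-ASCII characters) with a space, lowercase the
    # rest, and split on whitespace.
    return re.sub(r'[^A-Za-z]+', ' ', line).lower().split()

# normalize('Call me Ishmael.') == ['call', 'me', 'ishmael']
\end{lstlisting}
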
\begin{figure}
\centering
\includegraphics[width=0.7\linewidth]{fig/input_pipeline.pdf}
\caption{An Input Pipeline in the System}
\label{fig:pipeline}
\end{figure}
The next pipeline stage is filtering, for which the \textit{Filter} node is
responsible. When computing word embeddings with the CBOW model, only words
that are present in the training vocabulary can be used. Furthermore, the
neural network does not accept raw text as input, but requires each word to be
encoded as the integer index of its position in the vocabulary. Finally, the
CBOW network does not process individual words, but operates on
\textit{context windows} of word indices. The task of the \textit{Filter} node
is therefore to remove from the pipeline all words that are not in the
training vocabulary, to replace the remaining words with their integer
indices, and to assemble the indices into context windows. As soon as a
context window is filled, it is sent down the pipeline for training batch
assembly. The system implemented in this project uses a context window of
size 5.
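The window assembly can be pictured as the following Python sketch; whether
consecutive windows overlap is an implementation choice not fixed by the
description above, and this sketch emits non-overlapping windows:
\begin{lstlisting}
WINDOW_SIZE = 5

def context_windows(index_stream):
    # Group a stream of vocabulary indices into windows of five.
    window = []
    for idx in index_stream:
        window.append(idx)
        if len(window) == WINDOW_SIZE:
            yield window
            window = []
\end{lstlisting}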

In the final stage of the input pipeline, the node called \textit{Batcher}
accumulates the context windows into batches, which can then be requested by
the \textit{Learner} nodes for the actual neural network training.

The other dimension of the parallelism employed in this system is the
distributed neural network training. In this project, an approach
proposed~\cite{fedavg} in 2016 by McMahan et al.\@ is used. The idea is to
distribute a copy of a neural network to a number of independent workers, which
would then separately perform several training iterations, possibly based on
their individual independent training data. The learned neural network weights
are then collected from the workers, a new model is computed by taking the
arithmetic average of the gathered weights, and then this neural network is
distributed to the workers for a new training round. The assumption behind this
architecture is that each worker will individually only need to perform a
fraction of the training iterations for the combined model to achieve the
desired performance, compared to the case where a single neural network is
trained sequentially.
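The core of this averaging step can be written down in a few lines of NumPy;
here each element of \verb|per_worker_weights| is one worker's list of layer
tensors, and uniform weighting is assumed, matching the plain arithmetic
average described above:
\begin{lstlisting}
import numpy as np

def average_weights(per_worker_weights):
    # Element-wise arithmetic mean of corresponding weight tensors.
    return [np.mean(layer_group, axis=0)
            for layer_group in zip(*per_worker_weights)]
\end{lstlisting}
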
In the presented system, there is one central node, called the
\textit{Dispatcher}, that is responsible for storing the model weights,
distributing the weights to the \textit{Learner} nodes (which perform the
actual training), collecting the weights at the end of a training round, and
computing their average. \autoref{fig:modes} demonstrates that the system
allows for each \textit{Learner} to have its own input pipeline, or for one
single input pipeline to be shared among all Learners, or for some intermediate
configuration. However, it is not currently possible for one Learner to access
more than one input pipeline.
\begin{figure}[h]
\centering
\includegraphics[width=\linewidth]{fig/modes.pdf}
\caption{Possible Pipeline Configurations}
\label{fig:modes}
\end{figure}
\section{Implementation Details}
\subsection{Overview}
The application logic for the project is split across three files:
\verb|main.c|, \verb|bridge.pyx| and \verb|library.py|. In the \verb|main.c|
file, the overall system architecture is defined, the communication between
nodes is implemented with the help of the MPI library, and, finally, the
current execution state, such as the current model weights, is stored and
managed. This project was tested using the MPICH 3.3 library~\cite{mpich}
implementing the MPI standard. The neural network training algorithms, as well
as algorithms for stream tokenization and filtering are implemented in the
\verb|library.py| file. This file targets Python 3.6 and uses the libraries
NumPy~\cite{numpy} 1.16 for general numerical computations and TensorFlow 1.14
for Deep Learning, as well as several Python standard library facilities.
Finally, the file \verb|bridge.pyx| provides interface functions for the C code
to access the Python functionality, thus creating a bridge between the
algorithms and the system aspects. In a \verb|.pyx| file, C and Python code can
be mixed rather freely, with occasional use of some special syntax. This file
is translated by the Cython~\cite{cython} framework into \verb|bridge.c| and
\verb|bridge.h| files. The \verb|bridge.c| is then used as a compilation unit
for the final executable, and the \verb|bridge.h| is included into the
\verb|main.c| as a header file. In order for the compilation to succeed, the
compiler needs to be pointed towards the Python header files, and, since NumPy
code is used in \verb|bridge.pyx|, to the NumPy header files. Furthermore, the
application needs to be linked against the Python dynamic libraries, which
results in the Python interpreter being embedded into the final executable. To
simplify the compilation process and make the codebase more portable, this
project uses the Meson~\cite{meson} build system.
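As an illustration of the bridging mechanism, a minimal hypothetical
\verb|bridge.pyx| could look as follows (the names are invented for this
sketch; \verb|cdef public| is the Cython syntax that makes a function callable
from C and emits its declaration into the generated header):
\begin{lstlisting}
# bridge.pyx -- hypothetical sketch, not the project's actual code
import numpy as np
import library          # plain Python module with the algorithms

cdef public int get_batch_size():
    # Callable from main.c; declared in the generated bridge.h.
    return library.BATCH_SIZE

cdef public double eval_batch(float* data, int n):
    # Wrap a raw C buffer as a NumPy array without copying.
    arr = np.asarray(<float[:n]> data)
    return library.evaluate(arr)
\end{lstlisting}
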
\subsection{Running the Application} \label{ssec:running}
To run this system, you will need the following software:
\begin{itemize}
\item A recent macOS or Linux;
\item A recent compiler, \textit{GCC} or \textit{clang};
\item \textit{MPICH} 3;
\item \textit{Python} $\geq3.6$ with headers and libraries (e.g.\@ on Ubuntu
you need to install \verb|python3-dev|);
\item \textit{Meson}, \textit{Cython} and \textit{Ninja} for building;
\item \textit{TensorFlow} 1.14, \textit{NumPy} 1.16.
\end{itemize}
The application can then be built from the repository root by issuing the
following command:
\begin{lstlisting}
meson build && (cd build && ninja)
\end{lstlisting}

Then, the program expects to be run from the repository root, with a directory
named \verb|config| present there. This directory has to contain the following
three files:
\begin{itemize}
\item \verb|vocab.txt| --- This file contains the vocabulary words for
      which the embeddings will be learned. These words need to be
whitespace or newline separated, and only contain alphabetic lowercase
ASCII characters.
\item \verb|test.txt| --- This file contains the testing dataset of context
windows, based on which the training performance of the network will be
tracked. A context window of size 5 is used in the project, so this file
      has to contain 5 whitespace-separated words per line. The third word in
      each line is the target word, and the other words are the surrounding
      context. Only words that are present in \verb|vocab.txt| are allowed here.
\item \verb|cfg.json| --- This file contains several key--value pairs that
      configure the training procedure (an illustrative example is shown after
      this list):
\begin{itemize}
\item \verb|"data_name"| --- The name of the dataset that is used to train
the network, can an alphanumeric string of your choice.
\item \verb|"bpe"| --- Batches per Epoch, the number of independent
iterations each Learner will perform before sending the weights back to
the Dispatcher.
\item \verb|"bs"| --- Batch Size, the number of context windows in a
training batch.
\item \verb|"target"| --- The targeted value of the neural network loss
function evaluated on the testing dataset. As soon as this value is
reached, the program will stop training and exit.
\end{itemize}
\end{itemize}
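For concreteness, a \verb|config/cfg.json| could look like the following (the
values here are illustrative only):
\begin{lstlisting}
{
    "data_name": "mobydick",
    "bpe": 50,
    "bs": 64,
    "target": 8.4
}
\end{lstlisting}
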
Once these files have been created, the program can be run from the repository
root by issuing the following command:
\begin{lstlisting}
mpiexec -n NUM_PROC ./build/fedavg_mpi /path/to/dataset/text{1,2,3}
\end{lstlisting}
For each text file passed as an argument, the system will create an input
pipeline, consisting of 3 nodes (Tokenizer, Filter, Batcher). Furthermore, each
pipeline needs at least one Learner. There also needs to be one Dispatcher node
for the whole application. Therefore, the formula for the minimum number of
processes to be requested from \verb|mpiexec| looks like the following:
\begin{lstlisting}
NUM_PROC >= (4 * num_text_files) + 1
\end{lstlisting}
To figure out how many Learners will be created, the following formula can be
used:
\begin{lstlisting}
num_learners = NUM_PROC - 1 - (3 * num_text_files)
\end{lstlisting}
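For example, invoking the system with two text files and \verb|NUM_PROC = 9|
creates $2 \times 3 = 6$ pipeline nodes and one Dispatcher, leaving
$9 - 1 - 6 = 2$ Learners, i.e.\@ one per pipeline.
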
While running, the program will create the folder \verb|trained| in the
repository root, if it does not already exist. After each training round it
saves there the weights of the neural network as an HDF5 file and, separately,
the embedding matrix as a whitespace-separated CSV file, whose rows represent
the embedding vectors in the same order as the words in the
\verb|config/vocab.txt| file. The embedding vectors are hard-coded to have 32
dimensions.
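Since the embedding matrix is plain whitespace-separated text, it can be loaded
back, for instance, with NumPy (the file name below is illustrative):
\begin{lstlisting}
import numpy as np

# Rows appear in the same order as the words in config/vocab.txt.
embeddings = np.loadtxt('trained/embedding.csv')  # (vocab_size, 32)
\end{lstlisting}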
\subsection{Component Implementation}
\paragraph{Configuration Reading} The files in the \verb|config/| directory are
read by the \verb|library.py| module on start-up, and the vocabulary, the test
dataset and the parameters of training are stored as global module objects. The
\verb|bridge.pyx| then imports the \verb|library.py| module and defines several
public C API functions for the \verb|main.c| code to access the configuration
parameters, perform a word index lookup, or evaluate a neural network on the
test dataset.
\paragraph{Tokenizer} A Tokenizer node is implemented in the \verb|tokenizer|
function in the \verb|main.c| file, which receives as an argument the path to a
text file, from which the tokens will be read. It then calls a function
\verb|get_tokens(WordList* wl, const char* filename)|, defined in the
\verb|bridge.pyx| file. The \verb|WordList| structure is a dynamically growable
list of \verb|Word| structs that records the number of \verb|Word|s in the
list as well as the memory available for storing the \verb|Word|s. A
\verb|Word| structure is a wrapper around the C \verb|char*|, keeping track of
the memory allocated to the pointer. The function \verb|get_tokens| consults a
global dictionary contained in \verb|bridge.pyx| that keeps track of the file
names for which a token generator already exists. If the generator for the file
was not yet created, or if it is already exhausted, then a new generator is
created by calling the \verb|token_generator(filename)| function defined in
\verb|library.py|, which yields the tokens of the file one line at a time. A
list of words is then queried from the
generator, and the \verb|WordList| structure is populated with the words from
the list, expanding the memory allocated to it if needed. The \verb|tokenizer|
function then sends the \verb|Word|s from the \verb|WordList| one-by-one to the
Filter node, and as soon as all words are sent it calls \verb|get_tokens|
again. In the current implementation the Tokenizer will loop on the input data
until it receives a signal from the Dispatcher to stop. After this, it will
send an empty \verb|Word| down the pipeline to inform the Filter and the
Batcher to stop too.
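Stripped of the \verb|WordList| handling, the caching behaviour of
\verb|get_tokens| corresponds to the following Python sketch (names
simplified):
\begin{lstlisting}
import library

_token_generators = {}  # file name -> generator over lines of tokens

def next_token_list(filename):
    # Create the generator on first use, or re-create it once the
    # file has been fully consumed, so the Tokenizer can loop.
    gen = _token_generators.get(filename)
    if gen is not None:
        try:
            return next(gen)
        except StopIteration:
            pass
    gen = library.token_generator(filename)
    _token_generators[filename] = gen
    return next(gen)
\end{lstlisting}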
\paragraph{Filter} A Filter node, implemented in the \verb|filter| function in
\verb|main.c|, receives the \verb|Word|s one by one from the Tokenizer and looks
up their indices in the vocabulary by calling the \verb|vocab_idx_of(Word* w)|
function defined in \verb|bridge.pyx|. That function performs a dictionary
lookup for the word, based on the \verb|config/vocab.txt| file, and returns its
index on success or \verb|-1| if the word is not known. The Filter will
assemble valid indices in a \verb|long* window| variable until enough words are
received to send the context window to the Batcher. If a word received from the
Tokenizer is empty, the Filter sets the first element in the context window to
\verb|-1| and sends the window to a Batcher for termination.
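A plausible rendering of this lookup on the \verb|library.py| side (the
dictionary construction is an assumption; the report only fixes the
\verb|-1| convention):
\begin{lstlisting}
with open('config/vocab.txt') as f:
    VOCAB = {word: i for i, word in enumerate(f.read().split())}

def vocab_idx(word):
    # Index of the word in the vocabulary, or -1 if unknown.
    return VOCAB.get(word, -1)
\end{lstlisting}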
\paragraph{Batcher} A Batcher is a rather simple pure C routine that first
assembles the context windows into a batch, simultaneously converting
\verb|long| into \verb|float|, and then waits for some Learner to announce
itself. Once it receives a signal from a Learner it responds with a batch and
starts assembling the next batch. Since this node may receive signals from both
Filter and Learner, it also may need to receive termination signals from both
in order to avoid waiting for a signal from a finished process. Therefore, if
the first element of the window received from the Filter is \verb|-1|, or if
the Learner sends \verb|-1| when announcing itself, then the Batcher will
terminate immediately.
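Ignoring the MPI signalling, the Batcher's accumulation logic amounts to the
following Python sketch:
\begin{lstlisting}
def batches(window_stream, batch_size):
    # Accumulate context windows into training batches, converting
    # the integer indices to floats as the C code does.
    batch = []
    for window in window_stream:
        if window[0] == -1:   # termination signal from the Filter
            return
        batch.append([float(i) for i in window])
        if len(batch) == batch_size:
            yield batch
            batch = []
\end{lstlisting}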
\paragraph{Learner} A Learner, implemented in the \verb|learner| function in
\verb|main.c|, first creates a TensorFlow neural network object and stores the
network as a \verb|PyObject*|. It also initializes a C \verb|WeightList| struct
to store the network weights and to serve as a buffer for communication with
the Dispatcher. It then waits for the Dispatcher to announce a new training
round, after which the Dispatcher will send the weights and the Learner will
receive the weights into the \verb|WeightList| struct. Since a
\verb|WeightList| has a rather complex structure, a pair of functions
\verb|send_weights| and \verb|recv_weights| are used for communicating the
weights. Then, the Learner will use the \verb|WeightList| to set the neural
network weights, by employing the \verb|set_net_weights| function defined in
\verb|bridge.pyx|. This is one of the cases where it is particularly convenient
to use Cython, since raw C memory pointers can be easily converted to
\verb|NumPy| arrays, which one then can directly use to set the weights of a
TensorFlow network. Then, the Learner will perform a number of training
iterations, specified by \verb|"bpe"| key in \verb|config/cfg.json| file. For
each iteration, the Learner will send its MPI rank to its designated Batcher
and will receive a batch in the form of a \verb|float*|. This \verb|float*|,
together
with the \verb|PyObject*| network object can be passed to the \verb|step_net|
Cython function to perform one step of training. This function, again,
leverages the ease of converting C data into NumPy arrays in Cython. Finally,
after all iterations, the weights of the network will be written to the
\verb|WeightList| by a Cython routine \verb|update_weightlist| and the
\verb|WeightList| will be sent back to the Dispatcher, and the Learner will
wait for the signal to start the next training round. If it instead receives a
signal to stop training, then it will send a \verb|-1| to its designated
Batcher and terminate.
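In Python terms, one Learner training round amounts to roughly the following
(\verb|net| is assumed to be a compiled Keras model and \verb|next_batch| a
callable standing in for the MPI exchange with the Batcher):
\begin{lstlisting}
import numpy as np

def learner_round(net, weights, next_batch, bpe):
    net.set_weights(weights)                   # cf. set_net_weights
    for _ in range(bpe):
        batch = np.asarray(next_batch())       # shape: (bs, 5)
        context = np.delete(batch, 2, axis=1)  # four surrounding words
        target = batch[:, 2]                   # the middle word
        net.train_on_batch(context, target)    # cf. step_net
    return net.get_weights()                   # cf. update_weightlist
\end{lstlisting}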
\paragraph{Dispatcher} The Dispatcher also initializes a neural network and a
\verb|WeightList| structure using the same procedure as the Learner. This
network will serve as the single source of truth for the whole application. For
each training round the Dispatcher will send out the \verb|WeightList| to the
Learners, and upon receiving all the \verb|WeightList|s back from the Learners
will compute their arithmetic element-wise average and store it in its own
\verb|WeightList| structure, using the function \verb|combo_weights| from
\verb|bridge.pyx|. This updated \verb|WeightList| will also be assigned to the
Dispatcher's network, after which the loss of the network will be evaluated
based on the testing dataset from the \verb|config/test.txt|. After each
iteration the network weights and the embedding matrix will be saved, as
described in \autoref{ssec:running}. These iterations will continue until the
loss is below the \verb|"target"|, defined in \verb|config/cfg.json|. In this
case instead of the signal to start the training round, the Dispatcher will
send a \verb|-1| to all Tokenizers and Learners, so that all pipelines can be
properly halted. After this the Dispatcher will compute and print some run
statistics and exit.
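Abstracting away the MPI communication, the Dispatcher's round loop can be
summarized as follows (reusing \verb|average_weights| from the earlier sketch;
\verb|learners| stands for callables that each run one training round):
\begin{lstlisting}
def dispatch(weights, learners, evaluate, target):
    # Run federated averaging rounds until the test loss is reached.
    while evaluate(weights) > target:
        results = [learn(weights) for learn in learners]
        weights = average_weights(results)     # cf. combo_weights
    return weights
\end{lstlisting}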
\section{Evaluation}
The main focus of the evaluation was to determine whether executing several
neural network training nodes in parallel can speed up the training process.
The employed approach was to define a \textit{target loss} that the network
has to
achieve and then to measure \textit{the number of context windows} that each
Learner node has to process and, secondarily, the time it takes for the system
to reach the target. The motivation behind this approach is that although the
total number of training windows consumed by the system is the number of
windows per Learner times the number of Learners, the Learners process their
windows in parallel; the longest computation path is therefore determined by
the number of windows that each Learner processes, which makes it a reasonable
approximation of parallel performance. Moreover, the tests have shown that
Learners dominate the running time (the pipeline with a single Learner could
process around 45 batches/s, but over 500 batches/s when the call to the
training function in the Learner was commented out), therefore the number of
context windows processed by Learners is the most important parameter for the
overall performance. One could also count processed batches instead of context
windows; counting windows, however, makes it possible to study the influence
of the number of context windows in a batch (i.e.\@ the \textit{batch size})
on the training performance, since e.g.\@ increasing the batch size might
actually reduce the amount of data needed for training.

The wall time was used only as a secondary measure: due to time constraints
and software incompatibilities it was not possible to launch the system on the
computing cluster, so the tests had to be performed on a laptop with a modest
dual-core 1.3 GHz CPU. Consequently, using more than 2 Learner nodes
essentially results in a sequential simulation of the parallel processing and
yields no improvement in processing time.

The evaluations were performed on two datasets. The first is the book ``Moby
Dick'' by Herman Melville (approx.\@ 200k words), obtained from Project
Gutenberg~\cite{gutenberg} using the API provided by the NLTK toolkit. The
vocabulary for this dataset consists of all words from the book excluding
English stop words, as defined by NLTK. The test set consists of 1000 context
windows randomly selected from the book.

The second dataset is a part of a recent English Wikipedia
dump~\cite{wikidump} (approx.\@ 90M words), transformed into plain text using
the WikiExtractor~\cite{wikiextractor} tool. For this dataset the vocabulary
is the list of the 10000 most frequently used English words, obtained
from~\cite{10k-words}, again excluding stop words. As test data, 5000 context
windows were randomly sampled from the dump file.

The test configurations were:
\begin{itemize}
\item a single pipeline with 1, 2, 4, 8, 12 Learners (up to 16 total
      processes);
\item or individual pipelines for 1, 2, 4, 8 Learners, each reading a
separate part of a dataset (up to 33 total processes).
\end{itemize}

For the smaller of the two datasets the target was set to \verb|8.4|. It can
be observed in \autoref{fig:datasets} that modest speed-ups can be achieved by
employing up to 8 Learners, with the system maxing out at a 2.4x speed-up.
Furthermore, a \mbox{2 Learner -- 2 Pipeline} configuration training
independently on two different halves of the book never even reaches the
target. A possible explanation for this is that the ``Moby Dick'' book is too
small for multiple Learners to have sufficient data to train on.

For the larger dataset with the target set to \verb|8.3|, however, the results
were more promising, as can be seen in \autoref{fig:datasets} and
\autoref{fig:speedups}. Using 2 Learners instead of 1 resulted in nearly linear
reduction of both the amount of data consumed by each Learner (1.95x) and time
to target (1.94x). This result also validates the use of the number of context
windows consumed by each Learner as a proxy for system performance, since
scaling within the number of available cores results in an almost perfect
correlation between the amount of data per Learner and the wall time. Going
from 2 to 4 Learners decreases the amount of data per Learner by another 2x,
with the wall time remaining roughly the same, demonstrating that the laptop's
cores are exhausted. Further increasing the number of Learner nodes results in
observable but sub-linear speed-ups, with the 12-Learner system using 7x less
data per Learner to achieve the target loss of \verb|8.3|. This decrease in
gains can probably be linked to the deficiencies of the neural network model
being used, and thus, to achieve further speed-ups, the network architecture
and training hyperparameters have to be investigated in more depth.
Furthermore, the loss plots suggest that for longer training the difference
between configurations with different numbers of Learners should still be
observable; however, due to time and hardware constraints it was not possible
to investigate the speed-ups achieved in longer-running trials in more detail.

Finally, as can be observed in \autoref{fig:datasets} and
\autoref{fig:speedups}, the systems with individual pipelines with independent
input data for each Learner initially perform and scale worse than the
single-pipeline systems. However, in the later stages of training the effect of
using multiple pipelines becomes more positive, e.g.\@ the \mbox{4 Learner -- 4
Pipeline} system almost catches up with the \mbox{12 Learner -- 1 Pipeline}
system. Since input pipelines are computationally cheap, and since it is often
practical not to store the data as one big file but rather to split it across
multiple nodes, this mode of operation should be investigated further and
possibly preferred for large-scale training.

As a last note, the learned embeddings themselves were not of high importance
for the evaluation, since obtaining high-quality embeddings is known to require
far more data (a dataset of \mbox{$>$ 100B words}) and computation time than
was feasible within this project. Nevertheless, the learning outcomes were
empirically inspected, and it was found that even with relatively short
training runs the networks could capture some meaningful relationships between
the vocabulary words.
\begin{figure}
\centering
\includegraphics[width=\linewidth]{fig/datasets.pdf}
\caption{Validation Loss Against the Amount of Data per Learner}
\label{fig:datasets}
\end{figure}
\begin{figure}
\centering
\includegraphics[width=\linewidth]{fig/speedups.pdf}
\caption{Scalability}
\label{fig:speedups}
\end{figure}
\section{Conclusion and Future Works}
Let us briefly summarize the main accomplishments of this project. First, the
resulting system demonstrates the power of Cython as a tool for incorporating
Python code into C applications. This aspect of Cython is often overlooked as
it is mostly used in the reverse direction --- accelerating Python with
embedded C code. Cython makes it possible to write independent, idiomatic code
in both the C and the Python parts of an application and to seamlessly connect
the two. The drawbacks of this approach are that the full Python interpreter
still gets embedded into the C application, and that some parts of Python,
such as the \verb|multiprocessing| module, fail when embedded into a C
application, which prevents the use of Python libraries like NLTK that rely on
\verb|multiprocessing| internally.

Another major accomplishment is the creation of a modular distributed Deep
Learning architecture for a basic NLP task, which can be further extended to
higher-level problems such as word prediction or sentiment analysis.
Furthermore, the results of the tests show that there can be significant
improvements in terms of training times if the training is performed on
multiple nodes in parallel, even with independent data on each node.

The directions for future improvements can be identified as follows. First,
the system currently uses the CPU for neural network training, which is
inefficient; it might therefore be interesting to investigate whether MPI can
be used to distribute the system across a cluster of GPU-equipped nodes.
Furthermore,
the architecture of the neural network probably requires some fine-tuning to
achieve better scalability, as reported in~\cite{fedavg}. It would also be
interesting to investigate finer-grain parallelism with FG-MPI~\cite{fg-mpi},
especially for the input pipeline: the pipeline nodes are too lightweight for
each of them to occupy a separate process, so coroutine-based parallelism
might be a better fit in this case. Finally, an
interesting direction would be to split the neural networks across multiple
nodes, with one neural network layer occupying one node (e.g.\@ as
in~\cite{syngrad}), which might distribute the computational load across the
nodes more evenly.
\bibliographystyle{IEEEtran}
\bibliography{IEEEabrv, references}
\end{document}