Using multiple processes to construct training batches can significantly reduce the total training time of your network. If you are training on a GPU, the additional batch construction time can be reduced almost to zero. This is achieved by pipelining the computations: while the GPU crunches numbers, the CPU does the preprocessing. The Python multiprocessing module allows us to implement such a pipeline about as elegantly as is possible in a language with a GIL.
The PyTorch DataLoader class, for example, also uses multiprocessing internally. Unfortunately, DataLoader suffers from a lack of flexibility: it is impossible to create a batch with an arbitrarily complex structure within the standard DataLoader class, so it can be useful to apply raw multiprocessing yourself.
multiprocessing gives us a set of useful APIs to distribute computations among several processes. Processes do not share memory with each other, so data is transmitted via inter-process communication; on Linux-like operating systems, for example, multiprocessing uses pipes. This organization leads to some pitfalls that I am going to describe.
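To make this concrete, here is a minimal sketch (the function and data are made up for illustration) of distributing work over a Pool; every argument and every result is pickled and sent through a pipe between the parent and a worker process:

from multiprocessing import Pool

def preprocess(batch):
    # Runs in a worker process; the argument and the return value
    # cross the process boundary through a pipe.
    return [x * 2 for x in batch]

if __name__ == "__main__":
    batches = [[1, 2, 3], [4, 5, 6]]
    with Pool(processes=2) as pool:
        results = pool.map(preprocess, batches)
    print(results)  # [[2, 4, 6], [8, 10, 12]]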
map vs imap
The methods map and imap can be used to apply preprocessing to batches. Both take a processing function and an iterable as arguments. The difference is that imap is lazy: it returns processed elements as soon as they are ready, so the processed batches do not all have to be stored in RAM at the same time. For training a neural network you should always prefer imap:
from multiprocessing import Pool

def process(batch_reader):
    # foo and threads are assumed to be defined elsewhere:
    # foo is the per-batch preprocessing function,
    # threads is the number of worker processes.
    with Pool(threads) as pool:
        for batch in pool.imap(foo, batch_reader):
            # ... any extra per-batch work ...
            yield batch
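A possible way to drive this generator in a training loop (read_batch, num_batches and learn_gpu are placeholder names, not part of the original code):

# Hypothetical usage: raw_batches yields unprocessed batches,
# learn_gpu(batch) performs one training step on the GPU.
raw_batches = (read_batch(i) for i in range(num_batches))
for batch in process(raw_batches):
    learn_gpu(batch)  # the pool keeps preprocessing while the GPU trains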
Serialization
Another pitfall is associated with the need to transfer objects via pipes. In addition to the processing results, multiprocessing will also serialize the transformation object if it is used like this: pool.imap(transformer.foo, batch_reader). The transformer will be serialized and sent to the subprocess, which may lead to problems if the transformer object has large attributes. In that case it may be better to store the large attributes as singleton class variables:
class Transformer:
    large_dictionary = None  # shared as a class variable, not pickled with the instance

    def __init__(self, large_dictionary, **kwargs):
        self.__class__.large_dictionary = large_dictionary

    def foo(self, x):
        # ... preprocessing that uses the lookup table ...
        y = self.large_dictionary[x]
        return y
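A sketch of how this might be used (load_lookup_table, threads, batch_reader and learn_gpu are placeholders): because the big lookup table lives on the class rather than on the instance, only a small Transformer instance is pickled each time transformer.foo is dispatched to a worker. Note that this relies on the workers inheriting the class variable from the parent process, which is the case with the fork start method that is typically the default on Linux.

from multiprocessing import Pool

large_dictionary = load_lookup_table()       # hypothetical: build the big object once in the parent
transformer = Transformer(large_dictionary)  # stored on the class, so it is not re-pickled per task

with Pool(threads) as pool:
    for batch in pool.imap(transformer.foo, batch_reader):
        learn_gpu(batch)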
Another difficulty you may encounter is the preprocessor being faster than GPU training. In this case ready batches accumulate in memory, and if you do not have enough memory you will get an out-of-memory error. One way to solve this problem is to hold back batch preprocessing until the GPU has caught up. A Semaphore is a perfect fit for this task:
from multiprocessing import Pool, Semaphore

def batch_reader(semaphore):
    # source, threads and limit are assumed to be defined elsewhere
    for batch in source:
        semaphore.acquire()  # blocks when `limit` batches are already in flight
        yield batch

def process(x):
    return x + 1

def pooling():
    with Pool(threads) as pool:
        semaphore = Semaphore(limit)
        for x in pool.imap(process, batch_reader(semaphore)):
            yield x
            semaphore.release()  # one batch consumed, allow reading the next one

for x in pooling():
    learn_gpu(x)
Semaphore has an internal counter shared across all worker processes. semaphore.acquire() decrements the counter and blocks once it reaches zero, while semaphore.release() increments it again, so no more than limit batches can be in flight at the same time.
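For intuition, a small standalone sketch of the counter behaviour (not part of the pipeline above):

from multiprocessing import Semaphore

semaphore = Semaphore(2)  # counter starts at limit = 2
semaphore.acquire()       # counter: 2 -> 1
semaphore.acquire()       # counter: 1 -> 0
# a third acquire() would block here until another process calls release()
semaphore.release()       # counter: 0 -> 1, a blocked acquire() can proceed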