Using multiple processes to construct training batches can significantly reduce the total training time of your network.
Basically, if you train on a GPU, you can reduce the extra batch construction time almost to zero. This is achieved by pipelining the computation: while the GPU crunches numbers, the CPU does the preprocessing. The Python multiprocessing module lets us implement such pipelining as elegantly as is possible in a language with a GIL.
The DataLoader class, for example, also uses multiprocessing in its internals. However, DataLoader lacks flexibility: it is impossible to create a batch with an arbitrarily complex structure within the standard DataLoader class. So it is useful to be able to apply raw multiprocessing.
multiprocessing gives us a set of useful APIs to distribute computations among several processes. Processes do not share memory with each other, so data is transmitted via inter-process communication. For example, on Linux-like operating systems multiprocessing uses pipes. This design leads to some pitfalls, which I describe below.
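To make the "no shared memory" point concrete, here is a minimal sketch. The names counter, bump and run_demo are hypothetical and chosen only for illustration. A worker modifies a module-level variable, yet the parent sees only what comes back through the pipe as a return value.

```python
from multiprocessing import Pool

counter = 0  # hypothetical module-level state; every process has its own copy


def bump(x):
    # Runs in a worker process and changes only that worker's copy of `counter`.
    global counter
    counter += x
    return counter  # the return value is pickled and sent back to the parent


def run_demo():
    with Pool(2) as pool:
        results = pool.map(bump, [1, 1, 1, 1])
    # `results` arrived via inter-process communication;
    # the parent's own `counter` was never touched.
    return results, counter


if __name__ == "__main__":
    print(run_demo())
```

The exact values in the result list depend on how the tasks were spread over the workers, but the parent's counter stays at 0 in every case.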
map and imap may be used to apply preprocessing to batches. Both take a processing function and an iterable as arguments. The difference is that imap is lazy: it returns processed elements as soon as they are ready, so the processed batches do not all have to be stored in RAM simultaneously. For training a NN you should always prefer imap:
    from multiprocessing import Pool

    def process(batch_reader):
        with Pool(threads) as pool:
            # imap yields each processed batch as soon as it is ready
            for batch in pool.imap(foo, batch_reader):
                # ...
                yield batch
                # ...
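The difference between the two calls can be seen in a small sketch. The function square is a hypothetical stand-in for real preprocessing, and compare is just a helper for this illustration.

```python
from multiprocessing import Pool


def square(x):
    # Hypothetical stand-in for a real preprocessing function.
    return x * x


def compare(n=5):
    with Pool(2) as pool:
        eager = pool.map(square, range(n))   # blocks and returns a complete list
        lazy = pool.imap(square, range(n))   # returns an iterator right away
        first = next(lazy)                   # results are handed out one by one, in order
        rest = list(lazy)
    return eager, first, rest


if __name__ == "__main__":
    print(compare())
```

With map the whole result list has to exist before the first batch can be used, while imap lets the training loop start on the first batch while the others are still being processed.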
Another pitfall comes from the need to transfer objects through pipes. In addition to the processing results, multiprocessing also serializes the transformation object when one of its methods is passed as the processing function, as in pool.imap(transformer.foo, batch_reader). The transformer is pickled and sent to the subprocess. This can cause problems if the transformer object has large attributes. In that case it may be better to store the large attributes as singleton class variables:
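    class Transformer():
        # Class attributes are not pickled with the instance. Forked workers
        # inherit them, so create the Transformer before the Pool.
        large_dictionary = None

        def __init__(self, large_dictionary, **kwargs):
            self.__class__.large_dictionary = large_dictionary

        def foo(self, x):
            # ...
            y = self.large_dictionary[x]
            # ...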
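A rough way to see the effect is to measure how many bytes pickle produces for the bound method in each layout. This is only a sketch: HeavyTransformer, LightTransformer and payload_sizes are hypothetical names, and the dictionary is synthetic.

```python
import pickle


class HeavyTransformer:
    # Hypothetical variant: the large object lives on the instance.
    def __init__(self, large_dictionary):
        self.large_dictionary = large_dictionary

    def foo(self, x):
        return self.large_dictionary[x]


class LightTransformer:
    # Variant from the text: the large object lives on the class.
    large_dictionary = None

    def __init__(self, large_dictionary):
        self.__class__.large_dictionary = large_dictionary

    def foo(self, x):
        return self.large_dictionary[x]


def payload_sizes(n=10_000):
    data = {i: str(i) for i in range(n)}
    # Pickling a bound method pickles the instance it is bound to.
    heavy = len(pickle.dumps(HeavyTransformer(data).foo))
    light = len(pickle.dumps(LightTransformer(data).foo))
    return heavy, light


if __name__ == "__main__":
    print(payload_sizes())
```

The instance-attribute version drags the whole dictionary into the pickled payload, while the class-attribute version pickles only a reference to the class. The trade-off is that workers must obtain the class attribute some other way. With the fork start method they inherit it from the parent. With spawn they would see None unless it is set again in each worker.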
Another difficulty arises when preprocessing is faster than GPU training. In this case batches that the GPU has not consumed yet accumulate in memory. If you do not have enough memory, you will get an out-of-memory error. One way to solve this problem is to pause batch preprocessing until the GPU catches up.
A semaphore is a perfect fit for this task:
    from multiprocessing import Pool, Semaphore

    def batch_reader(semaphore):
        for batch in source:
            semaphore.acquire()  # blocks while `limit` batches are in flight
            yield batch

    def process(x):
        return x + 1

    def pooling():
        with Pool(threads) as pool:
            semaphore = Semaphore(limit)
            for x in pool.imap(process, batch_reader(semaphore)):
                yield x
                semaphore.release()  # runs when the consumer asks for the next batch

    for x in pooling():
        learn_gpu(x)
Semaphore has an internal counter synchronized across all working processes. Its logic blocks execution if some process tries to hold more than limit slots at once with acquire(). The call returns only after release() has freed a slot.
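To check that the semaphore really bounds the number of batches in flight, the same pattern can be instrumented with two counters. This is a minimal sketch under assumptions: double stands in for preprocessing, appending to a list stands in for the GPU step, and run_bounded and its parameters are hypothetical names.

```python
from multiprocessing import Pool, Semaphore


def double(x):
    # Hypothetical stand-in for real preprocessing.
    return 2 * x


def run_bounded(n_items=50, limit=4, workers=2):
    semaphore = Semaphore(limit)
    stats = {"issued": 0, "consumed": 0, "peak": 0}

    def reader():
        for item in range(n_items):
            semaphore.acquire()  # blocks once `limit` items are in flight
            stats["issued"] += 1
            in_flight = stats["issued"] - stats["consumed"]
            stats["peak"] = max(stats["peak"], in_flight)
            yield item

    results = []
    with Pool(workers) as pool:
        for result in pool.imap(double, reader()):
            results.append(result)    # stand-in for the GPU training step
            stats["consumed"] += 1    # count first, then free the slot
            semaphore.release()
    return results, stats["peak"]


if __name__ == "__main__":
    results, peak = run_bounded()
    print(len(results), peak)
```

The recorded peak never exceeds limit, because every acquire() beyond the first limit calls has to wait for a matching release(). In this sketch both calls happen in the parent process: the pool consumes the input generator in a helper thread of the parent, and the results are read in the main thread.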