Parallel preprocessing

July 24, 2018

Using multiple processes to construct training batches can significantly reduce the total training time of your network. If you train on a GPU, the extra batch-construction time can be driven almost to zero by pipelining the computation: while the GPU crunches numbers, the CPU does the preprocessing. Python's multiprocessing module lets us implement such a pipeline about as elegantly as is possible in a language with a GIL.

PyTorch's DataLoader class, for example, also uses multiprocessing internally. Unfortunately, DataLoader is not very flexible: it is hard to build batches with a complex custom structure inside the standard DataLoader class. So it is useful to know how to apply raw multiprocessing yourself.
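For reference, a minimal sketch of the standard approach (the dataset object and the parameter values here are placeholders): setting num_workers above zero makes DataLoader load batches in worker processes.

from torch.utils.data import DataLoader

# num_workers > 0 enables multiprocessing inside DataLoader
loader = DataLoader(dataset, batch_size=32, num_workers=4)
for batch in loader:
    ....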

multiprocessing gives us a set of APIs for distributing computation across several processes. Processes do not share memory with each other, so data is transmitted via inter-process communication; on Linux-like operating systems, for example, multiprocessing uses pipes. This design leads to a few pitfalls that I am going to describe.
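As a minimal illustration (the function square and the worker count are arbitrary), a Pool distributes calls over worker processes, and both arguments and results are pickled and passed through those pipes:

from multiprocessing import Pool

def square(x):
    # Runs in a worker process; x and the result travel through a pipe.
    return x * x

if __name__ == '__main__':
    with Pool(4) as pool:
        print(pool.map(square, range(8)))  # [0, 1, 4, 9, 16, 25, 36, 49]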

map vs imap

The methods map and imap can both be used to apply preprocessing to batches. Both take a processing function and an iterable as arguments. The difference is that imap is lazy: it returns each processed element as soon as it is ready, so all processed batches do not have to sit in RAM at the same time. For training a neural network you should almost always prefer imap:

from multiprocessing import Pool


def process(batch_reader):
    with Pool(threads) as pool:
        # imap yields each processed batch as soon as it is ready
        for batch in pool.imap(foo, batch_reader):
            ....
            yield batch
            ....
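For contrast, a sketch of the eager variant with the same placeholder names: map returns only after every batch has been processed, so all results sit in memory at once.

def process_eager(batch_reader):
    with Pool(threads) as pool:
        # map builds the full list of processed batches before the loop starts
        for batch in pool.map(foo, batch_reader):
            yield batch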

Serialization

Another pitfall is related to the need to transfer objects through pipes. Besides the processing results, multiprocessing will also serialize the transformation object when it is used like this: pool.imap(transformer.foo, batch_reader). The transformer is pickled and sent to the subprocess, which can become a problem if the transformer object holds large attributes. In that case it may be better to store the large attributes as singleton class variables:

class Transformer:
    # Stored on the class, so it is not pickled together with the instance.
    large_dictionary = None

    def __init__(self, large_dictionary, **kwargs):
        self.__class__.large_dictionary = large_dictionary

    def foo(self, x):
        ....
        y = self.large_dictionary[x]
        ....
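A usage sketch under the assumptions above (the dictionary contents, threads and batch_reader are placeholders): only the small instance is pickled along with the bound method transformer.foo, while the class-level dictionary stays behind; with the fork start method, the default on Linux, the workers inherit it from the parent process.

large_dictionary = load_dictionary()      # hypothetical heavy lookup table
transformer = Transformer(large_dictionary)

with Pool(threads) as pool:
    # Only the lightweight instance is serialized for each worker call.
    for result in pool.imap(transformer.foo, batch_reader):
        ....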

Another difficulty you may encounter is the preprocessing being faster than the GPU training step. In that case ready batches accumulate in memory, and if there is not enough of it you will get an out-of-memory error. One way to solve this is to pause batch preprocessing until the GPU has caught up. A semaphore is a natural fit for this task:

from multiprocessing import Pool, Semaphore


def batch_reader(semaphore):
    for batch in source:
        # Block while `limit` batches are already in flight.
        semaphore.acquire()
        yield batch


def process(x):
    return x + 1


def pooling():
    with Pool(threads) as pool:
        semaphore = Semaphore(limit)
        for x in pool.imap(process, batch_reader(semaphore)):
            yield x
            # The previous batch has been consumed, let the reader fetch another.
            semaphore.release()


for x in pooling():
    learn_gpu(x)

A semaphore keeps an internal counter that is safe to use across processes. Each semaphore.acquire() in batch_reader decrements the counter and blocks once it reaches zero, so no more than limit batches are read ahead; each semaphore.release() after the GPU has consumed a batch increments the counter again and lets the reader continue.