Returning results is very slow in multiprocess code. What should I do?

No multiprocessing code:

from time import time

func1Results = []

def func1(valList):
    num = 0

    for val in valList:
        num += val

    func1Results.append(num)

if __name__ == '__main__':
    st = time()
    
    for valList in [range(40000000), range(40000000), range(40000000), range(40000000)]:
        func1(valList)

    ed = time()
    
    for r in func1Results:
        print(r)

    print(ed - st)

Output:
799999980000000
799999980000000
799999980000000
799999980000000
13.679119348526001

Multiprocess code:

from multiprocessing import Process, Queue
from time import time

queue = Queue()
processList, func1Results = [], []

def func1(valList, queue):
    num = 0

    for val in valList:
        num += val

    queue.put(num)

if __name__ == '__main__':
    st = time()

    for valList in [range(40000000), range(40000000), range(40000000), range(40000000)]:
        xProcess = Process(target=func1, args=(valList, queue))
        xProcess.start()
        
        func1Results.append(queue.get()), processList.append(xProcess)

    for xProcess in processList:
        xProcess.join()

    ed = time()

    for i in func1Results:
        print(i)

    print(ed - st)

Output:
799999980000000
799999980000000
799999980000000
799999980000000
13.916456937789917

When I use the ‘Put’ and ‘Get’ commands, the processing time of the multiprocessing code increases significantly. I know that returning results in multiprocessing is quite time consuming. But this is exactly what I need. What can I do to return the result more efficiently?

>Solution :

Here’s a restructured approach to the original code where we allow all the sub-processes to terminate before we examine the queue.

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from functools import partial
import time

N = 40000000

def calc(q, rng):
    num = 0
    for n in rng:
        num += n
    q.put(num)

def main():
    with Manager() as manager:
        queue = manager.Queue()
        rlist = [range(N), range(N), range(N), range(N)]
        p = partial(calc, queue)
        with ProcessPoolExecutor() as executor:
            executor.map(p, rlist)
        while not queue.empty():
            print(queue.get())

if __name__ == '__main__':
    start = time.perf_counter()
    main()
    end = time.perf_counter()
    print(f'Duration = {end-start:.2f}s')

Output:

799999980000000
799999980000000
799999980000000
799999980000000
Duration = 1.93s

Note:

Of course, you don’t need a queue to get the results from the sub-process. You could just do this:

from concurrent.futures import ProcessPoolExecutor
import time

N = 40000000

def calc(rng):
    num = 0
    for n in rng:
        num += n
    return num

def main():
    rlist = [range(N), range(N), range(N), range(N)]
    with ProcessPoolExecutor() as executor:
        print(*executor.map(calc, rlist), sep='\n')
    
if __name__ == '__main__':
    start = time.perf_counter()
    main()
    end = time.perf_counter()
    print(f'Duration = {end-start:.2f}s')

Output:

799999980000000
799999980000000
799999980000000
799999980000000
Duration = 1.83s

Leave a Reply