I’m trying to use multiple threads to fetch data from a source. Each fetch call returns a data item of approximately 150 MB, which I simulate by returning a NumPy array of similar size.
In the output of the script below, you can see that as I increase the number of worker threads, the time to fetch one data item increases linearly. Shouldn’t the call to fetch_data be independent for each thread? Why does the time taken by this function increase linearly with the number of threads?
import numpy as np
from threading import Thread
from threading import Barrier
from time import perf_counter as pc

def fetch_data():
    # simulating fetching large data item (approximately 150 MB)
    return np.random.random((1000*128, 150))

def producer(barrier, identifier):
    times = []
    for i in range(16):
        t0 = pc()
        data = fetch_data()
        t1 = pc()
        times.append(t1 - t0)
        barrier.wait()
    if identifier == 0:
        print(f'average data fetch time: {sum(times) / len(times)}')

def main(num_workers):
    barrier = Barrier(num_workers)
    producers = [Thread(target=producer, args=(barrier, i)) for i in range(num_workers)]
    for prod in producers:
        prod.start()
    for prod in producers:
        prod.join()

if __name__ == '__main__':
    num_worker_options = [1, 2, 4, 8]
    print('--'*25)
    for n in num_worker_options:
        print(f'running with num_workers={n}')
        main(n)
        print('--'*25)
Output
--------------------------------------------------
running with num_workers=1
average data fetch time: 0.1944406289999847
--------------------------------------------------
running with num_workers=2
average data fetch time: 0.37914658368754317
--------------------------------------------------
running with num_workers=4
average data fetch time: 0.7443576513123276
--------------------------------------------------
running with num_workers=8
average data fetch time: 1.4798094858125523
--------------------------------------------------
>Solution :
Threads give you concurrency, but not parallelism. In CPython, the global interpreter lock (GIL) lets only one thread run Python bytecode at a time, so the threads take turns instead of using additional cores, and the only gains come from not having to wait for I/O or similarly blocking operations. For work like this, the gains are therefore minimal. Try multiprocessing instead: although its overhead is far higher, for sufficiently large tasks the actual parallelism will get you what you’re looking for.
For example:
import numpy as np
import multiprocessing
from time import perf_counter as pc

def fetch_data():
    # simulating fetching large data item (approximately 150 MB)
    return np.random.random((1000*128, 150))

def producer(identifier):
    # each process times its own 16 fetches; only worker 0 reports
    times = []
    for i in range(16):
        t0 = pc()
        data = fetch_data()
        t1 = pc()
        times.append(t1 - t0)
    if identifier == 0:
        print(f'average data fetch time: {sum(times) / len(times)}')

def main(num_workers):
    ps = [multiprocessing.Process(target=producer, args=(i,)) for i in range(num_workers)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()

# the guard keeps child processes from re-running this block on import
if __name__ == '__main__':
    num_worker_options = [1, 2, 4, 8]
    for n in num_worker_options:
        print(f'running with num_workers={n}')
        main(n)
        print('--'*25)
Output:
running with num_workers=1
average data fetch time: 0.1159221250002247
--------------------------------------------------
running with num_workers=2
average data fetch time: 0.11778964999984964
--------------------------------------------------
running with num_workers=4
average data fetch time: 0.13016788124969025
--------------------------------------------------
running with num_workers=8
average data fetch time: 0.14858553749945713
--------------------------------------------------
There’s some added overhead, but the main difference is that I can hear the fans speeding up at the higher worker counts.
For comparison, here is the first step (one worker) of your threaded code on my machine:
--------------------------------------------------
running with num_workers=1
average data fetch time: 0.11301828124987878
--------------------------------------------------
So, with a single worker, the threaded version is slightly faster than the multiprocessing one for such a trivial task (about 0.113 s versus 0.116 s per fetch), but not by an awful lot.
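If your real fetch spends most of its time waiting on a network or disk rather than computing, threads may still be a good fit, since that waiting is where threads do help. Here is a minimal sketch of that case. Note that fake_io_fetch and timed_run are hypothetical names I made up, and time.sleep is only my stand-in for a blocking read, not your actual data source:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter as pc


def fake_io_fetch(delay=0.2):
    # Hypothetical stand-in for a blocking network or disk read.
    # time.sleep does not hold the GIL, so other threads run meanwhile.
    time.sleep(delay)
    return delay


def timed_run(num_workers, num_tasks=4):
    # Run num_tasks simulated fetches on a pool of num_workers threads
    # and return the total wall-clock time in seconds.
    t0 = pc()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        list(pool.map(fake_io_fetch, [0.2] * num_tasks))
    return pc() - t0


if __name__ == '__main__':
    for n in [1, 2, 4]:
        print(f'num_workers={n}: total time {timed_run(n):.2f} s')
```

Here the total time should drop as workers are added, because the threads overlap their waiting. Whether your real fetch behaves like this or like the NumPy simulation depends on how much of it is waiting versus computing, so it is worth timing both.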