Shared queue contents not visible in Python multiprocessing

I have a few coroutines running in one process (A) and one heavier, unbounded job running in a separate process (B). I would like the heavier job to dispatch its results into a queue that is consumed by the original process (A).

Similar to this:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def process__heavy(pipe):
    print("[B] starting...")
    while True:
        print(f"[B] Pipe queue: {pipe.qsize()}")
        pipe.put_nowait(str(time.time()))
        time.sleep(0.5)

async def coroutine__stats(pipe):
    print("[A] starting...")
    while True:
        print(f"[A] Pipe queue: {pipe.qsize()}")
        await asyncio.sleep(1)

  
async def main():
    pipe = asyncio.Queue()
    executor = ProcessPoolExecutor()

    jobs = await asyncio.gather(
        asyncio.get_running_loop().run_in_executor(executor, process__heavy, pipe),
        coroutine__stats(pipe)
    )

    print(f"Finished with result: {jobs.result()}")


if __name__ == '__main__':
    asyncio.run(main())
    print("Bye.")

Output:

[A] starting...
[A] Pipe queue: 0
[B] starting...
[B] Pipe queue: 0
[B] Pipe queue: 1
[A] Pipe queue: 0 <--- why zero?
[B] Pipe queue: 2
[B] Pipe queue: 3
[A] Pipe queue: 0 <---
[B] Pipe queue: 4
[B] Pipe queue: 5
[A] Pipe queue: 0 <---
[B] Pipe queue: 6
[B] Pipe queue: 7
[A] Pipe queue: 0
[B] Pipe queue: 8

The original process (A) does not see any data put into the shared queue. I do not remember whether Python supports sharing objects across processes, or whether everything is pickled and the only result you can get back is the process's return value when it exits.

What am I doing wrong and what would be the best way to create a data pipe between those 2 processes?

Solution:

Use a multiprocessing.Manager() to create the queue instead of asyncio.Queue. An asyncio.Queue is an ordinary in-process object, so process B ends up operating on its own copy rather than the one process A is watching. A manager queue, by contrast, lives in a separate server process and hands each process a proxy, so puts from B are visible to A:

import multiprocessing as mp
# ...
pipe = mp.Manager().Queue()

With that change applied to the OP's code, process A now sees B's puts:

[A] starting...
[A] Pipe queue: 0
[B] starting...
[B] Pipe queue: 0
[B] Pipe queue: 1
[A] Pipe queue: 2
[B] Pipe queue: 2
[B] Pipe queue: 3
[A] Pipe queue: 4
[B] Pipe queue: 4
[B] Pipe queue: 5
[A] Pipe queue: 6
[B] Pipe queue: 6
[B] Pipe queue: 7
[A] Pipe queue: 8
[B] Pipe queue: 8
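Putting it together, here is a bounded, self-contained sketch of the same pattern (the names, item count, and sentinel convention are mine, not from the OP). One extra detail worth noting: a manager queue's get() is a blocking call, so the consumer coroutine hands it to run_in_executor to avoid stalling the event loop, instead of polling qsize():

```python
import asyncio
import multiprocessing as mp
import time
from concurrent.futures import ProcessPoolExecutor

SENTINEL = None  # marks end of the producer's output


def heavy_producer(queue, n_items):
    # Runs in process B: push results into the managed (proxied) queue.
    for i in range(n_items):
        queue.put(f"result-{i} @ {time.time()}")
        time.sleep(0.05)
    queue.put(SENTINEL)  # signal completion


async def consume(queue):
    # Runs in process A: queue.get() blocks, so run it in a thread
    # via run_in_executor to keep the event loop responsive.
    loop = asyncio.get_running_loop()
    results = []
    while True:
        item = await loop.run_in_executor(None, queue.get)
        if item is SENTINEL:
            break
        results.append(item)
    return results


async def main():
    with mp.Manager() as manager, ProcessPoolExecutor() as executor:
        queue = manager.Queue()
        loop = asyncio.get_running_loop()
        producer = loop.run_in_executor(executor, heavy_producer, queue, 5)
        results = await consume(queue)
        await producer  # surface any exception raised in process B
        return results


if __name__ == "__main__":
    items = asyncio.run(main())
    print(f"Received {len(items)} items")
```

Unlike the OP's version, this one terminates: the producer sends a sentinel when done, and awaiting the executor future propagates any exception from process B back into the event loop.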