Python aiohttp + asyncio: How to execute code after loop.run_forever()

Code:

#!/usr/bin/env python

import asyncio
import os
import socket
import time
import traceback
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

CPU_COUNT = cpu_count()
print("CPU Count:", CPU_COUNT)

def mk_socket(host="127.0.0.1", port=9090, reuseport=False):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
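    if reuseport:
        # SO_REUSEPORT lets several processes bind the same port.
        # Use the platform's constant instead of a hard-coded Linux value.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)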
    sock.bind((host, port))
    return sock

async def index(request):
    icecast_index_path = os.path.abspath("../test/icecast/icecast_index.html")
    print(icecast_index_path)
    try:
        with open(icecast_index_path, encoding="utf8") as f:
            content = f.read()
        return web.Response(content_type="text/html", text=content)
    except Exception as e:
        return web.Response(content_type="text/html", text="<!doctype html><body><h1>Error: "+str(e)+"</h1></body></html>")

async def start_server():
    try:
        host = "127.0.0.1"
        port=8080
        reuseport = True
        app = web.Application()
        app.add_routes([web.get('/', index)])
        runner = web.AppRunner(app)
        await runner.setup()
        sock = mk_socket(host, port, reuseport=reuseport)
        srv = web.SockSite(runner, sock)
        await srv.start()
        print('Server started at http://127.0.0.1:8080')
        return srv, app, runner
    except Exception:
        traceback.print_exc()
        raise

async def finalize(srv, app, runner):
    # runner.cleanup() stops every site registered on the runner (which closes
    # the listening socket) and then runs the application's cleanup handlers.
    await runner.cleanup()

def init():
    loop = asyncio.get_event_loop()
    srv, app, runner = loop.run_until_complete(start_server())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.run_until_complete((finalize(srv, app, runner)))


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        for i in range(0, int(CPU_COUNT/2)):
            executor.submit(init)

    # After the aiohttp servers start, I want to execute more code,
    # for example:
    print("Hello world.")
    # In the actual program, ProcessPoolExecutor is called
    # inside a PyQt5 app, so I don't want the PyQt5 app to freeze.

The problem is that with this code I can't execute anything after the ProcessPoolExecutor calls.

How can I fix that?

I tried removing this part from the init() function:

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.run_until_complete((finalize(srv, app, runner)))

but then the aiohttp server closes immediately.

Edit: If I use a thread instead of ProcessPoolExecutor, I get these errors:

  1. RuntimeError: set_wakeup_fd only works in main thread
  2. RuntimeError: There is no current event loop in thread
  3. Other aiohttp and asyncio related errors.

Maybe I need to put an @ decorator above the def declarations (I suppose).

Solution:

Using an Executor in a with statement blocks your process until the jobs in the executor have completed, because leaving the with block calls shutdown(wait=True). Since your jobs run infinite event loops, they never complete, and the with block never exits.
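The blocking can be seen in a small timing sketch. It uses ThreadPoolExecutor only to stay short; the with-statement behaviour belongs to the shared Executor base class, so ProcessPoolExecutor should behave the same way. The names slow_job and time_with_block are made up for this illustration, and the short sleep stands in for a job that, in the question, never returns.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_job(seconds):
    # Hypothetical stand-in for a long-running job.
    time.sleep(seconds)
    return seconds


def time_with_block(seconds=0.3):
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(slow_job, seconds)
        # submit() returns right away; the job runs in the background.
        submitted_after = time.monotonic() - start
    # Leaving the with block called executor.shutdown(wait=True),
    # so this line is only reached once slow_job has finished.
    exited_after = time.monotonic() - start
    return submitted_after, exited_after


if __name__ == "__main__":
    submitted_after, exited_after = time_with_block()
    print(f"submit returned after {submitted_after:.2f}s")
    print(f"with block exited after {exited_after:.2f}s")
```

If the job never returned, the second timestamp would never be taken, which is the hang described in the question.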

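Instead, use the executor only to kick off the jobs, and run your other code afterwards. When you are finally done, call .shutdown() to wait for the processes to exit. Note that shutdown() also blocks until the jobs return, so the servers need to have been stopped by then: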

executor = ProcessPoolExecutor()
for i in range(0, int(CPU_COUNT/2)):
    executor.submit(init)
# other code…
executor.shutdown()
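Because init() in the question runs its loop forever, shutdown() would still wait indefinitely unless the workers are told to stop. One possible way to do that is sketched below; it is an assumption, not part of the original answer. A multiprocessing.Event is handed to each worker through the executor's initializer, and a placeholder coroutine polls it in place of the aiohttp server. The names _init_worker, _serve_until_stopped, worker_count and main are hypothetical.

```python
import asyncio
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor

_stop_event = None  # set in every worker process by _init_worker


def _init_worker(stop_event):
    # Runs once in each worker process and stores the shared event.
    global _stop_event
    _stop_event = stop_event


async def _serve_until_stopped(poll_interval=0.1):
    # Placeholder for starting the aiohttp site from the question;
    # here the coroutine only waits until the parent sets the event.
    while not _stop_event.is_set():
        await asyncio.sleep(poll_interval)
    return os.getpid()


def init():
    return asyncio.run(_serve_until_stopped())


def worker_count(cpu_total):
    # Half of the CPUs, as in the question, but never fewer than one.
    return max(1, cpu_total // 2)


def main():
    stop_event = multiprocessing.Event()
    n = worker_count(multiprocessing.cpu_count())
    executor = ProcessPoolExecutor(
        max_workers=n, initializer=_init_worker, initargs=(stop_event,)
    )
    futures = [executor.submit(init) for _ in range(n)]

    print("Hello world.")  # runs immediately; nothing blocks here

    stop_event.set()      # ask the workers to leave their loops
    executor.shutdown()   # now returns once the workers have finished
    return [f.result() for f in futures]


if __name__ == "__main__":
    print(main())
```

In a GUI application, the stop_event.set() and executor.shutdown() calls would presumably move to the application's exit handler, so the window stays responsive while the servers run.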