How to cancel tasks in anyio.TaskGroup context?

I wrote a script to find the fastest host in a list of CDN hosts:

#!/usr/bin/env python3.11
import time
from contextlib import contextmanager
from enum import StrEnum

import anyio
import httpx


@contextmanager
def timeit(msg: str):
    start = time.time()
    yield
    cost = time.time() - start
    print(msg, f"{cost = }")


class CdnHost(StrEnum):
    jsdelivr = "https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9.0/swagger-ui.css"
    unpkg = "https://unpkg.com/swagger-ui-dist@5.9.0/swagger-ui.css"
    cloudflare = (
        "https://cdnjs.cloudflare.com/ajax/libs/swagger-ui/5.9.0/swagger-ui.css"
    )


TIMEOUT = 5
LOOP_INTERVAL = 0.1


async def fetch(client, url, results, index):
    try:
        r = await client.get(url)
    except (httpx.ConnectError, httpx.ReadError):
        ...
    else:
        print(f"{url = }\n{r.elapsed = }")
        if r.status_code < 300:
            results[index] = r.content


class StopNow(Exception):
    ...


async def find_fastest_host(timeout=TIMEOUT, loop_interval=LOOP_INTERVAL) -> str:
    urls = list(CdnHost)
    results = [None] * len(urls)
    try:
        async with anyio.create_task_group() as tg:
            with anyio.move_on_after(timeout):
                async with httpx.AsyncClient() as client:
                    for i, url in enumerate(urls):
                        tg.start_soon(fetch, client, url, results, i)
                    for _ in range(int(timeout / loop_interval) + 1):
                        for res in results:
                            if res is not None:
                                raise StopNow
                        await anyio.sleep(loop_interval)
    except (
        StopNow,
        httpx.ReadError,
        httpx.ReadTimeout,
        httpx.ConnectError,
        httpx.ConnectTimeout,
    ):
        ...
    for url, res in zip(urls, results):
        if res is not None:
            return url
    return urls[0]


async def main():
    with timeit("Sniff hosts"):
        url = await find_fastest_host()
    print("cdn host:", CdnHost)
    print("result:", url)


if __name__ == "__main__":
    anyio.run(main)

There are three CDN hosts (https://cdn.jsdelivr.net, https://unpkg.com, https://cdnjs.cloudflare.com). I start three concurrent async tasks to fetch them with httpx.
If one of them gets a response with status_code < 300, I want to stop all the tasks and return that URL. But I don't know how to cancel the tasks without using a custom exception (StopNow in the script).

Solution:

You can call the cancel method of the cancel_scope attribute of the task group to cancel all of its tasks:

async with anyio.create_task_group() as tg:
    ...
    tg.cancel_scope.cancel()