Follow

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use
Contact

Testing concurrent.futures.TimeoutError and logging in a threaded function using Pytest

I’ve come across a testing challenge in my Python project and I’m hoping to get some insights from the community. I have a utility module containing a function threaded_execute which utilizes the concurrent.futures module to execute a function in a separate thread. If a concurrent.futures.TimeoutError occurs, it logs a warning and retries the function. I’m using pytest for testing and I want to specifically test the logging of the TimeoutError without installing any additional packages.

Here’s a simplified version of my code:

import logging
from concurrent import futures

logger = logging.getLogger(__name__)

THREAD_TIMEOUT: float = 60  # For simplicity, just hardcoding the value here

def threaded_execute(func, *args, timeout=None, **kwargs):
    timeout = timeout or THREAD_TIMEOUT
    while True:
        with futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(func, *args, **kwargs)
            try:
                return future.result(timeout=timeout)
            except futures.TimeoutError as exc:
                logger.warning(exc)
                continue

MEDevel.com: Open-source for Healthcare and Education

Collecting and validating open-source software for healthcare, education, enterprise, development, medical imaging, medical records, and digital pathology.

Visit Medevel

>Solution :

I’d write a mock executee that takes its sweet time on the first invocation, and none on the next:

import logging

import time
from concurrent import futures

logger = logging.getLogger(__name__)


def threaded_execute(func, *args, timeout: float, **kwargs):
    while True:
        with futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(func, *args, **kwargs)
            try:
                return future.result(timeout=timeout)
            except futures.TimeoutError as exc:
                logger.warning("Timed out", exc_info=True)
                continue


def test_threaded_execute(caplog):
    i = 0

    def fun():
        nonlocal i
        i += 1
        if i == 1:  # first time around, time out
            time.sleep(1)
        return f"OK {i}"

    assert threaded_execute(fun, timeout=0.5) == "OK 2"
    assert caplog.records[0].message == "Timed out"
Add a comment

Leave a Reply

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use

Discover more from Dev solutions

Subscribe now to keep reading and get access to the full archive.

Continue reading