Skip to content

jmcore.tasks

jmcore.tasks

Shared async task utilities for JoinMarket components.

Provides common patterns for periodic background tasks used by both maker and taker components.

Functions

parse_directory_address(server: str, default_port: int = 5222) -> tuple[str, int]

Parse a directory server address string into host and port.

Args: server: Server address in "host:port" or "host" format; default_port: Port to use if not specified (default: 5222).

Returns: Tuple of (host, port)

Source code in jmcore/src/jmcore/tasks.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def parse_directory_address(server: str, default_port: int = 5222) -> tuple[str, int]:
    """
    Parse a directory server address string into host and port.

    Accepted forms:
        "host"           -> (host, default_port)
        "host:port"      -> (host, port)
        "host:"          -> (host, default_port)
        "[v6addr]"       -> (v6addr, default_port)
        "[v6addr]:port"  -> (v6addr, port)

    Args:
        server: Server address in "host:port" or "host" format. An IPv6
            literal must be wrapped in square brackets so its colons are not
            mistaken for the port separator.
        default_port: Port to use if not specified (default: 5222)

    Returns:
        Tuple of (host, port). For a bracketed IPv6 literal the brackets are
        stripped from the returned host.

    Raises:
        ValueError: If the port is not a valid integer, or if a bracketed
            host is followed by anything other than ":port".
    """
    # Bracketed IPv6 literal: the colons inside the brackets belong to the
    # address, so only what follows the closing bracket can be a port.
    if server.startswith("["):
        closing = server.find("]")
        if closing != -1:
            host = server[1:closing]
            remainder = server[closing + 1 :]
            if remainder in ("", ":"):
                return host, default_port
            if not remainder.startswith(":"):
                raise ValueError(f"Invalid directory address: {server!r}")
            return host, int(remainder[1:])

    parts = server.split(":")
    host = parts[0]
    # No port given, or a dangling "host:" with an empty port field.
    if len(parts) == 1 or (len(parts) == 2 and not parts[1]):
        return host, default_port
    # Only the first field after the host is interpreted as the port; any
    # further colon-separated fields are ignored.
    return host, int(parts[1])

run_periodic_task(name: str, callback: Callable[[], Coroutine[Any, Any, None]], interval: float, initial_delay: float = 0.0, running_check: Callable[[], bool] | None = None) -> None async

Run a callback periodically until cancelled or running_check returns False.

Args: name: Human-readable task name for logging; callback: Async function to call each interval; interval: Seconds between invocations; initial_delay: Seconds to wait before first invocation; running_check: Optional callable returning False to stop the task.

Source code in jmcore/src/jmcore/tasks.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
async def run_periodic_task(
    name: str,
    callback: Callable[[], Coroutine[Any, Any, None]],
    interval: float,
    initial_delay: float = 0.0,
    running_check: Callable[[], bool] | None = None,
) -> None:
    """
    Run a callback periodically until cancelled or running_check returns False.

    Args:
        name: Human-readable task name for logging
        callback: Async function to call each interval
        interval: Seconds between invocations
        initial_delay: Seconds to wait before first invocation
        running_check: Optional callable returning False to stop the task
    """
    if initial_delay > 0:
        await asyncio.sleep(initial_delay)

    while running_check is None or running_check():
        try:
            await asyncio.sleep(interval)
            await callback()
        except asyncio.CancelledError:
            logger.info(f"{name} task cancelled")
            break
        except Exception as e:
            logger.error(f"Error in {name}: {e}")

    logger.info(f"{name} task stopped")