Skip to content

orderbook_watcher.health_checker

orderbook_watcher.health_checker

Maker health checking via direct onion connection.

This module provides functionality to verify maker availability by connecting directly to their onion addresses when possible, performing handshakes to extract features, and tracking reachability status.

Attributes

Classes

MakerHealthChecker

Checks maker reachability via direct onion connections.

This class performs periodic health checks on makers by: (1) connecting directly to their onion addresses; (2) performing a handshake to verify they are online; (3) extracting feature flags from the handshake response; and (4) tracking reachability history.

Health checks are rate-limited to avoid stressing makers.

Source code in orderbook_watcher/src/orderbook_watcher/health_checker.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
class MakerHealthChecker:
    """
    Checks maker reachability via direct onion connections.

    This class performs periodic health checks on makers by:
    1. Connecting directly to their onion addresses
    2. Performing handshake to verify they're online
    3. Extracting feature flags from handshake response
    4. Tracking reachability history

    Health checks are rate-limited to avoid stressing makers: a location
    checked less than ``check_interval`` seconds ago is answered from
    ``health_status``, and at most ``max_concurrent_checks`` checks run at
    the same time.
    """

    def __init__(
        self,
        network: str,
        socks_host: str = "127.0.0.1",
        socks_port: int = 9050,
        timeout: float = 15.0,
        check_interval: float = 600.0,  # 10 minutes
        max_concurrent_checks: int = 10,
        socks_username: str | None = None,
        socks_password: str | None = None,
    ) -> None:
        """
        Initialize MakerHealthChecker.

        Args:
            network: Bitcoin network (mainnet, testnet, signet, regtest)
            socks_host: SOCKS proxy host for Tor
            socks_port: SOCKS proxy port for Tor
            timeout: Connection timeout in seconds
            check_interval: Minimum seconds between checks for same maker
            max_concurrent_checks: Maximum concurrent health checks
            socks_username: SOCKS5 username for Tor stream isolation (optional)
            socks_password: SOCKS5 password for Tor stream isolation (optional)
        """
        self.network = network
        self.socks_host = socks_host
        self.socks_port = socks_port
        self.socks_username = socks_username
        self.socks_password = socks_password
        self.timeout = timeout
        self.check_interval = check_interval
        self.max_concurrent_checks = max_concurrent_checks

        # Health status tracking: location -> status
        self.health_status: dict[str, MakerHealthStatus] = {}

        # Semaphore to limit concurrent checks
        self._check_semaphore = asyncio.Semaphore(max_concurrent_checks)

        # Nick identity for handshake (ephemeral)
        self.nick_identity = NickIdentity(JM_VERSION)

    async def check_maker(self, nick: str, location: str, force: bool = False) -> MakerHealthStatus:
        """
        Check if a maker is reachable via direct connection.

        Unless ``force`` is set, a status younger than ``check_interval`` is
        returned from the cache without connecting. The rate limit is applied
        a second time once a concurrency slot has been obtained, and the check
        is timestamped at that moment. Time spent queueing behind other checks
        therefore neither back-dates ``last_check_time`` nor repeats a check
        that another task completed for the same location in the meantime.

        Args:
            nick: Maker's nick
            location: Maker's onion address (format: onion:port)
            force: Force check even if recently checked

        Returns:
            MakerHealthStatus with reachability info and features
        """
        if not force:
            # Fast path: answer from cache without occupying a slot.
            cached = self._recent_status(nick, location, time.time())
            if cached is not None:
                return cached

        async with self._check_semaphore:
            # Read the clock only now: waiting for the slot can take long
            # when many checks are queued.
            started_at = time.time()
            if not force:
                cached = self._recent_status(nick, location, started_at)
                if cached is not None:
                    return cached
            return await self._check_maker_impl(nick, location, started_at)

    def _recent_status(
        self, nick: str, location: str, now: float
    ) -> MakerHealthStatus | None:
        """Return the stored status if it is still inside the rate-limit window."""
        cached = self.health_status.get(location)
        if cached is None:
            return None

        age = now - cached.last_check_time
        if age >= self.check_interval:
            return None

        logger.debug(
            f"Skipping health check for {nick} at {location} "
            f"(checked {age:.0f}s ago)"
        )
        return cached

    async def _check_maker_impl(
        self, nick: str, location: str, current_time: float
    ) -> MakerHealthStatus:
        """
        Internal implementation of maker health check.

        Connects through the SOCKS proxy, sends a handshake, and records the
        outcome in ``health_status`` under ``location``.

        Args:
            nick: Maker's nick
            location: Maker's onion address (format: onion:port)
            current_time: Timestamp stored as ``last_check_time``

        Returns:
            The status that was stored for ``location``
        """
        if location == "NOT-SERVING-ONION":
            # Cannot check makers that don't serve onion; this is recorded
            # with a zero failure count.
            status = MakerHealthStatus(
                location=location,
                nick=nick,
                reachable=False,
                last_check_time=current_time,
                last_success_time=None,
                consecutive_failures=0,
                features=FeatureSet(),
                error="NOT-SERVING-ONION",
            )
            self.health_status[location] = status
            return status

        # Parse location
        try:
            host, port_str = location.split(":")
            port = int(port_str)
        except (ValueError, AttributeError) as e:
            logger.warning(f"Invalid location format: {location}: {e}")
            known = self.health_status.get(location)
            failures_so_far = known.consecutive_failures if known is not None else 0
            status = MakerHealthStatus(
                location=location,
                nick=nick,
                reachable=False,
                last_check_time=current_time,
                last_success_time=None,
                consecutive_failures=failures_so_far + 1,
                features=FeatureSet(),
                error=f"Invalid location: {e}",
            )
            self.health_status[location] = status
            return status

        # Try to connect and perform handshake
        logger.debug(f"Health check: connecting to {nick} at {location}")
        connection = None
        try:
            # Connect via Tor
            connection = await connect_via_tor(
                host,
                port,
                self.socks_host,
                self.socks_port,
                max_message_size=2097152,
                timeout=self.timeout,
                socks_username=self.socks_username,
                socks_password=self.socks_password,
            )

            # Perform handshake
            # Request peerlist_features support to get maker's features from handshake
            our_features = FeatureSet(features={FEATURE_PEERLIST_FEATURES})
            handshake_data = create_handshake_request(
                nick=self.nick_identity.nick,
                location="NOT-SERVING-ONION",
                network=self.network,
                directory=False,
                features=our_features,
            )

            handshake_msg = {
                "type": 793,  # MessageType.HANDSHAKE
                "line": json.dumps(handshake_data),
            }
            await connection.send(json.dumps(handshake_msg).encode("utf-8"))

            # Wait for response
            response_data = await asyncio.wait_for(connection.receive(), timeout=self.timeout)
            response = json.loads(response_data.decode("utf-8"))

            response_type = response.get("type")
            if response_type not in (793, 795):  # HANDSHAKE or DN_HANDSHAKE
                raise Exception(f"Unexpected response type: {response.get('type')}")

            handshake_response = json.loads(response["line"])

            # DN_HANDSHAKE (795) responses from directories have an "accepted" field.
            # HANDSHAKE (793) responses from non-directory peers (makers) use the
            # client handshake format which does NOT have "accepted" -- receiving a
            # valid handshake back implicitly means acceptance.
            if response_type == 795 and not handshake_response.get("accepted", False):
                raise Exception("Handshake rejected")

            # Extract features from handshake
            features = FeatureSet.from_handshake(handshake_response)

            # Maker is reachable!
            status = MakerHealthStatus(
                location=location,
                nick=nick,
                reachable=True,
                last_check_time=current_time,
                last_success_time=current_time,
                consecutive_failures=0,
                features=features,
                error=None,
            )
            self.health_status[location] = status
            logger.info(
                f"Health check: {nick} at {location} is REACHABLE "
                f"(features: {features.to_comma_string() or 'none'})"
            )
            return status

        except TimeoutError:
            error = "Connection timeout"
            logger.debug(f"Health check: {nick} at {location} timed out")
        except Exception as e:
            # Any other failure (connect error, malformed reply, rejection)
            # counts as "unreachable"; the message is kept on the status.
            error = str(e)
            logger.debug(f"Health check: {nick} at {location} failed: {e}")

        finally:
            if connection:
                # Best-effort close; a failing close must not mask the result.
                with contextlib.suppress(Exception):
                    await connection.close()

        # Maker is unreachable. Look the previous status up only now, after
        # the awaits above, so a concurrent update for the same location is
        # taken into account. Last success and features are carried over.
        old_status = self.health_status.get(location)
        if old_status is None:
            consecutive_failures = 1
            last_success = None
            known_features = FeatureSet()
        else:
            consecutive_failures = old_status.consecutive_failures + 1
            last_success = old_status.last_success_time
            known_features = old_status.features

        status = MakerHealthStatus(
            location=location,
            nick=nick,
            reachable=False,
            last_check_time=current_time,
            last_success_time=last_success,
            consecutive_failures=consecutive_failures,
            features=known_features,
            error=error,
        )
        self.health_status[location] = status

        if consecutive_failures >= 3:
            logger.warning(
                f"Health check: {nick} at {location} is UNREACHABLE "
                f"({consecutive_failures} consecutive failures, error: {error})"
            )

        return status

    async def check_makers_batch(
        self, makers: list[tuple[str, str]], force: bool = False
    ) -> dict[str, MakerHealthStatus]:
        """
        Check health of multiple makers in parallel.

        A check that raises is reported as an unreachable status in the
        returned mapping; that synthetic status is not written to
        ``health_status``.

        Args:
            makers: List of (nick, location) tuples
            force: Force check even if recently checked

        Returns:
            Dict mapping location to MakerHealthStatus
        """
        tasks = [self.check_maker(nick, location, force) for nick, location in makers]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        status_map: dict[str, MakerHealthStatus] = {}
        for (nick, location), result in zip(makers, results, strict=True):
            if not isinstance(result, BaseException):
                # Type narrowing: result is MakerHealthStatus here
                status_map[location] = result
                continue

            # Handle both Exception and BaseException (e.g., asyncio.CancelledError)
            logger.error(f"Health check for {nick} at {location} raised exception: {result}")
            known = self.health_status.get(location)
            failures_so_far = known.consecutive_failures if known is not None else 0
            status_map[location] = MakerHealthStatus(
                location=location,
                nick=nick,
                reachable=False,
                last_check_time=time.time(),
                last_success_time=None,
                consecutive_failures=failures_so_far + 1,
                features=FeatureSet(),
                error=str(result),
            )

        return status_map

    def get_unreachable_locations(self, max_consecutive_failures: int = 3) -> set[str]:
        """
        Get set of locations that are considered unreachable.

        A location is included when its stored status reports
        ``is_healthy(max_consecutive_failures)`` as false.

        Args:
            max_consecutive_failures: Threshold forwarded to ``is_healthy``

        Returns:
            Set of location strings for unreachable makers
        """
        return {
            location
            for location, status in self.health_status.items()
            if not status.is_healthy(max_consecutive_failures)
        }

    def get_feature_map(self) -> dict[str, FeatureSet]:
        """
        Get map of locations to their feature sets.

        Only includes makers that have been successfully checked, i.e. whose
        status has a ``last_success_time``.

        Returns:
            Dict mapping location to FeatureSet
        """
        return {
            location: status.features
            for location, status in self.health_status.items()
            if status.last_success_time is not None
        }

    def clear_status(self, location: str) -> None:
        """Clear health status for a location (e.g., when maker reconnects)."""
        self.health_status.pop(location, None)
Attributes
check_interval = check_interval instance-attribute
health_status: dict[str, MakerHealthStatus] = {} instance-attribute
max_concurrent_checks = max_concurrent_checks instance-attribute
network = network instance-attribute
nick_identity = NickIdentity(JM_VERSION) instance-attribute
socks_host = socks_host instance-attribute
socks_password = socks_password instance-attribute
socks_port = socks_port instance-attribute
socks_username = socks_username instance-attribute
timeout = timeout instance-attribute
Functions
__init__(network: str, socks_host: str = '127.0.0.1', socks_port: int = 9050, timeout: float = 15.0, check_interval: float = 600.0, max_concurrent_checks: int = 10, socks_username: str | None = None, socks_password: str | None = None) -> None

Initialize MakerHealthChecker.

Args: `network`: Bitcoin network (mainnet, testnet, signet, regtest); `socks_host`: SOCKS proxy host for Tor; `socks_port`: SOCKS proxy port for Tor; `timeout`: connection timeout in seconds; `check_interval`: minimum seconds between checks for the same maker; `max_concurrent_checks`: maximum concurrent health checks; `socks_username`: SOCKS5 username for Tor stream isolation (optional); `socks_password`: SOCKS5 password for Tor stream isolation (optional).

Source code in orderbook_watcher/src/orderbook_watcher/health_checker.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def __init__(
    self,
    network: str,
    socks_host: str = "127.0.0.1",
    socks_port: int = 9050,
    timeout: float = 15.0,
    check_interval: float = 600.0,  # 10 minutes
    max_concurrent_checks: int = 10,
    socks_username: str | None = None,
    socks_password: str | None = None,
) -> None:
    """
    Set up a health checker for one Bitcoin network.

    Args:
        network: Bitcoin network (mainnet, testnet, signet, regtest)
        socks_host: Host of the SOCKS proxy used to reach Tor
        socks_port: Port of the SOCKS proxy used to reach Tor
        timeout: Connection timeout in seconds
        check_interval: Minimum number of seconds between two checks of the
            same maker
        max_concurrent_checks: Upper bound on simultaneously running checks
        socks_username: SOCKS5 username for Tor stream isolation (optional)
        socks_password: SOCKS5 password for Tor stream isolation (optional)
    """
    # Which network the handshake announces.
    self.network = network

    # How to reach the Tor SOCKS proxy.
    self.socks_host, self.socks_port = socks_host, socks_port
    self.socks_username, self.socks_password = socks_username, socks_password

    # Pacing of the checks.
    self.timeout = timeout
    self.check_interval = check_interval
    self.max_concurrent_checks = max_concurrent_checks

    # Most recent result per maker, keyed by location.
    self.health_status: dict[str, MakerHealthStatus] = {}

    # Caps how many checks may be in flight at once.
    self._check_semaphore = asyncio.Semaphore(max_concurrent_checks)

    # Ephemeral nick identity presented during handshakes.
    self.nick_identity = NickIdentity(JM_VERSION)
check_maker(nick: str, location: str, force: bool = False) -> MakerHealthStatus async

Check if a maker is reachable via direct connection.

Args: `nick`: maker's nick; `location`: maker's onion address (format: onion:port); `force`: force a check even if the maker was checked recently.

Returns: MakerHealthStatus with reachability info and features

Source code in orderbook_watcher/src/orderbook_watcher/health_checker.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def check_maker(self, nick: str, location: str, force: bool = False) -> MakerHealthStatus:
    """
    Check if a maker is reachable via direct connection.

    Unless ``force`` is set, a status younger than ``check_interval`` is
    returned from the cache without connecting. The rate limit is applied a
    second time once a concurrency slot has been obtained, and the check is
    timestamped at that moment. Time spent queueing behind other checks
    therefore neither back-dates ``last_check_time`` nor repeats a check that
    another task completed for the same location in the meantime.

    Args:
        nick: Maker's nick
        location: Maker's onion address (format: onion:port)
        force: Force check even if recently checked

    Returns:
        MakerHealthStatus with reachability info and features
    """

    def recent_status(now):
        """Return the stored status if it is still inside the rate-limit window."""
        if force:
            return None
        cached = self.health_status.get(location)
        if cached is None:
            return None
        age = now - cached.last_check_time
        if age >= self.check_interval:
            return None
        logger.debug(
            f"Skipping health check for {nick} at {location} "
            f"(checked {age:.0f}s ago)"
        )
        return cached

    # Fast path: answer from cache without occupying a concurrency slot.
    cached_status = recent_status(time.time())
    if cached_status is not None:
        return cached_status

    async with self._check_semaphore:
        # Read the clock only now: waiting for the slot can take long when
        # many checks are queued.
        started_at = time.time()
        cached_status = recent_status(started_at)
        if cached_status is not None:
            return cached_status
        return await self._check_maker_impl(nick, location, started_at)
check_makers_batch(makers: list[tuple[str, str]], force: bool = False) -> dict[str, MakerHealthStatus] async

Check health of multiple makers in parallel.

Args: makers: List of (nick, location) tuples force: Force check even if recently checked

Returns: Dict mapping location to MakerHealthStatus

Source code in orderbook_watcher/src/orderbook_watcher/health_checker.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
async def check_makers_batch(
    self, makers: list[tuple[str, str]], force: bool = False
) -> dict[str, MakerHealthStatus]:
    """
    Check health of multiple makers in parallel.

    Args:
        makers: List of (nick, location) tuples
        force: Force check even if recently checked

    Returns:
        Dict mapping location to MakerHealthStatus
    """
    tasks = [self.check_maker(nick, location, force) for nick, location in makers]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    status_map: dict[str, MakerHealthStatus] = {}
    for (nick, location), result in zip(makers, results, strict=True):
        if isinstance(result, BaseException):
            # Handle both Exception and BaseException (e.g., asyncio.CancelledError)
            logger.error(f"Health check for {nick} at {location} raised exception: {result}")
            status_map[location] = MakerHealthStatus(
                location=location,
                nick=nick,
                reachable=False,
                last_check_time=time.time(),
                last_success_time=None,
                consecutive_failures=self.health_status.get(
                    location,
                    MakerHealthStatus(
                        location=location,
                        nick=nick,
                        reachable=False,
                        last_check_time=0,
                        last_success_time=None,
                        consecutive_failures=0,
                        features=FeatureSet(),
                    ),
                ).consecutive_failures
                + 1,
                features=FeatureSet(),
                error=str(result),
            )
        else:
            # Type narrowing: result is MakerHealthStatus here
            status_map[location] = result

    return status_map
clear_status(location: str) -> None

Clear health status for a location (e.g., when maker reconnects).

Source code in orderbook_watcher/src/orderbook_watcher/health_checker.py
367
368
369
def clear_status(self, location: str) -> None:
    """Forget the stored health status of ``location``; unknown locations are ignored."""
    if location in self.health_status:
        del self.health_status[location]
get_feature_map() -> dict[str, FeatureSet]

Get map of locations to their feature sets.

Only includes makers that have been successfully checked.

Returns: Dict mapping location to FeatureSet

Source code in orderbook_watcher/src/orderbook_watcher/health_checker.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def get_feature_map(self) -> dict[str, FeatureSet]:
    """
    Collect the feature set recorded for each location.

    Locations whose status has no ``last_success_time`` (never checked
    successfully) are left out.

    Returns:
        Dict mapping location to FeatureSet
    """
    feature_map: dict[str, FeatureSet] = {}
    for location, status in self.health_status.items():
        if status.last_success_time is None:
            continue
        feature_map[location] = status.features
    return feature_map
get_unreachable_locations(max_consecutive_failures: int = 3) -> set[str]

Get set of locations that are considered unreachable.

Args: max_consecutive_failures: Number of failures before marking unreachable

Returns: Set of location strings for unreachable makers

Source code in orderbook_watcher/src/orderbook_watcher/health_checker.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def get_unreachable_locations(self, max_consecutive_failures: int = 3) -> set[str]:
    """
    Collect the locations whose stored status is not healthy.

    Args:
        max_consecutive_failures: Threshold forwarded to each status'
            ``is_healthy`` check

    Returns:
        Set of location strings for unreachable makers
    """
    unreachable: set[str] = set()
    for location, status in self.health_status.items():
        if status.is_healthy(max_consecutive_failures):
            continue
        unreachable.add(location)
    return unreachable

MakerHealthStatus dataclass

Health status for a maker.

Source code in orderbook_watcher/src/orderbook_watcher/health_checker.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@dataclass
class MakerHealthStatus:
    """Health status for a maker, as recorded by the most recent check."""

    location: str  # onion:port
    nick: str
    reachable: bool  # outcome of the most recent check
    last_check_time: float
    last_success_time: float | None  # None until a check has succeeded
    consecutive_failures: int  # failed checks since the last success
    features: FeatureSet
    error: str | None = None  # reason for the most recent failure, if any

    def is_healthy(self, max_consecutive_failures: int = 3) -> bool:
        """
        Check if maker is considered healthy.

        A maker counts as healthy while its last check succeeded or it has
        failed fewer than ``max_consecutive_failures`` times in a row, so a
        single transient failure does not yet make it unhealthy.

        Args:
            max_consecutive_failures: Number of consecutive failed checks at
                which the maker stops being considered healthy

        Returns:
            True while the maker is below the failure threshold
        """
        # `or`, not `and`: every failed check sets reachable=False, so with
        # `and` the threshold could never influence the result.
        return self.reachable or self.consecutive_failures < max_consecutive_failures
Attributes
consecutive_failures: int instance-attribute
error: str | None = None class-attribute instance-attribute
features: FeatureSet instance-attribute
last_check_time: float instance-attribute
last_success_time: float | None instance-attribute
location: str instance-attribute
nick: str instance-attribute
reachable: bool instance-attribute
Functions
is_healthy(max_consecutive_failures: int = 3) -> bool

Check if maker is considered healthy.

Source code in orderbook_watcher/src/orderbook_watcher/health_checker.py
41
42
43
def is_healthy(self, max_consecutive_failures: int = 3) -> bool:
    """
    Check if maker is considered healthy.

    A maker counts as healthy while its last check succeeded or it has failed
    fewer than ``max_consecutive_failures`` times in a row, so a single
    transient failure does not yet make it unhealthy.

    Args:
        max_consecutive_failures: Number of consecutive failed checks at which
            the maker stops being considered healthy

    Returns:
        True while the maker is below the failure threshold
    """
    # `or`, not `and`: a failed check always leaves reachable=False, so with
    # `and` the threshold could never influence the result.
    return self.reachable or self.consecutive_failures < max_consecutive_failures

Functions