Skip to content

directory_server.heartbeat

directory_server.heartbeat

Heartbeat liveness detection for directory server peers.

Implements the PING/PONG heartbeat protocol compatible with joinmarket-rs. The directory server periodically probes idle peers and evicts unresponsive ones.

Protocol flow (matching joinmarket-rs behavior): 1. Every sweep_interval_sec seconds, run a heartbeat sweep. 2. Hard-evict all peers whose last_seen exceeds hard_evict_sec. 3. Probe peers idle longer than idle_threshold_sec: - Ping-capable peers: send PING (type 798), mark pong_pending. - Non-ping makers: send unicast !orderbook to elicit an offer re-announcement (which updates last_seen). - Non-ping takers/watchers: no probe -- they will be hard-evicted when hard_evict_sec is reached. 4. Wait pong_wait_sec seconds for responses. 5. Evict all peers still having pong_pending = True.

Timing constants match joinmarket-rs defaults: - sweep_interval_sec = 60 - idle_threshold_sec = 600 (10 minutes) - hard_evict_sec = 1500 (25 minutes) - pong_wait_sec = 30

Attributes

EvictCallback = Callable[[str, str], Awaitable[None]] module-attribute

SendCallback = Callable[[str, bytes], Awaitable[None]] module-attribute

Classes

HeartbeatConfig dataclass

Heartbeat timing configuration.

Defaults match joinmarket-rs for interoperability.

Source code in directory_server/src/directory_server/heartbeat.py
44
45
46
47
48
49
50
51
52
53
54
@dataclass
class HeartbeatConfig:
    """Heartbeat timing configuration.

    Defaults match joinmarket-rs for interoperability.
    """

    sweep_interval_sec: float = 60.0
    idle_threshold_sec: float = 600.0  # 10 minutes
    hard_evict_sec: float = 1500.0  # 25 minutes
    pong_wait_sec: float = 30.0
Attributes
hard_evict_sec: float = 1500.0 class-attribute instance-attribute
idle_threshold_sec: float = 600.0 class-attribute instance-attribute
pong_wait_sec: float = 30.0 class-attribute instance-attribute
sweep_interval_sec: float = 60.0 class-attribute instance-attribute

HeartbeatManager

Manages periodic heartbeat sweeps for the directory server.

Args: peer_registry: Registry to query peer state and last_seen. send_callback: Async callable (peer_key, data_bytes) -> None. evict_callback: Async callable (peer_key, reason) -> None that disconnects and unregisters a peer. config: Timing configuration.

Source code in directory_server/src/directory_server/heartbeat.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
class HeartbeatManager:
    """Manages periodic heartbeat sweeps for the directory server.

    Args:
        peer_registry: Registry to query peer state and last_seen.
        send_callback: Async callable ``(peer_key, data_bytes) -> None``.
        evict_callback: Async callable ``(peer_key, reason) -> None``
            that disconnects and unregisters a peer.
        config: Timing configuration.
    """

    def __init__(
        self,
        peer_registry: PeerRegistry,
        send_callback: SendCallback,
        evict_callback: EvictCallback,
        config: HeartbeatConfig | None = None,
        server_nick: str = "",
    ) -> None:
        self.peer_registry = peer_registry
        self.send_callback = send_callback
        self.evict_callback = evict_callback
        self.config = config or HeartbeatConfig()
        self.server_nick = server_nick

        # Peers waiting for a PONG reply
        self._pong_pending: set[str] = set()
        self._task: asyncio.Task[None] | None = None

    # -- public API --

    def start(self) -> None:
        """Start the background heartbeat loop."""
        if self._task is not None:
            return
        self._task = asyncio.create_task(self._loop(), name="heartbeat")
        logger.info(
            f"Heartbeat started (sweep={self.config.sweep_interval_sec}s, "
            f"idle={self.config.idle_threshold_sec}s, "
            f"hard_evict={self.config.hard_evict_sec}s, "
            f"pong_wait={self.config.pong_wait_sec}s)"
        )

    async def stop(self) -> None:
        """Stop the background heartbeat loop."""
        if self._task is None:
            return
        self._task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._task
        self._task = None
        self._pong_pending.clear()
        logger.info("Heartbeat stopped")

    def handle_pong(self, peer_key: str) -> None:
        """Called by the message router when a PONG is received.

        Clears ``pong_pending`` so the peer is not evicted.
        """
        if peer_key in self._pong_pending:
            self._pong_pending.discard(peer_key)
            logger.trace(f"Heartbeat: PONG received from {peer_key}")

    @property
    def pong_pending(self) -> frozenset[str]:
        """Read-only view of peers with pending pongs (for testing)."""
        return frozenset(self._pong_pending)

    # -- internal --

    async def _loop(self) -> None:
        """Main heartbeat loop."""
        while True:
            await asyncio.sleep(self.config.sweep_interval_sec)
            try:
                await self._sweep()
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("Heartbeat sweep failed")

    async def _sweep(self) -> None:
        """Run a single heartbeat sweep."""
        from datetime import UTC, datetime, timedelta

        now = datetime.now(UTC)
        hard_cutoff = now - timedelta(seconds=self.config.hard_evict_sec)
        idle_cutoff = now - timedelta(seconds=self.config.idle_threshold_sec)

        # Phase 1: hard-evict peers that exceeded the absolute timeout
        hard_evict_peers = self.peer_registry.get_peers_idle_since(hard_cutoff)
        for peer_key, peer_info in hard_evict_peers:
            logger.info(
                f"Heartbeat: hard-evicting {peer_info.nick} ({peer_key}) "
                f"-- idle since {peer_info.last_seen}"
            )
            self._pong_pending.discard(peer_key)
            await self.evict_callback(peer_key, "idle timeout")

        # Phase 2: probe idle peers (idle > idle_threshold but < hard_evict)
        idle_peers = self.peer_registry.get_peers_idle_since(idle_cutoff)
        pinged_keys: list[str] = []

        for peer_key, peer_info in idle_peers:
            # Skip peers already hard-evicted above
            if self.peer_registry.get_by_key(peer_key) is None:
                continue

            if self.peer_registry.supports_ping(peer_key):
                # Send PING to ping-capable peers
                await self._send_ping(peer_key)
                self._pong_pending.add(peer_key)
                pinged_keys.append(peer_key)
            elif self.peer_registry.is_maker(peer_key):
                # Send unicast !orderbook to non-ping makers to elicit a response
                await self._send_orderbook_probe(peer_key, peer_info.nick)
            # else: non-ping takers/watchers -- no probe, will be hard-evicted later

        if not pinged_keys:
            return

        logger.debug(f"Heartbeat: pinged {len(pinged_keys)} idle peers, waiting for PONGs")

        # Phase 3: wait for PONGs
        await asyncio.sleep(self.config.pong_wait_sec)

        # Phase 4: evict peers that did not respond
        timed_out = [k for k in pinged_keys if k in self._pong_pending]
        for peer_key in timed_out:
            peer = self.peer_registry.get_by_key(peer_key)
            nick = peer.nick if peer else peer_key
            logger.info(f"Heartbeat: evicting {nick} ({peer_key}) -- no PONG response")
            self._pong_pending.discard(peer_key)
            await self.evict_callback(peer_key, "pong timeout")

        if timed_out:
            logger.info(f"Heartbeat: evicted {len(timed_out)} unresponsive peers")

    async def _send_ping(self, peer_key: str) -> None:
        """Send a PING message to a peer."""
        ping_envelope = MessageEnvelope(message_type=MessageType.PING, payload="")
        try:
            await self.send_callback(peer_key, ping_envelope.to_bytes())
            logger.trace(f"Heartbeat: sent PING to {peer_key}")
        except Exception as e:
            logger.debug(f"Heartbeat: failed to send PING to {peer_key}: {e}")
            # Send failure likely means peer is already gone; evict immediately
            self._pong_pending.discard(peer_key)
            await self.evict_callback(peer_key, f"send failed: {e}")

    async def _send_orderbook_probe(self, peer_key: str, nick: str) -> None:
        """Send a unicast !orderbook to a non-ping maker to probe liveness.

        The maker will respond with its offers, which updates last_seen on the
        next message receive.  This matches the joinmarket-rs fallback for
        peers that don't support PING.

        The probe is formatted as a standard PUBMSG addressed to PUBLIC so the
        reference implementation routes it through ``on_pubmsg`` (which handles
        ``!orderbook``) rather than ``on_privmsg`` (which requires a signature).
        Although the wire message is sent unicast to the target peer, the
        payload uses the correct ``from_nick!PUBLIC!orderbook`` format.
        """
        from jmcore.protocol import COMMAND_PREFIX

        # Build a properly formatted PUBMSG:
        # from_nick!PUBLIC!orderbook
        # The reference client splits on COMMAND_PREFIX ("!") and checks
        # to_nick == "PUBLIC" to route to on_pubmsg.
        payload = f"{self.server_nick}{COMMAND_PREFIX}PUBLIC{COMMAND_PREFIX}orderbook"
        probe_envelope = MessageEnvelope(
            message_type=MessageType.PUBMSG,
            payload=payload,
        )
        try:
            await self.send_callback(peer_key, probe_envelope.to_bytes())
            logger.trace(f"Heartbeat: sent !orderbook probe to {nick} ({peer_key})")
        except Exception as e:
            logger.debug(f"Heartbeat: failed to send !orderbook probe to {peer_key}: {e}")
Attributes
config = config or HeartbeatConfig() instance-attribute
evict_callback = evict_callback instance-attribute
peer_registry = peer_registry instance-attribute
pong_pending: frozenset[str] property

Read-only view of peers with pending pongs (for testing).

send_callback = send_callback instance-attribute
server_nick = server_nick instance-attribute
Functions
__init__(peer_registry: PeerRegistry, send_callback: SendCallback, evict_callback: EvictCallback, config: HeartbeatConfig | None = None, server_nick: str = '') -> None
Source code in directory_server/src/directory_server/heartbeat.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def __init__(
    self,
    peer_registry: PeerRegistry,
    send_callback: SendCallback,
    evict_callback: EvictCallback,
    config: HeartbeatConfig | None = None,
    server_nick: str = "",
) -> None:
    self.peer_registry = peer_registry
    self.send_callback = send_callback
    self.evict_callback = evict_callback
    self.config = config or HeartbeatConfig()
    self.server_nick = server_nick

    # Peers waiting for a PONG reply
    self._pong_pending: set[str] = set()
    self._task: asyncio.Task[None] | None = None
handle_pong(peer_key: str) -> None

Called by the message router when a PONG is received.

Clears pong_pending so the peer is not evicted.

Source code in directory_server/src/directory_server/heartbeat.py
111
112
113
114
115
116
117
118
def handle_pong(self, peer_key: str) -> None:
    """Called by the message router when a PONG is received.

    Clears ``pong_pending`` so the peer is not evicted.
    """
    if peer_key in self._pong_pending:
        self._pong_pending.discard(peer_key)
        logger.trace(f"Heartbeat: PONG received from {peer_key}")
start() -> None

Start the background heartbeat loop.

Source code in directory_server/src/directory_server/heartbeat.py
88
89
90
91
92
93
94
95
96
97
98
def start(self) -> None:
    """Start the background heartbeat loop."""
    if self._task is not None:
        return
    self._task = asyncio.create_task(self._loop(), name="heartbeat")
    logger.info(
        f"Heartbeat started (sweep={self.config.sweep_interval_sec}s, "
        f"idle={self.config.idle_threshold_sec}s, "
        f"hard_evict={self.config.hard_evict_sec}s, "
        f"pong_wait={self.config.pong_wait_sec}s)"
    )
stop() -> None async

Stop the background heartbeat loop.

Source code in directory_server/src/directory_server/heartbeat.py
100
101
102
103
104
105
106
107
108
109
async def stop(self) -> None:
    """Stop the background heartbeat loop."""
    if self._task is None:
        return
    self._task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await self._task
    self._task = None
    self._pong_pending.clear()
    logger.info("Heartbeat stopped")