Skip to content

directory_server.message_router

directory_server.message_router

Message routing logic for forwarding messages between peers.

Implements Single Responsibility Principle: only handles message routing.

Attributes

DEFAULT_BROADCAST_BATCH_SIZE = 50 module-attribute

FailedSendCallback = Callable[[str], Awaitable[None]] module-attribute

SendCallback = Callable[[str, bytes], Awaitable[None]] module-attribute

Classes

MessageRouter

Source code in directory_server/src/directory_server/message_router.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
class MessageRouter:
    def __init__(
        self,
        peer_registry: PeerRegistry,
        send_callback: SendCallback,
        broadcast_batch_size: int = DEFAULT_BROADCAST_BATCH_SIZE,
        on_send_failed: FailedSendCallback | None = None,
    ):
        self.peer_registry = peer_registry
        self.send_callback = send_callback
        self.broadcast_batch_size = broadcast_batch_size
        self.on_send_failed = on_send_failed
        # Track peers that failed during current operation to avoid repeated attempts
        self._failed_peers: set[str] = set()
        # Track offers per peer (peer_key -> set of order IDs)
        self._peer_offers: dict[str, set[str]] = {}

    async def route_message(self, envelope: MessageEnvelope, from_key: str) -> None:
        if envelope.message_type == MessageType.PUBMSG:
            await self._handle_public_message(envelope, from_key)
        elif envelope.message_type == MessageType.PRIVMSG:
            await self._handle_private_message(envelope, from_key)
        elif envelope.message_type == MessageType.GETPEERLIST:
            await self._handle_peerlist_request(from_key)
        elif envelope.message_type == MessageType.PING:
            await self._handle_ping(from_key)
        else:
            logger.debug(f"Unhandled message type: {envelope.message_type}")

    async def _handle_public_message(self, envelope: MessageEnvelope, from_key: str) -> None:
        parsed = parse_jm_message(envelope.payload)
        if not parsed:
            logger.warning("Invalid public message format")
            return

        from_nick, to_nick, rest = parsed
        if to_nick != "PUBLIC":
            logger.warning(f"Public message not addressed to PUBLIC: {to_nick}")
            return

        from_peer = self.peer_registry.get_by_key(from_key)
        if not from_peer:
            logger.warning(f"Unknown peer sending public message: {from_key}")
            return

        # Track offers (absorder, absoffer, reloffer, relorder)
        if rest:
            message_parts = rest.split()
            if (
                message_parts
                and message_parts[0]
                in (
                    "!absorder",
                    "!absoffer",
                    "!reloffer",
                    "!relorder",
                    "sw0absorder",
                    "sw0absoffer",
                    "sw0reloffer",
                    "sw0relorder",
                )
                and len(message_parts) >= 2
            ):
                # Extract order ID (second field in offer messages)
                try:
                    order_id = message_parts[1]
                    if from_key not in self._peer_offers:
                        self._peer_offers[from_key] = set()
                    self._peer_offers[from_key].add(order_id)
                    logger.trace(
                        f"Tracked offer {order_id} from {from_nick} "
                        f"(total offers: {len(self._peer_offers[from_key])})"
                    )
                except (ValueError, IndexError):
                    pass

        # Pre-serialize envelope once instead of per-peer
        envelope_bytes = envelope.to_bytes()

        # Use generator to avoid building full target list in memory
        def target_generator() -> Iterator[tuple[str, str | None]]:
            for peer in self.peer_registry.iter_connected(from_peer.network):
                peer_key = (
                    peer.nick
                    if peer.location_string == "NOT-SERVING-ONION"
                    else peer.location_string
                )
                if peer_key != from_key:
                    yield (peer_key, peer.nick)

        # Execute sends in batches to limit memory usage
        sent_count = await self._batched_broadcast_iter(target_generator(), envelope_bytes)

        logger.trace(f"Broadcasted public message from {from_nick} to {sent_count} peers")

    async def _safe_send(self, peer_key: str, data: bytes, nick: str | None = None) -> None:
        """Send with exception handling to prevent one failed send from affecting others."""
        # Skip if this peer already failed in current operation
        if peer_key in self._failed_peers:
            return

        try:
            await self.send_callback(peer_key, data)
        except Exception as e:
            logger.warning(f"Failed to send to {nick or peer_key}: {e}")
            # Mark peer as failed to prevent repeated attempts
            self._failed_peers.add(peer_key)
            # Notify server to clean up this peer
            if self.on_send_failed:
                try:
                    await self.on_send_failed(peer_key)
                except Exception as cleanup_err:
                    logger.trace(f"Error in on_send_failed callback: {cleanup_err}")

    async def _batched_broadcast(self, targets: list[tuple[str, str | None]], data: bytes) -> int:
        """
        Broadcast data to targets in batches to limit memory usage.

        Instead of creating all coroutines at once (which caused 2GB+ memory usage),
        we process in batches of broadcast_batch_size to keep memory bounded.

        Returns the number of targets processed.
        """
        return await self._batched_broadcast_iter(iter(targets), data)

    async def _batched_broadcast_iter(
        self, targets: Iterator[tuple[str, str | None]], data: bytes
    ) -> int:
        """
        Broadcast data to targets from an iterator in batches.

        This is the memory-efficient version that consumes targets lazily,
        only materializing batch_size items at a time.

        Returns the number of targets processed.
        """
        # Clear failed peers set at start of broadcast to allow fresh attempts
        # while still preventing repeated attempts within this broadcast
        self._failed_peers.clear()

        total_sent = 0
        batch: list[tuple[str, str | None]] = []

        for target in targets:
            peer_key, nick = target
            # Skip peers that have already failed in this broadcast
            if peer_key in self._failed_peers:
                continue
            batch.append(target)

            if len(batch) >= self.broadcast_batch_size:
                tasks = [self._safe_send(pk, data, n) for pk, n in batch]
                await asyncio.gather(*tasks)
                total_sent += len(batch)
                batch = []

        # Process remaining items
        if batch:
            tasks = [self._safe_send(pk, data, n) for pk, n in batch]
            await asyncio.gather(*tasks)
            total_sent += len(batch)

        return total_sent

    async def _handle_private_message(self, envelope: MessageEnvelope, from_key: str) -> None:
        parsed = parse_jm_message(envelope.payload)
        if not parsed:
            logger.warning("Invalid private message format")
            return

        from_nick, to_nick, rest = parsed
        logger.info(f"PRIVMSG routing: {from_nick} -> {to_nick} (rest: {rest[:50]}...)")

        to_peer = self.peer_registry.get_by_nick(to_nick)
        if not to_peer:
            logger.warning(f"Target peer not found: {to_nick}")
            # Log all registered peers for debugging
            all_peers = list(self.peer_registry._peers.keys())
            logger.info(f"Registered peer keys: {all_peers}")
            nick_map = dict(self.peer_registry._nick_to_key)
            logger.info(f"Nick to key map: {nick_map}")
            return

        from_peer = self.peer_registry.get_by_key(from_key)
        if not from_peer or from_peer.network != to_peer.network:
            logger.warning("Network mismatch or unknown sender")
            return

        try:
            to_peer_key = (
                to_peer.nick
                if to_peer.location_string == "NOT-SERVING-ONION"
                else to_peer.location_string
            )
            logger.info(f"Sending to peer_key: {to_peer_key}")
            await self.send_callback(to_peer_key, envelope.to_bytes())
            logger.info(f"Successfully routed private message: {from_nick} -> {to_nick}")

            await self._send_peer_location(to_peer_key, from_peer)
        except Exception as e:
            logger.warning(f"Failed to route private message to {to_nick}: {e}")
            # Notify server to clean up this peer's mapping
            if self.on_send_failed:
                to_peer_key = (
                    to_peer.nick
                    if to_peer.location_string == "NOT-SERVING-ONION"
                    else to_peer.location_string
                )
                with contextlib.suppress(Exception):
                    await self.on_send_failed(to_peer_key)

    async def _handle_peerlist_request(self, from_key: str) -> None:
        peer = self.peer_registry.get_by_key(from_key)
        if not peer:
            return

        # Check if requesting peer supports peerlist_features
        include_features = peer.features.get("peerlist_features", False)
        await self.send_peerlist(from_key, peer.network, include_features=include_features)

    async def _handle_ping(self, from_key: str) -> None:
        pong_envelope = MessageEnvelope(message_type=MessageType.PONG, payload="")
        try:
            await self.send_callback(from_key, pong_envelope.to_bytes())
            logger.trace(f"Sent PONG to {from_key}")
        except Exception as e:
            logger.trace(f"Failed to send PONG: {e}")

    async def send_peerlist(
        self,
        to_key: str,
        network: NetworkType,
        include_features: bool = False,
        chunk_size: int = 20,
    ) -> None:
        """
        Send peerlist to a peer in chunks.

        Sends multiple PEERLIST messages to avoid overwhelming slow Tor connections.
        Each chunk contains up to `chunk_size` peer entries. Clients should accumulate
        entries from multiple PEERLIST messages.

        Args:
            to_key: Key of the peer to send to
            network: Network to filter peers by
            include_features: If True, include F: suffix with features for each peer.
                             This is enabled when the requesting peer supports peerlist_features.
            chunk_size: Maximum number of peer entries per PEERLIST message (default: 20)
        """
        logger.debug(
            f"send_peerlist called for {to_key}, network={network}, "
            f"include_features={include_features}"
        )

        # Build list of entries
        entries: list[str] = []
        if include_features:
            peers_with_features = self.peer_registry.get_peerlist_with_features(network)
            entries = [
                create_peerlist_entry(nick, loc, features=features)
                for nick, loc, features in peers_with_features
            ]
        else:
            peers = self.peer_registry.get_peerlist_for_network(network)
            entries = [create_peerlist_entry(nick, loc) for nick, loc in peers]

        # Always send at least one response (even if empty) - clients wait for PEERLIST
        if not entries:
            envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload="")
            try:
                await self.send_callback(to_key, envelope.to_bytes())
                logger.debug(f"Sent empty peerlist to {to_key}")
            except Exception as e:
                logger.warning(f"Failed to send peerlist to {to_key}: {e}")
            return

        # Send entries in chunks
        chunks_sent = 0
        for i in range(0, len(entries), chunk_size):
            chunk = entries[i : i + chunk_size]
            peerlist_msg = ",".join(chunk)
            envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload=peerlist_msg)

            try:
                await self.send_callback(to_key, envelope.to_bytes())
                chunks_sent += 1
                # Small delay between chunks to avoid overwhelming the connection
                if i + chunk_size < len(entries):
                    await asyncio.sleep(0.05)
            except Exception as e:
                logger.warning(f"Failed to send peerlist chunk {chunks_sent + 1} to {to_key}: {e}")
                return

        logger.debug(
            f"Sent peerlist to {to_key} ({len(entries)} peers in {chunks_sent} chunks, "
            f"include_features={include_features})"
        )

    async def _send_peer_location(self, to_location: str, peer_info: PeerInfo) -> None:
        if peer_info.onion_address == "NOT-SERVING-ONION":
            return

        # Include features if the peer has any - this ensures recipients can learn about
        # the peer's capabilities (e.g., neutrino_compat) when they receive the peerlist update
        features = FeatureSet(features={k for k, v in peer_info.features.items() if v is True})
        # Debug: Log when features are being sent
        if peer_info.features and not features.features:
            logger.warning(
                f"Peer {peer_info.nick} has features dict {peer_info.features} but "
                f"FeatureSet is empty after 'v is True' filter"
            )
        entry = create_peerlist_entry(peer_info.nick, peer_info.location_string, features=features)
        envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload=entry)

        try:
            await self.send_callback(to_location, envelope.to_bytes())
        except Exception as e:
            logger.trace(f"Failed to send peer location: {e}")

    async def broadcast_peer_disconnect(self, peer_location: str, network: NetworkType) -> None:
        peer = self.peer_registry.get_by_location(peer_location)
        if not peer or not peer.nick:
            return

        entry = create_peerlist_entry(peer.nick, peer.location_string, disconnected=True)
        envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload=entry)

        # Pre-serialize envelope once instead of per-peer
        envelope_bytes = envelope.to_bytes()

        # Use generator to avoid building full target list in memory
        def target_generator() -> Iterator[tuple[str, str | None]]:
            for p in self.peer_registry.iter_connected(network):
                if p.location_string == peer_location:
                    continue
                peer_key = p.nick if p.location_string == "NOT-SERVING-ONION" else p.location_string
                yield (peer_key, p.nick)

        # Execute sends in batches to limit memory usage
        sent_count = await self._batched_broadcast_iter(target_generator(), envelope_bytes)

        logger.info(f"Broadcasted disconnect for {peer.nick} to {sent_count} peers")

    def get_offer_stats(self) -> dict:
        """Get statistics about tracked offers."""
        total_offers = sum(len(offers) for offers in self._peer_offers.values())
        peers_with_offers = len([k for k, v in self._peer_offers.items() if v])

        # Find peers with more than 2 offers
        peers_many_offers = []
        for peer_key, offers in self._peer_offers.items():
            if len(offers) > 2:
                peer_info = self.peer_registry.get_by_key(peer_key)
                nick = peer_info.nick if peer_info else peer_key
                peers_many_offers.append((nick, len(offers)))

        # Sort by offer count descending
        peers_many_offers.sort(key=lambda x: x[1], reverse=True)

        return {
            "total_offers": total_offers,
            "peers_with_offers": peers_with_offers,
            "peers_many_offers": peers_many_offers[:10],  # Top 10
        }

    def remove_peer_offers(self, peer_key: str) -> None:
        """Remove offer tracking for a disconnected peer."""
        self._peer_offers.pop(peer_key, None)
Attributes
broadcast_batch_size = broadcast_batch_size instance-attribute
on_send_failed = on_send_failed instance-attribute
peer_registry = peer_registry instance-attribute
send_callback = send_callback instance-attribute
Functions
__init__(peer_registry: PeerRegistry, send_callback: SendCallback, broadcast_batch_size: int = DEFAULT_BROADCAST_BATCH_SIZE, on_send_failed: FailedSendCallback | None = None)
Source code in directory_server/src/directory_server/message_router.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(
    self,
    peer_registry: PeerRegistry,
    send_callback: SendCallback,
    broadcast_batch_size: int = DEFAULT_BROADCAST_BATCH_SIZE,
    on_send_failed: FailedSendCallback | None = None,
):
    self.peer_registry = peer_registry
    self.send_callback = send_callback
    self.broadcast_batch_size = broadcast_batch_size
    self.on_send_failed = on_send_failed
    # Track peers that failed during current operation to avoid repeated attempts
    self._failed_peers: set[str] = set()
    # Track offers per peer (peer_key -> set of order IDs)
    self._peer_offers: dict[str, set[str]] = {}
broadcast_peer_disconnect(peer_location: str, network: NetworkType) -> None async
Source code in directory_server/src/directory_server/message_router.py
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
async def broadcast_peer_disconnect(self, peer_location: str, network: NetworkType) -> None:
    peer = self.peer_registry.get_by_location(peer_location)
    if not peer or not peer.nick:
        return

    entry = create_peerlist_entry(peer.nick, peer.location_string, disconnected=True)
    envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload=entry)

    # Pre-serialize envelope once instead of per-peer
    envelope_bytes = envelope.to_bytes()

    # Use generator to avoid building full target list in memory
    def target_generator() -> Iterator[tuple[str, str | None]]:
        for p in self.peer_registry.iter_connected(network):
            if p.location_string == peer_location:
                continue
            peer_key = p.nick if p.location_string == "NOT-SERVING-ONION" else p.location_string
            yield (peer_key, p.nick)

    # Execute sends in batches to limit memory usage
    sent_count = await self._batched_broadcast_iter(target_generator(), envelope_bytes)

    logger.info(f"Broadcasted disconnect for {peer.nick} to {sent_count} peers")
get_offer_stats() -> dict

Get statistics about tracked offers.

Source code in directory_server/src/directory_server/message_router.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
def get_offer_stats(self) -> dict:
    """Get statistics about tracked offers."""
    total_offers = sum(len(offers) for offers in self._peer_offers.values())
    peers_with_offers = len([k for k, v in self._peer_offers.items() if v])

    # Find peers with more than 2 offers
    peers_many_offers = []
    for peer_key, offers in self._peer_offers.items():
        if len(offers) > 2:
            peer_info = self.peer_registry.get_by_key(peer_key)
            nick = peer_info.nick if peer_info else peer_key
            peers_many_offers.append((nick, len(offers)))

    # Sort by offer count descending
    peers_many_offers.sort(key=lambda x: x[1], reverse=True)

    return {
        "total_offers": total_offers,
        "peers_with_offers": peers_with_offers,
        "peers_many_offers": peers_many_offers[:10],  # Top 10
    }
remove_peer_offers(peer_key: str) -> None

Remove offer tracking for a disconnected peer.

Source code in directory_server/src/directory_server/message_router.py
390
391
392
def remove_peer_offers(self, peer_key: str) -> None:
    """Remove offer tracking for a disconnected peer."""
    self._peer_offers.pop(peer_key, None)
route_message(envelope: MessageEnvelope, from_key: str) -> None async
Source code in directory_server/src/directory_server/message_router.py
42
43
44
45
46
47
48
49
50
51
52
async def route_message(self, envelope: MessageEnvelope, from_key: str) -> None:
    if envelope.message_type == MessageType.PUBMSG:
        await self._handle_public_message(envelope, from_key)
    elif envelope.message_type == MessageType.PRIVMSG:
        await self._handle_private_message(envelope, from_key)
    elif envelope.message_type == MessageType.GETPEERLIST:
        await self._handle_peerlist_request(from_key)
    elif envelope.message_type == MessageType.PING:
        await self._handle_ping(from_key)
    else:
        logger.debug(f"Unhandled message type: {envelope.message_type}")
send_peerlist(to_key: str, network: NetworkType, include_features: bool = False, chunk_size: int = 20) -> None async

Send peerlist to a peer in chunks.

Sends multiple PEERLIST messages to avoid overwhelming slow Tor connections. Each chunk contains up to chunk_size peer entries. Clients should accumulate entries from multiple PEERLIST messages.

Args: to_key: Key of the peer to send to network: Network to filter peers by include_features: If True, include F: suffix with features for each peer. This is enabled when the requesting peer supports peerlist_features. chunk_size: Maximum number of peer entries per PEERLIST message (default: 20)

Source code in directory_server/src/directory_server/message_router.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
async def send_peerlist(
    self,
    to_key: str,
    network: NetworkType,
    include_features: bool = False,
    chunk_size: int = 20,
) -> None:
    """
    Send peerlist to a peer in chunks.

    Sends multiple PEERLIST messages to avoid overwhelming slow Tor connections.
    Each chunk contains up to `chunk_size` peer entries. Clients should accumulate
    entries from multiple PEERLIST messages.

    Args:
        to_key: Key of the peer to send to
        network: Network to filter peers by
        include_features: If True, include F: suffix with features for each peer.
                         This is enabled when the requesting peer supports peerlist_features.
        chunk_size: Maximum number of peer entries per PEERLIST message (default: 20)
    """
    logger.debug(
        f"send_peerlist called for {to_key}, network={network}, "
        f"include_features={include_features}"
    )

    # Build list of entries
    entries: list[str] = []
    if include_features:
        peers_with_features = self.peer_registry.get_peerlist_with_features(network)
        entries = [
            create_peerlist_entry(nick, loc, features=features)
            for nick, loc, features in peers_with_features
        ]
    else:
        peers = self.peer_registry.get_peerlist_for_network(network)
        entries = [create_peerlist_entry(nick, loc) for nick, loc in peers]

    # Always send at least one response (even if empty) - clients wait for PEERLIST
    if not entries:
        envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload="")
        try:
            await self.send_callback(to_key, envelope.to_bytes())
            logger.debug(f"Sent empty peerlist to {to_key}")
        except Exception as e:
            logger.warning(f"Failed to send peerlist to {to_key}: {e}")
        return

    # Send entries in chunks
    chunks_sent = 0
    for i in range(0, len(entries), chunk_size):
        chunk = entries[i : i + chunk_size]
        peerlist_msg = ",".join(chunk)
        envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload=peerlist_msg)

        try:
            await self.send_callback(to_key, envelope.to_bytes())
            chunks_sent += 1
            # Small delay between chunks to avoid overwhelming the connection
            if i + chunk_size < len(entries):
                await asyncio.sleep(0.05)
        except Exception as e:
            logger.warning(f"Failed to send peerlist chunk {chunks_sent + 1} to {to_key}: {e}")
            return

    logger.debug(
        f"Sent peerlist to {to_key} ({len(entries)} peers in {chunks_sent} chunks, "
        f"include_features={include_features})"
    )