Skip to content

directory_server.peer_registry

directory_server.peer_registry

Peer registry for tracking active peers and their metadata.

Implements Single Responsibility Principle: only manages peer state.

Classes

PeerNotFoundError

Bases: Exception

Source code in directory_server/src/directory_server/peer_registry.py
15
16
class PeerNotFoundError(Exception):
    pass

PeerRegistry

Source code in directory_server/src/directory_server/peer_registry.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
class PeerRegistry:
    def __init__(self, max_peers: int = 1000):
        self.max_peers = max_peers
        self._peers: dict[str, PeerInfo] = {}
        self._nick_to_key: dict[str, str] = {}

    def register(self, peer: PeerInfo) -> None:
        if len(self._peers) >= self.max_peers:
            raise ValueError(f"Maximum peers reached: {self.max_peers}")

        location = peer.location_string
        key = peer.nick if location == "NOT-SERVING-ONION" else location

        self._peers[key] = peer
        if peer.nick:
            self._nick_to_key[peer.nick] = key

        peer.last_seen = datetime.now(UTC)
        logger.info(f"Registered peer: {peer.nick} at {location}")

    def unregister(self, key: str) -> None:
        if key not in self._peers:
            return

        peer = self._peers[key]
        if peer.nick in self._nick_to_key:
            del self._nick_to_key[peer.nick]

        del self._peers[key]
        logger.info(f"Unregistered peer: {peer.nick} at {peer.location_string}")

    def get_by_key(self, key: str) -> PeerInfo | None:
        return self._peers.get(key)

    def get_by_location(self, location: str) -> PeerInfo | None:
        return self._peers.get(location)

    def get_by_nick(self, nick: str) -> PeerInfo | None:
        key = self._nick_to_key.get(nick)
        if key:
            return self._peers.get(key)
        return None

    def update_status(self, key: str, status: PeerStatus) -> None:
        peer = self.get_by_key(key)
        if peer:
            peer.status = status
            if status in (PeerStatus.CONNECTED, PeerStatus.HANDSHAKED):
                peer.last_seen = datetime.now(UTC)

    def update_last_seen(self, key: str) -> None:
        """Update the last_seen timestamp for a peer.

        Called on every received message to track peer liveness for heartbeat.
        """
        peer = self.get_by_key(key)
        if peer:
            peer.last_seen = datetime.now(UTC)

    def _iter_connected(self, network: NetworkType | None = None) -> Iterator[PeerInfo]:
        """Iterator over connected peers.

        Creates a snapshot of peers to avoid RuntimeError if dict is modified during iteration.
        """
        for p in list(self._peers.values()):
            if (
                p.status == PeerStatus.HANDSHAKED
                and not p.is_directory
                and (network is None or p.network == network)
            ):
                yield p

    def iter_connected(self, network: NetworkType | None = None) -> Iterator[PeerInfo]:
        """Public memory-efficient iterator over connected peers."""
        return self._iter_connected(network)

    def get_all_connected(self, network: NetworkType | None = None) -> list[PeerInfo]:
        return list(self._iter_connected(network))

    def get_peerlist_for_network(self, network: NetworkType) -> list[tuple[str, str]]:
        # Use generator to avoid intermediate list
        # Include all connected peers, even NOT-SERVING-ONION
        # While they can't be directly connected to, they are reachable via the directory
        # for private messages, so this information is useful
        return [(peer.nick, peer.location_string) for peer in self._iter_connected(network)]

    def get_peerlist_with_features(self, network: NetworkType) -> list[tuple[str, str, FeatureSet]]:
        """
        Get peerlist with features for peers on a network.

        Returns list of (nick, location, features) tuples for connected peers.
        Includes all peers, even NOT-SERVING-ONION, as they are still reachable
        via the directory for private messaging.
        """
        result = []
        for peer in self._iter_connected(network):
            # Build FeatureSet from peer.features dict
            features = FeatureSet(features={k for k, v in peer.features.items() if v is True})
            # Debug: Log when features are extracted for peerlist
            if peer.features and not features.features:
                logger.warning(
                    f"Peer {peer.nick} has features dict {peer.features} but "
                    f"FeatureSet is empty after 'v is True' filter"
                )
            result.append((peer.nick, peer.location_string, features))
        return result

    def count(self) -> int:
        return len(self._peers)

    def clear(self) -> None:
        self._peers.clear()
        self._nick_to_key.clear()

    def get_passive_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
        """
        Get passive peers (NOT-SERVING-ONION).

        These are typically orderbook watchers/takers that don't host their own
        onion service but connect to the directory to watch offers.
        """
        return [p for p in self._iter_connected(network) if p.onion_address == "NOT-SERVING-ONION"]

    def get_active_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
        """
        Get active peers (serving onion address).

        These are typically makers that host their own onion service and
        publish offers to the orderbook.
        """
        return [p for p in self._iter_connected(network) if p.onion_address != "NOT-SERVING-ONION"]

    def get_stats(self) -> dict[str, int]:
        connected = 0
        passive = 0
        active = 0
        neutrino_compat = 0
        peerlist_features = 0
        push_encrypted = 0

        for p in list(self._peers.values()):
            if p.status == PeerStatus.HANDSHAKED and not p.is_directory:
                connected += 1
                if p.onion_address == "NOT-SERVING-ONION":
                    passive += 1
                else:
                    active += 1
                # Count feature support from features dict
                features = p.features
                if features.get("neutrino_compat"):
                    neutrino_compat += 1
                if features.get("peerlist_features"):
                    peerlist_features += 1
                if features.get("push_encrypted"):
                    push_encrypted += 1

        return {
            "total_peers": len(self._peers),
            "connected_peers": connected,
            "passive_peers": passive,
            "active_peers": active,
            "neutrino_compat_peers": neutrino_compat,
            "peerlist_features_peers": peerlist_features,
            "push_encrypted_peers": push_encrypted,
        }

    def get_neutrino_compat_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
        """
        Get peers that support neutrino_compat feature.

        These peers advertise extended UTXO metadata (scriptpubkey, blockheight)
        which is required for Neutrino backend verification.
        """
        return [p for p in self._iter_connected(network) if p.neutrino_compat]

    def get_peers_idle_since(self, cutoff: datetime) -> list[tuple[str, PeerInfo]]:
        """Get connected peers whose last_seen is older than cutoff.

        Returns list of (peer_key, peer_info) tuples.
        """
        result: list[tuple[str, PeerInfo]] = []
        for key, peer in list(self._peers.items()):
            if (
                peer.status == PeerStatus.HANDSHAKED
                and not peer.is_directory
                and peer.last_seen is not None
                and peer.last_seen < cutoff
            ):
                result.append((key, peer))
        return result

    def supports_ping(self, key: str) -> bool:
        """Check if a peer supports PING/PONG heartbeat."""
        peer = self.get_by_key(key)
        if peer is None:
            return False
        return peer.features.get("ping", False) is True

    def is_maker(self, key: str) -> bool:
        """Check if a peer is a maker (serves an onion address)."""
        peer = self.get_by_key(key)
        if peer is None:
            return False
        return peer.onion_address != "NOT-SERVING-ONION"
Attributes
max_peers = max_peers instance-attribute
Functions
__init__(max_peers: int = 1000)
Source code in directory_server/src/directory_server/peer_registry.py
20
21
22
23
def __init__(self, max_peers: int = 1000):
    self.max_peers = max_peers
    self._peers: dict[str, PeerInfo] = {}
    self._nick_to_key: dict[str, str] = {}
clear() -> None
Source code in directory_server/src/directory_server/peer_registry.py
129
130
131
def clear(self) -> None:
    self._peers.clear()
    self._nick_to_key.clear()
count() -> int
Source code in directory_server/src/directory_server/peer_registry.py
126
127
def count(self) -> int:
    return len(self._peers)
get_active_peers(network: NetworkType | None = None) -> list[PeerInfo]

Get active peers (serving onion address).

These are typically makers that host their own onion service and publish offers to the orderbook.

Source code in directory_server/src/directory_server/peer_registry.py
142
143
144
145
146
147
148
149
def get_active_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
    """
    Get active peers (serving onion address).

    These are typically makers that host their own onion service and
    publish offers to the orderbook.
    """
    return [p for p in self._iter_connected(network) if p.onion_address != "NOT-SERVING-ONION"]
get_all_connected(network: NetworkType | None = None) -> list[PeerInfo]
Source code in directory_server/src/directory_server/peer_registry.py
95
96
def get_all_connected(self, network: NetworkType | None = None) -> list[PeerInfo]:
    return list(self._iter_connected(network))
get_by_key(key: str) -> PeerInfo | None
Source code in directory_server/src/directory_server/peer_registry.py
50
51
def get_by_key(self, key: str) -> PeerInfo | None:
    return self._peers.get(key)
get_by_location(location: str) -> PeerInfo | None
Source code in directory_server/src/directory_server/peer_registry.py
53
54
def get_by_location(self, location: str) -> PeerInfo | None:
    return self._peers.get(location)
get_by_nick(nick: str) -> PeerInfo | None
Source code in directory_server/src/directory_server/peer_registry.py
56
57
58
59
60
def get_by_nick(self, nick: str) -> PeerInfo | None:
    key = self._nick_to_key.get(nick)
    if key:
        return self._peers.get(key)
    return None
get_neutrino_compat_peers(network: NetworkType | None = None) -> list[PeerInfo]

Get peers that support neutrino_compat feature.

These peers advertise extended UTXO metadata (scriptpubkey, blockheight) which is required for Neutrino backend verification.

Source code in directory_server/src/directory_server/peer_registry.py
185
186
187
188
189
190
191
192
def get_neutrino_compat_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
    """
    Get peers that support neutrino_compat feature.

    These peers advertise extended UTXO metadata (scriptpubkey, blockheight)
    which is required for Neutrino backend verification.
    """
    return [p for p in self._iter_connected(network) if p.neutrino_compat]
get_passive_peers(network: NetworkType | None = None) -> list[PeerInfo]

Get passive peers (NOT-SERVING-ONION).

These are typically orderbook watchers/takers that don't host their own onion service but connect to the directory to watch offers.

Source code in directory_server/src/directory_server/peer_registry.py
133
134
135
136
137
138
139
140
def get_passive_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
    """
    Get passive peers (NOT-SERVING-ONION).

    These are typically orderbook watchers/takers that don't host their own
    onion service but connect to the directory to watch offers.
    """
    return [p for p in self._iter_connected(network) if p.onion_address == "NOT-SERVING-ONION"]
get_peerlist_for_network(network: NetworkType) -> list[tuple[str, str]]
Source code in directory_server/src/directory_server/peer_registry.py
 98
 99
100
101
102
103
def get_peerlist_for_network(self, network: NetworkType) -> list[tuple[str, str]]:
    # Use generator to avoid intermediate list
    # Include all connected peers, even NOT-SERVING-ONION
    # While they can't be directly connected to, they are reachable via the directory
    # for private messages, so this information is useful
    return [(peer.nick, peer.location_string) for peer in self._iter_connected(network)]
get_peerlist_with_features(network: NetworkType) -> list[tuple[str, str, FeatureSet]]

Get peerlist with features for peers on a network.

Returns list of (nick, location, features) tuples for connected peers. Includes all peers, even NOT-SERVING-ONION, as they are still reachable via the directory for private messaging.

Source code in directory_server/src/directory_server/peer_registry.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def get_peerlist_with_features(self, network: NetworkType) -> list[tuple[str, str, FeatureSet]]:
    """
    Get peerlist with features for peers on a network.

    Returns list of (nick, location, features) tuples for connected peers.
    Includes all peers, even NOT-SERVING-ONION, as they are still reachable
    via the directory for private messaging.
    """
    result = []
    for peer in self._iter_connected(network):
        # Build FeatureSet from peer.features dict
        features = FeatureSet(features={k for k, v in peer.features.items() if v is True})
        # Debug: Log when features are extracted for peerlist
        if peer.features and not features.features:
            logger.warning(
                f"Peer {peer.nick} has features dict {peer.features} but "
                f"FeatureSet is empty after 'v is True' filter"
            )
        result.append((peer.nick, peer.location_string, features))
    return result
get_peers_idle_since(cutoff: datetime) -> list[tuple[str, PeerInfo]]

Get connected peers whose last_seen is older than cutoff.

Returns list of (peer_key, peer_info) tuples.

Source code in directory_server/src/directory_server/peer_registry.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def get_peers_idle_since(self, cutoff: datetime) -> list[tuple[str, PeerInfo]]:
    """Get connected peers whose last_seen is older than cutoff.

    Returns list of (peer_key, peer_info) tuples.
    """
    result: list[tuple[str, PeerInfo]] = []
    for key, peer in list(self._peers.items()):
        if (
            peer.status == PeerStatus.HANDSHAKED
            and not peer.is_directory
            and peer.last_seen is not None
            and peer.last_seen < cutoff
        ):
            result.append((key, peer))
    return result
get_stats() -> dict[str, int]
Source code in directory_server/src/directory_server/peer_registry.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def get_stats(self) -> dict[str, int]:
    connected = 0
    passive = 0
    active = 0
    neutrino_compat = 0
    peerlist_features = 0
    push_encrypted = 0

    for p in list(self._peers.values()):
        if p.status == PeerStatus.HANDSHAKED and not p.is_directory:
            connected += 1
            if p.onion_address == "NOT-SERVING-ONION":
                passive += 1
            else:
                active += 1
            # Count feature support from features dict
            features = p.features
            if features.get("neutrino_compat"):
                neutrino_compat += 1
            if features.get("peerlist_features"):
                peerlist_features += 1
            if features.get("push_encrypted"):
                push_encrypted += 1

    return {
        "total_peers": len(self._peers),
        "connected_peers": connected,
        "passive_peers": passive,
        "active_peers": active,
        "neutrino_compat_peers": neutrino_compat,
        "peerlist_features_peers": peerlist_features,
        "push_encrypted_peers": push_encrypted,
    }
is_maker(key: str) -> bool

Check if a peer is a maker (serves an onion address).

Source code in directory_server/src/directory_server/peer_registry.py
217
218
219
220
221
222
def is_maker(self, key: str) -> bool:
    """Check if a peer is a maker (serves an onion address)."""
    peer = self.get_by_key(key)
    if peer is None:
        return False
    return peer.onion_address != "NOT-SERVING-ONION"
iter_connected(network: NetworkType | None = None) -> Iterator[PeerInfo]

Public memory-efficient iterator over connected peers.

Source code in directory_server/src/directory_server/peer_registry.py
91
92
93
def iter_connected(self, network: NetworkType | None = None) -> Iterator[PeerInfo]:
    """Public memory-efficient iterator over connected peers."""
    return self._iter_connected(network)
register(peer: PeerInfo) -> None
Source code in directory_server/src/directory_server/peer_registry.py
25
26
27
28
29
30
31
32
33
34
35
36
37
def register(self, peer: PeerInfo) -> None:
    if len(self._peers) >= self.max_peers:
        raise ValueError(f"Maximum peers reached: {self.max_peers}")

    location = peer.location_string
    key = peer.nick if location == "NOT-SERVING-ONION" else location

    self._peers[key] = peer
    if peer.nick:
        self._nick_to_key[peer.nick] = key

    peer.last_seen = datetime.now(UTC)
    logger.info(f"Registered peer: {peer.nick} at {location}")
supports_ping(key: str) -> bool

Check if a peer supports PING/PONG heartbeat.

Source code in directory_server/src/directory_server/peer_registry.py
210
211
212
213
214
215
def supports_ping(self, key: str) -> bool:
    """Check if a peer supports PING/PONG heartbeat."""
    peer = self.get_by_key(key)
    if peer is None:
        return False
    return peer.features.get("ping", False) is True
unregister(key: str) -> None
Source code in directory_server/src/directory_server/peer_registry.py
39
40
41
42
43
44
45
46
47
48
def unregister(self, key: str) -> None:
    if key not in self._peers:
        return

    peer = self._peers[key]
    if peer.nick in self._nick_to_key:
        del self._nick_to_key[peer.nick]

    del self._peers[key]
    logger.info(f"Unregistered peer: {peer.nick} at {peer.location_string}")
update_last_seen(key: str) -> None

Update the last_seen timestamp for a peer.

Called on every received message to track peer liveness for heartbeat.

Source code in directory_server/src/directory_server/peer_registry.py
69
70
71
72
73
74
75
76
def update_last_seen(self, key: str) -> None:
    """Update the last_seen timestamp for a peer.

    Called on every received message to track peer liveness for heartbeat.
    """
    peer = self.get_by_key(key)
    if peer:
        peer.last_seen = datetime.now(UTC)
update_status(key: str, status: PeerStatus) -> None
Source code in directory_server/src/directory_server/peer_registry.py
62
63
64
65
66
67
def update_status(self, key: str, status: PeerStatus) -> None:
    peer = self.get_by_key(key)
    if peer:
        peer.status = status
        if status in (PeerStatus.CONNECTED, PeerStatus.HANDSHAKED):
            peer.last_seen = datetime.now(UTC)