Skip to content

directory_server.peer_registry

directory_server.peer_registry

Peer registry for tracking active peers and their metadata.

Implements Single Responsibility Principle: only manages peer state.

Classes

PeerNotFoundError

Bases: Exception

Source code in directory_server/src/directory_server/peer_registry.py
15
16
class PeerNotFoundError(Exception):
    """Exception type for a peer that is not present in the registry."""

PeerRegistry

Source code in directory_server/src/directory_server/peer_registry.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class PeerRegistry:
    def __init__(self, max_peers: int = 1000):
        self.max_peers = max_peers
        self._peers: dict[str, PeerInfo] = {}
        self._nick_to_key: dict[str, str] = {}

    def register(self, peer: PeerInfo) -> None:
        if len(self._peers) >= self.max_peers:
            raise ValueError(f"Maximum peers reached: {self.max_peers}")

        location = peer.location_string
        key = peer.nick if location == "NOT-SERVING-ONION" else location

        self._peers[key] = peer
        if peer.nick:
            self._nick_to_key[peer.nick] = key

        peer.last_seen = datetime.now(UTC)
        logger.info(f"Registered peer: {peer.nick} at {location}")

    def unregister(self, key: str) -> None:
        if key not in self._peers:
            return

        peer = self._peers[key]
        if peer.nick in self._nick_to_key:
            del self._nick_to_key[peer.nick]

        del self._peers[key]
        logger.info(f"Unregistered peer: {peer.nick} at {peer.location_string}")

    def get_by_key(self, key: str) -> PeerInfo | None:
        return self._peers.get(key)

    def get_by_location(self, location: str) -> PeerInfo | None:
        return self._peers.get(location)

    def get_by_nick(self, nick: str) -> PeerInfo | None:
        key = self._nick_to_key.get(nick)
        if key:
            return self._peers.get(key)
        return None

    def update_status(self, key: str, status: PeerStatus) -> None:
        peer = self.get_by_key(key)
        if peer:
            peer.status = status
            if status in (PeerStatus.CONNECTED, PeerStatus.HANDSHAKED):
                peer.last_seen = datetime.now(UTC)

    def _iter_connected(self, network: NetworkType | None = None) -> Iterator[PeerInfo]:
        """Iterator over connected peers.

        Creates a snapshot of peers to avoid RuntimeError if dict is modified during iteration.
        """
        for p in list(self._peers.values()):
            if (
                p.status == PeerStatus.HANDSHAKED
                and not p.is_directory
                and (network is None or p.network == network)
            ):
                yield p

    def iter_connected(self, network: NetworkType | None = None) -> Iterator[PeerInfo]:
        """Public memory-efficient iterator over connected peers."""
        return self._iter_connected(network)

    def get_all_connected(self, network: NetworkType | None = None) -> list[PeerInfo]:
        return list(self._iter_connected(network))

    def get_peerlist_for_network(self, network: NetworkType) -> list[tuple[str, str]]:
        # Use generator to avoid intermediate list
        # Include all connected peers, even NOT-SERVING-ONION
        # While they can't be directly connected to, they are reachable via the directory
        # for private messages, so this information is useful
        return [(peer.nick, peer.location_string) for peer in self._iter_connected(network)]

    def get_peerlist_with_features(self, network: NetworkType) -> list[tuple[str, str, FeatureSet]]:
        """
        Get peerlist with features for peers on a network.

        Returns list of (nick, location, features) tuples for connected peers.
        Includes all peers, even NOT-SERVING-ONION, as they are still reachable
        via the directory for private messaging.
        """
        result = []
        for peer in self._iter_connected(network):
            # Build FeatureSet from peer.features dict
            features = FeatureSet(features={k for k, v in peer.features.items() if v is True})
            # Debug: Log when features are extracted for peerlist
            if peer.features and not features.features:
                logger.warning(
                    f"Peer {peer.nick} has features dict {peer.features} but "
                    f"FeatureSet is empty after 'v is True' filter"
                )
            result.append((peer.nick, peer.location_string, features))
        return result

    def count(self) -> int:
        return len(self._peers)

    def clear(self) -> None:
        self._peers.clear()
        self._nick_to_key.clear()

    def get_passive_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
        """
        Get passive peers (NOT-SERVING-ONION).

        These are typically orderbook watchers/takers that don't host their own
        onion service but connect to the directory to watch offers.
        """
        return [p for p in self._iter_connected(network) if p.onion_address == "NOT-SERVING-ONION"]

    def get_active_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
        """
        Get active peers (serving onion address).

        These are typically makers that host their own onion service and
        publish offers to the orderbook.
        """
        return [p for p in self._iter_connected(network) if p.onion_address != "NOT-SERVING-ONION"]

    def get_stats(self) -> dict[str, int]:
        connected = 0
        passive = 0
        active = 0
        neutrino_compat = 0
        peerlist_features = 0
        push_encrypted = 0

        for p in list(self._peers.values()):
            if p.status == PeerStatus.HANDSHAKED and not p.is_directory:
                connected += 1
                if p.onion_address == "NOT-SERVING-ONION":
                    passive += 1
                else:
                    active += 1
                # Count feature support from features dict
                features = p.features
                if features.get("neutrino_compat"):
                    neutrino_compat += 1
                if features.get("peerlist_features"):
                    peerlist_features += 1
                if features.get("push_encrypted"):
                    push_encrypted += 1

        return {
            "total_peers": len(self._peers),
            "connected_peers": connected,
            "passive_peers": passive,
            "active_peers": active,
            "neutrino_compat_peers": neutrino_compat,
            "peerlist_features_peers": peerlist_features,
            "push_encrypted_peers": push_encrypted,
        }

    def get_neutrino_compat_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
        """
        Get peers that support neutrino_compat feature.

        These peers advertise extended UTXO metadata (scriptpubkey, blockheight)
        which is required for Neutrino backend verification.
        """
        return [p for p in self._iter_connected(network) if p.neutrino_compat]
Attributes
max_peers = max_peers instance-attribute
Functions
__init__(max_peers: int = 1000)
Source code in directory_server/src/directory_server/peer_registry.py
20
21
22
23
def __init__(self, max_peers: int = 1000):
    """Start with empty peer and nick indexes and remember the capacity."""
    # Primary index: registry key -> peer.
    self._peers: dict[str, PeerInfo] = dict()
    # Secondary index: nick -> registry key.
    self._nick_to_key: dict[str, str] = dict()
    self.max_peers = max_peers
clear() -> None
Source code in directory_server/src/directory_server/peer_registry.py
120
121
122
def clear(self) -> None:
    """Empty both the peer index and the nick index in place."""
    for index in (self._peers, self._nick_to_key):
        index.clear()
count() -> int
Source code in directory_server/src/directory_server/peer_registry.py
117
118
def count(self) -> int:
    """Return how many peers are currently stored, whatever their status."""
    registered = self._peers
    return len(registered)
get_active_peers(network: NetworkType | None = None) -> list[PeerInfo]

Get active peers (serving onion address).

These are typically makers that host their own onion service and publish offers to the orderbook.

Source code in directory_server/src/directory_server/peer_registry.py
133
134
135
136
137
138
139
140
def get_active_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
    """
    Return the connected peers that serve an onion address.

    A peer counts as active when its ``onion_address`` is anything other than
    ``"NOT-SERVING-ONION"``; these are typically makers publishing offers.
    """
    serving: list[PeerInfo] = []
    for candidate in self._iter_connected(network):
        if candidate.onion_address == "NOT-SERVING-ONION":
            continue
        serving.append(candidate)
    return serving
get_all_connected(network: NetworkType | None = None) -> list[PeerInfo]
Source code in directory_server/src/directory_server/peer_registry.py
86
87
def get_all_connected(self, network: NetworkType | None = None) -> list[PeerInfo]:
    """Collect the connected peers (optionally limited to ``network``) into a list."""
    return [*self._iter_connected(network)]
get_by_key(key: str) -> PeerInfo | None
Source code in directory_server/src/directory_server/peer_registry.py
50
51
def get_by_key(self, key: str) -> PeerInfo | None:
    """Look up a peer by its registry key; ``None`` when the key is unknown."""
    match = self._peers.get(key, None)
    return match
get_by_location(location: str) -> PeerInfo | None
Source code in directory_server/src/directory_server/peer_registry.py
53
54
def get_by_location(self, location: str) -> PeerInfo | None:
    """Look up the peer stored under ``location``; ``None`` when there is none."""
    match = self._peers.get(location, None)
    return match
get_by_nick(nick: str) -> PeerInfo | None
Source code in directory_server/src/directory_server/peer_registry.py
56
57
58
59
60
def get_by_nick(self, nick: str) -> PeerInfo | None:
    """Resolve ``nick`` through the nick index and return the matching peer.

    Returns ``None`` when the nick is not indexed, when it maps to a falsy
    key, or when the mapped key is not present in the peer index.
    """
    mapped_key = self._nick_to_key.get(nick)
    if not mapped_key:
        return None
    return self._peers.get(mapped_key)
get_neutrino_compat_peers(network: NetworkType | None = None) -> list[PeerInfo]

Get peers that support neutrino_compat feature.

These peers advertise extended UTXO metadata (scriptpubkey, blockheight) which is required for Neutrino backend verification.

Source code in directory_server/src/directory_server/peer_registry.py
176
177
178
179
180
181
182
183
def get_neutrino_compat_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
    """
    Return the connected peers whose ``neutrino_compat`` attribute is truthy.

    Such peers advertise extended UTXO metadata (scriptpubkey, blockheight),
    which is required for Neutrino backend verification.
    """
    compatible: list[PeerInfo] = []
    for candidate in self._iter_connected(network):
        if candidate.neutrino_compat:
            compatible.append(candidate)
    return compatible
get_passive_peers(network: NetworkType | None = None) -> list[PeerInfo]

Get passive peers (NOT-SERVING-ONION).

These are typically orderbook watchers/takers that don't host their own onion service but connect to the directory to watch offers.

Source code in directory_server/src/directory_server/peer_registry.py
124
125
126
127
128
129
130
131
def get_passive_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
    """
    Return the connected peers that do not serve an onion address.

    A peer counts as passive when its ``onion_address`` equals
    ``"NOT-SERVING-ONION"``; these are typically orderbook watchers/takers
    that connect to the directory only to watch offers.
    """
    watchers: list[PeerInfo] = []
    for candidate in self._iter_connected(network):
        if candidate.onion_address == "NOT-SERVING-ONION":
            watchers.append(candidate)
    return watchers
get_peerlist_for_network(network: NetworkType) -> list[tuple[str, str]]
Source code in directory_server/src/directory_server/peer_registry.py
89
90
91
92
93
94
def get_peerlist_for_network(self, network: NetworkType) -> list[tuple[str, str]]:
    """Return ``(nick, location_string)`` pairs for connected peers on ``network``."""
    # NOT-SERVING-ONION peers are listed as well: they cannot be connected to
    # directly, but they are reachable via the directory for private messages,
    # so the information is still useful.
    pairs: list[tuple[str, str]] = []
    for member in self._iter_connected(network):
        pairs.append((member.nick, member.location_string))
    return pairs
get_peerlist_with_features(network: NetworkType) -> list[tuple[str, str, FeatureSet]]

Get peerlist with features for peers on a network.

Returns list of (nick, location, features) tuples for connected peers. Includes all peers, even NOT-SERVING-ONION, as they are still reachable via the directory for private messaging.

Source code in directory_server/src/directory_server/peer_registry.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def get_peerlist_with_features(self, network: NetworkType) -> list[tuple[str, str, FeatureSet]]:
    """
    Build the peerlist for ``network`` including each peer's feature set.

    Every connected peer contributes one ``(nick, location, features)`` tuple.
    NOT-SERVING-ONION peers are included too, as they are still reachable
    via the directory for private messaging.
    """
    entries = []
    for member in self._iter_connected(network):
        # Keep only the feature names whose value is exactly True.
        enabled = {name for name, flag in member.features.items() if flag is True}
        feature_set = FeatureSet(features=enabled)
        # A non-empty dict that filters down to nothing is worth flagging.
        if member.features and not feature_set.features:
            logger.warning(
                f"Peer {member.nick} has features dict {member.features} but "
                f"FeatureSet is empty after 'v is True' filter"
            )
        entries.append((member.nick, member.location_string, feature_set))
    return entries
get_stats() -> dict[str, int]
Source code in directory_server/src/directory_server/peer_registry.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def get_stats(self) -> dict[str, int]:
    """Return counters describing the registry.

    ``total_peers`` counts every stored peer. The remaining counters only
    consider non-directory peers whose status is ``HANDSHAKED``, split into
    passive/active by onion address and tallied per advertised feature.
    """
    tally = {
        "connected_peers": 0,
        "passive_peers": 0,
        "active_peers": 0,
        "neutrino_compat_peers": 0,
        "peerlist_features_peers": 0,
        "push_encrypted_peers": 0,
    }

    # Walk a snapshot so the peer dict may change while counting.
    for entry in list(self._peers.values()):
        if entry.status != PeerStatus.HANDSHAKED or entry.is_directory:
            continue

        tally["connected_peers"] += 1
        role = "passive_peers" if entry.onion_address == "NOT-SERVING-ONION" else "active_peers"
        tally[role] += 1

        # Feature support is read from the peer's features dict.
        advertised = entry.features
        for flag in ("neutrino_compat", "peerlist_features", "push_encrypted"):
            if advertised.get(flag):
                tally[f"{flag}_peers"] += 1

    return {"total_peers": len(self._peers), **tally}
iter_connected(network: NetworkType | None = None) -> Iterator[PeerInfo]

Public memory-efficient iterator over connected peers.

Source code in directory_server/src/directory_server/peer_registry.py
82
83
84
def iter_connected(self, network: NetworkType | None = None) -> Iterator[PeerInfo]:
    """Public entry point returning the iterator built by ``_iter_connected``."""
    connected = self._iter_connected(network)
    return connected
register(peer: PeerInfo) -> None
Source code in directory_server/src/directory_server/peer_registry.py
25
26
27
28
29
30
31
32
33
34
35
36
37
def register(self, peer: PeerInfo) -> None:
    if len(self._peers) >= self.max_peers:
        raise ValueError(f"Maximum peers reached: {self.max_peers}")

    location = peer.location_string
    key = peer.nick if location == "NOT-SERVING-ONION" else location

    self._peers[key] = peer
    if peer.nick:
        self._nick_to_key[peer.nick] = key

    peer.last_seen = datetime.now(UTC)
    logger.info(f"Registered peer: {peer.nick} at {location}")
unregister(key: str) -> None
Source code in directory_server/src/directory_server/peer_registry.py
39
40
41
42
43
44
45
46
47
48
def unregister(self, key: str) -> None:
    """Remove the peer stored under ``key``; unknown keys are ignored.

    The peer's nick mapping is removed only when it still points at ``key``.
    """
    peer = self._peers.pop(key, None)
    if peer is None:
        return

    # The same nick may have been re-registered under a different key; in
    # that case the mapping belongs to the newer entry and must survive.
    if self._nick_to_key.get(peer.nick) == key:
        del self._nick_to_key[peer.nick]

    logger.info(f"Unregistered peer: {peer.nick} at {peer.location_string}")
update_status(key: str, status: PeerStatus) -> None
Source code in directory_server/src/directory_server/peer_registry.py
62
63
64
65
66
67
def update_status(self, key: str, status: PeerStatus) -> None:
    peer = self.get_by_key(key)
    if peer:
        peer.status = status
        if status in (PeerStatus.CONNECTED, PeerStatus.HANDSHAKED):
            peer.last_seen = datetime.now(UTC)