Skip to content

jmcore.protocol

jmcore.protocol

JoinMarket protocol definitions, message types, and serialization.

Feature Flag System

This implementation uses feature flags for capability negotiation instead of protocol version bumping. This allows incremental feature adoption while maintaining full compatibility with the reference implementation from joinmarket-clientserver.

Features are advertised in the handshake features dict and negotiated per-CoinJoin session via extended !fill/!pubkey messages.

Available Features: - neutrino_compat: Extended UTXO metadata (scriptpubkey, blockheight) for light client verification. Required for Neutrino backend takers. - push_encrypted: Encrypted !push command with session binding. Prevents abuse of makers as unauthenticated broadcast bots.

Feature Dependencies: - neutrino_compat: No dependencies - push_encrypted: Requires active NaCl encryption session (implicit)

Nick Format:

JoinMarket nicks encode the protocol version: J{version}{hash} All nicks use version 5 for maximum compatibility with reference implementation. Feature detection happens via handshake and !fill/!pubkey exchange, not nick.

Cross-Implementation Compatibility:

Our Implementation ↔ Reference (JAM): - We use J5 nicks and proto-ver=5 in handshake - Features field is ignored by reference implementation - Legacy UTXO format used unless both peers advertise neutrino_compat - Graceful fallback to v5 behavior for all features

Feature Negotiation During CoinJoin: - Taker advertises features in !fill (optional JSON suffix) - Maker responds with features in !pubkey (optional JSON suffix) - Extended formats used only when both peers support the feature

Peerlist Feature Extension: Our directory server extends the peerlist format to include features: - Legacy format: nick;location (or nick;location;D for disconnected) - Extended format: nick;location;F:feature1+feature2 (features as plus-separated list) The extended format is backward compatible - legacy clients will ignore the F: suffix. Note: Plus separator is used because the peerlist itself uses commas to separate entries.

Attributes

ALL_FEATURES = {FEATURE_NEUTRINO_COMPAT, FEATURE_PUSH_ENCRYPTED, FEATURE_PEERLIST_FEATURES} module-attribute

COMMAND_PREFIX = '!' module-attribute

FEATURE_DEPENDENCIES: dict[str, list[str]] = {FEATURE_NEUTRINO_COMPAT: [], FEATURE_PUSH_ENCRYPTED: [], FEATURE_PEERLIST_FEATURES: []} module-attribute

FEATURE_NEUTRINO_COMPAT = 'neutrino_compat' module-attribute

FEATURE_PEERLIST_FEATURES = 'peerlist_features' module-attribute

FEATURE_PUSH_ENCRYPTED = 'push_encrypted' module-attribute

JM_VERSION = 5 module-attribute

JM_VERSION_MIN = JM_VERSION module-attribute

NICK_HASH_LENGTH = 10 module-attribute

NICK_MAX_ENCODED = 14 module-attribute

NICK_PEERLOCATOR_SEPARATOR = ';' module-attribute

NOT_SERVING_ONION_HOSTNAME = 'NOT-SERVING-ONION' module-attribute

ONION_VIRTUAL_PORT = 5222 module-attribute

Classes

FeatureSet

Represents a set of protocol features advertised by a peer.

Used for feature negotiation during handshake and CoinJoin sessions.

Source code in jmcore/src/jmcore/protocol.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@dataclass
class FeatureSet:
    """
    Represents a set of protocol features advertised by a peer.

    Used for feature negotiation during handshake and CoinJoin sessions.
    """

    features: set[str] = Field(default_factory=set)

    @classmethod
    def from_handshake(cls, handshake_data: dict[str, Any]) -> FeatureSet:
        """Extract features from a handshake payload."""
        features_dict = handshake_data.get("features", {})
        # Only include features that are set to True
        features = {k for k, v in features_dict.items() if v is True}
        return cls(features=features)

    @classmethod
    def from_list(cls, feature_list: list[str]) -> FeatureSet:
        """Create from a list of feature names."""
        return cls(features=set(feature_list))

    @classmethod
    def from_comma_string(cls, s: str) -> FeatureSet:
        """Parse from plus-separated string (e.g., 'neutrino_compat+push_encrypted').

        Note: Despite the method name, uses '+' as separator because the peerlist
        itself uses ',' to separate entries. The name is kept for backward compatibility.
        Also accepts ',' for legacy/handshake use cases.
        """
        if not s or not s.strip():
            return cls(features=set())
        # Support both + (peerlist) and , (legacy/handshake) separators
        if "+" in s:
            return cls(features={f.strip() for f in s.split("+") if f.strip()})
        return cls(features={f.strip() for f in s.split(",") if f.strip()})

    def to_dict(self) -> dict[str, bool]:
        """Convert to dict for JSON serialization."""
        return dict.fromkeys(sorted(self.features), True)

    def to_comma_string(self) -> str:
        """Convert to plus-separated string for peerlist F: suffix.

        Note: Uses '+' as separator instead of ',' because the peerlist
        itself uses ',' to separate entries. Using ',' for features would
        cause parsing ambiguity.
        """
        return "+".join(sorted(self.features))

    def supports(self, feature: str) -> bool:
        """Check if this set includes a specific feature."""
        return feature in self.features

    def supports_neutrino_compat(self) -> bool:
        """Check if neutrino_compat is supported."""
        return FEATURE_NEUTRINO_COMPAT in self.features

    def supports_push_encrypted(self) -> bool:
        """Check if push_encrypted is supported."""
        return FEATURE_PUSH_ENCRYPTED in self.features

    def supports_peerlist_features(self) -> bool:
        """Check if peer supports extended peerlist with features (F: suffix)."""
        return FEATURE_PEERLIST_FEATURES in self.features

    def validate_dependencies(self) -> tuple[bool, str]:
        """Check that all feature dependencies are satisfied."""
        for feature in self.features:
            deps = FEATURE_DEPENDENCIES.get(feature, [])
            for dep in deps:
                if dep not in self.features:
                    return False, f"Feature '{feature}' requires '{dep}'"
        return True, ""

    def intersection(self, other: FeatureSet) -> FeatureSet:
        """Return features supported by both sets."""
        return FeatureSet(features=self.features & other.features)

    def __bool__(self) -> bool:
        """True if any features are set."""
        return bool(self.features)

    def __contains__(self, feature: str) -> bool:
        return feature in self.features

    def __iter__(self):
        return iter(self.features)

    def __len__(self) -> int:
        return len(self.features)
Attributes
features: set[str] = Field(default_factory=set) class-attribute instance-attribute
Functions
__bool__() -> bool

True if any features are set.

Source code in jmcore/src/jmcore/protocol.py
170
171
172
def __bool__(self) -> bool:
    """True if any features are set."""
    return bool(self.features)
__contains__(feature: str) -> bool
Source code in jmcore/src/jmcore/protocol.py
174
175
def __contains__(self, feature: str) -> bool:
    return feature in self.features
__iter__()
Source code in jmcore/src/jmcore/protocol.py
177
178
def __iter__(self):
    return iter(self.features)
__len__() -> int
Source code in jmcore/src/jmcore/protocol.py
180
181
def __len__(self) -> int:
    return len(self.features)
from_comma_string(s: str) -> FeatureSet classmethod

Parse from plus-separated string (e.g., 'neutrino_compat+push_encrypted').

Note: Despite the method name, uses '+' as separator because the peerlist itself uses ',' to separate entries. The name is kept for backward compatibility. Also accepts ',' for legacy/handshake use cases.

Source code in jmcore/src/jmcore/protocol.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@classmethod
def from_comma_string(cls, s: str) -> FeatureSet:
    """Parse from plus-separated string (e.g., 'neutrino_compat+push_encrypted').

    Note: Despite the method name, uses '+' as separator because the peerlist
    itself uses ',' to separate entries. The name is kept for backward compatibility.
    Also accepts ',' for legacy/handshake use cases.
    """
    if not s or not s.strip():
        return cls(features=set())
    # Support both + (peerlist) and , (legacy/handshake) separators
    if "+" in s:
        return cls(features={f.strip() for f in s.split("+") if f.strip()})
    return cls(features={f.strip() for f in s.split(",") if f.strip()})
from_handshake(handshake_data: dict[str, Any]) -> FeatureSet classmethod

Extract features from a handshake payload.

Source code in jmcore/src/jmcore/protocol.py
100
101
102
103
104
105
106
@classmethod
def from_handshake(cls, handshake_data: dict[str, Any]) -> FeatureSet:
    """Extract features from a handshake payload."""
    features_dict = handshake_data.get("features", {})
    # Only include features that are set to True
    features = {k for k, v in features_dict.items() if v is True}
    return cls(features=features)
from_list(feature_list: list[str]) -> FeatureSet classmethod

Create from a list of feature names.

Source code in jmcore/src/jmcore/protocol.py
108
109
110
111
@classmethod
def from_list(cls, feature_list: list[str]) -> FeatureSet:
    """Create from a list of feature names."""
    return cls(features=set(feature_list))
intersection(other: FeatureSet) -> FeatureSet

Return features supported by both sets.

Source code in jmcore/src/jmcore/protocol.py
166
167
168
def intersection(self, other: FeatureSet) -> FeatureSet:
    """Return features supported by both sets."""
    return FeatureSet(features=self.features & other.features)
supports(feature: str) -> bool

Check if this set includes a specific feature.

Source code in jmcore/src/jmcore/protocol.py
141
142
143
def supports(self, feature: str) -> bool:
    """Check if this set includes a specific feature."""
    return feature in self.features
supports_neutrino_compat() -> bool

Check if neutrino_compat is supported.

Source code in jmcore/src/jmcore/protocol.py
145
146
147
def supports_neutrino_compat(self) -> bool:
    """Check if neutrino_compat is supported."""
    return FEATURE_NEUTRINO_COMPAT in self.features
supports_peerlist_features() -> bool

Check if peer supports extended peerlist with features (F: suffix).

Source code in jmcore/src/jmcore/protocol.py
153
154
155
def supports_peerlist_features(self) -> bool:
    """Check if peer supports extended peerlist with features (F: suffix)."""
    return FEATURE_PEERLIST_FEATURES in self.features
supports_push_encrypted() -> bool

Check if push_encrypted is supported.

Source code in jmcore/src/jmcore/protocol.py
149
150
151
def supports_push_encrypted(self) -> bool:
    """Check if push_encrypted is supported."""
    return FEATURE_PUSH_ENCRYPTED in self.features
to_comma_string() -> str

Convert to plus-separated string for peerlist F: suffix.

Note: Uses '+' as separator instead of ',' because the peerlist itself uses ',' to separate entries. Using ',' for features would cause parsing ambiguity.

Source code in jmcore/src/jmcore/protocol.py
132
133
134
135
136
137
138
139
def to_comma_string(self) -> str:
    """Convert to plus-separated string for peerlist F: suffix.

    Note: Uses '+' as separator instead of ',' because the peerlist
    itself uses ',' to separate entries. Using ',' for features would
    cause parsing ambiguity.
    """
    return "+".join(sorted(self.features))
to_dict() -> dict[str, bool]

Convert to dict for JSON serialization.

Source code in jmcore/src/jmcore/protocol.py
128
129
130
def to_dict(self) -> dict[str, bool]:
    """Convert to dict for JSON serialization."""
    return dict.fromkeys(sorted(self.features), True)
validate_dependencies() -> tuple[bool, str]

Check that all feature dependencies are satisfied.

Source code in jmcore/src/jmcore/protocol.py
157
158
159
160
161
162
163
164
def validate_dependencies(self) -> tuple[bool, str]:
    """Check that all feature dependencies are satisfied."""
    for feature in self.features:
        deps = FEATURE_DEPENDENCIES.get(feature, [])
        for dep in deps:
            if dep not in self.features:
                return False, f"Feature '{feature}' requires '{dep}'"
    return True, ""

MessageType

Bases: IntEnum

Source code in jmcore/src/jmcore/protocol.py
336
337
338
339
340
341
342
343
344
345
346
347
348
class MessageType(IntEnum):
    PRIVMSG = 685
    PUBMSG = 687
    PEERLIST = 789
    GETPEERLIST = 791
    HANDSHAKE = 793
    DN_HANDSHAKE = 795
    PING = 797
    PONG = 799
    DISCONNECT = 801

    CONNECT = 785
    CONNECT_IN = 797
Attributes
CONNECT = 785 class-attribute instance-attribute
CONNECT_IN = 797 class-attribute instance-attribute
DISCONNECT = 801 class-attribute instance-attribute
DN_HANDSHAKE = 795 class-attribute instance-attribute
GETPEERLIST = 791 class-attribute instance-attribute
HANDSHAKE = 793 class-attribute instance-attribute
PEERLIST = 789 class-attribute instance-attribute
PING = 797 class-attribute instance-attribute
PONG = 799 class-attribute instance-attribute
PRIVMSG = 685 class-attribute instance-attribute
PUBMSG = 687 class-attribute instance-attribute

ProtocolMessage

Bases: BaseModel

Source code in jmcore/src/jmcore/protocol.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
class ProtocolMessage(BaseModel):
    type: MessageType
    payload: dict[str, Any]

    def to_json(self) -> str:
        return json.dumps({"type": self.type.value, "data": self.payload})

    @classmethod
    def from_json(cls, data: str) -> ProtocolMessage:
        obj = json.loads(data)
        return cls(type=MessageType(obj["type"]), payload=obj["data"])

    def to_bytes(self) -> bytes:
        return self.to_json().encode("utf-8")

    @classmethod
    def from_bytes(cls, data: bytes) -> ProtocolMessage:
        return cls.from_json(data.decode("utf-8"))
Attributes
payload: dict[str, Any] instance-attribute
type: MessageType instance-attribute
Functions
from_bytes(data: bytes) -> ProtocolMessage classmethod
Source code in jmcore/src/jmcore/protocol.py
366
367
368
@classmethod
def from_bytes(cls, data: bytes) -> ProtocolMessage:
    return cls.from_json(data.decode("utf-8"))
from_json(data: str) -> ProtocolMessage classmethod
Source code in jmcore/src/jmcore/protocol.py
358
359
360
361
@classmethod
def from_json(cls, data: str) -> ProtocolMessage:
    obj = json.loads(data)
    return cls(type=MessageType(obj["type"]), payload=obj["data"])
to_bytes() -> bytes
Source code in jmcore/src/jmcore/protocol.py
363
364
def to_bytes(self) -> bytes:
    return self.to_json().encode("utf-8")
to_json() -> str
Source code in jmcore/src/jmcore/protocol.py
355
356
def to_json(self) -> str:
    return json.dumps({"type": self.type.value, "data": self.payload})

RequiredFeatures

Features that this peer requires from counterparties.

Used to filter incompatible peers during maker selection.

Source code in jmcore/src/jmcore/protocol.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
@dataclass
class RequiredFeatures:
    """
    Features that this peer requires from counterparties.

    Used to filter incompatible peers during maker selection.
    """

    required: set[str] = Field(default_factory=set)

    @classmethod
    def for_neutrino_taker(cls) -> RequiredFeatures:
        """Create requirements for a taker using Neutrino backend."""
        return cls(required={FEATURE_NEUTRINO_COMPAT})

    @classmethod
    def none(cls) -> RequiredFeatures:
        """No required features."""
        return cls(required=set())

    def is_compatible(self, peer_features: FeatureSet) -> tuple[bool, str]:
        """Check if peer supports all required features."""
        missing = self.required - peer_features.features
        if missing:
            return False, f"Missing required features: {missing}"
        return True, ""

    def __bool__(self) -> bool:
        return bool(self.required)
Attributes
required: set[str] = Field(default_factory=set) class-attribute instance-attribute
Functions
__bool__() -> bool
Source code in jmcore/src/jmcore/protocol.py
211
212
def __bool__(self) -> bool:
    return bool(self.required)
for_neutrino_taker() -> RequiredFeatures classmethod

Create requirements for a taker using Neutrino backend.

Source code in jmcore/src/jmcore/protocol.py
194
195
196
197
@classmethod
def for_neutrino_taker(cls) -> RequiredFeatures:
    """Create requirements for a taker using Neutrino backend."""
    return cls(required={FEATURE_NEUTRINO_COMPAT})
is_compatible(peer_features: FeatureSet) -> tuple[bool, str]

Check if peer supports all required features.

Source code in jmcore/src/jmcore/protocol.py
204
205
206
207
208
209
def is_compatible(self, peer_features: FeatureSet) -> tuple[bool, str]:
    """Check if peer supports all required features."""
    missing = self.required - peer_features.features
    if missing:
        return False, f"Missing required features: {missing}"
    return True, ""
none() -> RequiredFeatures classmethod

No required features.

Source code in jmcore/src/jmcore/protocol.py
199
200
201
202
@classmethod
def none(cls) -> RequiredFeatures:
    """No required features."""
    return cls(required=set())

UTXOMetadata

Extended UTXO metadata for Neutrino-compatible verification.

This allows light clients to verify UTXOs without arbitrary blockchain queries by providing the scriptPubKey (for Neutrino watch list) and block height (for efficient rescan starting point).

Source code in jmcore/src/jmcore/protocol.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
@dataclass
class UTXOMetadata:
    """
    Extended UTXO metadata for Neutrino-compatible verification.

    This allows light clients to verify UTXOs without arbitrary blockchain queries
    by providing the scriptPubKey (for Neutrino watch list) and block height
    (for efficient rescan starting point).
    """

    txid: str
    vout: int
    scriptpubkey: str | None = None  # Hex-encoded scriptPubKey
    blockheight: int | None = None  # Block height where UTXO was confirmed

    def to_legacy_str(self) -> str:
        """Format as legacy string: txid:vout"""
        return f"{self.txid}:{self.vout}"

    def to_extended_str(self) -> str:
        """Format as extended string: txid:vout:scriptpubkey:blockheight"""
        if self.scriptpubkey is None or self.blockheight is None:
            return self.to_legacy_str()
        return f"{self.txid}:{self.vout}:{self.scriptpubkey}:{self.blockheight}"

    @classmethod
    def from_str(cls, s: str) -> UTXOMetadata:
        """
        Parse UTXO string in either legacy or extended format.

        Legacy format: txid:vout
        Extended format: txid:vout:scriptpubkey:blockheight
        """
        parts = s.split(":")
        if len(parts) == 2:
            # Legacy format
            return cls(txid=parts[0], vout=int(parts[1]))
        elif len(parts) == 4:
            # Extended format
            return cls(
                txid=parts[0],
                vout=int(parts[1]),
                scriptpubkey=parts[2],
                blockheight=int(parts[3]),
            )
        else:
            raise ValueError(f"Invalid UTXO format: {s}")

    def has_neutrino_metadata(self) -> bool:
        """Check if this UTXO has the metadata needed for Neutrino verification."""
        return self.scriptpubkey is not None and self.blockheight is not None

    @staticmethod
    def is_valid_scriptpubkey(scriptpubkey: str) -> bool:
        """Validate scriptPubKey format (hex string)."""
        if not scriptpubkey:
            return False
        # Must be valid hex
        if not re.match(r"^[0-9a-fA-F]+$", scriptpubkey):
            return False
        # Common scriptPubKey lengths (in hex chars):
        # P2PKH: 50 (25 bytes), P2SH: 46 (23 bytes)
        # P2WPKH: 44 (22 bytes), P2WSH: 68 (34 bytes)
        # P2TR: 68 (34 bytes)
        return not (len(scriptpubkey) < 4 or len(scriptpubkey) > 200)
Attributes
blockheight: int | None = None class-attribute instance-attribute
scriptpubkey: str | None = None class-attribute instance-attribute
txid: str instance-attribute
vout: int instance-attribute
Functions
from_str(s: str) -> UTXOMetadata classmethod

Parse UTXO string in either legacy or extended format.

Legacy format: txid:vout Extended format: txid:vout:scriptpubkey:blockheight

Source code in jmcore/src/jmcore/protocol.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
@classmethod
def from_str(cls, s: str) -> UTXOMetadata:
    """
    Parse UTXO string in either legacy or extended format.

    Legacy format: txid:vout
    Extended format: txid:vout:scriptpubkey:blockheight
    """
    parts = s.split(":")
    if len(parts) == 2:
        # Legacy format
        return cls(txid=parts[0], vout=int(parts[1]))
    elif len(parts) == 4:
        # Extended format
        return cls(
            txid=parts[0],
            vout=int(parts[1]),
            scriptpubkey=parts[2],
            blockheight=int(parts[3]),
        )
    else:
        raise ValueError(f"Invalid UTXO format: {s}")
has_neutrino_metadata() -> bool

Check if this UTXO has the metadata needed for Neutrino verification.

Source code in jmcore/src/jmcore/protocol.py
277
278
279
def has_neutrino_metadata(self) -> bool:
    """Check if this UTXO has the metadata needed for Neutrino verification."""
    return self.scriptpubkey is not None and self.blockheight is not None
is_valid_scriptpubkey(scriptpubkey: str) -> bool staticmethod

Validate scriptPubKey format (hex string).

Source code in jmcore/src/jmcore/protocol.py
281
282
283
284
285
286
287
288
289
290
291
292
293
@staticmethod
def is_valid_scriptpubkey(scriptpubkey: str) -> bool:
    """Validate scriptPubKey format (hex string)."""
    if not scriptpubkey:
        return False
    # Must be valid hex
    if not re.match(r"^[0-9a-fA-F]+$", scriptpubkey):
        return False
    # Common scriptPubKey lengths (in hex chars):
    # P2PKH: 50 (25 bytes), P2SH: 46 (23 bytes)
    # P2WPKH: 44 (22 bytes), P2WSH: 68 (34 bytes)
    # P2TR: 68 (34 bytes)
    return not (len(scriptpubkey) < 4 or len(scriptpubkey) > 200)
to_extended_str() -> str

Format as extended string: txid:vout:scriptpubkey:blockheight

Source code in jmcore/src/jmcore/protocol.py
248
249
250
251
252
def to_extended_str(self) -> str:
    """Format as extended string: txid:vout:scriptpubkey:blockheight"""
    if self.scriptpubkey is None or self.blockheight is None:
        return self.to_legacy_str()
    return f"{self.txid}:{self.vout}:{self.scriptpubkey}:{self.blockheight}"
to_legacy_str() -> str

Format as legacy string: txid:vout

Source code in jmcore/src/jmcore/protocol.py
244
245
246
def to_legacy_str(self) -> str:
    """Format as legacy string: txid:vout"""
    return f"{self.txid}:{self.vout}"

Functions

create_handshake_request(nick: str, location: str, network: str, directory: bool = False, neutrino_compat: bool = False, features: FeatureSet | None = None) -> dict[str, Any]

Create a handshake request message.

Args: nick: Bot nickname location: Onion address or NOT-SERVING-ONION network: Bitcoin network (mainnet, testnet, signet, regtest) directory: True if this is a directory server neutrino_compat: True to advertise Neutrino-compatible UTXO metadata support features: FeatureSet to advertise (overrides neutrino_compat if provided)

Returns: Handshake request payload dict

Source code in jmcore/src/jmcore/protocol.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
def create_handshake_request(
    nick: str,
    location: str,
    network: str,
    directory: bool = False,
    neutrino_compat: bool = False,
    features: FeatureSet | None = None,
) -> dict[str, Any]:
    """
    Create a handshake request message.

    Args:
        nick: Bot nickname
        location: Onion address or NOT-SERVING-ONION
        network: Bitcoin network (mainnet, testnet, signet, regtest)
        directory: True if this is a directory server
        neutrino_compat: True to advertise Neutrino-compatible UTXO metadata support
        features: FeatureSet to advertise (overrides neutrino_compat if provided)

    Returns:
        Handshake request payload dict
    """
    if features is not None:
        features_dict = features.to_dict()
    else:
        features_dict = {}
        if neutrino_compat:
            features_dict[FEATURE_NEUTRINO_COMPAT] = True

    return {
        "app-name": "joinmarket",
        "directory": directory,
        "location-string": location,
        "proto-ver": JM_VERSION,
        "features": features_dict,
        "nick": nick,
        "network": network,
    }

create_handshake_response(nick: str, network: str, accepted: bool = True, motd: str = 'JoinMarket Directory Server', neutrino_compat: bool = False, features: FeatureSet | None = None) -> dict[str, Any]

Create a handshake response message.

Args: nick: Directory server nickname network: Bitcoin network accepted: Whether the connection is accepted motd: Message of the day neutrino_compat: True to advertise Neutrino-compatible UTXO metadata support features: FeatureSet to advertise (overrides neutrino_compat if provided)

Returns: Handshake response payload dict

Source code in jmcore/src/jmcore/protocol.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def create_handshake_response(
    nick: str,
    network: str,
    accepted: bool = True,
    motd: str = "JoinMarket Directory Server",
    neutrino_compat: bool = False,
    features: FeatureSet | None = None,
) -> dict[str, Any]:
    """
    Create a handshake response message.

    Args:
        nick: Directory server nickname
        network: Bitcoin network
        accepted: Whether the connection is accepted
        motd: Message of the day
        neutrino_compat: True to advertise Neutrino-compatible UTXO metadata support
        features: FeatureSet to advertise (overrides neutrino_compat if provided)

    Returns:
        Handshake response payload dict
    """
    if features is not None:
        features_dict = features.to_dict()
    else:
        features_dict = {}
        if neutrino_compat:
            features_dict[FEATURE_NEUTRINO_COMPAT] = True

    return {
        "app-name": "joinmarket",
        "directory": True,
        "proto-ver-min": JM_VERSION,
        "proto-ver-max": JM_VERSION,
        "features": features_dict,
        "accepted": accepted,
        "nick": nick,
        "network": network,
        "motd": motd,
    }

create_peerlist_entry(nick: str, location: str, disconnected: bool = False, features: FeatureSet | None = None) -> str

Create a peerlist entry string.

Format: - Legacy: nick;location or nick;location;D - Extended: nick;location;F:feature1,feature2 or nick;location;D;F:feature1,feature2

The F: prefix is used to identify the features field and maintain backward compatibility.

Source code in jmcore/src/jmcore/protocol.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
def create_peerlist_entry(
    nick: str,
    location: str,
    disconnected: bool = False,
    features: FeatureSet | None = None,
) -> str:
    """
    Create a peerlist entry string.

    Format:
    - Legacy: nick;location or nick;location;D
    - Extended: nick;location;F:feature1,feature2 or nick;location;D;F:feature1,feature2

    The F: prefix is used to identify the features field and maintain backward compatibility.
    """
    entry = f"{nick}{NICK_PEERLOCATOR_SEPARATOR}{location}"
    if disconnected:
        entry += f"{NICK_PEERLOCATOR_SEPARATOR}D"
    if features and features.features:
        entry += f"{NICK_PEERLOCATOR_SEPARATOR}F:{features.to_comma_string()}"
    return entry

format_jm_message(from_nick: str, to_nick: str, cmd: str, message: str) -> str

Source code in jmcore/src/jmcore/protocol.py
529
530
def format_jm_message(from_nick: str, to_nick: str, cmd: str, message: str) -> str:
    return f"{from_nick}{COMMAND_PREFIX}{to_nick}{COMMAND_PREFIX}{cmd} {message}"

format_utxo_list(utxos: list[UTXOMetadata], extended: bool = False) -> str

Format a list of UTXOs as comma-separated string.

Args: utxos: List of UTXOMetadata objects extended: If True, use extended format with scriptpubkey:blockheight

Returns: Comma-separated UTXO string

Source code in jmcore/src/jmcore/protocol.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def format_utxo_list(utxos: list[UTXOMetadata], extended: bool = False) -> str:
    """
    Format a list of UTXOs as comma-separated string.

    Args:
        utxos: List of UTXOMetadata objects
        extended: If True, use extended format with scriptpubkey:blockheight

    Returns:
        Comma-separated UTXO string
    """
    if extended:
        return ",".join(u.to_extended_str() for u in utxos)
    else:
        return ",".join(u.to_legacy_str() for u in utxos)

get_nick_version(nick: str) -> int

Extract protocol version from a JoinMarket nick.

Nick format: J{version}{hash} where version is a single digit. Example: J5abc123... (v5)

Returns JM_VERSION (5) if version cannot be determined.

Source code in jmcore/src/jmcore/protocol.py
215
216
217
218
219
220
221
222
223
224
225
226
def get_nick_version(nick: str) -> int:
    """
    Extract protocol version from a JoinMarket nick.

    Nick format: J{version}{hash} where version is a single digit.
    Example: J5abc123... (v5)

    Returns JM_VERSION (5) if version cannot be determined.
    """
    if nick and len(nick) >= 2 and nick[0] == "J" and nick[1].isdigit():
        return int(nick[1])
    return JM_VERSION

parse_jm_message(msg: str) -> tuple[str, str, str] | None

Source code in jmcore/src/jmcore/protocol.py
533
534
535
536
537
538
539
540
541
542
543
def parse_jm_message(msg: str) -> tuple[str, str, str] | None:
    try:
        parts = msg.split(COMMAND_PREFIX)
        if len(parts) < 3:
            return None
        from_nick = parts[0]
        to_nick = parts[1]
        rest = COMMAND_PREFIX.join(parts[2:])
        return (from_nick, to_nick, rest)
    except Exception:
        return None

parse_peer_location(location: str) -> tuple[str, int]

Source code in jmcore/src/jmcore/protocol.py
467
468
469
470
471
472
473
474
475
476
477
def parse_peer_location(location: str) -> tuple[str, int]:
    if location == NOT_SERVING_ONION_HOSTNAME:
        return (location, -1)
    try:
        host, port_str = location.split(":")
        port = int(port_str)
        if port <= 0 or port > 65535:
            raise ValueError(f"Invalid port: {port}")
        return (host, port)
    except (ValueError, AttributeError) as e:
        raise ValueError(f"Invalid location string: {location}") from e

parse_peerlist_entry(entry: str) -> tuple[str, str, bool, FeatureSet]

Parse a peerlist entry string.

Returns: Tuple of (nick, location, disconnected, features)

Source code in jmcore/src/jmcore/protocol.py
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
def parse_peerlist_entry(entry: str) -> tuple[str, str, bool, FeatureSet]:
    """
    Parse a peerlist entry string.

    Returns:
        Tuple of (nick, location, disconnected, features)
    """
    parts = entry.split(NICK_PEERLOCATOR_SEPARATOR)
    if len(parts) < 2:
        raise ValueError(f"Invalid peerlist entry: {entry}")

    nick = parts[0]
    location = parts[1]
    disconnected = False
    features = FeatureSet()

    # Parse remaining parts
    for part in parts[2:]:
        if part == "D":
            disconnected = True
        elif part.startswith("F:"):
            features = FeatureSet.from_comma_string(part[2:])

    return (nick, location, disconnected, features)

parse_utxo_list(utxo_list_str: str, require_metadata: bool = False) -> list[UTXOMetadata]

Parse a comma-separated list of UTXOs.

Args: utxo_list_str: Comma-separated UTXOs (legacy or extended format) require_metadata: If True, raise error if any UTXO lacks Neutrino metadata

Returns: List of UTXOMetadata objects

Source code in jmcore/src/jmcore/protocol.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def parse_utxo_list(utxo_list_str: str, require_metadata: bool = False) -> list[UTXOMetadata]:
    """
    Parse a comma-separated list of UTXOs.

    Args:
        utxo_list_str: Comma-separated UTXOs (legacy or extended format)
        require_metadata: If True, raise error if any UTXO lacks Neutrino metadata

    Returns:
        List of UTXOMetadata objects
    """
    if not utxo_list_str:
        return []

    utxos = []
    for utxo_str in utxo_list_str.split(","):
        utxo = UTXOMetadata.from_str(utxo_str.strip())
        if require_metadata and not utxo.has_neutrino_metadata():
            raise ValueError(f"UTXO {utxo.to_legacy_str()} missing Neutrino metadata")
        utxos.append(utxo)
    return utxos

peer_supports_neutrino_compat(handshake_data: dict[str, Any]) -> bool

Check if a peer supports Neutrino-compatible UTXO metadata.

Args: handshake_data: Handshake payload from peer

Returns: True if peer advertises neutrino_compat feature

Source code in jmcore/src/jmcore/protocol.py
453
454
455
456
457
458
459
460
461
462
463
464
def peer_supports_neutrino_compat(handshake_data: dict[str, Any]) -> bool:
    """
    Check if a peer supports Neutrino-compatible UTXO metadata.

    Args:
        handshake_data: Handshake payload from peer

    Returns:
        True if peer advertises neutrino_compat feature
    """
    features = handshake_data.get("features", {})
    return features.get(FEATURE_NEUTRINO_COMPAT, False)