Skip to content

jmwallet.wallet.bond_registry

jmwallet.wallet.bond_registry

Fidelity bond registry for persistent storage of bond metadata.

This module provides storage and retrieval of fidelity bond information, including addresses, locktimes, witness scripts, and UTXO tracking.

Classes

BondRegistry

Bases: BaseModel

Registry of all fidelity bonds for a wallet.

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class BondRegistry(BaseModel):
    """Registry of all fidelity bonds for a wallet."""

    version: int = 1
    bonds: list[FidelityBondInfo] = []

    def add_bond(self, bond: FidelityBondInfo) -> None:
        """Add a new bond to the registry."""
        # Check for duplicate address
        for existing in self.bonds:
            if existing.address == bond.address:
                logger.warning(f"Bond with address {bond.address} already exists, updating")
                self.bonds.remove(existing)
                break
        self.bonds.append(bond)

    def get_bond_by_address(self, address: str) -> FidelityBondInfo | None:
        """Get a bond by its address."""
        for bond in self.bonds:
            if bond.address == address:
                return bond
        return None

    def get_bond_by_index(self, index: int, locktime: int) -> FidelityBondInfo | None:
        """Get a bond by its index and locktime."""
        for bond in self.bonds:
            if bond.index == index and bond.locktime == locktime:
                return bond
        return None

    def get_funded_bonds(self) -> list[FidelityBondInfo]:
        """Get all funded bonds."""
        return [b for b in self.bonds if b.is_funded]

    def get_active_bonds(self) -> list[FidelityBondInfo]:
        """Get all funded bonds that are not yet expired."""
        return [b for b in self.bonds if b.is_funded and not b.is_expired]

    def get_best_bond(self) -> FidelityBondInfo | None:
        """
        Get the best bond for advertising.

        Selection criteria (in order):
        1. Must be funded
        2. Must not be expired
        3. Highest value wins
        4. If tied, longest locktime remaining wins
        """
        active = self.get_active_bonds()
        if not active:
            return None

        # Sort by value (descending), then by time_until_unlock (descending)
        active.sort(key=lambda b: (b.value or 0, b.time_until_unlock), reverse=True)
        return active[0]

    def update_utxo_info(
        self,
        address: str,
        txid: str,
        vout: int,
        value: int,
        confirmations: int,
    ) -> bool:
        """Update UTXO information for a bond."""
        bond = self.get_bond_by_address(address)
        if bond:
            bond.txid = txid
            bond.vout = vout
            bond.value = value
            bond.confirmations = confirmations
            return True
        return False
Attributes
bonds: list[FidelityBondInfo] = [] class-attribute instance-attribute
version: int = 1 class-attribute instance-attribute
Functions
add_bond(bond: FidelityBondInfo) -> None

Add a new bond to the registry.

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
 97
 98
 99
100
101
102
103
104
105
def add_bond(self, bond: FidelityBondInfo) -> None:
    """Add a new bond to the registry."""
    # Check for duplicate address
    for existing in self.bonds:
        if existing.address == bond.address:
            logger.warning(f"Bond with address {bond.address} already exists, updating")
            self.bonds.remove(existing)
            break
    self.bonds.append(bond)
get_active_bonds() -> list[FidelityBondInfo]

Get all funded bonds that are not yet expired.

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
125
126
127
def get_active_bonds(self) -> list[FidelityBondInfo]:
    """Get all funded bonds that are not yet expired."""
    return [b for b in self.bonds if b.is_funded and not b.is_expired]
get_best_bond() -> FidelityBondInfo | None

Get the best bond for advertising.

Selection criteria (in order): 1. Must be funded 2. Must not be expired 3. Highest value wins 4. If tied, longest locktime remaining wins

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def get_best_bond(self) -> FidelityBondInfo | None:
    """
    Get the best bond for advertising.

    Selection criteria (in order):
    1. Must be funded
    2. Must not be expired
    3. Highest value wins
    4. If tied, longest locktime remaining wins
    """
    active = self.get_active_bonds()
    if not active:
        return None

    # Sort by value (descending), then by time_until_unlock (descending)
    active.sort(key=lambda b: (b.value or 0, b.time_until_unlock), reverse=True)
    return active[0]
get_bond_by_address(address: str) -> FidelityBondInfo | None

Get a bond by its address.

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
107
108
109
110
111
112
def get_bond_by_address(self, address: str) -> FidelityBondInfo | None:
    """Get a bond by its address."""
    for bond in self.bonds:
        if bond.address == address:
            return bond
    return None
get_bond_by_index(index: int, locktime: int) -> FidelityBondInfo | None

Get a bond by its index and locktime.

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
114
115
116
117
118
119
def get_bond_by_index(self, index: int, locktime: int) -> FidelityBondInfo | None:
    """Get a bond by its index and locktime."""
    for bond in self.bonds:
        if bond.index == index and bond.locktime == locktime:
            return bond
    return None
get_funded_bonds() -> list[FidelityBondInfo]

Get all funded bonds.

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
121
122
123
def get_funded_bonds(self) -> list[FidelityBondInfo]:
    """Get all funded bonds."""
    return [b for b in self.bonds if b.is_funded]
update_utxo_info(address: str, txid: str, vout: int, value: int, confirmations: int) -> bool

Update UTXO information for a bond.

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def update_utxo_info(
    self,
    address: str,
    txid: str,
    vout: int,
    value: int,
    confirmations: int,
) -> bool:
    """Update UTXO information for a bond."""
    bond = self.get_bond_by_address(address)
    if bond:
        bond.txid = txid
        bond.vout = vout
        bond.value = value
        bond.confirmations = confirmations
        return True
    return False

FidelityBondInfo

Bases: BaseModel

Information about a single fidelity bond.

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class FidelityBondInfo(BaseModel):
    """Information about a single fidelity bond."""

    address: str
    locktime: int
    locktime_human: str
    index: int
    path: str
    pubkey: str
    witness_script_hex: str
    network: str
    created_at: str
    # UTXO info (populated when bond is funded)
    txid: str | None = None
    vout: int | None = None
    value: int | None = None  # in satoshis
    confirmations: int | None = None
    # Certificate info (for cold wallet support)
    # Allows keeping bond UTXO private key in cold storage (hardware wallet)
    # while using a hot wallet certificate key for signing nick proofs
    cert_pubkey: str | None = None  # Hot wallet certificate public key (hex)
    cert_privkey: str | None = None  # Hot wallet certificate private key (hex)
    cert_signature: str | None = None  # Certificate signature by UTXO key (hex)
    cert_expiry: int | None = None  # Certificate expiry in 2016-block periods

    @property
    def is_funded(self) -> bool:
        """Check if this bond has been funded."""
        return self.txid is not None and self.value is not None and self.value > 0

    @property
    def is_expired(self) -> bool:
        """Check if the locktime has passed."""
        import time

        return time.time() >= self.locktime

    @property
    def time_until_unlock(self) -> int:
        """Seconds until the bond can be unlocked. Returns 0 if already expired."""
        import time

        remaining = self.locktime - int(time.time())
        return max(0, remaining)

    @property
    def has_certificate(self) -> bool:
        """Check if this bond has a certificate configured (for cold wallet mode)."""
        return (
            self.cert_pubkey is not None
            and self.cert_privkey is not None
            and self.cert_signature is not None
            and self.cert_expiry is not None
        )

    def is_certificate_expired(self, current_block_height: int) -> bool:
        """
        Check if the certificate has expired based on current block height.

        Args:
            current_block_height: Current blockchain height

        Returns:
            True if certificate is expired or not configured
        """
        if not self.has_certificate or self.cert_expiry is None:
            return True

        # cert_expiry is stored in 2016-block periods
        expiry_height = self.cert_expiry * 2016
        return current_block_height >= expiry_height
Attributes
address: str instance-attribute
cert_expiry: int | None = None class-attribute instance-attribute
cert_privkey: str | None = None class-attribute instance-attribute
cert_pubkey: str | None = None class-attribute instance-attribute
cert_signature: str | None = None class-attribute instance-attribute
confirmations: int | None = None class-attribute instance-attribute
created_at: str instance-attribute
has_certificate: bool property

Check if this bond has a certificate configured (for cold wallet mode).

index: int instance-attribute
is_expired: bool property

Check if the locktime has passed.

is_funded: bool property

Check if this bond has been funded.

locktime: int instance-attribute
locktime_human: str instance-attribute
network: str instance-attribute
path: str instance-attribute
pubkey: str instance-attribute
time_until_unlock: int property

Seconds until the bond can be unlocked. Returns 0 if already expired.

txid: str | None = None class-attribute instance-attribute
value: int | None = None class-attribute instance-attribute
vout: int | None = None class-attribute instance-attribute
witness_script_hex: str instance-attribute
Functions
is_certificate_expired(current_block_height: int) -> bool

Check if the certificate has expired based on current block height.

Args: current_block_height: Current blockchain height

Returns: True if certificate is expired or not configured

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def is_certificate_expired(self, current_block_height: int) -> bool:
    """
    Check if the certificate has expired based on current block height.

    Args:
        current_block_height: Current blockchain height

    Returns:
        True if certificate is expired or not configured
    """
    if not self.has_certificate or self.cert_expiry is None:
        return True

    # cert_expiry is stored in 2016-block periods
    expiry_height = self.cert_expiry * 2016
    return current_block_height >= expiry_height

Functions

create_bond_info(address: str, locktime: int, index: int, path: str, pubkey_hex: str, witness_script: bytes, network: str) -> FidelityBondInfo

Create a FidelityBondInfo instance.

Args: address: The P2WSH address locktime: Unix timestamp locktime index: Derivation index path: Full derivation path pubkey_hex: Public key as hex witness_script: The witness script bytes network: Network name

Returns: FidelityBondInfo instance

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def create_bond_info(
    address: str,
    locktime: int,
    index: int,
    path: str,
    pubkey_hex: str,
    witness_script: bytes,
    network: str,
) -> FidelityBondInfo:
    """
    Create a FidelityBondInfo instance.

    Args:
        address: The P2WSH address
        locktime: Unix timestamp locktime
        index: Derivation index
        path: Full derivation path
        pubkey_hex: Public key as hex
        witness_script: The witness script bytes
        network: Network name

    Returns:
        FidelityBondInfo instance
    """
    locktime_dt = datetime.fromtimestamp(locktime)
    return FidelityBondInfo(
        address=address,
        locktime=locktime,
        locktime_human=locktime_dt.strftime("%Y-%m-%d %H:%M:%S"),
        index=index,
        path=path,
        pubkey=pubkey_hex,
        witness_script_hex=witness_script.hex(),
        network=network,
        created_at=datetime.now().isoformat(),
    )

get_active_locktimes(data_dir: Path) -> list[int]

Get all locktimes from the bond registry that have funded, active bonds.

This is useful for the maker bot to automatically discover which locktimes to scan for when syncing fidelity bonds, without requiring the user to manually specify --fidelity-bond-locktime.

Args: data_dir: Data directory path

Returns: List of unique locktimes (Unix timestamps) for active bonds

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def get_active_locktimes(data_dir: Path) -> list[int]:
    """
    Get all locktimes from the bond registry that have funded, active bonds.

    This is useful for the maker bot to automatically discover which locktimes
    to scan for when syncing fidelity bonds, without requiring the user to
    manually specify --fidelity-bond-locktime.

    Args:
        data_dir: Data directory path

    Returns:
        List of unique locktimes (Unix timestamps) for active bonds
    """
    registry = load_registry(data_dir)
    active_bonds = registry.get_active_bonds()
    # Get unique locktimes
    locktimes = list({bond.locktime for bond in active_bonds})
    return sorted(locktimes)

get_all_locktimes(data_dir: Path) -> list[int]

Get all locktimes from the bond registry (funded or not).

This includes all bonds in the registry to allow scanning for UTXOs that may have been funded since the last sync.

Args: data_dir: Data directory path

Returns: List of unique locktimes (Unix timestamps) for all bonds

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def get_all_locktimes(data_dir: Path) -> list[int]:
    """
    Get all locktimes from the bond registry (funded or not).

    This includes all bonds in the registry to allow scanning for UTXOs
    that may have been funded since the last sync.

    Args:
        data_dir: Data directory path

    Returns:
        List of unique locktimes (Unix timestamps) for all bonds
    """
    registry = load_registry(data_dir)
    # Get unique locktimes from ALL bonds (not just funded ones)
    locktimes = list({bond.locktime for bond in registry.bonds})
    return sorted(locktimes)

get_registry_path(data_dir: Path) -> Path

Get the path to the bond registry file.

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
166
167
168
def get_registry_path(data_dir: Path) -> Path:
    """Get the path to the bond registry file."""
    return data_dir / "fidelity_bonds.json"

load_registry(data_dir: Path) -> BondRegistry

Load the bond registry from disk.

Args: data_dir: Data directory path

Returns: BondRegistry instance (empty if file doesn't exist)

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def load_registry(data_dir: Path) -> BondRegistry:
    """
    Load the bond registry from disk.

    Args:
        data_dir: Data directory path

    Returns:
        BondRegistry instance (empty if file doesn't exist)
    """
    registry_path = get_registry_path(data_dir)
    if not registry_path.exists():
        return BondRegistry()

    try:
        data = json.loads(registry_path.read_text())
        return BondRegistry.model_validate(data)
    except Exception as e:
        logger.error(f"Failed to load bond registry: {e}")
        # Return empty registry on error, but don't overwrite the file
        return BondRegistry()

save_registry(registry: BondRegistry, data_dir: Path) -> None

Save the bond registry to disk.

Args: registry: BondRegistry instance data_dir: Data directory path

Source code in jmwallet/src/jmwallet/wallet/bond_registry.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def save_registry(registry: BondRegistry, data_dir: Path) -> None:
    """
    Save the bond registry to disk.

    Args:
        registry: BondRegistry instance
        data_dir: Data directory path
    """
    registry_path = get_registry_path(data_dir)
    registry_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        registry_path.write_text(registry.model_dump_json(indent=2))
        logger.debug(f"Saved bond registry to {registry_path}")
    except Exception as e:
        logger.error(f"Failed to save bond registry: {e}")
        raise