Skip to content

jmwallet.backends.bitcoin_core

jmwallet.backends.bitcoin_core

Bitcoin Core RPC blockchain backend. Uses RPC calls but NOT wallet functionality (no BDB dependency).

Attributes

DEFAULT_RPC_TIMEOUT = 30.0 module-attribute

SCAN_BASE_DELAY = 0.5 module-attribute

SCAN_MAX_RETRIES = 30 module-attribute

SCAN_RPC_TIMEOUT = 300.0 module-attribute

SCAN_STATUS_POLL_INTERVAL = 10.0 module-attribute

SENSITIVE_LOGGING = os.environ.get('SENSITIVE_LOGGING', '').lower() in ('1', 'true', 'yes') module-attribute

Classes

BitcoinCoreBackend

Bases: BlockchainBackend

Blockchain backend using Bitcoin Core RPC. Does NOT use Bitcoin Core wallet (avoids BDB issues). Uses scantxoutset and other non-wallet RPC methods.

Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
class BitcoinCoreBackend(BlockchainBackend):
    """
    Blockchain backend using Bitcoin Core RPC.
    Does NOT use Bitcoin Core wallet (avoids BDB issues).
    Uses scantxoutset and other non-wallet RPC methods.
    """

    def __init__(
        self,
        rpc_url: str = "http://127.0.0.1:18443",
        rpc_user: str = "rpcuser",
        rpc_password: str = "rpcpassword",
        scan_timeout: float = SCAN_RPC_TIMEOUT,
    ):
        self.rpc_url = rpc_url.rstrip("/")
        self.rpc_user = rpc_user
        self.rpc_password = rpc_password
        self.scan_timeout = scan_timeout
        # Client for regular RPC calls
        self.client = httpx.AsyncClient(timeout=DEFAULT_RPC_TIMEOUT, auth=(rpc_user, rpc_password))
        # Separate client for long-running scans
        self._scan_client = httpx.AsyncClient(timeout=scan_timeout, auth=(rpc_user, rpc_password))
        self._request_id = 0

    async def _rpc_call(
        self,
        method: str,
        params: list | None = None,
        client: httpx.AsyncClient | None = None,
    ) -> Any:
        """
        Make an RPC call to Bitcoin Core.

        Args:
            method: RPC method name
            params: Method parameters
            client: Optional httpx client (uses default client if not provided)

        Returns:
            RPC result

        Raises:
            ValueError: On RPC errors
            httpx.HTTPError: On connection/timeout errors
        """
        self._request_id += 1
        payload = {
            "jsonrpc": "2.0",
            "id": self._request_id,
            "method": method,
            "params": params or [],
        }

        use_client = client or self.client

        try:
            response = await use_client.post(self.rpc_url, json=payload)
            response.raise_for_status()
            data = response.json()

            if "error" in data and data["error"]:
                error_info = data["error"]
                error_code = error_info.get("code", "unknown")
                error_msg = error_info.get("message", str(error_info))
                raise ValueError(f"RPC error {error_code}: {error_msg}")

            return data.get("result")

        except httpx.TimeoutException as e:
            logger.error(f"RPC call timed out: {method} - {e}")
            raise
        except httpx.HTTPError as e:
            logger.error(f"RPC call failed: {method} - {e}")
            raise

    async def _rpc_batch(
        self,
        requests: list[tuple[str, list]],
        client: httpx.AsyncClient | None = None,
    ) -> list[Any]:
        """Make a JSON-RPC batch call to Bitcoin Core.

        Sends multiple RPC calls in a single HTTP request. This is dramatically
        more efficient than individual calls when verifying many UTXOs -- e.g.
        100 gettxout calls become 1 HTTP round-trip instead of 100.

        Args:
            requests: List of (method, params) tuples
            client: Optional httpx client (uses default client if not provided)

        Returns:
            List of results in the same order as requests. Failed calls return None.

        Raises:
            httpx.HTTPError: On connection/timeout errors
        """
        if not requests:
            return []

        batch_payload = []
        for i, (method, params) in enumerate(requests):
            batch_payload.append(
                {
                    "jsonrpc": "2.0",
                    "id": i,
                    "method": method,
                    "params": params,
                }
            )

        use_client = client or self.client

        try:
            response = await use_client.post(self.rpc_url, json=batch_payload)
            response.raise_for_status()
            data = response.json()
        except httpx.TimeoutException as e:
            logger.error(f"RPC batch call timed out ({len(requests)} requests) - {e}")
            raise
        except httpx.HTTPError as e:
            logger.error(f"RPC batch call failed ({len(requests)} requests) - {e}")
            raise

        # Bitcoin Core returns results as a list, but order is NOT guaranteed
        # to match request order. Index by id.
        results: list[Any] = [None] * len(requests)
        for item in data:
            idx = item.get("id")
            if idx is not None and 0 <= idx < len(requests):
                if item.get("error"):
                    error_info = item["error"]
                    error_code = error_info.get("code", "unknown")
                    error_msg = error_info.get("message", str(error_info))
                    logger.debug(
                        "RPC batch item %d (%s) error %s: %s",
                        idx,
                        requests[idx][0],
                        error_code,
                        error_msg,
                    )
                    results[idx] = None
                else:
                    results[idx] = item.get("result")

        return results

    async def _scantxoutset_with_retry(
        self, descriptors: Sequence[str | dict[str, Any]]
    ) -> dict[str, Any] | None:
        """
        Execute scantxoutset with retry logic for handling concurrent scan conflicts.

        Bitcoin Core only allows one scantxoutset at a time. This method:
        1. Checks if a scan is already in progress
        2. If so, waits for it to complete (via status polling) before starting ours
        3. Starts our scan with extended timeout for mainnet

        Args:
            descriptors: List of output descriptors to scan for. Can be:
                - Simple strings: "addr(bc1q...)"
                - Dicts with range: {"desc": "wpkh([fp/84'/0'/0'/0/*)", "range": [0, 999]}

        Returns:
            Scan result dict or None if all retries failed
        """
        for attempt in range(SCAN_MAX_RETRIES):
            try:
                # First check if a scan is already running
                status = await self._rpc_call("scantxoutset", ["status"])
                if status is not None:
                    # A scan is in progress - wait for it
                    # Bitcoin Core returns progress as 0-100, not 0-1
                    progress = status.get("progress", 0) / 100.0
                    logger.debug(
                        f"Another scan in progress ({progress:.1%}), waiting... "
                        f"(attempt {attempt + 1}/{SCAN_MAX_RETRIES})"
                    )
                    if attempt < SCAN_MAX_RETRIES - 1:
                        await asyncio.sleep(SCAN_STATUS_POLL_INTERVAL)
                        continue

                # Start our scan with extended timeout
                logger.debug(f"Starting UTXO scan for {len(descriptors)} descriptor(s)...")
                if SENSITIVE_LOGGING:
                    logger.debug(f"Descriptors for scan: {descriptors}")
                result = await self._rpc_call(
                    "scantxoutset", ["start", descriptors], client=self._scan_client
                )
                if result:
                    unspent_count = len(result.get("unspents", []))
                    total_amount = result.get("total_amount", 0)
                    logger.debug(
                        f"Scan completed: found {unspent_count} UTXOs, total {total_amount:.8f} BTC"
                    )
                    if SENSITIVE_LOGGING and unspent_count > 0:
                        logger.debug(f"Scan result: {result}")
                return result

            except ValueError as e:
                error_str = str(e)
                # Check for "scan already in progress" error (code -8)
                if "code': -8" in error_str or "Scan already in progress" in error_str:
                    if attempt < SCAN_MAX_RETRIES - 1:
                        delay = SCAN_BASE_DELAY * (2**attempt) + random.uniform(0, 0.5)
                        logger.debug(
                            f"Scan in progress (RPC error), retrying in {delay:.2f}s "
                            f"(attempt {attempt + 1}/{SCAN_MAX_RETRIES})"
                        )
                        await asyncio.sleep(delay)
                        continue
                    else:
                        logger.warning(
                            f"Max retries ({SCAN_MAX_RETRIES}) exceeded waiting for scan slot"
                        )
                        return None
                else:
                    # Other RPC errors - log and re-raise
                    logger.error(f"scantxoutset RPC error: {error_str}")
                    raise

            except httpx.TimeoutException:
                # Timeout during scan - this is a real failure on mainnet
                logger.error(
                    f"scantxoutset timed out after {self.scan_timeout}s. "
                    "Try increasing scan_timeout for mainnet."
                )
                return None

            except Exception as e:
                logger.error(f"Unexpected error during scantxoutset: {type(e).__name__}: {e}")
                raise

        logger.warning(f"scantxoutset failed after {SCAN_MAX_RETRIES} attempts")
        return None

    async def get_utxos(self, addresses: list[str]) -> list[UTXO]:
        utxos: list[UTXO] = []
        if not addresses:
            return utxos

        # Get tip height once for confirmation calculation
        try:
            tip_height = await self.get_block_height()
        except Exception as e:
            logger.error(f"Failed to get block height for UTXO scan: {e}")
            return utxos

        # Process in batches to avoid huge RPC requests
        batch_size: int = 1000
        for i in range(0, len(addresses), batch_size):
            chunk = addresses[i : i + batch_size]
            descriptors = [f"addr({addr})" for addr in chunk]
            if SENSITIVE_LOGGING:
                logger.debug(f"Scanning addresses batch {i // batch_size + 1}: {chunk}")

            try:
                # Scan for all addresses in this chunk at once (with retry for conflicts)
                result = await self._scantxoutset_with_retry(descriptors)

                if not result or "unspents" not in result:
                    continue

                for utxo_data in result["unspents"]:
                    confirmations = 0
                    if utxo_data.get("height", 0) > 0:
                        confirmations = tip_height - utxo_data["height"] + 1

                    # Extract address from descriptor "addr(ADDRESS)#checksum" or "addr(ADDRESS)"
                    desc = utxo_data.get("desc", "")
                    # Remove checksum if present
                    if "#" in desc:
                        desc = desc.split("#")[0]

                    address = ""
                    if desc.startswith("addr(") and desc.endswith(")"):
                        address = desc[5:-1]
                    else:
                        # Only log warning if we really can't parse it (and it's not empty)
                        if desc:
                            logger.warning(f"Failed to parse address from descriptor: '{desc}'")

                    utxo = UTXO(
                        txid=utxo_data["txid"],
                        vout=utxo_data["vout"],
                        value=btc_to_sats(utxo_data["amount"]),
                        address=address,
                        confirmations=confirmations,
                        scriptpubkey=utxo_data.get("scriptPubKey", ""),
                        height=utxo_data.get("height"),
                    )
                    utxos.append(utxo)

                logger.debug(
                    f"Scanned {len(chunk)} addresses, found {len(result['unspents'])} UTXOs"
                )

            except Exception as e:
                logger.warning(f"Failed to scan UTXOs for batch starting {chunk[0]}: {e}")
                continue

        return utxos

    async def scan_descriptors(
        self, descriptors: Sequence[str | dict[str, Any]]
    ) -> dict[str, Any] | None:
        """
        Scan the UTXO set using output descriptors.

        This is much more efficient than scanning individual addresses,
        especially for HD wallets where you can use xpub descriptors with
        ranges to scan thousands of addresses in a single UTXO set pass.

        Example descriptors:
            - "addr(bc1q...)" - single address
            - "wpkh(xpub.../0/*)" - HD wallet external addresses (default range 0-1000)
            - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range

        Args:
            descriptors: List of output descriptors (strings or dicts with range)

        Returns:
            Raw scan result dict from Bitcoin Core, or None on failure.
            Result includes:
                - success: bool
                - txouts: number of UTXOs scanned
                - height: current block height
                - unspents: list of found UTXOs with txid, vout, scriptPubKey,
                            desc (matched descriptor), amount, height
                - total_amount: sum of all found UTXOs
        """
        if not descriptors:
            return {"success": True, "unspents": [], "total_amount": 0}

        logger.info(f"Starting descriptor scan with {len(descriptors)} descriptor(s)...")
        result = await self._scantxoutset_with_retry(descriptors)

        if result:
            unspent_count = len(result.get("unspents", []))
            total = result.get("total_amount", 0)
            logger.info(
                f"Descriptor scan complete: found {unspent_count} UTXOs, total {total:.8f} BTC"
            )
        else:
            logger.warning("Descriptor scan failed or returned no results")

        return result

    async def get_address_balance(self, address: str) -> int:
        utxos = await self.get_utxos([address])
        balance = sum(utxo.value for utxo in utxos)
        logger.debug(f"Balance for {address}: {balance} sats")
        return balance

    async def broadcast_transaction(self, tx_hex: str) -> str:
        try:
            txid = await self._rpc_call("sendrawtransaction", [tx_hex])
            logger.info(f"Broadcast transaction: {txid}")
            return txid

        except Exception as e:
            logger.error(f"Failed to broadcast transaction: {e}")
            raise ValueError(f"Broadcast failed: {e}") from e

    async def get_transaction(self, txid: str) -> Transaction | None:
        try:
            tx_data = await self._rpc_call("getrawtransaction", [txid, True])

            if not tx_data:
                return None

            confirmations = tx_data.get("confirmations", 0)
            block_height = None
            block_time = None

            if "blockhash" in tx_data:
                block_info = await self._rpc_call("getblockheader", [tx_data["blockhash"]])
                block_height = block_info.get("height")
                block_time = block_info.get("time")

            raw_hex = tx_data.get("hex", "")

            return Transaction(
                txid=txid,
                raw=raw_hex,
                confirmations=confirmations,
                block_height=block_height,
                block_time=block_time,
            )

        except Exception as e:
            logger.debug(f"Failed to fetch transaction {txid}: {e}")
            return None

    async def estimate_fee(self, target_blocks: int) -> float:
        try:
            result = await self._rpc_call("estimatesmartfee", [target_blocks])

            if "feerate" in result:
                btc_per_kb = result["feerate"]
                # Convert BTC/kB to sat/vB (keep precision for sub-sat rates)
                sat_per_vbyte = btc_to_sats(btc_per_kb) / 1000
                logger.debug(f"Estimated fee for {target_blocks} blocks: {sat_per_vbyte} sat/vB")
                return sat_per_vbyte
            else:
                logger.warning("Fee estimation unavailable, using fallback")
                return 1.0

        except Exception as e:
            logger.warning(f"Failed to estimate fee: {e}, using fallback")
            return 1.0

    async def get_mempool_min_fee(self) -> float | None:
        """Get the minimum fee rate (in sat/vB) for transaction to be accepted into mempool.

        Returns:
            Minimum fee rate in sat/vB, or None if unavailable.
        """
        try:
            result = await self._rpc_call("getmempoolinfo", [])
            if "mempoolminfee" in result:
                btc_per_kb = result["mempoolminfee"]
                # Convert BTC/kB to sat/vB
                sat_per_vbyte = btc_to_sats(btc_per_kb) / 1000
                logger.debug(f"Mempool min fee: {sat_per_vbyte} sat/vB")
                return sat_per_vbyte
            return None
        except Exception as e:
            logger.debug(f"Failed to get mempool min fee: {e}")
            return None

    async def get_block_height(self) -> int:
        try:
            info = await self._rpc_call("getblockchaininfo", [])
            height = info.get("blocks", 0)
            logger.debug(f"Current block height: {height}")
            return height

        except Exception as e:
            logger.error(f"Failed to fetch block height: {e}")
            raise

    async def get_block_time(self, block_height: int) -> int:
        try:
            block_hash = await self.get_block_hash(block_height)
            block_header = await self._rpc_call("getblockheader", [block_hash])
            timestamp = block_header.get("time", 0)
            logger.debug(f"Block {block_height} timestamp: {timestamp}")
            return timestamp

        except Exception as e:
            logger.error(f"Failed to fetch block time for height {block_height}: {e}")
            raise

    async def get_block_hash(self, block_height: int) -> str:
        try:
            block_hash = await self._rpc_call("getblockhash", [block_height])
            logger.debug(f"Block hash for height {block_height}: {block_hash}")
            return block_hash

        except Exception as e:
            logger.error(f"Failed to fetch block hash for height {block_height}: {e}")
            raise

    async def get_utxo(self, txid: str, vout: int) -> UTXO | None:
        """Get a specific UTXO from the blockchain UTXO set using gettxout.
        Returns None if the UTXO does not exist or has been spent.

        If not found in confirmed UTXO set, checks mempool for unconfirmed transactions.
        """
        try:
            # gettxout returns None if UTXO doesn't exist or is spent
            # include_mempool=True checks both confirmed and unconfirmed outputs
            result = await self._rpc_call("gettxout", [txid, vout, True])

            if result is None:
                # Not found in UTXO set - check if it's in mempool (unconfirmed)
                logger.debug(
                    f"UTXO {txid}:{vout} not found in confirmed UTXO set, checking mempool..."
                )
                try:
                    # Get raw transaction from mempool
                    tx_data = await self._rpc_call("getrawtransaction", [txid, True])

                    if tx_data and "vout" in tx_data:
                        # Check if the vout exists and hasn't been spent
                        if vout < len(tx_data["vout"]):
                            vout_data = tx_data["vout"][vout]
                            value = btc_to_sats(vout_data.get("value", 0))

                            # Extract address from scriptPubKey
                            script_pub_key = vout_data.get("scriptPubKey", {})
                            address = script_pub_key.get("address", "")
                            # For multiple addresses (e.g., multisig), join them
                            if not address and "addresses" in script_pub_key:
                                addresses = script_pub_key.get("addresses", [])
                                address = addresses[0] if addresses else ""
                            scriptpubkey = script_pub_key.get("hex", "")

                            # Unconfirmed transaction has 0 confirmations
                            logger.info(f"Found UTXO {txid}:{vout} in mempool (unconfirmed)")
                            return UTXO(
                                txid=txid,
                                vout=vout,
                                value=value,
                                address=address,
                                confirmations=0,
                                scriptpubkey=scriptpubkey,
                                height=None,
                            )
                except Exception as mempool_err:
                    logger.debug(f"UTXO {txid}:{vout} not in mempool either: {mempool_err}")

                logger.debug(f"UTXO {txid}:{vout} not found (spent or doesn't exist)")
                return None

            # Get tip height for confirmation calculation
            tip_height = await self.get_block_height()

            confirmations = result.get("confirmations", 0)
            value = btc_to_sats(result.get("value", 0))  # BTC to sats

            # Extract address from scriptPubKey
            script_pub_key = result.get("scriptPubKey", {})
            address = script_pub_key.get("address", "")
            scriptpubkey = script_pub_key.get("hex", "")

            # Calculate height from confirmations
            height = None
            if confirmations > 0:
                height = tip_height - confirmations + 1

            return UTXO(
                txid=txid,
                vout=vout,
                value=value,
                address=address,
                confirmations=confirmations,
                scriptpubkey=scriptpubkey,
                height=height,
            )

        except Exception as e:
            logger.error(f"Failed to get UTXO {txid}:{vout}: {e}")
            return None

    async def verify_bonds(
        self,
        bonds: list[BondVerificationRequest],
    ) -> list[BondVerificationResult]:
        """Verify fidelity bond UTXOs using batched JSON-RPC calls.

        Uses ``_rpc_batch()`` to verify all bonds in just 2-3 HTTP requests:
        1. Batch ``gettxout`` for all bonds (1 request)
        2. ``getblockchaininfo`` for current height (1 request, concurrent with #1)
        3. Batch ``getblockheader`` for unique block hashes from results (1 request)

        For 100 bonds this is ~3 HTTP round-trips instead of ~200 sequential ones.
        """
        if not bonds:
            return []

        # Step 1: Batch gettxout + getblockchaininfo concurrently
        gettxout_requests: list[tuple[str, list]] = [
            ("gettxout", [b.txid, b.vout, True]) for b in bonds
        ]

        gettxout_task = self._rpc_batch(gettxout_requests)
        height_task = self.get_block_height()
        gettxout_results, current_height = await asyncio.gather(gettxout_task, height_task)

        # Step 2: Collect unique block hashes that need timestamp lookups
        for result in gettxout_results:
            if result is not None and result.get("confirmations", 0) > 0:
                # gettxout returns bestblock but we need the block at confirmation height
                # confirmations = tip - conf_height + 1, so conf_height = tip - confs + 1
                confs = result["confirmations"]
                conf_height = current_height - confs + 1
                # We'll need the block hash for this height; collect heights first
                result["_conf_height"] = conf_height

        # Get block hashes for all unique confirmation heights
        unique_conf_heights: set[int] = set()
        for result in gettxout_results:
            if result is not None and "_conf_height" in result:
                unique_conf_heights.add(result["_conf_height"])

        # Batch getblockhash for unique heights
        height_to_time: dict[int, int] = {}
        if unique_conf_heights:
            sorted_heights = sorted(unique_conf_heights)
            hash_requests: list[tuple[str, list]] = [("getblockhash", [h]) for h in sorted_heights]
            hash_results = await self._rpc_batch(hash_requests)

            # Now batch getblockheader for the hashes
            header_requests: list[tuple[str, list]] = []
            height_order: list[int] = []
            for i, h in enumerate(sorted_heights):
                block_hash = hash_results[i]
                if block_hash is not None:
                    header_requests.append(("getblockheader", [block_hash]))
                    height_order.append(h)

            if header_requests:
                header_results = await self._rpc_batch(header_requests)
                for i, h in enumerate(height_order):
                    header = header_results[i]
                    if header is not None:
                        height_to_time[h] = header.get("time", 0)

        # Step 3: Build results
        results: list[BondVerificationResult] = []
        for i, bond in enumerate(bonds):
            gettxout_result = gettxout_results[i]

            if gettxout_result is None:
                results.append(
                    BondVerificationResult(
                        txid=bond.txid,
                        vout=bond.vout,
                        value=0,
                        confirmations=0,
                        block_time=0,
                        valid=False,
                        error="UTXO not found or spent",
                    )
                )
                continue

            confs = gettxout_result.get("confirmations", 0)
            if confs <= 0:
                value = btc_to_sats(gettxout_result.get("value", 0))
                results.append(
                    BondVerificationResult(
                        txid=bond.txid,
                        vout=bond.vout,
                        value=value,
                        confirmations=0,
                        block_time=0,
                        valid=False,
                        error="UTXO unconfirmed",
                    )
                )
                continue

            value = btc_to_sats(gettxout_result.get("value", 0))
            conf_height = gettxout_result.get("_conf_height", 0)
            block_time = height_to_time.get(conf_height, 0)

            results.append(
                BondVerificationResult(
                    txid=bond.txid,
                    vout=bond.vout,
                    value=value,
                    confirmations=confs,
                    block_time=block_time,
                    valid=True,
                )
            )

        logger.debug(
            "Verified %d bonds: %d valid, %d invalid",
            len(bonds),
            sum(1 for r in results if r.valid),
            sum(1 for r in results if not r.valid),
        )
        return results

    def can_provide_neutrino_metadata(self) -> bool:
        """
        Bitcoin Core can provide Neutrino-compatible metadata.

        Full node can access scriptpubkey and blockheight for all UTXOs,
        allowing Neutrino takers to use our makers.

        Returns:
            True - Bitcoin Core always provides extended UTXO metadata
        """
        return True

    async def close(self) -> None:
        """Close backend connections and reset clients so the backend can be reused."""
        await self.client.aclose()
        await self._scan_client.aclose()
        # Re-create fresh clients so this instance is usable again if the
        # wallet service is restarted (e.g. maker stop → start in jmwalletd).
        self.client = httpx.AsyncClient(
            timeout=DEFAULT_RPC_TIMEOUT, auth=(self.rpc_user, self.rpc_password)
        )
        self._scan_client = httpx.AsyncClient(
            timeout=self.scan_timeout, auth=(self.rpc_user, self.rpc_password)
        )
Attributes
client = httpx.AsyncClient(timeout=DEFAULT_RPC_TIMEOUT, auth=(rpc_user, rpc_password)) instance-attribute
rpc_password = rpc_password instance-attribute
rpc_url = rpc_url.rstrip('/') instance-attribute
rpc_user = rpc_user instance-attribute
scan_timeout = scan_timeout instance-attribute
Functions
__init__(rpc_url: str = 'http://127.0.0.1:18443', rpc_user: str = 'rpcuser', rpc_password: str = 'rpcpassword', scan_timeout: float = SCAN_RPC_TIMEOUT)
Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(
    self,
    rpc_url: str = "http://127.0.0.1:18443",
    rpc_user: str = "rpcuser",
    rpc_password: str = "rpcpassword",
    scan_timeout: float = SCAN_RPC_TIMEOUT,
):
    self.rpc_url = rpc_url.rstrip("/")
    self.rpc_user = rpc_user
    self.rpc_password = rpc_password
    self.scan_timeout = scan_timeout
    # Client for regular RPC calls
    self.client = httpx.AsyncClient(timeout=DEFAULT_RPC_TIMEOUT, auth=(rpc_user, rpc_password))
    # Separate client for long-running scans
    self._scan_client = httpx.AsyncClient(timeout=scan_timeout, auth=(rpc_user, rpc_password))
    self._request_id = 0
broadcast_transaction(tx_hex: str) -> str async
Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
398
399
400
401
402
403
404
405
406
async def broadcast_transaction(self, tx_hex: str) -> str:
    try:
        txid = await self._rpc_call("sendrawtransaction", [tx_hex])
        logger.info(f"Broadcast transaction: {txid}")
        return txid

    except Exception as e:
        logger.error(f"Failed to broadcast transaction: {e}")
        raise ValueError(f"Broadcast failed: {e}") from e
can_provide_neutrino_metadata() -> bool

Bitcoin Core can provide Neutrino-compatible metadata.

Full node can access scriptpubkey and blockheight for all UTXOs, allowing Neutrino takers to use our makers.

Returns: True - Bitcoin Core always provides extended UTXO metadata

Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
712
713
714
715
716
717
718
719
720
721
722
def can_provide_neutrino_metadata(self) -> bool:
    """
    Bitcoin Core can provide Neutrino-compatible metadata.

    Full node can access scriptpubkey and blockheight for all UTXOs,
    allowing Neutrino takers to use our makers.

    Returns:
        True - Bitcoin Core always provides extended UTXO metadata
    """
    return True
close() -> None async

Close backend connections and reset clients so the backend can be reused.

Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
724
725
726
727
728
729
730
731
732
733
734
735
async def close(self) -> None:
    """Close backend connections and reset clients so the backend can be reused."""
    await self.client.aclose()
    await self._scan_client.aclose()
    # Re-create fresh clients so this instance is usable again if the
    # wallet service is restarted (e.g. maker stop → start in jmwalletd).
    self.client = httpx.AsyncClient(
        timeout=DEFAULT_RPC_TIMEOUT, auth=(self.rpc_user, self.rpc_password)
    )
    self._scan_client = httpx.AsyncClient(
        timeout=self.scan_timeout, auth=(self.rpc_user, self.rpc_password)
    )
estimate_fee(target_blocks: int) -> float async
Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
async def estimate_fee(self, target_blocks: int) -> float:
    try:
        result = await self._rpc_call("estimatesmartfee", [target_blocks])

        if "feerate" in result:
            btc_per_kb = result["feerate"]
            # Convert BTC/kB to sat/vB (keep precision for sub-sat rates)
            sat_per_vbyte = btc_to_sats(btc_per_kb) / 1000
            logger.debug(f"Estimated fee for {target_blocks} blocks: {sat_per_vbyte} sat/vB")
            return sat_per_vbyte
        else:
            logger.warning("Fee estimation unavailable, using fallback")
            return 1.0

    except Exception as e:
        logger.warning(f"Failed to estimate fee: {e}, using fallback")
        return 1.0
get_address_balance(address: str) -> int async
Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
392
393
394
395
396
async def get_address_balance(self, address: str) -> int:
    utxos = await self.get_utxos([address])
    balance = sum(utxo.value for utxo in utxos)
    logger.debug(f"Balance for {address}: {balance} sats")
    return balance
get_block_hash(block_height: int) -> str async
Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
498
499
500
501
502
503
504
505
506
async def get_block_hash(self, block_height: int) -> str:
    try:
        block_hash = await self._rpc_call("getblockhash", [block_height])
        logger.debug(f"Block hash for height {block_height}: {block_hash}")
        return block_hash

    except Exception as e:
        logger.error(f"Failed to fetch block hash for height {block_height}: {e}")
        raise
get_block_height() -> int async
Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
475
476
477
478
479
480
481
482
483
484
async def get_block_height(self) -> int:
    try:
        info = await self._rpc_call("getblockchaininfo", [])
        height = info.get("blocks", 0)
        logger.debug(f"Current block height: {height}")
        return height

    except Exception as e:
        logger.error(f"Failed to fetch block height: {e}")
        raise
get_block_time(block_height: int) -> int async
Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
486
487
488
489
490
491
492
493
494
495
496
async def get_block_time(self, block_height: int) -> int:
    try:
        block_hash = await self.get_block_hash(block_height)
        block_header = await self._rpc_call("getblockheader", [block_hash])
        timestamp = block_header.get("time", 0)
        logger.debug(f"Block {block_height} timestamp: {timestamp}")
        return timestamp

    except Exception as e:
        logger.error(f"Failed to fetch block time for height {block_height}: {e}")
        raise
get_mempool_min_fee() -> float | None async

Get the minimum fee rate (in sat/vB) for transaction to be accepted into mempool.

Returns: Minimum fee rate in sat/vB, or None if unavailable.

Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
async def get_mempool_min_fee(self) -> float | None:
    """Get the minimum fee rate (in sat/vB) for transaction to be accepted into mempool.

    Returns:
        Minimum fee rate in sat/vB, or None if unavailable.
    """
    try:
        result = await self._rpc_call("getmempoolinfo", [])
        if "mempoolminfee" in result:
            btc_per_kb = result["mempoolminfee"]
            # Convert BTC/kB to sat/vB
            sat_per_vbyte = btc_to_sats(btc_per_kb) / 1000
            logger.debug(f"Mempool min fee: {sat_per_vbyte} sat/vB")
            return sat_per_vbyte
        return None
    except Exception as e:
        logger.debug(f"Failed to get mempool min fee: {e}")
        return None
get_transaction(txid: str) -> Transaction | None async
Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
async def get_transaction(self, txid: str) -> Transaction | None:
    try:
        tx_data = await self._rpc_call("getrawtransaction", [txid, True])

        if not tx_data:
            return None

        confirmations = tx_data.get("confirmations", 0)
        block_height = None
        block_time = None

        if "blockhash" in tx_data:
            block_info = await self._rpc_call("getblockheader", [tx_data["blockhash"]])
            block_height = block_info.get("height")
            block_time = block_info.get("time")

        raw_hex = tx_data.get("hex", "")

        return Transaction(
            txid=txid,
            raw=raw_hex,
            confirmations=confirmations,
            block_height=block_height,
            block_time=block_time,
        )

    except Exception as e:
        logger.debug(f"Failed to fetch transaction {txid}: {e}")
        return None
get_utxo(txid: str, vout: int) -> UTXO | None async

Get a specific UTXO from the blockchain UTXO set using gettxout. Returns None if the UTXO does not exist or has been spent.

If not found in confirmed UTXO set, checks mempool for unconfirmed transactions.

Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
async def get_utxo(self, txid: str, vout: int) -> UTXO | None:
    """Get a specific UTXO from the blockchain UTXO set using gettxout.
    Returns None if the UTXO does not exist or has been spent.

    If not found in confirmed UTXO set, checks mempool for unconfirmed transactions.
    """
    try:
        # gettxout returns None if UTXO doesn't exist or is spent
        # include_mempool=True checks both confirmed and unconfirmed outputs
        result = await self._rpc_call("gettxout", [txid, vout, True])

        if result is None:
            # Not found in UTXO set - check if it's in mempool (unconfirmed)
            logger.debug(
                f"UTXO {txid}:{vout} not found in confirmed UTXO set, checking mempool..."
            )
            try:
                # Get raw transaction from mempool
                tx_data = await self._rpc_call("getrawtransaction", [txid, True])

                if tx_data and "vout" in tx_data:
                    # Check if the vout exists and hasn't been spent
                    if vout < len(tx_data["vout"]):
                        vout_data = tx_data["vout"][vout]
                        value = btc_to_sats(vout_data.get("value", 0))

                        # Extract address from scriptPubKey
                        script_pub_key = vout_data.get("scriptPubKey", {})
                        address = script_pub_key.get("address", "")
                        # For multiple addresses (e.g., multisig), join them
                        if not address and "addresses" in script_pub_key:
                            addresses = script_pub_key.get("addresses", [])
                            address = addresses[0] if addresses else ""
                        scriptpubkey = script_pub_key.get("hex", "")

                        # Unconfirmed transaction has 0 confirmations
                        logger.info(f"Found UTXO {txid}:{vout} in mempool (unconfirmed)")
                        return UTXO(
                            txid=txid,
                            vout=vout,
                            value=value,
                            address=address,
                            confirmations=0,
                            scriptpubkey=scriptpubkey,
                            height=None,
                        )
            except Exception as mempool_err:
                logger.debug(f"UTXO {txid}:{vout} not in mempool either: {mempool_err}")

            logger.debug(f"UTXO {txid}:{vout} not found (spent or doesn't exist)")
            return None

        # Get tip height for confirmation calculation
        tip_height = await self.get_block_height()

        confirmations = result.get("confirmations", 0)
        value = btc_to_sats(result.get("value", 0))  # BTC to sats

        # Extract address from scriptPubKey
        script_pub_key = result.get("scriptPubKey", {})
        address = script_pub_key.get("address", "")
        scriptpubkey = script_pub_key.get("hex", "")

        # Calculate height from confirmations
        height = None
        if confirmations > 0:
            height = tip_height - confirmations + 1

        return UTXO(
            txid=txid,
            vout=vout,
            value=value,
            address=address,
            confirmations=confirmations,
            scriptpubkey=scriptpubkey,
            height=height,
        )

    except Exception as e:
        logger.error(f"Failed to get UTXO {txid}:{vout}: {e}")
        return None
get_utxos(addresses: list[str]) -> list[UTXO] async
Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
async def get_utxos(self, addresses: list[str]) -> list[UTXO]:
    utxos: list[UTXO] = []
    if not addresses:
        return utxos

    # Get tip height once for confirmation calculation
    try:
        tip_height = await self.get_block_height()
    except Exception as e:
        logger.error(f"Failed to get block height for UTXO scan: {e}")
        return utxos

    # Process in batches to avoid huge RPC requests
    batch_size: int = 1000
    for i in range(0, len(addresses), batch_size):
        chunk = addresses[i : i + batch_size]
        descriptors = [f"addr({addr})" for addr in chunk]
        if SENSITIVE_LOGGING:
            logger.debug(f"Scanning addresses batch {i // batch_size + 1}: {chunk}")

        try:
            # Scan for all addresses in this chunk at once (with retry for conflicts)
            result = await self._scantxoutset_with_retry(descriptors)

            if not result or "unspents" not in result:
                continue

            for utxo_data in result["unspents"]:
                confirmations = 0
                if utxo_data.get("height", 0) > 0:
                    confirmations = tip_height - utxo_data["height"] + 1

                # Extract address from descriptor "addr(ADDRESS)#checksum" or "addr(ADDRESS)"
                desc = utxo_data.get("desc", "")
                # Remove checksum if present
                if "#" in desc:
                    desc = desc.split("#")[0]

                address = ""
                if desc.startswith("addr(") and desc.endswith(")"):
                    address = desc[5:-1]
                else:
                    # Only log warning if we really can't parse it (and it's not empty)
                    if desc:
                        logger.warning(f"Failed to parse address from descriptor: '{desc}'")

                utxo = UTXO(
                    txid=utxo_data["txid"],
                    vout=utxo_data["vout"],
                    value=btc_to_sats(utxo_data["amount"]),
                    address=address,
                    confirmations=confirmations,
                    scriptpubkey=utxo_data.get("scriptPubKey", ""),
                    height=utxo_data.get("height"),
                )
                utxos.append(utxo)

            logger.debug(
                f"Scanned {len(chunk)} addresses, found {len(result['unspents'])} UTXOs"
            )

        except Exception as e:
            logger.warning(f"Failed to scan UTXOs for batch starting {chunk[0]}: {e}")
            continue

    return utxos
scan_descriptors(descriptors: Sequence[str | dict[str, Any]]) -> dict[str, Any] | None async

Scan the UTXO set using output descriptors.

This is much more efficient than scanning individual addresses, especially for HD wallets where you can use xpub descriptors with ranges to scan thousands of addresses in a single UTXO set pass.

Example descriptors: - "addr(bc1q...)" - single address - "wpkh(xpub.../0/)" - HD wallet external addresses (default range 0-1000) - {"desc": "wpkh(xpub.../0/)", "range": [0, 999]} - explicit range

Args: descriptors: List of output descriptors (strings or dicts with range)

Returns: Raw scan result dict from Bitcoin Core, or None on failure. Result includes: - success: bool - txouts: number of UTXOs scanned - height: current block height - unspents: list of found UTXOs with txid, vout, scriptPubKey, desc (matched descriptor), amount, height - total_amount: sum of all found UTXOs

Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
async def scan_descriptors(
    self, descriptors: Sequence[str | dict[str, Any]]
) -> dict[str, Any] | None:
    """
    Scan the UTXO set using output descriptors.

    This is much more efficient than scanning individual addresses,
    especially for HD wallets where you can use xpub descriptors with
    ranges to scan thousands of addresses in a single UTXO set pass.

    Example descriptors:
        - "addr(bc1q...)" - single address
        - "wpkh(xpub.../0/*)" - HD wallet external addresses (default range 0-1000)
        - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range

    Args:
        descriptors: List of output descriptors (strings or dicts with range)

    Returns:
        Raw scan result dict from Bitcoin Core, or None on failure.
        Result includes:
            - success: bool
            - txouts: number of UTXOs scanned
            - height: current block height
            - unspents: list of found UTXOs with txid, vout, scriptPubKey,
                        desc (matched descriptor), amount, height
            - total_amount: sum of all found UTXOs
    """
    if not descriptors:
        return {"success": True, "unspents": [], "total_amount": 0}

    logger.info(f"Starting descriptor scan with {len(descriptors)} descriptor(s)...")
    result = await self._scantxoutset_with_retry(descriptors)

    if result:
        unspent_count = len(result.get("unspents", []))
        total = result.get("total_amount", 0)
        logger.info(
            f"Descriptor scan complete: found {unspent_count} UTXOs, total {total:.8f} BTC"
        )
    else:
        logger.warning("Descriptor scan failed or returned no results")

    return result
verify_bonds(bonds: list[BondVerificationRequest]) -> list[BondVerificationResult] async

Verify fidelity bond UTXOs using batched JSON-RPC calls.

Uses _rpc_batch() to verify all bonds in just 2-3 HTTP requests: 1. Batch gettxout for all bonds (1 request) 2. getblockchaininfo for current height (1 request, concurrent with #1) 3. Batch getblockheader for unique block hashes from results (1 request)

For 100 bonds this is ~3 HTTP round-trips instead of ~200 sequential ones.

Source code in jmwallet/src/jmwallet/backends/bitcoin_core.py
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
async def verify_bonds(
    self,
    bonds: list[BondVerificationRequest],
) -> list[BondVerificationResult]:
    """Verify fidelity bond UTXOs using batched JSON-RPC calls.

    Uses ``_rpc_batch()`` to verify all bonds in just 2-3 HTTP requests:
    1. Batch ``gettxout`` for all bonds (1 request)
    2. ``getblockchaininfo`` for current height (1 request, concurrent with #1)
    3. Batch ``getblockheader`` for unique block hashes from results (1 request)

    For 100 bonds this is ~3 HTTP round-trips instead of ~200 sequential ones.
    """
    if not bonds:
        return []

    # Step 1: Batch gettxout + getblockchaininfo concurrently
    gettxout_requests: list[tuple[str, list]] = [
        ("gettxout", [b.txid, b.vout, True]) for b in bonds
    ]

    gettxout_task = self._rpc_batch(gettxout_requests)
    height_task = self.get_block_height()
    gettxout_results, current_height = await asyncio.gather(gettxout_task, height_task)

    # Step 2: Collect unique block hashes that need timestamp lookups
    for result in gettxout_results:
        if result is not None and result.get("confirmations", 0) > 0:
            # gettxout returns bestblock but we need the block at confirmation height
            # confirmations = tip - conf_height + 1, so conf_height = tip - confs + 1
            confs = result["confirmations"]
            conf_height = current_height - confs + 1
            # We'll need the block hash for this height; collect heights first
            result["_conf_height"] = conf_height

    # Get block hashes for all unique confirmation heights
    unique_conf_heights: set[int] = set()
    for result in gettxout_results:
        if result is not None and "_conf_height" in result:
            unique_conf_heights.add(result["_conf_height"])

    # Batch getblockhash for unique heights
    height_to_time: dict[int, int] = {}
    if unique_conf_heights:
        sorted_heights = sorted(unique_conf_heights)
        hash_requests: list[tuple[str, list]] = [("getblockhash", [h]) for h in sorted_heights]
        hash_results = await self._rpc_batch(hash_requests)

        # Now batch getblockheader for the hashes
        header_requests: list[tuple[str, list]] = []
        height_order: list[int] = []
        for i, h in enumerate(sorted_heights):
            block_hash = hash_results[i]
            if block_hash is not None:
                header_requests.append(("getblockheader", [block_hash]))
                height_order.append(h)

        if header_requests:
            header_results = await self._rpc_batch(header_requests)
            for i, h in enumerate(height_order):
                header = header_results[i]
                if header is not None:
                    height_to_time[h] = header.get("time", 0)

    # Step 3: Build results
    results: list[BondVerificationResult] = []
    for i, bond in enumerate(bonds):
        gettxout_result = gettxout_results[i]

        if gettxout_result is None:
            results.append(
                BondVerificationResult(
                    txid=bond.txid,
                    vout=bond.vout,
                    value=0,
                    confirmations=0,
                    block_time=0,
                    valid=False,
                    error="UTXO not found or spent",
                )
            )
            continue

        confs = gettxout_result.get("confirmations", 0)
        if confs <= 0:
            value = btc_to_sats(gettxout_result.get("value", 0))
            results.append(
                BondVerificationResult(
                    txid=bond.txid,
                    vout=bond.vout,
                    value=value,
                    confirmations=0,
                    block_time=0,
                    valid=False,
                    error="UTXO unconfirmed",
                )
            )
            continue

        value = btc_to_sats(gettxout_result.get("value", 0))
        conf_height = gettxout_result.get("_conf_height", 0)
        block_time = height_to_time.get(conf_height, 0)

        results.append(
            BondVerificationResult(
                txid=bond.txid,
                vout=bond.vout,
                value=value,
                confirmations=confs,
                block_time=block_time,
                valid=True,
            )
        )

    logger.debug(
        "Verified %d bonds: %d valid, %d invalid",
        len(bonds),
        sum(1 for r in results if r.valid),
        sum(1 for r in results if not r.valid),
    )
    return results

Functions