Skip to content

taker.monitoring

taker.monitoring

Taker monitoring mixin for background tasks.

Provides monitoring capabilities for pending transactions, periodic wallet rescans, and directory connection status reporting. These are background tasks that run concurrently with CoinJoin operations.

Classes

TakerMonitoringMixin

Mixin class providing background monitoring tasks for the Taker.

Requires the following attributes on the host class: - self.running: bool - self.backend: BlockchainBackend - self.wallet: WalletService - self.config: TakerConfig - self.directory_client: MultiDirectoryClient

Source code in taker/src/taker/monitoring.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
class TakerMonitoringMixin:
    """Mixin class providing background monitoring tasks for the Taker.

    Requires the following attributes on the host class:
    - self.running: bool
    - self.backend: BlockchainBackend
    - self.wallet: WalletService
    - self.config: TakerConfig
    - self.directory_client: MultiDirectoryClient
    """

    # Type hints for attributes provided by the host class
    running: bool
    backend: BlockchainBackend
    wallet: WalletService
    config: TakerConfig
    directory_client: Any  # MultiDirectoryClient

    async def _monitor_pending_transactions(self) -> None:
        """
        Background task to monitor pending transactions and update their status.

        Checks pending transactions every 60 seconds and updates their confirmation
        status in the history file. Transactions are marked as successful once they
        receive their first confirmation.

        Neutrino-specific behavior:
        - Neutrino cannot fetch arbitrary transactions by txid (get_transaction returns None)
        - Instead, we use verify_tx_output() with the destination address hint
        - This uses compact block filters to check if the output exists in confirmed blocks
        - For Neutrino, we must wait for confirmation before we can verify the transaction
        """
        logger.info("Starting pending transaction monitor...")
        check_interval = 60.0  # Check every 60 seconds
        has_mempool = self.backend.has_mempool_access()

        if not has_mempool:
            logger.info(
                "Backend has no mempool access (Neutrino). "
                "Pending transactions will be verified via block confirmation only."
            )

        while self.running:
            try:
                await asyncio.sleep(check_interval)

                if not self.running:
                    break

                pending = get_pending_transactions(data_dir=self.config.data_dir)
                if not pending:
                    continue

                logger.debug(f"Checking {len(pending)} pending transaction(s)...")

                for entry in pending:
                    if not entry.txid:
                        continue

                    try:
                        if has_mempool:
                            # Full node / Mempool API: can get transaction directly
                            await self._check_pending_with_mempool(entry)
                        else:
                            # Neutrino: must use address-based verification
                            await self._check_pending_without_mempool(entry)

                    except Exception as e:
                        logger.debug(f"Error checking transaction {entry.txid[:16]}...: {e}")

            except asyncio.CancelledError:
                logger.info("Pending transaction monitor cancelled")
                break
            except Exception as e:
                logger.error(f"Error in pending transaction monitor: {e}")

        logger.info("Pending transaction monitor stopped")

    async def _check_pending_with_mempool(self, entry: TransactionHistoryEntry) -> None:
        """Check pending transaction status using get_transaction (requires mempool access)."""
        tx_info = await self.backend.get_transaction(entry.txid)

        if tx_info is None:
            # Transaction not found - might have been rejected/replaced
            from datetime import datetime

            timestamp = datetime.fromisoformat(entry.timestamp)
            age_hours = (datetime.now() - timestamp).total_seconds() / 3600

            if age_hours > 24:
                logger.warning(
                    f"Transaction {entry.txid[:16]}... not found after "
                    f"{age_hours:.1f} hours, may have been rejected"
                )
            return

        confirmations = tx_info.confirmations

        if confirmations > 0:
            # Update history with confirmation
            update_transaction_confirmation(
                txid=entry.txid,
                confirmations=confirmations,
                data_dir=self.config.data_dir,
            )

            logger.info(
                f"CoinJoin {entry.txid[:16]}... confirmed! "
                f"({confirmations} confirmation{'s' if confirmations != 1 else ''})"
            )

    async def _check_pending_without_mempool(self, entry: TransactionHistoryEntry) -> None:
        """Check pending transaction status without mempool access (Neutrino).

        Uses verify_tx_output() with the destination address to check if the
        CoinJoin output has been confirmed in a block. This works because
        Neutrino compact block filters can match on addresses.

        Note: This cannot detect unconfirmed transactions, so we must wait
        for block confirmation. The transaction may be in mempool but we
        won't know until it's mined.
        """
        from datetime import datetime

        # Need destination address for Neutrino verification
        if not entry.destination_address:
            logger.debug(
                f"Transaction {entry.txid[:16]}... has no destination_address, "
                "cannot verify with Neutrino"
            )
            return

        # Get current block height for efficient scanning
        try:
            current_height = await self.backend.get_block_height()
        except Exception:
            current_height = None

        # Try to verify the CJ output exists in a confirmed block
        # We use vout=0 as a guess for the CJ output position, but this may not be accurate
        # A more robust solution would store the vout in history
        # For now, we rely on the fact that if the address has a UTXO with this txid,
        # the transaction is confirmed
        verified = await self.backend.verify_tx_output(
            txid=entry.txid,
            vout=0,  # CJ outputs are typically first, but this is a guess
            address=entry.destination_address,
            start_height=current_height,
        )

        if verified:
            # Transaction output found in a confirmed block
            # We don't know exact confirmation count with Neutrino, assume 1
            update_transaction_confirmation(
                txid=entry.txid,
                confirmations=1,  # We know it's confirmed but not exact count
                data_dir=self.config.data_dir,
            )

            logger.info(
                f"CoinJoin {entry.txid[:16]}... confirmed! (verified via Neutrino block filters)"
            )
        else:
            # Not found yet - could be in mempool or not broadcast
            timestamp = datetime.fromisoformat(entry.timestamp)
            age_hours = (datetime.now() - timestamp).total_seconds() / 3600

            # For Neutrino, be more patient before warning since we can't see mempool
            # Only log at WARNING level if it's been a long time, otherwise DEBUG to reduce noise
            if age_hours > 10:  # 10 hour timeout for Neutrino
                logger.warning(
                    f"Transaction {entry.txid[:16]}... not confirmed after "
                    f"{age_hours:.1f} hours. May still be in mempool (not visible to Neutrino) "
                    "or may have been rejected/never broadcast."
                )
            elif age_hours > 1:  # Log at debug for txs older than 1 hour
                logger.debug(
                    f"Transaction {entry.txid[:16]}... not confirmed after "
                    f"{age_hours:.1f} hours (may be in mempool, waiting for confirmation)"
                )

    async def _update_pending_transaction_now(
        self, txid: str, destination_address: str | None = None
    ) -> None:
        """
        Immediately check and update a pending transaction's status.

        This is called right after recording a new transaction in history to check
        if it's already visible in mempool (for full nodes) or confirmed (for Neutrino).
        This is important for one-shot coinjoin CLI calls that exit immediately after
        broadcast without waiting for the background monitor.

        Args:
            txid: Transaction ID to check
            destination_address: Optional destination address (needed for Neutrino)
        """
        try:
            has_mempool = self.backend.has_mempool_access()

            if has_mempool:
                # Full node: can check mempool directly.
                # Wait a few seconds before the first check — the tx cannot be in the
                # mempool immediately after broadcast due to network propagation delay.
                await asyncio.sleep(5)
                tx_info = await self.backend.get_transaction(txid)
                if tx_info is not None:
                    confirmations = tx_info.confirmations
                    if confirmations >= 0:
                        # Transaction is in mempool (0 confs) or confirmed (>0 confs)
                        # Mark as success even with 0 confs (mempool visible)
                        update_transaction_confirmation(
                            txid=txid,
                            confirmations=max(confirmations, 1),
                            data_dir=self.config.data_dir,
                        )
                        if confirmations > 0:
                            logger.info(
                                f"CoinJoin {txid[:16]}... already confirmed "
                                f"({confirmations} confirmation{'s' if confirmations != 1 else ''})"
                            )
                        else:
                            logger.info(f"CoinJoin {txid[:16]}... visible in mempool")
            else:
                # Neutrino: can only check confirmed blocks, not mempool
                # For Neutrino, we need to wait for block confirmation
                # This will be handled by the background monitor on next startup
                if destination_address:
                    try:
                        current_height = await self.backend.get_block_height()
                    except Exception:
                        current_height = None

                    verified = await self.backend.verify_tx_output(
                        txid=txid,
                        vout=0,  # CJ outputs are typically first
                        address=destination_address,
                        start_height=current_height,
                    )

                    if verified:
                        update_transaction_confirmation(
                            txid=txid,
                            confirmations=1,
                            data_dir=self.config.data_dir,
                        )
                        logger.info(f"CoinJoin {txid[:16]}... confirmed via Neutrino block filters")
                    else:
                        logger.debug(
                            f"CoinJoin {txid[:16]}... not yet confirmed "
                            "(may be in mempool, Neutrino will verify on next block)"
                        )
        except Exception as e:
            logger.debug(f"Could not update transaction status immediately: {e}")

    async def _periodic_rescan(self) -> None:
        """Background task to periodically rescan wallet.

        This runs every `rescan_interval_sec` (default: 10 minutes) to:
        1. Detect confirmed transactions
        2. Update wallet balance after external transactions
        3. Update pending transaction status

        This is useful when running schedule/tumbler mode to ensure wallet
        state is fresh between CoinJoins.
        """
        logger.info(
            f"Starting periodic rescan task (interval: {self.config.rescan_interval_sec}s)..."
        )

        while self.running:
            try:
                await asyncio.sleep(self.config.rescan_interval_sec)

                if not self.running:
                    break

                logger.info("Periodic wallet rescan starting...")

                # Use fast descriptor wallet sync if available
                from jmwallet.backends.descriptor_wallet import DescriptorWalletBackend

                if isinstance(self.backend, DescriptorWalletBackend):
                    await self.wallet.sync_with_descriptor_wallet()
                else:
                    await self.wallet.sync_all()

                total_balance = await self.wallet.get_total_balance()
                logger.info(f"Wallet re-synced. Total balance: {total_balance:,} sats")

            except asyncio.CancelledError:
                logger.info("Periodic rescan task cancelled")
                break
            except Exception as e:
                logger.error(f"Error in periodic rescan: {e}")

        logger.info("Periodic rescan task stopped")

    async def _periodic_directory_connection_status(self) -> None:
        """Background task to periodically log directory connection status.

        This runs every 10 minutes to provide visibility into orderbook
        connectivity. Shows:
        - Total directory servers configured
        - Currently connected servers
        - Disconnected servers (if any)
        """
        # First log after 5 minutes (give time for initial connection)
        await asyncio.sleep(300)

        while self.running:
            try:
                total_servers = len(self.directory_client.directory_servers)
                connected_servers = list(self.directory_client.clients.keys())
                connected_count = len(connected_servers)
                disconnected_servers = [
                    server
                    for server in self.directory_client.directory_servers
                    if server not in connected_servers
                ]

                if disconnected_servers:
                    disconnected_str = ", ".join(disconnected_servers[:5])
                    if len(disconnected_servers) > 5:
                        disconnected_str += f", ... and {len(disconnected_servers) - 5} more"
                    logger.warning(
                        f"Directory connection status: {connected_count}/{total_servers} "
                        f"connected. Disconnected: [{disconnected_str}]"
                    )
                else:
                    logger.info(
                        f"Directory connection status: {connected_count}/{total_servers} connected "
                        f"[{', '.join(connected_servers)}]"
                    )

                # Log again in 10 minutes
                await asyncio.sleep(600)

            except asyncio.CancelledError:
                logger.info("Directory connection status task cancelled")
                break
            except Exception as e:
                logger.error(f"Error in directory connection status task: {e}")
                await asyncio.sleep(600)

        logger.info("Directory connection status task stopped")
Attributes
backend: BlockchainBackend instance-attribute
config: TakerConfig instance-attribute
directory_client: Any instance-attribute
running: bool instance-attribute
wallet: WalletService instance-attribute

Functions