28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
class TakerMonitoringMixin:
    """Mixin class providing background monitoring tasks for the Taker.

    Requires the following attributes on the host class:

    - self.running: bool
    - self.backend: BlockchainBackend
    - self.wallet: WalletService
    - self.config: TakerConfig
    - self.directory_client: MultiDirectoryClient
    """

    # Type hints for attributes provided by the host class
    running: bool
    backend: BlockchainBackend
    wallet: WalletService
    config: TakerConfig
    directory_client: Any  # MultiDirectoryClient

    @staticmethod
    def _hours_since_timestamp(timestamp: str) -> float:
        """Return the number of hours elapsed since an ISO-8601 ``timestamp``.

        Works for both naive and timezone-aware timestamps: "now" is taken in
        the timestamp's own ``tzinfo`` (``None`` for naive values, which yields
        naive local time), so the subtraction never mixes naive and aware
        datetimes.

        Args:
            timestamp: String accepted by ``datetime.fromisoformat``.

        Returns:
            Elapsed time in hours (negative if the timestamp is in the future).

        Raises:
            ValueError: If ``timestamp`` is not a valid ISO-8601 string.
        """
        from datetime import datetime

        recorded = datetime.fromisoformat(timestamp)
        now = datetime.now(recorded.tzinfo)
        return (now - recorded).total_seconds() / 3600

    async def _verify_cj_output_without_mempool(self, txid: str, destination_address: str) -> bool:
        """Ask the backend whether ``txid`` pays ``destination_address`` in a block.

        The current block height is passed to ``verify_tx_output`` as
        ``start_height``; if the height lookup fails, ``None`` is passed instead
        and the failure is logged at debug level.

        The output index is not stored with the history entry, so ``vout=0`` is
        used as a guess for the CoinJoin output position. A more robust
        solution would store the vout in history.

        Args:
            txid: Transaction ID to verify.
            destination_address: Address the CoinJoin output was sent to.

        Returns:
            Whatever ``backend.verify_tx_output`` returns (used by callers as a
            truthy "confirmed" flag).
        """
        try:
            current_height = await self.backend.get_block_height()
        except Exception as e:
            # Best effort: the height is only a scanning hint.
            logger.debug(f"Could not get block height for {txid[:16]}...: {e}")
            current_height = None

        return await self.backend.verify_tx_output(
            txid=txid,
            vout=0,  # CJ outputs are typically first, but this is a guess
            address=destination_address,
            start_height=current_height,
        )

    async def _monitor_pending_transactions(self) -> None:
        """
        Background task to monitor pending transactions and update their status.

        Checks pending transactions every 60 seconds and updates their confirmation
        status in the history file. Transactions are marked as successful once they
        receive their first confirmation.

        Neutrino-specific behavior:
        - Neutrino cannot fetch arbitrary transactions by txid (get_transaction returns None)
        - Instead, we use verify_tx_output() with the destination address hint
        - This uses compact block filters to check if the output exists in confirmed blocks
        - For Neutrino, we must wait for confirmation before we can verify the transaction

        The loop runs while ``self.running`` is true. Cancellation is logged and
        ends the task normally; any other error in one pass is logged and the
        loop continues with the next pass.
        """
        logger.info("Starting pending transaction monitor...")
        check_interval = 60.0  # Check every 60 seconds
        has_mempool = self.backend.has_mempool_access()

        if not has_mempool:
            logger.info(
                "Backend has no mempool access (Neutrino). "
                "Pending transactions will be verified via block confirmation only."
            )

        while self.running:
            try:
                await asyncio.sleep(check_interval)
                # The flag may have been cleared while we were sleeping.
                if not self.running:
                    break

                pending = get_pending_transactions(data_dir=self.config.data_dir)
                if not pending:
                    continue

                logger.debug(f"Checking {len(pending)} pending transaction(s)...")
                for entry in pending:
                    if not entry.txid:
                        continue
                    try:
                        if has_mempool:
                            # Full node / Mempool API: can get transaction directly
                            await self._check_pending_with_mempool(entry)
                        else:
                            # Neutrino: must use address-based verification
                            await self._check_pending_without_mempool(entry)
                    except Exception as e:
                        # One failing entry must not prevent checking the others.
                        logger.debug(f"Error checking transaction {entry.txid[:16]}...: {e}")
            except asyncio.CancelledError:
                logger.info("Pending transaction monitor cancelled")
                break
            except Exception as e:
                logger.error(f"Error in pending transaction monitor: {e}")

        logger.info("Pending transaction monitor stopped")

    async def _check_pending_with_mempool(self, entry: TransactionHistoryEntry) -> None:
        """Check pending transaction status using get_transaction (requires mempool access).

        If the backend does not return the transaction, a warning is logged once
        the history entry is older than 24 hours. If the transaction has at
        least one confirmation, the confirmation count is written via
        ``update_transaction_confirmation``.

        Args:
            entry: History entry; ``txid`` and ``timestamp`` (ISO-8601) are read.
        """
        tx_info = await self.backend.get_transaction(entry.txid)

        if tx_info is None:
            # Transaction not found - might have been rejected/replaced
            age_hours = self._hours_since_timestamp(entry.timestamp)
            if age_hours > 24:
                logger.warning(
                    f"Transaction {entry.txid[:16]}... not found after "
                    f"{age_hours:.1f} hours, may have been rejected"
                )
            return

        confirmations = tx_info.confirmations
        if confirmations > 0:
            # Update history with confirmation
            update_transaction_confirmation(
                txid=entry.txid,
                confirmations=confirmations,
                data_dir=self.config.data_dir,
            )
            logger.info(
                f"CoinJoin {entry.txid[:16]}... confirmed! "
                f"({confirmations} confirmation{'s' if confirmations != 1 else ''})"
            )

    async def _check_pending_without_mempool(self, entry: TransactionHistoryEntry) -> None:
        """Check pending transaction status without mempool access (Neutrino).

        Uses verify_tx_output() with the destination address to check if the
        CoinJoin output has been confirmed in a block. This works because
        Neutrino compact block filters can match on addresses.

        Note: This cannot detect unconfirmed transactions, so we must wait
        for block confirmation. The transaction may be in mempool but we
        won't know until it's mined.

        Args:
            entry: History entry; ``txid``, ``destination_address`` and
                ``timestamp`` (ISO-8601) are read.
        """
        # Need destination address for Neutrino verification
        if not entry.destination_address:
            logger.debug(
                f"Transaction {entry.txid[:16]}... has no destination_address, "
                "cannot verify with Neutrino"
            )
            return

        verified = await self._verify_cj_output_without_mempool(
            entry.txid, entry.destination_address
        )

        if verified:
            # Transaction output found in a confirmed block
            # We don't know exact confirmation count with Neutrino, assume 1
            update_transaction_confirmation(
                txid=entry.txid,
                confirmations=1,  # We know it's confirmed but not exact count
                data_dir=self.config.data_dir,
            )
            logger.info(
                f"CoinJoin {entry.txid[:16]}... confirmed! (verified via Neutrino block filters)"
            )
            return

        # Not found yet - could be in mempool or not broadcast
        age_hours = self._hours_since_timestamp(entry.timestamp)

        # For Neutrino, be more patient before warning since we can't see mempool
        # Only log at WARNING level if it's been a long time, otherwise DEBUG to reduce noise
        if age_hours > 10:  # 10 hour timeout for Neutrino
            logger.warning(
                f"Transaction {entry.txid[:16]}... not confirmed after "
                f"{age_hours:.1f} hours. May still be in mempool (not visible to Neutrino) "
                "or may have been rejected/never broadcast."
            )
        elif age_hours > 1:  # Log at debug for txs older than 1 hour
            logger.debug(
                f"Transaction {entry.txid[:16]}... not confirmed after "
                f"{age_hours:.1f} hours (may be in mempool, waiting for confirmation)"
            )

    async def _update_pending_transaction_now(
        self, txid: str, destination_address: str | None = None
    ) -> None:
        """
        Immediately check and update a pending transaction's status.

        This is called right after recording a new transaction in history to check
        if it's already visible in mempool (for full nodes) or confirmed (for Neutrino).
        This is important for one-shot coinjoin CLI calls that exit immediately after
        broadcast without waiting for the background monitor.

        With mempool access, a transaction that is visible but unconfirmed is
        recorded with a confirmation count of 1 (``max(confirmations, 1)``) so
        that it is marked as successful.

        Any exception is logged at debug level and suppressed; this method is
        best-effort.

        Args:
            txid: Transaction ID to check
            destination_address: Optional destination address (needed for Neutrino)
        """
        try:
            if self.backend.has_mempool_access():
                # Full node: can check mempool directly.
                # Wait a few seconds before the first check — the tx cannot be in the
                # mempool immediately after broadcast due to network propagation delay.
                await asyncio.sleep(5)
                tx_info = await self.backend.get_transaction(txid)
                if tx_info is None:
                    return

                confirmations = tx_info.confirmations
                if confirmations >= 0:
                    # Transaction is in mempool (0 confs) or confirmed (>0 confs)
                    # Mark as success even with 0 confs (mempool visible)
                    update_transaction_confirmation(
                        txid=txid,
                        confirmations=max(confirmations, 1),
                        data_dir=self.config.data_dir,
                    )
                    if confirmations > 0:
                        logger.info(
                            f"CoinJoin {txid[:16]}... already confirmed "
                            f"({confirmations} confirmation{'s' if confirmations != 1 else ''})"
                        )
                    else:
                        logger.info(f"CoinJoin {txid[:16]}... visible in mempool")
                return

            # Neutrino: can only check confirmed blocks, not mempool
            # For Neutrino, we need to wait for block confirmation
            # This will be handled by the background monitor on next startup
            if not destination_address:
                return

            verified = await self._verify_cj_output_without_mempool(txid, destination_address)
            if verified:
                update_transaction_confirmation(
                    txid=txid,
                    confirmations=1,
                    data_dir=self.config.data_dir,
                )
                logger.info(f"CoinJoin {txid[:16]}... confirmed via Neutrino block filters")
            else:
                logger.debug(
                    f"CoinJoin {txid[:16]}... not yet confirmed "
                    "(may be in mempool, Neutrino will verify on next block)"
                )
        except Exception as e:
            logger.debug(f"Could not update transaction status immediately: {e}")

    async def _periodic_rescan(self) -> None:
        """Background task to periodically rescan wallet.

        This runs every `rescan_interval_sec` (default: 10 minutes) to:
        1. Detect confirmed transactions
        2. Update wallet balance after external transactions
        3. Update pending transaction status

        This is useful when running schedule/tumbler mode to ensure wallet
        state is fresh between CoinJoins.

        The loop runs while ``self.running`` is true. Cancellation is logged and
        ends the task normally; any other error in one pass is logged and the
        loop continues.
        """
        logger.info(
            f"Starting periodic rescan task (interval: {self.config.rescan_interval_sec}s)..."
        )

        while self.running:
            try:
                await asyncio.sleep(self.config.rescan_interval_sec)
                # The flag may have been cleared while we were sleeping.
                if not self.running:
                    break

                logger.info("Periodic wallet rescan starting...")

                # Use fast descriptor wallet sync if available.
                # Imported here, inside the try, so an import failure is logged
                # like any other rescan error instead of killing the task.
                from jmwallet.backends.descriptor_wallet import DescriptorWalletBackend

                if isinstance(self.backend, DescriptorWalletBackend):
                    await self.wallet.sync_with_descriptor_wallet()
                else:
                    await self.wallet.sync_all()

                total_balance = await self.wallet.get_total_balance()
                logger.info(f"Wallet re-synced. Total balance: {total_balance:,} sats")
            except asyncio.CancelledError:
                logger.info("Periodic rescan task cancelled")
                break
            except Exception as e:
                logger.error(f"Error in periodic rescan: {e}")

        logger.info("Periodic rescan task stopped")

    def _log_directory_connection_status(self) -> None:
        """Log one snapshot of directory connectivity.

        Compares ``directory_client.directory_servers`` against the keys of
        ``directory_client.clients``. Logs a warning naming up to five
        disconnected servers if any are missing, otherwise an info line listing
        the connected servers.
        """
        total_servers = len(self.directory_client.directory_servers)
        connected_servers = list(self.directory_client.clients.keys())
        connected_count = len(connected_servers)
        disconnected_servers = [
            server
            for server in self.directory_client.directory_servers
            if server not in connected_servers
        ]

        if disconnected_servers:
            disconnected_str = ", ".join(disconnected_servers[:5])
            if len(disconnected_servers) > 5:
                disconnected_str += f", ... and {len(disconnected_servers) - 5} more"
            logger.warning(
                f"Directory connection status: {connected_count}/{total_servers} "
                f"connected. Disconnected: [{disconnected_str}]"
            )
        else:
            logger.info(
                f"Directory connection status: {connected_count}/{total_servers} connected "
                f"[{', '.join(connected_servers)}]"
            )

    async def _periodic_directory_connection_status(self) -> None:
        """Background task to periodically log directory connection status.

        This runs every 10 minutes to provide visibility into orderbook
        connectivity. Shows:
        - Total directory servers configured
        - Currently connected servers
        - Disconnected servers (if any)

        The first report is emitted after 5 minutes. Cancellation at any point
        (including the initial delay and the wait after a failed report) is
        logged and ends the task normally, matching the other background tasks
        in this mixin.
        """
        try:
            # First log after 5 minutes (give time for initial connection)
            await asyncio.sleep(300)

            while self.running:
                try:
                    self._log_directory_connection_status()
                except Exception as e:
                    logger.error(f"Error in directory connection status task: {e}")

                # Log again in 10 minutes (also the back-off after an error)
                await asyncio.sleep(600)
        except asyncio.CancelledError:
            logger.info("Directory connection status task cancelled")

        logger.info("Directory connection status task stopped")
|