84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
async def run_watcher(log_level: str | None = None) -> None:
    """Run the orderbook watcher until a shutdown signal is received.

    Loads the application settings, configures logging, resolves the
    directory nodes, writes the nick state file, builds the aggregator and
    HTTP server, and then blocks until SIGTERM or SIGINT sets the shutdown
    event.

    Directory nodes are resolved in this order:

    1. the ``DIRECTORY_NODES`` environment variable,
    2. ``settings.network_config.directory_servers``,
    3. ``settings.get_directory_servers()``.

    Args:
        log_level: Log level to use. When falsy, ``settings.logging.level``
            is used instead.

    Raises:
        SystemExit: With status 1 when no directory nodes could be resolved.
        Exception: Any non-cancellation error raised while notifying,
            starting the server, or waiting is logged and re-raised.
            ``asyncio.CancelledError`` in that phase is logged and swallowed.
    """
    settings = get_settings()
    # Use CLI log level if provided, otherwise fall back to settings
    effective_log_level = log_level or settings.logging.level
    setup_logging(effective_log_level)

    network = settings.network_config.network
    watcher_settings = settings.orderbook_watcher
    data_dir = settings.get_data_dir()

    # Generate a nick for the orderbook watcher
    nick_identity = NickIdentity(JM_VERSION)
    watcher_nick = nick_identity.nick

    logger.info("=" * 80)
    logger.info("Starting JoinMarket Orderbook Watcher")
    logger.info(f"Network: {network.value}")
    logger.info(f"Nick: {watcher_nick}")
    logger.info(f"HTTP server: {watcher_settings.http_host}:{watcher_settings.http_port}")
    logger.info(f"Update interval: {watcher_settings.update_interval}s")
    logger.info(f"Mempool API: {watcher_settings.mempool_api_url}")

    # Directory nodes from env var (DIRECTORY_NODES) or config
    directory_nodes_str = os.environ.get("DIRECTORY_NODES", "")
    if not directory_nodes_str:
        # Fall back to the network config's directory servers, then to the
        # default directory servers when the config lists none.
        fallback_servers = (
            settings.network_config.directory_servers
            or settings.get_directory_servers()
        )
        directory_nodes_str = ",".join(fallback_servers)

    directory_nodes = get_directory_nodes(directory_nodes_str)
    if not directory_nodes:
        logger.error("No directory nodes configured. Set DIRECTORY_NODES environment variable.")
        logger.error("Example: DIRECTORY_NODES=node1.onion:5222,node2.onion:5222")
        sys.exit(1)

    logger.info(f"Directory nodes: {len(directory_nodes)}")
    for node in directory_nodes:
        logger.info(f" - {node[0]}:{node[1]}")
    logger.info("=" * 80)

    # Write nick state file for external tracking
    write_nick_state(data_dir, "orderbook", watcher_nick)
    logger.info(f"Nick state written to {data_dir}/state/orderbook.nick")

    # Everything after the nick state file exists runs under one try/finally
    # so the file is removed even if building the backend, aggregator, or
    # server fails before the watcher starts.
    server = None
    try:
        # Create blockchain backend for bond verification if configured
        blockchain_backend = _create_blockchain_backend(settings)

        aggregator = OrderbookAggregator(
            directory_nodes=directory_nodes,
            network=network.value,
            socks_host=settings.tor.socks_host,
            socks_port=settings.tor.socks_port,
            timeout=watcher_settings.connection_timeout,
            mempool_api_url=watcher_settings.mempool_api_url,
            max_message_size=watcher_settings.max_message_size,
            uptime_grace_period=watcher_settings.uptime_grace_period,
            stream_isolation=settings.tor.stream_isolation,
            blockchain_backend=blockchain_backend,
        )
        server = OrderbookServer(watcher_settings, aggregator)

        loop = asyncio.get_running_loop()
        shutdown_event = asyncio.Event()

        def shutdown_handler() -> None:
            logger.info("Received shutdown signal")
            shutdown_event.set()

        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, shutdown_handler)

        try:
            # Send startup notification immediately (including nick)
            notifier = get_notifier(settings, component_name="Orderbook")
            await notifier.notify_startup(
                component="Orderbook Watcher",
                network=network.value,
                nick=watcher_nick,
            )
            await server.start()
            await shutdown_event.wait()
        except asyncio.CancelledError:
            logger.info("Watcher cancelled")
        except Exception as e:
            logger.error(f"Watcher error: {e}")
            raise
    finally:
        try:
            # Clean up nick state file on shutdown
            remove_nick_state(data_dir, "orderbook")
        finally:
            # Stop the server even if removing the nick state file failed;
            # skip it only when the server was never constructed.
            if server is not None:
                await server.stop()