Skip to content

jmcore.commitment_blacklist

jmcore.commitment_blacklist

PoDLE commitment blacklist for preventing commitment reuse.

When a PoDLE commitment is used in a CoinJoin (whether successful or failed), it should be blacklisted to prevent reuse. This module provides persistence and checking of the commitment blacklist.

The blacklist is shared across the JoinMarket network via !hp2 messages.

Attributes

COMMITMENT_HEX_LENGTH = 64 module-attribute

Classes

CommitmentBlacklist

Thread-safe commitment blacklist with file persistence.

The blacklist is stored as a simple text file with one commitment per line. This matches the reference implementation's format for compatibility.

Source code in jmcore/src/jmcore/commitment_blacklist.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class CommitmentBlacklist:
    """
    Thread-safe commitment blacklist with file persistence.

    The blacklist is stored as a simple text file with one commitment per line.
    This matches the reference implementation's format for compatibility.
    """

    def __init__(self, blacklist_path: Path | None = None, data_dir: Path | None = None):
        """
        Initialize the commitment blacklist.

        Args:
            blacklist_path: Path to the blacklist file. If None, uses data_dir.
            data_dir: Data directory for JoinMarket (defaults to get_default_data_dir()).
                     Only used if blacklist_path is None.
        """
        if blacklist_path is None:
            blacklist_path = get_commitment_blacklist_path(data_dir)
        self.blacklist_path = blacklist_path

        # In-memory cache of blacklisted commitments
        self._commitments: set[str] = set()
        self._lock = threading.Lock()

        # Load existing blacklist from disk
        self._load_from_disk()

    def _load_from_disk(self) -> None:
        """Load blacklist from disk into memory.

        Commitments are normalized to lowercase on load to ensure consistent
        case-insensitive matching. This is important because hex-encoded
        commitments are case-insensitive by nature ('ABCD' == 'abcd'), and
        files written by the reference implementation may contain mixed-case
        entries.
        """
        if not self.blacklist_path.exists():
            logger.debug(f"No existing blacklist at {self.blacklist_path}")
            return

        try:
            with open(self.blacklist_path, encoding="ascii") as f:
                for line in f:
                    commitment = line.strip().lower()
                    if commitment:
                        self._commitments.add(commitment)
            logger.info(f"Loaded {len(self._commitments)} commitments from blacklist")
        except Exception as e:
            logger.error(f"Failed to load blacklist from {self.blacklist_path}: {e}")

    def _save_to_disk(self) -> None:
        """Save in-memory blacklist to disk."""
        try:
            # Ensure parent directory exists
            self.blacklist_path.parent.mkdir(parents=True, exist_ok=True)

            with open(self.blacklist_path, "w", encoding="ascii") as f:
                for commitment in sorted(self._commitments):
                    f.write(commitment + "\n")
                f.flush()
            logger.debug(f"Saved {len(self._commitments)} commitments to blacklist")
        except Exception as e:
            logger.error(f"Failed to save blacklist to {self.blacklist_path}: {e}")

    def is_blacklisted(self, commitment: str) -> bool:
        """
        Check if a commitment is blacklisted.

        Args:
            commitment: The commitment hash (hex string, typically 64 chars)

        Returns:
            True if the commitment is blacklisted, False otherwise
        """
        # Normalize commitment (strip whitespace, lowercase)
        commitment = commitment.strip().lower()

        with self._lock:
            return commitment in self._commitments

    def add(self, commitment: str, persist: bool = True) -> bool:
        """
        Add a commitment to the blacklist.

        Args:
            commitment: The commitment hash (hex string)
            persist: If True, save to disk immediately

        Returns:
            True if the commitment was newly added, False if already present
        """
        # Normalize commitment
        commitment = commitment.strip().lower()

        if not commitment:
            logger.warning("Attempted to add empty commitment to blacklist")
            return False

        with self._lock:
            if commitment in self._commitments:
                return False

            self._commitments.add(commitment)
            logger.debug(f"Added commitment to blacklist: {commitment[:16]}...")

            if persist:
                self._save_to_disk()

            return True

    def check_and_add(self, commitment: str, persist: bool = True) -> bool:
        """
        Check if a commitment is blacklisted, and if not, add it.

        This is the primary method for handling commitments during CoinJoin.
        It atomically checks and adds in a single operation.

        Args:
            commitment: The commitment hash (hex string)
            persist: If True, save to disk immediately after adding

        Returns:
            True if the commitment is NEW (allowed), False if already blacklisted
        """
        # Normalize commitment
        commitment = commitment.strip().lower()

        if not commitment:
            logger.warning("Attempted to check empty commitment")
            return False

        with self._lock:
            if commitment in self._commitments:
                logger.info(f"Commitment already blacklisted: {commitment[:16]}...")
                return False

            self._commitments.add(commitment)
            logger.debug(f"Added commitment to blacklist: {commitment[:16]}...")

            if persist:
                self._save_to_disk()

            return True

    def __len__(self) -> int:
        """Return the number of blacklisted commitments."""
        with self._lock:
            return len(self._commitments)

    def __contains__(self, commitment: str) -> bool:
        """Check if a commitment is blacklisted using 'in' operator."""
        return self.is_blacklisted(commitment)
Attributes
blacklist_path = blacklist_path instance-attribute
Functions
__contains__(commitment: str) -> bool

Check if a commitment is blacklisted using 'in' operator.

Source code in jmcore/src/jmcore/commitment_blacklist.py
205
206
207
def __contains__(self, commitment: str) -> bool:
    """Support ``commitment in blacklist`` by delegating to is_blacklisted()."""
    found = self.is_blacklisted(commitment)
    return found
__init__(blacklist_path: Path | None = None, data_dir: Path | None = None)

Initialize the commitment blacklist.

Args: blacklist_path: Path to the blacklist file. If None, uses data_dir. data_dir: Data directory for JoinMarket (defaults to get_default_data_dir()). Only used if blacklist_path is None.

Source code in jmcore/src/jmcore/commitment_blacklist.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def __init__(self, blacklist_path: Path | None = None, data_dir: Path | None = None):
    """
    Initialize the commitment blacklist.

    Args:
        blacklist_path: Path to the blacklist file. If None, uses data_dir.
        data_dir: Data directory for JoinMarket (defaults to get_default_data_dir()).
                 Only used if blacklist_path is None.
    """
    if blacklist_path is None:
        blacklist_path = get_commitment_blacklist_path(data_dir)
    self.blacklist_path = blacklist_path

    # In-memory cache of blacklisted commitments
    self._commitments: set[str] = set()
    self._lock = threading.Lock()

    # Load existing blacklist from disk
    self._load_from_disk()
__len__() -> int

Return the number of blacklisted commitments.

Source code in jmcore/src/jmcore/commitment_blacklist.py
200
201
202
203
def __len__(self) -> int:
    """Count the commitments currently held in memory."""
    with self._lock:
        count = len(self._commitments)
    return count
add(commitment: str, persist: bool = True) -> bool

Add a commitment to the blacklist.

Args: commitment: The commitment hash (hex string) persist: If True, save to disk immediately

Returns: True if the commitment was newly added, False if already present

Source code in jmcore/src/jmcore/commitment_blacklist.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def add(self, commitment: str, persist: bool = True) -> bool:
    """
    Insert a commitment into the blacklist.

    The commitment is stripped of surrounding whitespace and lowercased
    before it is stored.

    Args:
        commitment: The commitment hash (hex string)
        persist: When True, write the blacklist to disk right after inserting

    Returns:
        True when the commitment was not present before and has been added;
        False when it was already present or is empty after normalization
    """
    commitment = commitment.strip().lower()

    if not commitment:
        logger.warning("Attempted to add empty commitment to blacklist")
        return False

    with self._lock:
        is_new = commitment not in self._commitments
        if is_new:
            self._commitments.add(commitment)
            logger.debug(f"Added commitment to blacklist: {commitment[:16]}...")
            if persist:
                self._save_to_disk()
        return is_new
check_and_add(commitment: str, persist: bool = True) -> bool

Check if a commitment is blacklisted, and if not, add it.

This is the primary method for handling commitments during CoinJoin. It atomically checks and adds in a single operation.

Args: commitment: The commitment hash (hex string) persist: If True, save to disk immediately after adding

Returns: True if the commitment is NEW (allowed), False if already blacklisted

Source code in jmcore/src/jmcore/commitment_blacklist.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def check_and_add(self, commitment: str, persist: bool = True) -> bool:
    """
    Test a commitment against the blacklist and record it if it is new.

    This is the primary method for handling commitments during CoinJoin.
    The lookup and the insertion are performed while holding the lock once,
    so the check and the add form a single step.

    Args:
        commitment: The commitment hash (hex string)
        persist: When True, write the blacklist to disk right after inserting

    Returns:
        True when the commitment is NEW (allowed); False when it was already
        blacklisted or is empty after normalization
    """
    commitment = commitment.strip().lower()

    if not commitment:
        logger.warning("Attempted to check empty commitment")
        return False

    with self._lock:
        already_seen = commitment in self._commitments
        if already_seen:
            logger.info(f"Commitment already blacklisted: {commitment[:16]}...")
        else:
            self._commitments.add(commitment)
            logger.debug(f"Added commitment to blacklist: {commitment[:16]}...")
            if persist:
                self._save_to_disk()
        return not already_seen
is_blacklisted(commitment: str) -> bool

Check if a commitment is blacklisted.

Args: commitment: The commitment hash (hex string, typically 64 chars)

Returns: True if the commitment is blacklisted, False otherwise

Source code in jmcore/src/jmcore/commitment_blacklist.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def is_blacklisted(self, commitment: str) -> bool:
    """
    Report whether a commitment is present in the blacklist.

    The lookup is case-insensitive and ignores surrounding whitespace.

    Args:
        commitment: The commitment hash (hex string, typically 64 chars)

    Returns:
        True when the commitment is blacklisted, False otherwise
    """
    needle = commitment.strip().lower()

    with self._lock:
        found = needle in self._commitments
    return found

Functions

add_commitment(commitment: str, persist: bool = True) -> bool

Add a commitment to the global blacklist.

Convenience function that uses the global blacklist.

Args: commitment: The commitment hash (hex string) persist: If True, save to disk immediately

Returns: True if the commitment was newly added, False if already present

Source code in jmcore/src/jmcore/commitment_blacklist.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def add_commitment(commitment: str, persist: bool = True) -> bool:
    """
    Insert a commitment into the global blacklist.

    Thin wrapper that forwards to ``add`` on the instance returned by
    ``get_blacklist()``.

    Args:
        commitment: The commitment hash (hex string)
        persist: When True, write the blacklist to disk right away

    Returns:
        True when the commitment was newly added, False when already present
    """
    blacklist = get_blacklist()
    return blacklist.add(commitment, persist=persist)

check_and_add_commitment(commitment: str, persist: bool = True) -> bool

Check if a commitment is allowed and add it to the blacklist.

Convenience function that uses the global blacklist. This is the primary function to use during CoinJoin processing.

Args: commitment: The commitment hash (hex string) persist: If True, save to disk immediately after adding

Returns: True if the commitment is NEW (allowed), False if already blacklisted

Source code in jmcore/src/jmcore/commitment_blacklist.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def check_and_add_commitment(commitment: str, persist: bool = True) -> bool:
    """
    Test a commitment against the global blacklist and record it if new.

    Thin wrapper that forwards to ``check_and_add`` on the instance returned
    by ``get_blacklist()``. This is the primary function to use during
    CoinJoin processing.

    Args:
        commitment: The commitment hash (hex string)
        persist: When True, write the blacklist to disk right after adding

    Returns:
        True when the commitment is NEW (allowed), False when already blacklisted
    """
    blacklist = get_blacklist()
    return blacklist.check_and_add(commitment, persist=persist)

check_commitment(commitment: str) -> bool

Check if a commitment is allowed (not blacklisted).

Convenience function that uses the global blacklist.

Args: commitment: The commitment hash (hex string)

Returns: True if the commitment is allowed, False if blacklisted

Source code in jmcore/src/jmcore/commitment_blacklist.py
256
257
258
259
260
261
262
263
264
265
266
267
268
def check_commitment(commitment: str) -> bool:
    """
    Report whether a commitment is allowed, i.e. absent from the global blacklist.

    Thin wrapper around ``is_blacklisted`` on the instance returned by
    ``get_blacklist()``; the result is the negation of that lookup and the
    blacklist is not modified.

    Args:
        commitment: The commitment hash (hex string)

    Returns:
        True when the commitment is allowed, False when it is blacklisted
    """
    blacklisted = get_blacklist().is_blacklisted(commitment)
    return not blacklisted

get_blacklist(blacklist_path: Path | None = None, data_dir: Path | None = None) -> CommitmentBlacklist

Get the global commitment blacklist instance.

Args: blacklist_path: Path to the blacklist file. Only used on first call to initialize the singleton. data_dir: Data directory for JoinMarket. Only used on first call to initialize the singleton.

Returns: The global CommitmentBlacklist instance

Source code in jmcore/src/jmcore/commitment_blacklist.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def get_blacklist(
    blacklist_path: Path | None = None, data_dir: Path | None = None
) -> CommitmentBlacklist:
    """
    Return the process-wide commitment blacklist, creating it on first use.

    Args:
        blacklist_path: Path to the blacklist file. Ignored once the global
                       instance exists.
        data_dir: Data directory for JoinMarket. Ignored once the global
                 instance exists.

    Returns:
        The global CommitmentBlacklist instance
    """
    global _global_blacklist

    with _global_blacklist_lock:
        # Construct the instance only when none has been created yet; the
        # lock keeps concurrent first calls from building two instances.
        if _global_blacklist is not None:
            return _global_blacklist
        _global_blacklist = CommitmentBlacklist(blacklist_path, data_dir)
        return _global_blacklist

set_blacklist_path(blacklist_path: Path | None = None, data_dir: Path | None = None) -> None

Set the path for the global blacklist.

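Call this before any other blacklist operation so that every operation uses the intended file. If the global blacklist has already been initialized, it is replaced by a new instance loaded from the new path; entries that were held only in memory (added with persist=False) are not carried over.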

Args: blacklist_path: Explicit path to blacklist file data_dir: Data directory (used if blacklist_path is None)

Source code in jmcore/src/jmcore/commitment_blacklist.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def set_blacklist_path(blacklist_path: Path | None = None, data_dir: Path | None = None) -> None:
    """
    Point the global blacklist at a (possibly new) file.

    A fresh CommitmentBlacklist is always constructed and installed as the
    global instance, replacing any instance that already exists. Call this
    before other blacklist operations so they all use the intended file.

    Args:
        blacklist_path: Explicit path to blacklist file
        data_dir: Data directory (used if blacklist_path is None)
    """
    global _global_blacklist

    with _global_blacklist_lock:
        replacement = CommitmentBlacklist(blacklist_path, data_dir)
        _global_blacklist = replacement
        logger.info(f"Set blacklist path to {_global_blacklist.blacklist_path}")

validate_commitment_hex(commitment: str) -> tuple[bool, str]

Validate that a commitment string is well-formed hex of the correct length.

A valid commitment (after prefix stripping) must be exactly 64 hex characters representing 32 bytes (SHA256 output).

Args: commitment: The raw commitment string (prefix already stripped).

Returns: A (valid, error_message) tuple. When valid is True the error_message is the empty string.

Source code in jmcore/src/jmcore/commitment_blacklist.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def validate_commitment_hex(commitment: str) -> tuple[bool, str]:
    """Check that a commitment string has the expected length and hex content.

    The checks run in order: non-empty, exactly ``COMMITMENT_HEX_LENGTH``
    characters, then a match against the module's hex pattern. The first
    failing check determines the error message.

    Args:
        commitment: The raw commitment string (prefix already stripped).

    Returns:
        A ``(valid, error_message)`` tuple.  When valid is True the
        error_message is the empty string.
    """
    error = ""

    if not commitment:
        error = "empty commitment"
    elif len(commitment) != COMMITMENT_HEX_LENGTH:
        error = f"invalid commitment length {len(commitment)}, expected {COMMITMENT_HEX_LENGTH}"
    elif not _HEX_RE.match(commitment):
        error = "commitment contains non-hex characters"

    # Every failure branch sets a non-empty message, so an empty message means valid.
    return (not error), error