Skip to content

maker.podle

maker.podle

Proof of Discrete Log Equivalence (PoDLE) verification for makers.

This module re-exports PoDLE verification functions from jmcore for backward compatibility. All new code should import directly from jmcore.podle.

PoDLE is used to prevent sybil attacks in JoinMarket by requiring takers to prove ownership of a UTXO without revealing which UTXO until after the maker commits to participate.

Reference: https://gist.github.com/AdamISZ/9cbba5e9408d23813ca8

Attributes

__all__ = ['PoDLEError', 'deserialize_revelation', 'parse_podle_revelation', 'verify_podle'] module-attribute

Classes

PoDLEError

Bases: Exception

PoDLE generation or verification error.

Source code in jmcore/src/jmcore/podle.py
69
70
71
72
class PoDLEError(Exception):
    """Raised when a PoDLE proof cannot be generated or verified."""

Functions

deserialize_revelation(revelation_str: str) -> dict[str, Any] | None

Deserialize PoDLE revelation from wire format.

Format: P|P2|sig|e|utxo (pipe-separated hex strings)

Source code in jmcore/src/jmcore/podle.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
def deserialize_revelation(revelation_str: str) -> dict[str, Any] | None:
    """
    Deserialize PoDLE revelation from wire format.

    Format: P|P2|sig|e|utxo (pipe-separated hex strings)
    """
    try:
        parts = revelation_str.split("|")
        if len(parts) != 5:
            logger.warning(f"Invalid revelation format: expected 5 parts, got {len(parts)}")
            return None

        return {
            "P": parts[0],
            "P2": parts[1],
            "sig": parts[2],
            "e": parts[3],
            "utxo": parts[4],
        }

    except Exception as e:
        logger.error(f"Failed to deserialize PoDLE revelation: {e}")
        return None

parse_podle_revelation(revelation: dict[str, Any]) -> dict[str, Any] | None

Parse and validate PoDLE revelation structure.

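Expected format from taker: { 'P': <hex string>, 'P2': <hex string>, 'sig': <hex string>, 'e': <hex string>, 'utxo': <txid:vout or txid:vout:scriptpubkey:blockheight string> }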

Returns parsed structure with bytes, or None if invalid. Extended format includes scriptpubkey and blockheight for neutrino_compat feature.

Source code in jmcore/src/jmcore/podle.py
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
def parse_podle_revelation(revelation: dict[str, Any]) -> dict[str, Any] | None:
    """
    Parse and validate PoDLE revelation structure.

    Expected format from taker:
    {
        'P': <hex string>,
        'P2': <hex string>,
        'sig': <hex string>,
        'e': <hex string>,
        'utxo': <txid:vout or txid:vout:scriptpubkey:blockheight string>
    }

    Returns parsed structure with bytes, or None if invalid.
    Extended format includes scriptpubkey and blockheight for neutrino_compat feature.
    """
    try:
        required_fields = ["P", "P2", "sig", "e", "utxo"]
        for field in required_fields:
            if field not in revelation:
                logger.warning(f"Missing required field in PoDLE revelation: {field}")
                return None

        p_bytes = bytes.fromhex(revelation["P"])
        p2_bytes = bytes.fromhex(revelation["P2"])
        sig_bytes = bytes.fromhex(revelation["sig"])
        e_bytes = bytes.fromhex(revelation["e"])

        utxo_parts = revelation["utxo"].split(":")

        # Legacy format: txid:vout (2 parts)
        # Extended format: txid:vout:scriptpubkey:blockheight (4 parts)
        if len(utxo_parts) == 2:
            txid = utxo_parts[0]
            vout = int(utxo_parts[1])
            scriptpubkey = None
            blockheight = None
        elif len(utxo_parts) == 4:
            txid = utxo_parts[0]
            vout = int(utxo_parts[1])
            scriptpubkey = utxo_parts[2]
            blockheight = int(utxo_parts[3])
            logger.debug(f"Parsed extended UTXO format: {txid}:{vout} with metadata")
        else:
            logger.warning(f"Invalid UTXO format: {revelation['utxo']}")
            return None

        result: dict[str, Any] = {
            "P": p_bytes,
            "P2": p2_bytes,
            "sig": sig_bytes,
            "e": e_bytes,
            "txid": txid,
            "vout": vout,
        }

        # Add extended metadata if present
        if scriptpubkey is not None:
            result["scriptpubkey"] = scriptpubkey
        if blockheight is not None:
            result["blockheight"] = blockheight

        return result

    except Exception as e:
        logger.error(f"Failed to parse PoDLE revelation: {e}")
        return None

verify_podle(p: bytes, p2: bytes, sig: bytes, e: bytes, commitment: bytes, index_range: range = range(10)) -> tuple[bool, str]

Verify PoDLE proof.

Verifies that P and P2 have the same discrete log (private key) without revealing the private key itself.

Args:

- p: Public key bytes (33 bytes compressed)
- p2: Commitment public key bytes (33 bytes compressed)
- sig: Signature s value (32 bytes)
- e: Challenge e value (32 bytes)
- commitment: sha256(P2) commitment (32 bytes)
- index_range: Allowed NUMS indices to try

Returns: (is_valid, error_message)

Source code in jmcore/src/jmcore/podle.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def verify_podle(
    p: bytes,
    p2: bytes,
    sig: bytes,
    e: bytes,
    commitment: bytes,
    index_range: range = range(10),
) -> tuple[bool, str]:
    """
    Check a PoDLE proof against a previously received commitment.

    A valid proof shows that P and P2 share the same discrete log (private
    key) without disclosing that key.

    Args:
        p: Public key bytes (33 bytes compressed)
        p2: Commitment public key bytes (33 bytes compressed)
        sig: Signature s value (32 bytes)
        e: Challenge e value (32 bytes)
        commitment: sha256(P2) commitment (32 bytes)
        index_range: Allowed NUMS indices to try

    Returns:
        (is_valid, error_message) -- the message is empty on success. Errors
        are never raised to the caller; they are reported through the tuple.
    """
    try:
        # Size checks run in argument order so the first bad input is reported.
        sized_inputs = (
            ("P", p, 33),
            ("P2", p2, 33),
            ("sig", sig, 32),
            ("e", e, 32),
            ("commitment", commitment, 32),
        )
        for label, value, wanted in sized_inputs:
            if len(value) != wanted:
                return False, f"Invalid {label} length: {len(value)}, expected {wanted}"

        # The commitment must be exactly sha256 over the revealed P2.
        if commitment != hashlib.sha256(p2).digest():
            return False, "Commitment does not match H(P2)"

        pub_p = PublicKey(p)
        pub_p2 = PublicKey(p2)

        s_scalar = int.from_bytes(sig, "big")
        e_scalar = int.from_bytes(e, "big")

        if not (s_scalar < SECP256K1_N and e_scalar < SECP256K1_N):
            return False, "Signature values out of range"

        # s*G is skipped when s == 0; the sums below then reduce to the
        # (-e) terms alone.
        s_nonzero = s_scalar > 0
        s_times_g = scalar_mult_g(s_scalar) if s_nonzero else None

        # JAM-compatible scheme: s = k + e*x, so the verifier rebuilds
        # K = s*G - e*P, computed as s*G + (-e mod N)*P.
        neg_e = (-e_scalar) % SECP256K1_N

        for nums_index in index_range:
            try:
                nums_point = get_nums_point(nums_index)

                # Kg = s*G + (-e)*P
                neg_e_p = point_mult(neg_e, pub_p)
                kg = neg_e_p if s_times_g is None else point_add(s_times_g, neg_e_p)

                # Kj = s*J + (-e)*P2
                neg_e_p2 = point_mult(neg_e, pub_p2)
                if s_nonzero:
                    kj = point_add(point_mult(s_scalar, nums_point), neg_e_p2)
                else:
                    kj = neg_e_p2

                # Recompute the challenge and compare it with the supplied e.
                challenge = hashlib.sha256(
                    point_to_bytes(kg) + point_to_bytes(kj) + p + p2
                ).digest()

                if challenge == e:
                    logger.debug(f"PoDLE verification successful at index {nums_index}")
                    return True, ""

            except Exception as index_err:
                # A failure at one index must not stop the remaining indices.
                logger.debug(f"PoDLE verification failed at index {nums_index}: {index_err}")
                continue

        return False, f"PoDLE verification failed for all indices in {index_range}"

    except Exception as err:
        logger.error(f"PoDLE verification error: {err}")
        return False, f"Verification error: {err}"