Skip to content

jmcore.encryption

jmcore.encryption

End-to-end encryption wrapper using NaCl public-key authenticated encryption.

This implements the JoinMarket encryption protocol using Diffie-Hellman key exchange to set up symmetric encryption between makers and takers.

Classes

CryptoSession

Manages encryption state for a coinjoin session with a taker.

Source code in jmcore/src/jmcore/encryption.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class CryptoSession:
    """
    Manages encryption state for a coinjoin session with a taker.
    """

    def __init__(self) -> None:
        """Initialize a new crypto session with a fresh keypair."""
        self.keypair: SecretKey = init_keypair()
        self.box: Box | None = None
        self.counterparty_pubkey: str = ""

    def get_pubkey_hex(self) -> str:
        """Get our public key as hex string."""
        pk = get_pubkey(self.keypair, as_hex=True)
        assert isinstance(pk, str)
        return pk

    def setup_encryption(self, counterparty_pubkey_hex: str) -> None:
        """
        Set up encryption with a counterparty's public key.

        Args:
            counterparty_pubkey_hex: Counterparty's public key in hex.
        """
        try:
            counterparty_pk = init_pubkey(counterparty_pubkey_hex)
            self.box = create_encryption_box(self.keypair, counterparty_pk)
            self.counterparty_pubkey = counterparty_pubkey_hex
            logger.debug("Set up encryption box with counterparty")
        except NaclError as e:
            logger.error(f"Failed to set up encryption: {e}")
            raise

    def encrypt(self, message: str) -> str:
        """
        Encrypt a message for the counterparty.

        Args:
            message: Plaintext message.

        Returns:
            Base64-encoded encrypted message.
        """
        if self.box is None:
            raise NaclError("Encryption not set up - call setup_encryption first")
        return encrypt_encode(message, self.box)

    def decrypt(self, message: str) -> str:
        """
        Decrypt a message from the counterparty.

        Args:
            message: Base64-encoded encrypted message.

        Returns:
            Decrypted plaintext.
        """
        if self.box is None:
            raise NaclError("Encryption not set up - call setup_encryption first")
        decrypted = decode_decrypt(message, self.box)
        return decrypted.decode("utf-8")

    @property
    def is_encrypted(self) -> bool:
        """Check if encryption has been set up."""
        return self.box is not None
Attributes
box: Box | None = None instance-attribute
counterparty_pubkey: str = '' instance-attribute
is_encrypted: bool property

Check if encryption has been set up.

keypair: SecretKey = init_keypair() instance-attribute
Functions
__init__() -> None

Initialize a new crypto session with a fresh keypair.

Source code in jmcore/src/jmcore/encryption.py
129
130
131
132
133
def __init__(self) -> None:
    """Initialize a new crypto session with a fresh keypair."""
    self.keypair: SecretKey = init_keypair()
    self.box: Box | None = None
    self.counterparty_pubkey: str = ""
decrypt(message: str) -> str

Decrypt a message from the counterparty.

Args: message: Base64-encoded encrypted message.

Returns: Decrypted plaintext.

Source code in jmcore/src/jmcore/encryption.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def decrypt(self, message: str) -> str:
    """
    Decrypt a message from the counterparty.

    Args:
        message: Base64-encoded encrypted message.

    Returns:
        Decrypted plaintext.
    """
    if self.box is None:
        raise NaclError("Encryption not set up - call setup_encryption first")
    decrypted = decode_decrypt(message, self.box)
    return decrypted.decode("utf-8")
encrypt(message: str) -> str

Encrypt a message for the counterparty.

Args: message: Plaintext message.

Returns: Base64-encoded encrypted message.

Source code in jmcore/src/jmcore/encryption.py
157
158
159
160
161
162
163
164
165
166
167
168
169
def encrypt(self, message: str) -> str:
    """
    Encrypt a message for the counterparty.

    Args:
        message: Plaintext message.

    Returns:
        Base64-encoded encrypted message.
    """
    if self.box is None:
        raise NaclError("Encryption not set up - call setup_encryption first")
    return encrypt_encode(message, self.box)
get_pubkey_hex() -> str

Get our public key as hex string.

Source code in jmcore/src/jmcore/encryption.py
135
136
137
138
139
def get_pubkey_hex(self) -> str:
    """Get our public key as hex string."""
    pk = get_pubkey(self.keypair, as_hex=True)
    assert isinstance(pk, str)
    return pk
setup_encryption(counterparty_pubkey_hex: str) -> None

Set up encryption with a counterparty's public key.

Args: counterparty_pubkey_hex: Counterparty's public key in hex.

Source code in jmcore/src/jmcore/encryption.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def setup_encryption(self, counterparty_pubkey_hex: str) -> None:
    """
    Set up encryption with a counterparty's public key.

    Args:
        counterparty_pubkey_hex: Counterparty's public key in hex.
    """
    try:
        counterparty_pk = init_pubkey(counterparty_pubkey_hex)
        self.box = create_encryption_box(self.keypair, counterparty_pk)
        self.counterparty_pubkey = counterparty_pubkey_hex
        logger.debug("Set up encryption box with counterparty")
    except NaclError as e:
        logger.error(f"Failed to set up encryption: {e}")
        raise

NaclError

Bases: Exception

Exception for NaCl encryption errors.

Source code in jmcore/src/jmcore/encryption.py
21
22
23
24
class NaclError(Exception):
    """Exception for NaCl encryption errors."""

    pass

Functions

create_encryption_box(keypair: SecretKey, counterparty_pk: PublicKey) -> Box

Create an encryption box for communicating with a counterparty.

Args: keypair: Our NaCl keypair. counterparty_pk: Counterparty's public key.

Returns: NaCl Box object for encryption/decryption.

Source code in jmcore/src/jmcore/encryption.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def create_encryption_box(keypair: SecretKey, counterparty_pk: PublicKey) -> Box:
    """
    Create an encryption box for communicating with a counterparty.

    Args:
        keypair: Our NaCl keypair.
        counterparty_pk: Counterparty's public key.

    Returns:
        NaCl Box object for encryption/decryption.
    """
    if not isinstance(counterparty_pk, public.PublicKey):
        raise NaclError("Object is not a public key")
    if not isinstance(keypair, public.SecretKey):
        raise NaclError("Object is not a nacl keypair")
    return public.Box(keypair.sk, counterparty_pk)

decode_decrypt(message: str, box: Box) -> bytes

Decode and decrypt a message received from counterparty.

Args: message: Base64-encoded ciphertext. box: NaCl encryption box.

Returns: Decrypted plaintext as bytes.

Source code in jmcore/src/jmcore/encryption.py
109
110
111
112
113
114
115
116
117
118
119
120
121
def decode_decrypt(message: str, box: Box) -> bytes:
    """
    Decode and decrypt a message received from counterparty.

    Args:
        message: Base64-encoded ciphertext.
        box: NaCl encryption box.

    Returns:
        Decrypted plaintext as bytes.
    """
    decoded = base64.b64decode(message)
    return box.decrypt(decoded)

encrypt_encode(message: bytes | str, box: Box) -> str

Encrypt a message and encode as base64 for transmission.

Args: message: Plaintext message (bytes or string). box: NaCl encryption box.

Returns: Base64-encoded ciphertext.

Source code in jmcore/src/jmcore/encryption.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def encrypt_encode(message: bytes | str, box: Box) -> str:
    """
    Encrypt a message and encode as base64 for transmission.

    Args:
        message: Plaintext message (bytes or string).
        box: NaCl encryption box.

    Returns:
        Base64-encoded ciphertext.
    """
    if isinstance(message, str):
        message = message.encode("utf-8")
    encrypted = box.encrypt(message)
    return base64.b64encode(encrypted).decode("ascii")

get_pubkey(keypair: SecretKey, as_hex: bool = False) -> bytes | str

Get the public key from a keypair.

Args: keypair: NaCl keypair object. as_hex: Return as hex string if True, otherwise raw bytes.

Returns: Public key as hex string or bytes.

Source code in jmcore/src/jmcore/encryption.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def get_pubkey(keypair: SecretKey, as_hex: bool = False) -> bytes | str:
    """
    Get the public key from a keypair.

    Args:
        keypair: NaCl keypair object.
        as_hex: Return as hex string if True, otherwise raw bytes.

    Returns:
        Public key as hex string or bytes.
    """
    if not isinstance(keypair, public.SecretKey):
        raise NaclError("Object is not a nacl keypair")
    if as_hex:
        return keypair.hex_pk().decode("ascii")
    return keypair.pk

init_keypair() -> SecretKey

Create a new encryption keypair.

Returns: A NaCl SecretKey object containing the keypair.

Source code in jmcore/src/jmcore/encryption.py
27
28
29
30
31
32
33
34
def init_keypair() -> SecretKey:
    """
    Create a new encryption keypair.

    Returns:
        A NaCl SecretKey object containing the keypair.
    """
    return public.SecretKey()

init_pubkey(hexpk: str) -> PublicKey

Create a public key object from a hex-encoded string.

Args: hexpk: Hex-encoded 32-byte public key.

Returns: NaCl PublicKey object.

Source code in jmcore/src/jmcore/encryption.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def init_pubkey(hexpk: str) -> PublicKey:
    """
    Create a public key object from a hex-encoded string.

    Args:
        hexpk: Hex-encoded 32-byte public key.

    Returns:
        NaCl PublicKey object.
    """
    try:
        bin_pk = binascii.unhexlify(hexpk)
    except (TypeError, binascii.Error) as exc:
        raise NaclError("Invalid hex format") from exc
    if len(bin_pk) != 32:
        raise NaclError("Public key must be 32 bytes")
    return public.PublicKey(bin_pk)