First commit

This commit is contained in:
marcel
2024-02-14 16:29:31 +01:00
parent 54da6fbfac
commit 5b646d27ae
221 changed files with 186799 additions and 2 deletions

View File

@@ -0,0 +1,68 @@
# MIT License
#
# Copyright (c) 2022 Mark Qvist / unsigned.io
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import RNS.Cryptography.Provider as cp
import RNS.vendor.platformutils as pu
if cp.PROVIDER == cp.PROVIDER_INTERNAL:
from .aes import AES
elif cp.PROVIDER == cp.PROVIDER_PYCA:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
if pu.cryptography_old_api():
from cryptography.hazmat.backends import default_backend
class AES_128_CBC:
@staticmethod
def encrypt(plaintext, key, iv):
if cp.PROVIDER == cp.PROVIDER_INTERNAL:
cipher = AES(key)
return cipher.encrypt(plaintext, iv)
elif cp.PROVIDER == cp.PROVIDER_PYCA:
if not pu.cryptography_old_api():
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
else:
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
ciphertext = encryptor.update(plaintext) + encryptor.finalize()
return ciphertext
@staticmethod
def decrypt(ciphertext, key, iv):
if cp.PROVIDER == cp.PROVIDER_INTERNAL:
cipher = AES(key)
return cipher.decrypt(ciphertext, iv)
elif cp.PROVIDER == cp.PROVIDER_PYCA:
if not pu.cryptography_old_api():
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
else:
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
return plaintext

View File

@@ -0,0 +1,41 @@
import os
from .pure25519 import ed25519_oop as ed25519
class Ed25519PrivateKey:
def __init__(self, seed):
self.seed = seed
self.sk = ed25519.SigningKey(self.seed)
#self.vk = self.sk.get_verifying_key()
@classmethod
def generate(cls):
return cls.from_private_bytes(os.urandom(32))
@classmethod
def from_private_bytes(cls, data):
return cls(seed=data)
def private_bytes(self):
return self.seed
def public_key(self):
return Ed25519PublicKey.from_public_bytes(self.sk.vk_s)
def sign(self, message):
return self.sk.sign(message)
class Ed25519PublicKey:
def __init__(self, seed):
self.seed = seed
self.vk = ed25519.VerifyingKey(self.seed)
@classmethod
def from_public_bytes(cls, data):
return cls(data)
def public_bytes(self):
return self.vk.to_bytes()
def verify(self, signature, message):
self.vk.verify(signature, message)

View File

@@ -0,0 +1,110 @@
# MIT License
#
# Copyright (c) 2022 Mark Qvist / unsigned.io
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import time
from RNS.Cryptography import HMAC
from RNS.Cryptography import PKCS7
from RNS.Cryptography.AES import AES_128_CBC
class Fernet():
"""
This class provides a slightly modified implementation of the Fernet spec
found at: https://github.com/fernet/spec/blob/master/Spec.md
According to the spec, a Fernet token includes a one byte VERSION and
eight byte TIMESTAMP field at the start of each token. These fields are
not relevant to Reticulum. They are therefore stripped from this
implementation, since they incur overhead and leak initiator metadata.
"""
FERNET_OVERHEAD = 48 # Bytes
@staticmethod
def generate_key():
return os.urandom(32)
def __init__(self, key = None):
if key == None:
raise ValueError("Token key cannot be None")
if len(key) != 32:
raise ValueError("Token key must be 32 bytes, not "+str(len(key)))
self._signing_key = key[:16]
self._encryption_key = key[16:]
def verify_hmac(self, token):
if len(token) <= 32:
raise ValueError("Cannot verify HMAC on token of only "+str(len(token))+" bytes")
else:
received_hmac = token[-32:]
expected_hmac = HMAC.new(self._signing_key, token[:-32]).digest()
if received_hmac == expected_hmac:
return True
else:
return False
def encrypt(self, data = None):
iv = os.urandom(16)
current_time = int(time.time())
if not isinstance(data, bytes):
raise TypeError("Token plaintext input must be bytes")
ciphertext = AES_128_CBC.encrypt(
plaintext = PKCS7.pad(data),
key = self._encryption_key,
iv = iv,
)
signed_parts = iv+ciphertext
return signed_parts + HMAC.new(self._signing_key, signed_parts).digest()
def decrypt(self, token = None):
if not isinstance(token, bytes):
raise TypeError("Token must be bytes")
if not self.verify_hmac(token):
raise ValueError("Token HMAC was invalid")
iv = token[:16]
ciphertext = token[16:-32]
try:
plaintext = PKCS7.unpad(
AES_128_CBC.decrypt(
ciphertext,
self._encryption_key,
iv,
)
)
return plaintext
except Exception as e:
raise ValueError("Could not decrypt token")

View File

@@ -0,0 +1,54 @@
# MIT License
#
# Copyright (c) 2022 Mark Qvist / unsigned.io
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import hashlib
from math import ceil
from RNS.Cryptography import HMAC
def hkdf(length=None, derive_from=None, salt=None, context=None):
hash_len = 32
def hmac_sha256(key, data):
return HMAC.new(key, data).digest()
if length == None or length < 1:
raise ValueError("Invalid output key length")
if derive_from == None or derive_from == "":
raise ValueError("Cannot derive key from empty input material")
if salt == None or len(salt) == 0:
salt = bytes([0] * hash_len)
if context == None:
context = b""
pseudorandom_key = hmac_sha256(salt, derive_from)
block = b""
derived = b""
for i in range(ceil(length / hash_len)):
block = hmac_sha256(pseudorandom_key, block + context + bytes([i + 1]))
derived += block
return derived[:length]

View File

@@ -0,0 +1,183 @@
# This HMAC implementation comes directly from the HMAC implementation
# included in Python 3.10.4, and is almost completely identical. It has
# been modified to be a pure Python implementation, that is not dependent
# on the system having OpenSSL binaries installed.
import warnings as _warnings
import hashlib as _hashlib
trans_5C = bytes((x ^ 0x5C) for x in range(256))
trans_36 = bytes((x ^ 0x36) for x in range(256))
# The size of the digests returned by HMAC depends on the underlying
# hashing module used. Use digest_size from the instance of HMAC instead.
digest_size = None
class HMAC:
"""RFC 2104 HMAC class. Also complies with RFC 4231.
This supports the API for Cryptographic Hash Functions (PEP 247).
"""
blocksize = 64 # 512-bit HMAC; can be changed in subclasses.
__slots__ = (
"_hmac", "_inner", "_outer", "block_size", "digest_size"
)
def __init__(self, key, msg=None, digestmod=_hashlib.sha256):
"""Create a new HMAC object.
key: bytes or buffer, key for the keyed hash object.
msg: bytes or buffer, Initial input for the hash or None.
digestmod: A hash name suitable for hashlib.new(). *OR*
A hashlib constructor returning a new hash object. *OR*
A module supporting PEP 247.
Required as of 3.8, despite its position after the optional
msg argument. Passing it as a keyword argument is
recommended, though not required for legacy API reasons.
"""
if not isinstance(key, (bytes, bytearray)):
raise TypeError("key: expected bytes or bytearray, but got %r" % type(key).__name__)
if not digestmod:
raise TypeError("Missing required parameter 'digestmod'.")
self._hmac_init(key, msg, digestmod)
def _hmac_init(self, key, msg, digestmod):
if callable(digestmod):
digest_cons = digestmod
elif isinstance(digestmod, str):
digest_cons = lambda d=b'': _hashlib.new(digestmod, d)
else:
digest_cons = lambda d=b'': digestmod.new(d)
self._hmac = None
self._outer = digest_cons()
self._inner = digest_cons()
self.digest_size = self._inner.digest_size
if hasattr(self._inner, 'block_size'):
blocksize = self._inner.block_size
if blocksize < 16:
_warnings.warn('block_size of %d seems too small; using our '
'default of %d.' % (blocksize, self.blocksize),
RuntimeWarning, 2)
blocksize = self.blocksize
else:
_warnings.warn('No block_size attribute on given digest object; '
'Assuming %d.' % (self.blocksize),
RuntimeWarning, 2)
blocksize = self.blocksize
if len(key) > blocksize:
key = digest_cons(key).digest()
# self.blocksize is the default blocksize. self.block_size is
# effective block size as well as the public API attribute.
self.block_size = blocksize
key = key.ljust(blocksize, b'\0')
self._outer.update(key.translate(trans_5C))
self._inner.update(key.translate(trans_36))
if msg is not None:
self.update(msg)
@property
def name(self):
if self._hmac:
return self._hmac.name
else:
return f"hmac-{self._inner.name}"
def update(self, msg):
"""Feed data from msg into this hashing object."""
inst = self._hmac or self._inner
inst.update(msg)
def copy(self):
"""Return a separate copy of this hashing object.
An update to this copy won't affect the original object.
"""
# Call __new__ directly to avoid the expensive __init__.
other = self.__class__.__new__(self.__class__)
other.digest_size = self.digest_size
if self._hmac:
other._hmac = self._hmac.copy()
other._inner = other._outer = None
else:
other._hmac = None
other._inner = self._inner.copy()
other._outer = self._outer.copy()
return other
def _current(self):
"""Return a hash object for the current state.
To be used only internally with digest() and hexdigest().
"""
if self._hmac:
return self._hmac
else:
h = self._outer.copy()
h.update(self._inner.digest())
return h
def digest(self):
"""Return the hash value of this hashing object.
This returns the hmac value as bytes. The object is
not altered in any way by this function; you can continue
updating the object after calling this function.
"""
h = self._current()
return h.digest()
def hexdigest(self):
"""Like digest(), but returns a string of hexadecimal digits instead.
"""
h = self._current()
return h.hexdigest()
def new(key, msg=None, digestmod=_hashlib.sha256):
"""Create a new hashing object and return it.
key: bytes or buffer, The starting key for the hash.
msg: bytes or buffer, Initial input for the hash, or None.
digestmod: A hash name suitable for hashlib.new(). *OR*
A hashlib constructor returning a new hash object. *OR*
A module supporting PEP 247.
Required as of 3.8, despite its position after the optional
msg argument. Passing it as a keyword argument is
recommended, though not required for legacy API reasons.
You can now feed arbitrary bytes into the object using its update()
method, and can ask for the hash value at any time by calling its digest()
or hexdigest() methods.
"""
return HMAC(key, msg, digestmod)
def digest(key, msg, digest):
"""Fast inline implementation of HMAC.
key: bytes or buffer, The key for the keyed hash object.
msg: bytes or buffer, Input message.
digest: A hash name suitable for hashlib.new() for best performance. *OR*
A hashlib constructor returning a new hash object. *OR*
A module supporting PEP 247.
"""
if callable(digest):
digest_cons = digest
elif isinstance(digest, str):
digest_cons = lambda d=b'': _hashlib.new(digest, d)
else:
digest_cons = lambda d=b'': digest.new(d)
inner = digest_cons()
outer = digest_cons()
blocksize = getattr(inner, 'block_size', 64)
if len(key) > blocksize:
key = digest_cons(key).digest()
key = key + b'\x00' * (blocksize - len(key))
inner.update(key.translate(trans_36))
outer.update(key.translate(trans_5C))
inner.update(msg)
outer.update(inner.digest())
return outer.digest()

View File

@@ -0,0 +1,34 @@
import importlib
if importlib.util.find_spec('hashlib') != None:
import hashlib
else:
hashlib = None
if hasattr(hashlib, "sha512"):
from hashlib import sha512 as ext_sha512
else:
from .SHA512 import sha512 as ext_sha512
if hasattr(hashlib, "sha256"):
from hashlib import sha256 as ext_sha256
else:
from .SHA256 import sha256 as ext_sha256
"""
The SHA primitives are abstracted here to allow platform-
aware hardware acceleration in the future. Currently only
uses Python's internal SHA-256 implementation. All SHA-256
calls in RNS end up here.
"""
def sha256(data):
digest = ext_sha256()
digest.update(data)
return digest.digest()
def sha512(data):
digest = ext_sha512()
digest.update(data)
return digest.digest()

View File

@@ -0,0 +1,40 @@
# MIT License
#
# Copyright (c) 2022 Mark Qvist / unsigned.io
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
class PKCS7:
BLOCKSIZE = 16
@staticmethod
def pad(data, bs=BLOCKSIZE):
l = len(data)
n = bs-l%bs
v = bytes([n])
return data+v*n
@staticmethod
def unpad(data, bs=BLOCKSIZE):
l = len(data)
n = data[-1]
if n > bs:
raise ValueError("Cannot unpad, invalid padding length of "+str(n)+" bytes")
else:
return data[:l-n]

View File

@@ -0,0 +1,38 @@
import importlib
PROVIDER_NONE = 0x00
PROVIDER_INTERNAL = 0x01
PROVIDER_PYCA = 0x02
PROVIDER = PROVIDER_NONE
pyca_v = None
use_pyca = False
try:
if importlib.util.find_spec('cryptography') != None:
import cryptography
pyca_v = cryptography.__version__
v = pyca_v.split(".")
if int(v[0]) == 2:
if int(v[1]) >= 8:
use_pyca = True
elif int(v[0]) >= 3:
use_pyca = True
except Exception as e:
pass
if use_pyca:
PROVIDER = PROVIDER_PYCA
else:
PROVIDER = PROVIDER_INTERNAL
def backend():
if PROVIDER == PROVIDER_NONE:
return "none"
elif PROVIDER == PROVIDER_INTERNAL:
return "internal"
elif PROVIDER == PROVIDER_PYCA:
return "openssl, PyCA "+str(pyca_v)

View File

@@ -0,0 +1,90 @@
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
# These proxy classes exist to create a uniform API accross
# cryptography primitive providers.
class X25519PrivateKeyProxy:
def __init__(self, real):
self.real = real
@classmethod
def generate(cls):
return cls(X25519PrivateKey.generate())
@classmethod
def from_private_bytes(cls, data):
return cls(X25519PrivateKey.from_private_bytes(data))
def private_bytes(self):
return self.real.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption(),
)
def public_key(self):
return X25519PublicKeyProxy(self.real.public_key())
def exchange(self, peer_public_key):
return self.real.exchange(peer_public_key.real)
class X25519PublicKeyProxy:
def __init__(self, real):
self.real = real
@classmethod
def from_public_bytes(cls, data):
return cls(X25519PublicKey.from_public_bytes(data))
def public_bytes(self):
return self.real.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
class Ed25519PrivateKeyProxy:
def __init__(self, real):
self.real = real
@classmethod
def generate(cls):
return cls(Ed25519PrivateKey.generate())
@classmethod
def from_private_bytes(cls, data):
return cls(Ed25519PrivateKey.from_private_bytes(data))
def private_bytes(self):
return self.real.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption()
)
def public_key(self):
return Ed25519PublicKeyProxy(self.real.public_key())
def sign(self, message):
return self.real.sign(message)
class Ed25519PublicKeyProxy:
def __init__(self, real):
self.real = real
@classmethod
def from_public_bytes(cls, data):
return cls(Ed25519PublicKey.from_public_bytes(data))
def public_bytes(self):
return self.real.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
def verify(self, signature, message):
self.real.verify(signature, message)

View File

@@ -0,0 +1,129 @@
# MIT License
#
# Copyright (c) 2017 Thomas Dixon
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import copy
import struct
import sys
def new(m=None):
return sha256(m)
class sha256(object):
_k = (0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5,
0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3,
0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc,
0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7,
0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13,
0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3,
0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5,
0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208,
0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2)
_h = (0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a,
0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19)
_output_size = 8
blocksize = 1
block_size = 64
digest_size = 32
def __init__(self, m=None):
self._buffer = b""
self._counter = 0
if m is not None:
if type(m) is not bytes:
raise TypeError('%s() argument 1 must be bytes, not %s' % (self.__class__.__name__, type(m).__name__))
self.update(m)
def _rotr(self, x, y):
return ((x >> y) | (x << (32-y))) & 0xFFFFFFFF
def _sha256_process(self, c):
w = [0]*64
w[0:16] = struct.unpack('!16L', c)
for i in range(16, 64):
s0 = self._rotr(w[i-15], 7) ^ self._rotr(w[i-15], 18) ^ (w[i-15] >> 3)
s1 = self._rotr(w[i-2], 17) ^ self._rotr(w[i-2], 19) ^ (w[i-2] >> 10)
w[i] = (w[i-16] + s0 + w[i-7] + s1) & 0xFFFFFFFF
a,b,c,d,e,f,g,h = self._h
for i in range(64):
s0 = self._rotr(a, 2) ^ self._rotr(a, 13) ^ self._rotr(a, 22)
maj = (a & b) ^ (a & c) ^ (b & c)
t2 = s0 + maj
s1 = self._rotr(e, 6) ^ self._rotr(e, 11) ^ self._rotr(e, 25)
ch = (e & f) ^ ((~e) & g)
t1 = h + s1 + ch + self._k[i] + w[i]
h = g
g = f
f = e
e = (d + t1) & 0xFFFFFFFF
d = c
c = b
b = a
a = (t1 + t2) & 0xFFFFFFFF
self._h = [(x+y) & 0xFFFFFFFF for x,y in zip(self._h, [a,b,c,d,e,f,g,h])]
def update(self, m):
if not m:
return
if type(m) is not bytes:
raise TypeError('%s() argument 1 must be bytes, not %s' % (sys._getframe().f_code.co_name, type(m).__name__))
self._buffer += m
self._counter += len(m)
while len(self._buffer) >= 64:
self._sha256_process(self._buffer[:64])
self._buffer = self._buffer[64:]
def digest(self):
mdi = self._counter & 0x3F
length = struct.pack('!Q', self._counter<<3)
if mdi < 56:
padlen = 55-mdi
else:
padlen = 119-mdi
r = self.copy()
r.update(b'\x80'+(b'\x00'*padlen)+length)
return b''.join([struct.pack('!L', i) for i in r._h[:self._output_size]])
def hexdigest(self):
return self.digest().encode('hex')
def copy(self):
return copy.deepcopy(self)

View File

@@ -0,0 +1,129 @@
# MIT License
#
# Copyright (c) 2017 Thomas Dixon
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import copy, struct, sys
def new(m=None):
return sha512(m)
class sha512(object):
_k = (0x428a2f98d728ae22, 0x7137449123ef65cd, 0xb5c0fbcfec4d3b2f, 0xe9b5dba58189dbbc,
0x3956c25bf348b538, 0x59f111f1b605d019, 0x923f82a4af194f9b, 0xab1c5ed5da6d8118,
0xd807aa98a3030242, 0x12835b0145706fbe, 0x243185be4ee4b28c, 0x550c7dc3d5ffb4e2,
0x72be5d74f27b896f, 0x80deb1fe3b1696b1, 0x9bdc06a725c71235, 0xc19bf174cf692694,
0xe49b69c19ef14ad2, 0xefbe4786384f25e3, 0x0fc19dc68b8cd5b5, 0x240ca1cc77ac9c65,
0x2de92c6f592b0275, 0x4a7484aa6ea6e483, 0x5cb0a9dcbd41fbd4, 0x76f988da831153b5,
0x983e5152ee66dfab, 0xa831c66d2db43210, 0xb00327c898fb213f, 0xbf597fc7beef0ee4,
0xc6e00bf33da88fc2, 0xd5a79147930aa725, 0x06ca6351e003826f, 0x142929670a0e6e70,
0x27b70a8546d22ffc, 0x2e1b21385c26c926, 0x4d2c6dfc5ac42aed, 0x53380d139d95b3df,
0x650a73548baf63de, 0x766a0abb3c77b2a8, 0x81c2c92e47edaee6, 0x92722c851482353b,
0xa2bfe8a14cf10364, 0xa81a664bbc423001, 0xc24b8b70d0f89791, 0xc76c51a30654be30,
0xd192e819d6ef5218, 0xd69906245565a910, 0xf40e35855771202a, 0x106aa07032bbd1b8,
0x19a4c116b8d2d0c8, 0x1e376c085141ab53, 0x2748774cdf8eeb99, 0x34b0bcb5e19b48a8,
0x391c0cb3c5c95a63, 0x4ed8aa4ae3418acb, 0x5b9cca4f7763e373, 0x682e6ff3d6b2b8a3,
0x748f82ee5defb2fc, 0x78a5636f43172f60, 0x84c87814a1f0ab72, 0x8cc702081a6439ec,
0x90befffa23631e28, 0xa4506cebde82bde9, 0xbef9a3f7b2c67915, 0xc67178f2e372532b,
0xca273eceea26619c, 0xd186b8c721c0c207, 0xeada7dd6cde0eb1e, 0xf57d4f7fee6ed178,
0x06f067aa72176fba, 0x0a637dc5a2c898a6, 0x113f9804bef90dae, 0x1b710b35131c471b,
0x28db77f523047d84, 0x32caab7b40c72493, 0x3c9ebe0a15c9bebc, 0x431d67c49c100d4c,
0x4cc5d4becb3e42b6, 0x597f299cfc657e2a, 0x5fcb6fab3ad6faec, 0x6c44198c4a475817)
_h = (0x6a09e667f3bcc908, 0xbb67ae8584caa73b, 0x3c6ef372fe94f82b, 0xa54ff53a5f1d36f1,
0x510e527fade682d1, 0x9b05688c2b3e6c1f, 0x1f83d9abfb41bd6b, 0x5be0cd19137e2179)
_output_size = 8
blocksize = 1
block_size = 128
digest_size = 64
def __init__(self, m=None):
self._buffer = b''
self._counter = 0
if m is not None:
if type(m) is not bytes:
raise TypeError('%s() argument 1 must be bytes, not %s' % (self.__class__.__name__, type(m).__name__))
self.update(m)
def _rotr(self, x, y):
return ((x >> y) | (x << (64-y))) & 0xFFFFFFFFFFFFFFFF
def _sha512_process(self, chunk):
w = [0]*80
w[0:16] = struct.unpack('!16Q', chunk)
for i in range(16, 80):
s0 = self._rotr(w[i-15], 1) ^ self._rotr(w[i-15], 8) ^ (w[i-15] >> 7)
s1 = self._rotr(w[i-2], 19) ^ self._rotr(w[i-2], 61) ^ (w[i-2] >> 6)
w[i] = (w[i-16] + s0 + w[i-7] + s1) & 0xFFFFFFFFFFFFFFFF
a,b,c,d,e,f,g,h = self._h
for i in range(80):
s0 = self._rotr(a, 28) ^ self._rotr(a, 34) ^ self._rotr(a, 39)
maj = (a & b) ^ (a & c) ^ (b & c)
t2 = s0 + maj
s1 = self._rotr(e, 14) ^ self._rotr(e, 18) ^ self._rotr(e, 41)
ch = (e & f) ^ ((~e) & g)
t1 = h + s1 + ch + self._k[i] + w[i]
h = g
g = f
f = e
e = (d + t1) & 0xFFFFFFFFFFFFFFFF
d = c
c = b
b = a
a = (t1 + t2) & 0xFFFFFFFFFFFFFFFF
self._h = [(x+y) & 0xFFFFFFFFFFFFFFFF for x,y in zip(self._h, [a,b,c,d,e,f,g,h])]
def update(self, m):
if not m:
return
if type(m) is not bytes:
raise TypeError('%s() argument 1 must be bytes, not %s' % (sys._getframe().f_code.co_name, type(m).__name__))
self._buffer += m
self._counter += len(m)
while len(self._buffer) >= 128:
self._sha512_process(self._buffer[:128])
self._buffer = self._buffer[128:]
def digest(self):
mdi = self._counter & 0x7F
length = struct.pack('!Q', self._counter<<3)
if mdi < 112:
padlen = 111-mdi
else:
padlen = 239-mdi
r = self.copy()
r.update(b'\x80'+(b'\x00'*(padlen+8))+length)
return b''.join([struct.pack('!Q', i) for i in r._h[:self._output_size]])
def hexdigest(self):
return self.digest().encode('hex')
def copy(self):
return copy.deepcopy(self)

View File

@@ -0,0 +1,171 @@
# By Nicko van Someren, 2021. This code is released into the public domain.
# Small modifications for use in Reticulum, and constant time key exchange
# added by Mark Qvist in 2022.
# WARNING! Only the X25519PrivateKey.exchange() method attempts to hide execution time.
# In the context of Reticulum, this is sufficient, but it may not be in other systems. If
# this code is to be used to provide cryptographic security in an environment where the
# start and end times of the execution can be guessed, inferred or measured then it is
# critical that steps are taken to hide the execution time, for instance by adding a
# delay so that encrypted packets are not sent until a fixed time after the _start_ of
# execution.
import os
import time
P = 2 ** 255 - 19
_A = 486662
def _point_add(point_n, point_m, point_diff):
"""Given the projection of two points and their difference, return their sum"""
(xn, zn) = point_n
(xm, zm) = point_m
(x_diff, z_diff) = point_diff
x = (z_diff << 2) * (xm * xn - zm * zn) ** 2
z = (x_diff << 2) * (xm * zn - zm * xn) ** 2
return x % P, z % P
def _point_double(point_n):
"""Double a point provided in projective coordinates"""
(xn, zn) = point_n
xn2 = xn ** 2
zn2 = zn ** 2
x = (xn2 - zn2) ** 2
xzn = xn * zn
z = 4 * xzn * (xn2 + _A * xzn + zn2)
return x % P, z % P
def _const_time_swap(a, b, swap):
"""Swap two values in constant time"""
index = int(swap) * 2
temp = (a, b, b, a)
return temp[index:index+2]
def _raw_curve25519(base, n):
"""Raise the point base to the power n"""
zero = (1, 0)
one = (base, 1)
mP, m1P = zero, one
for i in reversed(range(256)):
bit = bool(n & (1 << i))
mP, m1P = _const_time_swap(mP, m1P, bit)
mP, m1P = _point_double(mP), _point_add(mP, m1P, one)
mP, m1P = _const_time_swap(mP, m1P, bit)
x, z = mP
inv_z = pow(z, P - 2, P)
return (x * inv_z) % P
def _unpack_number(s):
"""Unpack 32 bytes to a 256 bit value"""
if len(s) != 32:
raise ValueError('Curve25519 values must be 32 bytes')
return int.from_bytes(s, "little")
def _pack_number(n):
"""Pack a value into 32 bytes"""
return n.to_bytes(32, "little")
def _fix_secret(n):
"""Mask a value to be an acceptable exponent"""
n &= ~7
n &= ~(128 << 8 * 31)
n |= 64 << 8 * 31
return n
def curve25519(base_point_raw, secret_raw):
"""Raise the base point to a given power"""
base_point = _unpack_number(base_point_raw)
secret = _fix_secret(_unpack_number(secret_raw))
return _pack_number(_raw_curve25519(base_point, secret))
def curve25519_base(secret_raw):
"""Raise the generator point to a given power"""
secret = _fix_secret(_unpack_number(secret_raw))
return _pack_number(_raw_curve25519(9, secret))
class X25519PublicKey:
def __init__(self, x):
self.x = x
@classmethod
def from_public_bytes(cls, data):
return cls(_unpack_number(data))
def public_bytes(self):
return _pack_number(self.x)
class X25519PrivateKey:
MIN_EXEC_TIME = 0.002
MAX_EXEC_TIME = 0.5
DELAY_WINDOW = 10
T_CLEAR = None
T_MAX = 0
def __init__(self, a):
self.a = a
@classmethod
def generate(cls):
return cls.from_private_bytes(os.urandom(32))
@classmethod
def from_private_bytes(cls, data):
return cls(_fix_secret(_unpack_number(data)))
def private_bytes(self):
return _pack_number(self.a)
def public_key(self):
return X25519PublicKey.from_public_bytes(_pack_number(_raw_curve25519(9, self.a)))
def exchange(self, peer_public_key):
if isinstance(peer_public_key, bytes):
peer_public_key = X25519PublicKey.from_public_bytes(peer_public_key)
start = time.time()
shared = _pack_number(_raw_curve25519(peer_public_key.x, self.a))
end = time.time()
duration = end-start
if X25519PrivateKey.T_CLEAR == None:
X25519PrivateKey.T_CLEAR = end + X25519PrivateKey.DELAY_WINDOW
if end > X25519PrivateKey.T_CLEAR:
X25519PrivateKey.T_CLEAR = end + X25519PrivateKey.DELAY_WINDOW
X25519PrivateKey.T_MAX = 0
if duration < X25519PrivateKey.T_MAX or duration < X25519PrivateKey.MIN_EXEC_TIME:
target = start+X25519PrivateKey.T_MAX
if target > start+X25519PrivateKey.MAX_EXEC_TIME:
target = start+X25519PrivateKey.MAX_EXEC_TIME
if target < start+X25519PrivateKey.MIN_EXEC_TIME:
target = start+X25519PrivateKey.MIN_EXEC_TIME
try:
time.sleep(target-time.time())
except Exception as e:
pass
elif duration > X25519PrivateKey.T_MAX:
X25519PrivateKey.T_MAX = duration
return shared

View File

@@ -0,0 +1,24 @@
import os
import glob
from .Hashes import sha256
from .Hashes import sha512
from .HKDF import hkdf
from .PKCS7 import PKCS7
from .Fernet import Fernet
from .Provider import backend
import RNS.Cryptography.Provider as cp
if cp.PROVIDER == cp.PROVIDER_INTERNAL:
from RNS.Cryptography.X25519 import X25519PrivateKey, X25519PublicKey
from RNS.Cryptography.Ed25519 import Ed25519PrivateKey, Ed25519PublicKey
elif cp.PROVIDER == cp.PROVIDER_PYCA:
from RNS.Cryptography.Proxies import X25519PrivateKeyProxy as X25519PrivateKey
from RNS.Cryptography.Proxies import X25519PublicKeyProxy as X25519PublicKey
from RNS.Cryptography.Proxies import Ed25519PrivateKeyProxy as Ed25519PrivateKey
from RNS.Cryptography.Proxies import Ed25519PublicKeyProxy as Ed25519PublicKey
modules = glob.glob(os.path.dirname(__file__)+"/*.py")
__all__ = [ os.path.basename(f)[:-3] for f in modules if not f.endswith('__init__.py')]

View File

@@ -0,0 +1 @@
from .aes import AES

View File

@@ -0,0 +1,271 @@
# MIT License
# Copyright (c) 2021 Or Gur Arie
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from .utils import *
class AES:
# AES-128 block size
block_size = 16
# AES-128 encrypts messages with 10 rounds
_rounds = 10
# initiate the AES objecy
def __init__(self, key):
"""
Initializes the object with a given key.
"""
# make sure key length is right
assert len(key) == AES.block_size
# ExpandKey
self._round_keys = self._expand_key(key)
# will perform the AES ExpandKey phase
def _expand_key(self, master_key):
"""
Expands and returns a list of key matrices for the given master_key.
"""
# Initialize round keys with raw key material.
key_columns = bytes2matrix(master_key)
iteration_size = len(master_key) // 4
# Each iteration has exactly as many columns as the key material.
i = 1
while len(key_columns) < (self._rounds + 1) * 4:
# Copy previous word.
word = list(key_columns[-1])
# Perform schedule_core once every "row".
if len(key_columns) % iteration_size == 0:
# Circular shift.
word.append(word.pop(0))
# Map to S-BOX.
word = [s_box[b] for b in word]
# XOR with first byte of R-CON, since the others bytes of R-CON are 0.
word[0] ^= r_con[i]
i += 1
elif len(master_key) == 32 and len(key_columns) % iteration_size == 4:
# Run word through S-box in the fourth iteration when using a
# 256-bit key.
word = [s_box[b] for b in word]
# XOR with equivalent word from previous iteration.
word = bytes(i^j for i, j in zip(word, key_columns[-iteration_size]))
key_columns.append(word)
# Group key words in 4x4 byte matrices.
return [key_columns[4*i : 4*(i+1)] for i in range(len(key_columns) // 4)]
# encrypt a single block of data with AES
def _encrypt_block(self, plaintext):
"""
Encrypts a single block of 16 byte long plaintext.
"""
# length of a single block
assert len(plaintext) == AES.block_size
# perform on a matrix
state = bytes2matrix(plaintext)
# AddRoundKey
add_round_key(state, self._round_keys[0])
# 9 main rounds
for i in range(1, self._rounds):
# SubBytes
sub_bytes(state)
# ShiftRows
shift_rows(state)
# MixCols
mix_columns(state)
# AddRoundKey
add_round_key(state, self._round_keys[i])
# last round, w/t AddRoundKey step
sub_bytes(state)
shift_rows(state)
add_round_key(state, self._round_keys[-1])
# return the encrypted matrix as bytes
return matrix2bytes(state)
# decrypt a single block of data with AES
def _decrypt_block(self, ciphertext):
"""
Decrypts a single block of 16 byte long ciphertext.
"""
# length of a single block
assert len(ciphertext) == AES.block_size
# perform on a matrix
state = bytes2matrix(ciphertext)
# in reverse order, last round is first
add_round_key(state, self._round_keys[-1])
inv_shift_rows(state)
inv_sub_bytes(state)
for i in range(self._rounds - 1, 0, -1):
# nain rounds
add_round_key(state, self._round_keys[i])
inv_mix_columns(state)
inv_shift_rows(state)
inv_sub_bytes(state)
# initial AddRoundKey phase
add_round_key(state, self._round_keys[0])
# return bytes
return matrix2bytes(state)
# will encrypt the entire data
def encrypt(self, plaintext, iv):
"""
Encrypts `plaintext` using CBC mode and PKCS#7 padding, with the given
initialization vector (iv).
"""
# iv length must be same as block size
assert len(iv) == AES.block_size
assert len(plaintext) % AES.block_size == 0
ciphertext_blocks = []
previous = iv
for plaintext_block in split_blocks(plaintext):
# in CBC mode every block is XOR'd with the previous block
xorred = xor_bytes(plaintext_block, previous)
# encrypt current block
block = self._encrypt_block(xorred)
previous = block
# append to ciphertext
ciphertext_blocks.append(block)
# return as bytes
return b''.join(ciphertext_blocks)
# will decrypt the entire data
def decrypt(self, ciphertext, iv):
"""
Decrypts `ciphertext` using CBC mode and PKCS#7 padding, with the given
initialization vector (iv).
"""
# iv length must be same as block size
assert len(iv) == AES.block_size
plaintext_blocks = []
previous = iv
for ciphertext_block in split_blocks(ciphertext):
# in CBC mode every block is XOR'd with the previous block
xorred = xor_bytes(previous, self._decrypt_block(ciphertext_block))
# append plaintext
plaintext_blocks.append(xorred)
previous = ciphertext_block
return b''.join(plaintext_blocks)
def test():
# modules and classes requiered for test only
import os
class bcolors:
OK = '\033[92m' #GREEN
WARNING = '\033[93m' #YELLOW
FAIL = '\033[91m' #RED
RESET = '\033[0m' #RESET COLOR
# will test AES class by performing an encryption / decryption
print("AES Tests")
print("=========")
# generate a secret key and print details
key = os.urandom(AES.block_size)
_aes = AES(key)
print(f"Algorithm: AES-CBC-{AES.block_size*8}")
print(f"Secret Key: {key.hex()}")
print()
# test single block encryption / decryption
iv = os.urandom(AES.block_size)
single_block_text = b"SingleBlock Text"
print("Single Block Tests")
print("------------------")
print(f"iv: {iv.hex()}")
print(f"plain text: '{single_block_text.decode()}'")
ciphertext_block = _aes._encrypt_block(single_block_text)
plaintext_block = _aes._decrypt_block(ciphertext_block)
print(f"Ciphertext Hex: {ciphertext_block.hex()}")
print(f"Plaintext: {plaintext_block.decode()}")
assert plaintext_block == single_block_text
print(bcolors.OK + "Single Block Test Passed Successfully" + bcolors.RESET)
print()
# test a less than a block length phrase
iv = os.urandom(AES.block_size)
short_text = b"Just Text"
print("Short Text Tests")
print("----------------")
print(f"iv: {iv.hex()}")
print(f"plain text: '{short_text.decode()}'")
ciphertext_short = _aes.encrypt(short_text, iv)
plaintext_short = _aes.decrypt(ciphertext_short, iv)
print(f"Ciphertext Hex: {ciphertext_short.hex()}")
print(f"Plaintext: {plaintext_short.decode()}")
assert short_text == plaintext_short
print(bcolors.OK + "Short Text Test Passed Successfully" + bcolors.RESET)
print()
# test an arbitrary length phrase
iv = os.urandom(AES.block_size)
text = b"This Text is longer than one block"
print("Arbitrary Length Tests")
print("----------------------")
print(f"iv: {iv.hex()}")
print(f"plain text: '{text.decode()}'")
ciphertext = _aes.encrypt(text, iv)
plaintext = _aes.decrypt(ciphertext, iv)
print(f"Ciphertext Hex: {ciphertext.hex()}")
print(f"Plaintext: {plaintext.decode()}")
assert text == plaintext
print(bcolors.OK + "Arbitrary Length Text Test Passed Successfully" + bcolors.RESET)
print()
if __name__ == "__main__":
# test AES class
test()

View File

@@ -0,0 +1,159 @@
# MIT License
# Copyright (c) 2021 Or Gur Arie
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
'''
Utils class for AES encryption / decryption
'''
## AES lookup tables
# resource: https://en.wikipedia.org/wiki/Rijndael_S-box
s_box = (
0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5, 0x30, 0x01, 0x67, 0x2B, 0xFE, 0xD7, 0xAB, 0x76,
0xCA, 0x82, 0xC9, 0x7D, 0xFA, 0x59, 0x47, 0xF0, 0xAD, 0xD4, 0xA2, 0xAF, 0x9C, 0xA4, 0x72, 0xC0,
0xB7, 0xFD, 0x93, 0x26, 0x36, 0x3F, 0xF7, 0xCC, 0x34, 0xA5, 0xE5, 0xF1, 0x71, 0xD8, 0x31, 0x15,
0x04, 0xC7, 0x23, 0xC3, 0x18, 0x96, 0x05, 0x9A, 0x07, 0x12, 0x80, 0xE2, 0xEB, 0x27, 0xB2, 0x75,
0x09, 0x83, 0x2C, 0x1A, 0x1B, 0x6E, 0x5A, 0xA0, 0x52, 0x3B, 0xD6, 0xB3, 0x29, 0xE3, 0x2F, 0x84,
0x53, 0xD1, 0x00, 0xED, 0x20, 0xFC, 0xB1, 0x5B, 0x6A, 0xCB, 0xBE, 0x39, 0x4A, 0x4C, 0x58, 0xCF,
0xD0, 0xEF, 0xAA, 0xFB, 0x43, 0x4D, 0x33, 0x85, 0x45, 0xF9, 0x02, 0x7F, 0x50, 0x3C, 0x9F, 0xA8,
0x51, 0xA3, 0x40, 0x8F, 0x92, 0x9D, 0x38, 0xF5, 0xBC, 0xB6, 0xDA, 0x21, 0x10, 0xFF, 0xF3, 0xD2,
0xCD, 0x0C, 0x13, 0xEC, 0x5F, 0x97, 0x44, 0x17, 0xC4, 0xA7, 0x7E, 0x3D, 0x64, 0x5D, 0x19, 0x73,
0x60, 0x81, 0x4F, 0xDC, 0x22, 0x2A, 0x90, 0x88, 0x46, 0xEE, 0xB8, 0x14, 0xDE, 0x5E, 0x0B, 0xDB,
0xE0, 0x32, 0x3A, 0x0A, 0x49, 0x06, 0x24, 0x5C, 0xC2, 0xD3, 0xAC, 0x62, 0x91, 0x95, 0xE4, 0x79,
0xE7, 0xC8, 0x37, 0x6D, 0x8D, 0xD5, 0x4E, 0xA9, 0x6C, 0x56, 0xF4, 0xEA, 0x65, 0x7A, 0xAE, 0x08,
0xBA, 0x78, 0x25, 0x2E, 0x1C, 0xA6, 0xB4, 0xC6, 0xE8, 0xDD, 0x74, 0x1F, 0x4B, 0xBD, 0x8B, 0x8A,
0x70, 0x3E, 0xB5, 0x66, 0x48, 0x03, 0xF6, 0x0E, 0x61, 0x35, 0x57, 0xB9, 0x86, 0xC1, 0x1D, 0x9E,
0xE1, 0xF8, 0x98, 0x11, 0x69, 0xD9, 0x8E, 0x94, 0x9B, 0x1E, 0x87, 0xE9, 0xCE, 0x55, 0x28, 0xDF,
0x8C, 0xA1, 0x89, 0x0D, 0xBF, 0xE6, 0x42, 0x68, 0x41, 0x99, 0x2D, 0x0F, 0xB0, 0x54, 0xBB, 0x16,
)
inv_s_box = (
0x52, 0x09, 0x6A, 0xD5, 0x30, 0x36, 0xA5, 0x38, 0xBF, 0x40, 0xA3, 0x9E, 0x81, 0xF3, 0xD7, 0xFB,
0x7C, 0xE3, 0x39, 0x82, 0x9B, 0x2F, 0xFF, 0x87, 0x34, 0x8E, 0x43, 0x44, 0xC4, 0xDE, 0xE9, 0xCB,
0x54, 0x7B, 0x94, 0x32, 0xA6, 0xC2, 0x23, 0x3D, 0xEE, 0x4C, 0x95, 0x0B, 0x42, 0xFA, 0xC3, 0x4E,
0x08, 0x2E, 0xA1, 0x66, 0x28, 0xD9, 0x24, 0xB2, 0x76, 0x5B, 0xA2, 0x49, 0x6D, 0x8B, 0xD1, 0x25,
0x72, 0xF8, 0xF6, 0x64, 0x86, 0x68, 0x98, 0x16, 0xD4, 0xA4, 0x5C, 0xCC, 0x5D, 0x65, 0xB6, 0x92,
0x6C, 0x70, 0x48, 0x50, 0xFD, 0xED, 0xB9, 0xDA, 0x5E, 0x15, 0x46, 0x57, 0xA7, 0x8D, 0x9D, 0x84,
0x90, 0xD8, 0xAB, 0x00, 0x8C, 0xBC, 0xD3, 0x0A, 0xF7, 0xE4, 0x58, 0x05, 0xB8, 0xB3, 0x45, 0x06,
0xD0, 0x2C, 0x1E, 0x8F, 0xCA, 0x3F, 0x0F, 0x02, 0xC1, 0xAF, 0xBD, 0x03, 0x01, 0x13, 0x8A, 0x6B,
0x3A, 0x91, 0x11, 0x41, 0x4F, 0x67, 0xDC, 0xEA, 0x97, 0xF2, 0xCF, 0xCE, 0xF0, 0xB4, 0xE6, 0x73,
0x96, 0xAC, 0x74, 0x22, 0xE7, 0xAD, 0x35, 0x85, 0xE2, 0xF9, 0x37, 0xE8, 0x1C, 0x75, 0xDF, 0x6E,
0x47, 0xF1, 0x1A, 0x71, 0x1D, 0x29, 0xC5, 0x89, 0x6F, 0xB7, 0x62, 0x0E, 0xAA, 0x18, 0xBE, 0x1B,
0xFC, 0x56, 0x3E, 0x4B, 0xC6, 0xD2, 0x79, 0x20, 0x9A, 0xDB, 0xC0, 0xFE, 0x78, 0xCD, 0x5A, 0xF4,
0x1F, 0xDD, 0xA8, 0x33, 0x88, 0x07, 0xC7, 0x31, 0xB1, 0x12, 0x10, 0x59, 0x27, 0x80, 0xEC, 0x5F,
0x60, 0x51, 0x7F, 0xA9, 0x19, 0xB5, 0x4A, 0x0D, 0x2D, 0xE5, 0x7A, 0x9F, 0x93, 0xC9, 0x9C, 0xEF,
0xA0, 0xE0, 0x3B, 0x4D, 0xAE, 0x2A, 0xF5, 0xB0, 0xC8, 0xEB, 0xBB, 0x3C, 0x83, 0x53, 0x99, 0x61,
0x17, 0x2B, 0x04, 0x7E, 0xBA, 0x77, 0xD6, 0x26, 0xE1, 0x69, 0x14, 0x63, 0x55, 0x21, 0x0C, 0x7D,
)
## AES AddRoundKey
# Round constants https://en.wikipedia.org/wiki/AES_key_schedule#Round_constants
r_con = (
0x00, 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40,
0x80, 0x1B, 0x36, 0x6C, 0xD8, 0xAB, 0x4D, 0x9A,
0x2F, 0x5E, 0xBC, 0x63, 0xC6, 0x97, 0x35, 0x6A,
0xD4, 0xB3, 0x7D, 0xFA, 0xEF, 0xC5, 0x91, 0x39,
)
def add_round_key(s, k):
for i in range(4):
for j in range(4):
s[i][j] ^= k[i][j]
## AES SubBytes
def sub_bytes(s):
for i in range(4):
for j in range(4):
s[i][j] = s_box[s[i][j]]
def inv_sub_bytes(s):
for i in range(4):
for j in range(4):
s[i][j] = inv_s_box[s[i][j]]
## AES ShiftRows
def shift_rows(s):
s[0][1], s[1][1], s[2][1], s[3][1] = s[1][1], s[2][1], s[3][1], s[0][1]
s[0][2], s[1][2], s[2][2], s[3][2] = s[2][2], s[3][2], s[0][2], s[1][2]
s[0][3], s[1][3], s[2][3], s[3][3] = s[3][3], s[0][3], s[1][3], s[2][3]
def inv_shift_rows(s):
s[0][1], s[1][1], s[2][1], s[3][1] = s[3][1], s[0][1], s[1][1], s[2][1]
s[0][2], s[1][2], s[2][2], s[3][2] = s[2][2], s[3][2], s[0][2], s[1][2]
s[0][3], s[1][3], s[2][3], s[3][3] = s[1][3], s[2][3], s[3][3], s[0][3]
## AES MixColumns
# learned from http://cs.ucsb.edu/~koc/cs178/projects/JT/aes.c
xtime = lambda a: (((a << 1) ^ 0x1B) & 0xFF) if (a & 0x80) else (a << 1)
def mix_single_column(a):
# see Sec 4.1.2 in The Design of Rijndael
t = a[0] ^ a[1] ^ a[2] ^ a[3]
u = a[0]
a[0] ^= t ^ xtime(a[0] ^ a[1])
a[1] ^= t ^ xtime(a[1] ^ a[2])
a[2] ^= t ^ xtime(a[2] ^ a[3])
a[3] ^= t ^ xtime(a[3] ^ u)
def mix_columns(s):
for i in range(4):
mix_single_column(s[i])
def inv_mix_columns(s):
# see Sec 4.1.3 in The Design of Rijndael
for i in range(4):
u = xtime(xtime(s[i][0] ^ s[i][2]))
v = xtime(xtime(s[i][1] ^ s[i][3]))
s[i][0] ^= u
s[i][1] ^= v
s[i][2] ^= u
s[i][3] ^= v
mix_columns(s)
## AES Bytes
def bytes2matrix(text):
""" Converts a 16-byte array into a 4x4 matrix. """
return [list(text[i:i+4]) for i in range(0, len(text), 4)]
def matrix2bytes(matrix):
""" Converts a 4x4 matrix into a 16-byte array. """
return bytes(sum(matrix, []))
def xor_bytes(a, b):
""" Returns a new byte array with the elements xor'ed. """
return bytes(i^j for i, j in zip(a, b))
def split_blocks(message, block_size=16, require_padding=True):
assert len(message) % block_size == 0 or not require_padding
return [message[i:i+16] for i in range(0, len(message), block_size)]

View File

@@ -0,0 +1,58 @@
# MIT License
#
# Copyright (c) 2015 Brian Warner and other contributors
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from . import eddsa
class BadSignatureError(Exception):
pass
SECRETKEYBYTES = 64
PUBLICKEYBYTES = 32
SIGNATUREKEYBYTES = 64
def publickey(seed32):
assert len(seed32) == 32
vk32 = eddsa.publickey(seed32)
return vk32, seed32+vk32
def sign(msg, skvk):
assert len(skvk) == 64
sk = skvk[:32]
vk = skvk[32:]
sig = eddsa.signature(msg, sk, vk)
return sig+msg
def open(sigmsg, vk):
assert len(vk) == 32
sig = sigmsg[:64]
msg = sigmsg[64:]
try:
valid = eddsa.checkvalid(sig, msg, vk)
except ValueError as e:
raise BadSignatureError(e)
except Exception as e:
if str(e) == "decoding point that is not on curve":
raise BadSignatureError(e)
raise
if not valid:
raise BadSignatureError()
return msg

View File

@@ -0,0 +1,368 @@
# MIT License
#
# Copyright (c) 2015 Brian Warner and other contributors
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import binascii, hashlib, itertools
Q = 2**255 - 19
L = 2**252 + 27742317777372353535851937790883648493
def inv(x):
return pow(x, Q-2, Q)
d = -121665 * inv(121666)
I = pow(2,(Q-1)//4,Q)
def xrecover(y):
xx = (y*y-1) * inv(d*y*y+1)
x = pow(xx,(Q+3)//8,Q)
if (x*x - xx) % Q != 0: x = (x*I) % Q
if x % 2 != 0: x = Q-x
return x
By = 4 * inv(5)
Bx = xrecover(By)
B = [Bx % Q,By % Q]
# Extended Coordinates: x=X/Z, y=Y/Z, x*y=T/Z
# http://www.hyperelliptic.org/EFD/g1p/auto-twisted-extended-1.html
def xform_affine_to_extended(pt):
(x, y) = pt
return (x%Q, y%Q, 1, (x*y)%Q) # (X,Y,Z,T)
def xform_extended_to_affine(pt):
(x, y, z, _) = pt
return ((x*inv(z))%Q, (y*inv(z))%Q)
def double_element(pt): # extended->extended
# dbl-2008-hwcd
(X1, Y1, Z1, _) = pt
A = (X1*X1)
B = (Y1*Y1)
C = (2*Z1*Z1)
D = (-A) % Q
J = (X1+Y1) % Q
E = (J*J-A-B) % Q
G = (D+B) % Q
F = (G-C) % Q
H = (D-B) % Q
X3 = (E*F) % Q
Y3 = (G*H) % Q
Z3 = (F*G) % Q
T3 = (E*H) % Q
return (X3, Y3, Z3, T3)
def add_elements(pt1, pt2): # extended->extended
# add-2008-hwcd-3 . Slightly slower than add-2008-hwcd-4, but -3 is
# unified, so it's safe for general-purpose addition
(X1, Y1, Z1, T1) = pt1
(X2, Y2, Z2, T2) = pt2
A = ((Y1-X1)*(Y2-X2)) % Q
B = ((Y1+X1)*(Y2+X2)) % Q
C = T1*(2*d)*T2 % Q
D = Z1*2*Z2 % Q
E = (B-A) % Q
F = (D-C) % Q
G = (D+C) % Q
H = (B+A) % Q
X3 = (E*F) % Q
Y3 = (G*H) % Q
T3 = (E*H) % Q
Z3 = (F*G) % Q
return (X3, Y3, Z3, T3)
def scalarmult_element_safe_slow(pt, n):
# this form is slightly slower, but tolerates arbitrary points, including
# those which are not in the main 1*L subgroup. This includes points of
# order 1 (the neutral element Zero), 2, 4, and 8.
assert n >= 0
if n==0:
return xform_affine_to_extended((0,1))
_ = double_element(scalarmult_element_safe_slow(pt, n>>1))
return add_elements(_, pt) if n&1 else _
def _add_elements_nonunfied(pt1, pt2): # extended->extended
# add-2008-hwcd-4 : NOT unified, only for pt1!=pt2. About 10% faster than
# the (unified) add-2008-hwcd-3, and safe to use inside scalarmult if you
# aren't using points of order 1/2/4/8
(X1, Y1, Z1, T1) = pt1
(X2, Y2, Z2, T2) = pt2
A = ((Y1-X1)*(Y2+X2)) % Q
B = ((Y1+X1)*(Y2-X2)) % Q
C = (Z1*2*T2) % Q
D = (T1*2*Z2) % Q
E = (D+C) % Q
F = (B-A) % Q
G = (B+A) % Q
H = (D-C) % Q
X3 = (E*F) % Q
Y3 = (G*H) % Q
Z3 = (F*G) % Q
T3 = (E*H) % Q
return (X3, Y3, Z3, T3)
def scalarmult_element(pt, n): # extended->extended
# This form only works properly when given points that are a member of
# the main 1*L subgroup. It will give incorrect answers when called with
# the points of order 1/2/4/8, including point Zero. (it will also work
# properly when given points of order 2*L/4*L/8*L)
assert n >= 0
if n==0:
return xform_affine_to_extended((0,1))
_ = double_element(scalarmult_element(pt, n>>1))
return _add_elements_nonunfied(_, pt) if n&1 else _
# points are encoded as 32-bytes little-endian, b255 is sign, b2b1b0 are 0
def encodepoint(P):
x = P[0]
y = P[1]
# MSB of output equals x.b0 (=x&1)
# rest of output is little-endian y
assert 0 <= y < (1<<255) # always < 0x7fff..ff
if x & 1:
y += 1<<255
return binascii.unhexlify("%064x" % y)[::-1]
def isoncurve(P):
x = P[0]
y = P[1]
return (-x*x + y*y - 1 - d*x*x*y*y) % Q == 0
class NotOnCurve(Exception):
pass
def decodepoint(s):
unclamped = int(binascii.hexlify(s[:32][::-1]), 16)
clamp = (1 << 255) - 1
y = unclamped & clamp # clear MSB
x = xrecover(y)
if bool(x & 1) != bool(unclamped & (1<<255)): x = Q-x
P = [x,y]
if not isoncurve(P): raise NotOnCurve("decoding point that is not on curve")
return P
# scalars are encoded as 32-bytes little-endian
def bytes_to_scalar(s):
assert len(s) == 32, len(s)
return int(binascii.hexlify(s[::-1]), 16)
def bytes_to_clamped_scalar(s):
# Ed25519 private keys clamp the scalar to ensure two things:
# 1: integer value is in L/2 .. L, to avoid small-logarithm
# non-wraparaound
# 2: low-order 3 bits are zero, so a small-subgroup attack won't learn
# any information
# set the top two bits to 01, and the bottom three to 000
a_unclamped = bytes_to_scalar(s)
AND_CLAMP = (1<<254) - 1 - 7
OR_CLAMP = (1<<254)
a_clamped = (a_unclamped & AND_CLAMP) | OR_CLAMP
return a_clamped
def random_scalar(entropy_f): # 0..L-1 inclusive
# reduce the bias to a safe level by generating 256 extra bits
oversized = int(binascii.hexlify(entropy_f(32+32)), 16)
return oversized % L
def password_to_scalar(pw):
oversized = hashlib.sha512(pw).digest()
return int(binascii.hexlify(oversized), 16) % L
def scalar_to_bytes(y):
y = y % L
assert 0 <= y < 2**256
return binascii.unhexlify("%064x" % y)[::-1]
# Elements, of various orders
def is_extended_zero(XYTZ):
# catch Zero
(X, Y, Z, T) = XYTZ
Y = Y % Q
Z = Z % Q
if X==0 and Y==Z and Y!=0:
return True
return False
class ElementOfUnknownGroup:
# This is used for points of order 2,4,8,2*L,4*L,8*L
def __init__(self, XYTZ):
assert isinstance(XYTZ, tuple)
assert len(XYTZ) == 4
self.XYTZ = XYTZ
def add(self, other):
if not isinstance(other, ElementOfUnknownGroup):
raise TypeError("elements can only be added to other elements")
sum_XYTZ = add_elements(self.XYTZ, other.XYTZ)
if is_extended_zero(sum_XYTZ):
return Zero
return ElementOfUnknownGroup(sum_XYTZ)
def scalarmult(self, s):
if isinstance(s, ElementOfUnknownGroup):
raise TypeError("elements cannot be multiplied together")
assert s >= 0
product = scalarmult_element_safe_slow(self.XYTZ, s)
return ElementOfUnknownGroup(product)
def to_bytes(self):
return encodepoint(xform_extended_to_affine(self.XYTZ))
def __eq__(self, other):
return self.to_bytes() == other.to_bytes()
def __ne__(self, other):
return not self == other
class Element(ElementOfUnknownGroup):
# this only holds elements in the main 1*L subgroup. It never holds Zero,
# or elements of order 1/2/4/8, or 2*L/4*L/8*L.
def add(self, other):
if not isinstance(other, ElementOfUnknownGroup):
raise TypeError("elements can only be added to other elements")
sum_element = ElementOfUnknownGroup.add(self, other)
if sum_element is Zero:
return sum_element
if isinstance(other, Element):
# adding two subgroup elements results in another subgroup
# element, or Zero, and we've already excluded Zero
return Element(sum_element.XYTZ)
# not necessarily a subgroup member, so assume not
return sum_element
def scalarmult(self, s):
if isinstance(s, ElementOfUnknownGroup):
raise TypeError("elements cannot be multiplied together")
# scalarmult of subgroup members can be done modulo the subgroup
# order, and using the faster non-unified function.
s = s % L
# scalarmult(s=0) gets you Zero
if s == 0:
return Zero
# scalarmult(s=1) gets you self, which is a subgroup member
# scalarmult(s<grouporder) gets you a different subgroup member
return Element(scalarmult_element(self.XYTZ, s))
# negation and subtraction only make sense for the main subgroup
def negate(self):
# slow. Prefer e.scalarmult(-pw) to e.scalarmult(pw).negate()
return Element(scalarmult_element(self.XYTZ, L-2))
def subtract(self, other):
return self.add(other.negate())
class _ZeroElement(ElementOfUnknownGroup):
def add(self, other):
return other # zero+anything = anything
def scalarmult(self, s):
return self # zero*anything = zero
def negate(self):
return self # -zero = zero
def subtract(self, other):
return self.add(other.negate())
Base = Element(xform_affine_to_extended(B))
Zero = _ZeroElement(xform_affine_to_extended((0,1))) # the neutral (identity) element
_zero_bytes = Zero.to_bytes()
def arbitrary_element(seed): # unknown DL
# TODO: if we don't need uniformity, maybe use just sha256 here?
hseed = hashlib.sha512(seed).digest()
y = int(binascii.hexlify(hseed), 16) % Q
# we try successive Y values until we find a valid point
for plus in itertools.count(0):
y_plus = (y + plus) % Q
x = xrecover(y_plus)
Pa = [x,y_plus] # no attempt to use both "positive" and "negative" X
# only about 50% of Y coordinates map to valid curve points (I think
# the other half give you points on the "twist").
if not isoncurve(Pa):
continue
P = ElementOfUnknownGroup(xform_affine_to_extended(Pa))
# even if the point is on our curve, it may not be in our particular
# (order=L) subgroup. The curve has order 8*L, so an arbitrary point
# could have order 1,2,4,8,1*L,2*L,4*L,8*L (everything which divides
# the group order).
# [I MAY BE COMPLETELY WRONG ABOUT THIS, but my brief statistical
# tests suggest it's not too far off] There are phi(x) points with
# order x, so:
# 1 element of order 1: [(x=0,y=1)=Zero]
# 1 element of order 2 [(x=0,y=-1)]
# 2 elements of order 4
# 4 elements of order 8
# L-1 elements of order L (including Base)
# L-1 elements of order 2*L
# 2*(L-1) elements of order 4*L
# 4*(L-1) elements of order 8*L
# So 50% of random points will have order 8*L, 25% will have order
# 4*L, 13% order 2*L, and 13% will have our desired order 1*L (and a
# vanishingly small fraction will have 1/2/4/8). If we multiply any
# of the 8*L points by 2, we're sure to get an 4*L point (and
# multiplying a 4*L point by 2 gives us a 2*L point, and so on).
# Multiplying a 1*L point by 2 gives us a different 1*L point. So
# multiplying by 8 gets us from almost any point into a uniform point
# on the correct 1*L subgroup.
P8 = P.scalarmult(8)
# if we got really unlucky and picked one of the 8 low-order points,
# multiplying by 8 will get us to the identity (Zero), which we check
# for explicitly.
if is_extended_zero(P8.XYTZ):
continue
# Test that we're finally in the right group. We want to scalarmult
# by L, and we want to *not* use the trick in Group.scalarmult()
# which does x%L, because that would bypass the check we care about.
# P is still an _ElementOfUnknownGroup, which doesn't use x%L because
# that's not correct for points outside the main group.
assert is_extended_zero(P8.scalarmult(L).XYTZ)
return Element(P8.XYTZ)
# never reached
def bytes_to_unknown_group_element(bytes):
# this accepts all elements, including Zero and wrong-subgroup ones
if bytes == _zero_bytes:
return Zero
XYTZ = xform_affine_to_extended(decodepoint(bytes))
return ElementOfUnknownGroup(XYTZ)
def bytes_to_element(bytes):
# this strictly only accepts elements in the right subgroup
P = bytes_to_unknown_group_element(bytes)
if P is Zero:
raise ValueError("element was Zero")
if not is_extended_zero(P.scalarmult(L).XYTZ):
raise ValueError("element is not in the right group")
# the point is in the expected 1*L subgroup, not in the 2/4/8 groups,
# or in the 2*L/4*L/8*L groups. Promote it to a correct-group Element.
return Element(P.XYTZ)

View File

@@ -0,0 +1,213 @@
# MIT License
#
# Copyright (c) 2015 Brian Warner and other contributors
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import base64
from . import _ed25519
BadSignatureError = _ed25519.BadSignatureError
def create_keypair(entropy=os.urandom):
SEEDLEN = int(_ed25519.SECRETKEYBYTES/2)
assert SEEDLEN == 32
seed = entropy(SEEDLEN)
sk = SigningKey(seed)
vk = sk.get_verifying_key()
return sk, vk
class BadPrefixError(Exception):
pass
def remove_prefix(s_bytes, prefix):
assert(type(s_bytes) == type(prefix))
if s_bytes[:len(prefix)] != prefix:
raise BadPrefixError("did not see expected '%s' prefix" % (prefix,))
return s_bytes[len(prefix):]
def to_ascii(s_bytes, prefix="", encoding="base64"):
"""Return a version-prefixed ASCII representation of the given binary
string. 'encoding' indicates how to do the encoding, and can be one of:
* base64
* base32
* base16 (or hex)
This function handles bytes, not bits, so it does not append any trailing
'=' (unlike standard base64.b64encode). It also lowercases the base32
output.
'prefix' will be prepended to the encoded form, and is useful for
distinguishing the purpose and version of the binary string. E.g. you
could prepend 'pub0-' to a VerifyingKey string to allow the receiving
code to raise a useful error if someone pasted in a signature string by
mistake.
"""
assert isinstance(s_bytes, bytes)
if not isinstance(prefix, bytes):
prefix = prefix.encode('ascii')
if encoding == "base64":
s_ascii = base64.b64encode(s_bytes).decode('ascii').rstrip("=")
elif encoding == "base32":
s_ascii = base64.b32encode(s_bytes).decode('ascii').rstrip("=").lower()
elif encoding in ("base16", "hex"):
s_ascii = base64.b16encode(s_bytes).decode('ascii').lower()
else:
raise NotImplementedError
return prefix+s_ascii.encode('ascii')
def from_ascii(s_ascii, prefix="", encoding="base64"):
"""This is the opposite of to_ascii. It will throw BadPrefixError if
the prefix is not found.
"""
if isinstance(s_ascii, bytes):
s_ascii = s_ascii.decode('ascii')
if isinstance(prefix, bytes):
prefix = prefix.decode('ascii')
s_ascii = remove_prefix(s_ascii.strip(), prefix)
if encoding == "base64":
s_ascii += "="*((4 - len(s_ascii)%4)%4)
s_bytes = base64.b64decode(s_ascii)
elif encoding == "base32":
s_ascii += "="*((8 - len(s_ascii)%8)%8)
s_bytes = base64.b32decode(s_ascii.upper())
elif encoding in ("base16", "hex"):
s_bytes = base64.b16decode(s_ascii.upper())
else:
raise NotImplementedError
return s_bytes
class SigningKey(object):
# this can only be used to reconstruct a key created by create_keypair().
def __init__(self, sk_s, prefix="", encoding=None):
assert isinstance(sk_s, bytes)
if not isinstance(prefix, bytes):
prefix = prefix.encode('ascii')
sk_s = remove_prefix(sk_s, prefix)
if encoding is not None:
sk_s = from_ascii(sk_s, encoding=encoding)
if len(sk_s) == 32:
# create from seed
vk_s, sk_s = _ed25519.publickey(sk_s)
else:
if len(sk_s) != 32+32:
raise ValueError("SigningKey takes 32-byte seed or 64-byte string")
self.sk_s = sk_s # seed+pubkey
self.vk_s = sk_s[32:] # just pubkey
def to_bytes(self, prefix=""):
if not isinstance(prefix, bytes):
prefix = prefix.encode('ascii')
return prefix+self.sk_s
def to_ascii(self, prefix="", encoding=None):
assert encoding
if not isinstance(prefix, bytes):
prefix = prefix.encode('ascii')
return to_ascii(self.to_seed(), prefix, encoding)
def to_seed(self, prefix=""):
if not isinstance(prefix, bytes):
prefix = prefix.encode('ascii')
return prefix+self.sk_s[:32]
def __eq__(self, them):
if not isinstance(them, object): return False
return (them.__class__ == self.__class__
and them.sk_s == self.sk_s)
def get_verifying_key(self):
return VerifyingKey(self.vk_s)
def sign(self, msg, prefix="", encoding=None):
assert isinstance(msg, bytes)
if not isinstance(prefix, bytes):
prefix = prefix.encode('ascii')
sig_and_msg = _ed25519.sign(msg, self.sk_s)
# the response is R+S+msg
sig_R = sig_and_msg[0:32]
sig_S = sig_and_msg[32:64]
msg_out = sig_and_msg[64:]
sig_out = sig_R + sig_S
assert msg_out == msg
if encoding:
return to_ascii(sig_out, prefix, encoding)
return prefix+sig_out
class VerifyingKey(object):
def __init__(self, vk_s, prefix="", encoding=None):
if not isinstance(prefix, bytes):
prefix = prefix.encode('ascii')
if not isinstance(vk_s, bytes):
vk_s = vk_s.encode('ascii')
assert isinstance(vk_s, bytes)
vk_s = remove_prefix(vk_s, prefix)
if encoding is not None:
vk_s = from_ascii(vk_s, encoding=encoding)
assert len(vk_s) == 32
self.vk_s = vk_s
def to_bytes(self, prefix=""):
if not isinstance(prefix, bytes):
prefix = prefix.encode('ascii')
return prefix+self.vk_s
def to_ascii(self, prefix="", encoding=None):
assert encoding
if not isinstance(prefix, bytes):
prefix = prefix.encode('ascii')
return to_ascii(self.vk_s, prefix, encoding)
def __eq__(self, them):
if not isinstance(them, object): return False
return (them.__class__ == self.__class__
and them.vk_s == self.vk_s)
def verify(self, sig, msg, prefix="", encoding=None):
if not isinstance(sig, bytes):
sig = sig.encode('ascii')
if not isinstance(prefix, bytes):
prefix = prefix.encode('ascii')
assert isinstance(sig, bytes)
assert isinstance(msg, bytes)
if encoding:
sig = from_ascii(sig, prefix, encoding)
else:
sig = remove_prefix(sig, prefix)
assert len(sig) == 64
sig_R = sig[:32]
sig_S = sig[32:]
sig_and_msg = sig_R + sig_S + msg
# this might raise BadSignatureError
msg2 = _ed25519.open(sig_and_msg, self.vk_s)
assert msg2 == msg
def selftest():
message = b"crypto libraries should always test themselves at powerup"
sk = SigningKey(b"priv0-VIsfn5OFGa09Un2MR6Hm7BQ5++xhcQskU2OGXG8jSJl4cWLZrRrVcSN2gVYMGtZT+3354J5jfmqAcuRSD9KIyg",
prefix="priv0-", encoding="base64")
vk = VerifyingKey(b"pub0-eHFi2a0a1XEjdoFWDBrWU/t9+eCeY35qgHLkUg/SiMo",
prefix="pub0-", encoding="base64")
assert sk.get_verifying_key() == vk
sig = sk.sign(message, prefix="sig0-", encoding="base64")
assert sig == b"sig0-E/QrwtSF52x8+q0l4ahA7eJbRKc777ClKNg217Q0z4fiYMCdmAOI+rTLVkiFhX6k3D+wQQfKdJYMxaTUFfv1DQ", sig
vk.verify(sig, message, prefix="sig0-", encoding="base64")
selftest()

View File

@@ -0,0 +1,94 @@
# MIT License
#
# Copyright (c) 2015 Brian Warner and other contributors
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from RNS.Cryptography.Hashes import sha512
from .basic import (bytes_to_clamped_scalar,
bytes_to_scalar, scalar_to_bytes,
bytes_to_element, Base)
import hashlib, binascii
def H(m):
return sha512(m)
def publickey(seed):
# turn first half of SHA512(seed) into scalar, then into point
assert len(seed) == 32
a = bytes_to_clamped_scalar(H(seed)[:32])
A = Base.scalarmult(a)
return A.to_bytes()
def Hint(m):
h = H(m)
return int(binascii.hexlify(h[::-1]), 16)
def signature(m,sk,pk):
assert len(sk) == 32 # seed
assert len(pk) == 32
h = H(sk[:32])
a_bytes, inter = h[:32], h[32:]
a = bytes_to_clamped_scalar(a_bytes)
r = Hint(inter + m)
R = Base.scalarmult(r)
R_bytes = R.to_bytes()
S = r + Hint(R_bytes + pk + m) * a
return R_bytes + scalar_to_bytes(S)
def checkvalid(s, m, pk):
if len(s) != 64: raise Exception("signature length is wrong")
if len(pk) != 32: raise Exception("public-key length is wrong")
R = bytes_to_element(s[:32])
A = bytes_to_element(pk)
S = bytes_to_scalar(s[32:])
h = Hint(s[:32] + pk + m)
v1 = Base.scalarmult(S)
v2 = R.add(A.scalarmult(h))
return v1==v2
# wrappers
import os
def create_signing_key():
seed = os.urandom(32)
return seed
def create_verifying_key(signing_key):
return publickey(signing_key)
def sign(skbytes, msg):
"""Return just the signature, given the message and just the secret
key."""
if len(skbytes) != 32:
raise ValueError("Bad signing key length %d" % len(skbytes))
vkbytes = create_verifying_key(skbytes)
sig = signature(msg, skbytes, vkbytes)
return sig
def verify(vkbytes, sig, msg):
if len(vkbytes) != 32:
raise ValueError("Bad verifying key length %d" % len(vkbytes))
if len(sig) != 64:
raise ValueError("Bad signature length %d" % len(sig))
rc = checkvalid(sig, msg, vkbytes)
if not rc:
raise ValueError("rc != 0", rc)
return True