Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tests w/ sending outputs as subset of receiving #7

Open
wants to merge 14 commits into
base: silent-payments-bip
Choose a base branch
from
Open
457 changes: 457 additions & 0 deletions bip-0352.mediawiki

Large diffs are not rendered by default.

135 changes: 135 additions & 0 deletions bip-0352/bech32m.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright (c) 2017, 2020 Pieter Wuille
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

"""Reference implementation for Bech32/Bech32m and segwit addresses."""


from enum import Enum

class Encoding(Enum):
"""Enumeration type to list the various supported encodings."""
BECH32 = 1
BECH32M = 2

CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
BECH32M_CONST = 0x2bc830a3

def bech32_polymod(values):
"""Internal function that computes the Bech32 checksum."""
generator = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3]
chk = 1
for value in values:
top = chk >> 25
chk = (chk & 0x1ffffff) << 5 ^ value
for i in range(5):
chk ^= generator[i] if ((top >> i) & 1) else 0
return chk


def bech32_hrp_expand(hrp):
"""Expand the HRP into values for checksum computation."""
return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp]


def bech32_verify_checksum(hrp, data):
"""Verify a checksum given HRP and converted data characters."""
const = bech32_polymod(bech32_hrp_expand(hrp) + data)
if const == 1:
return Encoding.BECH32
if const == BECH32M_CONST:
return Encoding.BECH32M
return None

def bech32_create_checksum(hrp, data, spec):
"""Compute the checksum values given HRP and data."""
values = bech32_hrp_expand(hrp) + data
const = BECH32M_CONST if spec == Encoding.BECH32M else 1
polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ const
return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)]


def bech32_encode(hrp, data, spec):
"""Compute a Bech32 string given HRP and data values."""
combined = data + bech32_create_checksum(hrp, data, spec)
return hrp + '1' + ''.join([CHARSET[d] for d in combined])

def bech32_decode(bech):
"""Validate a Bech32/Bech32m string, and determine HRP and data."""
if ((any(ord(x) < 33 or ord(x) > 126 for x in bech)) or
(bech.lower() != bech and bech.upper() != bech)):
return (None, None, None)
bech = bech.lower()
pos = bech.rfind('1')

# remove the requirement that bech32m be less than 90 chars
if pos < 1 or pos + 7 > len(bech):
return (None, None, None)
if not all(x in CHARSET for x in bech[pos+1:]):
return (None, None, None)
hrp = bech[:pos]
data = [CHARSET.find(x) for x in bech[pos+1:]]
spec = bech32_verify_checksum(hrp, data)
if spec is None:
return (None, None, None)
return (hrp, data[:-6], spec)

def convertbits(data, frombits, tobits, pad=True):
"""General power-of-2 base conversion."""
acc = 0
bits = 0
ret = []
maxv = (1 << tobits) - 1
max_acc = (1 << (frombits + tobits - 1)) - 1
for value in data:
if value < 0 or (value >> frombits):
return None
acc = ((acc << frombits) | value) & max_acc
bits += frombits
while bits >= tobits:
bits -= tobits
ret.append((acc >> bits) & maxv)
if pad:
if bits:
ret.append((acc << (tobits - bits)) & maxv)
elif bits >= frombits or ((acc << (tobits - bits)) & maxv):
return None
return ret


def decode(hrp, addr):
"""Decode a segwit address."""
hrpgot, data, spec = bech32_decode(addr)
if hrpgot != hrp:
return (None, None)
decoded = convertbits(data[1:], 5, 8, False)
if decoded is None or len(decoded) < 2:
return (None, None)
if data[0] > 16:
return (None, None)
return (data[0], decoded)


def encode(hrp, witver, witprog):
"""Encode a segwit address."""
spec = Encoding.BECH32 if witver == 0 else Encoding.BECH32M
ret = bech32_encode(hrp, [witver] + convertbits(witprog, 8, 5), spec)
if decode(hrp, ret) == (None, None):
return None
return ret
215 changes: 215 additions & 0 deletions bip-0352/reference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
#!/usr/bin/env python3

import hashlib
import json
import bip32
from typing import List, Tuple, Dict, Union

# local files
from bech32m import convertbits, bech32_encode, decode, Encoding
from secp256k1 import ECKey, ECPubKey


def sha256(s: Union[bytes, bytearray]) -> bytes:
return hashlib.sha256(s).digest()


def ser_uint32(u: int) -> bytes:
return u.to_bytes(4, 'big')


def hash_outpoints(outpoints: List[Tuple[str, int]]) -> bytes:

outpoints_sorted = sorted([
bytearray.fromhex(txid)[::-1] + n.to_bytes(4, 'little')
for txid, n in outpoints
])

s = hashlib.new('sha256')
for op in outpoints_sorted:
s.update(op)

return s.digest()


def derive_silent_payment_key_pair(seed: bytes) -> Tuple[ECKey, ECKey, ECPubKey, ECPubKey]:

SPEND_KEY="m/352h/0h/0h/0h/0"
SCAN_KEY="m/352h/0h/0h/1h/0"

master = bip32.BIP32.from_seed(seed)
scan = ECKey().set(master.get_privkey_from_path(SCAN_KEY))
spend = ECKey().set(master.get_privkey_from_path(SPEND_KEY))
Scan = scan.get_pubkey()
Spend = spend.get_pubkey()

return scan, spend, Scan, Spend


def encode_silent_payment_address(B_scan: ECPubKey, B_m: ECPubKey, hrp: str = "sp", version: int = 0) -> str:

data = convertbits(B_scan.get_bytes(False) + B_m.get_bytes(False), 8, 5)
return bech32_encode(hrp, [version] + data, Encoding.BECH32M)


def create_labeled_silent_payment_address(B_scan: ECPubKey, B_spend: ECPubKey, m: int, hrp: str = "sp", version: int = 0) -> str:

G = ECKey().set(1).get_pubkey()
B_m = B_spend + m * G
labeled_address = encode_silent_payment_address(B_scan, B_m, hrp, version)

return labeled_address


def decode_silent_payment_address(address: str, hrp: str = "sp") -> Tuple[ECPubKey, ECPubKey]:

version, data = decode(hrp, address)
B_scan = ECPubKey().set(data[:33])
B_spend = ECPubKey().set(data[33:])

return B_scan, B_spend


def create_outputs(input_priv_keys: List[Tuple[ECKey, bool]], outpoints_hash: bytes, recipients: List[Tuple[str, float]]) -> List[Dict[str, float]]:

G = ECKey().set(1).get_pubkey()
negated_keys = []
for key, is_xonly in input_priv_keys:
if is_xonly and key.get_pubkey().get_y()%2 != 0:
key.negate()
negated_keys.append(key)

a_sum = sum(negated_keys)
silent_payment_groups: Dict[ECPubKey, List[Tuple[ECPubKey, float]]] = {}
for recipient in recipients:
addr, amount = recipient
B_scan, B_m = decode_silent_payment_address(addr)
if B_scan in silent_payment_groups:
silent_payment_groups[B_scan].append((B_m, amount))
else:
silent_payment_groups[B_scan] = [(B_m, amount)]

outputs = []
for B_scan, B_m_values in silent_payment_groups.items():
n = 0
ecdh_shared_secret = outpoints_hash * a_sum * B_scan
for B_m, amount in B_m_values:
t_n = sha256(ecdh_shared_secret.get_bytes(False) + ser_uint32(n))
P_nm = B_m + t_n*G
outputs.append({P_nm.get_bytes().hex(): amount})
n += 1
n += 1
return outputs


def scanning(b_scan: ECKey, B_spend: ECPubKey, A_sum: ECPubKey, outpoints_hash: bytes, outputs_to_check: List[ECPubKey], labels: Dict[str, int] = None) -> List[Dict[str, str]]:

G = ECKey().set(1).get_pubkey()
ecdh_shared_secret = outpoints_hash * b_scan * A_sum
n = 0
keep_scanning = True
wallet = []
while True:
t_n = sha256(ecdh_shared_secret.get_bytes(False) + ser_uint32(n))
P_n = B_spend + t_n*G
for output in outputs_to_check:
if P_n == output:
wallet.append({"pub_key": P_n.get_bytes().hex(), "priv_key_tweak": t_n.hex()})
outputs_to_check.remove(output)
n += 1
break
elif labels:
m_G_sub = output - P_n
found = False
if (m_G_sub.get_bytes(False).hex() in labels):
P_nm = P_n + m_G_sub
m_G = m_G_sub
found = True
else:
output.negate()
m_G_sub = output - P_n
if (m_G_sub.get_bytes(False).hex() in labels):
P_nm = P_n + m_G_sub
m_G = m_G_sub
found = True
if found:
wallet.append({
"pub_key": P_nm.get_bytes().hex(),
"priv_key_tweak": (ECKey().set(t_n).add(labels[m_G.get_bytes(False).hex()])).get_bytes().hex()
})
outputs_to_check.remove(output)
n += 1
break
else:
break
return wallet


if __name__ == "__main__":

with open("send_and_receive_test_vectors.json", "r") as f:
test_data = json.loads(f.read())

for case in test_data:
print(case["comment"])
# Test sending
for sending_test in case["sending"]:

given = sending_test["given"]
expected = sending_test["expected"]
input_priv_keys = [(ECKey().set(bytes.fromhex(key)), is_xonly) for key, is_xonly in given["input_priv_keys"]]
outpoints_hash = hash_outpoints(given["outpoints"])
sending_outputs = create_outputs(input_priv_keys, outpoints_hash, given["recipients"])

# Check that for a given set of inputs, we were able to generate the expected outputs for the receiver
assert sending_outputs == expected["outputs"], "Sending test failed"

# Test receiving
msg = sha256(b'message')
aux = sha256(b'random auxiliary data')
for receiving_test in case["receiving"]:
given = receiving_test["given"]
expected = receiving_test["expected"]
outputs_to_check = [ECPubKey().set(bytes.fromhex(p)) for p in given["outputs"]]
sending_output_pub_keys = [ECPubKey().set(bytes.fromhex(next(iter(p)))) for p in sending_outputs]

# Check that the given inputs for the receiving test match what was generated during the sending test
assert set(sending_output_pub_keys).issubset(set(outputs_to_check)), "Receiving test does not match sending test"

receiving_addresses = []
b_scan, b_spend, B_scan, B_spend = derive_silent_payment_key_pair(bytes(given["bip32_seed"], "utf-8"))
receiving_addresses.append(encode_silent_payment_address(B_scan, B_spend))
if given["labels"]:
for _, v in given["labels"].items():
receiving_addresses.append(create_labeled_silent_payment_address(B_scan, B_spend, m=v))

# Check that the silent payment addresses match for the given BIP32 seed and labels dictionary
assert receiving_addresses == expected["addresses"], "Receiving addresses don't match"

outpoints_hash = hash_outpoints(given["outpoints"])
input_pub_keys = [ECPubKey().set(bytes.fromhex(key)) for key in given["input_pub_keys"]]
A_sum = sum(input_pub_keys)
add_to_wallet = scanning(
b_scan=b_scan,
B_spend=B_spend,
A_sum=A_sum,
outpoints_hash=outpoints_hash,
outputs_to_check=outputs_to_check,
labels=given["labels"],
)

# Check that the private key is correct for the found output public key
for output in add_to_wallet:
pub_key = ECPubKey().set(bytes.fromhex(output['pub_key']))
full_private_key = b_spend.add(bytes.fromhex(output['priv_key_tweak']))
if full_private_key.get_pubkey().get_y()%2 != 0:
full_private_key.negate()

sig = full_private_key.sign_schnorr(msg, aux)
assert pub_key.verify_schnorr(sig, msg), f"Invalid signature for {pub_key}"
output["signature"] = sig.hex()

# Check if the found output public keys match the expected output public keys
assert add_to_wallet == expected["outputs"], "Receiving test failed"

print("All tests passed")
Binary file added bip-0352/scan_data_downloader_per_month.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading