# Source code for pvss.pvss

"""
Implementation of PVSS algorithms.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterable, Iterator, Sequence
from fractions import Fraction
from functools import cached_property, reduce
from hashlib import sha256
from itertools import zip_longest
from operator import mul
from typing import Any, Optional, TypeVar, Union, cast, get_type_hints, overload

from asn1crypto.core import Asn1Value, Integer, OctetString, SequenceOf

from . import asn1 as _asn1
from .groups import ImageGroup, ImageValue, PreGroup, PreGroupValue

def zip_strict(*args: Iterable[Any]) -> Iterator[Any]:
    """
    Like zip, but raise an exception unless all iterables have the same size.

    Args:
        *args: Iterables to walk through in lockstep.

    Yields:
        Tuples holding one item from each iterable.

    Raises:
        ValueError: If one iterable is exhausted before the others.
    """
    # A freshly created object cannot be an item of any input, so finding it
    # in a tuple means that at least one iterable ran out early.
    sentinel = object()
    for things in zip_longest(*args, fillvalue=sentinel):
        if any(thing is sentinel for thing in things):
            raise ValueError("Not all iters finished at the same time")
        yield things

_T0 = TypeVar("_T0")


def prod(items: Iterable[_T0], initializer: Optional[_T0] = None) -> _T0:
    """
    Product function, like sum() but with multiplication.

    Args:
        items: items to multiply
        initializer: Optional initializer

    Returns:
        Product of the initializer (if given) and all items.

    Raises:
        TypeError: If `items` is empty and no initializer was given.
    """
    if initializer is None:
        return reduce(mul, items)
    return reduce(mul, items, initializer)

_T1 = TypeVar("_T1", bound="Asn1Object")


class Asn1Object(ABC):
    """
    Abstract base class for all other PVSS ASN.1 objects.
    """

    # Reference to PVSS object that holds all ASN.1 objects.
    pvss: Pvss

    # ASN.1 type. Child classes set a more concrete type.
    asn1: Asn1Value

    def __init__(self, pvss: Pvss, asn1: Asn1Value) -> None:
        """
        Store the references and validate the object.

        Args:
            pvss: Pvss object that holds all public values.
            asn1: ASN.1 structure wrapped by this object.

        Raises:
            Whatever the concrete `_validate` implementation raises.
        """
        self.pvss = pvss
        self.asn1 = asn1
        # Without this call none of the `_validate` implementations would ever run.
        self._validate()

    @classmethod
    def from_der(cls: type[_T1], pvss: Pvss, data: bytes) -> _T1:
        """
        Load an object from its DER encoding.

        Args:
            pvss: Pvss object that holds all public values.
            data: DER encoded object.

        Returns:
            Instance of `cls` wrapping the decoded structure.
        """
        # The concrete ASN.1 type is taken from the `asn1` annotation of the class.
        return cls(pvss, get_type_hints(cls)["asn1"].load(data))

    @cached_property
    def der(self) -> bytes:
        """
        DER encoding of the wrapped ASN.1 structure.
        """
        return self.asn1.dump()

    @property
    def params(self) -> SystemParameters:
        """
        System parameters of the owning Pvss object.
        """
        return self.pvss.params

    @abstractmethod
    def _validate(self) -> None:
        """
        Validate the object; implemented by each concrete class.
        """

    def __eq__(self, other: Any) -> bool:
        """
        Objects are equal if `other` is an instance of this object's class and
        both have the same DER encoding.
        """
        if not isinstance(other, type(self)):
            return False

        return self.der == other.der

_T2 = TypeVar("_T2", bound="SystemParameters")


class SystemParameters(Asn1Object):
    """
    Base class for system parameter object.
    Holds the group descriptions and generators.
    """

    # Set by child class to denote one of the registered group algorithms.
    ALGO: str

    asn1: _asn1.SystemParameters

    def __new__(cls, pvss: Pvss, asn1: _asn1.SystemParameters) -> SystemParameters:
        """
        Create an instance of the implementation class selected by the
        "algorithm" field of the ASN.1 structure.

        Raises:
            ValueError: If the algorithm is not implemented.
        """
        algo = asn1["algorithm"].native
        impl: type[SystemParameters]
        # Implementations are imported only when they are selected.
        if algo == "qr_mod_p":
            from . import qr

            impl = qr.QrParameters
        elif algo == "ristretto_255":
            from . import ristretto_255

            impl = ristretto_255.Ristretto255Parameters
        else:
            raise ValueError(f"Algorithm {algo} not implemented")
        return super().__new__(impl)

    @classmethod
    def create(cls: type[_T2], pvss: Pvss, params: Any) -> _T2:
        """
        Build system parameters for the algorithm denoted by `cls.ALGO`.

        Args:
            pvss: Pvss object that holds all public values.
            params: Algorithm specific parameters, stored in the "parameters" field.
        """
        return cls(pvss, _asn1.SystemParameters({"algorithm": cls.ALGO, "parameters": params}))

    # NOTE(review): bodies and decorators of pre_group, img_group and _make_gen
    # are not visible in this extract; they are declared abstract here because
    # the class derives from an ABC and no implementation is shown.
    @property
    @abstractmethod
    def pre_group(self) -> PreGroup:
        """
        Pre-group; provided by the concrete implementation.
        """

    @cached_property
    def g(self) -> tuple[ImageValue, ImageValue]:
        """
        Pair of generators derived from the seeds "g_0" and "g_1".
        """
        return self._make_gen("g_0"), self._make_gen("g_1")

    @cached_property
    def G(self) -> tuple[ImageValue, ImageValue]:
        """
        Pair of generators derived from the seeds "G_0" and "G_1".
        """
        return self._make_gen("G_0"), self._make_gen("G_1")

    def _validate(self) -> None:
        # Load groups to validate them
        # NOTE(review): the statements below are presumed; confirm against upstream.
        _ = self.pre_group
        _ = self.img_group

    @property
    @abstractmethod
    def img_group(self) -> ImageGroup:
        """
        Image group; provided by the concrete implementation.
        """

    @abstractmethod
    def _make_gen(self, seed: str) -> ImageValue:
        """
        Create a generator of the image group from `seed`.
        """

class PrivateKey(Asn1Object):
    """
    Private user or receiver key.
    """

    asn1: _asn1.PrivateKey

    def _validate(self) -> None:
        # NOTE(review): original body is not visible in this extract; decoding
        # the value is the presumed check.
        _ = self.priv

    @classmethod
    def create(cls, pvss: Pvss, priv: PreGroupValue) -> PrivateKey:
        """
        Wrap a pre-group value as private key.
        """
        return cls(pvss, _asn1.PrivateKey({"priv": priv.asn1}))

    @classmethod
    def create_random(cls, pvss: Pvss) -> PrivateKey:
        """
        Create a private key from the pre-group's `rand_nonzero` value.
        """
        return cls.create(pvss, pvss.params.pre_group.rand_nonzero)

    @cached_property
    def priv(self) -> PreGroupValue:
        """
        Private value as pre-group element.
        """
        return self.pvss.params.pre_group(self.asn1["priv"])

    def pub(self, name: str) -> PublicKey:
        """
        Derive the public key (G_0 ** priv, G_1 ** priv).

        Args:
            name: Name to store in the public key.
        """
        return PublicKey.create(
            self.pvss, name, (self.params.G[0] ** self.priv, self.params.G[1] ** self.priv)
        )

class PublicKey(Asn1Object):
    """
    Public user or receiver key.
    """

    asn1: _asn1.PublicKey

    def _validate(self) -> None:
        # NOTE(review): the tested expression is missing in this extract;
        # rejecting an empty name is the presumed check. Confirm against upstream.
        if not self.name:
            raise ValueError()
        # NOTE(review): presumed; decode both group elements while validating.
        _ = self.pub

    @classmethod
    def create(cls, pvss: Pvss, name: str, pub: tuple[ImageValue, ImageValue]) -> PublicKey:
        """
        Build a public key from a name and a pair of image group elements.
        """
        return cls(
            pvss, _asn1.PublicKey({"name": str(name), "pub0": pub[0].asn1, "pub1": pub[1].asn1})
        )

    @cached_property
    def name(self) -> str:
        """
        Name stored in the key.
        """
        return str(self.asn1["name"])

    @cached_property
    def pub(self) -> tuple[ImageValue, ImageValue]:
        """
        Public values ("pub0", "pub1") as image group elements.
        """
        return (
            self.params.img_group(self.asn1["pub0"]),
            self.params.img_group(self.asn1["pub1"]),
        )

class Secret(Asn1Object):
    """
    Secret that is protected through PVSS.
    The DER encoding of this object can be used to protect some actual payload.
    """

    asn1: _asn1.Secret

    @classmethod
    def create(cls, pvss: Pvss, secret: ImageValue) -> Secret:
        """
        Wrap an image group element as secret.
        """
        return cls(pvss, _asn1.Secret({"secret": secret.asn1}))

    @cached_property
    def secret(self) -> ImageValue:
        """
        Secret value as image group element.
        """
        return self.params.img_group(self.asn1["secret"])

    def _validate(self) -> None:
        # NOTE(review): original body is not visible in this extract; decoding
        # the value is the presumed check.
        _ = self.secret

    @classmethod
    def reconstruct(cls, pvss: Pvss, private_key: PrivateKey) -> Secret:
        """
        Decrypt the re-encrypted shares with the private key and reconstruct the secret

        Args:
            pvss: Pvss object with public values
            private_key: Receiver's Private key

        Returns:
            Reconstructed secret

        Raises:
            Exception: If fewer re-encrypted shares than coefficients are available.
        """

        if len(pvss.reencrypted_shares) < len(pvss.shares.coefficients):
            raise Exception(
                f"Need at least {len(pvss.shares.coefficients)} shares, only got {len(pvss.reencrypted_shares)}"
            )

        # ElGamal decryption of each re-encrypted share: elg_b / elg_a ** priv.
        shares = {
            reenc_share.idx: reenc_share.elg_b * ((reenc_share.elg_a ** private_key.priv) ** -1)
            for reenc_share in pvss.reencrypted_shares
        }

        # Lagrange interpolation at 0, done in the exponent. The explicit
        # Fraction(1) initializer keeps the inner product defined when only a
        # single share is present (the generator is empty in that case).
        secret = prod(
            share0
            ** prod(
                (Fraction(idx1, idx1 - idx0) for idx1 in shares.keys() if idx0 != idx1),
                Fraction(1),
            )
            for idx0, share0 in shares.items()
        )

        return cls.create(pvss, secret)

class Share(Asn1Object):
    """
    Encrypted share of one user, with that user's responses for the proof.
    """

    asn1: _asn1.Share

    @classmethod
    def create(
        cls,
        pvss: Pvss,
        pub_name: str,
        share: ImageValue,
        resp: tuple[PreGroupValue, PreGroupValue],
    ) -> Share:
        """
        Build a share object.

        Args:
            pvss: Pvss object with public values
            pub_name: Name of the public key the share belongs to
            share: Encrypted share
            resp: Pair of responses, stored as "response_f0" and "response_f1"
        """
        return cls(
            pvss,
            _asn1.Share(
                {
                    "pub": pub_name,
                    "share": share.asn1,
                    "response_f0": resp[0].asn1,
                    "response_f1": resp[1].asn1,
                }
            ),
        )

    @cached_property
    def pub_name(self) -> str:
        """
        Name of the public key the share belongs to.
        """
        return cast(str, self.asn1["pub"].native)

    @cached_property
    def pub(self) -> PublicKey:
        """
        Public key registered in the Pvss object under `pub_name`.

        Raises:
            KeyError: If no user public key with that name is registered.
        """

        return self.pvss.user_public_keys[self.pub_name]

    @cached_property
    def share(self) -> ImageValue:
        """
        Encrypted share as image group element.
        """
        return self.params.img_group(self.asn1["share"])

    @cached_property
    def resp(self) -> tuple[PreGroupValue, PreGroupValue]:
        """
        Responses ("response_f0", "response_f1") as pre-group elements.
        """
        return (
            self.params.pre_group(self.asn1["response_f0"]),
            self.params.pre_group(self.asn1["response_f1"]),
        )

    def _validate(self) -> None:
        # XXX check if pubname in pvss
        # NOTE(review): the statements below are presumed; the original body is
        # not visible in this extract.
        _ = self.share
        _ = self.resp

class SharedSecret(Asn1Object):
    """
    All shares of a shared secret, along with Zero-Knowledge proof.
    """

    asn1: _asn1.SharedSecret

    @classmethod
    def create(
        cls,
        pvss: Pvss,
        shares: Iterable[Share],
        coeffs: Iterable[ImageValue],
        challenge: bytes,
    ) -> SharedSecret:
        """
        Build the shared secret message.

        Args:
            pvss: Pvss object with public values
            shares: Encrypted shares, one per user
            coeffs: Commitments for the polynomial coefficients
            challenge: Digest of the Zero-Knowledge challenge
        """
        return cls(
            pvss,
            _asn1.SharedSecret(
                {
                    "shares": [share.asn1 for share in shares],
                    "coefficients": [coeff.asn1 for coeff in coeffs],
                    "challenge": bytes(challenge),
                }
            ),
        )

    @cached_property
    def shares(self) -> list[Share]:
        """
        Shares, in the order in which they are stored.
        """
        return [Share(self.pvss, share) for share in cast(SequenceOf, self.asn1["shares"])]

    @cached_property
    def coefficients(self) -> list[ImageValue]:
        """
        Coefficient commitments as image group elements.
        """
        return [
            self.params.img_group(coeff)
            for coeff in cast(SequenceOf, self.asn1["coefficients"])
        ]

    @cached_property
    def digest(self) -> bytes:
        """
        Stored challenge digest.
        """
        return bytes(cast(OctetString, self.asn1["challenge"]))

    @cached_property
    def challenge(self) -> PreGroupValue:
        """
        Stored challenge digest, converted into a pre-group element.
        """
        return self.pvss.params.pre_group(int.from_bytes(self.digest, "big"))

    @property
    def qualified_size(self) -> int:
        """
        Number of coefficients.
        """
        return len(self.coefficients)

    def _validate(self) -> None:
        """
        Verify the Zero-Knowledge proof of the shares.

        Raises:
            ValueError: If verification failed.
        """
        # X_i: commitment polynomial evaluated at i, computed from the coefficients.
        X = [
            prod(c ** (i ** j) for j, c in enumerate(self.coefficients))
            for i in range(1, len(self.shares) + 1)
        ]

        min_c = -self.challenge

        # Recompute the prover's random commitments from responses and challenge.
        r = [
            (
                (self.params.g[0] ** share.resp[0])
                * (self.params.g[1] ** share.resp[1])
                * (xi ** min_c),
                (share.pub.pub[0] ** share.resp[0])
                * (share.pub.pub[1] ** share.resp[1])
                * (share.share ** min_c),
            )
            for xi, share in zip(X, self.shares)
        ]

        challenge = SharesChallenge.create(
            self.pvss,
            pubs=[share.pub for share in self.shares],
            coeffs=self.coefficients,
            commitments=X,
            shares=[share.share for share in self.shares],
            randoms=r,
        )

        if challenge.digest != self.digest:
            raise ValueError("Verification failed: could not compute same challenge")

    @classmethod
    def create_shared_secret(
        cls, pvss: Pvss, qualified_size: int
    ) -> tuple[Secret, SharedSecret]:
        """
        Create a secret, split it and compute the encrypted shares.

        Args:
            pvss: Pvss object with public values
            qualified_size: Number of shares required to reconstruct the secret

        Returns:
            Random secret and the encrypted shares
        """

        # user public keys
        pub = pvss.user_public_keys.values()

        # polynomials, chosen from Z_q
        # NOTE(review): the Poly(...) construction and its zero element are not
        # visible in this extract; pre_group(0) is the presumed zero.
        alpha = (
            Poly(
                (pvss.params.pre_group.rand for __ in range(qualified_size)),
                pvss.params.pre_group(0),
            ),
            Poly(
                (pvss.params.pre_group.rand for __ in range(qualified_size)),
                pvss.params.pre_group(0),
            ),
        )

        # secret to be split
        S = Secret.create(
            pvss, (pvss.params.G[0] ** alpha[0](0)) * (pvss.params.G[1] ** alpha[1](0))
        )

        # commitments for coeffs
        C = [
            (pvss.params.g[0] ** coeff[0]) * (pvss.params.g[1] ** coeff[1])
            for coeff in zip(alpha[0], alpha[1])
        ]

        # encrypted shares
        Y = [
            (pi.pub[0] ** alpha[0](i)) * (pi.pub[1] ** alpha[1](i))
            for i, pi in enumerate(pub, 1)
        ]

        # X_i computed by prover
        X = [
            (pvss.params.g[0] ** alpha[0](i)) * (pvss.params.g[1] ** alpha[1](i))
            for i in range(1, len(pub) + 1)
        ]

        # rand [0,q)
        k = [(pvss.params.pre_group.rand, pvss.params.pre_group.rand) for __ in range(len(pub))]

        # random commitments
        r = [
            (
                (pvss.params.g[0] ** ki[0]) * (pvss.params.g[1] ** ki[1]),
                (pi.pub[0] ** ki[0]) * (pi.pub[1] ** ki[1]),
            )
            for pi, ki in zip(pub, k)
        ]

        # challenge is computed by hash function
        challenge = SharesChallenge.create(pvss, pub, C, X, Y, r)
        c = challenge.challenge

        # response
        s = [(ki[0] + alpha[0](i) * c, ki[1] + alpha[1](i) * c) for i, ki in enumerate(k, 1)]

        shared_secret = SharedSecret.create(
            pvss,
            (
                Share.create(pvss=pvss, pub_name=pub_key.name, share=share, resp=resp)
                for pub_key, share, resp in zip(pub, Y, s)
            ),
            C,
            challenge.digest,
        )

        return S, shared_secret

class ReencryptedShare(Asn1Object):
    """
    Secret Share after reencryption.
    """

    asn1: _asn1.ReencryptedShare

    @classmethod
    def create(
        cls,
        pvss: Pvss,
        idx: int,
        elg_a: ImageValue,
        elg_b: ImageValue,
        response_priv: PreGroupValue,
        response_v: tuple[PreGroupValue, PreGroupValue],
        response_w: tuple[PreGroupValue, PreGroupValue],
        challenge: bytes,
    ) -> ReencryptedShare:
        """
        Build the re-encrypted share message.

        Args:
            pvss: Pvss object with public values
            idx: 1-based position of the share within the shared secret
            elg_a: First ElGamal value
            elg_b: Second ElGamal value
            response_priv: Response for the private key
            response_v: Responses for the helper values v
            response_w: Responses for the random values w
            challenge: Digest of the Zero-Knowledge challenge
        """
        return cls(
            pvss,
            _asn1.ReencryptedShare(
                {
                    "idx": idx,
                    "elg_a": elg_a.asn1,
                    "elg_b": elg_b.asn1,
                    "response_priv": response_priv.asn1,
                    "response_v0": response_v[0].asn1,
                    "response_v1": response_v[1].asn1,
                    "response_w0": response_w[0].asn1,
                    "response_w1": response_w[1].asn1,
                    "challenge": bytes(challenge),
                }
            ),
        )

    @cached_property
    def idx(self) -> int:
        """
        1-based position of the share within the shared secret.
        """
        return int(cast(Integer, self.asn1["idx"]))

    @cached_property
    def elg_a(self) -> ImageValue:
        """
        First ElGamal value as image group element.
        """
        return self.params.img_group(self.asn1["elg_a"])

    @cached_property
    def elg_b(self) -> ImageValue:
        """
        Second ElGamal value as image group element.
        """
        return self.params.img_group(self.asn1["elg_b"])

    @cached_property
    def response_priv(self) -> PreGroupValue:
        """
        Response for the private key as pre-group element.
        """
        return self.params.pre_group(self.asn1["response_priv"])

    @cached_property
    def response_v(self) -> tuple[PreGroupValue, PreGroupValue]:
        """
        Responses ("response_v0", "response_v1") as pre-group elements.
        """
        return (
            self.params.pre_group(self.asn1["response_v0"]),
            self.params.pre_group(self.asn1["response_v1"]),
        )

    @cached_property
    def response_w(self) -> tuple[PreGroupValue, PreGroupValue]:
        """
        Responses ("response_w0", "response_w1") as pre-group elements.
        """
        return (
            self.params.pre_group(self.asn1["response_w0"]),
            self.params.pre_group(self.asn1["response_w1"]),
        )

    @cached_property
    def digest(self) -> bytes:
        """
        Stored challenge digest.
        """
        return bytes(cast(OctetString, self.asn1["challenge"]))

    @cached_property
    def challenge(self) -> PreGroupValue:
        """
        Stored challenge digest, converted into a pre-group element.
        """
        return self.pvss.params.pre_group(int.from_bytes(self.digest, "big"))

    @cached_property
    def share(self) -> Share:
        """
        Encrypted share that this object re-encrypts.
        """
        return self.pvss.shares.shares[self.idx - 1]

    def _validate(self) -> None:
        """
        Verify Zero-Knowledge proof of the ReencryptedShare.

        Raises:
            ValueError: If verification failed.
        """

        # idx is used as `idx - 1` list index. Reject values outside 1..n so
        # that 0 or negative values cannot wrap around to another user's share
        # and too large values fail with ValueError instead of IndexError.
        if not 1 <= self.idx <= len(self.pvss.shares.shares):
            raise ValueError(f"Share index out of range: {self.idx}")

        # Compute -c
        minus_c = -self.challenge

        # Grab public key of share's sender.
        pub = self.share.pub.pub

        # Public key of the receiver, used for the ElGamal encryption.
        receiver_pub = self.pvss.receiver_public_key.pub

        # Compute commitment for public key.
        rand_pub = ((self.params.G[0] * self.params.G[1]) ** self.response_priv) * (
            (pub[0] * pub[1]) ** minus_c
        )

        # Compute commitment for share.
        rand_share = (
            (self.elg_b ** self.response_priv)
            * (receiver_pub[0] ** self.response_v[0])
            * (receiver_pub[1] ** self.response_v[1])
        ) * (self.share.share ** minus_c)

        # Compute commitment for ElGamal value.
        rand_elg_a = (
            (self.params.G[0] ** self.response_w[0]) * (self.params.G[1] ** self.response_w[1])
        ) * (self.elg_a ** minus_c)

        # Compute commitment for Identity.
        rand_id = (
            (self.elg_a ** self.response_priv)
            * (self.params.G[0] ** self.response_v[0])
            * (self.params.G[1] ** self.response_v[1])
        )

        # Compute challenge for Zero-Knowledge proof.
        challenge = ReencryptedChallenge.create(
            self.pvss, rand_pub, rand_share, rand_elg_a, rand_id
        )

        # Verify that the digests match.
        if challenge.digest != self.digest:
            raise ValueError("Verification failed: could not compute same challenge")

    @classmethod
    def reencrypt(cls, pvss: Pvss, private_key: PrivateKey) -> ReencryptedShare:
        """
        Decrypt a share of the encrypted secret with the private_key and
        re-encrypt it with another public key

        Args:
            pvss: Pvss object with public values
            private_key: A user's private key

        Returns:
            Re-encrypted share

        Raises:
            ValueError: If no share belongs to the private key.
        """

        # Locate our share.
        # NOTE(review): the compared expressions are missing in this extract.
        # The share's registered public key is compared with the key derived
        # from the private key under the same name (equality by DER encoding).
        for idx, enc_share in enumerate(pvss.shares.shares, 1):
            if enc_share.pub == private_key.pub(enc_share.pub_name):
                break
        else:
            raise ValueError("No matching public key found")

        receiver_pub = pvss.receiver_public_key.pub

        # Decrypt our share.
        share = enc_share.share ** private_key.priv.inv

        # Choose random values.
        w = [pvss.params.pre_group.rand, pvss.params.pre_group.rand]

        # Reencrypt share with ElGamal encryption using the receiver's public key.
        elg_a = (pvss.params.G[0] ** w[0]) * (pvss.params.G[1] ** w[1])
        elg_b = (
            share
            * (receiver_pub[0] ** w[0])
            * (receiver_pub[1] ** w[1])
        )

        # Compute helper variables.
        v = [-w[0] * private_key.priv, -w[1] * private_key.priv]

        # Choose random pre-group values for the commitments.
        kpi = pvss.params.pre_group.rand
        kv = (pvss.params.pre_group.rand, pvss.params.pre_group.rand)
        kw = (pvss.params.pre_group.rand, pvss.params.pre_group.rand)

        # Compute commitment for public key.
        rand_pub = (pvss.params.G[0] * pvss.params.G[1]) ** kpi

        # Compute commitment for share.
        rand_share = (
            (elg_b ** kpi)
            * (receiver_pub[0] ** kv[0])
            * (receiver_pub[1] ** kv[1])
        )

        # Compute commitment for ElGamal value.
        rand_elg_a = (pvss.params.G[0] ** kw[0]) * (pvss.params.G[1] ** kw[1])

        # Compute commitment for Identity.
        rand_id = (elg_a ** kpi) * (pvss.params.G[0] ** kv[0]) * (pvss.params.G[1] ** kv[1])

        # Compute challenge for Zero-Knowledge proof.
        challenge = ReencryptedChallenge.create(pvss, rand_pub, rand_share, rand_elg_a, rand_id)
        c = challenge.challenge

        # Compute responses for Zero-Knowledge proof.
        resp_priv = kpi + private_key.priv * c
        resp_v = (kv[0] + v[0] * c, kv[1] + v[1] * c)
        resp_w = (kw[0] + w[0] * c, kw[1] + w[1] * c)

        # Assemble the values for the result.
        return ReencryptedShare.create(
            pvss, idx, elg_a, elg_b, resp_priv, resp_v, resp_w, challenge.digest
        )

class Challenge(Asn1Object):
    """
    Base class for the Zero-Knowledge challenges.
    """

    def _validate(self) -> None:
        """
        Noop because this is never loaded
        """
        # XXX is still loaded in constructor. What to do?!

    @cached_property
    def digest(self) -> bytes:
        """
        Compute the sha256 digest over the DER encoding
        """
        return sha256(self.der).digest()

    @cached_property
    def challenge(self) -> PreGroupValue:
        """
        Convert the digest into a pre-group element so it can be used in the algorithms
        """
        return self.pvss.params.pre_group(int.from_bytes(self.digest, "big"))

class SharesChallenge(Challenge):
    """
    Zero-Knowledge challenge for "shared secret" message.
    """

    asn1: _asn1.SharesChallenge

    @classmethod
    def create(
        cls,
        pvss: Pvss,
        pubs: Iterable[PublicKey],
        coeffs: Iterable[ImageValue],
        commitments: Iterable[ImageValue],
        shares: Iterable[ImageValue],
        randoms: Iterable[tuple[ImageValue, ImageValue]],
    ) -> SharesChallenge:
        """
        Build the challenge structure from all public values of the proof.

        Args:
            pvss: Pvss object with public values
            pubs: Public keys of the users
            coeffs: Commitments for the polynomial coefficients
            commitments: Per-user commitments
            shares: Per-user encrypted shares
            randoms: Per-user pairs (random commitment, random share)

        Raises:
            ValueError: If the per-user iterables differ in length.
        """
        return cls(
            pvss,
            _asn1.SharesChallenge(
                {
                    "parameters": pvss.params.asn1,
                    "coefficients": [coeff.asn1 for coeff in coeffs],
                    "users": [
                        {
                            "pub": pub.asn1,
                            "commitment": commitment.asn1,
                            "random_commitment": rand_commit.asn1,
                            "share": share.asn1,
                            "random_share": rand_share.asn1,
                        }
                        for pub, commitment, share, (rand_commit, rand_share) in zip_strict(
                            pubs, commitments, shares, randoms
                        )
                    ],
                }
            ),
        )

class ReencryptedChallenge(Challenge):
    """
    Zero-Knowledge challenge for "reencrypted share" message.
    """

    asn1: _asn1.ReencryptedChallenge

    @classmethod
    def create(
        cls,
        pvss: Pvss,
        rand_pub: ImageValue,
        rand_share: ImageValue,
        rand_elg_a: ImageValue,
        rand_id: ImageValue,
    ) -> ReencryptedChallenge:
        """
        Build the challenge structure from the public values and the commitments.

        Args:
            pvss: Pvss object with public values
            rand_pub: Commitment for the public key
            rand_share: Commitment for the share
            rand_elg_a: Commitment for the ElGamal value
            rand_id: Commitment for the identity
        """
        return cls(
            pvss,
            _asn1.ReencryptedChallenge(
                {
                    "parameters": pvss.params.asn1,
                    "public_keys": [share.pub.asn1 for share in pvss.shares.shares],
                    "shares": pvss.shares.asn1,
                    "receiver_public_key": pvss.receiver_public_key.asn1,
                    "rand_pub": rand_pub.asn1,
                    "rand_share": rand_share.asn1,
                    "rand_elg_a": rand_elg_a.asn1,
                    "rand_id": rand_id.asn1,
                }
            ),
        )

class Pvss:
    """
    Main class to work with Pvss.
    Stores all public messages and exposes the PVSS operations.
    """

    _params: SystemParameters
    _user_public_keys: dict[str, PublicKey]
    _shares: SharedSecret
    _reencrypted_shares: list[ReencryptedShare]
    _receiver_public_key: PublicKey

    def __init__(self) -> None:
        """
        The constructor takes no parameters.
        """
        self._user_public_keys = {}
        self._reencrypted_shares = []

    @property
    def params(self) -> SystemParameters:
        """
        Retrieve system parameters.

        Returns:
            The system parameters.
        """
        return self._params

    def set_params(self, data: bytes) -> SystemParameters:
        """
        Set system parameters.

        Args:
            data: DER encoded system parameters.

        Returns:
            Decoded system parameters.

        Raises:
            Exception: If already set.
        """
        params = SystemParameters.from_der(self, data)
        if hasattr(self, "_params"):
            raise Exception("Parameters already set")

        self._params = params
        return params

    @property
    def user_public_keys(self) -> dict[str, PublicKey]:
        """
        Retrieve all user public keys, as mapping from username to PublicKey.

        Returns:
            Mapping of username to PublicKey.
        """
        # A copy is handed out so that callers cannot modify the internal state.
        return dict(self._user_public_keys)

    def add_user_public_key(self, data: bytes) -> PublicKey:
        """
        Add a user public key to the internal state.

        Args:
            data: DER encoded public key

        Returns:
            Decoded user public key.

        Raises:
            ValueError: On duplicate name or public key value
        """
        pub_key = PublicKey.from_der(self, data)
        for pub in self._user_public_keys.values():
            if pub.name == pub_key.name:
                raise ValueError(f"Duplicate name: {pub.name}")
            if pub.pub == pub_key.pub:
                raise ValueError(
                    f"Duplicate public key value in keys {pub.name} and {pub_key.name}"
                )

        self._user_public_keys[pub_key.name] = pub_key
        return pub_key

    @property
    def shares(self) -> SharedSecret:
        """
        Retrieve the shares of the secret.

        Returns:
            Shares of the secret.
        """
        return self._shares

    def set_shares(self, data: bytes) -> SharedSecret:
        """
        Set the shares of the secret.

        Args:
            data: DER encoded secret shares.

        Returns:
            Decoded secret shares.

        Raises:
            Exception: If already set.
        """
        shares = SharedSecret.from_der(self, data)
        if hasattr(self, "_shares"):
            raise Exception("Shares already set")

        self._shares = shares
        return shares

    @property
    def reencrypted_shares(self) -> list[ReencryptedShare]:
        """
        Retrieve the list of reencrypted shares.

        Returns:
            List of reencrypted shares.
        """
        return self._reencrypted_shares

    def add_reencrypted_share(self, data: bytes) -> ReencryptedShare:
        """
        Add a re-encrypted share to the internal state.

        Args:
            data: DER encoded re-encrypted share.

        Returns:
            Decoded reencrypted share.

        Raises:
            ValueError: On duplicate
        """
        reenc_share = ReencryptedShare.from_der(self, data)
        if reenc_share.idx in {s.idx for s in self._reencrypted_shares}:
            raise ValueError(f"Duplicate index: {reenc_share.idx}")

        self._reencrypted_shares.append(reenc_share)
        return reenc_share

    @property
    def receiver_public_key(self) -> PublicKey:
        """
        Retrieve receiver's public key.

        Returns:
            Receiver's public key.
        """
        return self._receiver_public_key

    def set_receiver_public_key(self, data: bytes) -> PublicKey:
        """
        Add the receiver's public key to the internal state.

        Args:
            data: DER encoded receiver's public key.

        Returns:
            Decoded receiver's public key.

        Raises:
            Exception: On duplicate
        """
        pub = PublicKey.from_der(self, data)
        if hasattr(self, "_receiver_public_key"):
            raise Exception("Receiver key already set")

        self._receiver_public_key = pub
        return pub

    def create_user_keypair(self, name: str) -> tuple[bytes, bytes]:
        """
        Create a random key pair for a user.

        Args:
            name: Name of key; will be included in the public key.

        Returns:
            DER encoded private key and public key
        """
        priv = PrivateKey.create_random(self)
        pub = priv.pub(name).der
        self.add_user_public_key(pub)
        return priv.der, pub

    def create_receiver_keypair(self, name: str) -> tuple[bytes, bytes]:
        """
        Create a random key pair for the receiver.

        Args:
            name: Name of key; will be included in the public key.

        Returns:
            DER encoded private key and public key
        """
        priv = PrivateKey.create_random(self)
        pub = priv.pub(name).der
        self.set_receiver_public_key(pub)
        return priv.der, pub

    def share_secret(self, qualified_size: int) -> tuple[bytes, bytes]:
        """
        Create a secret, split it and compute the encrypted shares.

        Args:
            qualified_size: Number of shares required to reconstruct the secret

        Returns:
            DER encoded shared secret and the DER encoded encrypted shares
        """
        secret, shares = SharedSecret.create_shared_secret(self, qualified_size)
        self.set_shares(shares.der)
        return secret.der, shares.der

    def reencrypt_share(self, der_private_key: bytes) -> bytes:
        """
        Decrypt a share of the encrypted secret with the private_key and
        re-encrypt it with another public key

        Args:
            der_private_key: A user's DER encoded private key

        Returns:
            DER encoded re-encrypted share
        """
        private_key = PrivateKey.from_der(self, der_private_key)
        share = ReencryptedShare.reencrypt(self, private_key).der
        self.add_reencrypted_share(share)
        return share

    def reconstruct_secret(self, der_private_key: bytes) -> bytes:
        """
        Decrypt the re-encrypted shares with the private key and reconstruct the secret

        Args:
            der_private_key: Receiver's DER encoded private key

        Returns:
            DER encoded secret
        """
        private_key = PrivateKey.from_der(self, der_private_key)
        return Secret.reconstruct(self, private_key).der
class Poly(Sequence[PreGroupValue]):
    """
    Polynomial with random coefficients.

    Behaves as a read-only sequence of its coefficients; calling the object
    evaluates the polynomial.
    """

    _coeffs: list[PreGroupValue]
    _zero: PreGroupValue

    def __init__(self, coeffs: Iterable[PreGroupValue], zero: PreGroupValue) -> None:
        """
        Args:
            coeffs: Coefficients; the coefficient at position j belongs to x ** j.
            zero: Start value for the summation in `__call__`.
        """
        self._coeffs = list(coeffs)
        self._zero = zero

    def __call__(self, i: int) -> PreGroupValue:
        """
        Evaluate the polynomial at `i`.
        """
        return sum((coeff * (i ** j) for j, coeff in enumerate(self._coeffs)), self._zero)

    def __len__(self) -> int:
        """
        Number of coefficients.
        """
        return len(self._coeffs)

    @overload
    def __getitem__(self, index: int) -> PreGroupValue:
        ...

    @overload
    def __getitem__(self, index: slice) -> Sequence[PreGroupValue]:
        ...

    def __getitem__(
        self, idx: Union[int, slice]
    ) -> Union[PreGroupValue, Sequence[PreGroupValue]]:
        """
        Coefficient at position `idx`, or the list of coefficients for a slice.
        """
        return self._coeffs[idx]

    def __repr__(self) -> str:
        return "Poly([" + ", ".join(str(coeff) for coeff in self._coeffs) + "])"