diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 847f7337..c68db251 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,7 +38,9 @@ jobs: with: activate-environment: scitt environment-file: environment.yml - - run: python -m pytest + - run: | + python -m pip install -e . + python -m pytest ci-cd-build-and-push-image-container: name: CI/CD (container) diff --git a/README.md b/README.md index 7bf84e9d..13f61e3e 100644 --- a/README.md +++ b/README.md @@ -91,13 +91,14 @@ They can be used with the built-in server or an external service implementation. ```sh ./scitt-emulator.sh client create-claim \ - --issuer did:web:example.com \ --content-type application/json \ + --subject 'solar' \ --payload '{"sun": "yellow"}' \ --out claim.cose ``` - _**Note:** The emulator generates an ad-hoc key pair to sign the claim and does not verify claim signatures upon submission._ + _**Note:** The emulator generates an ad-hoc key pair to sign the claim if +``--issuer`` and ``--public-key-pem`` are not given. See [Registration Policies](docs/registration_policies.md) docs for more deatiled examples_ 2. View the signed claim by uploading `claim.cose` to one of the [CBOR or COSE Debugging Tools](#cose-and-cbor-debugging) diff --git a/docs/registration_policies.md b/docs/registration_policies.md index 61c63a7a..64e604d3 100644 --- a/docs/registration_policies.md +++ b/docs/registration_policies.md @@ -12,14 +12,14 @@ The SCITT API emulator can deny entry based on presence of This is a simple way to enable evaluation of claims prior to submission by arbitrary policy engines which watch the workspace (fanotify, inotify, etc.). -[![asciicast-of-simple-decoupled-file-based-policy-engine](https://asciinema.org/a/572766.svg)](https://asciinema.org/a/572766) +[![asciicast-of-simple-decoupled-file-based-policy-engine](https://asciinema.org/a/620587.svg)](https://asciinema.org/a/620587) Start the server ```console $ rm -rf workspace/ $ mkdir -p workspace/storage/operations -$ scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro +$ timeout 1s scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro Service parameters: workspace/service_parameters.json ^C ``` @@ -84,43 +84,66 @@ import os import sys import json import pathlib -import traceback +import unittest -import cbor2 +import cwt import pycose +from pycose.messages import Sign1Message from jsonschema import validate, ValidationError -from pycose.messages import CoseMessage, Sign1Message -from scitt_emulator.scitt import ClaimInvalidError, COSE_Headers_Issuer +from scitt_emulator.scitt import ClaimInvalidError, CWTClaims +from scitt_emulator.verify_statement import verify_statement +from scitt_emulator.key_helpers import verification_key_to_object + -claim = sys.stdin.buffer.read() +def main(): + claim = sys.stdin.buffer.read() -msg = CoseMessage.decode(claim) + msg = Sign1Message.decode(claim, tag=True) -if pycose.headers.ContentType not in msg.phdr: - raise ClaimInvalidError("Claim does not have a content type header parameter") -if COSE_Headers_Issuer not in msg.phdr: - raise ClaimInvalidError("Claim does not have an issuer header parameter") + if pycose.headers.ContentType not in msg.phdr: + raise ClaimInvalidError("Claim does not have a content type header parameter") + if not msg.phdr[pycose.headers.ContentType].startswith("application/json"): + raise TypeError( + f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}" + ) -if not msg.phdr[pycose.headers.ContentType].startswith("application/json"): - raise TypeError( - f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}" + verification_key = verify_statement(msg) + unittest.TestCase().assertTrue( + verification_key, + "Failed to verify signature on statement", ) -SCHEMA = json.loads(pathlib.Path(os.environ["SCHEMA_PATH"]).read_text()) + cwt_protected = cwt.decode(msg.phdr[CWTClaims], verification_key.cwt) + issuer = cwt_protected[1] + subject = cwt_protected[2] -try: - validate( - instance={ - "$schema": "https://schema.example.com/scitt-policy-engine-jsonschema.schema.json", - "issuer": msg.phdr[COSE_Headers_Issuer], - "claim": json.loads(msg.payload.decode()), - }, - schema=SCHEMA, + issuer_key_as_object = verification_key_to_object(verification_key) + unittest.TestCase().assertTrue( + issuer_key_as_object, + "Failed to convert issuer key to JSON schema verifiable object", ) -except ValidationError as error: - print(str(error), file=sys.stderr) - sys.exit(1) + + SCHEMA = json.loads(pathlib.Path(os.environ["SCHEMA_PATH"]).read_text()) + + try: + validate( + instance={ + "$schema": "https://schema.example.com/scitt-policy-engine-jsonschema.schema.json", + "issuer": issuer, + "issuer_key": issuer_key_as_object, + "subject": subject, + "claim": json.loads(msg.payload.decode()), + }, + schema=SCHEMA, + ) + except ValidationError as error: + print(str(error), file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() ``` We'll create a small wrapper to serve in place of a more fully featured policy @@ -140,21 +163,134 @@ echo ${CLAIM_PATH} Example running allowlist check and enforcement. ```console -npm install -g nodemon -nodemon -e .cose --exec 'find workspace/storage/operations -name \*.cose -exec nohup sh -xe policy_engine.sh $(cat workspace/service_parameters.json | jq -r .insertPolicy) {} \;' +$ npm install nodemon && \ + DID_WEB_ASSUME_SCHEME=http node_modules/.bin/nodemon -e .cose --exec 'find workspace/storage/operations -name \*.cose -exec nohup sh -xe policy_engine.sh $(cat workspace/service_parameters.json | jq -r .insertPolicy) {} \;' ``` Also ensure you restart the server with the new config we edited. ```console -scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro +$ scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro +``` + +The current emulator notary (create-statement) implementation will sign +statements using a generated ephemeral key or a key we provide via the +`--private-key-pem` argument. + +Since we need to export the key for verification by the policy engine, we will +first generate it using `ssh-keygen`. + +```console +$ export ISSUER_PORT="9000" \ + && export ISSUER_URL="http://localhost:${ISSUER_PORT}" \ + && ssh-keygen -q -f /dev/stdout -t ecdsa -b 384 -N '' -I $RANDOM <</dev/null | python -c 'import sys; from cryptography.hazmat.primitives import serialization; print(serialization.load_ssh_private_key(sys.stdin.buffer.read(), password=None).private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode().rstrip())' > private-key.pem \ + && scitt-emulator client create-claim \ + --private-key-pem private-key.pem \ + --issuer "${ISSUER_URL}" \ + --subject "solar" \ + --content-type application/json \ + --payload '{"sun": "yellow"}' \ + --out claim.cose ``` -Create claim from allowed issuer (`.org`) and from non-allowed (`.com`). +The core of policy engine we implemented in `jsonschema_validator.py` will +verify the COSE message generated using the public portion of the notary's key. +We've implemented two possible styles of key resolution. Both of them require +resolution of public keys via an HTTP server. + +Let's start the HTTP server now, we'll populate the needed files in the +sections corresponding to each resolution style. + +```console +$ python -m http.server "${ISSUER_PORT}" & +$ python_http_server_pid=$! +``` + +### SSH `authorized_keys` style notary public key resolution + +Keys are discovered via making an HTTP GET request to the URL given by the +`issuer` parameter via the `web` DID method and de-serializing the SSH +public keys found within the response body. + +GitHub exports a users authentication keys at https://github.com/username.keys +Leveraging this URL as an issuer `did:web:github.com:username.keys` with the +following pattern would enable a GitHub user to act as a SCITT notary. + +Start an HTTP server with an SSH public key served at the root. + +```console +$ cat private-key.pem | ssh-keygen -f /dev/stdin -y | tee index.html +``` + +### OpenID Connect token style notary public key resolution + +Keys are discovered two part resolution of HTTP paths relative to the issuer + +`/.well-known/openid-configuration` path is requested via HTTP GET. The +response body is parsed as JSON and the value of the `jwks_uri` key is +requested via HTTP GET. + +`/.well-known/jwks` (is typically the value of `jwks_uri`) path is requested +via HTTP GET. The response body is parsed as JSON. Public keys are loaded +from the value of the `keys` key which stores an array of JSON Web Key (JWK) +style serializations. + +```console +$ mkdir -p .well-known/ +$ cat > .well-known/openid-configuration < @@ -174,10 +310,29 @@ Failed validating 'enum' in schema['properties']['issuer']: On instance['issuer']: 'did:web:example.com' +``` + +### Policy engine executing allowlist policy on allowed issuer + +Modify the allowlist to ensure that our issuer, aka our local HTTP server with +our keys, is set to be the allowed issuer. + +```console +$ export allowlist="$(cat allowlist.schema.json)" && \ + jq '.properties.issuer.enum = [env.ISSUER_URL, "http://localhost:8000"]' <(echo "${allowlist}") \ + | tee allowlist.schema.json +``` -$ scitt-emulator client create-claim --issuer did:web:example.org --content-type application/json --payload '{"sun": "yellow"}' --out claim.cose -A COSE signed Claim was written to: claim.cose +Submit the statement from the issuer we just added to the allowlist. + +```console $ scitt-emulator client submit-claim --claim claim.cose --out claim.receipt.cbor Claim registered with entry ID 1 Receipt written to claim.receipt.cbor ``` + +Stop the server that serves the public keys + +```console +$ kill $python_http_server_pid +``` diff --git a/environment.yml b/environment.yml index b62f9e81..f83f20ba 100644 --- a/environment.yml +++ b/environment.yml @@ -39,3 +39,4 @@ dependencies: - jwcrypto==1.5.0 - PyJWT==2.8.0 - werkzeug==2.2.2 + - cwt==2.7.1 diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..8efa2302 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,8 @@ +[pytest] +# https://docs.pytest.org/en/7.1.x/how-to/doctest.html#using-doctest-options +doctest_optionflags = NORMALIZE_WHITESPACE IGNORE_EXCEPTION_DETAIL +# Alternatively, options can be enabled by an inline comment in the doc test itself: +# >>> something_that_raises() # doctest: +IGNORE_EXCEPTION_DETAIL +# Traceback (most recent call last): +# ValueError: ... +addopts = --doctest-modules diff --git a/scitt_emulator/ccf.py b/scitt_emulator/ccf.py index 825c7eea..06296f86 100644 --- a/scitt_emulator/ccf.py +++ b/scitt_emulator/ccf.py @@ -5,8 +5,10 @@ from pathlib import Path from hashlib import sha256 import datetime +import pathlib import json +import jwcrypto.jwk from cryptography.hazmat.primitives.asymmetric import ec, utils from cryptography.hazmat.primitives.serialization import ( Encoding, @@ -72,6 +74,18 @@ def initialize_service(self): json.dump(self.service_parameters, f) print(f"Service parameters written to {self.service_parameters_path}") + def keys_as_jwks(self): + key = jwcrypto.jwk.JWK() + key_bytes = pathlib.Path(self._service_private_key_path).read_bytes() + key.import_from_pem(key_bytes) + return [ + { + **key.export_public(as_dict=True), + "use": "sig", + "kid": key.thumbprint(), + } + ] + def create_receipt_contents(self, countersign_tbi: bytes, entry_id: str): # Load service private key and certificate with open(self._service_private_key_path, "rb") as f: diff --git a/scitt_emulator/client.py b/scitt_emulator/client.py index b4ff35ee..2511f9fb 100644 --- a/scitt_emulator/client.py +++ b/scitt_emulator/client.py @@ -8,7 +8,7 @@ import httpx -import scitt_emulator.scitt as scitt +from scitt_emulator import create_statement from scitt_emulator.tree_algs import TREE_ALGS DEFAULT_URL = "http://127.0.0.1:8000" @@ -72,10 +72,6 @@ def post(self, *args, **kwargs): return self._request("POST", *args, **kwargs) -def create_claim(issuer: str, content_type: str, payload: str, claim_path: Path): - scitt.create_claim(claim_path, issuer, content_type, payload) - - def submit_claim( url: str, claim_path: Path, @@ -170,16 +166,7 @@ def cli(fn): parser = fn(description="Execute client commands") sub = parser.add_subparsers(dest="cmd", help="Command to execute", required=True) - p = sub.add_parser("create-claim", description="Create a fake SCITT claim") - p.add_argument("--out", required=True, type=Path) - p.add_argument("--issuer", required=True, type=str) - p.add_argument("--content-type", required=True, type=str) - p.add_argument("--payload", required=True, type=str) - p.set_defaults( - func=lambda args: scitt.create_claim( - args.out, args.issuer, args.content_type, args.payload - ) - ) + create_statement.cli(sub.add_parser) p = sub.add_parser( "submit-claim", description="Submit a SCITT claim and retrieve the receipt" diff --git a/scitt_emulator/create_statement.py b/scitt_emulator/create_statement.py new file mode 100644 index 00000000..0ea8f7da --- /dev/null +++ b/scitt_emulator/create_statement.py @@ -0,0 +1,204 @@ +# Copyright (c) SCITT Authors +# Licensed under the MIT License. +import base64 +import pathlib +import argparse +from typing import Union, Optional, List + +import cwt +import pycose +import pycose.headers +import pycose.messages +import pycose.keys.ec2 +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat +from cryptography.hazmat.primitives.serialization import load_pem_private_key + +# TODO jwcrypto is LGPLv3, is there another option with a permissive licence? +import jwcrypto.jwk + +from scitt_emulator.did_helpers import DID_JWK_METHOD + + +@pycose.headers.CoseHeaderAttribute.register_attribute() +class CWTClaims(pycose.headers.CoseHeaderAttribute): + identifier = 14 + fullname = "CWT_CLAIMS" + + +@pycose.headers.CoseHeaderAttribute.register_attribute() +class RegInfo(pycose.headers.CoseHeaderAttribute): + identifier = 393 + fullname = "REG_INFO" + + +@pycose.headers.CoseHeaderAttribute.register_attribute() +class Receipts(pycose.headers.CoseHeaderAttribute): + identifier = 394 + fullname = "RECEIPTS" + + +@pycose.headers.CoseHeaderAttribute.register_attribute() +class TBD(pycose.headers.CoseHeaderAttribute): + identifier = 395 + fullname = "TBD" + + +def create_claim( + claim_path: pathlib.Path, + issuer: Union[str, None], + subject: str, + content_type: str, + payload: bytes, + private_key_pem_path: Optional[str] = None, + receipts: Optional[List[bytes]] = None, +): + # https://ietf-wg-scitt.github.io/draft-ietf-scitt-architecture/draft-ietf-scitt-architecture.html#name-signed-statement-envelope + + # Registration Policy (label: TBD, temporary: 393): A map containing + # key/value pairs set by the Issuer which are sealed on Registration and + # non-opaque to the Transparency Service. The key/value pair semantics are + # specified by the Issuer or are specific to the CWT_Claims iss and + # CWT_Claims sub tuple. + # Examples: the sequence number of signed statements + # on a CWT_Claims Subject, Issuer metadata, or a reference to other + # Transparent Statements (e.g., augments, replaces, new-version, CPE-for) + # Reg_Info = { + reg_info = { + # ? "register_by": uint .within (~time), + "register_by": 1000, + # ? "sequence_no": uint, + "sequence_no": 0, + # ? "issuance_ts": uint .within (~time), + "issuance_ts": 1000, + # ? "no_replay": null, + "no_replay": None, + # * tstr => any + } + # } + + # Create COSE_Sign1 structure + # Create an ad-hoc key + # oct: size(int) + # RSA: public_exponent(int), size(int) + # EC: crv(str) (one of P-256, P-384, P-521, secp256k1) + # OKP: crv(str) (one of Ed25519, Ed448, X25519, X448) + key = jwcrypto.jwk.JWK() + if private_key_pem_path and private_key_pem_path.exists(): + key.import_from_pem(private_key_pem_path.read_bytes()) + else: + key = key.generate(kty="EC", crv="P-384") + # https://python-cwt.readthedocs.io/en/stable/algorithms.html + alg = key.key_curve.replace("P-", "ES") + kid = key.thumbprint() + key_as_pem_bytes = key.export_to_pem(private_key=True, password=None) + # cwt_cose_key = cwt.COSEKey.generate_symmetric_key(alg=alg, kid=kid) + cwt_cose_key = cwt.COSEKey.from_pem(key_as_pem_bytes, kid=kid) + # cwt_cose_key_to_cose_key = cwt.algs.ec2.EC2Key.to_cose_key(cwt_cose_key) + cwt_cose_key_to_cose_key = cwt_cose_key.to_dict() + sign1_message_key = pycose.keys.ec2.EC2Key.from_dict(cwt_cose_key_to_cose_key) + + # If issuer was not given used did:jwk of public key + if issuer is None: + issuer = DID_JWK_METHOD + base64.urlsafe_b64encode(key.export_public().encode()).decode() + + # CWT_Claims (label: 14 pending [CWT_CLAIM_COSE]): A CWT representing + # the Issuer (iss) making the statement, and the Subject (sub) to + # correlate a collection of statements about an Artifact. Additional + # [CWT_CLAIMS] MAY be used, while iss and sub MUST be provided + # CWT_Claims = { + cwt_claims = { + # iss (CWT_Claim Key 1): The Identifier of the signer, as a string + # Example: did:web:example.com + # 1 => tstr; iss, the issuer making statements, + 1: issuer, + # sub (CWT_Claim Key 2): The Subject to which the Statement refers, + # chosen by the Issuer + # Example: github.com/opensbom-generator/spdx-sbom-generator/releases/tag/v0.0.13 + # 2 => tstr; sub, the subject of the statements, + 2: subject, + # * tstr => any + } + # } + cwt_token = cwt.encode(cwt_claims, cwt_cose_key) + + # Protected_Header = { + protected = { + # algorithm (label: 1): Asymmetric signature algorithm used by the + # Issuer of a Signed Statement, as an integer. + # Example: -35 is the registered algorithm identifier for ECDSA with + # SHA-384, see COSE Algorithms Registry [IANA.cose]. + # 1 => int ; algorithm identifier, + # https://www.iana.org/assignments/cose/cose.xhtml#algorithms + # pycose.headers.Algorithm: "ES256", + pycose.headers.Algorithm: getattr(cwt.enums.COSEAlgs, alg), + # Key ID (label: 4): Key ID, as a bytestring + # 4 => bstr ; Key ID, + pycose.headers.KID: kid.encode("ascii"), + # 14 => CWT_Claims ; CBOR Web Token Claims, + CWTClaims: cwt_token, + # 393 => Reg_Info ; Registration Policy info, + RegInfo: reg_info, + # 3 => tstr ; payload type + pycose.headers.ContentType: content_type, + } + # } + + # Unprotected_Header = { + unprotected = { + # ; TBD, Labels are temporary, + TBD: "TBD", + # ? 394 => [+ Receipts] + Receipts: receipts, + } + # } + + # https://github.com/TimothyClaeys/pycose/blob/e527e79b611f6cc6673bbb694056a7468c2eef75/pycose/messages/cosemessage.py#L84-L91 + msg = pycose.messages.Sign1Message( + phdr=protected, + uhdr=unprotected, + payload=payload, + ) + + # Sign + msg.key = sign1_message_key + # https://github.com/TimothyClaeys/pycose/blob/e527e79b611f6cc6673bbb694056a7468c2eef75/pycose/messages/cosemessage.py#L143 + claim = msg.encode(tag=True) + claim_path.write_bytes(claim) + + # Write out private key in PEM format if argument given and not exists + if private_key_pem_path and not private_key_pem_path.exists(): + private_key_pem_path.write_bytes(key_as_pem_bytes) + + +def cli(fn): + p = fn("create-claim", description="Create a fake SCITT claim") + p.add_argument("--out", required=True, type=pathlib.Path) + p.add_argument("--issuer", required=False, type=str, default=None) + p.add_argument("--subject", required=True, type=str) + p.add_argument("--content-type", required=True, type=str) + p.add_argument("--payload", required=True, type=str) + p.add_argument("--private-key-pem", required=False, type=pathlib.Path) + p.add_argument("--receipts", type=pathlib.Path, nargs="*", default=[]) + p.set_defaults( + func=lambda args: create_claim( + args.out, + args.issuer, + args.subject, + args.content_type, + args.payload.encode("utf-8"), + private_key_pem_path=args.private_key_pem, + receipts=[receipt.read_bytes() for receipt in args.receipts], + ) + ) + + return p + + +def main(argv=None): + parser = cli(argparse.ArgumentParser) + args = parser.parse_args(argv) + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/scitt_emulator/did_helpers.py b/scitt_emulator/did_helpers.py new file mode 100644 index 00000000..145be371 --- /dev/null +++ b/scitt_emulator/did_helpers.py @@ -0,0 +1,31 @@ +import os +import urllib.parse +from typing import Optional + + +DID_JWK_METHOD = "did:jwk:" + + +def did_web_to_url( + did_web_string: str, + *, + scheme: Optional[str] = None, +): + if scheme is None: + scheme = os.environ.get("DID_WEB_ASSUME_SCHEME", "https") + return "/".join( + [ + f"{scheme}:/", + *[urllib.parse.unquote(i) for i in did_web_string.split(":")[2:]], + ] + ) + + +def url_to_did_web(url_string): + url = urllib.parse.urlparse(url_string) + return ":".join( + [ + urllib.parse.quote(i) + for i in ["did", "web", url.netloc, *filter(bool, url.path.split("/"))] + ] + ) diff --git a/scitt_emulator/key_helper_dataclasses.py b/scitt_emulator/key_helper_dataclasses.py new file mode 100644 index 00000000..505afe88 --- /dev/null +++ b/scitt_emulator/key_helper_dataclasses.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass, field +from typing import List, Any, Union + +import cwt +import pycose.keys.ec2 + + +@dataclass +class VerificationKey: + transforms: List[Any] + original: Any + original_content_type: str + original_bytes: bytes + original_bytes_encoding: str + usable: bool + cwt: Union[cwt.COSEKey, None] + cose: Union[pycose.keys.ec2.EC2Key, None] diff --git a/scitt_emulator/key_helpers.py b/scitt_emulator/key_helpers.py new file mode 100644 index 00000000..5d692ee9 --- /dev/null +++ b/scitt_emulator/key_helpers.py @@ -0,0 +1,41 @@ +import itertools +import importlib.metadata +from typing import Optional, Callable, List, Tuple + +from scitt_emulator.key_helper_dataclasses import VerificationKey + + +ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT = "scitt_emulator.key_helpers.verification_key_to_object" + + +def verification_key_to_object( + verification_key: VerificationKey, + *, + key_transforms: Optional[List[Callable[[VerificationKey], dict]]] = None, +) -> bool: + """ + Resolve keys for statement issuer and verify signature on COSESign1 + statement and embedded CWT + """ + if key_transforms is None: + key_transforms = [] + # There is some difference in the return value of entry_points across + # Python versions/envs (conda vs. non-conda). Python 3.8 returns a dict. + entrypoints = importlib.metadata.entry_points() + if isinstance(entrypoints, dict): + for entrypoint in entrypoints.get(ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT, []): + key_transforms.append(entrypoint.load()) + elif isinstance(entrypoints, getattr(importlib.metadata, "EntryPoints", list)): + for entrypoint in entrypoints: + if entrypoint.group == ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT: + key_transforms.append(entrypoint.load()) + else: + raise TypeError(f"importlib.metadata.entry_points returned unknown type: {type(entrypoints)}: {entrypoints!r}") + + for key_transform in key_transforms: + verification_key_as_object = key_transform(verification_key) + # Skip keys that we couldn't derive COSE keys for + if verification_key_as_object: + return verification_key_as_object + + return None diff --git a/scitt_emulator/key_loader_format_did_jwk.py b/scitt_emulator/key_loader_format_did_jwk.py new file mode 100644 index 00000000..b22f96c0 --- /dev/null +++ b/scitt_emulator/key_loader_format_did_jwk.py @@ -0,0 +1,53 @@ +import base64 +from typing import List, Tuple + +import cwt +import cwt.algs.ec2 +import pycose +import pycose.keys.ec2 +import cryptography.hazmat.primitives.asymmetric.ec +from cryptography.hazmat.primitives import serialization + +import jwcrypto.jwk + +from scitt_emulator.did_helpers import DID_JWK_METHOD +from scitt_emulator.key_helper_dataclasses import VerificationKey + + +CONTENT_TYPE = "application/did+jwk" + + +def key_loader_format_did_jwk( + unverified_issuer: str, +) -> List[VerificationKey]: + if not unverified_issuer.startswith(DID_JWK_METHOD): + return [] + key = jwcrypto.jwk.JWK.from_json( + base64.urlsafe_b64decode(unverified_issuer[len(DID_JWK_METHOD):]).decode() + ) + return [ + VerificationKey( + transforms=[key], + original=key, + original_content_type=CONTENT_TYPE, + original_bytes=unverified_issuer.encode("utf-8"), + original_bytes_encoding="utf-8", + usable=False, + cwt=None, + cose=None, + ) + ] + + +def to_object_jwk(verification_key: VerificationKey) -> dict: + if not isinstance(verification_key.original, jwcrypto.jwk.JWK): + return + + return { + "content_type": verification_key.original_content_type, + "key": { + **verification_key.original.export_public(as_dict=True), + "use": "sig", + "kid": verification_key.original.thumbprint(), + }, + } diff --git a/scitt_emulator/key_loader_format_url_referencing_oidc_issuer.py b/scitt_emulator/key_loader_format_url_referencing_oidc_issuer.py new file mode 100644 index 00000000..56409282 --- /dev/null +++ b/scitt_emulator/key_loader_format_url_referencing_oidc_issuer.py @@ -0,0 +1,64 @@ +import json +import contextlib +import urllib.parse +import urllib.request +from typing import List, Tuple + +import cwt +import cwt.algs.ec2 +import pycose +import pycose.keys.ec2 + +# TODO Remove this once we have a example flow for proper key verification +import jwcrypto.jwk + +from scitt_emulator.did_helpers import did_web_to_url +from scitt_emulator.key_helper_dataclasses import VerificationKey +from scitt_emulator.key_loader_format_did_jwk import to_object_jwk + + +CONTENT_TYPE = "application/jwk+json" + + +def key_loader_format_url_referencing_oidc_issuer( + unverified_issuer: str, +) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]: + keys = [] + + if unverified_issuer.startswith("did:web:"): + unverified_issuer = did_web_to_url(unverified_issuer) + + if "://" not in unverified_issuer or unverified_issuer.startswith("file://"): + return keys + + # TODO Logging for URLErrors + # Check if OIDC issuer + unverified_issuer_parsed_url = urllib.parse.urlparse(unverified_issuer) + openid_configuration_url = unverified_issuer_parsed_url._replace( + path="/.well-known/openid-configuration", + ).geturl() + with contextlib.suppress(urllib.request.URLError): + with urllib.request.urlopen(openid_configuration_url) as response: + if response.status == 200: + openid_configuration = json.loads(response.read()) + jwks_uri = openid_configuration["jwks_uri"] + with urllib.request.urlopen(jwks_uri) as response: + if response.status == 200: + jwks = json.loads(response.read()) + for jwk_key_as_dict in jwks["keys"]: + jwk_key_as_string = json.dumps(jwk_key_as_dict) + jwk_key = jwcrypto.jwk.JWK.from_json(jwk_key_as_string) + keys.append( + VerificationKey( + transforms=[jwk_key], + original=jwk_key, + original_content_type=CONTENT_TYPE, + original_bytes=jwk_key_as_string.encode("utf-8"), + original_bytes_encoding="utf-8", + usable=False, + cwt=None, + cose=None, + ) + ) + + return keys diff --git a/scitt_emulator/key_loader_format_url_referencing_scitt_scrapi.py b/scitt_emulator/key_loader_format_url_referencing_scitt_scrapi.py new file mode 100644 index 00000000..67ab3603 --- /dev/null +++ b/scitt_emulator/key_loader_format_url_referencing_scitt_scrapi.py @@ -0,0 +1,72 @@ +import json +import contextlib +import urllib.parse +import urllib.request +from typing import List, Tuple + +import cwt +import cwt.algs.ec2 +import pycose +import pycose.keys.ec2 + +# TODO Remove this once we have a example flow for proper key verification +import jwcrypto.jwk + +from scitt_emulator.did_helpers import did_web_to_url +from scitt_emulator.key_helper_dataclasses import VerificationKey +from scitt_emulator.key_loader_format_did_jwk import to_object_jwk + + +CONTENT_TYPE = "application/scitt+jwk+set+json" + + +def key_loader_format_url_referencing_scitt_scrapi( + unverified_issuer: str, +) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]: + keys = [] + + if unverified_issuer.startswith("did:web:"): + unverified_issuer = did_web_to_url(unverified_issuer) + + if "://" not in unverified_issuer or unverified_issuer.startswith("file://"): + return keys + + # TODO Logging for URLErrors + # Check if OIDC issuer + unverified_issuer_parsed_url = urllib.parse.urlparse(unverified_issuer) + openid_configuration_url = unverified_issuer_parsed_url._replace( + path="/.well-known/transparency-configuration", + ).geturl() + with contextlib.suppress(urllib.request.URLError): + with urllib.request.urlopen(openid_configuration_url) as response: + if response.status == 200: + openid_configuration = json.loads(response.read()) + jwks = openid_configuration["jwks"] + for jwk_key_as_dict in jwks["keys"]: + jwk_key_as_string = json.dumps(jwk_key_as_dict) + jwk_key = jwcrypto.jwk.JWK.from_json(jwk_key_as_string) + keys.append( + VerificationKey( + transforms=[jwk_key], + original=jwk_key, + original_content_type=CONTENT_TYPE, + original_bytes=jwk_key_as_string.encode("utf-8"), + original_bytes_encoding="utf-8", + usable=False, + cwt=None, + cose=None, + ) + ) + + return keys + + +def transform_key_instance_jwcrypto_jwk_to_cwt_cose( + key: jwcrypto.jwk.JWK, +) -> cwt.COSEKey: + if not isinstance(key, jwcrypto.jwk.JWK): + raise TypeError(key) + return cwt.COSEKey.from_pem( + key.export_to_pem(), + kid=key.thumbprint(), + ) diff --git a/scitt_emulator/key_loader_format_url_referencing_ssh_authorized_keys.py b/scitt_emulator/key_loader_format_url_referencing_ssh_authorized_keys.py new file mode 100644 index 00000000..4b118412 --- /dev/null +++ b/scitt_emulator/key_loader_format_url_referencing_ssh_authorized_keys.py @@ -0,0 +1,85 @@ +import contextlib +import dataclasses +import urllib.parse +import urllib.request +from typing import List, Tuple + +import cwt +import cwt.algs.ec2 +import pycose +import pycose.keys.ec2 +import cryptography.exceptions +from cryptography.hazmat.primitives import serialization + +# TODO Remove this once we have a example flow for proper key verification +import jwcrypto.jwk + +from scitt_emulator.did_helpers import did_web_to_url +from scitt_emulator.key_helper_dataclasses import VerificationKey +from scitt_emulator.key_loader_format_did_jwk import to_object_jwk + +CONTENT_TYPE = "application/key+ssh" + + +def key_loader_format_url_referencing_ssh_authorized_keys( + unverified_issuer: str, +) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]: + keys = [] + + if unverified_issuer.startswith("did:web:"): + unverified_issuer = did_web_to_url(unverified_issuer) + + if "://" not in unverified_issuer or unverified_issuer.startswith("file://"): + return keys + + # Try loading ssh keys. Example: https://github.com/username.keys + with contextlib.suppress(urllib.request.URLError): + with urllib.request.urlopen(unverified_issuer) as response: + while line := response.readline(): + with contextlib.suppress( + (ValueError, cryptography.exceptions.UnsupportedAlgorithm) + ): + key = serialization.load_ssh_public_key(line) + keys.append( + VerificationKey( + transforms=[key], + original=key, + original_content_type=CONTENT_TYPE, + original_bytes=line, + original_bytes_encoding="utf-8", + usable=False, + cwt=None, + cose=None, + ) + ) + + return keys + + +def transform_key_instance_cryptography_ecc_public_to_jwcrypto_jwk( + key: cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey, +) -> jwcrypto.jwk.JWK: + if not isinstance( + key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey + ): + raise TypeError(key) + return jwcrypto.jwk.JWK.from_pem( + key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + ) + + +def to_object_ssh_public(verification_key: VerificationKey) -> dict: + if verification_key.original_content_type != CONTENT_TYPE: + return + + return to_object_jwk( + dataclasses.replace( + verification_key, + original=transform_key_instance_cryptography_ecc_public_to_jwcrypto_jwk( + verification_key.original, + ) + ) + ) diff --git a/scitt_emulator/key_transforms.py b/scitt_emulator/key_transforms.py new file mode 100644 index 00000000..67eb9ed1 --- /dev/null +++ b/scitt_emulator/key_transforms.py @@ -0,0 +1,89 @@ +import inspect +import itertools +import importlib.metadata +from typing import Optional, Callable, List, Tuple + +import cwt +import pycose.keys.ec2 + +from scitt_emulator.key_helper_dataclasses import VerificationKey + + +ENTRYPOINT_KEY_TRANSFORMS_KEY_INSTANCES = "scitt_emulator.key_helpers.transforms_key_instances" + + +def preform_verification_key_transforms( + verification_keys: List[VerificationKey], + *, + key_transforms: Optional[List[Callable[[VerificationKey], dict]]] = None, +) -> None: + """ + Resolve keys for statement issuer and verify signature on COSESign1 + statement and embedded CWT + """ + # In case of iterators since we have to loop multiple times + verification_keys = list(verification_keys) + + if key_transforms is None: + key_transforms = [] + # There is some difference in the return value of entry_points across + # Python versions/envs (conda vs. non-conda). Python 3.8 returns a dict. + entrypoints = importlib.metadata.entry_points() + if isinstance(entrypoints, dict): + for entrypoint in entrypoints.get(ENTRYPOINT_KEY_TRANSFORMS_KEY_INSTANCES, []): + key_transforms.append(entrypoint.load()) + elif isinstance(entrypoints, getattr(importlib.metadata, "EntryPoints", list)): + for entrypoint in entrypoints: + if entrypoint.group == ENTRYPOINT_KEY_TRANSFORMS_KEY_INSTANCES: + key_transforms.append(entrypoint.load()) + else: + raise TypeError(f"importlib.metadata.entry_points returned unknown type: {type(entrypoints)}: {entrypoints!r}") + + key_transform_types = tuple( + [ + list(inspect.signature(key_transform).parameters.values())[0].annotation + for key_transform in key_transforms + ] + ) + + for verification_key in verification_keys: + while not verification_key.usable: + # Attempt key transforms + for key_transform in key_transforms: + key = verification_key.transforms[-1] + if isinstance(key, list(inspect.signature(key_transform).parameters.values())[0].annotation): + transformed_key = key_transform(key) + if transformed_key: + verification_key.transforms.append(transformed_key) + # Check if key is usable yet + for key in reversed(verification_key.transforms): + if not verification_key.cwt and isinstance(key, cwt.algs.ec2.EC2Key): + verification_key.cwt = key + if ( + not verification_key.cose + and isinstance( + key, + ( + pycose.keys.ec2.EC2Key, + ) + ) + ): + verification_key.cose = key + if verification_key.cwt and verification_key.cose: + verification_key.usable = True + break + # If we are unable to transform further, raise exception + key = verification_key.transforms[-1] + if not isinstance(key, key_transform_types): + raise NotImplementedError(f"Unable to transform {type(key)} into CWT and COSE keys needed. Transforms available: {key_transforms}. Transform types accepted: {key_transform_types}. Transforms completed: {verification_key.transforms}") + + return verification_keys + + +def transform_key_instance_cwt_cose_ec2_to_pycose_ec2( + key: cwt.algs.ec2.EC2Key, +) -> pycose.keys.ec2.EC2Key: + if not isinstance(key, cwt.algs.ec2.EC2Key): + raise TypeError(key) + cwt_ec2_key_as_dict = key.to_dict() + return pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict) diff --git a/scitt_emulator/oidc.py b/scitt_emulator/oidc.py index 4ca770d2..75d82d0b 100644 --- a/scitt_emulator/oidc.py +++ b/scitt_emulator/oidc.py @@ -2,9 +2,7 @@ # Licensed under the MIT License. import jwt import json -import jwcrypto.jwk import jsonschema -from flask import jsonify from werkzeug.wrappers import Request from scitt_emulator.client import HttpClient diff --git a/scitt_emulator/rkvst.py b/scitt_emulator/rkvst.py index 54f9f5ee..add848ad 100644 --- a/scitt_emulator/rkvst.py +++ b/scitt_emulator/rkvst.py @@ -5,8 +5,7 @@ from typing import Optional from pathlib import Path import json -import cbor2 -from pycose.messages import CoseMessage, Sign1Message +from pycose.messages import Sign1Message import pycose.headers import base64 from os import getenv @@ -59,6 +58,9 @@ def initialize_service(self): "serviceCertificate": None, } + def keys_as_jwks(self): + return [] + def _event_id_to_operation_id(self, event_id: str): return event_id.replace('/', '_') diff --git a/scitt_emulator/scitt.py b/scitt_emulator/scitt.py index 3311b778..8486efb8 100644 --- a/scitt_emulator/scitt.py +++ b/scitt_emulator/scitt.py @@ -4,19 +4,16 @@ from typing import Optional from abc import ABC, abstractmethod from pathlib import Path -import contextlib import time import json import uuid import cbor2 -from pycose.messages import CoseMessage, Sign1Message +from pycose.messages import Sign1Message import pycose.headers -from pycose.keys.ec2 import EC2Key -import pycose.keys.curves -# temporary claim header labels, see draft-birkholz-scitt-architecture -COSE_Headers_Issuer = 391 +from scitt_emulator.create_statement import CWTClaims +from scitt_emulator.verify_statement import verify_statement # temporary receipt header labels, see draft-birkholz-scitt-receipts COSE_Headers_Service_Id = "service_id" @@ -63,6 +60,10 @@ def __init__( def initialize_service(self): raise NotImplementedError + @abstractmethod + def keys_as_jwks(self): + raise NotImplementedError + @abstractmethod def create_receipt_contents(self, countersign_tbi: bytes, entry_id: str): raise NotImplementedError @@ -225,7 +226,7 @@ def _create_receipt(self, claim: bytes, entry_id: str): # Note: This emulator does not verify the claim signature and does not apply # registration policies. try: - msg = CoseMessage.decode(claim) + msg = Sign1Message.decode(claim, tag=True) except: raise ClaimInvalidError("Claim is not a valid COSE message") if not isinstance(msg, Sign1Message): @@ -236,10 +237,8 @@ def _create_receipt(self, claim: bytes, entry_id: str): raise ClaimInvalidError( "Claim does not have a content type header parameter" ) - if COSE_Headers_Issuer not in msg.phdr: - raise ClaimInvalidError("Claim does not have an issuer header parameter") - if not isinstance(msg.phdr[COSE_Headers_Issuer], str): - raise ClaimInvalidError("Claim issuer is not a string") + if CWTClaims not in msg.phdr: + raise ClaimInvalidError("Claim does not have a CWTClaims header parameter") # Extract fields of COSE_Sign1 for countersigning outer = cbor2.loads(claim) @@ -304,28 +303,6 @@ def verify_receipt(self, cose_path: Path, receipt_path: Path): self.verify_receipt_contents(receipt_contents, countersign_tbi) -def create_claim(claim_path: Path, issuer: str, content_type: str, payload: str): - # Create COSE_Sign1 structure - protected = { - pycose.headers.Algorithm: "ES256", - pycose.headers.ContentType: content_type, - COSE_Headers_Issuer: issuer, - } - msg = Sign1Message(phdr=protected, payload=payload.encode("utf-8")) - - # Create an ad-hoc key - # Note: The emulator does not validate signatures, hence the short-cut. - key = EC2Key.generate_key(pycose.keys.curves.P256) - - # Sign - msg.key = key - claim = msg.encode(tag=True) - - with open(claim_path, "wb") as f: - f.write(claim) - print(f"A COSE signed Claim was written to: {claim_path}") - - def create_countersign_to_be_included( body_protected, sign_protected, payload, signature ): diff --git a/scitt_emulator/server.py b/scitt_emulator/server.py index 1ab4e60b..0bd6a9fc 100644 --- a/scitt_emulator/server.py +++ b/scitt_emulator/server.py @@ -6,7 +6,7 @@ from io import BytesIO import random -from flask import Flask, request, send_file, make_response +from flask import Flask, request, send_file, make_response, jsonify from scitt_emulator.tree_algs import TREE_ALGS from scitt_emulator.plugin_helpers import entrypoint_style_load @@ -56,6 +56,23 @@ def create_flask_app(config): def is_unavailable(): return random.random() <= error_rate + @app.route("/.well-known/transparency-configuration", methods=["GET"]) + def get_transparency_configuration(): + if is_unavailable(): + return make_unavailable_error() + return jsonify( + { + "issuer": "/", + "registration_endpoint": f"/entries", + "nonce_endpoint": f"/nonce", + "registration_policy": f"/statements/TODO", + "supported_signature_algorithms": ["ES256"], + "jwks": { + "keys": app.scitt_service.keys_as_jwks(), + } + } + ) + @app.route("/entries//receipt", methods=["GET"]) def get_receipt(entry_id: str): if is_unavailable(): diff --git a/scitt_emulator/verify_statement.py b/scitt_emulator/verify_statement.py new file mode 100644 index 00000000..9fa60c15 --- /dev/null +++ b/scitt_emulator/verify_statement.py @@ -0,0 +1,73 @@ +import os +import itertools +import contextlib +import dataclasses +import urllib.parse +import urllib.request +import importlib.metadata +from typing import Optional, Callable, List, Tuple + +import cwt +import cwt.algs.ec2 +import pycose +import pycose.keys.ec2 +from pycose.messages import Sign1Message + +from scitt_emulator.did_helpers import did_web_to_url +from scitt_emulator.create_statement import CWTClaims +from scitt_emulator.key_helper_dataclasses import VerificationKey +from scitt_emulator.key_transforms import preform_verification_key_transforms + + +ENTRYPOINT_KEY_LOADERS = "scitt_emulator.verify_signature.key_loaders" + + +def verify_statement( + msg: Sign1Message, + *, + key_loaders: Optional[List[Callable[[str], List[VerificationKey]]]] = None, +) -> bool: + """ + Resolve keys for statement issuer and verify signature on COSESign1 + statement and embedded CWT + """ + if key_loaders is None: + key_loaders = [] + # There is some difference in the return value of entry_points across + # Python versions/envs (conda vs. non-conda). Python 3.8 returns a dict. + entrypoints = importlib.metadata.entry_points() + if isinstance(entrypoints, dict): + for entrypoint in entrypoints.get(ENTRYPOINT_KEY_LOADERS, []): + key_loaders.append(entrypoint.load()) + elif isinstance(entrypoints, getattr(importlib.metadata, "EntryPoints", list)): + for entrypoint in entrypoints: + if entrypoint.group == ENTRYPOINT_KEY_LOADERS: + key_loaders.append(entrypoint.load()) + else: + raise TypeError(f"importlib.metadata.entry_points returned unknown type: {type(entrypoints)}: {entrypoints!r}") + + # Figure out what the issuer is + cwt_cose_loads = cwt.cose.COSE()._loads + cwt_unverified_protected = cwt_cose_loads( + cwt_cose_loads(msg.phdr[CWTClaims]).value[2] + ) + unverified_issuer = cwt_unverified_protected[1] + + # Load keys from issuer and attempt verification. Return key used to verify + for verification_key in preform_verification_key_transforms( + itertools.chain( + *[key_loader(unverified_issuer) for key_loader in key_loaders] + ) + ): + # Skip keys that we couldn't derive COSE keys for + if not verification_key.usable: + # TODO Logging + continue + msg.key = verification_key.cose + verify_signature = False + with contextlib.suppress(Exception): + verify_signature = msg.verify_signature() + if verify_signature: + return verification_key + + return None diff --git a/setup.py b/setup.py index 466dd6fc..215b9ea6 100644 --- a/setup.py +++ b/setup.py @@ -10,12 +10,29 @@ entry_points = { 'console_scripts': [ 'scitt-emulator=scitt_emulator.cli:main' - ] + ], + 'scitt_emulator.verify_signature.key_loaders': [ + 'did_jwk=scitt_emulator.key_loader_format_did_jwk:key_loader_format_did_jwk', + 'url_referencing_scitt_scrapi=scitt_emulator.key_loader_format_url_referencing_scitt_scrapi:key_loader_format_url_referencing_scitt_scrapi', + 'url_referencing_oidc_issuer=scitt_emulator.key_loader_format_url_referencing_oidc_issuer:key_loader_format_url_referencing_oidc_issuer', + 'url_referencing_ssh_authorized_keys=scitt_emulator.key_loader_format_url_referencing_ssh_authorized_keys:key_loader_format_url_referencing_ssh_authorized_keys', + ], + 'scitt_emulator.key_helpers.transforms_key_instances': [ + 'transform_key_instance_cwt_cose_ec2_to_pycose_ec2=scitt_emulator.key_transforms:transform_key_instance_cwt_cose_ec2_to_pycose_ec2', + 'transform_key_instance_jwcrypto_jwk_to_cwt_cose=scitt_emulator.key_loader_format_url_referencing_scitt_scrapi:transform_key_instance_jwcrypto_jwk_to_cwt_cose', + 'transform_key_instance_cryptography_ecc_public_to_jwcrypto_jwk=scitt_emulator:key_loader_format_url_referencing_ssh_authorized_keys.transform_key_instance_cryptography_ecc_public_to_jwcrypto_jwk', + ], + 'scitt_emulator.key_helpers.verification_key_to_object': [ + 'to_object_jwk=scitt_emulator.key_loader_format_did_jwk:to_object_jwk', + 'to_object_ssh_public=scitt_emulator.key_loader_format_url_referencing_ssh_authorized_keys:to_object_ssh_public', + ], }, python_requires=">=3.8", install_requires=[ "cryptography", "cbor2", + "cwt", + "jwcrypto", "pycose", "httpx", "flask", diff --git a/tests/test_cli.py b/tests/test_cli.py index 95319901..5eb2e58a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,17 +1,17 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. import os +import io import json import threading import pytest import jwt import jwcrypto -from flask import Flask, jsonify +from flask import Flask, jsonify, send_file from werkzeug.serving import make_server from scitt_emulator import cli, server from scitt_emulator.oidc import OIDCAuthMiddleware -issuer = "did:web:example.com" content_type = "application/json" payload = '{"foo": "bar"}' @@ -71,8 +71,8 @@ def test_client_cli(use_lro: bool, tmp_path): "create-claim", "--out", claim_path, - "--issuer", - issuer, + "--subject", + "test", "--content-type", content_type, "--payload", @@ -155,6 +155,48 @@ def test_client_cli(use_lro: bool, tmp_path): receipt_2 = f.read() assert receipt == receipt_2 + # create transparent statement + command = [ + "client", + "create-claim", + "--out", + claim_path, + "--subject", + "test", + "--content-type", + content_type, + "--payload", + payload, + "--receipts", + receipt_path, + ] + execute_cli(command) + assert os.path.exists(claim_path) + + +def create_flask_app_ssh_authorized_keys_server(config): + app = Flask("ssh_authorized_keys_server") + + app.config.update(dict(DEBUG=True)) + app.config.update(config) + + @app.route("/", methods=["GET"]) + def ssh_public_keys(): + from cryptography.hazmat.primitives import serialization + return send_file( + io.BytesIO( + serialization.load_pem_public_key( + app.config["key"].export_to_pem(), + ).public_bytes( + encoding=serialization.Encoding.OpenSSH, + format=serialization.PublicFormat.OpenSSH, + ) + ), + mimetype="text/plain", + ) + + return app + def create_flask_app_oidc_server(config): app = Flask("oidc_server") @@ -246,8 +288,8 @@ def test_client_cli_token(tmp_path): "create-claim", "--out", claim_path, - "--issuer", - issuer, + "--subject", + "test", "--content-type", content_type, "--payload", diff --git a/tests/test_docs.py b/tests/test_docs.py index ea3d92d9..6ed369f8 100644 --- a/tests/test_docs.py +++ b/tests/test_docs.py @@ -12,39 +12,44 @@ import threading import itertools import subprocess -import contextlib -import unittest.mock +import urllib.parse + import pytest import myst_parser.parsers.docutils_ import docutils.nodes import docutils.utils +from flask import Flask + +import jwcrypto from scitt_emulator.client import ClaimOperationError +from scitt_emulator.did_helpers import url_to_did_web from .test_cli import ( Service, content_type, payload, execute_cli, + create_flask_app_oidc_server, + create_flask_app_ssh_authorized_keys_server, ) repo_root = pathlib.Path(__file__).parents[1] docs_dir = repo_root.joinpath("docs") -allowlisted_issuer = "did:web:example.org" -non_allowlisted_issuer = "did:web:example.com" +non_allowlisted_issuer = "did:web:denied.example.com" CLAIM_DENIED_ERROR = {"type": "denied", "detail": "content_address_of_reason"} CLAIM_DENIED_ERROR_BLOCKED = { "type": "denied", "detail": textwrap.dedent( """ - 'did:web:example.com' is not one of ['did:web:example.org'] + 'did:web:denied.example.com' is not one of ['did:web:example.org'] Failed validating 'enum' in schema['properties']['issuer']: {'enum': ['did:web:example.org'], 'type': 'string'} On instance['issuer']: - 'did:web:example.com' + 'did:web:denied.example.com' """ ).lstrip(), } @@ -152,13 +157,31 @@ def docutils_find_code_samples(nodes): samples[node.astext()] = nodes[i + 3].astext() return samples -def test_docs_registration_policies(tmp_path): +def create_flask_app_nop_scitt_scrapi(config): + # Used to test resolving keys from scrapi + # /.well-known/transparency-configuration + app = Flask("nop") + + app.config.update(dict(DEBUG=True)) + app.config.update(config) + + return app + +@pytest.mark.parametrize( + "create_flask_app_notary_identity", [ + create_flask_app_oidc_server, + create_flask_app_ssh_authorized_keys_server, + create_flask_app_nop_scitt_scrapi, + ], +) +def test_docs_registration_policies(create_flask_app_notary_identity, tmp_path): workspace_path = tmp_path / "workspace" claim_path = tmp_path / "claim.cose" receipt_path = tmp_path / "claim.receipt.cbor" entry_id_path = tmp_path / "claim.entry_id.txt" retrieved_claim_path = tmp_path / "claim.retrieved.cose" + private_key_pem_path = tmp_path / "notary-private-key.pem" # Grab code samples from docs # TODO Abstract into abitrary docs testing code @@ -170,11 +193,26 @@ def test_docs_registration_policies(tmp_path): for name, content in docutils_find_code_samples(nodes).items(): tmp_path.joinpath(name).write_text(content) + key = jwcrypto.jwk.JWK.generate(kty="EC", crv="P-384") + # cwt_cose_key = cwt.COSEKey.generate_symmetric_key(alg=alg, kid=kid) + private_key_pem_path.write_bytes( + key.export_to_pem(private_key=True, password=None), + ) + algorithm = "ES384" + audience = "scitt.example.org" + subject = "repo:scitt-community/scitt-api-emulator:ref:refs/heads/main" + + # tell jsonschema_validator.py that we want to assume non-TLS URLs for tests + os.environ["DID_WEB_ASSUME_SCHEME"] = "http" + with Service( + {"key": key, "algorithms": [algorithm]}, + create_flask_app=create_flask_app_notary_identity, + ) as oidc_service, Service( { "tree_alg": "CCF", "workspace": workspace_path, - "error_rate": 0.1, + "error_rate": 0, "use_lro": True, } ) as service, SimpleFileBasedPolicyEngine( @@ -188,22 +226,41 @@ def test_docs_registration_policies(tmp_path): # set the policy to enforce service.server.app.scitt_service.service_parameters["insertPolicy"] = "external" - # create denied claim + if create_flask_app_nop_scitt_scrapi is create_flask_app_notary_identity: + # set the issuer to the SCITT SCRAPI service + issuer = url_to_did_web(service.url) + # use private key from SCITT SCRAPI service to sign + private_key_pem_path = workspace_path.joinpath("storage", "service_private_key.pem") + else: + # set the issuer to the did:web version of the OIDC / SSH keys service + issuer = url_to_did_web(oidc_service.url) + + # create claim command = [ "client", "create-claim", "--out", claim_path, "--issuer", - non_allowlisted_issuer, + issuer, + "--subject", + subject, "--content-type", content_type, "--payload", payload, + "--private-key-pem", + private_key_pem_path, ] execute_cli(command) assert os.path.exists(claim_path) + # replace example issuer with test OIDC service issuer (URL) in error + claim_denied_error_blocked = copy.deepcopy(CLAIM_DENIED_ERROR_BLOCKED) + claim_denied_error_blocked["detail"] = claim_denied_error_blocked["detail"].replace( + "did:web:denied.example.com", issuer, + ) + # submit denied claim command = [ "client", @@ -224,25 +281,18 @@ def test_docs_registration_policies(tmp_path): check_error = error assert check_error assert "error" in check_error.operation - assert check_error.operation["error"] == CLAIM_DENIED_ERROR_BLOCKED + if check_error.operation["error"] != claim_denied_error_blocked: + raise check_error assert not os.path.exists(receipt_path) assert not os.path.exists(entry_id_path) - # create accepted claim - command = [ - "client", - "create-claim", - "--out", - claim_path, - "--issuer", - allowlisted_issuer, - "--content-type", - content_type, - "--payload", - payload, - ] - execute_cli(command) - assert os.path.exists(claim_path) + # replace example issuer with test OIDC service issuer in allowlist + allowlist_schema_json_path = tmp_path.joinpath("allowlist.schema.json") + allowlist_schema_json_path.write_text( + allowlist_schema_json_path.read_text().replace( + "did:web:example.org", issuer, + ) + ) # submit accepted claim command = [ @@ -259,4 +309,6 @@ def test_docs_registration_policies(tmp_path): ] execute_cli(command) assert os.path.exists(receipt_path) + receipt_path.unlink() assert os.path.exists(entry_id_path) + receipt_path.unlink(entry_id_path)