From 233bd7ec1b0f9bc8221d720ea83560105c62a61a Mon Sep 17 00:00:00 2001 From: John Andersen Date: Fri, 10 Nov 2023 18:22:53 +0100 Subject: [PATCH] IN PROGRESS: docs: registration policies: CWT decode Signed-off-by: John Andersen --- docs/registration_policies.md | 51 ++++++++++++++++++++++++++++++++--- tests/test_docs.py | 28 ++++++++++++++++--- 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/docs/registration_policies.md b/docs/registration_policies.md index fc23db88..80b07785 100644 --- a/docs/registration_policies.md +++ b/docs/registration_policies.md @@ -77,6 +77,28 @@ Simple drop rule based on claim content allowlist. } ``` +**TODO** This example being brought up of date with recent CWT within COSESign1 +changes. Currently the `JWKS_URI` environment variable must be set to discover +valid notary keys used with `COSESign1` and CWT signing on statement creation. + +```console +$ cat private-key.pem | python -c 'import sys, json, jwcrypto.jwt; key = jwcrypto.jwt.JWK(); key.import_from_pem(sys.stdin.buffer.read()); print(json.dumps({"keys":[{**key.export_public(as_dict=True),"use": "sig","kid": key.thumbprint()}]}, indent=4, sort_keys=True))' | tee jwks +{ + "keys": [ + { + "crv": "P-384", + "kid": "y96luxaBaw6FeWVEMti_iqLWPSYk8cKLzZG8X45PA2k", + "kty": "EC", + "use": "sig", + "x": "ZQazDzYmcMHF5Dstkbw7SwWvR_oXQHFS-TLppri-0xDby8TmCpzHyr6TH03CLBxj", + "y": "lsIbRskEv06Rf0vttkB3vpXdZ-a50ck74MVyRwOvN55P4s8usQAm3PY1KnAgWtHF" + } + ] +} +$ python -m http.server 7777 & +$ export JWKS_URI="http://localhost:7777/jwks" +``` + **jsonschema_validator.py** ```python @@ -86,12 +108,18 @@ import json import pathlib import traceback +import jwt +import cwt +import cwt.algs.ec2 import cbor2 import pycose +# TODO Remove this once we have a example flow for proper key verification +import jwcrypto.jwk from jsonschema import validate, ValidationError +import pycose.keys.ec2 from pycose.messages import CoseMessage, Sign1Message -from scitt_emulator.scitt import ClaimInvalidError, COSE_Headers_Issuer +from scitt_emulator.scitt import ClaimInvalidError, CWTClaims claim = sys.stdin.buffer.read() @@ -99,21 +127,36 @@ msg = CoseMessage.decode(claim) if pycose.headers.ContentType not in msg.phdr: raise ClaimInvalidError("Claim does not have a content type header parameter") -if COSE_Headers_Issuer not in msg.phdr: - raise ClaimInvalidError("Claim does not have an issuer header parameter") if not msg.phdr[pycose.headers.ContentType].startswith("application/json"): raise TypeError( f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}" ) +# TODO jwt.PyJWKClient.get_signing_key_from_cwt ? +keys = list( + [ + pycose.keys.ec2.EC2Key.from_dict( + cwt.algs.ec2.EC2Key.to_cose_key( + cwt.COSEKey.from_pem( + jwcrypto.jwk.JWK.from_json(key._jwk_data).export_to_pem(), + kid=jwcrypto.jwk.JWK.from_json(key._jwk_data).thumbprint() + ).to_dict() + ) + ) + for key in jwt.PyJWKClient(os.environ["JWKS_URI"]).get_signing_keys() + ] +) +cwt_claims = cwt.decode(msg.phdr[CWTClaims], keys) + SCHEMA = json.loads(pathlib.Path(os.environ["SCHEMA_PATH"]).read_text()) try: validate( instance={ "$schema": "https://schema.example.com/scitt-policy-engine-jsonschema.schema.json", - "issuer": msg.phdr[COSE_Headers_Issuer], + "issuer": cwt_claims[1], + "subject": cwt_claims[2], "claim": json.loads(msg.payload.decode()), }, schema=SCHEMA, diff --git a/tests/test_docs.py b/tests/test_docs.py index 465f0199..d68e12e4 100644 --- a/tests/test_docs.py +++ b/tests/test_docs.py @@ -19,6 +19,8 @@ import docutils.nodes import docutils.utils +import jwcrypto + from scitt_emulator.client import ClaimOperationError from .test_cli import ( @@ -26,12 +28,12 @@ content_type, payload, execute_cli, + create_flask_app_oidc_server, ) repo_root = pathlib.Path(__file__).parents[1] docs_dir = repo_root.joinpath("docs") -allowlisted_issuer = "did:web:example.org" non_allowlisted_issuer = "did:web:example.com" CLAIM_DENIED_ERROR = {"type": "denied", "detail": "content_address_of_reason"} CLAIM_DENIED_ERROR_BLOCKED = { @@ -170,7 +172,15 @@ def test_docs_registration_policies(tmp_path): for name, content in docutils_find_code_samples(nodes).items(): tmp_path.joinpath(name).write_text(content) + key = jwcrypto.jwk.JWK.generate(kty="EC", crv="P-384") + algorithm = "ES384" + audience = "scitt.example.org" + subject = "repo:scitt-community/scitt-api-emulator:ref:refs/heads/main" + with Service( + {"key": key, "algorithms": [algorithm]}, + create_flask_app=create_flask_app_oidc_server, + ) as oidc_service, Service( { "tree_alg": "CCF", "workspace": workspace_path, @@ -188,6 +198,18 @@ def test_docs_registration_policies(tmp_path): # set the policy to enforce service.server.app.scitt_service.service_parameters["insertPolicy"] = "external" + # replace example issuer with test OIDC service issuer (URL) + claim_denied_error_blocked = CLAIM_DENIED_ERROR_BLOCKED + claim_denied_error_blocked["detail"] = claim_denied_error_blocked["detail"].replace( + "did:web:example.org", oidc_service.url, + ) + allowlist_schema_json_path = tmp_path.joinpath("allowlist.schema.json") + allowlist_schema_json_path.write_text( + allowlist_schema_json_path.read_text().replace( + "did:web:example.org", oidc_service.url, + ) + ) + # create denied claim command = [ "client", @@ -226,7 +248,7 @@ def test_docs_registration_policies(tmp_path): check_error = error assert check_error assert "error" in check_error.operation - assert check_error.operation["error"] == CLAIM_DENIED_ERROR_BLOCKED + assert check_error.operation["error"] == claim_denied_error_blocked assert not os.path.exists(receipt_path) assert not os.path.exists(entry_id_path) @@ -237,7 +259,7 @@ def test_docs_registration_policies(tmp_path): "--out", claim_path, "--issuer", - allowlisted_issuer, + oidc_service.url, "--subject", "test", "--content-type",