From e4b590b32ed9ba2b293c255eb88e20dffa480bb0 Mon Sep 17 00:00:00 2001 From: John Andersen Date: Fri, 10 Nov 2023 18:22:53 +0100 Subject: [PATCH] docs: registration policies: CWT decode and COSESign1.verify_signature - Working with SSH authorized_keys and OIDC style jwks - CWT decode - COSESign1.verify_signature - Working registration policy Signed-off-by: John Andersen --- docs/registration_policies.md | 250 +++++++++++++++++++++++++++++++--- tests/test_cli.py | 19 ++- tests/test_docs.py | 90 +++++++++--- 3 files changed, 321 insertions(+), 38 deletions(-) diff --git a/docs/registration_policies.md b/docs/registration_policies.md index fc23db88..fe3a8294 100644 --- a/docs/registration_policies.md +++ b/docs/registration_policies.md @@ -12,14 +12,14 @@ The SCITT API emulator can deny entry based on presence of This is a simple way to enable evaluation of claims prior to submission by arbitrary policy engines which watch the workspace (fanotify, inotify, etc.). -[![asciicast-of-simple-decoupled-file-based-policy-engine](https://asciinema.org/a/572766.svg)](https://asciinema.org/a/572766) +[![asciicast-of-simple-decoupled-file-based-policy-engine](https://asciinema.org/a/620587.svg)](https://asciinema.org/a/620587) Start the server ```console $ rm -rf workspace/ $ mkdir -p workspace/storage/operations -$ scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro +$ timeout 0.5s scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro Service parameters: workspace/service_parameters.json ^C ``` @@ -84,36 +84,155 @@ import os import sys import json import pathlib +import unittest import traceback +import contextlib +import urllib.parse +import jwt +import cwt +import cwt.algs.ec2 import cbor2 import pycose + +# TODO Remove this once we have a example flow for proper key verification +import jwcrypto.jwk from jsonschema import validate, ValidationError -from pycose.messages import CoseMessage, Sign1Message +import pycose.keys.ec2 +import cryptography.hazmat.primitives.serialization +from pycose.messages import Sign1Message -from scitt_emulator.scitt import ClaimInvalidError, COSE_Headers_Issuer +from scitt_emulator.scitt import ClaimInvalidError, CWTClaims claim = sys.stdin.buffer.read() -msg = CoseMessage.decode(claim) +msg = Sign1Message.decode(claim, tag=True) if pycose.headers.ContentType not in msg.phdr: raise ClaimInvalidError("Claim does not have a content type header parameter") -if COSE_Headers_Issuer not in msg.phdr: - raise ClaimInvalidError("Claim does not have an issuer header parameter") if not msg.phdr[pycose.headers.ContentType].startswith("application/json"): raise TypeError( f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}" ) +# TODO Whatever the opisite of COSESign1 is + +# Figure out what the issuer is +cwt_cose_loads = cwt.cose.COSE()._loads +cwt_unverified_protected = cwt_cose_loads(cwt_cose_loads(msg.phdr[CWTClaims]).value[2]) +unverified_issuer = cwt_unverified_protected[1] + +def did_web_to_url(did_web_string, scheme=os.environ.get("DID_WEB_ASSUME_SCHEME", "https")): + return "/".join( + [ + f"{scheme}:/", + *[urllib.parse.unquote(i) for i in did_web_string.split(":")[2:]], + ] + ) + +if unverified_issuer.startswith("did:web:"): + unverified_issuer = did_web_to_url(unverified_issuer) + +# TODO Should we use audiance? I think no, just want to make sure we've +# documented why thought if not. No usage makes sense to me becasue we don't +# know the intended audiance, it could be federated into multiple TS + +# TODO Can you just pass a whole public key as an issuer? + +# Load keys from issuer +jwk_keys = [] + +import urllib.request +import urllib.parse + +# TODO did:web: -> URL +from cryptography.hazmat.primitives import serialization + +cryptography_ssh_keys = [] +if "://" in unverified_issuer and not unverified_issuer.startswith("file://"): + # TODO Logging for URLErrors + # Check if OIDC issuer + unverified_issuer_parsed_url = urllib.parse.urlparse(unverified_issuer) + openid_configuration_url = unverified_issuer_parsed_url._replace( + path="/.well-known/openid-configuration", + ).geturl() + with contextlib.suppress(urllib.request.URLError): + with urllib.request.urlopen(openid_configuration_url) as response: + if response.status == 200: + openid_configuration = json.loads(response.read()) + jwks_uri = openid_configuration["jwks_uri"] + with urllib.request.urlopen(jwks_uri) as response: + if response.status == 200: + jwks = json.loads(response.read()) + for jwk_key_as_dict in jwks["keys"]: + jwk_key_as_string = json.dumps(jwk_key_as_dict) + jwk_keys.append( + jwcrypto.jwk.JWK.from_json(jwk_key_as_string), + ) + + # Try loading ssh keys. Example: https://github.com/username.keys + with contextlib.suppress(urllib.request.URLError): + with urllib.request.urlopen(unverified_issuer) as response: + while line := response.readline(): + with contextlib.suppress( + (ValueError, cryptography.exceptions.UnsupportedAlgorithm) + ): + cryptography_ssh_keys.append( + cryptography.hazmat.primitives.serialization.load_ssh_public_key( + line + ) + ) + +for cryptography_ssh_key in cryptography_ssh_keys: + jwk_keys.append( + jwcrypto.jwk.JWK.from_pem( + cryptography_ssh_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + ) + ) + +cwt_cose_keys = [] +pycose_cose_keys = [] + +for jwk_key in jwk_keys: + cwt_cose_key = cwt.COSEKey.from_pem( + jwk_key.export_to_pem(), + kid=jwk_key.thumbprint(), + ) + cwt_cose_keys.append(cwt_cose_key) + cwt_ec2_key_as_dict = cwt_cose_key.to_dict() + pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict) + pycose_cose_keys.append(pycose_cose_key) + +verify_signature = False +for pycose_cose_key in pycose_cose_keys: + with contextlib.suppress(Exception): + msg.key = pycose_cose_key + verify_signature = msg.verify_signature() + if verify_signature: + break + +unittest.TestCase().assertTrue( + verify_signature, + "Failed to verify signature on statement", +) + +cwt_protected = cwt.decode(msg.phdr[CWTClaims], cwt_cose_keys) +issuer = cwt_protected[1] +subject = cwt_protected[2] + +# TODO Validate content type is JSON? SCHEMA = json.loads(pathlib.Path(os.environ["SCHEMA_PATH"]).read_text()) try: validate( instance={ "$schema": "https://schema.example.com/scitt-policy-engine-jsonschema.schema.json", - "issuer": msg.phdr[COSE_Headers_Issuer], + "issuer": issuer, + "subject": subject, "claim": json.loads(msg.payload.decode()), }, schema=SCHEMA, @@ -140,21 +259,103 @@ echo ${CLAIM_PATH} Example running allowlist check and enforcement. ```console -npm install -g nodemon -nodemon -e .cose --exec 'find workspace/storage/operations -name \*.cose -exec nohup sh -xe policy_engine.sh $(cat workspace/service_parameters.json | jq -r .insertPolicy) {} \;' +$ npm install nodemon && \ + node_modules/.bin/nodemon -e .cose --exec 'find workspace/storage/operations -name \*.cose -exec nohup sh -xe policy_engine.sh $(cat workspace/service_parameters.json | jq -r .insertPolicy) {} \;' ``` Also ensure you restart the server with the new config we edited. ```console -scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro +$ scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro ``` -Create claim from allowed issuer (`.org`) and from non-allowed (`.com`). +The current emulator notary (create-statement) implementation will sign +statements using a generated key or a key we provide via the `--private-key-pem` +argument. If we provide the `--private-key-pem` argument but the key at the +given path does not exist, the generated key will be written out to that path. + +```console +$ export ISSUER_PORT="9000" && \ + export ISSUER_URL="http://localhost:${ISSUER_PORT}" +$ scitt-emulator client create-claim \ + --private-key-pem private-key.pem \ + --issuer "${ISSUER_URL}" \ + --subject "solar" \ + --content-type application/json \ + --payload '{"sun": "yellow"}' \ + --out claim.cose +``` + +The core of policy engine we implemented in `jsonschema_validator.py` will +verify the COSE message generated using the public portion of the notary's key. +We've implemented two possible styles of key resolution. Both of them require +resolution of public keys via an HTTP server. + +Let's start the HTTP server now, we'll populate the needed files in the +sections corresponding to each resolution style. + +```console +$ python -m http.server "${ISSUER_PORT}" & +$ python_http_server_pid=$! +``` + +### SSH `authorized_keys` style notary public key resolution + +Keys are discovered via making an HTTP GET request to the URL given by the +`issuer` parameter via the `web` DID method and de-serializing the SSH +public keys found within the response body. + +Start an HTTP server with an SSH public key served at the root. + +```console +$ cat private-key.pem | ssh-keygen -f /dev/stdin -y | tee index.html +``` + +### OpenID Connect token style notary public key resolution + +Keys are discovered two part resolution of HTTP paths relative to the issuer + +`/.well-known/openid-configuration` path is requested via HTTP GET. The +response body is parsed as JSON and the value of the `jwks_uri` key is +requested via HTTP GET. + +`/.well-known/jwks` (is typically the value of `jwks_uri`) path is requested +via HTTP GET. The response body is parsed as JSON. Public keys are loaded +from the value of the `keys` key which stores an array of JSON Web Key (JWK) +style serializations. + +```console +$ mkdir -p .well-known/ +$ cat > .well-known/openid-configuration < @@ -174,10 +375,27 @@ Failed validating 'enum' in schema['properties']['issuer']: On instance['issuer']: 'did:web:example.com' +``` + +Modify the allowlist to ensure that our issuer, aka our local HTTP server with +our keys, is set to be the allowed issuer. + +```console +$ export allowlist="$(cat allowlist.schema.json)" && \ + jq '.properties.issuer.enum[0] = env.ISSUER_URL' <(echo "${allowlist}") \ + | tee allowlist.schema.json +``` -$ scitt-emulator client create-claim --issuer did:web:example.org --subject "solar" --content-type application/json --payload '{"sun": "yellow"}' --out claim.cose -A COSE signed Claim was written to: claim.cose +Submit the statement from the issuer we just added to the allowlist. + +```console $ scitt-emulator client submit-claim --claim claim.cose --out claim.receipt.cbor Claim registered with entry ID 1 Receipt written to claim.receipt.cbor ``` + +Stop the server that serves the public keys + +```console +$ kill $python_http_server_pid +``` diff --git a/tests/test_cli.py b/tests/test_cli.py index 140d76ff..48c0d6f7 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,12 +1,13 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. import os +import io import json import threading import pytest import jwt import jwcrypto -from flask import Flask, jsonify +from flask import Flask, jsonify, send_file from werkzeug.serving import make_server from scitt_emulator import cli, server from scitt_emulator.oidc import OIDCAuthMiddleware @@ -164,6 +165,22 @@ def create_flask_app_oidc_server(config): app.config.update(dict(DEBUG=True)) app.config.update(config) + # TODO For testing ssh key style issuers, not OIDC related needs to be moved + @app.route("/", methods=["GET"]) + def ssh_public_keys(): + from cryptography.hazmat.primitives import serialization + return send_file( + io.BytesIO( + serialization.load_pem_public_key( + app.config["key"].export_to_pem(), + ).public_bytes( + encoding=serialization.Encoding.OpenSSH, + format=serialization.PublicFormat.OpenSSH, + ) + ), + mimetype="text/plain", + ) + @app.route("/.well-known/openid-configuration", methods=["GET"]) def openid_configuration(): return jsonify( diff --git a/tests/test_docs.py b/tests/test_docs.py index 465f0199..580736e7 100644 --- a/tests/test_docs.py +++ b/tests/test_docs.py @@ -13,12 +13,16 @@ import itertools import subprocess import contextlib +import urllib.parse import unittest.mock + import pytest import myst_parser.parsers.docutils_ import docutils.nodes import docutils.utils +import jwcrypto + from scitt_emulator.client import ClaimOperationError from .test_cli import ( @@ -26,25 +30,25 @@ content_type, payload, execute_cli, + create_flask_app_oidc_server, ) repo_root = pathlib.Path(__file__).parents[1] docs_dir = repo_root.joinpath("docs") -allowlisted_issuer = "did:web:example.org" -non_allowlisted_issuer = "did:web:example.com" +non_allowlisted_issuer = "did:web:denied.example.com" CLAIM_DENIED_ERROR = {"type": "denied", "detail": "content_address_of_reason"} CLAIM_DENIED_ERROR_BLOCKED = { "type": "denied", "detail": textwrap.dedent( """ - 'did:web:example.com' is not one of ['did:web:example.org'] + 'did:web:denied.example.com' is not one of ['did:web:example.org'] Failed validating 'enum' in schema['properties']['issuer']: {'enum': ['did:web:example.org'], 'type': 'string'} On instance['issuer']: - 'did:web:example.com' + 'did:web:denied.example.com' """ ).lstrip(), } @@ -152,6 +156,15 @@ def docutils_find_code_samples(nodes): samples[node.astext()] = nodes[i + 3].astext() return samples +def url_to_did_web(url_string): + url = urllib.parse.urlparse(url_string) + return ":".join( + [ + urllib.parse.quote(i) + for i in ["did", "web", url.netloc, *filter(bool, url.path.split("/"))] + ] + ) + def test_docs_registration_policies(tmp_path): workspace_path = tmp_path / "workspace" @@ -159,6 +172,7 @@ def test_docs_registration_policies(tmp_path): receipt_path = tmp_path / "claim.receipt.cbor" entry_id_path = tmp_path / "claim.entry_id.txt" retrieved_claim_path = tmp_path / "claim.retrieved.cose" + private_key_pem_path = tmp_path / "notary-private-key.pem" # Grab code samples from docs # TODO Abstract into abitrary docs testing code @@ -170,7 +184,22 @@ def test_docs_registration_policies(tmp_path): for name, content in docutils_find_code_samples(nodes).items(): tmp_path.joinpath(name).write_text(content) + key = jwcrypto.jwk.JWK.generate(kty="EC", crv="P-384") + # cwt_cose_key = cwt.COSEKey.generate_symmetric_key(alg=alg, kid=kid) + private_key_pem_path.write_bytes( + key.export_to_pem(private_key=True, password=None), + ) + algorithm = "ES384" + audience = "scitt.example.org" + subject = "repo:scitt-community/scitt-api-emulator:ref:refs/heads/main" + + # tell jsonschema_validator.py that we want to assume non-TLS URLs for tests + os.environ["DID_WEB_ASSUME_SCHEME"] = "http" + with Service( + {"key": key, "algorithms": [algorithm]}, + create_flask_app=create_flask_app_oidc_server, + ) as oidc_service, Service( { "tree_alg": "CCF", "workspace": workspace_path, @@ -188,24 +217,35 @@ def test_docs_registration_policies(tmp_path): # set the policy to enforce service.server.app.scitt_service.service_parameters["insertPolicy"] = "external" - # create denied claim + # set the issuer to the did:web version of the OIDC / SSH keys service + issuer = url_to_did_web(oidc_service.url) + + # create claim command = [ "client", "create-claim", "--out", claim_path, "--issuer", - non_allowlisted_issuer, + issuer, "--subject", - "test", + subject, "--content-type", content_type, "--payload", payload, + "--private-key-pem", + private_key_pem_path, ] execute_cli(command) assert os.path.exists(claim_path) + # replace example issuer with test OIDC service issuer (URL) in error + claim_denied_error_blocked = CLAIM_DENIED_ERROR_BLOCKED + claim_denied_error_blocked["detail"] = claim_denied_error_blocked["detail"].replace( + "did:web:denied.example.com", issuer, + ) + # submit denied claim command = [ "client", @@ -226,29 +266,37 @@ def test_docs_registration_policies(tmp_path): check_error = error assert check_error assert "error" in check_error.operation - assert check_error.operation["error"] == CLAIM_DENIED_ERROR_BLOCKED + assert check_error.operation["error"] == claim_denied_error_blocked assert not os.path.exists(receipt_path) assert not os.path.exists(entry_id_path) - # create accepted claim + # replace example issuer with test OIDC service issuer in allowlist + allowlist_schema_json_path = tmp_path.joinpath("allowlist.schema.json") + allowlist_schema_json_path.write_text( + allowlist_schema_json_path.read_text().replace( + "did:web:example.org", issuer, + ) + ) + + # submit accepted claim using SSH authorized_keys lookup command = [ "client", - "create-claim", - "--out", + "submit-claim", + "--claim", claim_path, - "--issuer", - allowlisted_issuer, - "--subject", - "test", - "--content-type", - content_type, - "--payload", - payload, + "--out", + receipt_path, + "--out-entry-id", + entry_id_path, + "--url", + service.url ] execute_cli(command) - assert os.path.exists(claim_path) + assert os.path.exists(receipt_path) + assert os.path.exists(entry_id_path) - # submit accepted claim + # TODO Switch back on the OIDC routes + # submit accepted claim using OIDC -> jwks lookup command = [ "client", "submit-claim",