diff --git a/setup.py b/setup.py index 51bb389ea..f9fa34993 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ install_requires=[ "pyop >= v3.4.0", "pysaml2 >= 6.5.1", - "pycryptodomex", + "cryptojwt >= 1.8.3", "requests", "PyYAML", "gunicorn", diff --git a/src/satosa/backends/apple.py b/src/satosa/backends/apple.py index 37f756a68..177bb4d52 100644 --- a/src/satosa/backends/apple.py +++ b/src/satosa/backends/apple.py @@ -258,7 +258,7 @@ def _translate_response(self, response, issuer): :type subject_type: str :rtype: InternalData - :param response: Dictioary with attribute name as key. + :param response: Dictionary with attribute name as key. :param issuer: The oidc op that gave the repsonse. :param subject_type: public or pairwise according to oidc standard. :return: A SATOSA internal response. diff --git a/src/satosa/backends/base.py b/src/satosa/backends/base.py index 8d0432da8..b90f5ee05 100644 --- a/src/satosa/backends/base.py +++ b/src/satosa/backends/base.py @@ -66,7 +66,7 @@ def register_endpoints(self): def get_metadata_desc(self): """ Returns a description of the backend module. - This is used when creating SAML metadata for the frontend of the proxy + :rtype: satosa.metadata_creation.description.MetadataDescription :return: A description of the backend """ diff --git a/src/satosa/backends/idpy_oidc.py b/src/satosa/backends/idpy_oidc.py index f3ea43f61..b627315e4 100644 --- a/src/satosa/backends/idpy_oidc.py +++ b/src/satosa/backends/idpy_oidc.py @@ -1,22 +1,21 @@ """ OIDC/OAuth2 backend module. """ -import datetime import logging +from datetime import datetime from urllib.parse import urlparse from idpyoidc.client.oauth2.stand_alone_client import StandAloneClient from idpyoidc.server.user_authn.authn_context import UNSPECIFIED +import satosa.logging_util as lu from satosa.backends.base import BackendModule from satosa.internal import AuthenticationInformation from satosa.internal import InternalData -import satosa.logging_util as lu from ..exception import SATOSAAuthenticationError from ..exception import SATOSAError from ..response import Redirect - UTC = datetime.timezone.utc logger = logging.getLogger(__name__) @@ -48,14 +47,7 @@ def __init__(self, auth_callback_func, internal_attributes, config, base_url, na super().__init__(auth_callback_func, internal_attributes, base_url, name) # self.auth_callback_func = auth_callback_func # self.config = config - self.client = StandAloneClient(config=config["client"], client_type="oidc") - self.client.do_provider_info() - self.client.do_client_registration() - - _redirect_uris = self.client.context.claims.get_usage('redirect_uris') - if not _redirect_uris: - raise SATOSAError("Missing path in redirect uri") - self.redirect_path = urlparse(_redirect_uris[0]).path + self.client = create_client(config["client"]) def start_auth(self, context, internal_request): """ @@ -77,7 +69,11 @@ def register_endpoints(self): :return: A list that can be used to map the request to SATOSA to this endpoint. """ url_map = [] - url_map.append((f"^{self.redirect_path.lstrip('/')}$", self.response_endpoint)) + redirect_path = self.client.context.claims.get_usage('redirect_uris') + if not redirect_path: + raise SATOSAError("Missing path in redirect uri") + redirect_path = urlparse(redirect_path[0]).path + url_map.append(("^%s$" % redirect_path.lstrip("/"), self.response_endpoint)) return url_map def response_endpoint(self, context, *args): @@ -123,16 +119,7 @@ def _translate_response(self, response, issuer): :param subject_type: public or pairwise according to oidc standard. :return: A SATOSA internal response. """ - timestamp_epoch = ( - response.get("auth_time") - or response.get("iat") - or int(datetime.datetime.now(UTC).timestamp()) - ) - timestamp_dt = datetime.datetime.fromtimestamp(timestamp_epoch, UTC) - timestamp_iso = timestamp_dt.isoformat().replace("+00:00", "Z") - auth_class_ref = response.get("acr") or response.get("amr") or UNSPECIFIED - auth_info = AuthenticationInformation(auth_class_ref, timestamp_iso, issuer) - + auth_info = AuthenticationInformation(UNSPECIFIED, str(datetime.now()), issuer) internal_resp = InternalData(auth_info=auth_info) internal_resp.attributes = self.converter.to_internal("openid", response) internal_resp.subject_id = response["sub"] @@ -154,3 +141,10 @@ def _check_error_response(self, response, context): logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg) logger.debug(logline) raise SATOSAAuthenticationError(context.state, "Access denied") + +def create_client(config: dict): + _client_type = config.get('client_type') or "oidc" + _client = StandAloneClient(config=config, client_type=_client_type) + _client.do_provider_info() + _client.do_client_registration() + return _client diff --git a/src/satosa/base.py b/src/satosa/base.py index 1e17c8cbe..1a06e2201 100644 --- a/src/satosa/base.py +++ b/src/satosa/base.py @@ -5,7 +5,7 @@ import logging import uuid -from saml2.s_utils import UnknownSystemEntity +# from saml2.s_utils import UnknownSystemEntity from satosa import util from satosa.response import BadRequest @@ -309,48 +309,11 @@ def run(self, context): redirect_url = f"{generic_error_url}?errorid={error_id}" return Redirect(generic_error_url) raise - except SATOSANoBoundEndpointError as e: - error_id = uuid.uuid4().urn - msg = { - "message": "URL-path is not bound to any endpoint function", - "error": str(e), - "error_id": error_id, - } - logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg) - logger.error(logline) - generic_error_url = self.config.get("ERROR_URL") - if generic_error_url: - redirect_url = f"{generic_error_url}?errorid={error_id}" - return Redirect(generic_error_url) - return NotFound("The Service or Identity Provider you requested could not be found.") - except SATOSAError as e: - error_id = uuid.uuid4().urn - msg = { - "message": "Uncaught SATOSA error", - "error": str(e), - "error_id": error_id, - } - logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg) - logger.error(logline) - generic_error_url = self.config.get("ERROR_URL") - if generic_error_url: - redirect_url = f"{generic_error_url}?errorid={error_id}" - return Redirect(generic_error_url) - raise - except UnknownSystemEntity as e: - error_id = uuid.uuid4().urn - msg = { - "message": "Configuration error: unknown system entity", - "error": str(e), - "error_id": error_id, - } - logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg) - logger.error(logline) - generic_error_url = self.config.get("ERROR_URL") - if generic_error_url: - redirect_url = f"{generic_error_url}?errorid={error_id}" - return Redirect(generic_error_url) - raise + # except UnknownSystemEntity as err: + # msg = "configuration error: unknown system entity " + str(err) + # logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg) + # logger.error(logline, exc_info=False) + # raise except Exception as e: error_id = uuid.uuid4().urn msg = { diff --git a/src/satosa/cert_util.py b/src/satosa/cert_util.py new file mode 100644 index 000000000..e8581f749 --- /dev/null +++ b/src/satosa/cert_util.py @@ -0,0 +1,75 @@ +import datetime + +from cryptography import x509 +from cryptography.hazmat._oid import NameOID +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptojwt.jwk.rsa import import_private_rsa_key_from_file +from cryptojwt.jwk.rsa import RSAKey + + +def create_certificate(cert_info): + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, cert_info['cn']), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, cert_info['state']), + x509.NameAttribute(NameOID.LOCALITY_NAME, cert_info['state']), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, cert_info['organization']), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, cert_info['organization_unit']), + ]) + item = x509.CertificateBuilder().subject_name( + subject + ).issuer_name( + issuer + ).public_key( + key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.utcnow() + ).not_valid_after( + datetime.datetime.utcnow() + datetime.timedelta(days=10) + ) + + if 'dns_name' in cert_info: + item.add_extension( + x509.SubjectAlternativeName([x509.DNSName(cert_info['dns_name'])]), critical=False + ) + + cert = item.sign(key, hashes.SHA256()) + cert_str = cert.public_bytes(serialization.Encoding.PEM) + + key_str = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + return cert_str, key_str + + +def generate_cert(): + cert_info = { + "cn": "SE", + "country_code": "se", + "state": "ac", + "city": "Umea", + "organization": "ITS", + "organization_unit": "DIRG" + } + cert_str, key_str = create_certificate(cert_info) + return cert_str, key_str + + +def write_cert(cert_path, key_path): + cert, key = generate_cert() + with open(cert_path, "wb") as cert_file: + cert_file.write(cert) + with open(key_path, "wb") as key_file: + key_file.write(key) + +def rsa_key_from_pem(file_name, **kwargs): + _key = RSAKey(**kwargs) + _key.load_key(import_private_rsa_key_from_file(file_name)) + return _key diff --git a/src/satosa/micro_services/account_linking.py b/src/satosa/micro_services/account_linking.py index 7305c3d79..4f21afdb3 100644 --- a/src/satosa/micro_services/account_linking.py +++ b/src/satosa/micro_services/account_linking.py @@ -5,10 +5,11 @@ import logging import requests -from jwkest.jwk import rsa_load, RSAKey -from jwkest.jws import JWS +from cryptojwt import JWS +from cryptojwt.jwk.rsa import import_private_rsa_key_from_file from satosa.internal import InternalData +from ..cert_util import rsa_key_from_pem from ..exception import SATOSAAuthenticationError from ..micro_services.base import ResponseMicroService from ..response import Redirect @@ -30,7 +31,8 @@ def __init__(self, config, *args, **kwargs): super().__init__(*args, **kwargs) self.api_url = config["api_url"] self.redirect_url = config["redirect_url"] - self.signing_key = RSAKey(key=rsa_load(config["sign_key"]), use="sig", alg="RS256") + self.signing_key = rsa_key_from_pem(config["sign_key"]) + self.signing_key.alg = config.get('signing_alg', "RS256") self.endpoint = "/handle_account_linking" self.id_to_attr = config.get("id_to_attr", None) logger.info("Account linking is active") diff --git a/src/satosa/micro_services/consent.py b/src/satosa/micro_services/consent.py index a469e2189..90af824df 100644 --- a/src/satosa/micro_services/consent.py +++ b/src/satosa/micro_services/consent.py @@ -7,12 +7,16 @@ from base64 import urlsafe_b64encode import requests -from jwkest.jwk import RSAKey -from jwkest.jwk import rsa_load -from jwkest.jws import JWS +from cryptojwt import JWS +from cryptojwt.jwk.rsa import import_private_rsa_key_from_file +from cryptojwt.jwk.rsa import RSAKey +# from jwkest.jwk import RSAKey +# from jwkest.jwk import rsa_load +# from jwkest.jws import JWS from requests.exceptions import ConnectionError import satosa.logging_util as lu +from satosa.cert_util import rsa_key_from_pem from satosa.internal import InternalData from satosa.micro_services.base import ResponseMicroService from satosa.response import Redirect @@ -41,7 +45,7 @@ def __init__(self, config, internal_attributes, *args, **kwargs): if "user_id_to_attr" in internal_attributes: self.locked_attr = internal_attributes["user_id_to_attr"] - self.signing_key = RSAKey(key=rsa_load(config["sign_key"]), use="sig", alg="RS256") + self.signing_key = rsa_key_from_pem(config["sign_key"], use="sig", alg="RS256") self.endpoint = "/handle_consent" logger.info("Consent flow is active") diff --git a/src/satosa/state.py b/src/satosa/state.py index 1fc768425..bd628fd6b 100644 --- a/src/satosa/state.py +++ b/src/satosa/state.py @@ -4,22 +4,21 @@ """ import base64 import copy -import hashlib import json import logging from collections import UserDict -from satosa.cookies import SimpleCookie +from lzma import LZMACompressor +from lzma import LZMADecompressor from uuid import uuid4 -from lzma import LZMACompressor, LZMADecompressor - -from Cryptodome import Random -from Cryptodome.Cipher import AES +# from cryptography.hazmat.primitives.ciphers.algorithms import AES +from cryptojwt.jwe.aes import AES_GCMEncrypter +from cryptojwt.jwe.utils import get_random_bytes import satosa.logging_util as lu +from satosa.cookies import SimpleCookie from satosa.exception import SATOSAStateError - logger = logging.getLogger(__name__) _SESSION_ID_KEY = "SESSION_ID" @@ -27,11 +26,11 @@ class State(UserDict): """ - This class holds a state attribute object. A state object must be able to be converted to - a json string, otherwise will an exception be raised. + This class holds a state attribute object. A state object must be possible to convert to + a json string, otherwise an exception will be raised. """ - def __init__(self, urlstate_data=None, encryption_key=None): + def __init__(self, urlstate_data=None, encryption_key: str = ""): """ If urlstate is empty a new empty state instance will be returned. @@ -52,27 +51,9 @@ def __init__(self, urlstate_data=None, encryption_key=None): raise ValueError("If an 'urlstate_data' is supplied 'encrypt_key' must be specified.") if urlstate_data: - try: - urlstate_data_bytes = urlstate_data.encode("utf-8") - urlstate_data_b64decoded = base64.urlsafe_b64decode(urlstate_data_bytes) - lzma = LZMADecompressor() - urlstate_data_decompressed = lzma.decompress(urlstate_data_b64decoded) - urlstate_data_decrypted = _AESCipher(encryption_key).decrypt( - urlstate_data_decompressed - ) - lzma = LZMADecompressor() - urlstate_data_decrypted_decompressed = lzma.decompress(urlstate_data_decrypted) - urlstate_data_obj = json.loads(urlstate_data_decrypted_decompressed) - except Exception as e: - error_context = { - "message": "Failed to load state data. Reinitializing empty state.", - "reason": str(e), - "urlstate_data": urlstate_data, - } - logger.warning(error_context) + urlstate_data = self.unpack(urlstate_data, encryption_key=encryption_key) + if urlstate_data is None: urlstate_data = {} - else: - urlstate_data = urlstate_data_obj session_id = ( urlstate_data[_SESSION_ID_KEY] @@ -87,25 +68,54 @@ def __init__(self, urlstate_data=None, encryption_key=None): def session_id(self): return self.data.get(_SESSION_ID_KEY) - def urlstate(self, encryption_key): + def unpack(self, data: str, encryption_key): + """ + + :param data: A string created by the method pack in this class. """ - Will return a url safe representation of the state. + try: + data_bytes = data.encode("utf-8") + data_b64decoded = base64.urlsafe_b64decode(data_bytes) + lzma = LZMADecompressor() + data_decompressed = lzma.decompress(data_b64decoded) + _iv = data_decompressed[:12] + _msg = data_decompressed[12:] + data_decrypted = AES_GCMEncrypter(key=encryption_key).decrypt(_msg, iv=_iv) + lzma = LZMADecompressor() + data_decrypted_decompressed = lzma.decompress(data_decrypted) + data_obj = json.loads(data_decrypted_decompressed) + except Exception as e: + error_context = { + "message": "Failed to load state data. Reinitializing empty state.", + "reason": str(e), + "urlstate_data": data, + } + logger.warning(error_context) + data_obj = None + + return data_obj + + def pack(self, encryption_key): + """ + Will return an url safe representation of the state. :type encryption_key: Key used for encryption. :rtype: str :return: Url representation av of the state. """ + lzma = LZMACompressor() - urlstate_data = json.dumps(self.data) - urlstate_data = lzma.compress(urlstate_data.encode("UTF-8")) - urlstate_data += lzma.flush() - urlstate_data = _AESCipher(encryption_key).encrypt(urlstate_data) + _data = json.dumps(self.data) + _iv = get_random_bytes(12) + _data = lzma.compress(_data.encode("UTF-8")) + _data += lzma.flush() + _data = _iv + AES_GCMEncrypter(key=encryption_key).encrypt(_data, iv=_iv) lzma = LZMACompressor() - urlstate_data = lzma.compress(urlstate_data) - urlstate_data += lzma.flush() - urlstate_data = base64.urlsafe_b64encode(urlstate_data) - return urlstate_data.decode("utf-8") + _data = lzma.compress(_data) + _data += lzma.flush() + _data = base64.urlsafe_b64encode(_data) + return _data.decode("utf-8") def copy(self): """ @@ -129,15 +139,15 @@ def state_dict(self): def state_to_cookie( - state: State, - *, - name: str, - path: str, - encryption_key: str, - secure: bool = None, - httponly: bool = None, - samesite: str = None, - max_age: str = None, + state: State, + # *, + name: str, + path: str, + encryption_key: str, + secure: bool = None, + httponly: bool = None, + samesite: str = None, + max_age: str = None, ) -> SimpleCookie: """ Saves a state to a cookie @@ -156,7 +166,7 @@ def state_to_cookie( :return: A cookie object """ cookie = SimpleCookie() - cookie[name] = "" if state.delete else state.urlstate(encryption_key) + cookie[name] = "" if state.delete else state.pack(encryption_key) cookie[name]["path"] = path cookie[name]["secure"] = secure if secure is not None else True cookie[name]["httponly"] = httponly if httponly is not None else "" @@ -205,71 +215,3 @@ def cookie_to_state(cookie_str: str, name: str, encryption_key: str) -> State: else: return state - -class _AESCipher(object): - """ - This class will perform AES encryption/decryption with a keylength of 256. - - @see: http://stackoverflow.com/questions/12524994/encrypt-decrypt-using-pycrypto-aes-256 - """ - - def __init__(self, key): - """ - Constructor - - :type key: str - - :param key: The key used for encryption and decryption. The longer key the better. - """ - self.bs = 32 - self.key = hashlib.sha256(key.encode()).digest() - - def encrypt(self, raw): - """ - Encryptes the parameter raw. - - :type raw: bytes - :rtype: str - - :param: bytes to be encrypted. - - :return: A base 64 encoded string. - """ - raw = self._pad(raw) - iv = Random.new().read(AES.block_size) - cipher = AES.new(self.key, AES.MODE_CBC, iv) - return base64.urlsafe_b64encode(iv + cipher.encrypt(raw)) - - def decrypt(self, enc): - """ - Decryptes the parameter enc. - - :type enc: bytes - :rtype: bytes - - :param: The value to be decrypted. - :return: The decrypted value. - """ - enc = base64.urlsafe_b64decode(enc) - iv = enc[:AES.block_size] - cipher = AES.new(self.key, AES.MODE_CBC, iv) - return self._unpad(cipher.decrypt(enc[AES.block_size:])) - - def _pad(self, b): - """ - Will padd the param to be of the correct length for the encryption alg. - - :type b: bytes - :rtype: bytes - """ - return b + (self.bs - len(b) % self.bs) * chr(self.bs - len(b) % self.bs).encode("UTF-8") - - @staticmethod - def _unpad(b): - """ - Removes the padding performed by the method _pad. - - :type b: bytes - :rtype: bytes - """ - return b[:-ord(b[len(b) - 1:])] diff --git a/tests/conftest.py b/tests/conftest.py index f0602a028..d7d54f231 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,15 +1,11 @@ -import copy import os import pytest -from saml2 import BINDING_HTTP_REDIRECT, BINDING_HTTP_POST -from saml2.extension.idpdisc import BINDING_DISCO -from saml2.saml import NAME_FORMAT_URI, NAMEID_FORMAT_TRANSIENT, NAMEID_FORMAT_PERSISTENT +from satosa.cert_util import generate_cert +from satosa.cert_util import write_cert from satosa.context import Context from satosa.state import State -from .util import create_metadata_from_config_dict -from .util import generate_cert, write_cert BASE_URL = "https://test-proxy.com" @@ -36,87 +32,6 @@ def cert_and_key(tmpdir): return cert_path, key_path -@pytest.fixture -def sp_conf(cert_and_key): - sp_base = "http://example.com" - spconfig = { - "entityid": "{}/unittest_sp.xml".format(sp_base), - "service": { - "sp": { - "endpoints": { - "assertion_consumer_service": [ - ("%s/acs/redirect" % sp_base, BINDING_HTTP_REDIRECT) - ], - "discovery_response": [("%s/disco" % sp_base, BINDING_DISCO)] - }, - "want_response_signed": False, - "allow_unsolicited": True, - "name_id_format": [NAMEID_FORMAT_PERSISTENT] - }, - }, - "cert_file": cert_and_key[0], - "key_file": cert_and_key[1], - "metadata": {"inline": []}, - } - - return spconfig - - -@pytest.fixture -def idp_conf(cert_and_key): - idp_base = "http://idp.example.com" - - idpconfig = { - "entityid": "{}/{}/proxy.xml".format(idp_base, "Saml2IDP"), - "description": "A SAML2SAML proxy", - "service": { - "idp": { - "name": "Proxy IdP", - "endpoints": { - "single_sign_on_service": [ - ("%s/sso/redirect" % idp_base, BINDING_HTTP_REDIRECT), - ], - }, - "policy": { - "default": { - "lifetime": {"minutes": 15}, - "attribute_restrictions": None, # means all I have - "name_form": NAME_FORMAT_URI, - "fail_on_missing_requested": False - }, - }, - "subject_data": {}, - "name_id_format": [NAMEID_FORMAT_TRANSIENT, - NAMEID_FORMAT_PERSISTENT], - "want_authn_requests_signed": False, - "ui_info": { - "display_name": [{"text": "SATOSA Test IdP", "lang": "en"}], - "description": [{"text": "Test IdP for SATOSA unit tests.", "lang": "en"}], - "logo": [{"text": "https://idp.example.com/static/logo.png", "width": "120", "height": "60", - "lang": "en"}], - }, - }, - }, - "cert_file": cert_and_key[0], - "key_file": cert_and_key[1], - "metadata": {"inline": []}, - "organization": { - "name": [["Test IdP Org.", "en"]], - "display_name": [["Test IdP", "en"]], - "url": [["https://idp.example.com/about", "en"]] - }, - "contact_person": [ - {"given_name": "Test IdP", "sur_name": "Support", "email_address": ["help@idp.example.com"], - "contact_type": "support" - }, - {"given_name": "Test IdP", "sur_name": "Tech support", - "email_address": ["tech@idp.example.com"], "contact_type": "technical"} - ] - } - - return idpconfig - - @pytest.fixture def context(): context = Context() @@ -180,153 +95,6 @@ def response_microservice_config(): return data -@pytest.fixture -def saml_frontend_config(cert_and_key, sp_conf): - data = { - "module": "satosa.frontends.saml2.SAMLFrontend", - "name": "SAML2Frontend", - "config": { - "idp_config": { - "entityid": "frontend-entity_id", - "service": { - "idp": { - "endpoints": { - "single_sign_on_service": [] - }, - "name": "Frontend IdP", - "name_id_format": NAMEID_FORMAT_TRANSIENT, - "policy": { - "default": { - "attribute_restrictions": None, - "fail_on_missing_requested": False, - "lifetime": {"minutes": 15}, - "name_form": NAME_FORMAT_URI - } - } - } - }, - "cert_file": cert_and_key[0], - "key_file": cert_and_key[1], - "metadata": {"inline": [create_metadata_from_config_dict(sp_conf)]}, - "organization": { - "name": [["SATOSA Org.", "en"]], - "display_name": [["SATOSA", "en"]], - "url": [["https://satosa.example.com/about", "en"]] - }, - "contact_person": [ - {"given_name": "SATOSA", "sur_name": "Support", "email_address": ["help@satosa.example.com"], - "contact_type": "support" - }, - {"given_name": "SATOSA", "sur_name": "Tech Support", "email_address": ["tech@satosa.example.com"], - "contact_type": "technical" - } - ] - }, - - "endpoints": { - "single_sign_on_service": {BINDING_HTTP_POST: "sso/post", - BINDING_HTTP_REDIRECT: "sso/redirect"} - } - } - } - - return data - - -@pytest.fixture -def saml_backend_config(idp_conf): - name = "SAML2Backend" - data = { - "module": "satosa.backends.saml2.SAMLBackend", - "name": name, - "config": { - "sp_config": { - "entityid": "backend-entity_id", - "organization": {"display_name": "Example Identities", "name": "Test Identities Org.", - "url": "http://www.example.com"}, - "contact_person": [ - {"contact_type": "technical", "email_address": "technical@example.com", - "given_name": "Technical"}, - {"contact_type": "support", "email_address": "support@example.com", "given_name": "Support"} - ], - "service": { - "sp": { - "want_response_signed": False, - "allow_unsolicited": True, - "endpoints": { - "assertion_consumer_service": [ - ("{}/{}/acs/redirect".format(BASE_URL, name), BINDING_HTTP_REDIRECT)], - "discovery_response": [("{}/disco", BINDING_DISCO)] - - } - } - }, - "metadata": {"inline": [create_metadata_from_config_dict(idp_conf)]} - } - } - } - return data - - -@pytest.fixture -def saml_mirror_frontend_config(saml_frontend_config): - data = copy.deepcopy(saml_frontend_config) - data["module"] = "satosa.frontends.saml2.SAMLMirrorFrontend" - data["name"] = "SAMLMirrorFrontend" - return data - - -@pytest.fixture -def oidc_backend_config(): - data = { - "module": "satosa.backends.openid_connect.OpenIDConnectBackend", - "name": "OIDCBackend", - "config": { - "provider_metadata": { - "issuer": "https://op.example.com", - "authorization_endpoint": "https://example.com/authorization" - }, - "client": { - "auth_req_params": { - "response_type": "code", - "scope": "openid, profile, email, address, phone" - }, - "client_metadata": { - "client_id": "backend_client", - "application_name": "SATOSA", - "application_type": "web", - "contacts": ["suppert@example.com"], - "redirect_uris": ["http://example.com/OIDCBackend"], - "subject_type": "public", - } - }, - "entity_info": { - "contact_person": [{ - "contact_type": "technical", - "email_address": ["technical_test@example.com", "support_test@example.com"], - "given_name": "Test", - "sur_name": "OP" - }, { - "contact_type": "support", - "email_address": ["support_test@example.com"], - "given_name": "Support_test" - }], - "organization": { - "display_name": ["OP Identities", "en"], - "name": [["En test-OP", "se"], ["A test OP", "en"]], - "url": [["http://www.example.com", "en"], ["http://www.example.se", "se"]], - "ui_info": { - "description": [["This is a test OP", "en"]], - "display_name": [["OP - TEST", "en"]] - } - } - } - } - } - - return data - - @pytest.fixture def account_linking_module_config(signing_key_path): account_linking_config = { diff --git a/tests/conftest_oidc.py b/tests/conftest_oidc.py new file mode 100644 index 000000000..20eb218c0 --- /dev/null +++ b/tests/conftest_oidc.py @@ -0,0 +1,54 @@ +import pytest + + +@pytest.fixture +def oidc_backend_config(): + data = { + "module": "satosa.backends.openid_connect.OpenIDConnectBackend", + "name": "OIDCBackend", + "config": { + "provider_metadata": { + "issuer": "https://op.example.com", + "authorization_endpoint": "https://example.com/authorization" + }, + "client": { + "auth_req_params": { + "response_type": "code", + "scope": "openid, profile, email, address, phone" + }, + "client_metadata": { + "client_id": "backend_client", + "application_name": "SATOSA", + "application_type": "web", + "contacts": ["suppert@example.com"], + "redirect_uris": ["http://example.com/OIDCBackend"], + "subject_type": "public", + } + }, + "entity_info": { + "contact_person": [{ + "contact_type": "technical", + "email_address": ["technical_test@example.com", "support_test@example.com"], + "given_name": "Test", + "sur_name": "OP" + }, { + "contact_type": "support", + "email_address": ["support_test@example.com"], + "given_name": "Support_test" + }], + "organization": { + "display_name": ["OP Identities", "en"], + "name": [["En test-OP", "se"], ["A test OP", "en"]], + "url": [["http://www.example.com", "en"], ["http://www.example.se", "se"]], + "ui_info": { + "description": [["This is a test OP", "en"]], + "display_name": [["OP - TEST", "en"]] + } + } + } + } + } + + return data + + diff --git a/tests/conftest_saml2.py b/tests/conftest_saml2.py new file mode 100644 index 000000000..bcc72f71a --- /dev/null +++ b/tests/conftest_saml2.py @@ -0,0 +1,256 @@ +import copy + +import pytest +from saml2 import BINDING_HTTP_REDIRECT, BINDING_HTTP_POST +from saml2.extension.idpdisc import BINDING_DISCO +from saml2.saml import NAME_FORMAT_URI, NAMEID_FORMAT_TRANSIENT + +from .util import create_metadata_from_config_dict + +BASE_URL = "https://test-proxy.com" + + +@pytest.fixture +def sp_conf(cert_and_key): + sp_base = "http://example.com" + spconfig = { + "entityid": "{}/unittest_sp.xml".format(sp_base), + "service": { + "sp": { + "endpoints": { + "assertion_consumer_service": [ + ("%s/acs/redirect" % sp_base, BINDING_HTTP_REDIRECT) + ], + "discovery_response": [("%s/disco" % sp_base, BINDING_DISCO)] + }, + "want_response_signed": False, + "allow_unsolicited": True, + "name_id_format": [NAMEID_FORMAT_PERSISTENT] + }, + }, + "cert_file": cert_and_key[0], + "key_file": cert_and_key[1], + "metadata": {"inline": []}, + } + + return spconfig + + +@pytest.fixture +def idp_conf(cert_and_key): + idp_base = "http://idp.example.com" + + idpconfig = { + "entityid": "{}/{}/proxy.xml".format(idp_base, "Saml2IDP"), + "description": "A SAML2SAML proxy", + "service": { + "idp": { + "name": "Proxy IdP", + "endpoints": { + "single_sign_on_service": [ + ("%s/sso/redirect" % idp_base, BINDING_HTTP_REDIRECT), + ], + }, + "policy": { + "default": { + "lifetime": {"minutes": 15}, + "attribute_restrictions": None, # means all I have + "name_form": NAME_FORMAT_URI, + "fail_on_missing_requested": False + }, + }, + "subject_data": {}, + "name_id_format": [NAMEID_FORMAT_TRANSIENT, + NAMEID_FORMAT_PERSISTENT], + "want_authn_requests_signed": False, + "ui_info": { + "display_name": [{"text": "SATOSA Test IdP", "lang": "en"}], + "description": [{"text": "Test IdP for SATOSA unit tests.", "lang": "en"}], + "logo": [{"text": "https://idp.example.com/static/logo.png", "width": "120", "height": "60", + "lang": "en"}], + }, + }, + }, + "cert_file": cert_and_key[0], + "key_file": cert_and_key[1], + "metadata": {"inline": []}, + "organization": { + "name": [["Test IdP Org.", "en"]], + "display_name": [["Test IdP", "en"]], + "url": [["https://idp.example.com/about", "en"]] + }, + "contact_person": [ + {"given_name": "Test IdP", "sur_name": "Support", "email_address": ["help@idp.example.com"], + "contact_type": "support" + }, + {"given_name": "Test IdP", "sur_name": "Tech support", + "email_address": ["tech@idp.example.com"], "contact_type": "technical"} + ] + } + + return idpconfig + + +@pytest.fixture +def context(): + context = Context() + context.state = State() + return context + + +@pytest.fixture +def satosa_config_dict(backend_plugin_config, frontend_plugin_config, request_microservice_config, + response_microservice_config): + config = { + "BASE": BASE_URL, + "COOKIE_STATE_NAME": "TEST_STATE", + "INTERNAL_ATTRIBUTES": {"attributes": {}}, + "STATE_ENCRYPTION_KEY": "state_encryption_key", + "CUSTOM_PLUGIN_MODULE_PATHS": [os.path.dirname(__file__)], + "BACKEND_MODULES": [backend_plugin_config], + "FRONTEND_MODULES": [frontend_plugin_config], + "MICRO_SERVICES": [request_microservice_config, response_microservice_config], + "LOGGING": {"version": 1} + } + return config + + +@pytest.fixture +def backend_plugin_config(): + data = { + "module": "util.TestBackend", + "name": "backend", + "config": {"foo": "bar"} + } + return data + + +@pytest.fixture +def frontend_plugin_config(): + data = { + "module": "util.TestFrontend", + "name": "frontend", + "config": {"abc": "xyz"} + } + return data + + +@pytest.fixture +def request_microservice_config(): + data = { + "module": "util.TestRequestMicroservice", + "name": "request-microservice", + } + return data + + +@pytest.fixture +def response_microservice_config(): + data = { + "module": "util.TestResponseMicroservice", + "name": "response-microservice", + "config": {"qwe": "rty"} + } + return data + + +@pytest.fixture +def saml_frontend_config(cert_and_key, sp_conf): + data = { + "module": "satosa.frontends.saml2.SAMLFrontend", + "name": "SAML2Frontend", + "config": { + "idp_config": { + "entityid": "frontend-entity_id", + "service": { + "idp": { + "endpoints": { + "single_sign_on_service": [] + }, + "name": "Frontend IdP", + "name_id_format": NAMEID_FORMAT_TRANSIENT, + "policy": { + "default": { + "attribute_restrictions": None, + "fail_on_missing_requested": False, + "lifetime": {"minutes": 15}, + "name_form": NAME_FORMAT_URI + } + } + } + }, + "cert_file": cert_and_key[0], + "key_file": cert_and_key[1], + "metadata": {"inline": [create_metadata_from_config_dict(sp_conf)]}, + "organization": { + "name": [["SATOSA Org.", "en"]], + "display_name": [["SATOSA", "en"]], + "url": [["https://satosa.example.com/about", "en"]] + }, + "contact_person": [ + {"given_name": "SATOSA", "sur_name": "Support", + "email_address": ["help@satosa.example.com"], + "contact_type": "support" + }, + {"given_name": "SATOSA", "sur_name": "Tech Support", + "email_address": ["tech@satosa.example.com"], + "contact_type": "technical" + } + ] + }, + + "endpoints": { + "single_sign_on_service": {BINDING_HTTP_POST: "sso/post", + BINDING_HTTP_REDIRECT: "sso/redirect"} + } + } + } + + return data + + +@pytest.fixture +def saml_backend_config(idp_conf): + name = "SAML2Backend" + data = { + "module": "satosa.backends.saml2.SAMLBackend", + "name": name, + "config": { + "sp_config": { + "entityid": "backend-entity_id", + "organization": {"display_name": "Example Identities", + "name": "Test Identities Org.", + "url": "http://www.example.com"}, + "contact_person": [ + {"contact_type": "technical", "email_address": "technical@example.com", + "given_name": "Technical"}, + {"contact_type": "support", "email_address": "support@example.com", + "given_name": "Support"} + ], + "service": { + "sp": { + "want_response_signed": False, + "allow_unsolicited": True, + "endpoints": { + "assertion_consumer_service": [ + ("{}/{}/acs/redirect".format(BASE_URL, name), + BINDING_HTTP_REDIRECT)], + "discovery_response": [("{}/disco", BINDING_DISCO)] + + } + } + }, + "metadata": {"inline": [create_metadata_from_config_dict(idp_conf)]} + } + } + } + return data + + +@pytest.fixture +def saml_mirror_frontend_config(saml_frontend_config): + data = copy.deepcopy(saml_frontend_config) + data["module"] = "satosa.frontends.saml2.SAMLMirrorFrontend" + data["name"] = "SAMLMirrorFrontend" + return data + diff --git a/tests/flows/test_account_linking.py b/tests/flows/test_account_linking.py index 94f53a431..2c1c194bd 100644 --- a/tests/flows/test_account_linking.py +++ b/tests/flows/test_account_linking.py @@ -1,7 +1,9 @@ +import pytest import responses from werkzeug.test import Client from werkzeug.wrappers import Response +saml2 = pytest.importorskip('saml2') from satosa.proxy_server import make_app from satosa.satosa_config import SATOSAConfig diff --git a/tests/flows/test_consent.py b/tests/flows/test_consent.py index 76dff496b..6460c5603 100644 --- a/tests/flows/test_consent.py +++ b/tests/flows/test_consent.py @@ -1,10 +1,12 @@ import json import re +import pytest import responses from werkzeug.test import Client from werkzeug.wrappers import Response +saml2 = pytest.importorskip('saml2') from satosa.proxy_server import make_app from satosa.satosa_config import SATOSAConfig diff --git a/tests/flows/test_oidc-saml.py b/tests/flows/test_oidc-saml.py index 2a299bfef..1c8776523 100644 --- a/tests/flows/test_oidc-saml.py +++ b/tests/flows/test_oidc-saml.py @@ -3,8 +3,11 @@ import base64 from urllib.parse import urlparse, urlencode, parse_qsl -import mongomock + import pytest +mongomock = pytest.importorskip('mongomock') +oic = pytest.importorskip('oic') + from jwkest.jwk import rsa_load, RSAKey from jwkest.jws import JWS from oic.oic.message import ClaimsRequest, Claims diff --git a/tests/flows/test_saml-oidc.py b/tests/flows/test_saml-oidc.py index bc41acfe1..98560b91d 100644 --- a/tests/flows/test_saml-oidc.py +++ b/tests/flows/test_saml-oidc.py @@ -1,6 +1,10 @@ import time from urllib.parse import urlparse, parse_qsl, urlencode +import pytest +oic = pytest.importorskip('oic') +saml2 = pytest.importorskip('saml2') + from oic.oic.message import IdToken from saml2 import BINDING_HTTP_REDIRECT from saml2.config import SPConfig diff --git a/tests/flows/test_saml-saml.py b/tests/flows/test_saml-saml.py index 91c350495..58acc2086 100644 --- a/tests/flows/test_saml-saml.py +++ b/tests/flows/test_saml-saml.py @@ -1,5 +1,8 @@ from urllib.parse import parse_qsl, urlparse, urlencode +import pytest +saml2 = pytest.importorskip('saml2') + from saml2 import BINDING_HTTP_REDIRECT from saml2.config import SPConfig, IdPConfig from werkzeug.test import Client diff --git a/tests/flows/test_wsgi_flow.py b/tests/flows/test_wsgi_flow.py index ab9d636f5..11f65f5ba 100644 --- a/tests/flows/test_wsgi_flow.py +++ b/tests/flows/test_wsgi_flow.py @@ -4,6 +4,9 @@ from werkzeug.test import Client from werkzeug.wrappers import Response +import pytest +saml2 = pytest.importorskip('saml2') + from satosa.proxy_server import make_app from satosa.response import NotFound from satosa.satosa_config import SATOSAConfig diff --git a/tests/satosa/backends/test_bitbucket.py b/tests/satosa/backends/test_bitbucket.py index d6cf25bac..c99a9ee8e 100644 --- a/tests/satosa/backends/test_bitbucket.py +++ b/tests/satosa/backends/test_bitbucket.py @@ -5,8 +5,7 @@ import pytest import responses -from saml2.saml import NAMEID_FORMAT_TRANSIENT - +pytest.importorskip('oic') from satosa.backends.bitbucket import BitBucketBackend from satosa.internal import InternalData @@ -130,7 +129,7 @@ def test_register_endpoints(self): def test_start_auth(self, context): context.path = 'bitbucket/sso/redirect' internal_request = InternalData( - subject_type=NAMEID_FORMAT_TRANSIENT, requester='test_requester' + subject_type="transient", requester='test_requester' ) resp = self.bb_backend.start_auth(context, @@ -180,7 +179,7 @@ def test_entire_flow(self, context): context.path = 'bitbucket/sso/redirect' internal_request = InternalData( - subject_type=NAMEID_FORMAT_TRANSIENT, requester='test_requester' + subject_type="transient", requester='test_requester' ) self.bb_backend.start_auth(context, internal_request, mock_get_state) diff --git a/tests/satosa/backends/test_idpy_oidc.py b/tests/satosa/backends/test_idpy_oidc.py index 95e8b427c..1d68c6505 100644 --- a/tests/satosa/backends/test_idpy_oidc.py +++ b/tests/satosa/backends/test_idpy_oidc.py @@ -1,20 +1,20 @@ import json import re import time -from datetime import datetime from unittest.mock import Mock from urllib.parse import parse_qsl from urllib.parse import urlparse +import pytest +import responses from cryptojwt.key_jar import build_keyjar -from idpyoidc.client.defaults import DEFAULT_KEY_DEFS + +idpyoidc = pytest.importorskip('idpyoidc') + from idpyoidc.client.oauth2.stand_alone_client import StandAloneClient -from idpyoidc.message.oidc import AuthorizationResponse from idpyoidc.message.oidc import IdToken -from oic.oic import AuthorizationRequest -import pytest -import responses +from satosa.backends.idpy_oidc import create_client from satosa.backends.idpy_oidc import IdpyOIDCBackend from satosa.context import Context from satosa.internal import InternalData @@ -22,64 +22,65 @@ ISSUER = "https://provider.example.com" CLIENT_ID = "test_client" -CLIENT_BASE_URL = "https://client.test.com" NONCE = "the nonce" +NONCE_KEY = "oidc_nonce" +STATE_KEY = "oidc_state" + +KEYDEFS = [ + {"type": "RSA", "key": "", "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]}, +] + + +class TestOpenIDConnectBackend(object): + + @pytest.fixture(autouse=True) + def create_backend(self, internal_attributes, backend_config): + self.oidc_backend = IdpyOIDCBackend(Mock(), internal_attributes, backend_config, + "base_url", "idpy_oidc") + self.issuer_keyjar = build_keyjar(KEYDEFS, issuer_id=ISSUER) + self.oidc_backend.client.keyjar.import_jwks( + self.issuer_keyjar.export_jwks(issuer_id=ISSUER), + ISSUER + ) + + @pytest.fixture + def internal_attributes(self): + return { + "attributes": { + "givenname": {"openid": ["given_name"]}, + "mail": {"openid": ["email"]}, + "edupersontargetedid": {"openid": ["sub"]}, + "surname": {"openid": ["family_name"]} + } + } -class TestIdpyOIDCBackend(object): @pytest.fixture def backend_config(self): return { "client": { - "base_url": CLIENT_BASE_URL, "client_id": CLIENT_ID, - "client_type": "oidc", "client_secret": "ZJYCqe3GGRvdrudKyZS0XhGv_Z45DuKhCUk0gBR1vZk", "application_type": "web", "application_name": "SATOSA Test", "contacts": ["ops@example.com"], + "redirect_uris": ["https://client.test.com/authz_cb"], "response_types_supported": ["code"], - "response_type": "code id_token token", - "scope": "openid foo", - "key_conf": {"key_defs": DEFAULT_KEY_DEFS}, - "jwks_uri": f"{CLIENT_BASE_URL}/jwks.json", + "subject_types_supported": "pairwise", + # "response_type": "code id_token token", + "scopes_supported": ["openid", "foo"], "provider_info": { "issuer": ISSUER, - "authorization_endpoint": f"{ISSUER}/authn", - "token_endpoint": f"{ISSUER}/token", - "userinfo_endpoint": f"{ISSUER}/user", - "jwks_uri": f"{ISSUER}/static/jwks" + "authorization_endpoint": ISSUER + "/authorization", + "token_endpoint": ISSUER + "/token", + "userinfo_endpoint": ISSUER + "/userinfo", + "registration_endpoint": ISSUER + "/registration", + "jwks_uri": ISSUER + "/static/jwks" } } } - @pytest.fixture - def internal_attributes(self): - return { - "attributes": { - "givenname": {"openid": ["given_name"]}, - "mail": {"openid": ["email"]}, - "edupersontargetedid": {"openid": ["sub"]}, - "surname": {"openid": ["family_name"]} - } - } - - @pytest.fixture(autouse=True) - @responses.activate - def create_backend(self, internal_attributes, backend_config): - base_url = backend_config['client']['base_url'] - self.issuer_keys = build_keyjar(DEFAULT_KEY_DEFS) - with responses.RequestsMock() as rsps: - rsps.add( - responses.GET, - backend_config['client']['provider_info']['jwks_uri'], - body=self.issuer_keys.export_jwks_as_json(), - status=200, - content_type="application/json") - - self.oidc_backend = IdpyOIDCBackend(Mock(), internal_attributes, backend_config, - base_url, "oidc") - @pytest.fixture def userinfo(self): return { @@ -90,45 +91,17 @@ def userinfo(self): } @pytest.fixture - def id_token(self, userinfo): - issuer_keys = build_keyjar(DEFAULT_KEY_DEFS) - signing_key = issuer_keys.get_signing_key(key_type='RSA')[0] - signing_key.alg = "RS256" - auth_time = int(datetime.utcnow().timestamp()) - id_token_claims = { - "auth_time": auth_time, - "iss": ISSUER, - "sub": userinfo["sub"], - "aud": CLIENT_ID, - "nonce": NONCE, - "exp": auth_time + 3600, - "iat": auth_time, - } - id_token = IdToken(**id_token_claims) - return id_token - - @pytest.fixture - def all_user_claims(self, userinfo, id_token): - all_user_claims = {**userinfo, **id_token} - return all_user_claims - - def test_client(self, backend_config): - assert isinstance(self.oidc_backend.client, StandAloneClient) - # 3 signing keys. One RSA, one EC and one symmetric - assert len(self.oidc_backend.client.context.keyjar.get_signing_key()) == 3 - assert self.oidc_backend.client.context.jwks_uri == backend_config['client']['jwks_uri'] + def signing_key(self): + return self.issuer_keyjar.get_signing_key("rsa", issuer_id=ISSUER)[0] def assert_expected_attributes(self, attr_map, user_claims, actual_attributes): - expected_attributes = { - out_attr: [user_claims[in_mapping["openid"][0]]] - for out_attr, in_mapping in attr_map["attributes"].items() - } + expected_attributes = {} + for out_attr, in_mapping in attr_map["attributes"].items(): + expected_attributes[out_attr] = [user_claims[in_mapping["openid"][0]]] + assert actual_attributes == expected_attributes - def setup_token_endpoint(self, userinfo): - _client = self.oidc_backend.client - signing_key = self.issuer_keys.get_signing_key(key_type='RSA')[0] - signing_key.alg = "RS256" + def setup_token_endpoint(self, token_endpoint_url, userinfo, signing_key): id_token_claims = { "iss": ISSUER, "sub": userinfo["sub"], @@ -137,7 +110,7 @@ def setup_token_endpoint(self, userinfo): "exp": time.time() + 3600, "iat": time.time() } - id_token = IdToken(**id_token_claims).to_jwt([signing_key], algorithm=signing_key.alg) + id_token = IdToken(**id_token_claims).to_jwt(key=[signing_key], algorithm="RS256") token_response = { "access_token": "SlAV32hkKG", "token_type": "Bearer", @@ -146,90 +119,193 @@ def setup_token_endpoint(self, userinfo): "id_token": id_token } responses.add(responses.POST, - _client.context.provider_info['token_endpoint'], + token_endpoint_url, body=json.dumps(token_response), status=200, content_type="application/json") - def setup_userinfo_endpoint(self, userinfo): + def setup_userinfo_endpoint(self, userinfo_endpoint_url, userinfo): responses.add(responses.GET, - self.oidc_backend.client.context.provider_info['userinfo_endpoint'], + userinfo_endpoint_url, body=json.dumps(userinfo), status=200, content_type="application/json") + def get_redirect_uri_path(self, backend_config): + return urlparse( + backend_config["client"]["redirect_uris"][0]).path.lstrip("/") + @pytest.fixture - def incoming_authn_response(self): - _context = self.oidc_backend.client.context + def incoming_authn_response(self, context, backend_config): oidc_state = "my state" - _uri = _context.claims.get_usage("redirect_uris")[0] - _request = AuthorizationRequest( - redirect_uri=_uri, - response_type="code", - client_id=_context.get_client_id(), - scope=_context.claims.get_usage("scope"), - nonce=NONCE - ) - _context.cstate.set(oidc_state, {"iss": _context.issuer}) - _context.cstate.bind_key(NONCE, oidc_state) - _context.cstate.update(oidc_state, _request) - - response = AuthorizationResponse( - code="F+R4uWbN46U+Bq9moQPC4lEvRd2De4o=", - state=oidc_state, - iss=_context.issuer, - nonce=NONCE + context.path = self.get_redirect_uri_path(backend_config) + context.request = { + "code": "F+R4uWbN46U+Bq9moQPC4lEvRd2De4o=", + "state": oidc_state + } + + # Set state + _client_context = self.oidc_backend.client.get_context() + _client_context.cstate.set( + 'my state', + { + 'iss': ISSUER, + "response_type": "code", + "redirect_uri": backend_config["client"]["redirect_uris"][0], + "nonce": NONCE + } ) - return response.to_dict() + _client_context.cstate.bind_key(NONCE, "my state") + return context - def test_register_endpoints(self): - _uri = self.oidc_backend.client.context.claims.get_usage("redirect_uris")[0] - redirect_uri_path = urlparse(_uri).path.lstrip('/') + def test_register_endpoints(self, backend_config): + redirect_uri_path = self.get_redirect_uri_path(backend_config) url_map = self.oidc_backend.register_endpoints() regex, callback = url_map[0] assert re.search(regex, redirect_uri_path) assert callback == self.oidc_backend.response_endpoint - def test_translate_response_to_internal_response(self, all_user_claims): - internal_response = self.oidc_backend._translate_response(all_user_claims, ISSUER) - assert internal_response.subject_id == all_user_claims["sub"] - self.assert_expected_attributes( - self.oidc_backend.internal_attributes, - all_user_claims, - internal_response.attributes, - ) + def test_translate_response_to_internal_response(self, internal_attributes, userinfo): + internal_response = self.oidc_backend._translate_response(userinfo, ISSUER) + assert internal_response.subject_id == userinfo["sub"] + self.assert_expected_attributes(internal_attributes, userinfo, + internal_response.attributes) @responses.activate - def test_response_endpoint(self, context, all_user_claims, incoming_authn_response): - self.setup_token_endpoint(all_user_claims) - self.setup_userinfo_endpoint(all_user_claims) + def test_response_endpoint(self, backend_config, internal_attributes, userinfo, signing_key, + incoming_authn_response): + self.setup_token_endpoint(backend_config["client"]["provider_info"]["token_endpoint"], + userinfo, + signing_key) + self.setup_userinfo_endpoint(backend_config["client"]["provider_info"]["userinfo_endpoint"], + userinfo) - response_context = Context() - response_context.request = incoming_authn_response - response_context.state = context.state - - self.oidc_backend.response_endpoint(response_context) + self.oidc_backend.response_endpoint(incoming_authn_response) + assert self.oidc_backend.name not in incoming_authn_response.state args = self.oidc_backend.auth_callback_func.call_args[0] assert isinstance(args[0], Context) assert isinstance(args[1], InternalData) - self.assert_expected_attributes( - self.oidc_backend.internal_attributes, all_user_claims, args[1].attributes - ) + self.assert_expected_attributes(internal_attributes, userinfo, args[1].attributes) - def test_start_auth_redirects_to_provider_authorization_endpoint(self, context): - _client = self.oidc_backend.client + def test_start_auth_redirects_to_provider_authorization_endpoint(self, context, + backend_config): auth_response = self.oidc_backend.start_auth(context, None) assert isinstance(auth_response, Response) login_url = auth_response.message parsed = urlparse(login_url) - assert login_url.startswith(_client.context.provider_info["authorization_endpoint"]) + _client_config = backend_config["client"] + assert login_url.startswith(_client_config["provider_info"]["authorization_endpoint"]) auth_params = dict(parse_qsl(parsed.query)) - assert auth_params["scope"] == " ".join(_client.context.claims.get_usage("scope")) - assert auth_params["response_type"] == _client.context.claims.get_usage("response_types")[0] - assert auth_params["client_id"] == _client.client_id - assert auth_params["redirect_uri"] == _client.context.claims.get_usage("redirect_uris")[0] + assert auth_params["scope"] == " ".join(_client_config["scopes_supported"]) + assert auth_params["response_type"] in _client_config["response_types_supported"] + assert auth_params["client_id"] == _client_config["client_id"] + assert auth_params["redirect_uri"] == _client_config["redirect_uris"][0] assert "state" in auth_params assert "nonce" in auth_params + +class TestBackendConfiguration(object): + + def create_config(self, client_metadata, provider_metadata=None): + _config = client_metadata.copy() + if provider_metadata: + _config['provider_info'] = provider_metadata + return _config + + @pytest.fixture + def internal_attributes(self): + return { + "attributes": { + "givenname": {"openid": ["given_name"]}, + "mail": {"openid": ["email"]}, + "edupersontargetedid": {"openid": ["sub"]}, + "surname": {"openid": ["family_name"]} + } + } + + @pytest.fixture + def provider_metadata(self): + return { + "issuer": ISSUER, + "authorization_endpoint": ISSUER + "/authorization", + "token_endpoint": ISSUER + "/token", + "registration_endpoint": ISSUER + "/registration", + "jwks_uri": ISSUER + "/jwks.json", + "response_types_supported": ["code"], + "subject_types_supported": ["public", "pairwise"], + "id_token_signing_alg_values_supported": ["RS256"] + } + + @pytest.fixture + def client_metadata(self): + return { + "client_id": "s6BhdRkqt3", + "client_secret": "ZJYCqe3GGRvdrudKyZS0XhGv_Z45DuKhCUk0gBR1vZk", + "application_type": "web", + "redirect_uris": + ["https://client.example.org/callback", + "https://client.example.org/callback2"], + "client_name": "SATOSA Test", + "logo_uri": "https://client.example.org/logo.png", + "subject_types_supported": ["pairwise"], + "token_endpoint_auth_methods_supported": ["client_secret_basic"], + "jwks_uri": "https://client.example.org/my_public_keys.jwks", + "contacts": ["ve7jtb@example.org", "mary@example.org"], + } + + def assert_provider_metadata(self, provider_metadata, client): + _provider_info = client.get_context().provider_info + assert _provider_info["authorization_endpoint"] == provider_metadata[ + "authorization_endpoint"] + assert _provider_info["token_endpoint"] == provider_metadata["token_endpoint"] + assert _provider_info["registration_endpoint"] == provider_metadata["registration_endpoint"] + assert all(x in _provider_info for x in provider_metadata.keys()) + + def assert_client_metadata(self, client_metadata, client): + _use = client.get_context().claims.use + for key in ["client_id", "client_secret", "client_name", "application_type", "logo_uri", + "contacts"]: + assert _use[key] == client_metadata[key] + + def test_init(self, client_metadata, provider_metadata): + client = create_client(self.create_config(client_metadata, provider_metadata)) + assert isinstance(client, StandAloneClient) + + def test_supports_static_provider_discovery(self, client_metadata, provider_metadata): + client = create_client(self.create_config(client_metadata, provider_metadata)) + self.assert_provider_metadata(provider_metadata, client) + + @responses.activate + def test_supports_dynamic_discovery(self, client_metadata, provider_metadata): + responses.add( + responses.GET, + ISSUER + "/.well-known/openid-configuration", + body=json.dumps(provider_metadata), + status=200, + content_type='application/json' + ) + _client_config = client_metadata + _client_config["issuer"] = provider_metadata['issuer'] + client = create_client(self.create_config(client_metadata)) + self.assert_provider_metadata(provider_metadata, client) + + def test_supports_static_client_registration(self, client_metadata, provider_metadata): + client = create_client(self.create_config(client_metadata, provider_metadata)) + self.assert_client_metadata(client_metadata, client) + + def test_supports_dynamic_client_registration(self, client_metadata, provider_metadata): + with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps: + rsps.add( + responses.POST, + provider_metadata["registration_endpoint"], + body=json.dumps(client_metadata), + status=200, + content_type='application/json' + ) + _client_metadata = { + "redirect_uris": client_metadata["redirect_uris"]} + client = create_client(self.create_config(_client_metadata, provider_metadata)) + + self.assert_client_metadata(client_metadata, client) diff --git a/tests/satosa/backends/test_oauth.py b/tests/satosa/backends/test_oauth.py index 22afc8ee7..d1dbdef83 100644 --- a/tests/satosa/backends/test_oauth.py +++ b/tests/satosa/backends/test_oauth.py @@ -5,8 +5,7 @@ import pytest import responses -from saml2.saml import NAMEID_FORMAT_TRANSIENT - +oic = pytest.importorskip('oic') from satosa.backends.oauth import FacebookBackend from satosa.internal import InternalData @@ -113,7 +112,7 @@ def test_register_endpoints(self): def test_start_auth(self, context): context.path = 'facebook/sso/redirect' internal_request = InternalData( - subject_type=NAMEID_FORMAT_TRANSIENT, requester='test_requester' + subject_type="transient", requester='test_requester' ) resp = self.fb_backend.start_auth(context, internal_request, mock_get_state) @@ -154,7 +153,7 @@ def test_entire_flow(self, context): context.path = 'facebook/sso/redirect' internal_request = InternalData( - subject_type=NAMEID_FORMAT_TRANSIENT, requester='test_requester' + subject_type="transient", requester='test_requester' ) self.fb_backend.start_auth(context, internal_request, mock_get_state) diff --git a/tests/satosa/backends/test_openid_connect.py b/tests/satosa/backends/test_openid_connect.py index 34bac79fe..548a3944e 100644 --- a/tests/satosa/backends/test_openid_connect.py +++ b/tests/satosa/backends/test_openid_connect.py @@ -2,17 +2,19 @@ import re import time from unittest.mock import Mock -from urllib.parse import urlparse, parse_qsl +from urllib.parse import parse_qsl +from urllib.parse import urlparse -import oic import pytest import responses -from Cryptodome.PublicKey import RSA -from jwkest.jwk import RSAKey + +oic = pytest.importorskip('oic') +from cryptojwt.jwk.rsa import new_rsa_key from oic.oic.message import IdToken from oic.utils.authn.client import CLIENT_AUTHN_METHOD -from satosa.backends.openid_connect import OpenIDConnectBackend, _create_client, STATE_KEY, NONCE_KEY +from satosa.backends.openid_connect import OpenIDConnectBackend, _create_client, STATE_KEY, \ + NONCE_KEY from satosa.context import Context from satosa.internal import InternalData from satosa.response import Response @@ -23,9 +25,11 @@ class TestOpenIDConnectBackend(object): + @pytest.fixture(autouse=True) def create_backend(self, internal_attributes, backend_config): - self.oidc_backend = OpenIDConnectBackend(Mock(), internal_attributes, backend_config, "base_url", "oidc") + self.oidc_backend = OpenIDConnectBackend(Mock(), internal_attributes, backend_config, + "base_url", "oidc") @pytest.fixture def backend_config(self): @@ -78,7 +82,7 @@ def userinfo(self): @pytest.fixture(scope="session") def signing_key(self): - return RSAKey(key=RSA.generate(2048), alg="RS256") + return new_rsa_key(key_size=2048) def assert_expected_attributes(self, attr_map, user_claims, actual_attributes): expected_attributes = {} @@ -126,7 +130,8 @@ def setup_userinfo_endpoint(self, userinfo_endpoint_url, userinfo): content_type="application/json") def get_redirect_uri_path(self, backend_config): - return urlparse(backend_config["client"]["client_metadata"]["redirect_uris"][0]).path.lstrip("/") + return urlparse( + backend_config["client"]["client_metadata"]["redirect_uris"][0]).path.lstrip("/") @pytest.fixture def incoming_authn_response(self, context, backend_config): @@ -157,10 +162,13 @@ def test_translate_response_to_internal_response(self, internal_attributes, user self.assert_expected_attributes(internal_attributes, userinfo, internal_response.attributes) @responses.activate - def test_response_endpoint(self, backend_config, internal_attributes, userinfo, signing_key, incoming_authn_response): + def test_response_endpoint(self, backend_config, internal_attributes, userinfo, signing_key, + incoming_authn_response): self.setup_jwks_uri(backend_config["provider_metadata"]["jwks_uri"], signing_key) - self.setup_token_endpoint(backend_config["provider_metadata"]["token_endpoint"], userinfo, signing_key) - self.setup_userinfo_endpoint(backend_config["provider_metadata"]["userinfo_endpoint"], userinfo) + self.setup_token_endpoint(backend_config["provider_metadata"]["token_endpoint"], userinfo, + signing_key) + self.setup_userinfo_endpoint(backend_config["provider_metadata"]["userinfo_endpoint"], + userinfo) self.oidc_backend.response_endpoint(incoming_authn_response) @@ -178,15 +186,18 @@ def test_start_auth_redirects_to_provider_authorization_endpoint(self, context, assert login_url.startswith(backend_config["provider_metadata"]["authorization_endpoint"]) auth_params = dict(parse_qsl(parsed.query)) assert auth_params["scope"] == backend_config["client"]["auth_req_params"]["scope"] - assert auth_params["response_type"] == backend_config["client"]["auth_req_params"]["response_type"] + assert auth_params["response_type"] == backend_config["client"]["auth_req_params"][ + "response_type"] assert auth_params["client_id"] == backend_config["client"]["client_metadata"]["client_id"] - assert auth_params["redirect_uri"] == backend_config["client"]["client_metadata"]["redirect_uris"][0] + assert auth_params["redirect_uri"] == \ + backend_config["client"]["client_metadata"]["redirect_uris"][0] assert "state" in auth_params assert "nonce" in auth_params @responses.activate def test_entire_flow(self, context, backend_config, internal_attributes, userinfo): - self.setup_userinfo_endpoint(backend_config["provider_metadata"]["userinfo_endpoint"], userinfo) + self.setup_userinfo_endpoint(backend_config["provider_metadata"]["userinfo_endpoint"], + userinfo) auth_response = self.oidc_backend.start_auth(context, None) auth_params = dict(parse_qsl(urlparse(auth_response.message).query)) @@ -202,6 +213,7 @@ def test_entire_flow(self, context, backend_config, internal_attributes, userinf class TestCreateClient(object): + @pytest.fixture def provider_metadata(self): return { @@ -222,8 +234,8 @@ def client_metadata(self): "https://client.example.org/callback2"], "client_name": "SATOSA Test", "logo_uri": "https://client.example.org/logo.png", - "subject_type": "pairwise", - "token_endpoint_auth_method": "client_secret_basic", + "subject_types_supported": "pairwise", + "token_endpoint_auth_methods_supported": "client_secret_basic", "jwks_uri": "https://client.example.org/my_public_keys.jwks", "contacts": ["ve7jtb@example.org", "mary@example.org"], } @@ -237,7 +249,8 @@ def assert_provider_metadata(self, provider_metadata, client): def assert_client_metadata(self, client_metadata, client): assert client.client_id == client_metadata["client_id"] assert client.client_secret == client_metadata["client_secret"] - assert all(x in client.registration_response.to_dict().items() for x in client_metadata.items()) + assert all( + x in client.registration_response.to_dict().items() for x in client_metadata.items()) def test_init(self, provider_metadata, client_metadata): client = _create_client(provider_metadata, client_metadata) @@ -273,6 +286,7 @@ def test_supports_dynamic_client_registration(self, provider_metadata, client_me status=200, content_type='application/json' ) - client = _create_client(provider_metadata, dict(redirect_uris=client_metadata["redirect_uris"])) + client = _create_client(provider_metadata, + dict(redirect_uris=client_metadata["redirect_uris"])) self.assert_client_metadata(client_metadata, client) diff --git a/tests/satosa/backends/test_orcid.py b/tests/satosa/backends/test_orcid.py index 5120d4e89..529042167 100644 --- a/tests/satosa/backends/test_orcid.py +++ b/tests/satosa/backends/test_orcid.py @@ -2,6 +2,7 @@ import pytest import responses +oic = pytest.importorskip('oic') from satosa.backends.orcid import OrcidBackend from satosa.context import Context from satosa.internal import InternalData diff --git a/tests/satosa/backends/test_saml2.py b/tests/satosa/backends/test_saml2.py index e1cc96466..9e05c5bdc 100644 --- a/tests/satosa/backends/test_saml2.py +++ b/tests/satosa/backends/test_saml2.py @@ -10,8 +10,8 @@ from urllib.parse import urlparse, parse_qs, parse_qsl import pytest +saml2 = pytest.importorskip('saml2') -import saml2 from saml2 import BINDING_HTTP_REDIRECT, BINDING_HTTP_POST from saml2.authn_context import PASSWORD from saml2.config import IdPConfig, SPConfig diff --git a/tests/satosa/frontends/test_openid_connect.py b/tests/satosa/frontends/test_openid_connect.py index f769b2c66..9eede56c0 100644 --- a/tests/satosa/frontends/test_openid_connect.py +++ b/tests/satosa/frontends/test_openid_connect.py @@ -9,6 +9,8 @@ from urllib.parse import urlparse, parse_qsl import pytest +oic = pytest.importorskip("oic") + from oic.oic.message import AuthorizationResponse, AuthorizationRequest, IdToken, ClaimsRequest, \ Claims, AuthorizationErrorResponse, RegistrationResponse, RegistrationRequest, \ ClientRegistrationErrorResponse, ProviderConfigurationResponse, AccessTokenRequest, AccessTokenResponse, \ diff --git a/tests/satosa/frontends/test_saml2.py b/tests/satosa/frontends/test_saml2.py index 978489429..76ddd9bc8 100644 --- a/tests/satosa/frontends/test_saml2.py +++ b/tests/satosa/frontends/test_saml2.py @@ -8,6 +8,8 @@ from urllib.parse import urlparse, parse_qs import pytest +saml2 = pytest.importorskip("saml2") + from saml2 import BINDING_HTTP_REDIRECT, BINDING_HTTP_POST from saml2.authn_context import PASSWORD from saml2.config import SPConfig diff --git a/tests/satosa/metadata_creation/test_description.py b/tests/satosa/metadata_creation/test_description.py index 818d01a03..64d04609d 100644 --- a/tests/satosa/metadata_creation/test_description.py +++ b/tests/satosa/metadata_creation/test_description.py @@ -1,9 +1,13 @@ import pytest -from satosa.metadata_creation.description import ContactPersonDesc, UIInfoDesc, OrganizationDesc, MetadataDescription +from satosa.metadata_creation.description import ContactPersonDesc +from satosa.metadata_creation.description import MetadataDescription +from satosa.metadata_creation.description import OrganizationDesc +from satosa.metadata_creation.description import UIInfoDesc class TestContactPersonDesc(object): + def test_to_dict(self): desc = ContactPersonDesc() desc.contact_type = "test" @@ -19,6 +23,7 @@ def test_to_dict(self): class TestUIInfoDesc(object): + def test_to_dict(self): desc = UIInfoDesc() desc.add_description("test", "en") @@ -51,6 +56,7 @@ def test_to_dict_with_empty(self): class TestOrganizationDesc(object): + def test_to_dict(self): desc = OrganizationDesc() desc.add_display_name("Foo Testing", "en") @@ -69,6 +75,7 @@ def test_to_dict_with_empty(self): class TestMetadataDescription(object): + def test_to_dict(self): org_desc = OrganizationDesc() org_desc.add_display_name("Foo Testing", "en") diff --git a/tests/satosa/metadata_creation/test_saml_metadata.py b/tests/satosa/metadata_creation/test_saml_metadata.py index 77e8ac1d7..4820cc61e 100644 --- a/tests/satosa/metadata_creation/test_saml_metadata.py +++ b/tests/satosa/metadata_creation/test_saml_metadata.py @@ -2,13 +2,17 @@ from base64 import urlsafe_b64encode import pytest + +saml2 = pytest.importorskip("saml2") + from saml2.config import SPConfig, Config from saml2.mdstore import InMemoryMetaData from saml2.metadata import entity_descriptor from saml2.sigver import security_context from saml2.time_util import in_a_while -from satosa.metadata_creation.saml_metadata import create_entity_descriptors, create_signed_entities_descriptor, \ +from satosa.metadata_creation.saml_metadata import create_entity_descriptors, \ + create_signed_entities_descriptor, \ create_signed_entity_descriptor from satosa.satosa_config import SATOSAConfig from tests.conftest import BASE_URL @@ -16,20 +20,26 @@ class TestCreateEntityDescriptors: - def assert_single_sign_on_endpoints_for_saml_frontend(self, entity_descriptor, saml_frontend_config, backend_names): + + def assert_single_sign_on_endpoints_for_saml_frontend(self, entity_descriptor, + saml_frontend_config, backend_names): metadata = InMemoryMetaData(None, str(entity_descriptor)) metadata.load() - sso = metadata.service(saml_frontend_config["config"]["idp_config"]["entityid"], "idpsso_descriptor", + sso = metadata.service(saml_frontend_config["config"]["idp_config"]["entityid"], + "idpsso_descriptor", "single_sign_on_service") for backend_name in backend_names: - for binding, path in saml_frontend_config["config"]["endpoints"]["single_sign_on_service"].items(): + for binding, path in saml_frontend_config["config"]["endpoints"][ + "single_sign_on_service"].items(): sso_urls_for_binding = [endpoint["location"] for endpoint in sso[binding]] expected_url = "{}/{}/{}".format(BASE_URL, backend_name, path) assert expected_url in sso_urls_for_binding - def assert_single_sign_on_endpoints_for_saml_mirror_frontend(self, entity_descriptors, encoded_target_entity_id, - saml_mirror_frontend_config, backend_names): + def assert_single_sign_on_endpoints_for_saml_mirror_frontend(self, entity_descriptors, + encoded_target_entity_id, + saml_mirror_frontend_config, + backend_names): expected_entity_id = saml_mirror_frontend_config["config"]["idp_config"][ "entityid"] + "/" + encoded_target_entity_id metadata = InMemoryMetaData(None, None) @@ -38,21 +48,27 @@ def assert_single_sign_on_endpoints_for_saml_mirror_frontend(self, entity_descri sso = metadata.service(expected_entity_id, "idpsso_descriptor", "single_sign_on_service") for backend_name in backend_names: - for binding, path in saml_mirror_frontend_config["config"]["endpoints"]["single_sign_on_service"].items(): + for binding, path in saml_mirror_frontend_config["config"]["endpoints"][ + "single_sign_on_service"].items(): sso_urls_for_binding = [endpoint["location"] for endpoint in sso[binding]] - expected_url = "{}/{}/{}/{}".format(BASE_URL, backend_name, encoded_target_entity_id, path) + expected_url = "{}/{}/{}/{}".format(BASE_URL, backend_name, + encoded_target_entity_id, path) assert expected_url in sso_urls_for_binding - def assert_assertion_consumer_service_endpoints_for_saml_backend(self, entity_descriptor, saml_backend_config): + def assert_assertion_consumer_service_endpoints_for_saml_backend(self, entity_descriptor, + saml_backend_config): metadata = InMemoryMetaData(None, str(entity_descriptor)) metadata.load() - acs = metadata.service(saml_backend_config["config"]["sp_config"]["entityid"], "spsso_descriptor", + acs = metadata.service(saml_backend_config["config"]["sp_config"]["entityid"], + "spsso_descriptor", "assertion_consumer_service") - for url, binding in saml_backend_config["config"]["sp_config"]["service"]["sp"]["endpoints"][ + for url, binding in \ + saml_backend_config["config"]["sp_config"]["service"]["sp"]["endpoints"][ "assertion_consumer_service"]: assert acs[binding][0]["location"] == url - def test_saml_frontend_with_saml_backend(self, satosa_config_dict, saml_frontend_config, saml_backend_config): + def test_saml_frontend_with_saml_backend(self, satosa_config_dict, saml_frontend_config, + saml_backend_config): satosa_config_dict["FRONTEND_MODULES"] = [saml_frontend_config] satosa_config_dict["BACKEND_MODULES"] = [saml_backend_config] satosa_config = SATOSAConfig(satosa_config_dict) @@ -61,14 +77,16 @@ def test_saml_frontend_with_saml_backend(self, satosa_config_dict, saml_frontend assert len(frontend_metadata) == 1 assert len(frontend_metadata[saml_frontend_config["name"]]) == 1 entity_descriptor = frontend_metadata[saml_frontend_config["name"]][0] - self.assert_single_sign_on_endpoints_for_saml_frontend(entity_descriptor, saml_frontend_config, + self.assert_single_sign_on_endpoints_for_saml_frontend(entity_descriptor, + saml_frontend_config, [saml_backend_config["name"]]) assert len(backend_metadata) == 1 self.assert_assertion_consumer_service_endpoints_for_saml_backend( backend_metadata[saml_backend_config["name"]][0], saml_backend_config) - def test_saml_frontend_with_oidc_backend(self, satosa_config_dict, saml_frontend_config, oidc_backend_config): + def test_saml_frontend_with_oidc_backend(self, satosa_config_dict, saml_frontend_config, + oidc_backend_config): satosa_config_dict["FRONTEND_MODULES"] = [saml_frontend_config] satosa_config_dict["BACKEND_MODULES"] = [oidc_backend_config] satosa_config = SATOSAConfig(satosa_config_dict) @@ -77,12 +95,14 @@ def test_saml_frontend_with_oidc_backend(self, satosa_config_dict, saml_frontend assert len(frontend_metadata) == 1 assert len(frontend_metadata[saml_frontend_config["name"]]) == 1 entity_descriptor = frontend_metadata[saml_frontend_config["name"]][0] - self.assert_single_sign_on_endpoints_for_saml_frontend(entity_descriptor, saml_frontend_config, + self.assert_single_sign_on_endpoints_for_saml_frontend(entity_descriptor, + saml_frontend_config, [oidc_backend_config["name"]]) # OIDC backend does not produce any SAML metadata assert not backend_metadata - def test_saml_frontend_with_multiple_backends(self, satosa_config_dict, saml_frontend_config, saml_backend_config, + def test_saml_frontend_with_multiple_backends(self, satosa_config_dict, saml_frontend_config, + saml_backend_config, oidc_backend_config): satosa_config_dict["FRONTEND_MODULES"] = [saml_frontend_config] satosa_config_dict["BACKEND_MODULES"] = [saml_backend_config, oidc_backend_config] @@ -92,7 +112,8 @@ def test_saml_frontend_with_multiple_backends(self, satosa_config_dict, saml_fro assert len(frontend_metadata) == 1 assert len(frontend_metadata[saml_frontend_config["name"]]) == 1 entity_descriptor = frontend_metadata[saml_frontend_config["name"]][0] - self.assert_single_sign_on_endpoints_for_saml_frontend(entity_descriptor, saml_frontend_config, + self.assert_single_sign_on_endpoints_for_saml_frontend(entity_descriptor, + saml_frontend_config, [saml_backend_config["name"], oidc_backend_config["name"]]) # only the SAML backend produces SAML metadata @@ -101,15 +122,18 @@ def test_saml_frontend_with_multiple_backends(self, satosa_config_dict, saml_fro backend_metadata[saml_backend_config["name"]][0], saml_backend_config) - def test_saml_mirror_frontend_with_saml_backend_with_multiple_target_providers(self, satosa_config_dict, idp_conf, + def test_saml_mirror_frontend_with_saml_backend_with_multiple_target_providers(self, + satosa_config_dict, + idp_conf, saml_mirror_frontend_config, saml_backend_config): idp_conf2 = copy.deepcopy(idp_conf) idp_conf2["entityid"] = "https://idp2.example.com" satosa_config_dict["FRONTEND_MODULES"] = [saml_mirror_frontend_config] - saml_backend_config["config"]["sp_config"]["metadata"] = {"inline": [create_metadata_from_config_dict(idp_conf), - create_metadata_from_config_dict( - idp_conf2)]} + saml_backend_config["config"]["sp_config"]["metadata"] = { + "inline": [create_metadata_from_config_dict(idp_conf), + create_metadata_from_config_dict( + idp_conf2)]} satosa_config_dict["BACKEND_MODULES"] = [saml_backend_config] satosa_config = SATOSAConfig(satosa_config_dict) frontend_metadata, backend_metadata = create_entity_descriptors(satosa_config) @@ -119,16 +143,20 @@ def test_saml_mirror_frontend_with_saml_backend_with_multiple_target_providers(s entity_descriptors = frontend_metadata[saml_mirror_frontend_config["name"]] for target_entity_id in [idp_conf["entityid"], idp_conf2["entityid"]]: - encoded_target_entity_id = urlsafe_b64encode(target_entity_id.encode("utf-8")).decode("utf-8") - self.assert_single_sign_on_endpoints_for_saml_mirror_frontend(entity_descriptors, encoded_target_entity_id, + encoded_target_entity_id = urlsafe_b64encode(target_entity_id.encode("utf-8")).decode( + "utf-8") + self.assert_single_sign_on_endpoints_for_saml_mirror_frontend(entity_descriptors, + encoded_target_entity_id, saml_mirror_frontend_config, - [saml_backend_config["name"]]) + [saml_backend_config[ + "name"]]) assert len(backend_metadata) == 1 self.assert_assertion_consumer_service_endpoints_for_saml_backend( backend_metadata[saml_backend_config["name"]][0], saml_backend_config) - def test_saml_mirror_frontend_with_oidc_backend(self, satosa_config_dict, saml_mirror_frontend_config, + def test_saml_mirror_frontend_with_oidc_backend(self, satosa_config_dict, + saml_mirror_frontend_config, oidc_backend_config): satosa_config_dict["FRONTEND_MODULES"] = [saml_mirror_frontend_config] satosa_config_dict["BACKEND_MODULES"] = [oidc_backend_config] @@ -139,8 +167,10 @@ def test_saml_mirror_frontend_with_oidc_backend(self, satosa_config_dict, saml_m assert len(frontend_metadata[saml_mirror_frontend_config["name"]]) == 1 entity_descriptors = frontend_metadata[saml_mirror_frontend_config["name"]] target_entity_id = oidc_backend_config["config"]["provider_metadata"]["issuer"] - encoded_target_entity_id = urlsafe_b64encode(target_entity_id.encode("utf-8")).decode("utf-8") - self.assert_single_sign_on_endpoints_for_saml_mirror_frontend(entity_descriptors, encoded_target_entity_id, + encoded_target_entity_id = urlsafe_b64encode(target_entity_id.encode("utf-8")).decode( + "utf-8") + self.assert_single_sign_on_endpoints_for_saml_mirror_frontend(entity_descriptors, + encoded_target_entity_id, saml_mirror_frontend_config, [oidc_backend_config["name"]]) @@ -159,12 +189,15 @@ def test_saml_mirror_frontend_with_multiple_backends(self, satosa_config_dict, i assert len(frontend_metadata) == 1 assert len(frontend_metadata[saml_mirror_frontend_config["name"]]) == 2 - params = zip([idp_conf["entityid"], oidc_backend_config["config"]["provider_metadata"]["issuer"]], - [saml_backend_config["name"], oidc_backend_config["name"]]) + params = zip( + [idp_conf["entityid"], oidc_backend_config["config"]["provider_metadata"]["issuer"]], + [saml_backend_config["name"], oidc_backend_config["name"]]) entity_descriptors = frontend_metadata[saml_mirror_frontend_config["name"]] for target_entity_id, backend_name in params: - encoded_target_entity_id = urlsafe_b64encode(target_entity_id.encode("utf-8")).decode("utf-8") - self.assert_single_sign_on_endpoints_for_saml_mirror_frontend(entity_descriptors, encoded_target_entity_id, + encoded_target_entity_id = urlsafe_b64encode(target_entity_id.encode("utf-8")).decode( + "utf-8") + self.assert_single_sign_on_endpoints_for_saml_mirror_frontend(entity_descriptors, + encoded_target_entity_id, saml_mirror_frontend_config, [backend_name]) @@ -174,7 +207,8 @@ def test_saml_mirror_frontend_with_multiple_backends(self, satosa_config_dict, i backend_metadata[saml_backend_config["name"]][0], saml_backend_config) - def test_two_saml_frontends(self, satosa_config_dict, saml_frontend_config, saml_mirror_frontend_config, + def test_two_saml_frontends(self, satosa_config_dict, saml_frontend_config, + saml_mirror_frontend_config, oidc_backend_config): satosa_config_dict["FRONTEND_MODULES"] = [saml_frontend_config, saml_mirror_frontend_config] @@ -187,21 +221,25 @@ def test_two_saml_frontends(self, satosa_config_dict, saml_frontend_config, saml saml_entities = frontend_metadata[saml_frontend_config["name"]] assert len(saml_entities) == 1 entity_descriptor = saml_entities[0] - self.assert_single_sign_on_endpoints_for_saml_frontend(entity_descriptor, saml_frontend_config, + self.assert_single_sign_on_endpoints_for_saml_frontend(entity_descriptor, + saml_frontend_config, [oidc_backend_config["name"]]) mirrored_saml_entities = frontend_metadata[saml_mirror_frontend_config["name"]] assert len(mirrored_saml_entities) == 1 target_entity_id = oidc_backend_config["config"]["provider_metadata"]["issuer"] - encoded_target_entity_id = urlsafe_b64encode(target_entity_id.encode("utf-8")).decode("utf-8") - self.assert_single_sign_on_endpoints_for_saml_mirror_frontend(mirrored_saml_entities, encoded_target_entity_id, + encoded_target_entity_id = urlsafe_b64encode(target_entity_id.encode("utf-8")).decode( + "utf-8") + self.assert_single_sign_on_endpoints_for_saml_mirror_frontend(mirrored_saml_entities, + encoded_target_entity_id, saml_mirror_frontend_config, [oidc_backend_config["name"]]) # OIDC backend does not produce any SAML metadata assert not backend_metadata - def test_create_mirrored_metadata_does_not_contain_target_contact_info(self, satosa_config_dict, idp_conf, + def test_create_mirrored_metadata_does_not_contain_target_contact_info(self, satosa_config_dict, + idp_conf, saml_mirror_frontend_config, saml_backend_config): @@ -222,18 +260,22 @@ def test_create_mirrored_metadata_does_not_contain_target_contact_info(self, sat assert len(entity_info["contact_person"]) == len(expected_entity_info["contact_person"]) for i, contact in enumerate(expected_entity_info["contact_person"]): assert entity_info["contact_person"][i]["contact_type"] == contact["contact_type"] - assert entity_info["contact_person"][i]["email_address"][0]["text"] == contact["email_address"][0] + assert entity_info["contact_person"][i]["email_address"][0]["text"] == \ + contact["email_address"][0] assert entity_info["contact_person"][i]["given_name"]["text"] == contact["given_name"] assert entity_info["contact_person"][i]["sur_name"]["text"] == contact["sur_name"] expected_org_info = expected_entity_info["organization"] assert entity_info["organization"]["organization_display_name"][0]["text"] == \ expected_org_info["display_name"][0][0] - assert entity_info["organization"]["organization_name"][0]["text"] == expected_org_info["name"][0][0] - assert entity_info["organization"]["organization_url"][0]["text"] == expected_org_info["url"][0][0] + assert entity_info["organization"]["organization_name"][0]["text"] == \ + expected_org_info["name"][0][0] + assert entity_info["organization"]["organization_url"][0]["text"] == \ + expected_org_info["url"][0][0] class TestCreateSignedEntitiesDescriptor: + @pytest.fixture def entity_desc(self, sp_conf): return entity_descriptor(SPConfig().load(sp_conf)) @@ -251,8 +293,10 @@ def signature_security_context(self, cert_and_key): conf.key_file = cert_and_key[1] return security_context(conf) - def test_signed_metadata(self, entity_desc, signature_security_context, verification_security_context): - signed_metadata = create_signed_entities_descriptor([entity_desc, entity_desc], signature_security_context) + def test_signed_metadata(self, entity_desc, signature_security_context, + verification_security_context): + signed_metadata = create_signed_entities_descriptor([entity_desc, entity_desc], + signature_security_context) md = InMemoryMetaData(None, security=verification_security_context) md.parse(signed_metadata) @@ -263,7 +307,8 @@ def test_signed_metadata(self, entity_desc, signature_security_context, verifica def test_valid_for(self, entity_desc, signature_security_context): valid_for = 4 # metadata valid for 4 hours expected_validity = in_a_while(hours=valid_for) - signed_metadata = create_signed_entities_descriptor([entity_desc], signature_security_context, + signed_metadata = create_signed_entities_descriptor([entity_desc], + signature_security_context, valid_for=valid_for) md = InMemoryMetaData(None) @@ -272,6 +317,7 @@ def test_valid_for(self, entity_desc, signature_security_context): class TestCreateSignedEntityDescriptor: + @pytest.fixture def entity_desc(self, sp_conf): return entity_descriptor(SPConfig().load(sp_conf)) @@ -289,7 +335,8 @@ def signature_security_context(self, cert_and_key): conf.key_file = cert_and_key[1] return security_context(conf) - def test_signed_metadata(self, entity_desc, signature_security_context, verification_security_context): + def test_signed_metadata(self, entity_desc, signature_security_context, + verification_security_context): signed_metadata = create_signed_entity_descriptor(entity_desc, signature_security_context) md = InMemoryMetaData(None, security=verification_security_context) diff --git a/tests/satosa/micro_services/test_account_linking.py b/tests/satosa/micro_services/test_account_linking.py index 859f3517d..7b8b7ec07 100644 --- a/tests/satosa/micro_services/test_account_linking.py +++ b/tests/satosa/micro_services/test_account_linking.py @@ -5,11 +5,9 @@ import requests import responses -from responses import matchers - -from jwkest.jwk import rsa_load, RSAKey -from jwkest.jws import JWS +from cryptojwt import JWS +from satosa.cert_util import rsa_key_from_pem from satosa.exception import SATOSAAuthenticationError from satosa.internal import AuthenticationInformation from satosa.internal import InternalData @@ -40,17 +38,17 @@ def create_account_linking(self, account_linking_config): self.account_linking.next = lambda ctx, data: data @responses.activate - def test_existing_account_linking_with_known_known_uuid(self, account_linking_config, internal_response, context): + def test_existing_account_linking_with_known_known_uuid(self, account_linking_config, + internal_response, context): uuid = "uuid" data = { "idp": internal_response.auth_info.issuer, "id": internal_response.subject_id, "redirect_endpoint": self.account_linking.base_url + "/account_linking/handle_account_linking" } - key = RSAKey(key=rsa_load(account_linking_config["sign_key"]), use="sig", alg="RS256") - jws = JWS(json.dumps(data), alg=key.alg).sign_compact([key]) - url = "%s/get_id" % account_linking_config["api_url"] - params = {"jwt": jws} + key = rsa_key_from_pem(account_linking_config["sign_key"]) + key.alg = "RS256" + jws = JWS(json.dumps(data), key.alg).sign_compact([key]) responses.add( responses.GET, url=url, @@ -82,8 +80,8 @@ def test_full_flow(self, account_linking_config, internal_response, context): "id": internal_response.subject_id, "redirect_endpoint": self.account_linking.base_url + "/account_linking/handle_account_linking" } - key = RSAKey(key=rsa_load(account_linking_config["sign_key"]), use="sig", alg="RS256") - jws = JWS(json.dumps(data), alg=key.alg).sign_compact([key]) + key = rsa_key_from_pem(account_linking_config["sign_key"]) + jws = JWS(json.dumps(data), alg="RS256").sign_compact([key]) uuid = "uuid" with responses.RequestsMock() as rsps: # account is linked, 200 OK diff --git a/tests/satosa/micro_services/test_attribute_modifications.py b/tests/satosa/micro_services/test_attribute_modifications.py index 41ce8a7c0..2a003af22 100644 --- a/tests/satosa/micro_services/test_attribute_modifications.py +++ b/tests/satosa/micro_services/test_attribute_modifications.py @@ -1,7 +1,5 @@ import pytest -from tests.util import FakeIdP, create_metadata_from_config_dict, FakeSP -from saml2.mdstore import MetadataStore -from saml2.config import Config + from satosa.context import Context from satosa.exception import SATOSAError from satosa.internal import AuthenticationInformation @@ -10,28 +8,14 @@ class TestFilterAttributeValues: + def create_filter_service(self, attribute_filters): - filter_service = FilterAttributeValues(config=dict(attribute_filters=attribute_filters), name="test_filter", + filter_service = FilterAttributeValues(config=dict(attribute_filters=attribute_filters), + name="test_filter", base_url="https://satosa.example.com") filter_service.next = lambda ctx, data: data return filter_service - def create_idp_metadata_conf_with_shibmd_scopes(self, idp_entityid, shibmd_scopes): - idp_conf = { - "entityid": idp_entityid, - "service": { - "idp":{} - } - } - - if shibmd_scopes is not None: - idp_conf["service"]["idp"]["scope"] = shibmd_scopes - - metadata_conf = { - "inline": [create_metadata_from_config_dict(idp_conf)] - } - return metadata_conf - def test_filter_all_attributes_from_all_target_providers_for_all_requesters(self): attribute_filters = { "": { # all providers @@ -49,7 +33,8 @@ def test_filter_all_attributes_from_all_target_providers_for_all_requesters(self "a3": ["a:foo:bar:b"] } filtered = filter_service.process(None, resp) - assert filtered.attributes == {"a1": [], "a2": ["foo:bar", "1:foo:bar:2"], "a3": ["a:foo:bar:b"]} + assert filtered.attributes == {"a1": [], "a2": ["foo:bar", "1:foo:bar:2"], + "a3": ["a:foo:bar:b"]} def test_filter_one_attribute_from_all_target_providers_for_all_requesters(self): attribute_filters = { @@ -139,7 +124,8 @@ def test_filter_one_attribute_for_one_target_provider_for_one_requester(self): filtered = filter_service.process(None, resp) assert filtered.attributes == {"a1": ["1:foo:bar:2"]} - def test_filter_one_attribute_from_all_target_providers_for_all_requesters_in_extended_notation(self): + def test_filter_one_attribute_from_all_target_providers_for_all_requesters_in_extended_notation( + self): attribute_filters = { "": { "": { @@ -199,203 +185,3 @@ def test_shibmdscope_match_value_filter_with_no_md_store_in_context(self): ctx = Context() filtered = filter_service.process(ctx, resp) assert filtered.attributes == {"a1": ["abc:xyz"], "a2": []} - - def test_shibmdscope_match_value_filter_with_empty_md_store_in_context(self): - attribute_filters = { - "": { - "": { - "a2": { - "shibmdscope_match_value": None - } - } - } - } - filter_service = self.create_filter_service(attribute_filters) - - resp = InternalData(AuthenticationInformation()) - resp.attributes = { - "a1": ["abc:xyz"], - "a2": ["foo:bar", "1:foo:bar:2"], - } - ctx = Context() - mdstore = MetadataStore(None, None) - ctx.decorate(Context.KEY_METADATA_STORE, mdstore) - filtered = filter_service.process(ctx, resp) - assert filtered.attributes == {"a1": ["abc:xyz"], "a2": []} - - def test_shibmdscope_match_value_filter_with_idp_md_with_no_scope(self): - attribute_filters = { - "": { - "": { - "a2": { - "shibmdscope_match_value": None - } - } - } - } - filter_service = self.create_filter_service(attribute_filters) - - resp = InternalData(AuthenticationInformation()) - resp.attributes = { - "a1": ["abc:xyz"], - "a2": ["foo.bar", "1.foo.bar.2"], - } - - idp_entityid = 'https://idp.example.org/' - resp.auth_info.issuer = idp_entityid - - mdstore = MetadataStore(None, Config()) - mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, None)) - ctx = Context() - ctx.decorate(Context.KEY_METADATA_STORE, mdstore) - - filtered = filter_service.process(ctx, resp) - assert filtered.attributes == {"a1": ["abc:xyz"], "a2": []} - - def test_shibmdscope_match_value_filter_with_idp_md_with_single_scope(self): - attribute_filters = { - "": { - "": { - "a2": { - "shibmdscope_match_value": None - } - } - } - } - filter_service = self.create_filter_service(attribute_filters) - - resp = InternalData(AuthenticationInformation()) - resp.attributes = { - "a1": ["abc:xyz"], - "a2": ["foo.bar", "1.foo.bar.2"], - } - - idp_entityid = 'https://idp.example.org/' - resp.auth_info.issuer = idp_entityid - - mdstore = MetadataStore(None, Config()) - mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar"])) - ctx = Context() - ctx.decorate(Context.KEY_METADATA_STORE, mdstore) - - filtered = filter_service.process(ctx, resp) - assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["foo.bar"]} - - def test_shibmdscope_match_value_filter_with_idp_md_with_single_regexp_scope(self): - attribute_filters = { - "": { - "": { - "a2": { - "shibmdscope_match_value": None - } - } - } - } - filter_service = self.create_filter_service(attribute_filters) - - resp = InternalData(AuthenticationInformation()) - resp.attributes = { - "a1": ["abc:xyz"], - "a2": ["test.foo.bar", "1.foo.bar.2"], - } - - idp_entityid = 'https://idp.example.org/' - resp.auth_info.issuer = idp_entityid - - mdstore = MetadataStore(None, Config()) - mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, [r"[^.]*\.foo\.bar$"])) - # mark scope as regexp (cannot be done via pysaml2 YAML config) - mdstore[idp_entityid]['idpsso_descriptor'][0]['extensions']['extension_elements'][0]['regexp'] = 'true' - ctx = Context() - ctx.decorate(Context.KEY_METADATA_STORE, mdstore) - - filtered = filter_service.process(ctx, resp) - assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["test.foo.bar"]} - - def test_shibmdscope_match_value_filter_with_idp_md_with_multiple_scopes(self): - attribute_filters = { - "": { - "": { - "a2": { - "shibmdscope_match_value": None - } - } - } - } - filter_service = self.create_filter_service(attribute_filters) - - resp = InternalData(AuthenticationInformation()) - resp.attributes = { - "a1": ["abc:xyz"], - "a2": ["foo.bar", "1.foo.bar.2", "foo.baz", "foo.baz.com"], - } - - idp_entityid = 'https://idp.example.org/' - resp.auth_info.issuer = idp_entityid - - mdstore = MetadataStore(None, Config()) - mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar", "foo.baz"])) - ctx = Context() - ctx.decorate(Context.KEY_METADATA_STORE, mdstore) - - filtered = filter_service.process(ctx, resp) - assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["foo.bar", "foo.baz"]} - - def test_shibmdscope_match_scope_filter_with_single_scope(self): - attribute_filters = { - "": { - "": { - "a2": { - "shibmdscope_match_scope": None - } - } - } - } - filter_service = self.create_filter_service(attribute_filters) - - resp = InternalData(AuthenticationInformation()) - resp.attributes = { - "a1": ["abc:xyz"], - "a2": ["foo.bar", "value@foo.bar", "1.foo.bar.2", "value@foo.bar.2", "value@extra@foo.bar"], - } - - idp_entityid = 'https://idp.example.org/' - resp.auth_info.issuer = idp_entityid - - mdstore = MetadataStore(None, Config()) - mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar"])) - ctx = Context() - ctx.decorate(Context.KEY_METADATA_STORE, mdstore) - - filtered = filter_service.process(ctx, resp) - assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["value@foo.bar"]} - - def test_multiple_filters_for_single_attribute(self): - attribute_filters = { - "": { - "": { - "a2": { - "regexp": "^value1@", - "shibmdscope_match_scope": None - } - } - } - } - filter_service = self.create_filter_service(attribute_filters) - - resp = InternalData(AuthenticationInformation()) - resp.attributes = { - "a1": ["abc:xyz"], - "a2": ["foo.bar", "value1@foo.bar", "value2@foo.bar", "1.foo.bar.2", "value@foo.bar.2", "value@extra@foo.bar"], - } - - idp_entityid = 'https://idp.example.org/' - resp.auth_info.issuer = idp_entityid - - mdstore = MetadataStore(None, Config()) - mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar"])) - ctx = Context() - ctx.decorate(Context.KEY_METADATA_STORE, mdstore) - - filtered = filter_service.process(ctx, resp) - assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["value1@foo.bar"]} diff --git a/tests/satosa/micro_services/test_attribute_modifications_saml2.py b/tests/satosa/micro_services/test_attribute_modifications_saml2.py new file mode 100644 index 000000000..2b8eb9c5f --- /dev/null +++ b/tests/satosa/micro_services/test_attribute_modifications_saml2.py @@ -0,0 +1,413 @@ +import pytest + +pytest.importorskip('saml2') +from satosa.context import Context +from satosa.exception import SATOSAError +from satosa.internal import AuthenticationInformation +from satosa.internal import InternalData +from satosa.micro_services.attribute_modifications import FilterAttributeValues +from tests.util_saml2 import create_metadata_from_config_dict + + +class TestFilterAttributeValues: + def create_filter_service(self, attribute_filters): + filter_service = FilterAttributeValues(config=dict(attribute_filters=attribute_filters), + name="test_filter", + base_url="https://satosa.example.com") + filter_service.next = lambda ctx, data: data + return filter_service + + def test_filter_all_attributes_from_all_target_providers_for_all_requesters(self): + attribute_filters = { + "": { # all providers + "": { # all requesters + "": "foo:bar" # all attributes + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(auth_info=AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo:bar", "1:foo:bar:2"], + "a3": ["a:foo:bar:b"] + } + filtered = filter_service.process(None, resp) + assert filtered.attributes == {"a1": [], "a2": ["foo:bar", "1:foo:bar:2"], "a3": ["a:foo:bar:b"]} + + def test_filter_one_attribute_from_all_target_providers_for_all_requesters(self): + attribute_filters = { + "": { + "": { + "a2": "^foo:bar$" + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo:bar", "1:foo:bar:2"], + } + filtered = filter_service.process(None, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["foo:bar"]} + + def test_filter_one_attribute_from_all_target_providers_for_one_requester(self): + requester = "test_requester" + attribute_filters = { + "": { + requester: + {"a1": "foo:bar"} + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(auth_info=AuthenticationInformation()) + resp.requester = requester + resp.attributes = { + "a1": ["abc:xyz", "1:foo:bar:2"], + } + filtered = filter_service.process(None, resp) + assert filtered.attributes == {"a1": ["1:foo:bar:2"]} + + def test_filter_attribute_not_in_response(self): + attribute_filters = { + "": { + "": + {"a0": "foo:bar"} + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(auth_info=AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz", "1:foo:bar:2"], + } + filtered = filter_service.process(None, resp) + assert filtered.attributes == {"a1": ["abc:xyz", "1:foo:bar:2"]} + + def test_filter_one_attribute_for_one_target_provider(self): + target_provider = "test_provider" + attribute_filters = { + target_provider: { + "": + {"a1": "foo:bar"} + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(auth_info=AuthenticationInformation(issuer=target_provider)) + resp.attributes = { + "a1": ["abc:xyz", "1:foo:bar:2"], + } + filtered = filter_service.process(None, resp) + assert filtered.attributes == {"a1": ["1:foo:bar:2"]} + + def test_filter_one_attribute_for_one_target_provider_for_one_requester(self): + target_provider = "test_provider" + requester = "test_requester" + attribute_filters = { + target_provider: { + requester: + {"a1": "foo:bar"} + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(auth_info=AuthenticationInformation(issuer=target_provider)) + resp.requester = requester + resp.attributes = { + "a1": ["abc:xyz", "1:foo:bar:2"], + } + filtered = filter_service.process(None, resp) + assert filtered.attributes == {"a1": ["1:foo:bar:2"]} + + def test_filter_one_attribute_from_all_target_providers_for_all_requesters_in_extended_notation(self): + attribute_filters = { + "": { + "": { + "a2": { + "regexp": "^foo:bar$" + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo:bar", "1:foo:bar:2"], + } + filtered = filter_service.process(None, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["foo:bar"]} + + def test_invalid_filter_type(self): + attribute_filters = { + "": { + "": { + "a2": { + "invalid_filter": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo:bar", "1:foo:bar:2"], + } + with pytest.raises(SATOSAError): + filtered = filter_service.process(None, resp) + + def test_shibmdscope_match_value_filter_with_no_md_store_in_context(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo:bar", "1:foo:bar:2"], + } + ctx = Context() + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": []} + +class TestFilterAttributeValuesWithMetadataStore: + pytest.importorskip('MetadataStore') + + def create_filter_service(self, attribute_filters): + filter_service = FilterAttributeValues(config=dict(attribute_filters=attribute_filters), + name="test_filter", + base_url="https://satosa.example.com") + filter_service.next = lambda ctx, data: data + return filter_service + + def create_idp_metadata_conf_with_shibmd_scopes(self, idp_entityid, shibmd_scopes): + idp_conf = { + "entityid": idp_entityid, + "service": { + "idp":{} + } + } + + if shibmd_scopes is not None: + idp_conf["service"]["idp"]["scope"] = shibmd_scopes + + metadata_conf = { + "inline": [create_metadata_from_config_dict(idp_conf)] + } + return metadata_conf + + def test_shibmdscope_match_value_filter_with_empty_md_store_in_context(self): + + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo:bar", "1:foo:bar:2"], + } + ctx = Context() + mdstore = MetadataStore(None, None) + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": []} + + def test_shibmdscope_match_value_filter_with_idp_md_with_no_scope(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo.bar", "1.foo.bar.2"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, None)) + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": []} + + def test_shibmdscope_match_value_filter_with_idp_md_with_single_scope(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo.bar", "1.foo.bar.2"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar"])) + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["foo.bar"]} + + def test_shibmdscope_match_value_filter_with_idp_md_with_single_regexp_scope(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["test.foo.bar", "1.foo.bar.2"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["[^.]*\.foo\.bar$"])) + # mark scope as regexp (cannot be done via pysaml2 YAML config) + mdstore[idp_entityid]['idpsso_descriptor'][0]['extensions']['extension_elements'][0]['regexp'] = 'true' + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["test.foo.bar"]} + + def test_shibmdscope_match_value_filter_with_idp_md_with_multiple_scopes(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo.bar", "1.foo.bar.2", "foo.baz", "foo.baz.com"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar", "foo.baz"])) + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["foo.bar", "foo.baz"]} + + def test_shibmdscope_match_scope_filter_with_single_scope(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_scope": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo.bar", "value@foo.bar", "1.foo.bar.2", "value@foo.bar.2", "value@extra@foo.bar"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar"])) + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["value@foo.bar"]} + + def test_multiple_filters_for_single_attribute(self): + attribute_filters = { + "": { + "": { + "a2": { + "regexp": "^value1@", + "shibmdscope_match_scope": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo.bar", "value1@foo.bar", "value2@foo.bar", "1.foo.bar.2", "value@foo.bar.2", "value@extra@foo.bar"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar"])) + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["value1@foo.bar"]} diff --git a/tests/satosa/micro_services/test_consent.py b/tests/satosa/micro_services/test_consent.py index a8eaed965..938d4b148 100644 --- a/tests/satosa/micro_services/test_consent.py +++ b/tests/satosa/micro_services/test_consent.py @@ -6,25 +6,31 @@ import pytest import requests import responses -from jwkest.jwk import RSAKey, rsa_load -from jwkest.jws import JWS - -from saml2.saml import NAMEID_FORMAT_PERSISTENT +from cryptojwt import JWS +from satosa.cert_util import rsa_key_from_pem from satosa.context import Context from satosa.internal import AuthenticationInformation from satosa.internal import InternalData from satosa.micro_services import consent -from satosa.micro_services.consent import Consent, UnexpectedResponseError +from satosa.micro_services.consent import Consent +from satosa.micro_services.consent import UnexpectedResponseError from satosa.response import Redirect +# from cryptojwt.jwk import RSAKey, rsa_load +# from jwkest.jws import JWS +# +# from saml2.saml import NAMEID_FORMAT_PERSISTENT + FILTER = ["displayName", "co"] CONSENT_SERVICE_URL = "https://consent.example.com" -ATTRIBUTES = {"displayName": ["Test"], "co": ["example"], "sn": ["should be removed by consent filter"]} +ATTRIBUTES = {"displayName": ["Test"], "co": ["example"], + "sn": ["should be removed by consent filter"]} USER_ID_ATTR = "user_id" class TestConsent: + @pytest.fixture def consent_config(self, signing_key_path): consent_config = { @@ -37,7 +43,8 @@ def consent_config(self, signing_key_path): @pytest.fixture(autouse=True) def create_module(self, consent_config): self.consent_module = Consent(consent_config, - internal_attributes={"attributes": {}, "user_id_to_attr": USER_ID_ATTR}, + internal_attributes={"attributes": {}, + "user_id_to_attr": USER_ID_ATTR}, name="Consent", base_url="https://satosa.example.com") self.consent_module.next = lambda ctx, data: (ctx, data) @@ -52,7 +59,7 @@ def internal_response(self): @pytest.fixture def internal_request(self): req = InternalData( - subject_type=NAMEID_FORMAT_PERSISTENT, + subject_type="PERSISTENT", requester="example_requester", ) req.attributes = FILTER + ["sn"] @@ -72,14 +79,15 @@ def assert_redirect(self, redirect_resp, expected_ticket): path = urlparse(redirect_resp.message).path assert path == "/consent/" + expected_ticket - def assert_registration_req(self, request, internal_response, sign_key_path, base_url, requester_name): + def assert_registration_req(self, request, internal_response, sign_key_path, base_url, + requester_name): split_path = request.path_url.lstrip("/").split("/") assert len(split_path) == 2 jwks = split_path[1] # Verify signature - sign_key = RSAKey(key=rsa_load(sign_key_path), use="sig") - jws = JWS() + sign_key = rsa_key_from_pem(sign_key_path, use="sig", alg="RS256") + jws = JWS(alg=sign_key.alg) jws.verify_compact(jwks, [sign_key]) consent_args = jws.msg @@ -218,16 +226,18 @@ def test_filter_attributes(self): assert Counter(filtered_attributes.keys()) == Counter(FILTER) @responses.activate - def test_manage_consent_without_filter_passes_through_all_attributes(self, context, internal_response, + def test_manage_consent_without_filter_passes_through_all_attributes(self, context, + internal_response, consent_verify_endpoint_regex): # fake previous consent responses.add(responses.GET, consent_verify_endpoint_regex, status=200, body=json.dumps(list(internal_response.attributes.keys()))) - context.state[consent.STATE_KEY] = {"filter": []} # No filter + context.state[consent.STATE_KEY] = {"filter": []} # No filter self.consent_module.process(context, internal_response) consent_hash = urlparse(responses.calls[0].request.url).path.split("/")[2] - expected_hash = self.consent_module._get_consent_id(internal_response.requester, internal_response.subject_id, + expected_hash = self.consent_module._get_consent_id(internal_response.requester, + internal_response.subject_id, internal_response.attributes) assert consent_hash == expected_hash diff --git a/tests/satosa/micro_services/test_ldap_attribute_store.py b/tests/satosa/micro_services/test_ldap_attribute_store.py index e3af1a7f5..ce7ebd609 100644 --- a/tests/satosa/micro_services/test_ldap_attribute_store.py +++ b/tests/satosa/micro_services/test_ldap_attribute_store.py @@ -1,15 +1,19 @@ -import pytest - from copy import deepcopy +import pytest + from satosa.internal import AuthenticationInformation from satosa.internal import InternalData + +pytest.importorskip('ldap3') from satosa.micro_services.ldap_attribute_store import LdapAttributeStore from satosa.context import Context import logging + logging.basicConfig(level=logging.DEBUG) + class TestLdapAttributeStore: ldap_attribute_store_config = { 'default': { @@ -47,7 +51,7 @@ class TestLdapAttributeStore: 'sn': 'Baxter', 'uid': 'jbaxter', 'mail': 'jbaxter@example.com' - } + } ], ['employeeNumber=1001,ou=people,dc=example,dc=com', { 'employeeNumber': '1001', @@ -56,7 +60,7 @@ class TestLdapAttributeStore: 'sn': 'Lawson', 'uid': 'booker.lawson', 'mail': 'blawson@example.com' - } + } ], ] @@ -82,7 +86,7 @@ def ldap_attribute_store(self): def test_attributes_general(self, ldap_attribute_store): ldap_to_internal_map = (self.ldap_attribute_store_config['default'] - ['ldap_to_internal_map']) + ['ldap_to_internal_map']) for dn, attributes in self.ldap_person_records: # Mock up the internal response the LDAP attribute store is @@ -106,4 +110,4 @@ def test_attributes_general(self, ldap_attribute_store): if ldap_attr in ldap_to_internal_map: internal_attr = ldap_to_internal_map[ldap_attr] response_attr = response.attributes[internal_attr] - assert(ldap_value in response_attr) + assert (ldap_value in response_attr) diff --git a/tests/satosa/scripts/test_satosa_saml_metadata.py b/tests/satosa/scripts/test_satosa_saml_metadata.py index f76f5d990..53f2ed8db 100644 --- a/tests/satosa/scripts/test_satosa_saml_metadata.py +++ b/tests/satosa/scripts/test_satosa_saml_metadata.py @@ -3,6 +3,8 @@ import mongomock import pytest + +pytest.importorskip('saml2') from saml2.config import Config from saml2.mdstore import MetaDataFile from saml2.sigver import security_context diff --git a/tests/satosa/test_routing.py b/tests/satosa/test_routing.py index be23456ad..02010e744 100644 --- a/tests/satosa/test_routing.py +++ b/tests/satosa/test_routing.py @@ -1,14 +1,19 @@ import pytest from satosa.context import Context -from satosa.routing import ModuleRouter, SATOSANoBoundEndpointError -from tests.util import TestBackend, TestFrontend, TestRequestMicroservice, TestResponseMicroservice +from satosa.routing import ModuleRouter +from satosa.routing import SATOSANoBoundEndpointError +from tests.util_sans_saml2 import TestBackend +from tests.util_sans_saml2 import TestFrontend +from tests.util_sans_saml2 import TestRequestMicroservice +from tests.util_sans_saml2 import TestResponseMicroservice -FRONTEND_NAMES = ["Saml2IDP", "VOPaaSSaml2IDP"] -BACKEND_NAMES = ["Saml2SP", "VOPaaSSaml2SP"] +FRONTEND_NAMES = ["IDP"] +BACKEND_NAMES = ["Client"] class TestModuleRouter: + @pytest.fixture(autouse=True) def create_router(self): backends = [] @@ -21,8 +26,10 @@ def create_router(self): request_micro_service_name = "RequestService" response_micro_service_name = "ResponseService" - microservices = [TestRequestMicroservice(request_micro_service_name, base_url="https://satosa.example.com"), - TestResponseMicroservice(response_micro_service_name, base_url="https://satosa.example.com")] + microservices = [TestRequestMicroservice(request_micro_service_name, + base_url="https://satosa.example.com"), + TestResponseMicroservice(response_micro_service_name, + base_url="https://satosa.example.com")] self.router = ModuleRouter(frontends, backends, microservices) @@ -30,7 +37,7 @@ def create_router(self): ("%s/%s/request" % (provider, receiver), receiver, provider) for receiver in FRONTEND_NAMES for provider in BACKEND_NAMES - ]) + ]) def test_endpoint_routing_to_frontend(self, url_path, expected_frontend, expected_backend): context = Context() context.path = url_path @@ -40,7 +47,7 @@ def test_endpoint_routing_to_frontend(self, url_path, expected_frontend, expecte @pytest.mark.parametrize('url_path, expected_backend', [ ("%s/response" % (provider,), provider) for provider in BACKEND_NAMES - ]) + ]) def test_endpoint_routing_to_backend(self, url_path, expected_backend): context = Context() context.path = url_path @@ -57,7 +64,8 @@ def test_endpoint_routing_to_microservice(self, url_path, expected_micro_service context.path = url_path microservice_callable = self.router.endpoint_routing(context) assert context.target_micro_service == expected_micro_service - assert microservice_callable == self.router.micro_services[expected_micro_service]["instance"].callback + assert microservice_callable == self.router.micro_services[expected_micro_service][ + "instance"].callback assert context.target_backend is None assert context.target_frontend is None @@ -65,7 +73,7 @@ def test_endpoint_routing_to_microservice(self, url_path, expected_micro_service ("%s/%s/request" % (provider, receiver), receiver, provider) for receiver in FRONTEND_NAMES for provider in BACKEND_NAMES - ]) + ]) def test_module_routing(self, url_path, expected_frontend, expected_backend, context): context.path = url_path diff --git a/tests/satosa/test_state.py b/tests/satosa/test_state.py index eadee2182..c0b9c2ade 100644 --- a/tests/satosa/test_state.py +++ b/tests/satosa/test_state.py @@ -7,6 +7,7 @@ from urllib.parse import quote_plus import pytest +from satosa.util import rndstr from satosa.state import State, state_to_cookie, cookie_to_state, SATOSAStateError @@ -53,7 +54,8 @@ def test_urlstate_length_should_fit_in_browser_cookie(self): :return: """ - enc_key = "Ireallyliketoencryptthisdictionary!" + # length must be 32,48 or 64 + enc_key = b"Ireallyliketoencryptthisdictiona" state = State() my_dict_frontend = get_dict(11, get_str(10), get_str(10)) my_dict_consent = get_dict(1, get_str(10), get_str(100)) @@ -65,13 +67,13 @@ def test_urlstate_length_should_fit_in_browser_cookie(self): state["my_dict_hash"] = my_dict_hash state["my_dict_router"] = my_dict_router state["my_dict_backend"] = my_dict_backend - urlstate = state.urlstate(enc_key) + urlstate = state.pack(enc_key) # Some browsers only support 2000bytes, and since state is not the only parameter it should # not be greater then half that size. urlstate_len = len(quote_plus(urlstate)) print("Size of state on the url is:%s" % urlstate_len) assert urlstate_len < 1000, "Urlstate is way to long!" - state = State(urlstate, enc_key) + state = State(urlstate_data=urlstate, encryption_key=enc_key) assert state["my_dict_frontend"] == my_dict_frontend assert state["my_dict_consent"] == my_dict_consent assert state["my_dict_hash"] == my_dict_hash @@ -98,7 +100,7 @@ def test_encode_decode_of_state(self): cookie_name = "state_cookie" path = "/" - encrypt_key = "2781y4hef90" + encrypt_key = rndstr(32).encode() # MUST be 32, 48 or 64 bytes long cookie = state_to_cookie(state, name=cookie_name, path=path, encryption_key=encrypt_key) cookie_str = cookie[cookie_name].OutputString() diff --git a/tests/util.py b/tests/util.py index c26c796fe..276c50702 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,262 +1,20 @@ """ Contains help methods and classes to perform tests. """ -import base64 import tempfile from datetime import datetime -from urllib.parse import parse_qsl, urlparse - -from Cryptodome.PublicKey import RSA -from bs4 import BeautifulSoup -from saml2 import server, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT -from saml2.authn_context import AuthnBroker, authn_context_class_ref, PASSWORD -from saml2.cert import OpenSSLWrapper -from saml2.client import Saml2Client -from saml2.config import Config -from saml2.metadata import entity_descriptor -from saml2.saml import name_id_from_string, NAMEID_FORMAT_TRANSIENT, NAMEID_FORMAT_PERSISTENT -from saml2.samlp import NameIDPolicy + +from satosa.cert_util import generate_cert from satosa.backends.base import BackendModule from satosa.frontends.base import FrontendModule from satosa.internal import AuthenticationInformation from satosa.internal import InternalData -from satosa.micro_services.base import RequestMicroService, ResponseMicroService +from satosa.micro_services.base import RequestMicroService +from satosa.micro_services.base import ResponseMicroService from satosa.response import Response -class FakeSP(Saml2Client): - """ - A SAML service provider that can be used to perform tests. - """ - - def __init__(self, config): - """ - :type config: {dict} - :param config: SP SAML configuration. - """ - Saml2Client.__init__(self, config) - - def make_auth_req(self, entity_id, nameid_format=None, relay_state="relay_state", - request_binding=BINDING_HTTP_REDIRECT, response_binding=BINDING_HTTP_REDIRECT, - subject=None): - """ - :type entity_id: str - :rtype: str - - :param entity_id: SAML entity id - :return: Authentication URL. - """ - # Picks a binding to use for sending the Request to the IDP - _binding, destination = self.pick_binding( - 'single_sign_on_service', - [request_binding], 'idpsso', - entity_id=entity_id) - - kwargs = {} - if subject: - kwargs['subject'] = subject - - req_id, req = self.create_authn_request( - destination, - binding=response_binding, - nameid_format=nameid_format, - **kwargs - ) - - ht_args = self.apply_binding(_binding, '%s' % req, destination, - relay_state=relay_state) - - if _binding == BINDING_HTTP_POST: - form_post_html = "\n".join(ht_args["data"]) - doctree = BeautifulSoup(form_post_html, "html.parser") - saml_request = doctree.find("input", {"name": "SAMLRequest"})["value"] - resp = {"SAMLRequest": saml_request, "RelayState": relay_state} - elif _binding == BINDING_HTTP_REDIRECT: - resp = dict(parse_qsl(urlparse(dict(ht_args["headers"])["Location"]).query)) - - return destination, resp - - -class FakeIdP(server.Server): - """ - A SAML IdP that can be used to perform tests. - """ - - def __init__(self, user_db, config): - """ - :type user_db: {dict} - :type config: {dict} - - :param user_db: A dictionary with the user id as key and parameter dictionary as value. - :param config: IdP SAML configuration. - """ - server.Server.__init__(self, config=config) - self.user_db = user_db - - def __create_authn_response(self, saml_request, relay_state, binding, - userid, response_binding=BINDING_HTTP_POST): - """ - Handles a SAML request, validates and creates a SAML response but - does not apply the binding to encode it. - :type saml_request: str - :type relay_state: str - :type binding: str - :type userid: str - :rtype: tuple [string, saml2.samlp.Response] - - :param saml_request: - :param relay_state: RelayState is a parameter used by some SAML - protocol implementations to identify the specific resource at the - resource provider in an IDP initiated single sign on scenario. - :param binding: - :param userid: The user identification. - :return: A tuple containing the destination and instance of - saml2.samlp.Response - """ - auth_req = self.parse_authn_request(saml_request, binding) - binding_out, destination = self.pick_binding( - 'assertion_consumer_service', - bindings=[response_binding], - entity_id=auth_req.message.issuer.text, request=auth_req.message) - - resp_args = self.response_args(auth_req.message) - authn_broker = AuthnBroker() - authn_broker.add(authn_context_class_ref(PASSWORD), lambda: None, 10, - 'unittest_idp.xml') - authn_broker.get_authn_by_accr(PASSWORD) - resp_args['authn'] = authn_broker.get_authn_by_accr(PASSWORD) - - resp = self.create_authn_response(self.user_db[userid], - userid=userid, - **resp_args) - - return destination, resp - - def __apply_binding_to_authn_response(self, - resp, - response_binding, - relay_state, - destination): - """ - Applies the binding to the response. - """ - if response_binding == BINDING_HTTP_POST: - saml_response = base64.b64encode(str(resp).encode("utf-8")) - resp = {"SAMLResponse": saml_response, "RelayState": relay_state} - elif response_binding == BINDING_HTTP_REDIRECT: - http_args = self.apply_binding( - response_binding, - '%s' % resp, - destination, - relay_state, - response=True) - resp = dict(parse_qsl(urlparse( - dict(http_args["headers"])["Location"]).query)) - - return resp - - def handle_auth_req(self, saml_request, relay_state, binding, userid, - response_binding=BINDING_HTTP_POST): - """ - Handles a SAML request, validates and creates a SAML response. - :type saml_request: str - :type relay_state: str - :type binding: str - :type userid: str - :rtype: tuple - - :param saml_request: - :param relay_state: RelayState is a parameter used by some SAML - protocol implementations to identify the specific resource at the - resource provider in an IDP initiated single sign on scenario. - :param binding: - :param userid: The user identification. - :return: A tuple with the destination and encoded response as a string - """ - - destination, _resp = self.__create_authn_response( - saml_request, - relay_state, - binding, - userid, - response_binding) - - resp = self.__apply_binding_to_authn_response( - _resp, - response_binding, - relay_state, - destination) - - return destination, resp - - def handle_auth_req_no_name_id(self, saml_request, relay_state, binding, - userid, response_binding=BINDING_HTTP_POST): - """ - Handles a SAML request, validates and creates a SAML response but - without a element. - :type saml_request: str - :type relay_state: str - :type binding: str - :type userid: str - :rtype: tuple - - :param saml_request: - :param relay_state: RelayState is a parameter used by some SAML - protocol implementations to identify the specific resource at the - resource provider in an IDP initiated single sign on scenario. - :param binding: - :param userid: The user identification. - :return: A tuple with the destination and encoded response as a string - """ - - destination, _resp = self.__create_authn_response( - saml_request, - relay_state, - binding, - userid, - response_binding) - - # Remove the element from the response. - _resp.assertion.subject.name_id = None - - resp = self.__apply_binding_to_authn_response( - _resp, - response_binding, - relay_state, - destination) - - return destination, resp - - -def create_metadata_from_config_dict(config): - nspair = {"xs": "http://www.w3.org/2001/XMLSchema"} - conf = Config().load(config) - return entity_descriptor(conf).to_string(nspair).decode("utf-8") - - -def generate_cert(): - cert_info = { - "cn": "localhost", - "country_code": "se", - "state": "ac", - "city": "Umea", - "organization": "ITS", - "organization_unit": "DIRG" - } - osw = OpenSSLWrapper() - cert_str, key_str = osw.create_certificate(cert_info, request=False) - return cert_str, key_str - - -def write_cert(cert_path, key_path): - cert, key = generate_cert() - with open(cert_path, "wb") as cert_file: - cert_file.write(cert) - with open(key_path, "wb") as key_file: - key_file.write(key) - - class FileGenerator(object): """ Creates different types of temporary files that is useful for testing. @@ -307,81 +65,10 @@ def generate_cert(self, code=None): self.generate_certs[code] = cert_file, key_file return cert_file, key_file - def create_metadata(self, config, code=None): - """ - Will generate a metadata file. If code is used the same metadata file will - always be returned for the same code. - :type config: {dict} - :type code: str - - :param config: A SAML configuration. - :param code: A unique code to represent a certificate and key. - """ - if code in self.metadata: - return self.metadata[code] - - desc = create_metadata_from_config_dict(config) - - tmp_file = tempfile.NamedTemporaryFile() - tmp_file.write(desc.encode("utf-8")) - tmp_file.flush() - if code: - self.metadata[code] = tmp_file - return tmp_file - - -def private_to_public_key(pk_file): - f = open(pk_file, 'r') - pk = RSA.importKey(f.read()) - return pk.publickey().exportKey('PEM') - - -def create_name_id(): - """ - :rtype: str - - :return: Returns a SAML nameid as XML string. - """ - test_name_id = """ - - tmatsuo@example.com - -""" - return name_id_from_string(test_name_id) - - -def create_name_id_policy_transient(): - """ - Creates a transient name id policy. - :return: - """ - nameid_format = NAMEID_FORMAT_TRANSIENT - name_id_policy = NameIDPolicy(format=nameid_format) - return name_id_policy - - -def create_name_id_policy_persistent(): - """ - Creates a transient name id policy. - :return: - """ - nameid_format = NAMEID_FORMAT_PERSISTENT - name_id_policy = NameIDPolicy(format=nameid_format) - return name_id_policy - class FakeBackend(BackendModule): - def __init__(self, start_auth_func=None, internal_attributes=None, - base_url="", name="FakeBackend", - register_endpoints_func=None): - super().__init__(None, internal_attributes, base_url, name) - - self.start_auth_func = start_auth_func - self.register_endpoints_func = register_endpoints_func - def start_auth(self, context, request_info, state): + def start_auth(self, context, internal_request): """ TODO comment :type context: TODO comment @@ -393,7 +80,7 @@ def start_auth(self, context, request_info, state): :param state: TODO comment """ if self.start_auth: - return self.start_auth(context, request_info, state) + return self.start_auth(context, internal_request) return None def register_endpoints(self): @@ -462,7 +149,7 @@ def __init__(self, auth_callback_func, internal_attributes, config, base_url, na super().__init__(auth_callback_func, internal_attributes, base_url, name) def register_endpoints(self): - return [("^{}/response$".format(self.name), self.handle_response)] + return [(f"^{self.name}/response$", self.handle_response)] def start_auth(self, context, internal_request): return Response("Auth request received, passed to test backend") @@ -482,12 +169,13 @@ def __init__(self, auth_req_callback_func, internal_attributes, config, base_url super().__init__(auth_req_callback_func, internal_attributes, base_url, name) def register_endpoints(self, backend_names): - url_map = [("^{}/{}/request$".format(p, self.name), self.handle_request) for p in backend_names] + url_map = [(f"^{p}/{self.name}/request$", self.handle_request) for p in backend_names] return url_map def handle_request(self, context): internal_req = InternalData( - subject_type=NAMEID_FORMAT_TRANSIENT, requester="test_client" + subject_type="urn:oasis:names:tc:SAML:2.0:nameid-format:transient", + requester="test_client" ) return self.auth_req_callback_func(context, internal_req)