Skip to content

Commit

Permalink
Merge develop branch. Release v0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
serhiibuniak-okta authored Jul 30, 2021
2 parents ca0b68f + 44a4fcc commit bff9fd3
Show file tree
Hide file tree
Showing 11 changed files with 342 additions and 56 deletions.
18 changes: 18 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# IDE generated files
.vscode/

# Python generated files
__pycache__/
*.pyc
.pytest_cache

# Packaging generated files
*.egg-info
build/
dist/

# Tox
.tox

# Code coverage
.coverage
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
# OKTA JWT Verifier Changelog

## v0.2.0
- Add classes IDTokenVerifier and AccessTokenVerifier
- Mark JWTVerifier class as deprecated. This class will be removed in the next major version.
- Add proxy support
- Update README
- Few codebase improvements

_New features:_
- Separate classes for verifying ID Tokens and Access Tokens
- Add proxy support

## v0.1.0
- Initial release
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ publish\:test:

publish\:prod:
python3 -m twine upload dist/*

test:
@echo "Run unittests"
pytest tests/unit/
50 changes: 48 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ from okta_jwt_verifier import JWTVerifier


async def main():
jwt_verifier = JWTVerifier('{ISSUER}', '{CLIENT_ID}', 'api://default')
jwt_verifier = JWTVerifier(issuer='{ISSUER}', audience='api://default')
await jwt_verifier.verify_access_token('{JWT}')
print('Token validated successfully.')

Expand All @@ -77,7 +77,7 @@ from okta_jwt_verifier import JWTVerifier


async def main():
jwt_verifier = JWTVerifier('{ISSUER}', '{CLIENT_ID}', 'api://default')
jwt_verifier = JWTVerifier(issuer='{ISSUER}', client_id='{CLIENT_ID}', audience='api://default')
await jwt_verifier.verify_id_token('{JWT}', nonce='{NONCE}')
print('Token validated successfully.')

Expand All @@ -87,6 +87,40 @@ loop.run_until_complete(main())
```
> Note: parameter `nonce` is optional and required only if token was generated with nonce.
Another option - use class dedicated to ID tokens verification:
```py
import asyncio

from okta_jwt_verifier import IDTokenVerifier


async def main():
jwt_verifier = IDTokenVerifier(issuer='{ISSUER}', client_id='{CLIENT_ID}', audience='api://default')
await jwt_verifier.verify('{JWT}', nonce='{NONCE}')
print('Token validated successfully.')


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```

Verify Access Token
```py
import asyncio

from okta_jwt_verifier import AccessTokenVerifier


async def main():
jwt_verifier = AccessTokenVerifier(issuer='{ISSUER}', audience='api://default')
await jwt_verifier.verify('{JWT}')
print('Token validated successfully.')


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```

It is possible to verify signature if JWK is provided (no async requests):
```py
from okta_jwt_verifier import JWTVerifier
Expand Down Expand Up @@ -151,6 +185,18 @@ def main():
main()
```

v 0.2.0 allows to work via proxy:
```py
# JWTVerifier will be deprecated soon
jwt_verifier = JWTVerifier(issuer='{ISSUER}', proxy='{PROXY}')

# The same for AccessTokenVerifier
jwt_verifier = AccessTokenVerifier(issuer='{ISSUER}', proxy='{PROXY}')

# or IDTokenVerifier
jwt_verifier = IDTokenVerifier(issuer='{ISSUER}', proxy='{PROXY}')
```

## Exceptions

If token is invalid (malformed, expired, etc.), verifier will raise an exception `JWTValidationException`:
Expand Down
5 changes: 3 additions & 2 deletions okta_jwt_verifier/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Allow to verify JWT locally
"""
__version__ = '0.1.0'
__version__ = '0.2.0'

from .jwt_verifier import JWTVerifier # noqa
from .jwt_verifier import JWTVerifier, AccessTokenVerifier, IDTokenVerifier # noqa
from .jwt_utils import JWTUtils # noqa
33 changes: 23 additions & 10 deletions okta_jwt_verifier/config_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ def __init__(self, config):

def validate_config(self):
"""Main method, validates whole config."""
self.validate_issuer(self.config.get('issuer'))
self.validate_client_id(self.config.get('client_id'))
self.validate_audience(self.config.get('audience'))
numbers = ('max_retries', 'max_requests', 'request_timeout', 'leeway')
for number_variable in numbers:
self.validate_number(self.config.get(number_variable), number_variable)
self.validate_issuer()
self.validate_client_id()
self.validate_audience()
self.validate_numbers()

def validate_issuer(self, issuer, https_check=True):
def validate_issuer(self, issuer=None, https_check=True):
"""Validates issuer."""
issuer = issuer or self.config.get('issuer')
if not issuer:
raise JWTInvalidConfigException(ERROR_MESSAGE_ORG_URL_MISSING)
if not isinstance(issuer, str):
Expand All @@ -44,20 +44,33 @@ def validate_issuer(self, issuer, https_check=True):
if issuer.count('://') > 1:
raise JWTInvalidConfigException(ERROR_MESSAGE_ORG_URL_TYPO)

def validate_client_id(self, client_id):
def validate_client_id(self, client_id=None):
"""Validates client_id."""
client_id = client_id or self.config.get('client_id')
if not client_id:
raise JWTInvalidConfigException(ERROR_MESSAGE_CLIENT_ID_MISSING)
if not isinstance(client_id, str):
raise JWTInvalidConfigException(ERROR_MESSAGE_CLIENT_ID_WRONG_TYPE)
if '{clientId}' in client_id:
raise JWTInvalidConfigException(ERROR_MESSAGE_CLIENT_ID_DEFAULT)

def validate_audience(self, audience):
def validate_audience(self, audience=None):
"""Validates audience."""
audience = audience or self.config.get('audience')
if not audience:
raise JWTInvalidConfigException(ERROR_MESSAGE_AUDIENCE_MISSING)

def validate_number(self, number, variable_name):
def _validate_number(self, number, variable_name):
"""Validates param which should be represented as integer and >= 0"""
if not isinstance(number, int):
raise JWTInvalidConfigException(f'{variable_name} should be type of int.')
if number < 0:
raise JWTInvalidConfigException(f'Value of {variable_name} should be 0 or greater.')

def validate_numbers(self, numbers=('max_retries',
'max_requests',
'request_timeout',
'leeway')):
"""Validates all number parameters."""
for number_variable in numbers:
self._validate_number(self.config.get(number_variable), number_variable)
64 changes: 64 additions & 0 deletions okta_jwt_verifier/jwt_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import json

from jose import jwt, jws

from .constants import LEEWAY
from .exceptions import JWTValidationException


class JWTUtils:
"""Contains different utils and common methods for jwt verification."""

@staticmethod
def parse_token(token):
"""Parse JWT token, get headers, claims and signature.
Return:
tuple (headers, claims, signing_input, signature)
"""
headers, payload, signing_input, signature = jws._load(token)
claims = json.loads(payload.decode('utf-8'))
return (headers, claims, signing_input, signature)

@staticmethod
def verify_claims(claims,
claims_to_verify,
audience,
issuer,
leeway=LEEWAY):
"""Verify claims are present and valid."""
# Check if required claims are present, because library "jose" doesn't raise an exception
for claim in claims_to_verify:
if claim not in claims:
raise JWTValidationException(f'Required claim "{claim}" is not present.')

# Overwrite defaults in python-jose library
options = {'verify_aud': 'aud' in claims_to_verify,
'verify_iat': 'iat' in claims_to_verify,
'verify_exp': 'exp' in claims_to_verify,
'verify_nbf': 'nbf' in claims_to_verify,
'verify_iss': 'iss' in claims_to_verify,
'verify_sub': 'sub' in claims_to_verify,
'verify_jti': 'jti' in claims_to_verify,
'leeway': leeway}
# Validate claims
jwt._validate_claims(claims,
audience=audience,
issuer=issuer,
options=options)

@staticmethod
def verify_signature(token, okta_jwk):
"""Verify token signature using received jwk."""
headers, claims, signing_input, signature = JWTUtils.parse_token(token)
jws._verify_signature(signing_input=signing_input,
header=headers,
signature=signature,
key=okta_jwk,
algorithms=['RS256'])

@staticmethod
def verify_expiration(token, leeway=LEEWAY):
"""Verify if token is not expired."""
headers, claims, signing_input, signature = JWTUtils.parse_token(token)
JWTUtils.verify_claims(claims, claims_to_verify=('exp'), leeway=LEEWAY)
Loading

0 comments on commit bff9fd3

Please sign in to comment.