Skip to content

Commit

Permalink
rename logger and use DI in fastapi example
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilrybacki committed Jul 12, 2024
1 parent 8218376 commit b663756
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
24 changes: 21 additions & 3 deletions examples/fastapi_models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import logging
import typing

import fastapi
import uvicorn
Expand Down Expand Up @@ -46,12 +48,13 @@ def index():
"""


@app.get("/mockument/{mockument_id}")
def mockument(mockument_id: int):
def build_mockument_data(mockument_id: int):
if not (mockument_data := MOCKUMENTS.get(mockument_id)):
raise fastapi.HTTPException(status_code=404, detail="Mockument not found")
if validated_data := response_model.build(mockument_data):
manager.logger.info(f"Built mockument data: {validated_data}")
return validated_data
manager.logger.info(f"Failed to build mockument data: {mockument_data}")
raise fastapi.HTTPException(
status_code=400,
detail=f"Invalid data: {'; '.join([
Expand All @@ -61,5 +64,20 @@ def mockument(mockument_id: int):
)


@app.get("/mockument/{mockument_id}")
def get_mockument(
mockument: typing.Annotated[
type[phaistos.schema.TranspiledSchema],
fastapi.Depends(build_mockument_data)
]
):
return mockument


if __name__ == "__main__":
uvicorn.run(app, host="localhost", port=42069)
logging.basicConfig(level=logging.INFO)
uvicorn.run(
app,
host="localhost",
port=42069,
)
18 changes: 9 additions & 9 deletions phaistos/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@

class Manager:
_schemas: dict[str, SchemaInstancesFactory] = {}
_logger: typing.ClassVar[logging.Logger] = MANAGER_LOGGER
logger: typing.ClassVar[logging.Logger] = MANAGER_LOGGER

__instance: typing.ClassVar[Manager | None] = None
__started: typing.ClassVar[bool] = False
__last_used_schemas_dir: typing.ClassVar[str] = ''

def validate(self, data: dict, schema: str) -> ValidationResults:
self._logger.info(f'Validating data against schema: {schema}')
self.logger.info(f'Validating data against schema: {schema}')
return self.get_factory(schema).validate(data)

@classmethod
def start(cls) -> Manager:
if cls.__started and cls.__last_used_schemas_dir != os.environ.get('PHAISTOS__SCHEMA_PATH', ''):
cls._logger.info('Schema path has changed. Reloading schemas.')
cls.logger.info('Schema path has changed. Reloading schemas.')
cls.__instance = None
cls.__started = False
if not cls.__instance:
cls._logger.info('Starting Phaistos manager!')
cls.logger.info('Starting Phaistos manager!')
cls.__started = True
cls.__last_used_schemas_dir = os.environ.get('PHAISTOS__SCHEMA_PATH', '')
cls.__instance = cls()
Expand Down Expand Up @@ -74,26 +74,26 @@ def get_available_schemas(self, path: str = '') -> dict[str, SchemaInstancesFact
_model=schema
)
except tuple(DISCOVERY_EXCEPTIONS.keys()) as schema_discovery_error:
self._logger.error(
self.logger.error(
DISCOVERY_EXCEPTIONS.get(type(schema_discovery_error), f'Error while discovering schemas: {schema_discovery_error}')
)
raise schema_discovery_error

self._logger.info(
self.logger.info(
f'Available schemas: {", ".join(discovered_schemas.keys())}'
)
return discovered_schemas

def __discover_schemas(self, target_path: str) -> list[type[TranspiledSchema]]:
self._logger.info(f'Discovering schemas in: {target_path}')
self.logger.info(f'Discovering schemas in: {target_path}')
schemas: list[type[TranspiledSchema]] = []
for schema in os.listdir(target_path):
if schema.startswith('_'):
continue

schema_path = f'{target_path}/{schema}'
if not os.path.isdir(schema_path):
self._logger.info(f'Importing schema: {schema_path}')
self.logger.info(f'Importing schema: {schema_path}')

with open(schema_path, 'r', encoding='utf-8') as schema_file:
schema_data = yaml.safe_load(schema_file)
Expand All @@ -105,7 +105,7 @@ def __discover_schemas(self, target_path: str) -> list[type[TranspiledSchema]]:
return schemas

def load_schema(self, schema: SchemaInputFile) -> str:
self._logger.info(f'Loading schema: {schema["name"]}')
self.logger.info(f'Loading schema: {schema["name"]}')
schema_class = Transpiler.make_schema(schema)
self._schemas[schema['name']] = SchemaInstancesFactory(
name=schema_class.transpilation_name,
Expand Down

0 comments on commit b663756

Please sign in to comment.