Skip to content

Commit

Permalink
typing
Browse files Browse the repository at this point in the history
  • Loading branch information
evgeni committed Nov 1, 2024
1 parent 42ab0a5 commit 9b6f90d
Showing 1 changed file with 29 additions and 43 deletions.
72 changes: 29 additions & 43 deletions plugins/module_utils/_apypie.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,8 @@ class InvalidArgumentTypesError(Exception):

import time

from typing import cast, Optional, Set, Tuple

PER_PAGE = 2 << 31


Expand Down Expand Up @@ -678,20 +680,20 @@ def __init__(self, **kwargs):
kwargs['api_version'] = 2
super().__init__(**kwargs)

def _resource(self, resource):
def _resource(self, resource: str) -> 'Resource':
if resource not in self.resources:
raise ForemanApiException(msg="The server doesn't know about {0}, is the right plugin installed?".format(resource))
raise ForemanApiException(msg=f"The server doesn't know about {resource}, is the right plugin installed?")
return self.resource(resource)

def _resource_call(self, resource, *args, **kwargs):
def _resource_call(self, resource: str, *args, **kwargs) -> Optional[dict]:
return self._resource(resource).call(*args, **kwargs)

def _resource_prepare_params(self, resource, action, params):
def _resource_prepare_params(self, resource: str, action: str, params: dict) -> dict:
api_action = self._resource(resource).action(action)
return api_action.prepare_params(params)

def resource_action(self, resource, action, params, options=None, data=None, files=None, # pylint: disable=too-many-arguments
ignore_task_errors=False):
def resource_action(self, resource: str, action: str, params: dict, options=None, data=None, files=None, # pylint: disable=too-many-arguments
ignore_task_errors: bool = False) -> Optional[dict]:
"""
Perform a generic action on a resource
Expand All @@ -703,15 +705,14 @@ def resource_action(self, resource, action, params, options=None, data=None, fil
try:
result = self._resource_call(resource, action, resource_payload, options=options, data=data, files=files)
is_foreman_task = isinstance(result, dict) and 'action' in result and 'state' in result and 'started_at' in result
if is_foreman_task:
if result and is_foreman_task:
result = self.wait_for_task(result, ignore_errors=ignore_task_errors)
except Exception as exc:
msg = 'Error while performing {0} on {1}: {2}'.format(
action, resource, str(exc))
msg = f'Error while performing {action} on {resource}: {exc}'
raise ForemanApiException.from_exception(exc, msg) from exc
return result

def wait_for_task(self, task, ignore_errors=False):
def wait_for_task(self, task: dict, ignore_errors: bool = False) -> dict:
"""
Wait for a foreman-tasks task, polling it every ``self.task_poll`` seconds.
Expand All @@ -721,102 +722,91 @@ def wait_for_task(self, task, ignore_errors=False):
while task['state'] not in ['paused', 'stopped']:
duration -= self.task_poll
if duration <= 0:
raise ForemanApiException(msg="Timeout waiting for Task {0}".format(task['id']))
raise ForemanApiException(msg=f"Timeout waiting for Task {task['id']}")
time.sleep(self.task_poll)

resource_payload = self._resource_prepare_params('foreman_tasks', 'show', {'id': task['id']})
task = self._resource_call('foreman_tasks', 'show', resource_payload)
task = cast(dict, self._resource_call('foreman_tasks', 'show', resource_payload))
if not ignore_errors and task['result'] != 'success':
msg = 'Task {0}({1}) did not succeed. Task information: {2}'.format(task['action'], task['id'], task['humanized']['errors'])
msg = f"Task {task['action']}({task['id']}) did not succeed. Task information: {task['humanized']['errors']}"
raise ForemanApiException(msg=msg)
return task

def show(self, resource, resource_id, params=None):
def show(self, resource: str, resource_id: int, params: Optional[dict] = None) -> Optional[dict]:
"""
Execute the ``show`` action on an entity.
:param resource: Plural name of the api resource to show
:type resource: str
:param resource_id: The ID of the entity to show
:type resource_id: int
:param params: Lookup parameters (i.e. parent_id for nested entities)
:type params: Union[dict,None], optional
:return: The entity
"""
payload = {'id': resource_id}
if params:
payload.update(params)
return self.resource_action(resource, 'show', payload)

def list(self, resource, search=None, params=None):
def list(self, resource: str, search: Optional[str] = None, params: Optional[dict] = None) -> list:
"""
Execute the ``index`` action on an resource.
:param resource: Plural name of the api resource to show
:type resource: str
:param search: Search string as accepted by the API to limit the results
:type search: str, optional
:param params: Lookup parameters (i.e. parent_id for nested entities)
:type params: Union[dict,None], optional
:return: List of results
"""
payload = {'per_page': PER_PAGE}
payload: dict = {'per_page': PER_PAGE}
if search is not None:
payload['search'] = search
if params:
payload.update(params)

return self.resource_action(resource, 'index', payload)['results']
result = self.resource_action(resource, 'index', payload)
if result:
return result['results']
return []

def create(self, resource, desired_entity, params=None):
def create(self, resource: str, desired_entity: dict, params: Optional[dict] = None) -> Optional[dict]:
"""
Create entity with given properties
:param resource: Plural name of the api resource to manipulate
:type resource: str
:param desired_entity: Desired properties of the entity
:type desired_entity: dict
:param params: Lookup parameters (i.e. parent_id for nested entities)
:type params: dict, optional
:return: The new current state of the entity
:rtype: dict
"""
payload = desired_entity.copy()
if params:
payload.update(params)
return self.resource_action(resource, 'create', payload)

def update(self, resource, desired_entity, params=None):
def update(self, resource: str, desired_entity: dict, params: Optional[dict] = None) -> Optional[dict]:
"""
Update entity with given properties
:param resource: Plural name of the api resource to manipulate
:type resource: str
:param desired_entity: Desired properties of the entity
:type desired_entity: dict
:param params: Lookup parameters (i.e. parent_id for nested entities)
:type params: dict, optional
:return: The new current state of the entity
:rtype: dict
"""
payload = desired_entity.copy()
if params:
payload.update(params)
return self.resource_action(resource, 'update', payload)

def delete(self, resource, current_entity, params=None): # pylint: disable=useless-return
def delete(self, resource: str, current_entity: dict, params: Optional[dict] = None) -> None: # pylint: disable=useless-return
"""
Delete a given entity
:param resource: Plural name of the api resource to manipulate
:type resource: str
:param current_entity: Current properties of the entity
:type current_entity: dict
:param params: Lookup parameters (i.e. parent_id for nested entities)
:type params: dict, optional
:return: The new current state of the entity
:rtype: Union[dict,None]
"""
payload = {'id': current_entity['id']}
if params:
Expand All @@ -829,19 +819,15 @@ def delete(self, resource, current_entity, params=None): # pylint: disable=usel

return None

def validate_payload(self, resource, action, payload):
def validate_payload(self, resource: str, action: str, payload: dict) -> Tuple[dict, Set[str]]:
"""
Check whether the payload only contains supported keys.
:param resource: Plural name of the api resource to check
:type resource: str
:param action: Name of the action to check payload against
:type action: str
:param payload: API paylod to be checked
:type payload: dict
:return: The payload as it can be submitted to the API and set of unssuported parameters
:rtype: tuple(dict, set)
"""
filtered_payload = self._resource_prepare_params(resource, action, payload)
unsupported_parameters = set(payload.keys()) - set(_recursive_dict_keys(filtered_payload))
Expand Down

0 comments on commit 9b6f90d

Please sign in to comment.