From 5945d7fca095531b3601e551c527457f9413643c Mon Sep 17 00:00:00 2001 From: Marco Donadoni Date: Tue, 25 Jun 2024 18:19:31 +0200 Subject: [PATCH] fix(set_workflow_status): validate endpoint arguments (#589) Closes reanahub/reana-client#718 --- docs/openapi.json | 19 ++++- reana_workflow_controller/rest/utils.py | 23 +++--- .../rest/workflows_status.py | 80 ++++++++++++------- tests/test_views.py | 49 +++++++++++- 4 files changed, 126 insertions(+), 45 deletions(-) diff --git a/docs/openapi.json b/docs/openapi.json index 506ab87b..e082d75d 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -1116,19 +1116,30 @@ "type": "string" }, { - "description": "Optional. Additional input parameters and operational options for workflow execution. Possible parameters are `CACHE=on/off`, passed to disable caching of results in serial workflows, `all_runs=True/False` deletes all runs of a given workflow if status is set to deleted and `workspace=True/False` which deletes the workspace of a workflow.", + "description": "Optional. Additional parameters to customise the workflow status change.", "in": "body", "name": "parameters", "required": false, "schema": { "properties": { - "CACHE": { - "type": "string" - }, "all_runs": { + "description": "Optional. If true, delete all runs of the workflow. Only allowed when status is `deleted`.", + "type": "boolean" + }, + "input_parameters": { + "description": "Optional. Additional input parameters that override the ones from the workflow specification. Only allowed when status is `start`.", + "type": "object" + }, + "operational_options": { + "description": "Optional. Additional operational options for workflow execution. Only allowed when status is `start`.", + "type": "object" + }, + "restart": { + "description": "Optional. If true, the workflow is a restart of an earlier workflow execution. Only allowed when status is `start`.", "type": "boolean" }, "workspace": { + "description": "Optional, but must be set to true if provided. If true, delete also the workspace of the workflow. Only allowed when status is `deleted`.", "type": "boolean" } }, diff --git a/reana_workflow_controller/rest/utils.py b/reana_workflow_controller/rest/utils.py index 235e514b..ee67c167 100644 --- a/reana_workflow_controller/rest/utils.py +++ b/reana_workflow_controller/rest/utils.py @@ -79,9 +79,8 @@ def start_workflow(workflow, parameters): def _start_workflow_db(workflow, parameters): workflow.status = RunStatus.pending - if parameters: - workflow.input_parameters = parameters.get("input_parameters") - workflow.operational_options = parameters.get("operational_options") + workflow.input_parameters = parameters.get("input_parameters", {}) + workflow.operational_options = parameters.get("operational_options", {}) current_db_sessions.add(workflow) current_db_sessions.commit() @@ -95,15 +94,15 @@ def _start_workflow_db(workflow, parameters): verb=get_workflow_status_change_verb(workflow.status.name), status=str(workflow.status.name), ) - if "restart" in parameters.keys(): - if parameters["restart"]: - if workflow.status not in [ - RunStatus.failed, - RunStatus.finished, - RunStatus.queued, - RunStatus.pending, - ]: - raise REANAWorkflowControllerError(failure_message) + + if parameters.get("restart"): + if workflow.status not in [ + RunStatus.failed, + RunStatus.finished, + RunStatus.queued, + RunStatus.pending, + ]: + raise REANAWorkflowControllerError(failure_message) elif workflow.status not in [RunStatus.created, RunStatus.queued]: if workflow.status == RunStatus.deleted: raise REANAWorkflowStatusError(failure_message) diff --git a/reana_workflow_controller/rest/workflows_status.py b/reana_workflow_controller/rest/workflows_status.py index 72c0b3b1..0d3c2548 100644 --- a/reana_workflow_controller/rest/workflows_status.py +++ b/reana_workflow_controller/rest/workflows_status.py @@ -11,6 +11,9 @@ import json from flask import Blueprint, jsonify, request +from webargs import fields +from webargs.flaskparser import use_kwargs + from reana_commons.config import WORKFLOW_TIME_FORMAT from reana_commons.errors import REANASecretDoesNotExist @@ -314,7 +317,28 @@ def get_workflow_status(workflow_id_or_name): # noqa @blueprint.route("/workflows//status", methods=["PUT"]) -def set_workflow_status(workflow_id_or_name): # noqa +@use_kwargs( + { + # parameters for "start" + "input_parameters": fields.Dict(), + "operational_options": fields.Dict(), + "restart": fields.Boolean(), + # parameters for "deleted" + "all_runs": fields.Boolean(), + "workspace": fields.Boolean(), + }, + location="json", +) +@use_kwargs( + { + "user": fields.Str(required=True), + "status": fields.Str(required=True), + }, + location="query", +) +def set_workflow_status( + workflow_id_or_name: str, user: str, status: str, **parameters: dict +): # noqa r"""Set workflow status. --- @@ -348,21 +372,36 @@ def set_workflow_status(workflow_id_or_name): # noqa - name: parameters in: body description: >- - Optional. Additional input parameters and operational options for - workflow execution. Possible parameters are `CACHE=on/off`, passed - to disable caching of results in serial workflows, - `all_runs=True/False` deletes all runs of a given workflow - if status is set to deleted and `workspace=True/False` which deletes - the workspace of a workflow. + Optional. Additional parameters to customise the workflow status change. required: false schema: type: object properties: - CACHE: - type: string + operational_options: + description: >- + Optional. Additional operational options for workflow execution. + Only allowed when status is `start`. + type: object + input_parameters: + description: >- + Optional. Additional input parameters that override the ones + from the workflow specification. Only allowed when status is `start`. + type: object + restart: + description: >- + Optional. If true, the workflow is a restart of an earlier workflow execution. + Only allowed when status is `start`. + type: boolean all_runs: + description: >- + Optional. If true, delete all runs of the workflow. + Only allowed when status is `deleted`. type: boolean workspace: + description: >- + Optional, but must be set to true if provided. + If true, delete also the workspace of the workflow. + Only allowed when status is `deleted`. type: boolean responses: 200: @@ -456,24 +495,11 @@ def set_workflow_status(workflow_id_or_name): # noqa """ try: - user_uuid = request.args["user"] - workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid) - status = request.args.get("status") + workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user) if not (status in STATUSES): - return ( - jsonify( - { - "message": "Status {0} is not one of: {1}".format( - status, ", ".join(STATUSES) - ) - } - ), - 400, - ) + error_msg = f"Status {status} is not one of: {', '.join(STATUSES)}" + return jsonify({"message": error_msg}), 400 - parameters = {} - if request.is_json: - parameters = request.json if status == START: start_workflow(workflow, parameters) return ( @@ -489,8 +515,8 @@ def set_workflow_status(workflow_id_or_name): # noqa 200, ) elif status == DELETED: - all_runs = True if request.json.get("all_runs") else False - workspace = True if request.json.get("workspace", True) else False + all_runs = parameters.get("all_runs", False) + workspace = parameters.get("workspace", True) if not workspace: return ( jsonify( diff --git a/tests/test_views.py b/tests/test_views.py index 97d80e3d..4ee902ce 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -923,9 +923,9 @@ def test_set_workflow_status_unknown_workflow( url_for( "statuses.set_workflow_status", workflow_id_or_name=random_workflow_uuid ), - query_string={"user": default_user.id_}, + query_string={"user": default_user.id_, "status": payload}, content_type="application/json", - data=json.dumps(payload), + data=json.dumps({}), ) assert res.status_code == 404 @@ -1151,6 +1151,51 @@ def test_start_input_parameters( assert workflow.input_parameters == parameters["input_parameters"] +def test_start_no_input_parameters( + app, + session, + default_user, + user_secrets, + corev1_api_client_with_user_secrets, + sample_serial_workflow_in_db, +): + """Test start workflow with inupt parameters.""" + workflow = sample_serial_workflow_in_db + workflow_uuid = str(sample_serial_workflow_in_db.id_) + + with app.test_client() as client: + # create workflow + workflow.status = RunStatus.created + session.add(workflow) + session.commit() + + payload = START + parameters = {"operational_options": {}} + with mock.patch( + "reana_workflow_controller.workflow_run_manager." + "current_k8s_batchv1_api_client" + ): + # provide user secret store + with mock.patch( + "reana_commons.k8s.secrets.current_k8s_corev1_api_client", + corev1_api_client_with_user_secrets(user_secrets), + ): + # set workflow status to START and pass parameters + res = client.put( + url_for( + "statuses.set_workflow_status", + workflow_id_or_name=workflow_uuid, + ), + query_string={"user": default_user.id_, "status": "start"}, + content_type="application/json", + data=json.dumps(parameters), + ) + json_response = json.loads(res.data.decode()) + assert json_response["status"] == status_dict[payload].name + workflow = Workflow.query.filter(Workflow.id_ == workflow_uuid).first() + assert workflow.input_parameters == dict() + + def test_start_workflow_db_failure( app, session,