Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(set_workflow_status): validate endpoint arguments (#589) #589

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1116,19 +1116,30 @@
"type": "string"
},
{
"description": "Optional. Additional input parameters and operational options for workflow execution. Possible parameters are `CACHE=on/off`, passed to disable caching of results in serial workflows, `all_runs=True/False` deletes all runs of a given workflow if status is set to deleted and `workspace=True/False` which deletes the workspace of a workflow.",
"description": "Optional. Additional parameters to customise the workflow status change.",
"in": "body",
"name": "parameters",
"required": false,
"schema": {
"properties": {
"CACHE": {
"type": "string"
},
"all_runs": {
"description": "Optional. If true, delete all runs of the workflow. Only allowed when status is `deleted`.",
"type": "boolean"
},
"input_parameters": {
"description": "Optional. Additional input parameters that override the ones from the workflow specification. Only allowed when status is `start`.",
"type": "object"
},
"operational_options": {
"description": "Optional. Additional operational options for workflow execution. Only allowed when status is `start`.",
"type": "object"
},
"restart": {
"description": "Optional. If true, the workflow is a restart of an earlier workflow execution. Only allowed when status is `start`.",
"type": "boolean"
},
"workspace": {
"description": "Optional, but must be set to true if provided. If true, delete also the workspace of the workflow. Only allowed when status is `deleted`.",
"type": "boolean"
}
},
Expand Down
23 changes: 11 additions & 12 deletions reana_workflow_controller/rest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ def start_workflow(workflow, parameters):

def _start_workflow_db(workflow, parameters):
workflow.status = RunStatus.pending
if parameters:
workflow.input_parameters = parameters.get("input_parameters")
workflow.operational_options = parameters.get("operational_options")
workflow.input_parameters = parameters.get("input_parameters", {})
workflow.operational_options = parameters.get("operational_options", {})
current_db_sessions.add(workflow)
current_db_sessions.commit()

Expand All @@ -95,15 +94,15 @@ def _start_workflow_db(workflow, parameters):
verb=get_workflow_status_change_verb(workflow.status.name),
status=str(workflow.status.name),
)
if "restart" in parameters.keys():
if parameters["restart"]:
if workflow.status not in [
RunStatus.failed,
RunStatus.finished,
RunStatus.queued,
RunStatus.pending,
]:
raise REANAWorkflowControllerError(failure_message)

if parameters.get("restart"):
if workflow.status not in [
RunStatus.failed,
RunStatus.finished,
RunStatus.queued,
RunStatus.pending,
]:
raise REANAWorkflowControllerError(failure_message)
elif workflow.status not in [RunStatus.created, RunStatus.queued]:
if workflow.status == RunStatus.deleted:
raise REANAWorkflowStatusError(failure_message)
Expand Down
80 changes: 53 additions & 27 deletions reana_workflow_controller/rest/workflows_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import json

from flask import Blueprint, jsonify, request
from webargs import fields
from webargs.flaskparser import use_kwargs


from reana_commons.config import WORKFLOW_TIME_FORMAT
from reana_commons.errors import REANASecretDoesNotExist
Expand Down Expand Up @@ -314,7 +317,28 @@ def get_workflow_status(workflow_id_or_name): # noqa


@blueprint.route("/workflows/<workflow_id_or_name>/status", methods=["PUT"])
def set_workflow_status(workflow_id_or_name): # noqa
@use_kwargs(
{
# parameters for "start"
"input_parameters": fields.Dict(),
"operational_options": fields.Dict(),
"restart": fields.Boolean(),
# parameters for "deleted"
"all_runs": fields.Boolean(),
"workspace": fields.Boolean(),
},
location="json",
)
@use_kwargs(
{
"user": fields.Str(required=True),
"status": fields.Str(required=True),
},
location="query",
)
def set_workflow_status(
workflow_id_or_name: str, user: str, status: str, **parameters: dict
): # noqa
r"""Set workflow status.

---
Expand Down Expand Up @@ -348,21 +372,36 @@ def set_workflow_status(workflow_id_or_name): # noqa
- name: parameters
in: body
description: >-
Optional. Additional input parameters and operational options for
workflow execution. Possible parameters are `CACHE=on/off`, passed
to disable caching of results in serial workflows,
`all_runs=True/False` deletes all runs of a given workflow
if status is set to deleted and `workspace=True/False` which deletes
the workspace of a workflow.
Optional. Additional parameters to customise the workflow status change.
required: false
schema:
type: object
properties:
CACHE:
type: string
operational_options:
description: >-
Optional. Additional operational options for workflow execution.
Only allowed when status is `start`.
type: object
input_parameters:
description: >-
Optional. Additional input parameters that override the ones
from the workflow specification. Only allowed when status is `start`.
type: object
restart:
description: >-
Optional. If true, the workflow is a restart of an earlier workflow execution.
Only allowed when status is `start`.
type: boolean
all_runs:
description: >-
Optional. If true, delete all runs of the workflow.
Only allowed when status is `deleted`.
type: boolean
workspace:
description: >-
Optional, but must be set to true if provided.
If true, delete also the workspace of the workflow.
Only allowed when status is `deleted`.
type: boolean
responses:
200:
Expand Down Expand Up @@ -456,24 +495,11 @@ def set_workflow_status(workflow_id_or_name): # noqa
"""

try:
user_uuid = request.args["user"]
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid)
status = request.args.get("status")
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user)
if not (status in STATUSES):
return (
jsonify(
{
"message": "Status {0} is not one of: {1}".format(
status, ", ".join(STATUSES)
)
}
),
400,
)
error_msg = f"Status {status} is not one of: {', '.join(STATUSES)}"
return jsonify({"message": error_msg}), 400

parameters = {}
if request.is_json:
parameters = request.json
if status == START:
start_workflow(workflow, parameters)
return (
Expand All @@ -489,8 +515,8 @@ def set_workflow_status(workflow_id_or_name): # noqa
200,
)
elif status == DELETED:
all_runs = True if request.json.get("all_runs") else False
workspace = True if request.json.get("workspace", True) else False
all_runs = parameters.get("all_runs", False)
workspace = parameters.get("workspace", True)
if not workspace:
return (
jsonify(
Expand Down
49 changes: 47 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,9 +923,9 @@ def test_set_workflow_status_unknown_workflow(
url_for(
"statuses.set_workflow_status", workflow_id_or_name=random_workflow_uuid
),
query_string={"user": default_user.id_},
query_string={"user": default_user.id_, "status": payload},
content_type="application/json",
data=json.dumps(payload),
data=json.dumps({}),
)
assert res.status_code == 404

Expand Down Expand Up @@ -1151,6 +1151,51 @@ def test_start_input_parameters(
assert workflow.input_parameters == parameters["input_parameters"]


def test_start_no_input_parameters(
app,
session,
default_user,
user_secrets,
corev1_api_client_with_user_secrets,
sample_serial_workflow_in_db,
):
"""Test start workflow with inupt parameters."""
workflow = sample_serial_workflow_in_db
workflow_uuid = str(sample_serial_workflow_in_db.id_)

with app.test_client() as client:
# create workflow
workflow.status = RunStatus.created
session.add(workflow)
session.commit()

payload = START
parameters = {"operational_options": {}}
with mock.patch(
"reana_workflow_controller.workflow_run_manager."
"current_k8s_batchv1_api_client"
):
# provide user secret store
with mock.patch(
"reana_commons.k8s.secrets.current_k8s_corev1_api_client",
corev1_api_client_with_user_secrets(user_secrets),
):
# set workflow status to START and pass parameters
res = client.put(
url_for(
"statuses.set_workflow_status",
workflow_id_or_name=workflow_uuid,
),
query_string={"user": default_user.id_, "status": "start"},
content_type="application/json",
data=json.dumps(parameters),
)
json_response = json.loads(res.data.decode())
assert json_response["status"] == status_dict[payload].name
workflow = Workflow.query.filter(Workflow.id_ == workflow_uuid).first()
assert workflow.input_parameters == dict()


def test_start_workflow_db_failure(
app,
session,
Expand Down
Loading