diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java
index 4d72ad495f..76a64ebd56 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java
@@ -361,6 +361,22 @@ public void updateTask(TaskModel task) {
         }
     }
 
+    /**
+     * Updates every task in {@code tasks} by delegating to {@link #updateTask(TaskModel)}.
+     *
+     * <p>No batched write is implemented for this DAO; the per-task path is reused so that each
+     * task is actually written.
+     *
+     * @param tasks tasks to update; {@code null} or empty is a no-op
+     */
+    @Override
+    public void updateTasksInBatch(List<TaskModel> tasks) {
+        if (tasks == null || tasks.isEmpty()) {
+            return;
+        }
+        tasks.forEach(this::updateTask);
+    }
+
     /**
      * @method to verify the task status and update the task_in_progress table
      * also removes if its a terminal task
diff --git a/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java b/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java
index 3b011c93db..51ab1083d8 100644
--- a/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java
+++ b/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java
@@ -164,7 +164,7 @@ void pollAndExecute(Worker worker) {
             for (Task task : tasks) {
                 if (Objects.nonNull(task) && StringUtils.isNotBlank(task.getTaskId())) {
                     MetricsContainer.incrementTaskPollCount(taskType, 1);
-                    LOGGER.debug(
+                    LOGGER.info(
                             "Polled task: {} of type: {} in domain: '{}', from worker: {}",
                             task.getTaskId(),
                             taskType,
@@ -236,6 +236,7 @@ void shutdownAndAwaitTermination(ExecutorService executorService, int timeout) {
     };
 
     private Task processTask(Task task, Worker worker, PollingSemaphore pollingSemaphore) {
+        long start = System.currentTimeMillis();
LOGGER.debug(
                 "Executing task: {} of type: {} in worker: {} at {}",
                 task.getTaskId(),
@@ -251,10 +252,13 @@ private Task processTask(Task task, Worker worker, PollingSemaphore pollingSemap
         } finally {
             pollingSemaphore.complete(1);
         }
+        LOGGER.info("[Conductor] time taken by task {} for WF {} in processTask is {}", task.getTaskId(), task.getWorkflowInstanceId(),
+                (System.currentTimeMillis() - start));
         return task;
     }
 
     private void executeTask(Worker worker, Task task) {
+        long start = System.currentTimeMillis();
         StopWatch stopwatch = new StopWatch();
         stopwatch.start();
         TaskResult result = null;
@@ -292,6 +296,8 @@ private void executeTask(Worker worker, Task task) {
                 worker.getIdentity(),
                 result.getStatus());
         updateTaskResult(updateRetryCount, task, result, worker);
+        LOGGER.info("[Conductor] time taken by task {} for WF {} in executeTask is {}", task.getTaskId(), task.getWorkflowInstanceId(),
+                (System.currentTimeMillis() - start));
     }
 
     private void finalizeTask(Task task, Throwable throwable) {
@@ -334,7 +340,11 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo
 
             retryOperation(
                     (TaskResult taskResult) -> {
+                        long start = System.currentTimeMillis();
                         taskClient.updateTask(taskResult);
+                        LOGGER.info("[Conductor] time taken by task {} for WF {} in updateTaskResult is {}", task.getTaskId(),
+                                task.getWorkflowInstanceId(),
+                                (System.currentTimeMillis() - start));
                         return null;
                     },
                     count,
diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
index d624ef0f6a..7e34213419 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
@@ -236,8 +236,12 @@ private void populateTaskPayloads(Task task) {
      * @param taskResult the {@link TaskResult} of the executed task to be updated.
      */
     public void updateTask(TaskResult taskResult) {
+        long start = System.currentTimeMillis();
         Validate.notNull(taskResult, "Task result cannot be null");
         postForEntityWithRequestOnly("tasks", taskResult);
+        LOGGER.info("[Conductor] time taken by task {} for WF {} in updateTask is {}", taskResult.getTaskId(),
+                taskResult.getWorkflowInstanceId(),
+                (System.currentTimeMillis() - start));
     }
 
     public Optional evaluateAndUploadLargePayload(
diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java
index d881500058..7ef610213a 100644
--- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java
+++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java
@@ -492,6 +492,7 @@ public long getInProgressTaskCount(String taskDefName) {
      * payload fails.
      */
     public void updateTask(TaskModel taskModel) {
+        long start = System.currentTimeMillis();
         if (taskModel.getStatus() != null) {
             if (!taskModel.getStatus().isTerminal()
                     || (taskModel.getStatus().isTerminal() && taskModel.getUpdateTime() == 0)) {
@@ -524,10 +525,14 @@ public void updateTask(TaskModel taskModel) {
             LOGGER.error(errorMsg, e);
             throw new TransientException(errorMsg, e);
         }
+        LOGGER.info("[Conductor] [ExecutionDAOFacade] updateTask Time taken for task: {},for workflowInstanceId {} and status {} and time is :{}",
+                taskModel.getTaskId(), taskModel.getWorkflowInstanceId(), taskModel.getStatus(), (System.currentTimeMillis() - start));
     }
 
     public void updateTasks(List tasks) {
+        // Route every task through updateTask(TaskModel) so that its status/update-time handling
+        // and its TransientException translation still apply; calling
+        // ExecutionDAO#updateTasksInBatch directly from here would skip both.
         tasks.forEach(this::updateTask);
     }
 
     public void removeTask(String taskId) {
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
index c4b68d749d..5b11e371ff 100644
---
a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -13,6 +13,7 @@ package com.netflix.conductor.core.execution; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -705,6 +706,7 @@ public WorkflowModel terminateWorkflow( * @throws NotFoundException if the Task is not found. */ public void updateTask(TaskResult taskResult) { + long start = System.currentTimeMillis(); if (taskResult == null) { throw new IllegalArgumentException("Task object is null"); } else if (taskResult.isExtendLease()) { @@ -714,7 +716,30 @@ public void updateTask(TaskResult taskResult) { String workflowId = taskResult.getWorkflowInstanceId(); WorkflowModel workflowInstance = executionDAOFacade.getWorkflowModel(workflowId, false); + LOGGER.info( + "[Conductor] [WorkflowExecutor] workflowInstance Time taken for task: {},for workflowInstanceId {} and status {} " + + "and workerId {} and time is :{}", + taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), taskResult.getWorkerId(), + (System.currentTimeMillis() - start)); + + long start1 = System.currentTimeMillis(); + List tasks = workflowInstance.getTasks(); + TaskModel task = null; + for (TaskModel t : tasks) { + if (t.getTaskId().equalsIgnoreCase(taskResult.getTaskId())) { + task = t; + } + LOGGER.info( + "[Conductor] [WorkflowExecutor] getTaskModel Time taken for task: {},for workflowInstanceId {} and status {} " + + "and workerId {} and time is :{}", + taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), taskResult.getWorkerId(), + (System.currentTimeMillis() - start1)); + } + if (task == null) { + throw new NotFoundException("No such task found by id: %s", taskResult.getTaskId()); + } + /*long start1 = System.currentTimeMillis(); TaskModel task = 
Optional.ofNullable(executionDAOFacade.getTaskModel(taskResult.getTaskId())) .orElseThrow( @@ -722,6 +747,11 @@ public void updateTask(TaskResult taskResult) { new NotFoundException( "No such task found by id: %s", taskResult.getTaskId())); + LOGGER.info( + "[Conductor] [WorkflowExecutor] getTaskModel Time taken for task: {},for workflowInstanceId {} and status {} " + + "and workerId {} and time is :{}", + taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), taskResult.getWorkerId(), + (System.currentTimeMillis() - start1));*/ LOGGER.debug("WE updateTask: taskId {} with taskStatus {} belonging to workflowId {} being updated with workflowStatus {}", task.getTaskId(), task.getStatus(), workflowInstance, workflowInstance.getStatus()); @@ -838,9 +868,15 @@ public void updateTask(TaskResult taskResult) { // Throw a TransientException if below operations fail to avoid workflow inconsistencies. try { + long start2 = System.currentTimeMillis(); LOGGER.debug("WE calling SED.updateTask: taskId {} with taskStatus {} belonging to workflowId {} and workflowStatus {}", task.getTaskId(), task.getStatus(), workflowInstance, workflowInstance.getStatus()); executionDAOFacade.updateTask(task); + LOGGER.info( + "[Conductor] [WorkflowExecutor] internal updateTask Time taken for task: {},for workflowInstanceId {} and status {} " + + "and workerId {} and time is :{}", + taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), taskResult.getWorkerId(), + (System.currentTimeMillis() - start2)); } catch (Exception e) { String errorMsg = String.format( @@ -861,7 +897,10 @@ public void updateTask(TaskResult taskResult) { LOGGER.error(errorMsg, e); } - taskResult.getLogs().forEach(taskExecLog -> taskExecLog.setTaskId(task.getTaskId())); + for(TaskExecLog taskExecLog : taskResult.getLogs()) { + taskExecLog.setTaskId(task.getTaskId()); + } + //taskResult.getLogs().forEach(taskExecLog -> taskExecLog.setTaskId(task.getTaskId())); 
executionDAOFacade.addTaskExecLog(taskResult.getLogs()); if (task.getStatus().isTerminal()) { @@ -873,9 +912,19 @@ public void updateTask(TaskResult taskResult) { task.getTaskDefName(), lastDuration, false, task.getStatus()); } + long start3 = System.currentTimeMillis(); if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) { - decide(workflowId); + //decide(workflowId); + decide(workflowInstance); } + LOGGER.info( + "[Conductor] [WorkflowExecutor] decide Time taken for task: {},for workflowInstanceId {} and status {} " + + "and workerId {} and time is :{}", + taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), taskResult.getWorkerId(), + (System.currentTimeMillis() - start3)); + LOGGER.info( + "[Conductor] [WorkflowExecutor] complete updateTask Time taken for task: {},for workflowInstanceId {} and status {} and time is :{}", + taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), (System.currentTimeMillis() - start)); } private void notifyTaskStatusListener(TaskModel task) { @@ -1037,6 +1086,9 @@ public WorkflowModel decide(String workflowId) { * No locking is required or lock is acquired externally */ public WorkflowModel decide(WorkflowModel workflow) { + long start1 = System.currentTimeMillis(); + + long start2 = System.currentTimeMillis(); if (workflow.getStatus().isTerminal()) { if (!workflow.getStatus().isSuccessful()) { cancelNonTerminalTasks(workflow); @@ -1047,7 +1099,6 @@ public WorkflowModel decide(WorkflowModel workflow) { // we find any sub workflow tasks that have changed // and change the workflow/task state accordingly adjustStateIfSubWorkflowChanged(workflow); - try { DeciderService.DeciderOutcome outcome = deciderService.decide(workflow); if (outcome.isComplete) { @@ -1061,8 +1112,19 @@ public WorkflowModel decide(WorkflowModel workflow) { tasksToBeScheduled = dedupAndAddTasks(workflow, tasksToBeScheduled); + LOGGER.info( + "[Conductor] [WorkflowExecutor] 
[decide] 1st adjustStateIfSubWorkflowChanged Time taken for workflowInstanceId {} " + + "and outcome.isComplete {} and time is :{}", + workflow.getWorkflowId(), outcome.isComplete, (System.currentTimeMillis() - start2)); + + start2 = System.currentTimeMillis(); boolean stateChanged = scheduleTask(workflow, tasksToBeScheduled); // start + LOGGER.info( + "[Conductor] [WorkflowExecutor] [decide] 2nd find scheduleTask stateChanged Time taken for workflowInstanceId {} " + + "and stateChanged {} and time is :{}", + workflow.getWorkflowId(), stateChanged, (System.currentTimeMillis() - start2)); + long start3 = System.currentTimeMillis(); for (TaskModel task : outcome.tasksToBeScheduled) { executionDAOFacade.populateTaskData(task); if (systemTaskRegistry.isSystemTask(task.getTaskType()) @@ -1076,21 +1138,35 @@ public WorkflowModel decide(WorkflowModel workflow) { } } } + LOGGER.info( + "[Conductor] [WorkflowExecutor] [decide] 3rd loop outcome.tasksToBeScheduled Time taken for workflowInstanceId {} " + + "and stateChanged {} and time is :{}", + workflow.getWorkflowId(), stateChanged, (System.currentTimeMillis() - start3)); if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) { executionDAOFacade.updateTasks(tasksToBeUpdated); } + LOGGER.info( + "[Conductor] [WorkflowExecutor] [decide] 4th scheduleTask and stateChanged Time taken for workflowInstanceId {} " + + "and stateChanged {} and time is :{}", + workflow.getWorkflowId(), stateChanged, (System.currentTimeMillis() - start2)); if (stateChanged) { return decide(workflow); } - - if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) { + start2 = System.currentTimeMillis(); + boolean flag = !outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty(); + if (flag) { executionDAOFacade.updateWorkflow(workflow); } - + LOGGER.info( + "[Conductor] [WorkflowExecutor] [decide] 5th updateWorkflow and flag Time taken for workflowInstanceId {} " + + "and flag {} and time is :{}", + 
workflow.getWorkflowId(), flag, (System.currentTimeMillis() - start2)); + LOGGER.info( + "[Conductor] [WorkflowExecutor] [decide] final Whole decide complete Time taken for workflowInstanceId {} and time is :{}", + workflow.getWorkflowId(), (System.currentTimeMillis() - start1)); return workflow; - } catch (TerminateWorkflowException twe) { LOGGER.info("Execution terminated of workflow: {}", workflow, twe); terminate(workflow, twe); @@ -1443,6 +1519,7 @@ private long getTaskDuration(long s, TaskModel task) { @VisibleForTesting boolean scheduleTask(WorkflowModel workflow, List tasks) { + long start1 = System.currentTimeMillis(); List tasksToBeQueued; boolean startedSystemTasks = false; @@ -1466,8 +1543,12 @@ boolean scheduleTask(WorkflowModel workflow, List tasks) { workflow.getWorkflowName(), String.valueOf(workflow.getWorkflowVersion())); + long start2 = System.currentTimeMillis(); // Save the tasks in the DAO executionDAOFacade.createTasks(tasks); + LOGGER.info("[Conductor] [WorkflowExecutor] [decide] [scheduleTask] 1st createTasks Time taken for workflowInstanceId {} " + + "and tasks.size {} and time is :{}", workflow.getWorkflowId(), tasks.size(), + (System.currentTimeMillis() - start2)); List systemTasks = tasks.stream() @@ -1506,7 +1587,12 @@ boolean scheduleTask(WorkflowModel workflow, List tasks) { throw new NonTransientException(errorMsg, e); } startedSystemTasks = true; + start2 = System.currentTimeMillis(); + // Save the tasks in the DAO executionDAOFacade.updateTask(task); + LOGGER.info("[Conductor] [WorkflowExecutor] [decide] [scheduleTask] 2nd updateTask Time taken for workflowInstanceId {} " + + "and tasks.size {} and time is :{}", workflow.getWorkflowId(), tasks.size(), + (System.currentTimeMillis() - start2)); } else { tasksToBeQueued.add(task); } @@ -1538,6 +1624,10 @@ boolean scheduleTask(WorkflowModel workflow, List tasks) { LOGGER.warn(errorMsg, e); Monitors.error(CLASS_NAME, "scheduleTask"); } + + LOGGER.info("[Conductor] [WorkflowExecutor] 
[decide] [scheduleTask] final scheduleTask Time taken for workflowInstanceId {} " + + "and tasksToBeQueued.size {} and time is :{}", workflow.getWorkflowId(), tasksToBeQueued.size(), + (System.currentTimeMillis() - start1)); return startedSystemTasks; } diff --git a/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java b/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java index 8e33cac29b..0698abf885 100644 --- a/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java @@ -52,6 +52,11 @@ public interface ExecutionDAO { */ void updateTask(TaskModel task); + /** + * @param tasks Task to be updated + */ + void updateTasksInBatch(List tasks); + /** * Checks if the number of tasks in progress for the given taskDef will exceed the limit if the * task is scheduled to be in progress (given to the worker or for system tasks start() method diff --git a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java index 04ed1db55f..1bc831ba74 100644 --- a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java index 33902640d4..01a2867267 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java @@ -260,6 +260,18 @@ public void updateTask(TaskModel task) { } } + @Override + 
public void updateTasksInBatch(List<TaskModel> tasks) {
+        // No batched write path exists in this DAO, so hand each entry to updateTask(TaskModel);
+        // that way every task in the list is actually persisted instead of only being logged.
+        if (tasks == null || tasks.isEmpty()) {
+            return;
+        }
+        for (TaskModel task : tasks) {
+            updateTask(task);
+        }
+    }
+
     @Override
     public boolean exceedsLimit(TaskModel task) {
         Optional taskDefinition = task.getTaskDefinition();
diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java
index dfb62a5ddc..3c547fe508 100644
--- a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java
+++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java
@@ -82,9 +82,18 @@ public ResponseEntity> batchPoll(
     @PostMapping(produces = TEXT_PLAIN_VALUE)
     @Operation(summary = "Update a task")
     public String updateTask(@RequestBody TaskResult taskResult) {
+        // The full payload is logged at DEBUG only: it can be large and may carry task data.
+        LOGGER.debug("Update a task {}", taskResult);
+        long start = System.currentTimeMillis();
         LOGGER.debug("Received updateTask for task: {},for workflowInstanceId {} and status {} ",
                 taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus());
-        return taskService.updateTask(taskResult);
+        String result = taskService.updateTask(taskResult);
+        LOGGER.info(
+                "[Conductor] [TaskResource] updateTask Time taken for task: {},for workflowInstanceId {} and status {} "
+                        + "and workerId {} and time is :{}",
+                taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), taskResult.getWorkerId(),
+                (System.currentTimeMillis() - start));
+        return result;
     }
 
     @PostMapping("/{taskId}/log")
diff --git a/rest/src/main/java/com/netflix/conductor/rest/filter/CustomRequestFilter.java b/rest/src/main/java/com/netflix/conductor/rest/filter/CustomRequestFilter.java
new file mode 100644
index 0000000000..f3adca1b9a
--- /dev/null
+++ b/rest/src/main/java/com/netflix/conductor/rest/filter/CustomRequestFilter.java
@@ -0,0 +1,70 @@
+package com.netflix.conductor.rest.filter;
+
+import java.io.IOException;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.annotation.WebFilter;
+import javax.servlet.http.HttpServletRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Servlet filter that logs the HTTP method and URI of every incoming request and how long the
+ * rest of the filter chain took to handle it.
+ *
+ * <p>Requests that are not {@link HttpServletRequest}s are passed through without logging.
+ */
+@WebFilter(urlPatterns = "/*")
+@Component
+public class CustomRequestFilter implements Filter {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustomRequestFilter.class);
+
+    /** No initialization is required. */
+    @Override
+    public void init(FilterConfig filterConfig) throws ServletException {}
+
+    /**
+     * Logs the request line, delegates to the rest of the chain and logs the elapsed time.
+     *
+     * <p>The timing is logged from a {@code finally} block so that requests which fail with an
+     * exception are reported as well.
+     *
+     * @param request the incoming request
+     * @param response the outgoing response
+     * @param chain the remaining filter chain
+     * @throws IOException if the chain throws it
+     * @throws ServletException if the chain throws it
+     */
+    @Override
+    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+            throws IOException, ServletException {
+        if (!(request instanceof HttpServletRequest)) {
+            chain.doFilter(request, response);
+            return;
+        }
+        HttpServletRequest httpRequest = (HttpServletRequest) request;
+        String uri = httpRequest.getRequestURI();
+
+        // nanoTime is monotonic, so wall-clock adjustments cannot distort the measured duration.
+        long startNanos = System.nanoTime();
+        LOGGER.info("Incoming request: {} {}", httpRequest.getMethod(), uri);
+        try {
+            chain.doFilter(request, response);
+        } finally {
+            long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
+            LOGGER.info("Request for {} took {} ms.", uri, elapsedMillis);
+        }
+    }
+
+    /** No cleanup is required. */
+    @Override
+    public void destroy() {}
+}
diff --git a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java
index 0e85dd54b6..999c8b753c 100644
--- a/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java
+++ b/scylla-persistence/src/main/java/com/netflix/conductor/scylla/dao/ScyllaExecutionDAO.java
@@ -14,6 +14,7 @@
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -237,6 +238,7 @@ public List getTasks(String taskType, String startKey, int count) {
      */
     @Override
     public List createTasks(List tasks) {
+        long start = System.currentTimeMillis();
         validateTasks(tasks);
         String workflowId = tasks.get(0).getWorkflowInstanceId();
         String corelationId = tasks.get(0).getCorrelationId();
@@ -247,23 +249,34 @@ public List createTasks(List tasks) {
             int totalTasks = workflowMetadata.getTotalTasks() + tasks.size();
             // update the task_lookup table
             // update the workflow_lookup table
-            LOGGER.debug("Create tasks list {} for workflowId {} ",tasks.stream()
-                    .map(TaskModel::getReferenceTaskName).collect(Collectors.toList()),workflowId);
+            LOGGER.info("[Conductor] [WorkflowExecutor] [decide] [scheduleTask] [createTasks] Create tasks list {} for workflowId {} ",
+                    tasks.stream().map(TaskModel::getReferenceTaskName).collect(Collectors.toList()), workflowId);
+            long start1 = System.currentTimeMillis();
             tasks.forEach(
                     task -> {
+                        long start2 = System.currentTimeMillis();
                         if (task.getScheduledTime() == 0) {
                             task.setScheduledTime(System.currentTimeMillis());
                         }
                         session.execute(
                                 updateTaskLookupStatement.bind(
                                         workflowUUID, correlationId, toUUID(task.getTaskId(), "Invalid task id")));
-
session.execute( - updateWorkflowLookupStatement.bind( - correlationId, workflowUUID)); + LOGGER.info("[Conductor] [WorkflowExecutor] [decide] [scheduleTask] [createTasks] [task_lookup 1] for workflowId {} time {}", + workflowId, (System.currentTimeMillis() - start2)); + session.execute(updateWorkflowLookupStatement.bind(correlationId, workflowUUID)); + LOGGER.info("[Conductor] [WorkflowExecutor] [decide] [scheduleTask] [createTasks] [workflow_lookup 2] for workflowId {} time {}", + workflowId, (System.currentTimeMillis() - start2)); // Added the task to task_in_progress table addTaskInProgress(task); + LOGGER.info( + "[Conductor] [WorkflowExecutor] [decide] [scheduleTask] [createTasks] [add task in progress] for workflowId {} time {}", + workflowId, (System.currentTimeMillis() - start2)); }); - + LOGGER.info( + "[Conductor] [WorkflowExecutor] [decide] [scheduleTask] [createTasks] 1st createTasks Time taken for workflowInstanceId {} " + + "and tasks.size {} and time is :{}", workflowId, tasks.size(), (System.currentTimeMillis() - start1)); + + start1 = System.currentTimeMillis(); // update all the tasks in the workflow using batch BatchStatement batchStatement = new BatchStatement(); tasks.forEach( @@ -290,11 +303,19 @@ public List createTasks(List tasks) { batchStatement.add( updateTotalTasksStatement.bind(totalTasks, workflowUUID, correlationId)); session.execute(batchStatement); + LOGGER.info( + "[Conductor] [WorkflowExecutor] [decide] [scheduleTask] [createTasks] 2nd updateTotalTasksStatement Time taken for workflowInstanceId {} " + + "and tasks.size {} and time is :{}", workflowId, tasks.size(), (System.currentTimeMillis() - start1)); + start1 = System.currentTimeMillis(); // update the total tasks and partitions for the workflow - session.execute( - updateTotalPartitionsStatement.bind( - DEFAULT_TOTAL_PARTITIONS, totalTasks, workflowUUID, correlationId)); + session.execute(updateTotalPartitionsStatement.bind(DEFAULT_TOTAL_PARTITIONS, totalTasks, workflowUUID, 
correlationId));
+            LOGGER.info(
+                    "[Conductor] [WorkflowExecutor] [decide] [scheduleTask] [createTasks] 3rd updateTotalPartitionsStatement Time taken for workflowInstanceId {} "
+                            + "and tasks.size {} and time is :{}", workflowId, tasks.size(), (System.currentTimeMillis() - start1));
+            LOGGER.info(
+                    "[Conductor] [WorkflowExecutor] [decide] [scheduleTask] [createTasks] final createTasks Time taken for workflowInstanceId {} "
+                            + "and tasks.size {} and time is :{}", workflowId, tasks.size(), (System.currentTimeMillis() - start));
             return tasks;
         } catch (DriverException e) {
             Monitors.error(CLASS_NAME, "createTasks");
@@ -310,22 +331,27 @@ public List createTasks(List tasks) {
      * @method to add the task_in_progress table with the status of the task if task is not already present
      */
     public void addTaskInProgress(TaskModel task) {
-        ResultSet resultSet =
-                session.execute(
-                        selectTaskInProgressStatement.bind(task.getTaskDefName(),
-                                UUID.fromString(task.getTaskId())));
-        if (resultSet.all().isEmpty() || resultSet.all().size()<1) {
-            session.execute(
-                    insertTaskInProgressStatement.bind(task.getTaskDefName(),
-                            UUID.fromString(task.getTaskId()),
-                            UUID.fromString(task.getWorkflowInstanceId()),
-                            true));
-        }
-        else {
-            LOGGER.info("Task with defName {} and Id {} and status {} in addTaskInProgress NOT inserted as already exists "
-                    ,task.getTaskDefName(), task.getTaskId(),task.getStatus());
+        long start = System.currentTimeMillis();
+
+        // Insert the row only if it is absent (IF NOT EXISTS lightweight transaction). The built
+        // statement is executed as-is: the driver may emit the values as bind variables, which
+        // the text returned by getQueryString() alone would not carry.
+        ResultSet resultSet = session.execute(
+                QueryBuilder.insertInto(properties.getKeyspace(), TABLE_TASK_IN_PROGRESS)
+                        .value(TASK_DEF_NAME_KEY, task.getTaskDefName())
+                        .value(TASK_ID_KEY, UUID.fromString(task.getTaskId()))
+                        .value(WORKFLOW_ID_KEY, UUID.fromString(task.getWorkflowInstanceId()))
+                        .value(TASK_IN_PROG_STATUS_KEY, true)
+                        .ifNotExists());
+
+        // Log the time taken for the insert operation
+        LOGGER.info("[Conductor] [WorkflowExecutor] [decide] [scheduleTask] [createTasks] [task_in_progress] [insert task] for workflowId {} time {}",
+                task.getWorkflowInstanceId(), System.currentTimeMillis() - start);
+
+        // wasApplied() is false when the IF NOT EXISTS condition failed, i.e. the row was already there
+        if (!resultSet.wasApplied()) {
+            LOGGER.info("Task with defName {} and Id {} already exists, insert skipped.", task.getTaskDefName(), task.getTaskId());
         }
-
     }
 
     /**
@@ -347,6 +373,7 @@ public void updateTaskInProgress(TaskModel task, boolean inProgress) {
     @Override
     public void updateTask(TaskModel task) {
         try {
+            long start = System.currentTimeMillis();
             Integer correlationId = Objects.isNull(task.getCorrelationId()) ? 0 : Integer.parseInt(task.getCorrelationId());
             String taskPayload = toJson(task);
             recordCassandraDaoRequests("updateTask", task.getTaskType(), task.getWorkflowType());
@@ -359,18 +386,27 @@ public void updateTask(TaskModel task) {
                         prevTask.getStatus());
 
                 if (!prevTask.getStatus().equals(TaskModel.Status.COMPLETED)) {
+                    long tstart = System.currentTimeMillis();
                     session.execute(
                             insertTaskStatement.bind(
                                     UUID.fromString(task.getWorkflowInstanceId()),
                                     correlationId,
                                     task.getTaskId(),
                                     taskPayload));
+                    LOGGER.info("[Conductor] insertTaskStatement for task.getWorkflowInstanceId {} and task.getTaskId {} is {} ",
+                            task.getWorkflowInstanceId(), task.getTaskId(), System.currentTimeMillis() - tstart);
+
                     LOGGER.debug("Updated updateTask for task {} with taskStatus {} with taskRefName {} for workflowId {} ",
                             task.getTaskId(), task.getStatus(), task.getReferenceTaskName(), task.getWorkflowInstanceId());
                 }
+                long tstart = System.currentTimeMillis();
                 verifyTaskStatus(task);
+                LOGGER.info("[Conductor] verifyTaskStatus for task.getWorkflowInstanceId {} and task.getTaskId {} is {} ",
+                        task.getWorkflowInstanceId(), task.getTaskId(), System.currentTimeMillis() - tstart);
             }
             redisLock.releaseLock(task.getTaskId());
+            LOGGER.info("[Conductor] [ScyllaExecutionDAO] updateTask 
Time taken for task: {},for workflowInstanceId {} and status {} and time is :{}",
+                    task.getTaskId(), task.getWorkflowInstanceId(), task.getStatus(), (System.currentTimeMillis() - start));
         } catch (DriverException e) {
             Monitors.error(CLASS_NAME, "updateTask");
             String errorMsg =
@@ -382,6 +418,57 @@ public void updateTask(TaskModel task) {
         }
     }
 
+    /**
+     * Writes all given tasks in one logged batch, then refreshes the in-progress and limit
+     * bookkeeping of each task via {@code verifyTaskStatus}.
+     *
+     * <p>NOTE(review): {@link #updateTask(TaskModel)} releases a per-task lock and skips the write
+     * when the stored task is already COMPLETED; this batch path does neither. Confirm that the
+     * callers can tolerate that.
+     *
+     * @param tasks tasks to persist; {@code null} or empty is a no-op
+     * @throws TransientException if the driver reports an error
+     */
+    @Override
+    public void updateTasksInBatch(List<TaskModel> tasks) {
+        if (tasks == null || tasks.isEmpty()) {
+            return;
+        }
+
+        long startBatch = System.currentTimeMillis();
+        try {
+            BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.LOGGED);
+            for (TaskModel task : tasks) {
+                Integer correlationId =
+                        Objects.isNull(task.getCorrelationId())
+                                ? 0
+                                : Integer.parseInt(task.getCorrelationId());
+                batchStatement.add(
+                        insertTaskStatement.bind(
+                                UUID.fromString(task.getWorkflowInstanceId()),
+                                correlationId,
+                                task.getTaskId(),
+                                toJson(task)));
+            }
+
+            long tstart = System.currentTimeMillis();
+            session.execute(batchStatement);
+            LOGGER.info("Batch execution of task updates completed in {} ms for {} tasks.",
+                    (System.currentTimeMillis() - tstart), tasks.size());
+
+            // updateTask() follows its write with verifyTaskStatus(); do the same here so the
+            // task_in_progress and limit entries do not go stale for batched updates.
+            tasks.forEach(this::verifyTaskStatus);
+        } catch (DriverException e) {
+            Monitors.error(CLASS_NAME, "updateTasksInBatch");
+            String errorMsg = String.format("Error updating batch of tasks. Size: %d", tasks.size());
+            LOGGER.error(errorMsg, e);
+            throw new TransientException(errorMsg, e);
+        }
+        LOGGER.info("[Conductor] [ScyllaExecutionDAO] Batch updateTask Time taken for {} tasks: {} ms",
+                tasks.size(), (System.currentTimeMillis() - startBatch));
+    }
+
     /**
      * @method to verify the task status and update the task_in_progress table
      * also removes if its a terminal task
@@ -392,10 +479,16 @@ private void verifyTaskStatus(TaskModel task) {
                 && task.getStatus().equals(TaskModel.Status.IN_PROGRESS);
         updateTaskInProgress( task, inProgress);
         if (task.getStatus().isTerminal()) {
+            long tstart = System.currentTimeMillis();
             removeTaskFromLimit(task);
             removeTaskInProgress(task);
+            LOGGER.info("[Conductor] verifyTaskStatus isTerminal for task.getWorkflowInstanceId {} and task.getTaskId {} is {} ",
+                    task.getWorkflowInstanceId(), task.getTaskId(), System.currentTimeMillis() - tstart);
         } else if (task.getStatus() == TaskModel.Status.IN_PROGRESS) {
+            long tstart1 = System.currentTimeMillis();
             addTaskToLimit(task);
+            LOGGER.info("[Conductor] verifyTaskStatus addTaskToLimit for task.getWorkflowInstanceId {} and task.getTaskId {} is {} ",
+                    task.getWorkflowInstanceId(), task.getTaskId(), System.currentTimeMillis() - tstart1);
         }
     }
 
@@ -926,11 +1019,14 @@ public void addTaskToLimit(TaskModel task) {
         try {
             recordCassandraDaoRequests(
                     "addTaskToLimit", task.getTaskType(), task.getWorkflowType());
+            long tstart = System.currentTimeMillis();
             session.execute(
                     updateTaskDefLimitStatement.bind(
                             UUID.fromString(task.getWorkflowInstanceId()),
                             task.getTaskDefName(),
                             UUID.fromString(task.getTaskId())));
+            LOGGER.info("Conductor updateTaskDefLimitStatement for task.getWorkflowInstanceId {} and task.getTaskId {} is {} ",
+                    task.getWorkflowInstanceId(), task.getTaskId(), System.currentTimeMillis() - tstart);
         } catch (DriverException e) {
             Monitors.error(CLASS_NAME, "addTaskToLimit");
             String errorMsg =
diff --git a/server/src/main/resources/application.properties
b/server/src/main/resources/application.properties index 9774107f7a..681485658d 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -128,5 +128,5 @@ management.metrics.export.datadog.enabled=${conductor.metrics-datadog.enabled:fa management.metrics.export.datadog.api-key=${conductor.metrics-datadog.api-key:} logging.level.root=INFO -logging.level.com.datastax.driver.core.QueryLogger.NORMAL=TRACE -logging.level.com.datastax.driver.core.QueryLogger.SLOW=TRACE \ No newline at end of file +logging.level.com.datastax.driver.core.QueryLogger.NORMAL=DEBUG +logging.level.com.datastax.driver.core.QueryLogger.SLOW=DEBUG \ No newline at end of file