Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lt config load #40

Draft
wants to merge 33 commits into
base: nw_metrics
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
9c29ea1
test
prabhakaranE6157 Sep 10, 2024
878957e
test
prabhakaranE6157 Sep 10, 2024
810972e
info
prabhakaranE6157 Sep 12, 2024
018aec1
info
prabhakaranE6157 Sep 12, 2024
243b188
info
prabhakaranE6157 Sep 12, 2024
d6b1a14
info
prabhakaranE6157 Sep 12, 2024
19f7b80
info
prabhakaranE6157 Sep 12, 2024
9d98729
t
prabhakaranE6157 Sep 23, 2024
e0784cc
t
prabhakaranE6157 Sep 23, 2024
f7cd5f6
t
prabhakaranE6157 Sep 23, 2024
9767316
t
prabhakaranE6157 Sep 23, 2024
5ae2839
Merge branch 'freshservice_staging' into lt_config_load
prabhakaranE6157 Sep 25, 2024
109dcb0
test
prabhakaranE6157 Sep 26, 2024
6ba50ca
log
prabhakaranE6157 Sep 28, 2024
9364009
Empty-Commit
prabhakaranE6157 Sep 28, 2024
9f9fa3f
t
prabhakaranE6157 Sep 28, 2024
0cd6663
t
prabhakaranE6157 Sep 28, 2024
717337c
t
prabhakaranE6157 Sep 28, 2024
ea42eb9
t
prabhakaranE6157 Oct 1, 2024
389cbf1
updated
kartik2k Oct 1, 2024
7dd25f6
updated
kartik2k Oct 1, 2024
e46b513
t
prabhakaranE6157 Oct 1, 2024
d5ec256
t
prabhakaranE6157 Oct 2, 2024
b88faa3
update scylla query
kartik2k Oct 3, 2024
b2d6607
Merge branch 'lt_config_load' of https://github.com/freshdesk/conduct…
kartik2k Oct 3, 2024
2535518
t
prabhakaranE6157 Oct 3, 2024
88efaad
updated
kartik2k Oct 3, 2024
6a3a7f5
remove scylla changes
kartik2k Oct 3, 2024
23a7f8f
t
prabhakaranE6157 Oct 3, 2024
a8b42aa
updated
kartik2k Oct 3, 2024
e36bc21
updated
kartik2k Oct 3, 2024
5f24e55
improved query
kartik2k Oct 3, 2024
a208d81
Merge branch 'nw_metrics' into lt_config_load
prabhakaranE6157 Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,14 @@ public void updateTask(TaskModel task) {
}
}

@Override
public void updateTasksInBatch(List<TaskModel> tasks) {
TaskModel task = tasks.get(0);
LOGGER.debug(
"Workflow Task removed from TASKS_IN_PROGRESS_STATUS with tasksInProgressKey: {}, workflowId: {}, taskId: {}, taskType: {}, taskStatus: {} during updateTask",
task.getTaskDefName(), task.getWorkflowInstanceId(), task.getTaskId(), task.getTaskType(), task.getStatus().name());
}

/**
* @method to verify the task status and update the task_in_progress table
* also removes if its a terminal task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void pollAndExecute(Worker worker) {
for (Task task : tasks) {
if (Objects.nonNull(task) && StringUtils.isNotBlank(task.getTaskId())) {
MetricsContainer.incrementTaskPollCount(taskType, 1);
LOGGER.debug(
LOGGER.info(
"Polled task: {} of type: {} in domain: '{}', from worker: {}",
task.getTaskId(),
taskType,
Expand Down Expand Up @@ -236,6 +236,7 @@ void shutdownAndAwaitTermination(ExecutorService executorService, int timeout) {
};

private Task processTask(Task task, Worker worker, PollingSemaphore pollingSemaphore) {
long start = System.currentTimeMillis();
LOGGER.debug(
"Executing task: {} of type: {} in worker: {} at {}",
task.getTaskId(),
Expand All @@ -251,10 +252,13 @@ private Task processTask(Task task, Worker worker, PollingSemaphore pollingSemap
} finally {
pollingSemaphore.complete(1);
}
LOGGER.info("[Conductor] time taken by task {} for WF {} in processTask is {}", task.getTaskId(), task.getWorkflowInstanceId(),
(System.currentTimeMillis() - start));
return task;
}

private void executeTask(Worker worker, Task task) {
long start = System.currentTimeMillis();
StopWatch stopwatch = new StopWatch();
stopwatch.start();
TaskResult result = null;
Expand Down Expand Up @@ -292,6 +296,8 @@ private void executeTask(Worker worker, Task task) {
worker.getIdentity(),
result.getStatus());
updateTaskResult(updateRetryCount, task, result, worker);
LOGGER.info("[Conductor] time taken by task {} for WF {} in executeTask is {}", task.getTaskId(), task.getWorkflowInstanceId(),
(System.currentTimeMillis() - start));
}

private void finalizeTask(Task task, Throwable throwable) {
Expand Down Expand Up @@ -334,7 +340,11 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo

retryOperation(
(TaskResult taskResult) -> {
long start = System.currentTimeMillis();
taskClient.updateTask(taskResult);
LOGGER.info("[Conductor] time taken by task {} for WF {} in updateTaskResult is {}", task.getTaskId(),
task.getWorkflowInstanceId(),
(System.currentTimeMillis() - start));
return null;
},
count,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,12 @@ private void populateTaskPayloads(Task task) {
* @param taskResult the {@link TaskResult} of the executed task to be updated.
*/
public void updateTask(TaskResult taskResult) {
long start = System.currentTimeMillis();
Validate.notNull(taskResult, "Task result cannot be null");
postForEntityWithRequestOnly("tasks", taskResult);
LOGGER.info("[Conductor] time taken by task {} for WF {} in updateTask is {}", taskResult.getTaskId(),
taskResult.getWorkflowInstanceId(),
(System.currentTimeMillis() - start));
}

public Optional<String> evaluateAndUploadLargePayload(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ public long getInProgressTaskCount(String taskDefName) {
* payload fails.
*/
public void updateTask(TaskModel taskModel) {
long start = System.currentTimeMillis();
if (taskModel.getStatus() != null) {
if (!taskModel.getStatus().isTerminal()
|| (taskModel.getStatus().isTerminal() && taskModel.getUpdateTime() == 0)) {
Expand Down Expand Up @@ -524,10 +525,13 @@ public void updateTask(TaskModel taskModel) {
LOGGER.error(errorMsg, e);
throw new TransientException(errorMsg, e);
}
LOGGER.info("[Conductor] [ExecutionDAOFacade] updateTask Time taken for task: {},for workflowInstanceId {} and status {} and time is :{}",
taskModel.getTaskId(), taskModel.getWorkflowInstanceId(), taskModel.getStatus(), (System.currentTimeMillis() - start));
}

public void updateTasks(List<TaskModel> tasks) {
tasks.forEach(this::updateTask);
executionDAO.updateTasksInBatch(tasks);
//tasks.forEach(this::updateTask);
}

public void removeTask(String taskId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.core.execution;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -705,6 +706,7 @@ public WorkflowModel terminateWorkflow(
* @throws NotFoundException if the Task is not found.
*/
public void updateTask(TaskResult taskResult) {
long start = System.currentTimeMillis();
if (taskResult == null) {
throw new IllegalArgumentException("Task object is null");
} else if (taskResult.isExtendLease()) {
Expand All @@ -714,14 +716,42 @@ public void updateTask(TaskResult taskResult) {

String workflowId = taskResult.getWorkflowInstanceId();
WorkflowModel workflowInstance = executionDAOFacade.getWorkflowModel(workflowId, false);
LOGGER.info(
"[Conductor] [WorkflowExecutor] workflowInstance Time taken for task: {},for workflowInstanceId {} and status {} "
+ "and workerId {} and time is :{}",
taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), taskResult.getWorkerId(),
(System.currentTimeMillis() - start));

long start1 = System.currentTimeMillis();
List<TaskModel> tasks = workflowInstance.getTasks();
TaskModel task = null;
for (TaskModel t : tasks) {
if (t.getTaskId().equalsIgnoreCase(taskResult.getTaskId())) {
task = t;
}
LOGGER.info(
"[Conductor] [WorkflowExecutor] getTaskModel Time taken for task: {},for workflowInstanceId {} and status {} "
+ "and workerId {} and time is :{}",
taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), taskResult.getWorkerId(),
(System.currentTimeMillis() - start1));
}
if (task == null) {
throw new NotFoundException("No such task found by id: %s", taskResult.getTaskId());
}

/*long start1 = System.currentTimeMillis();
TaskModel task =
Optional.ofNullable(executionDAOFacade.getTaskModel(taskResult.getTaskId()))
.orElseThrow(
() ->
new NotFoundException(
"No such task found by id: %s",
taskResult.getTaskId()));
LOGGER.info(
"[Conductor] [WorkflowExecutor] getTaskModel Time taken for task: {},for workflowInstanceId {} and status {} "
+ "and workerId {} and time is :{}",
taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), taskResult.getWorkerId(),
(System.currentTimeMillis() - start1));*/

LOGGER.debug("WE updateTask: taskId {} with taskStatus {} belonging to workflowId {} being updated with workflowStatus {}",
task.getTaskId(), task.getStatus(), workflowInstance, workflowInstance.getStatus());
Expand Down Expand Up @@ -838,9 +868,15 @@ public void updateTask(TaskResult taskResult) {

// Throw a TransientException if below operations fail to avoid workflow inconsistencies.
try {
long start2 = System.currentTimeMillis();
LOGGER.debug("WE calling SED.updateTask: taskId {} with taskStatus {} belonging to workflowId {} and workflowStatus {}",
task.getTaskId(), task.getStatus(), workflowInstance, workflowInstance.getStatus());
executionDAOFacade.updateTask(task);
LOGGER.info(
"[Conductor] [WorkflowExecutor] internal updateTask Time taken for task: {},for workflowInstanceId {} and status {} "
+ "and workerId {} and time is :{}",
taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), taskResult.getWorkerId(),
(System.currentTimeMillis() - start2));
} catch (Exception e) {
String errorMsg =
String.format(
Expand All @@ -861,7 +897,10 @@ public void updateTask(TaskResult taskResult) {
LOGGER.error(errorMsg, e);
}

taskResult.getLogs().forEach(taskExecLog -> taskExecLog.setTaskId(task.getTaskId()));
for(TaskExecLog taskExecLog : taskResult.getLogs()) {
taskExecLog.setTaskId(task.getTaskId());
}
//taskResult.getLogs().forEach(taskExecLog -> taskExecLog.setTaskId(task.getTaskId()));
executionDAOFacade.addTaskExecLog(taskResult.getLogs());

if (task.getStatus().isTerminal()) {
Expand All @@ -873,9 +912,19 @@ public void updateTask(TaskResult taskResult) {
task.getTaskDefName(), lastDuration, false, task.getStatus());
}

long start3 = System.currentTimeMillis();
if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) {
decide(workflowId);
//decide(workflowId);
decide(workflowInstance);
}
LOGGER.info(
"[Conductor] [WorkflowExecutor] decide Time taken for task: {},for workflowInstanceId {} and status {} "
+ "and workerId {} and time is :{}",
taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), taskResult.getWorkerId(),
(System.currentTimeMillis() - start3));
LOGGER.info(
"[Conductor] [WorkflowExecutor] complete updateTask Time taken for task: {},for workflowInstanceId {} and status {} and time is :{}",
taskResult.getTaskId(), taskResult.getWorkflowInstanceId(), taskResult.getStatus(), (System.currentTimeMillis() - start));
}

private void notifyTaskStatusListener(TaskModel task) {
Expand Down Expand Up @@ -1037,6 +1086,9 @@ public WorkflowModel decide(String workflowId) {
* No locking is required or lock is acquired externally
*/
public WorkflowModel decide(WorkflowModel workflow) {
long start1 = System.currentTimeMillis();

long start2 = System.currentTimeMillis();
if (workflow.getStatus().isTerminal()) {
if (!workflow.getStatus().isSuccessful()) {
cancelNonTerminalTasks(workflow);
Expand All @@ -1047,7 +1099,6 @@ public WorkflowModel decide(WorkflowModel workflow) {
// we find any sub workflow tasks that have changed
// and change the workflow/task state accordingly
adjustStateIfSubWorkflowChanged(workflow);

try {
DeciderService.DeciderOutcome outcome = deciderService.decide(workflow);
if (outcome.isComplete) {
Expand All @@ -1061,8 +1112,19 @@ public WorkflowModel decide(WorkflowModel workflow) {

tasksToBeScheduled = dedupAndAddTasks(workflow, tasksToBeScheduled);

LOGGER.info(
"[Conductor] [WorkflowExecutor] [decide] 1st adjustStateIfSubWorkflowChanged Time taken for workflowInstanceId {} "
+ "and outcome.isComplete {} and time is :{}",
workflow.getWorkflowId(), outcome.isComplete, (System.currentTimeMillis() - start2));

start2 = System.currentTimeMillis();
boolean stateChanged = scheduleTask(workflow, tasksToBeScheduled); // start
LOGGER.info(
"[Conductor] [WorkflowExecutor] [decide] 2nd find scheduleTask stateChanged Time taken for workflowInstanceId {} "
+ "and stateChanged {} and time is :{}",
workflow.getWorkflowId(), stateChanged, (System.currentTimeMillis() - start2));

long start3 = System.currentTimeMillis();
for (TaskModel task : outcome.tasksToBeScheduled) {
executionDAOFacade.populateTaskData(task);
if (systemTaskRegistry.isSystemTask(task.getTaskType())
Expand All @@ -1076,21 +1138,35 @@ public WorkflowModel decide(WorkflowModel workflow) {
}
}
}
LOGGER.info(
"[Conductor] [WorkflowExecutor] [decide] 3rd loop outcome.tasksToBeScheduled Time taken for workflowInstanceId {} "
+ "and stateChanged {} and time is :{}",
workflow.getWorkflowId(), stateChanged, (System.currentTimeMillis() - start3));

if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) {
executionDAOFacade.updateTasks(tasksToBeUpdated);
}
LOGGER.info(
"[Conductor] [WorkflowExecutor] [decide] 4th scheduleTask and stateChanged Time taken for workflowInstanceId {} "
+ "and stateChanged {} and time is :{}",
workflow.getWorkflowId(), stateChanged, (System.currentTimeMillis() - start2));

if (stateChanged) {
return decide(workflow);
}

if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) {
start2 = System.currentTimeMillis();
boolean flag = !outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty();
if (flag) {
executionDAOFacade.updateWorkflow(workflow);
}

LOGGER.info(
"[Conductor] [WorkflowExecutor] [decide] 5th updateWorkflow and flag Time taken for workflowInstanceId {} "
+ "and flag {} and time is :{}",
workflow.getWorkflowId(), flag, (System.currentTimeMillis() - start2));
LOGGER.info(
"[Conductor] [WorkflowExecutor] [decide] final Whole decide complete Time taken for workflowInstanceId {} and time is :{}",
workflow.getWorkflowId(), (System.currentTimeMillis() - start1));
return workflow;

} catch (TerminateWorkflowException twe) {
LOGGER.info("Execution terminated of workflow: {}", workflow, twe);
terminate(workflow, twe);
Expand Down Expand Up @@ -1443,6 +1519,7 @@ private long getTaskDuration(long s, TaskModel task) {

@VisibleForTesting
boolean scheduleTask(WorkflowModel workflow, List<TaskModel> tasks) {
long start1 = System.currentTimeMillis();
List<TaskModel> tasksToBeQueued;
boolean startedSystemTasks = false;

Expand All @@ -1466,8 +1543,12 @@ boolean scheduleTask(WorkflowModel workflow, List<TaskModel> tasks) {
workflow.getWorkflowName(),
String.valueOf(workflow.getWorkflowVersion()));

long start2 = System.currentTimeMillis();
// Save the tasks in the DAO
executionDAOFacade.createTasks(tasks);
LOGGER.info("[Conductor] [WorkflowExecutor] [decide] [scheduleTask] 1st createTasks Time taken for workflowInstanceId {} " +
"and tasks.size {} and time is :{}", workflow.getWorkflowId(), tasks.size(),
(System.currentTimeMillis() - start2));

List<TaskModel> systemTasks =
tasks.stream()
Expand Down Expand Up @@ -1506,7 +1587,12 @@ boolean scheduleTask(WorkflowModel workflow, List<TaskModel> tasks) {
throw new NonTransientException(errorMsg, e);
}
startedSystemTasks = true;
start2 = System.currentTimeMillis();
// Save the tasks in the DAO
executionDAOFacade.updateTask(task);
LOGGER.info("[Conductor] [WorkflowExecutor] [decide] [scheduleTask] 2nd updateTask Time taken for workflowInstanceId {} " +
"and tasks.size {} and time is :{}", workflow.getWorkflowId(), tasks.size(),
(System.currentTimeMillis() - start2));
} else {
tasksToBeQueued.add(task);
}
Expand Down Expand Up @@ -1538,6 +1624,10 @@ boolean scheduleTask(WorkflowModel workflow, List<TaskModel> tasks) {
LOGGER.warn(errorMsg, e);
Monitors.error(CLASS_NAME, "scheduleTask");
}

LOGGER.info("[Conductor] [WorkflowExecutor] [decide] [scheduleTask] final scheduleTask Time taken for workflowInstanceId {} " +
"and tasksToBeQueued.size {} and time is :{}", workflow.getWorkflowId(), tasksToBeQueued.size(),
(System.currentTimeMillis() - start1));
return startedSystemTasks;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public interface ExecutionDAO {
*/
void updateTask(TaskModel task);

/**
* @param tasks Task to be updated
*/
void updateTasksInBatch(List<TaskModel> tasks);

/**
* Checks if the number of tasks in progress for the given taskDef will exceed the limit if the
* task is scheduled to be in progress (given to the worker or for system tasks start() method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,18 @@ public void updateTask(TaskModel task) {
}
}

@Override
public void updateTasksInBatch(List<TaskModel> tasks) {
TaskModel task = tasks.get(0);
LOGGER.debug(
"Workflow Task removed from TASKS_IN_PROGRESS_STATUS with tasksInProgressKey: {}, workflowId: {}, taskId: {}, taskType: {}, taskStatus: {} during updateTask",
nsKey(IN_PROGRESS_TASKS, task.getTaskDefName()),
task.getWorkflowInstanceId(),
task.getTaskId(),
task.getTaskType(),
task.getStatus().name());
}

@Override
public boolean exceedsLimit(TaskModel task) {
Optional<TaskDef> taskDefinition = task.getTaskDefinition();
Expand Down
Loading