Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Staging debug #24

Open
wants to merge 28 commits into
base: freshservice_staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a356c6a
disabled reconciler
ab48917 Mar 22, 2024
64624d6
debug logs
prabhakaranE6157 Mar 27, 2024
a82a9e2
Merge branch 'conductorRecon' into staging_debug
prabhakaranE6157 Mar 27, 2024
839af83
debug logs
prabhakaranE6157 Apr 2, 2024
7352690
debug logs
prabhakaranE6157 Apr 2, 2024
9033983
debug logs
prabhakaranE6157 Apr 2, 2024
9055e98
added logger
ab48917 Apr 2, 2024
10843cc
handle pod level race conditions for workflow status update
ab48917 Apr 2, 2024
c84fb85
handle pod level race condition
ab48917 Apr 2, 2024
8d9b4c2
removed debug log
ab48917 May 9, 2024
0ebf41d
Merge branch 'freshservice_staging' into staging_debug
prabhakaranE6157 May 10, 2024
612b678
disabled workflow monitor
ab48917 May 10, 2024
5554377
Merge branch 'staging_debug' of github.com:freshdesk/conductor into s…
ab48917 May 10, 2024
8056b38
separated bg and main pod configurations
ab48917 May 14, 2024
7b8bd8b
fix
keerthivaasan-kanagaraj May 17, 2024
e15eaa7
added log for decider filters and disabled Workflow pending state mon…
ab48917 May 20, 2024
f81d0d6
pulling file from physical loc
ab48917 May 21, 2024
689967a
[FS-172266] : Fixing conductor prom metrics (#20)
keerthivaasan-kanagaraj May 21, 2024
4fda09a
pushed config
ab48917 May 22, 2024
6b81f1a
Merge branch 'staging_debug' of github.com:freshdesk/conductor into s…
ab48917 May 22, 2024
37acbdb
updated lt config
ab48917 May 22, 2024
8aaba0c
added ttl
ab48917 May 24, 2024
8c5514c
added logger
ab48917 May 25, 2024
bece9c7
added logger
ab48917 May 25, 2024
c27d889
added check for duplicate task instance
ab48917 May 25, 2024
56876f8
added check
ab48917 May 25, 2024
5d794d8
added instance check
ab48917 May 25, 2024
f3aa842
hz scale
prabhakaranE6157 Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ public String updateWorkflow(WorkflowModel workflowModel) {
workflowModel.setEndTime(System.currentTimeMillis());
}
externalizeWorkflowData(workflowModel);
LOGGER.debug(
"Updating workflow: {} in the ExecutionFacadeDao with status {} ",
workflowModel.getWorkflowId(),
workflowModel.getStatus());
executionDAO.updateWorkflow(workflowModel);
if (properties.isAsyncIndexingEnabled()) {
if (workflowModel.getStatus().isTerminal()
Expand Down Expand Up @@ -322,6 +326,10 @@ public String updateWorkflow(WorkflowModel workflowModel) {
} else {
indexDAO.indexWorkflow(new WorkflowSummary(workflowModel.toWorkflow()));
}
LOGGER.debug(
"Updated workflow: {} in the ExecutionFacadeDao with status {} ",
workflowModel.getWorkflowId(),
workflowModel.getStatus());
return workflowModel.getWorkflowId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ public List<TaskModel> getTasksToBeScheduled(
|| runningTask.getStatus().isTerminal())
.map(TaskModel::getReferenceTaskName)
.collect(Collectors.toList());
LOGGER.info("DeciderService getTasksToBeScheduled tasksInWorkflow {}", tasksInWorkflow);

String taskId = idGenerator.generate();
TaskMapperContext taskMapperContext =
Expand All @@ -866,6 +867,12 @@ public List<TaskModel> getTasksToBeScheduled(
// fork.
// A new task must only be scheduled if a task, with the same reference name is not already
// in this workflow instance
LOGGER.info("DeciderService getTasksToBeScheduled filtered tasks by referenceName {}", taskMappers
.getOrDefault(type, taskMappers.get(USER_DEFINED.name()))
.getMappedTasks(taskMapperContext)
.stream()
.filter(task -> !tasksInWorkflow.contains(task.getReferenceTaskName()))
.collect(Collectors.toList()));
return taskMappers
.getOrDefault(type, taskMappers.get(USER_DEFINED.name()))
.getMappedTasks(taskMapperContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) {

executionLockService.releaseLock(workflow.getWorkflowId());
executionLockService.deleteLock(workflow.getWorkflowId());
LOGGER.debug("Completed workflow execution && released all the locks for {}", workflow.getWorkflowId());
return workflow;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public void pollAndSweep() {

/**
 * Records the current depth of the decider queue as a gauge metric.
 *
 * <p>Called from the reconciler's poll-and-sweep cycle so that sweeper backlog
 * is visible in monitoring.
 */
private void recordQueueDepth() {
    int currentQueueSize = queueDAO.getSize(DECIDER_QUEUE);
    // Fix: the original format string had no '{}' placeholder, so the argument
    // (queue name + size) was never interpolated and the value was lost from the log.
    LOGGER.debug(
            "recordQueueDepth: queue {} has current size {}",
            DECIDER_QUEUE,
            currentQueueSize);
    Monitors.recordGauge(DECIDER_QUEUE, currentQueueSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,15 @@ public void reportMetrics() {
refreshCounter = metadataRefreshInterval;
}

getPendingWorkflowToOwnerAppMap(workflowDefs)
.forEach(
(workflowName, ownerApp) -> {
long count =
executionDAOFacade.getPendingWorkflowCount(workflowName);
Monitors.recordRunningWorkflows(count, workflowName, ownerApp);
});
// Commented out because there is currently no use case for pending-workflow metrics,
// and the scylla persistence layer does not implement pending-workflow counting.
// getPendingWorkflowToOwnerAppMap(workflowDefs)
// .forEach(
// (workflowName, ownerApp) -> {
// long count =
// executionDAOFacade.getPendingWorkflowCount(workflowName);
// Monitors.recordRunningWorkflows(count, workflowName, ownerApp);
// });

taskDefs.forEach(
taskDef -> {
Expand Down
8 changes: 8 additions & 0 deletions docker/server/config/config-conductor-server-bg.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Background ("bg") pod configuration: runs the sweeper/monitor/repair background
# workers, but does not execute system tasks (those run on the main pods).
conductor.system-task-workers.enabled=false
conductor.app.isolated-system-task-enabled=false
conductor.workflow-reconciler.enabled=true
conductor.workflow-monitor.enabled=true
conductor.workflow-repair-service.enabled=true
# How often the workflow monitor refreshes workflow/task metadata
# (consumed as a countdown in WorkflowMonitor.reportMetrics -- units are report
# cycles, not millis; TODO confirm).
conductor.workflow-monitor.metadata-refresh-interval=200
# Interval between workflow sweep passes, in milliseconds.
conductor.sweep-frequency.millis=1000
6 changes: 6 additions & 0 deletions docker/server/config/config-conductor-server-main.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Main pod configuration: executes system-task workers; the reconciler, monitor
# and repair service are disabled here because they run on the bg pods (see
# config-conductor-server-bg.properties).
conductor.system-task-workers.enabled=true
conductor.app.isolated-system-task-enabled=true
conductor.workflow-reconciler.enabled=false
conductor.workflow-monitor.enabled=false
conductor.workflow-repair-service.enabled=false
25 changes: 6 additions & 19 deletions docker/server/config/config-conductor-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@ conductor.queue.type=redis_standalone
conductor.redis.hosts=${REDIS_HOST}:${REDIS_PORT}:${REDIS_RACK_HOSTED_ZONE}:${REDIS_PASSWORD}
conductor.redis.workflowNamespacePrefix=${REDIS_WORKFLOW_NS_PREFIX}
conductor.redis.queueNamespacePrefix=${REDIS_QUEUE_NS_PREFIX}
#conductor.redis.queuesNonQuorumPort=6379

conductor.app.stack=staging

# Database persistence type.
conductor.db.type=scylla

#spring.data.cassandra.keyspace-name=conductor
#spring.data.cassandra.contact-points=cassandra
#spring.data.cassandra.port=9042

conductor.scylla.hostAddress=${SCYLLA_HOST}
conductor.scylla.password=${SCYLLA_PASSWORD}
Expand All @@ -26,23 +22,14 @@ conductor.scylla.replicationFactorValue=${SCYLLA_REPLICATIONFACTOR}

conductor.workflow-status-listener.type=archive

conductor.system-task-workers.enabled=true
conductor.workflow-status-listener.enabled=false
conductor.default-event-processor.enabled=false
conductor.event-queues.default.enabled=false
conductor.default-event-queue-processor.enabled=true

# Hikari pool sizes are -1 by default and prevent startup
#spring.datasource.hikari.maximum-pool-size=10
#spring.datasource.hikari.minimum-idle=2
# Conditional configuration based on process-name(main/bg) property from k8s
spring.config.import=optional:classpath:/config-conductor-server-${PROCESS_NAME}.properties

# Elastic search instance indexing is enabled.
conductor.indexing.enabled=false

# Transport address to elasticsearch
#conductor.elasticsearch.url=http://es:9200

# Name of the elasticsearch cluster
#conductor.elasticsearch.indexName=conductor

# Load sample kitchen sink workflow
loadSample=false

# Pushing Prom Metrics
conductor.metrics-prometheus.enabled = true
Expand Down
2 changes: 0 additions & 2 deletions docker/server/config/config-scylla.properties
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ conductor.scylla.username=scylla
conductor.scylla.password=scylla

conductor.workflow-status-listener.type=archive

conductor.system-task-workers.enabled=true

# Hikari pool sizes are -1 by default and prevent startup
#spring.datasource.hikari.maximum-pool-size=10
#spring.datasource.hikari.minimum-idle=2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,12 +542,29 @@ public String updateWorkflow(WorkflowModel workflow) {
recordCassandraDaoRequests("updateWorkflow", "n/a", workflow.getWorkflowName());
recordCassandraDaoPayloadSize(
"updateWorkflow", payload.length(), "n/a", workflow.getWorkflowName());
session.execute(
updateWorkflowStatement.bind(
payload, UUID.fromString(workflow.getWorkflowId()),correlationId));
WorkflowModel prevWorkflow = getWorkflow(workflow.getWorkflowId(), false);
LOGGER.debug(
"prevWorkflow status updated for workflow_id {} is {}",
prevWorkflow.getWorkflowId(),
prevWorkflow.getStatus());
// Only update the workflow if it is not already completed; a rerun cannot be performed on completed workflows.
if (!prevWorkflow.getStatus().equals(WorkflowModel.Status.COMPLETED)) {
session.execute(
updateWorkflowStatement.bind(
payload, UUID.fromString(workflow.getWorkflowId()), correlationId));
} else {
LOGGER.info(
"Workflow {} is already completed. Not updating the workflow current status {}",
workflow.getWorkflowId(), workflow.getStatus());
return workflow.getWorkflowId();
}
LOGGER.debug(
"Workflow status updated for workflow_id {} is {}",
workflow.getWorkflowId(),
workflow.getStatus());
workflow.setTasks(tasks);
return workflow.getWorkflowId();
} catch (DriverException e) {
} catch (Exception e) {
Monitors.error(CLASS_NAME, "updateWorkflow");
String errorMsg =
String.format("Failed to update workflow: %s", workflow.getWorkflowId());
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,5 @@ conductor.workflow-execution-lock.type=noop_lock
# Additional modules for metrics collection exposed to Datadog (optional)
management.metrics.export.datadog.enabled=${conductor.metrics-datadog.enabled:false}
management.metrics.export.datadog.api-key=${conductor.metrics-datadog.api-key:}

#logging.level.root=DEBUG