Skip to content

Commit

Permalink
Merge branch 'nw_metrics' into update_api_query_improvement_v2
Browse files Browse the repository at this point in the history
  • Loading branch information
prabhakaranE6157 authored Oct 24, 2024
2 parents 7e2bf0d + d66f80b commit 8d669fd
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@ public void sweep(String workflowId) {
long workflowOffsetTimeout =
workflowOffsetWithJitter(properties.getWorkflowOffsetTimeout().getSeconds());
if (workflow != null) {
long startTime = Instant.now().toEpochMilli();
// long startTime = Instant.now().toEpochMilli();
unack(workflow, workflowOffsetTimeout);
long endTime = Instant.now().toEpochMilli();
Monitors.recordUnackTime(workflow.getWorkflowName(), endTime - startTime);
// long endTime = Instant.now().toEpochMilli();
// The recordUnackTime metric is tagged with workflowName; our prod account has a
// large number of workflows, so this tag can cause metric-cardinality problems in Haystack.
// Disabling it temporarily.
// Monitors.recordUnackTime(workflow.getWorkflowName(), endTime - startTime);
} else {
LOGGER.warn(
"Workflow with {} id can not be found. Attempting to unack using the id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,51 +74,64 @@ public WorkflowMonitor(
}

@Scheduled(
initialDelayString = "${conductor.workflow-monitor.stats.initial-delay:120000}",
fixedDelayString = "${conductor.workflow-monitor.stats.delay:60000}")
initialDelayString = "${conductor.workflow-monitor.stats.initial-delay:30000}",
fixedDelayString = "${conductor.workflow-monitor.stats.delay:10000}")
public void reportMetrics() {
try {
if (refreshCounter <= 0) {
workflowDefs = metadataService.getWorkflowDefs();
LOGGER.info("reportMetrics starting");
if (refreshCounter <= 0) {
// workflowDefs = metadataService.getWorkflowDefs();
taskDefs = new ArrayList<>(metadataService.getTaskDefs());
refreshCounter = metadataRefreshInterval;
}

getPendingWorkflowToOwnerAppMap(workflowDefs)
.forEach(
(workflowName, ownerApp) -> {
long count =
executionDAOFacade.getPendingWorkflowCount(workflowName);
Monitors.recordRunningWorkflows(count, workflowName, ownerApp);
});

taskDefs.forEach(
taskDef -> {
}

// Commented out: there is currently no use case for pending-workflow metrics, and the
// Scylla persistence layer has no pending-workflow implementation.
// try {
// getPendingWorkflowToOwnerAppMap(workflowDefs)
// .forEach(
// (workflowName, ownerApp) -> {
// long count =
// executionDAOFacade.getPendingWorkflowCount(workflowName);
// Monitors.recordRunningWorkflows(count, workflowName, ownerApp);
// });
// } catch (Exception e) {
// LOGGER.error("Error while publishing scheduled metrics", e);
// }

try {
LOGGER.info("reportMetrics starting metrics collection");
taskDefs.forEach(
taskDef -> {
long size = queueDAO.getSize(taskDef.getName());
long inProgressCount =
executionDAOFacade.getInProgressTaskCount(taskDef.getName());
// long inProgressCount =
// executionDAOFacade.getInProgressTaskCount(taskDef.getName());
LOGGER.info("reportMetrics starting each task {}", taskDef.getName());
Monitors.recordQueueDepth(taskDef.getName(), size, taskDef.getOwnerApp());
if (taskDef.concurrencyLimit() > 0) {
Monitors.recordTaskInProgress(
taskDef.getName(), inProgressCount, taskDef.getOwnerApp());
}
});

asyncSystemTasks.forEach(
workflowSystemTask -> {
// if (taskDef.concurrencyLimit() > 0) {
// Monitors.recordTaskInProgress(
// taskDef.getName(), inProgressCount, taskDef.getOwnerApp());
// }
});
} catch (Exception e) {
LOGGER.error("Error while publishing scheduled metrics", e);
}

try {
asyncSystemTasks.forEach(
workflowSystemTask -> {
long size = queueDAO.getSize(workflowSystemTask.getTaskType());
long inProgressCount =
executionDAOFacade.getInProgressTaskCount(
workflowSystemTask.getTaskType());
// long inProgressCount =
// executionDAOFacade.getInProgressTaskCount(
// workflowSystemTask.getTaskType());
LOGGER.info("reportMetrics starting each system task {}", workflowSystemTask.getTaskType());
Monitors.recordQueueDepth(workflowSystemTask.getTaskType(), size, "system");
Monitors.recordTaskInProgress(
workflowSystemTask.getTaskType(), inProgressCount, "system");
});

refreshCounter--;
// Monitors.recordTaskInProgress(
// workflowSystemTask.getTaskType(), inProgressCount, "system");
});
} catch (Exception e) {
LOGGER.error("Error while publishing scheduled metrics", e);
LOGGER.error("Error while publishing scheduled metrics", e);
}
// refreshCounter--;
}

/**
Expand Down
1 change: 1 addition & 0 deletions docker/server/FSDockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ RUN yum install -y procps shadow-utils mysql screen nc wget
#RUN yum install python3-pip python3-wheel python3-devel gcc -y
#RUN pip3 install scylla-cqlsh


# This Jar is used for profiling with java-agent
RUN wget https://github.com/grafana/pyroscope-java/releases/download/v0.14.0/pyroscope.jar -P /app/libs

Expand Down
3 changes: 2 additions & 1 deletion docker/server/config/config-conductor-server-lt.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ conductor.scylla.replicationStrategy=${SCYLLA_REPLICATION_STRATEGY}

# Workflow Execution configuration for cpu and memory limits
conductor.workflow-reconciler.enabled=true
conductor.workflow-monitor.enabled=false
conductor.workflow-monitor.enabled=true
conductor.workflow-repair-service.enabled=true

conductor.redis.queueShardingStrategy=${REDIS_QUEUE_SHARDING_STRATEGY}
Expand Down Expand Up @@ -63,6 +63,7 @@ conductor.metrics-prometheus.enabled = true
#Actuator Endpoints
management.endpoints.web.exposure.include=info,metrics,health,prometheus,httptrace
management.metrics.distribution.sla.http.server.requests=100ms,150ms,250ms,500ms,1s
management.metrics.distribution.percentiles-histogram.http.server.requests=true
management.metrics.enable.jvm=true
management.endpoint.metrics.enabled=true
management.endpoint.prometheus.enabled=true
Expand Down
3 changes: 2 additions & 1 deletion docker/server/config/config-conductor-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ conductor.indexing.enabled=false

# Workflow Execution configuration for cpu and memory limits
conductor.workflow-reconciler.enabled=true
conductor.workflow-monitor.enabled=false
conductor.workflow-monitor.enabled=true
conductor.workflow-repair-service.enabled=true
conductor.workflow-scylla-execution-lock.enabled=true
conductor.redis-lock.serverAddress=redis://${REDIS_HOST}:${REDIS_PORT}
Expand All @@ -38,6 +38,7 @@ conductor.metrics-prometheus.enabled = true
#Actuator Endpoints
management.endpoints.web.exposure.include=info,metrics,health,prometheus,httptrace
management.metrics.distribution.sla.http.server.requests=100ms,150ms,250ms,500ms,1s
management.metrics.distribution.percentiles-histogram.http.server.requests=true
management.metrics.enable.jvm=true
management.endpoint.metrics.enabled=true
management.endpoint.prometheus.enabled=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,20 +248,20 @@ <T> T readValue(String json, Class<T> clazz) {
}

void recordCassandraDaoRequests(String action) {
recordCassandraDaoRequests(action, "n/a", "n/a");
// recordCassandraDaoRequests(action, "n/a", "n/a");
}

void recordCassandraDaoRequests(String action, String taskType, String workflowType) {
Monitors.recordDaoRequests(DAO_NAME, action, taskType, workflowType);
// Monitors.recordDaoRequests(DAO_NAME, action, taskType, workflowType);
}

void recordCassandraDaoEventRequests(String action, String event) {
Monitors.recordDaoEventRequests(DAO_NAME, action, event);
// Monitors.recordDaoEventRequests(DAO_NAME, action, event);
}

void recordCassandraDaoPayloadSize(
String action, int size, String taskType, String workflowType) {
Monitors.recordDaoPayloadSize(DAO_NAME, action, taskType, workflowType, size);
// Monitors.recordDaoPayloadSize(DAO_NAME, action, taskType, workflowType, size);
}

static class WorkflowMetadata {
Expand Down
1 change: 1 addition & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dependencies {
implementation 'com.netflix.conductor:conductor-kafka:3.13.3'

implementation 'io.micrometer:micrometer-registry-prometheus:1.11.0'
implementation "com.netflix.spectator:spectator-reg-micrometer:${revSpectator}"

implementation "io.opentelemetry:opentelemetry-extension-annotations:1.18.0"
implementation "io.opentelemetry:opentelemetry-api:1.35.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.netflix.conductor.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;

import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.micrometer.MicrometerRegistry;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.micrometer.prometheus.PrometheusRenameFilter;
import io.prometheus.client.CollectorRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Metrics;

/**
 * Wires Spectator metrics into the Micrometer/Prometheus registry at startup.
 *
 * <p>Runs once after the application context is ready (via {@link CommandLineRunner})
 * and adds a {@link MicrometerRegistry} bridge to Spectator's global registry so that
 * all Spectator-recorded metrics are exported through the Prometheus endpoint.
 */
@Configuration
public class PrometheusIntegrationConfig implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(PrometheusIntegrationConfig.class);

    // Instance field, not static: the original declared this static but assigned it
    // through `this` in the constructor, which silently rebinds shared static state
    // and would misbehave if more than one instance were ever created.
    private final PrometheusMeterRegistry prometheusRegistry;

    public PrometheusIntegrationConfig(PrometheusMeterRegistry prometheusRegistry) {
        this.prometheusRegistry = prometheusRegistry;
    }

    /**
     * Entry point invoked once on startup; performs the Spectator/Prometheus bridging.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        setupPrometheusRegistry();
    }

    /**
     * Registers a Micrometer-backed Spectator registry with Spectator's global
     * registry and applies the standard Prometheus rename filter so metric names
     * follow Prometheus naming conventions.
     */
    private void setupPrometheusRegistry() {
        log.info("Registered PrometheusRegistry");
        final MicrometerRegistry metricsRegistry = new MicrometerRegistry(prometheusRegistry);
        prometheusRegistry.config().meterFilter(new PrometheusRenameFilter());
        Spectator.globalRegistry().add(metricsRegistry);
    }
}

0 comments on commit 8d669fd

Please sign in to comment.