From 35037bf29b8abfa036eb45cf365677b4ee9858ab Mon Sep 17 00:00:00 2001 From: s12f Date: Mon, 30 Oct 2023 14:35:40 +0800 Subject: [PATCH] feat(standalone-sink-elasticsearch): add connector_skipped_records metric --- .../io/standalone/StandaloneSinkTaskContext.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/java-toolkit/lib/src/main/java/io/hstream/io/standalone/StandaloneSinkTaskContext.java b/java-toolkit/lib/src/main/java/io/hstream/io/standalone/StandaloneSinkTaskContext.java index 02cccae..3856c44 100644 --- a/java-toolkit/lib/src/main/java/io/hstream/io/standalone/StandaloneSinkTaskContext.java +++ b/java-toolkit/lib/src/main/java/io/hstream/io/standalone/StandaloneSinkTaskContext.java @@ -46,6 +46,7 @@ public class StandaloneSinkTaskContext implements SinkTaskContext { Counter readBytesCounter; Counter deliveredRecordsCounter; Counter deliveredBytesCounter; + Counter skippedRecordsCounter; Histogram readLatency; Histogram deliveredLatency; String stream; @@ -222,6 +223,10 @@ void handleWithRetry(Consumer handler, SinkRecordBatch batch) { log.warn("delivery record failed:{}, tried:{}", e.getMessage(), count); if (!retryStrategy.showRetry(batch.getShardId(), e)) { if (sinkSkipStrategy.trySkipBatch(batch, e.getMessage())) { + if (enablePrometheusReport) { + skippedRecordsCounter.labelValues(stream, String.valueOf(batch.getShardId())) + .inc(batch.getSinkRecords().size()); + } return; } else { fail(); @@ -300,6 +305,12 @@ void setupPrometheusReport() { .labelNames("streamName", "shardId") .register(); + skippedRecordsCounter = Counter.builder() + .name("connector_skipped_records") + .help("skipped records") + .labelNames("streamName", "shardId") + .register(); + // latency readLatency = Histogram.builder() .name("connector_read_latency")