Skip to content

Commit

Permalink
Merge pull request #534 from AndreKurait/LoggingFix
Browse files Browse the repository at this point in the history
Fix logging and metrics for TrackingKafkaConsumer with partition assignment
  • Loading branch information
AndreKurait authored Mar 27, 2024
2 parents 5d165d7 + 8a05831 commit 0cbcdbc
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ public TrackingKafkaConsumer(@NonNull RootReplayerContext globalContext,

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
if (partitions.isEmpty()) {
log.atDebug().setMessage(() -> this + " revoked no partitions.").log();
return;
}

new KafkaConsumerContexts.AsyncListeningContext(globalContext).onPartitionsRevoked(partitions);
synchronized (commitDataLock) {
safeCommit(globalContext::createCommitContext);
Expand All @@ -138,19 +143,24 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
kafkaRecordsLeftToCommitEventually.set(partitionToOffsetLifecycleTrackerMap.values().stream()
.mapToInt(OffsetLifecycleTracker::size).sum());
kafkaRecordsReadyToCommit.set(!nextSetOfCommitsMap.values().isEmpty());
log.atWarn().setMessage(() -> this + "partitions revoked for " + partitions.stream()
.map(p -> p + "").collect(Collectors.joining(","))).log();
log.atWarn().setMessage(() -> this + " partitions revoked for " + partitions.stream()
.map(String::valueOf).collect(Collectors.joining(","))).log();
}
}

@Override public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
    // Rebalance callback: register freshly assigned partitions so their offsets
    // can be tracked and eventually committed.  An empty assignment is logged
    // and ignored so we don't bump the connection generation needlessly.
    if (newPartitions.isEmpty()) {
        log.atInfo().setMessage(() -> this + " assigned no new partitions.").log();
        return;
    }

    new KafkaConsumerContexts.AsyncListeningContext(globalContext).onPartitionsAssigned(newPartitions);
    synchronized (commitDataLock) {
        // A new assignment marks a new consumer "generation"; trackers created
        // here are stamped with it so stale commits from prior generations can
        // be recognized and discarded.
        consumerConnectionGeneration.incrementAndGet();
        newPartitions.forEach(p -> partitionToOffsetLifecycleTrackerMap.computeIfAbsent(p.partition(),
            x -> new OffsetLifecycleTracker(consumerConnectionGeneration.get())));
        log.atInfo().setMessage(() -> this + " partitions added for " + newPartitions.stream()
            .map(String::valueOf).collect(Collectors.joining(","))).log();
    }
}

Expand Down Expand Up @@ -199,11 +209,11 @@ private void pause() {
} catch (IllegalStateException e) {
log.atError().setCause(e).setMessage(()->"Unable to pause the topic partitions: " + topic + ". " +
"The active partitions passed here : " + activePartitions.stream()
.map(x->""+x).collect(Collectors.joining(",")) + ". " +
.map(String::valueOf).collect(Collectors.joining(",")) + ". " +
"The active partitions as tracked here are: " + getActivePartitions().stream()
.map(x->""+x).collect(Collectors.joining(",")) + ". " +
.map(String::valueOf).collect(Collectors.joining(",")) + ". " +
"The active partitions according to the consumer: " + kafkaConsumer.assignment().stream()
.map(x->""+x).collect(Collectors.joining(","))
.map(String::valueOf).collect(Collectors.joining(","))
).log();
}
}
Expand All @@ -217,11 +227,11 @@ private void resume() {
"This may not be a fatal error for the entire process as the consumer should eventually"
+ " rejoin and rebalance. " +
"The active partitions passed here : " + activePartitions.stream()
.map(x->""+x).collect(Collectors.joining(",")) + ". " +
.map(String::valueOf).collect(Collectors.joining(",")) + ". " +
"The active partitions as tracked here are: " + getActivePartitions().stream()
.map(x->""+x).collect(Collectors.joining(",")) + ". " +
.map(String::valueOf).collect(Collectors.joining(",")) + ". " +
"The active partitions according to the consumer: " + kafkaConsumer.assignment().stream()
.map(x->""+x).collect(Collectors.joining(","))
.map(String::valueOf).collect(Collectors.joining(","))
).log();
}
}
Expand Down Expand Up @@ -390,4 +400,17 @@ String nextCommitsToString() {
return "nextCommits="+nextSetOfCommitsMap.entrySet().stream()
.map(kvp->kvp.getKey()+"->"+kvp.getValue()).collect(Collectors.joining(","));
}

@Override
public String toString() {
    // Render a diagnostic summary of this consumer's commit-tracking state.
    // All counters are read under commitDataLock so the reported values are
    // mutually consistent (a caller never sees a half-updated snapshot).
    synchronized (commitDataLock) {
        final int trackedPartitions = partitionToOffsetLifecycleTrackerMap.size();
        final int pendingCommits = nextSetOfCommitsMap.size();
        return String.format("TrackingKafkaConsumer{topic='%s', partitionCount=%d, commitsPending=%d, " +
                "recordsLeftToCommit=%d, recordsReadyToCommit=%b}",
            topic, trackedPartitions, pendingCommits,
            kafkaRecordsLeftToCommitEventually.get(), kafkaRecordsReadyToCommit.get());
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.migrations.tracing.CommonMetricInstruments;
import org.opensearch.migrations.tracing.CommonScopedMetricInstruments;
import org.opensearch.migrations.tracing.DirectNestedSpanContext;
import org.opensearch.migrations.tracing.IInstrumentationAttributes;
import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes;

import java.util.Collection;
Expand Down Expand Up @@ -59,15 +58,15 @@ private MetricInstruments(Meter meter) {

public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Count the revocation event, then decrement the active-partition gauge:
    // revoked partitions reduce the number currently assigned, hence the
    // negated delta.
    meterIncrementEvent(getMetrics().kafkaPartitionsRevokedCounter);
    onPartitionsAssignedChanged(-1 * partitions.size());
}

public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // Count the assignment event and grow the active-partition gauge by the
    // number of partitions just assigned.
    meterIncrementEvent(getMetrics().kafkaPartitionsAssignedCounter);
    onPartitionsAssignedChanged(partitions.size());
}

private void onParitionsAssignedChanged(int delta) {
private void onPartitionsAssignedChanged(int delta) {
meterDeltaEvent(getMetrics().kafkaActivePartitionsCounter, delta);
}
}
Expand Down

0 comments on commit 0cbcdbc

Please sign in to comment.