diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java index 048172bea1..1b7f591f24 100644 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java @@ -5,9 +5,10 @@ package org.opensearch.dataprepper; +import com.google.protobuf.Any; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.common.grpc.GrpcStatusFunction; +import com.linecorp.armeria.common.grpc.GoogleGrpcExceptionHandlerFunction; import com.linecorp.armeria.server.RequestTimeoutException; import io.grpc.Metadata; import io.grpc.Status; @@ -22,9 +23,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.concurrent.TimeoutException; -public class GrpcRequestExceptionHandler implements GrpcStatusFunction { +public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFunction { private static final Logger LOG = LoggerFactory.getLogger(GrpcRequestExceptionHandler.class); static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full."; @@ -37,53 +39,57 @@ public class GrpcRequestExceptionHandler implements GrpcStatusFunction { private final Counter badRequestsCounter; private final Counter requestsTooLargeCounter; private final Counter internalServerErrorCounter; + private final GrpcRetryInfoCalculator retryInfoCalculator; - public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) { + public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics, Duration retryInfoMinDelay, Duration retryInfoMaxDelay) { requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS); badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS); requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE); internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR); + retryInfoCalculator = new GrpcRetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay); } @Override - public @Nullable Status apply(final RequestContext context, final Throwable exception, final Metadata metadata) { - final Throwable exceptionCause = exception instanceof BufferWriteException ? exception.getCause() : exception; - + public com.google.rpc.@Nullable Status applyStatusProto(RequestContext ctx, Throwable throwable, + Metadata metadata) { + final Throwable exceptionCause = throwable instanceof BufferWriteException ? throwable.getCause() : throwable; return handleExceptions(exceptionCause); } - private Status handleExceptions(final Throwable e) { + private com.google.rpc.Status handleExceptions(final Throwable e) { String message = e.getMessage(); if (e instanceof RequestTimeoutException || e instanceof TimeoutException) { requestTimeoutsCounter.increment(); - return createStatus(e, Status.RESOURCE_EXHAUSTED); + return createStatus(e, Status.Code.RESOURCE_EXHAUSTED); } else if (e instanceof SizeOverflowException) { requestsTooLargeCounter.increment(); - return createStatus(e, Status.RESOURCE_EXHAUSTED); + return createStatus(e, Status.Code.RESOURCE_EXHAUSTED); } else if (e instanceof BadRequestException) { badRequestsCounter.increment(); - return createStatus(e, Status.INVALID_ARGUMENT); + return createStatus(e, Status.Code.INVALID_ARGUMENT); } else if ((e instanceof StatusRuntimeException) && (message.contains("Invalid protobuf byte sequence") || message.contains("Can't decode compressed frame"))) { badRequestsCounter.increment(); - return createStatus(e, Status.INVALID_ARGUMENT); + return createStatus(e, Status.Code.INVALID_ARGUMENT); } else if (e instanceof RequestCancelledException) { requestTimeoutsCounter.increment(); - return createStatus(e, Status.CANCELLED); + return createStatus(e, Status.Code.CANCELLED); } internalServerErrorCounter.increment(); LOG.error("Unexpected exception handling gRPC request", e); - return createStatus(e, Status.INTERNAL); + return createStatus(e, Status.Code.INTERNAL); } - private Status createStatus(final Throwable e, final Status status) { - final String message; + private com.google.rpc.Status createStatus(final Throwable e, final Status.Code code) { + com.google.rpc.Status.Builder builder = com.google.rpc.Status.newBuilder().setCode(code.value()); if (e instanceof RequestTimeoutException) { - message = ARMERIA_REQUEST_TIMEOUT_MESSAGE; + builder.setMessage(ARMERIA_REQUEST_TIMEOUT_MESSAGE); } else { - message = e.getMessage() == null ? status.getCode().name() : e.getMessage(); + builder.setMessage(e.getMessage() == null ? code.name() :e.getMessage()); } - - return status.withDescription(message); + if (code == Status.Code.RESOURCE_EXHAUSTED) { + builder.addDetails(Any.pack(retryInfoCalculator.createRetryInfo())); + } + return builder.build(); } } diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRetryInfoCalculator.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRetryInfoCalculator.java new file mode 100644 index 0000000000..2b74b0b4bc --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRetryInfoCalculator.java @@ -0,0 +1,49 @@ +package org.opensearch.dataprepper; + +import com.google.rpc.RetryInfo; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; + +class GrpcRetryInfoCalculator { + + private final Duration minimumDelay; + private final Duration maximumDelay; + + private final AtomicReference lastTimeCalled; + private final AtomicReference nextDelay; + + GrpcRetryInfoCalculator(Duration minimumDelay, Duration maximumDelay) { + this.minimumDelay = minimumDelay; + this.maximumDelay = maximumDelay; + // Create a cushion so that the calculator treats a first quick exception (after prepper startup) as normal request (e.g. does not calculate a backoff) + this.lastTimeCalled = new AtomicReference<>(Instant.now().minus(maximumDelay)); + this.nextDelay = new AtomicReference<>(minimumDelay); + } + + private static RetryInfo createProtoResult(Duration delay) { + return RetryInfo.newBuilder().setRetryDelay(mapDuration(delay)).build(); + } + + private static Duration minDuration(Duration left, Duration right) { + return left.compareTo(right) <= 0 ? left : right; + } + + private static com.google.protobuf.Duration.Builder mapDuration(Duration duration) { + return com.google.protobuf.Duration.newBuilder().setSeconds(duration.getSeconds()).setNanos(duration.getNano()); + } + + RetryInfo createRetryInfo() { + Instant now = Instant.now(); + // Is the last time we got called longer ago than the next delay? + if (lastTimeCalled.getAndSet(now).isBefore(now.minus(nextDelay.get()))) { + // Use minimum delay and reset the saved delay + nextDelay.set(minimumDelay); + return createProtoResult(minimumDelay); + } + Duration delay = nextDelay.getAndUpdate(d -> minDuration(maximumDelay, d.multipliedBy(2))); + return createProtoResult(delay); + } + +} diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java index 7100891d3a..031861c50e 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper; +import com.google.protobuf.Any; +import com.google.rpc.RetryInfo; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.server.RequestTimeoutException; import io.grpc.Metadata; @@ -13,6 +15,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.exceptions.BadRequestException; @@ -22,11 +27,15 @@ import org.opensearch.dataprepper.model.buffer.SizeOverflowException; import java.io.IOException; +import java.time.Duration; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeoutException; +import static com.linecorp.armeria.internal.common.grpc.MetadataUtil.GRPC_STATUS_DETAILS_BIN_KEY; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -55,6 +64,9 @@ public class GrpcRequestExceptionHandlerTest { @Mock private Metadata metadata; + @Captor + private ArgumentCaptor status; + private GrpcRequestExceptionHandler grpcRequestExceptionHandler; @BeforeEach @@ -64,7 +76,7 @@ public void setUp() { when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUESTS_TOO_LARGE)).thenReturn(requestsTooLargeCounter); when(pluginMetrics.counter(HttpRequestExceptionHandler.INTERNAL_SERVER_ERROR)).thenReturn(internalServerErrorCounter); - grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); + grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics, Duration.ofMillis(100), Duration.ofSeconds(2)); } @Test @@ -99,6 +111,12 @@ public void testHandleTimeoutException() { assertThat(messageStatus.getDescription(), equalTo(exceptionMessage)); verify(requestTimeoutsCounter, times(2)).increment(); + + verify(metadata, times(2)).put(ArgumentMatchers.eq(GRPC_STATUS_DETAILS_BIN_KEY), status.capture()); + for (com.google.rpc.Status currentStatus: status.getAllValues()) { + Optional retryInfo = currentStatus.getDetailsList().stream().filter(d -> d.is(RetryInfo.class)).findFirst(); + assertTrue(retryInfo.isPresent(), "No RetryInfo at status:\n" + currentStatus.toString()); + } } @Test diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRetryInfoCalculatorTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRetryInfoCalculatorTest.java new file mode 100644 index 0000000000..5611826ef7 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRetryInfoCalculatorTest.java @@ -0,0 +1,83 @@ +package org.opensearch.dataprepper; + +import com.google.rpc.RetryInfo; +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class GrpcRetryInfoCalculatorTest { + + @Test + public void testMinimumDelayOnFirstCall() { + RetryInfo retryInfo = new GrpcRetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(1)).createRetryInfo(); + + assertThat(retryInfo.getRetryDelay().getNanos(), equalTo(100_000_000)); + assertThat(retryInfo.getRetryDelay().getSeconds(), equalTo(0L)); + } + + @Test + public void testExponentialBackoff() { + GrpcRetryInfoCalculator calculator = + new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(10)); + RetryInfo retryInfo1 = calculator.createRetryInfo(); + RetryInfo retryInfo2 = calculator.createRetryInfo(); + RetryInfo retryInfo3 = calculator.createRetryInfo(); + RetryInfo retryInfo4 = calculator.createRetryInfo(); + + assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(1L)); + assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(1L)); + assertThat(retryInfo3.getRetryDelay().getSeconds(), equalTo(2L)); + assertThat(retryInfo4.getRetryDelay().getSeconds(), equalTo(4L)); + } + + @Test + public void testUsesMaximumAsLongestDelay() { + GrpcRetryInfoCalculator calculator = + new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(2)); + RetryInfo retryInfo1 = calculator.createRetryInfo(); + RetryInfo retryInfo2 = calculator.createRetryInfo(); + RetryInfo retryInfo3 = calculator.createRetryInfo(); + + assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(1L)); + assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(1L)); + assertThat(retryInfo3.getRetryDelay().getSeconds(), equalTo(2L)); + } + + @Test + public void testResetAfterDelayWearsOff() throws InterruptedException { + int minDelayNanos = 1_000_000; + GrpcRetryInfoCalculator calculator = + new GrpcRetryInfoCalculator(Duration.ofNanos(minDelayNanos), Duration.ofSeconds(1)); + + RetryInfo retryInfo1 = calculator.createRetryInfo(); + RetryInfo retryInfo2 = calculator.createRetryInfo(); + RetryInfo retryInfo3 = calculator.createRetryInfo(); + sleep(retryInfo3); + RetryInfo retryInfo4 = calculator.createRetryInfo(); + + assertThat(retryInfo1.getRetryDelay().getNanos(), equalTo(minDelayNanos)); + assertThat(retryInfo2.getRetryDelay().getNanos(), equalTo(minDelayNanos)); + assertThat(retryInfo3.getRetryDelay().getNanos(), equalTo(minDelayNanos * 2)); + assertThat(retryInfo4.getRetryDelay().getNanos(), equalTo(minDelayNanos)); + } + + @Test + public void testQuickFirstExceptionDoesNotTriggerBackoffCalculationEvenWithLongMinDelay() throws InterruptedException { + GrpcRetryInfoCalculator calculator = + new GrpcRetryInfoCalculator(Duration.ofSeconds(10), Duration.ofSeconds(20)); + + RetryInfo retryInfo1 = calculator.createRetryInfo(); + RetryInfo retryInfo2 = calculator.createRetryInfo(); + + assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(10L)); + assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(10L)); + } + + private void sleep(RetryInfo retryInfo) throws InterruptedException { + // make sure we let enough time pass by adding a few milliseconds on top + Thread.sleep((retryInfo.getRetryDelay().getNanos() / 1_000_000) + 200 ); + } +} diff --git a/data-prepper-plugins/otel-logs-source/README.md b/data-prepper-plugins/otel-logs-source/README.md index 547e65527b..33e142fd44 100644 --- a/data-prepper-plugins/otel-logs-source/README.md +++ b/data-prepper-plugins/otel-logs-source/README.md @@ -26,6 +26,18 @@ source: * `none`: no compression * `gzip`: apply GZip de-compression on the incoming request. +### Retry Information + +Data Prepper replies with a `RetryInfo` specifying how long to wait for the next request in case backpressure builds up. The retry information is implemented as exponential backoff, with a max delay of `retry_info.max_delay`. + +```yaml +source: + otel_trace_source: + retry_info: + min_delay: 1000ms # defaults to 100ms + max_delay: 5s # defaults to 2s +``` + ### SSL * ssl(Optional) => A boolean enables TLS/SSL. Default is ```true```. diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java index 5f5428489f..bbe06ae06e 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.otellogs; +import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; import com.linecorp.armeria.server.encoding.DecodingService; import org.opensearch.dataprepper.GrpcRequestExceptionHandler; import org.opensearch.dataprepper.plugins.codec.CompressionOption; @@ -43,6 +44,7 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; @@ -54,14 +56,16 @@ public class OTelLogsSource implements Source> { private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; static final String SERVER_CONNECTIONS = "serverConnections"; + // Default RetryInfo with minimum 100ms and maximum 2s + private static final RetryInfoConfig DEFAULT_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); + private final OTelLogsSourceConfig oTelLogsSourceConfig; private final String pipelineName; private final PluginMetrics pluginMetrics; private final GrpcAuthenticationProvider authenticationProvider; private final CertificateProviderFactory certificateProviderFactory; - private final GrpcRequestExceptionHandler requestExceptionHandler; + private final ByteDecoder byteDecoder; private Server server; - private ByteDecoder byteDecoder; @DataPrepperPluginConstructor public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig, @@ -80,7 +84,6 @@ public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig, this.certificateProviderFactory = certificateProviderFactory; this.pipelineName = pipelineDescription.getPipelineName(); this.authenticationProvider = createAuthenticationProvider(pluginFactory); - this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); this.byteDecoder = new OTelLogsDecoder(); } @@ -110,7 +113,7 @@ public void start(Buffer> buffer) { .builder() .useClientTimeoutHeader(false) .useBlockingTaskExecutor(true) - .exceptionMapping(requestExceptionHandler); + .exceptionHandler(createGrpExceptionHandler()); final MethodDescriptor methodDescriptor = LogsServiceGrpc.getExportMethod(); final String oTelLogsSourcePath = oTelLogsSourceConfig.getPath(); @@ -205,6 +208,14 @@ public void stop() { LOG.info("Stopped otel_logs_source."); } + private GrpcExceptionHandlerFunction createGrpExceptionHandler() { + RetryInfoConfig retryInfo = oTelLogsSourceConfig.getRetryInfo() != null + ? oTelLogsSourceConfig.getRetryInfo() + : DEFAULT_RETRY_INFO; + + return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay()); + } + private List getAuthenticationInterceptor() { final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor(); if (authenticationInterceptor == null) { diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java index bfca665d76..24662ec802 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java @@ -32,6 +32,7 @@ public class OTelLogsSourceConfig { static final String MAX_CONNECTION_COUNT = "max_connection_count"; static final String ENABLE_UNFRAMED_REQUESTS = "unframed_requests"; static final String COMPRESSION = "compression"; + static final String RETRY_INFO = "retry_info"; static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000; static final int DEFAULT_PORT = 21892; static final int DEFAULT_THREAD_COUNT = 200; @@ -104,6 +105,9 @@ public class OTelLogsSourceConfig { @JsonProperty("max_request_length") private ByteCount maxRequestLength; + @JsonProperty(RETRY_INFO) + private RetryInfoConfig retryInfo; + @AssertTrue(message = "path should start with /") boolean isPathValid() { return path == null || path.startsWith("/"); @@ -217,5 +221,13 @@ public CompressionOption getCompression() { public ByteCount getMaxRequestLength() { return maxRequestLength; } + + public RetryInfoConfig getRetryInfo() { + return retryInfo; + } + + public void setRetryInfo(RetryInfoConfig retryInfo) { + this.retryInfo = retryInfo; + } } diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/RetryInfoConfig.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/RetryInfoConfig.java new file mode 100644 index 0000000000..1ae026202e --- /dev/null +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/RetryInfoConfig.java @@ -0,0 +1,38 @@ +package org.opensearch.dataprepper.plugins.source.otellogs; + +import java.time.Duration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RetryInfoConfig { + + @JsonProperty(value = "min_delay", defaultValue = "100ms") + private Duration minDelay; + + @JsonProperty(value = "max_delay", defaultValue = "2s") + private Duration maxDelay; + + // Jackson needs this constructor + public RetryInfoConfig() {} + + public RetryInfoConfig(Duration minDelay, Duration maxDelay) { + this.minDelay = minDelay; + this.maxDelay = maxDelay; + } + + public Duration getMinDelay() { + return minDelay; + } + + public void setMinDelay(Duration minDelay) { + this.minDelay = minDelay; + } + + public Duration getMaxDelay() { + return maxDelay; + } + + public void setMaxDelay(Duration maxDelay) { + this.maxDelay = maxDelay; + } +} diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java index 29d3536259..2ce4daba91 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java @@ -19,7 +19,6 @@ import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.SessionProtocol; -import com.linecorp.armeria.common.grpc.GrpcStatusFunction; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.grpc.GrpcService; @@ -55,6 +54,7 @@ import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.GrpcRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; import org.opensearch.dataprepper.metrics.MetricNames; @@ -521,7 +521,8 @@ void start_with_Health_configured_includes_HealthCheck_service() throws IOExcept grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder); when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder); when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); - when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.exceptionHandler(any( + GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder); when(server.stop()).thenReturn(completableFuture); final Path certFilePath = Path.of("data/certificate/test_cert.crt"); @@ -563,7 +564,8 @@ void start_without_Health_configured_does_not_include_HealthCheck_service() thro grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder); when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder); when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); - when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.exceptionHandler(any( + GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder); when(server.stop()).thenReturn(completableFuture); final Path certFilePath = Path.of("data/certificate/test_cert.crt"); diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSourceConfigTests.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSourceConfigTests.java index 69f92e5b1e..abf259da33 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSourceConfigTests.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSourceConfigTests.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.plugins.source.otellogs; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -13,6 +15,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; @@ -34,6 +37,7 @@ import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.PORT; import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.PROTO_REFLECTION_SERVICE; import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.REQUEST_TIMEOUT; +import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.RETRY_INFO; import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.SSL; import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.SSL_KEY_CERT_FILE; import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.SSL_KEY_FILE; @@ -42,7 +46,7 @@ import static org.hamcrest.Matchers.equalTo; class OtelLogsSourceConfigTests { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()); private static final String PLUGIN_NAME = "otel_logs_source"; private static final String TEST_KEY_CERT = "test.crt"; private static final String TEST_KEY = "test.key"; @@ -276,6 +280,29 @@ void testInValidConfigWithCustomPath() { assertThat(oTelLogsSourceConfig.isPathValid(), equalTo(false)); } + @Test + void testRetryInfoConfig() { + final PluginSetting customPathPluginSetting = completePluginSettingForOtelLogsSource( + DEFAULT_REQUEST_TIMEOUT_MS, + DEFAULT_PORT, + null, + false, + false, + false, + true, + TEST_KEY_CERT, + "", + DEFAULT_THREAD_COUNT, + DEFAULT_MAX_CONNECTION_COUNT); + + final OTelLogsSourceConfig oTelLogsSourceConfig = OBJECT_MAPPER.convertValue(customPathPluginSetting.getSettings(), OTelLogsSourceConfig.class); + + + RetryInfoConfig retryInfo = oTelLogsSourceConfig.getRetryInfo(); + assertThat(retryInfo.getMaxDelay(), equalTo(Duration.ofMillis(100))); + assertThat(retryInfo.getMinDelay(), equalTo(Duration.ofMillis(50))); + } + private PluginSetting completePluginSettingForOtelLogsSource(final int requestTimeoutInMillis, final int port, final String path, @@ -299,6 +326,7 @@ private PluginSetting completePluginSettingForOtelLogsSource(final int requestTi settings.put(SSL_KEY_FILE, sslKeyFile); settings.put(THREAD_COUNT, threadCount); settings.put(MAX_CONNECTION_COUNT, maxConnectionCount); + settings.put(RETRY_INFO, new RetryInfoConfig(Duration.ofMillis(50), Duration.ofMillis(100))); return new PluginSetting(PLUGIN_NAME, settings); } } diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java new file mode 100644 index 0000000000..40cdaa1091 --- /dev/null +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java @@ -0,0 +1,165 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.otellogs; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_PORT; +import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS; + +import java.time.Duration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.rpc.RetryInfo; +import com.linecorp.armeria.client.Clients; + +import io.grpc.Metadata; +import io.grpc.StatusRuntimeException; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.logs.v1.LogRecord; +import io.opentelemetry.proto.logs.v1.ResourceLogs; +import io.opentelemetry.proto.logs.v1.ScopeLogs; +import io.opentelemetry.proto.resource.v1.Resource; + +@ExtendWith(MockitoExtension.class) +class OtelLogsSource_RetryInfoTest { + private static final String GRPC_ENDPOINT = "gproto+http://127.0.0.1:21892/"; + private static final String TEST_PIPELINE_NAME = "test_pipeline"; + private static final RetryInfoConfig TEST_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); + + @Mock + private PluginFactory pluginFactory; + + @Mock + private GrpcBasicAuthenticationProvider authenticationProvider; + + @Mock(lenient = true) + private OTelLogsSourceConfig oTelLogsSourceConfig; + + @Mock + private Buffer> buffer; + + private OTelLogsSource SOURCE; + + @BeforeEach + void beforeEach() throws Exception { + lenient().when(authenticationProvider.getHttpAuthenticationService()).thenCallRealMethod(); + Mockito.lenient().doThrow(SizeOverflowException.class).when(buffer).writeAll(any(), anyInt()); + + when(oTelLogsSourceConfig.getPort()).thenReturn(DEFAULT_PORT); + when(oTelLogsSourceConfig.isSsl()).thenReturn(false); + when(oTelLogsSourceConfig.getRequestTimeoutInMillis()).thenReturn(DEFAULT_REQUEST_TIMEOUT_MS); + when(oTelLogsSourceConfig.getMaxConnectionCount()).thenReturn(10); + when(oTelLogsSourceConfig.getThreadCount()).thenReturn(5); + when(oTelLogsSourceConfig.getCompression()).thenReturn(CompressionOption.NONE); + when(oTelLogsSourceConfig.getRetryInfo()).thenReturn(TEST_RETRY_INFO); + + when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))) + .thenReturn(authenticationProvider); + + configureObjectUnderTest(); + SOURCE.start(buffer); + } + + @AfterEach + void afterEach() { + SOURCE.stop(); + } + + private void configureObjectUnderTest() { + PluginMetrics pluginMetrics = PluginMetrics.fromNames("otel_logs", "pipeline"); + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + lenient().when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + + SOURCE = new OTelLogsSource(oTelLogsSourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + assertInstanceOf(OTelLogsDecoder.class, SOURCE.getDecoder()); + } + + @Test + public void gRPC_failed_request_returns_minimal_delay_in_status() throws Exception { + final LogsServiceGrpc.LogsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT) + .build(LogsServiceGrpc.LogsServiceBlockingStub.class); + final StatusRuntimeException statusRuntimeException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportLogsRequest())); + + RetryInfo retryInfo = extractRetryInfoFromStatusRuntimeException(statusRuntimeException); + assertThat(Duration.ofNanos(retryInfo.getRetryDelay().getNanos()).toMillis(), equalTo(100L)); + } + + @Test + public void gRPC_failed_request_returns_extended_delay_in_status() throws Exception { + RetryInfo retryInfo = callService3TimesAndReturnRetryInfo(); + + assertThat(Duration.ofNanos(retryInfo.getRetryDelay().getNanos()).toMillis(), equalTo(200L)); + } + + private RetryInfo extractRetryInfoFromStatusRuntimeException(StatusRuntimeException e) throws InvalidProtocolBufferException { + com.google.rpc.Status status = com.google.rpc.Status.parseFrom(e.getTrailers().get(Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER))); + return RetryInfo.parseFrom(status.getDetails(0).getValue()); + } + + /** + * The back off is calculated with the second call, and returned with the third + */ + private RetryInfo callService3TimesAndReturnRetryInfo() throws Exception { + StatusRuntimeException e = null; + for (int i = 0; i < 3; i++) { + final LogsServiceGrpc.LogsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT) + .build(LogsServiceGrpc.LogsServiceBlockingStub.class); + e = assertThrows(StatusRuntimeException.class, () -> client.export(createExportLogsRequest())); + } + + return extractRetryInfoFromStatusRuntimeException(e); + } + + private ExportLogsServiceRequest createExportLogsRequest() { + final Resource resource = Resource.newBuilder() + .addAttributes(KeyValue.newBuilder() + .setKey("service.name") + .setValue(AnyValue.newBuilder().setStringValue("service").build()) + ).build(); + + final ResourceLogs resourceLogs = ResourceLogs.newBuilder() + .addScopeLogs(ScopeLogs.newBuilder() + .addLogRecords(LogRecord.newBuilder().setSeverityNumberValue(1)) + .build()) + .setResource(resource) + .build(); + + return ExportLogsServiceRequest.newBuilder() + .addResourceLogs(resourceLogs) + .build(); + } +} diff --git a/data-prepper-plugins/otel-metrics-source/README.md b/data-prepper-plugins/otel-metrics-source/README.md index 750520d5f3..93cb456cea 100644 --- a/data-prepper-plugins/otel-metrics-source/README.md +++ b/data-prepper-plugins/otel-metrics-source/README.md @@ -26,6 +26,18 @@ source: * `none`: no compression * `gzip`: apply GZip de-compression on the incoming request. +### Retry Information + +Data Prepper replies with a `RetryInfo` specifying how long to wait for the next request in case backpressure builds up. The retry information is implemented as exponential backoff, with a max delay of `retry_info.max_delay`. + +```yaml +source: + otel_trace_source: + retry_info: + min_delay: 1000ms # defaults to 100ms + max_delay: 5s # defaults to 2s +``` + ### SSL * ssl(Optional) => A boolean enables TLS/SSL. Default is ```true```. diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index 43e2e1f92d..8b168ba4dc 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.otelmetrics; +import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; @@ -45,6 +46,7 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -60,12 +62,14 @@ public class OTelMetricsSource implements Source> { private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; static final String SERVER_CONNECTIONS = "serverConnections"; + // Default RetryInfo with minimum 100ms and maximum 2s + private static final RetryInfoConfig DEFAULT_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); + private final OTelMetricsSourceConfig oTelMetricsSourceConfig; private final String pipelineName; private final PluginMetrics pluginMetrics; private final GrpcAuthenticationProvider authenticationProvider; private final CertificateProviderFactory certificateProviderFactory; - private final GrpcRequestExceptionHandler requestExceptionHandler; private Server server; private final ByteDecoder byteDecoder; @@ -84,7 +88,6 @@ public OTelMetricsSource(final OTelMetricsSourceConfig oTelMetricsSourceConfig, this.certificateProviderFactory = certificateProviderFactory; this.pipelineName = pipelineDescription.getPipelineName(); this.authenticationProvider = createAuthenticationProvider(pluginFactory); - this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); this.byteDecoder = new OTelMetricDecoder(); } @@ -113,7 +116,7 @@ public void start(Buffer> buffer) { .builder() .useClientTimeoutHeader(false) .useBlockingTaskExecutor(true) - .exceptionMapping(requestExceptionHandler); + .exceptionHandler(createGrpExceptionHandler()); final MethodDescriptor methodDescriptor = MetricsServiceGrpc.getExportMethod(); final String oTelMetricsSourcePath = oTelMetricsSourceConfig.getPath(); @@ -225,6 +228,14 @@ public void stop() { LOG.info("Stopped otel_metrics_source."); } + private GrpcExceptionHandlerFunction createGrpExceptionHandler() { + RetryInfoConfig retryInfo = oTelMetricsSourceConfig.getRetryInfo() != null + ? oTelMetricsSourceConfig.getRetryInfo() + : DEFAULT_RETRY_INFO; + + return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay()); + } + private List getAuthenticationInterceptor() { final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor(); if (authenticationInterceptor == null) { diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java index ea590fd80b..248ecf2154 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java @@ -31,6 +31,7 @@ public class OTelMetricsSourceConfig { static final String MAX_CONNECTION_COUNT = "max_connection_count"; static final String ENABLE_UNFRAMED_REQUESTS = "unframed_requests"; static final String COMPRESSION = "compression"; + static final String RETRY_INFO = "retry_info"; static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000; static final int DEFAULT_PORT = 21891; static final int DEFAULT_THREAD_COUNT = 200; @@ -107,6 +108,9 @@ public class OTelMetricsSourceConfig { @JsonProperty("max_request_length") private ByteCount maxRequestLength; + @JsonProperty(RETRY_INFO) + private RetryInfoConfig retryInfo; + @AssertTrue(message = "path should start with /") boolean isPathValid() { return path == null || path.startsWith("/"); @@ -228,5 +232,13 @@ public CompressionOption getCompression() { public ByteCount getMaxRequestLength() { return maxRequestLength; } + + public RetryInfoConfig getRetryInfo() { + return retryInfo; + } + + public void setRetryInfo(RetryInfoConfig retryInfo) { + this.retryInfo = retryInfo; + } } diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/RetryInfoConfig.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/RetryInfoConfig.java new file mode 100644 index 0000000000..5fd80133c6 --- /dev/null +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/RetryInfoConfig.java @@ -0,0 +1,38 @@ +package org.opensearch.dataprepper.plugins.source.otelmetrics; + +import java.time.Duration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RetryInfoConfig { + + @JsonProperty(value = "min_delay", defaultValue = "100ms") + private Duration minDelay; + + @JsonProperty(value = "max_delay", defaultValue = "2s") + private Duration maxDelay; + + // Jackson needs this constructor + public RetryInfoConfig() {} + + public RetryInfoConfig(Duration minDelay, Duration maxDelay) { + this.minDelay = minDelay; + this.maxDelay = maxDelay; + } + + public Duration getMinDelay() { + return minDelay; + } + + public void setMinDelay(Duration minDelay) { + this.minDelay = minDelay; + } + + public Duration getMaxDelay() { + return maxDelay; + } + + public void setMaxDelay(Duration maxDelay) { + this.maxDelay = maxDelay; + } +} diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index d03f84f07a..9972a81de8 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -20,7 +20,6 @@ import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.SessionProtocol; -import com.linecorp.armeria.common.grpc.GrpcStatusFunction; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.grpc.GrpcService; @@ -61,6 +60,7 @@ import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.GrpcRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; import org.opensearch.dataprepper.metrics.MetricNames; @@ -197,7 +197,8 @@ public void beforeEach() { lenient().when(grpcServiceBuilder.addService(any(BindableService.class))).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.useBlockingTaskExecutor(anyBoolean())).thenReturn(grpcServiceBuilder); - lenient().when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder); + lenient().when(grpcServiceBuilder.exceptionHandler(any( + GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.build()).thenReturn(grpcService); MetricsTestUtil.initMetrics(); pluginMetrics = PluginMetrics.fromNames("otel_metrics", "pipeline"); diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource_RetryInfoTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource_RetryInfoTest.java new file mode 100644 index 0000000000..403efee46b --- /dev/null +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource_RetryInfoTest.java @@ -0,0 +1,178 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.otelmetrics; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSourceConfig.DEFAULT_PORT; +import static org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS; + +import java.time.Duration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.metric.Metric; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.rpc.RetryInfo; +import com.linecorp.armeria.client.Clients; + +import io.grpc.Metadata; +import io.grpc.StatusRuntimeException; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationLibrary; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.resource.v1.Resource; + +@ExtendWith(MockitoExtension.class) +class OTelMetricsSource_RetryInfoTest { + private static final String GRPC_ENDPOINT = "gproto+http://127.0.0.1:21891/"; + private static final String TEST_PIPELINE_NAME = "test_pipeline"; + private static final RetryInfoConfig TEST_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); + + @Mock + private PluginFactory pluginFactory; + + @Mock + private GrpcBasicAuthenticationProvider authenticationProvider; + + @Mock(lenient = true) + private OTelMetricsSourceConfig oTelMetricsSourceConfig; + + @Mock + private Buffer> buffer; + + private OTelMetricsSource SOURCE; + + @BeforeEach + void beforeEach() throws Exception { + lenient().when(authenticationProvider.getHttpAuthenticationService()).thenCallRealMethod(); + Mockito.lenient().doThrow(SizeOverflowException.class).when(buffer).writeAll(any(), anyInt()); + + when(oTelMetricsSourceConfig.getPort()).thenReturn(DEFAULT_PORT); + when(oTelMetricsSourceConfig.isSsl()).thenReturn(false); + when(oTelMetricsSourceConfig.getRequestTimeoutInMillis()).thenReturn(DEFAULT_REQUEST_TIMEOUT_MS); + when(oTelMetricsSourceConfig.getMaxConnectionCount()).thenReturn(10); + when(oTelMetricsSourceConfig.getThreadCount()).thenReturn(5); + when(oTelMetricsSourceConfig.getCompression()).thenReturn(CompressionOption.NONE); + when(oTelMetricsSourceConfig.getRetryInfo()).thenReturn(TEST_RETRY_INFO); + + when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))) + .thenReturn(authenticationProvider); + + configureObjectUnderTest(); + SOURCE.start(buffer); + } + + @AfterEach + void afterEach() { + SOURCE.stop(); + } + + private void configureObjectUnderTest() { + PluginMetrics pluginMetrics = PluginMetrics.fromNames("otel_trace", "pipeline"); + + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + SOURCE = new OTelMetricsSource(oTelMetricsSourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + } + + @Test + public void gRPC_failed_request_returns_minimal_delay_in_status() throws Exception { + final MetricsServiceGrpc.MetricsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT) + .build(MetricsServiceGrpc.MetricsServiceBlockingStub.class); + final StatusRuntimeException statusRuntimeException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportMetricsRequest())); + + RetryInfo retryInfo = extractRetryInfoFromStatusRuntimeException(statusRuntimeException); + assertThat(Duration.ofNanos(retryInfo.getRetryDelay().getNanos()).toMillis(), equalTo(100L)); + } + + @Test + public void gRPC_failed_request_returns_extended_delay_in_status() throws Exception { + RetryInfo retryInfo = callService3TimesAndReturnRetryInfo(); + + assertThat(Duration.ofNanos(retryInfo.getRetryDelay().getNanos()).toMillis(), equalTo(200L)); + } + + private RetryInfo extractRetryInfoFromStatusRuntimeException(StatusRuntimeException e) throws InvalidProtocolBufferException { + com.google.rpc.Status status = com.google.rpc.Status.parseFrom(e.getTrailers().get(Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER))); + return RetryInfo.parseFrom(status.getDetails(0).getValue()); + } + + /** + * The back off is calculated with the second call, and returned with the third + */ + private RetryInfo callService3TimesAndReturnRetryInfo() throws Exception { + StatusRuntimeException e = null; + for (int i = 0; i < 3; i++) { + final MetricsServiceGrpc.MetricsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT) + .build(MetricsServiceGrpc.MetricsServiceBlockingStub.class); + e = assertThrows(StatusRuntimeException.class, () -> client.export(createExportMetricsRequest())); + } + + return extractRetryInfoFromStatusRuntimeException(e); + } + + private ExportMetricsServiceRequest createExportMetricsRequest() { + final Resource resource = Resource.newBuilder() + .addAttributes(KeyValue.newBuilder() + .setKey("service.name") + .setValue(AnyValue.newBuilder().setStringValue("service").build()) + ).build(); + NumberDataPoint.Builder p1 = NumberDataPoint.newBuilder().setAsInt(4); + Gauge gauge = Gauge.newBuilder().addDataPoints(p1).build(); + + io.opentelemetry.proto.metrics.v1.Metric.Builder metric = io.opentelemetry.proto.metrics.v1.Metric.newBuilder() + .setGauge(gauge) + .setUnit("seconds") + .setName("name") + .setDescription("description"); + InstrumentationLibraryMetrics isntLib = InstrumentationLibraryMetrics.newBuilder() + .addMetrics(metric) + .setInstrumentationLibrary(InstrumentationLibrary.newBuilder() + .setName("ilname") + .setVersion("ilversion") + .build()) + .build(); + + + final ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder() + .setResource(resource) + .addInstrumentationLibraryMetrics(isntLib) + .build(); + + return ExportMetricsServiceRequest.newBuilder() + .addResourceMetrics(resourceMetrics).build(); + } +} diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java index 60193484c9..3ee075cc03 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.plugins.source.otelmetrics; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -13,6 +15,7 @@ import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; @@ -30,7 +33,7 @@ import static org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSourceConfig.DEFAULT_THREAD_COUNT; class OtelMetricsSourceConfigTests { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()); private static final String PLUGIN_NAME = "otel_metrics_source"; private static final String TEST_KEY_CERT = "test.crt"; private static final String TEST_KEY = "test.key"; @@ -303,6 +306,28 @@ void testInValidConfigWithCustomPath() { assertThat(oTelMetricsSourceConfig.isPathValid(), equalTo(false)); } + @Test + void testRetryInfoConfig() { + final PluginSetting customPathPluginSetting = completePluginSettingForOtelMetricsSource( + DEFAULT_REQUEST_TIMEOUT_MS, + DEFAULT_PORT, + null, + false, + false, + false, + true, + TEST_KEY_CERT, + "", + DEFAULT_THREAD_COUNT, + DEFAULT_MAX_CONNECTION_COUNT); + + final OTelMetricsSourceConfig otelTraceSourceConfig = OBJECT_MAPPER.convertValue(customPathPluginSetting.getSettings(), OTelMetricsSourceConfig.class); + + RetryInfoConfig retryInfo = otelTraceSourceConfig.getRetryInfo(); + assertThat(retryInfo.getMaxDelay(), equalTo(Duration.ofMillis(100))); + assertThat(retryInfo.getMinDelay(), equalTo(Duration.ofMillis(50))); + } + private PluginSetting completePluginSettingForOtelMetricsSource(final int requestTimeoutInMillis, final int port, final String path, @@ -326,6 +351,7 @@ private PluginSetting completePluginSettingForOtelMetricsSource(final int reques settings.put(OTelMetricsSourceConfig.SSL_KEY_FILE, sslKeyFile); settings.put(OTelMetricsSourceConfig.THREAD_COUNT, threadCount); settings.put(OTelMetricsSourceConfig.MAX_CONNECTION_COUNT, maxConnectionCount); + settings.put(OTelMetricsSourceConfig.RETRY_INFO, new RetryInfoConfig(Duration.ofMillis(50), Duration.ofMillis(100))); return new PluginSetting(PLUGIN_NAME, settings); } } diff --git a/data-prepper-plugins/otel-trace-source/README.md b/data-prepper-plugins/otel-trace-source/README.md index 957998489c..6e5172b1fc 100644 --- a/data-prepper-plugins/otel-trace-source/README.md +++ b/data-prepper-plugins/otel-trace-source/README.md @@ -42,6 +42,19 @@ For more information on migrating from Data Prepper 1.x to Data Prepper 2.x, see * `none`: no compression * `gzip`: apply GZip de-compression on the incoming request. + +### Retry Information + +Data Prepper replies with a `RetryInfo` specifying how long to wait for the next request in case backpressure builds up. The retry information is implemented as exponential backoff, with a max delay of `retry_info.max_delay`. + +```yaml +source: + otel_trace_source: + retry_info: + min_delay: 1000ms # defaults to 100ms + max_delay: 5s # defaults to 2s +``` + ### Authentication Configurations By default, the otel-trace-source input is unauthenticated. diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java index 077e4bc879..311abc3e7b 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.oteltrace; +import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; import com.linecorp.armeria.common.util.BlockingTaskExecutor; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.Server; @@ -45,6 +46,7 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -59,11 +61,13 @@ public class OTelTraceSource implements Source> { static final String SERVER_CONNECTIONS = "serverConnections"; private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; + // Default RetryInfo with minimum 100ms and maximum 2s + private static final RetryInfoConfig DEFAULT_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); + private final OTelTraceSourceConfig oTelTraceSourceConfig; private final PluginMetrics pluginMetrics; private final GrpcAuthenticationProvider authenticationProvider; private final CertificateProviderFactory certificateProviderFactory; - private final GrpcRequestExceptionHandler requestExceptionHandler; private final String pipelineName; private Server server; private final ByteDecoder byteDecoder; @@ -83,7 +87,6 @@ public OTelTraceSource(final OTelTraceSourceConfig oTelTraceSourceConfig, final this.certificateProviderFactory = certificateProviderFactory; this.pipelineName = pipelineDescription.getPipelineName(); this.authenticationProvider = createAuthenticationProvider(pluginFactory); - this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); this.byteDecoder = new OTelTraceDecoder(); } @@ -113,7 +116,7 @@ public void start(Buffer> buffer) { .builder() .useClientTimeoutHeader(false) .useBlockingTaskExecutor(true) - .exceptionMapping(requestExceptionHandler); + .exceptionHandler(createGrpExceptionHandler()); final MethodDescriptor methodDescriptor = TraceServiceGrpc.getExportMethod(); final String oTelTraceSourcePath = oTelTraceSourceConfig.getPath(); @@ -208,6 +211,14 @@ public void start(Buffer> buffer) { LOG.info("Started otel_trace_source on port " + oTelTraceSourceConfig.getPort() + "..."); } + private GrpcExceptionHandlerFunction createGrpExceptionHandler() { + RetryInfoConfig retryInfo = oTelTraceSourceConfig.getRetryInfo() != null + ? oTelTraceSourceConfig.getRetryInfo() + : DEFAULT_RETRY_INFO; + + return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay()); + } + @Override public void stop() { if (server != null) { diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java index 4760da34a4..c558cd9dbe 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java @@ -32,6 +32,7 @@ public class OTelTraceSourceConfig { static final String ENABLE_UNFRAMED_REQUESTS = "unframed_requests"; static final String UNAUTHENTICATED_HEALTH_CHECK = "unauthenticated_health_check"; static final String COMPRESSION = "compression"; + static final String RETRY_INFO = "retry_info"; static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000; static final int DEFAULT_PORT = 21890; static final int DEFAULT_THREAD_COUNT = 200; @@ -107,6 +108,9 @@ public class OTelTraceSourceConfig { @JsonProperty("max_request_length") private ByteCount maxRequestLength; + @JsonProperty(RETRY_INFO) + private RetryInfoConfig retryInfo; + @AssertTrue(message = "path should start with /") boolean isPathValid() { return path == null || path.startsWith("/"); @@ -228,4 +232,8 @@ public CompressionOption getCompression() { public ByteCount getMaxRequestLength() { return maxRequestLength; } + + public RetryInfoConfig getRetryInfo() { + return retryInfo; + } } diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/RetryInfoConfig.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/RetryInfoConfig.java new file mode 100644 index 0000000000..7418a909f3 --- /dev/null +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/RetryInfoConfig.java @@ -0,0 +1,38 @@ +package org.opensearch.dataprepper.plugins.source.oteltrace; + +import java.time.Duration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RetryInfoConfig { + + @JsonProperty(value = "min_delay", defaultValue = "100ms") + private Duration minDelay; + + @JsonProperty(value = "max_delay", defaultValue = "2s") + private Duration maxDelay; + + // Jackson needs this constructor + public RetryInfoConfig() {} + + public RetryInfoConfig(Duration minDelay, Duration maxDelay) { + this.minDelay = minDelay; + this.maxDelay = maxDelay; + } + + public Duration getMinDelay() { + return minDelay; + } + + public void setMinDelay(Duration minDelay) { + this.minDelay = minDelay; + } + + public Duration getMaxDelay() { + return maxDelay; + } + + public void setMaxDelay(Duration maxDelay) { + this.maxDelay = maxDelay; + } +} diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java index 9873cc6611..f52f2379dd 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.oteltrace; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; @@ -20,7 +21,6 @@ import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.SessionProtocol; -import com.linecorp.armeria.common.grpc.GrpcStatusFunction; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; @@ -55,6 +55,7 @@ import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.GrpcRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; import org.opensearch.dataprepper.metrics.MetricNames; @@ -81,6 +82,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.Base64; import java.util.Collection; import java.util.Collections; @@ -134,8 +136,9 @@ class OTelTraceSourceTest { private static final String USERNAME = "test_user"; private static final String PASSWORD = "test_password"; private static final String TEST_PATH = "${pipelineName}/v1/traces"; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()); private static final String TEST_PIPELINE_NAME = "test_pipeline"; + private static final RetryInfoConfig TEST_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(50), Duration.ofMillis(2000)); private static final ExportTraceServiceRequest SUCCESS_REQUEST = ExportTraceServiceRequest.newBuilder() .addResourceSpans(ResourceSpans.newBuilder() .addInstrumentationLibrarySpans(InstrumentationLibrarySpans.newBuilder() @@ -202,7 +205,8 @@ void beforeEach() { lenient().when(grpcServiceBuilder.addService(any(BindableService.class))).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.useBlockingTaskExecutor(anyBoolean())).thenReturn(grpcServiceBuilder); - lenient().when(grpcServiceBuilder.exceptionMapping(any(GrpcStatusFunction.class))).thenReturn(grpcServiceBuilder); + lenient().when(grpcServiceBuilder.exceptionHandler(any( + GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder); lenient().when(grpcServiceBuilder.build()).thenReturn(grpcService); lenient().when(authenticationProvider.getHttpAuthenticationService()).thenCallRealMethod(); @@ -213,6 +217,7 @@ void beforeEach() { when(oTelTraceSourceConfig.getMaxConnectionCount()).thenReturn(10); when(oTelTraceSourceConfig.getThreadCount()).thenReturn(5); when(oTelTraceSourceConfig.getCompression()).thenReturn(CompressionOption.NONE); + when(oTelTraceSourceConfig.getRetryInfo()).thenReturn(TEST_RETRY_INFO); when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))) .thenReturn(authenticationProvider); @@ -850,7 +855,9 @@ void testRunAnotherSourceWithSamePort() { // starting server SOURCE.start(buffer); - testPluginSetting = new PluginSetting(null, Collections.singletonMap(SSL, false)); + + Map settingsMap = Map.of("retry_info", TEST_RETRY_INFO, SSL, false); + testPluginSetting = new PluginSetting(null, settingsMap); testPluginSetting.setPipelineName("pipeline"); oTelTraceSourceConfig = OBJECT_MAPPER.convertValue(testPluginSetting.getSettings(), OTelTraceSourceConfig.class); final OTelTraceSource source = new OTelTraceSource(oTelTraceSourceConfig, pluginMetrics, pluginFactory, pipelineDescription); diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource_RetryInfoTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource_RetryInfoTest.java new file mode 100644 index 0000000000..b3f2cb6de8 --- /dev/null +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource_RetryInfoTest.java @@ -0,0 +1,162 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.oteltrace; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.oteltrace.OTelTraceSourceConfig.DEFAULT_PORT; +import static org.opensearch.dataprepper.plugins.source.oteltrace.OTelTraceSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS; + +import java.time.Duration; +import java.util.UUID; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.rpc.RetryInfo; +import com.linecorp.armeria.client.Clients; + +import io.grpc.Metadata; +import io.grpc.StatusRuntimeException; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; +import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.Span; + +@ExtendWith(MockitoExtension.class) +class OTelTraceSource_RetryInfoTest { + private static final String GRPC_ENDPOINT = "gproto+http://127.0.0.1:21890/"; + private static final String TEST_PIPELINE_NAME = "test_pipeline"; + private static final RetryInfoConfig TEST_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); + + @Mock + private PluginFactory pluginFactory; + + @Mock + private GrpcBasicAuthenticationProvider authenticationProvider; + + @Mock(lenient = true) + private OTelTraceSourceConfig oTelTraceSourceConfig; + + @Mock + private Buffer> buffer; + + private PipelineDescription pipelineDescription; + private OTelTraceSource SOURCE; + + @BeforeEach + void beforeEach() throws Exception { + lenient().when(authenticationProvider.getHttpAuthenticationService()).thenCallRealMethod(); + Mockito.lenient().doThrow(SizeOverflowException.class).when(buffer).writeAll(any(), anyInt()); + + when(oTelTraceSourceConfig.getPort()).thenReturn(DEFAULT_PORT); + when(oTelTraceSourceConfig.isSsl()).thenReturn(false); + when(oTelTraceSourceConfig.getRequestTimeoutInMillis()).thenReturn(DEFAULT_REQUEST_TIMEOUT_MS); + when(oTelTraceSourceConfig.getMaxConnectionCount()).thenReturn(10); + when(oTelTraceSourceConfig.getThreadCount()).thenReturn(5); + when(oTelTraceSourceConfig.getCompression()).thenReturn(CompressionOption.NONE); + when(oTelTraceSourceConfig.getRetryInfo()).thenReturn(TEST_RETRY_INFO); + + when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))) + .thenReturn(authenticationProvider); + configureObjectUnderTest(); + pipelineDescription = mock(PipelineDescription.class); + lenient().when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + + configureObjectUnderTest(); + SOURCE.start(buffer); + } + + @AfterEach + void afterEach() { + SOURCE.stop(); + } + + private void configureObjectUnderTest() { + PluginMetrics pluginMetrics = PluginMetrics.fromNames("otel_trace", "pipeline"); + + pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + SOURCE = new OTelTraceSource(oTelTraceSourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + } + + @Test + public void gRPC_failed_request_returns_minimal_delay_in_status() throws Exception { + final TraceServiceGrpc.TraceServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT) + .build(TraceServiceGrpc.TraceServiceBlockingStub.class); + final StatusRuntimeException statusRuntimeException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportTraceRequest())); + + RetryInfo retryInfo = extractRetryInfoFromStatusRuntimeException(statusRuntimeException); + assertThat(Duration.ofNanos(retryInfo.getRetryDelay().getNanos()).toMillis(), equalTo(100L)); + } + + @Test + public void gRPC_failed_request_returns_extended_delay_in_status() throws Exception { + RetryInfo retryInfo = callService3TimesAndReturnRetryInfo(); + + assertThat(Duration.ofNanos(retryInfo.getRetryDelay().getNanos()).toMillis(), equalTo(200L)); + } + + private RetryInfo extractRetryInfoFromStatusRuntimeException(StatusRuntimeException e) throws InvalidProtocolBufferException { + com.google.rpc.Status status = com.google.rpc.Status.parseFrom(e.getTrailers().get(Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER))); + return RetryInfo.parseFrom(status.getDetails(0).getValue()); + } + + /** + * The back off is calculated with the second call, and returned with the third + */ + private RetryInfo callService3TimesAndReturnRetryInfo() throws Exception { + StatusRuntimeException e = null; + for (int i = 0; i < 3; i++) { + final TraceServiceGrpc.TraceServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT) + .build(TraceServiceGrpc.TraceServiceBlockingStub.class); + e = assertThrows(StatusRuntimeException.class, () -> client.export(createExportTraceRequest())); + } + + return extractRetryInfoFromStatusRuntimeException(e); + } + + private ExportTraceServiceRequest createExportTraceRequest() { + final Span testSpan = Span.newBuilder() + .setTraceId(ByteString.copyFromUtf8(UUID.randomUUID().toString())) + .setSpanId(ByteString.copyFromUtf8(UUID.randomUUID().toString())) + .setName(UUID.randomUUID().toString()) + .setKind(Span.SpanKind.SPAN_KIND_SERVER) + .setStartTimeUnixNano(100) + .setEndTimeUnixNano(101) + .setTraceState("SUCCESS").build(); + + return ExportTraceServiceRequest.newBuilder() + .addResourceSpans(ResourceSpans.newBuilder() + .addInstrumentationLibrarySpans(InstrumentationLibrarySpans.newBuilder().addSpans(testSpan)).build()) + .build(); + } +} diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java index ea5ce66b91..f67a637220 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java @@ -11,8 +11,11 @@ import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.model.configuration.PluginSetting; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; @@ -30,7 +33,7 @@ import static org.opensearch.dataprepper.plugins.source.oteltrace.OTelTraceSourceConfig.DEFAULT_THREAD_COUNT; class OtelTraceSourceConfigTests { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()); private static final String PLUGIN_NAME = "otel_trace_source"; private static final String TEST_KEY_CERT = "test.crt"; private static final String TEST_KEY = "test.key"; @@ -302,6 +305,28 @@ void testInValidConfigWithCustomPath() { assertThat(otelTraceSourceConfig.isPathValid(), equalTo(false)); } + @Test + void testRetryInfoConfig() { + final PluginSetting customPathPluginSetting = completePluginSettingForOtelTraceSource( + DEFAULT_REQUEST_TIMEOUT_MS, + DEFAULT_PORT, + null, + false, + false, + false, + true, + TEST_KEY_CERT, + "", + DEFAULT_THREAD_COUNT, + DEFAULT_MAX_CONNECTION_COUNT); + + final OTelTraceSourceConfig otelTraceSourceConfig = OBJECT_MAPPER.convertValue(customPathPluginSetting.getSettings(), OTelTraceSourceConfig.class); + + + assertThat(otelTraceSourceConfig.getRetryInfo().getMaxDelay(), equalTo(Duration.ofMillis(100))); + assertThat(otelTraceSourceConfig.getRetryInfo().getMinDelay(), equalTo(Duration.ofMillis(50))); + } + private PluginSetting completePluginSettingForOtelTraceSource(final int requestTimeoutInMillis, final int port, final String path, @@ -325,6 +350,7 @@ private PluginSetting completePluginSettingForOtelTraceSource(final int requestT settings.put(OTelTraceSourceConfig.SSL_KEY_FILE, sslKeyFile); settings.put(OTelTraceSourceConfig.THREAD_COUNT, threadCount); settings.put(OTelTraceSourceConfig.MAX_CONNECTION_COUNT, maxConnectionCount); + settings.put(OTelTraceSourceConfig.RETRY_INFO, new RetryInfoConfig(Duration.ofMillis(50), Duration.ofMillis(100))); return new PluginSetting(PLUGIN_NAME, settings); } }