From 28e631decc5831094ab753cc6e3bebe405221f66 Mon Sep 17 00:00:00 2001 From: s12f Date: Fri, 15 Dec 2023 16:35:45 +0800 Subject: [PATCH 1/2] fix(reader): skip dataloss gap error --- .../java/io/hstream/HServerException.java | 12 ++++++++---- .../kotlin/io/hstream/impl/ReaderKtImpl.kt | 19 +++++++++++++++++++ client/src/main/proto | 2 +- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/io/hstream/HServerException.java b/client/src/main/java/io/hstream/HServerException.java index 2182f7d1..76e51046 100644 --- a/client/src/main/java/io/hstream/HServerException.java +++ b/client/src/main/java/io/hstream/HServerException.java @@ -7,10 +7,10 @@ public class HServerException extends RuntimeException { ErrBody errBody; - static class ErrBody { - int error; - String message; - JsonElement extra; + static public class ErrBody { + public int error; + public String message; + public JsonElement extra; } HServerException(ErrBody errBody) { @@ -26,6 +26,10 @@ public String getErrorMessage() { return errBody.message; } + public ErrBody getErrBody() { + return errBody; + } + public String getRawErrorBody() { return new Gson().toJson(errBody); } diff --git a/client/src/main/kotlin/io/hstream/impl/ReaderKtImpl.kt b/client/src/main/kotlin/io/hstream/impl/ReaderKtImpl.kt index 468ffb89..0c5a6e81 100644 --- a/client/src/main/kotlin/io/hstream/impl/ReaderKtImpl.kt +++ b/client/src/main/kotlin/io/hstream/impl/ReaderKtImpl.kt @@ -1,5 +1,9 @@ package io.hstream.impl +import io.grpc.Status +import io.grpc.StatusException +import io.grpc.StatusRuntimeException +import io.hstream.HServerException import io.hstream.HStreamDBClientException import io.hstream.Reader import io.hstream.ReceivedRecord @@ -7,6 +11,7 @@ import io.hstream.Record import io.hstream.StreamShardOffset import io.hstream.internal.CreateShardReaderRequest import io.hstream.internal.DeleteShardReaderRequest +import io.hstream.internal.ErrorCode import io.hstream.internal.LookupShardReaderRequest import io.hstream.internal.ReadShardRequest import io.hstream.util.GrpcUtils @@ -70,6 +75,10 @@ class ReaderKtImpl( } } readFuture.complete(res as MutableList?) + } catch (e: StatusException) { + handleGrpcError(e.status, readFuture) + } catch (e: StatusRuntimeException) { + handleGrpcError(e.status, readFuture) } catch (e: Throwable) { readFuture.completeExceptionally(HStreamDBClientException(e)) } @@ -77,6 +86,16 @@ class ReaderKtImpl( return readFuture } + private fun handleGrpcError(status: Status, future: CompletableFuture>) { + val e = HServerException.tryToHServerException(status.description) + if (e != null && e.errBody.error == ErrorCode.ShardReaderDataLossGap.number) { + logger.warn("skip a data loss gap error, {}", status) + future.complete(mutableListOf()) + } else { + future.completeExceptionally(HStreamDBClientException(status.asException())) + } + } + override fun close() { val deleteShardReaderRequest = DeleteShardReaderRequest.newBuilder() .setReaderId(readerId) diff --git a/client/src/main/proto b/client/src/main/proto index 2d3617b0..e8c9a361 160000 --- a/client/src/main/proto +++ b/client/src/main/proto @@ -1 +1 @@ -Subproject commit 2d3617b03f8178c4da3999ae5140559aa8ba40df +Subproject commit e8c9a36175734506b3fb4233233763615c41f92e From c4f6a68fc3cf546b4f73eab32512e6fa31c688bb Mon Sep 17 00:00:00 2001 From: s12f Date: Fri, 15 Dec 2023 16:37:47 +0800 Subject: [PATCH 2/2] fix format --- client/src/main/java/io/hstream/HServerException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/io/hstream/HServerException.java b/client/src/main/java/io/hstream/HServerException.java index 76e51046..2ac1f915 100644 --- a/client/src/main/java/io/hstream/HServerException.java +++ b/client/src/main/java/io/hstream/HServerException.java @@ -7,7 +7,7 @@ public class HServerException extends RuntimeException { ErrBody errBody; - static public class ErrBody { + public static class ErrBody { public int error; public String message; public JsonElement extra;