Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(reader): skip dataloss gap error #220

Merged
merged 2 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions client/src/main/java/io/hstream/HServerException.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
public class HServerException extends RuntimeException {
ErrBody errBody;

static class ErrBody {
int error;
String message;
JsonElement extra;
public static class ErrBody {
public int error;
public String message;
public JsonElement extra;
}

HServerException(ErrBody errBody) {
Expand All @@ -26,6 +26,10 @@ public String getErrorMessage() {
return errBody.message;
}

public ErrBody getErrBody() {
return errBody;
}

public String getRawErrorBody() {
return new Gson().toJson(errBody);
}
Expand Down
19 changes: 19 additions & 0 deletions client/src/main/kotlin/io/hstream/impl/ReaderKtImpl.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package io.hstream.impl

import io.grpc.Status
import io.grpc.StatusException
import io.grpc.StatusRuntimeException
import io.hstream.HServerException
import io.hstream.HStreamDBClientException
import io.hstream.Reader
import io.hstream.ReceivedRecord
import io.hstream.Record
import io.hstream.StreamShardOffset
import io.hstream.internal.CreateShardReaderRequest
import io.hstream.internal.DeleteShardReaderRequest
import io.hstream.internal.ErrorCode
import io.hstream.internal.LookupShardReaderRequest
import io.hstream.internal.ReadShardRequest
import io.hstream.util.GrpcUtils
Expand Down Expand Up @@ -70,13 +75,27 @@ class ReaderKtImpl(
}
}
readFuture.complete(res as MutableList<ReceivedRecord>?)
} catch (e: StatusException) {
handleGrpcError(e.status, readFuture)
} catch (e: StatusRuntimeException) {
handleGrpcError(e.status, readFuture)
} catch (e: Throwable) {
readFuture.completeExceptionally(HStreamDBClientException(e))
}
}
return readFuture
}

private fun handleGrpcError(status: Status, future: CompletableFuture<MutableList<ReceivedRecord>>) {
val e = HServerException.tryToHServerException(status.description)
if (e != null && e.errBody.error == ErrorCode.ShardReaderDataLossGap.number) {
logger.warn("skip a data loss gap error, {}", status)
future.complete(mutableListOf())
} else {
future.completeExceptionally(HStreamDBClientException(status.asException()))
}
}

override fun close() {
val deleteShardReaderRequest = DeleteShardReaderRequest.newBuilder()
.setReaderId(readerId)
Expand Down
2 changes: 1 addition & 1 deletion client/src/main/proto
Submodule proto updated 1 files
+4 −0 hstream.proto
Loading