Skip to content

Commit

Permalink
fix(reader): skip dataloss gap error
Browse files Browse the repository at this point in the history
  • Loading branch information
s12f committed Dec 15, 2023
1 parent f7e40d3 commit 28e631d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 5 deletions.
12 changes: 8 additions & 4 deletions client/src/main/java/io/hstream/HServerException.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
public class HServerException extends RuntimeException {
ErrBody errBody;

static class ErrBody {
int error;
String message;
JsonElement extra;
static public class ErrBody {
public int error;
public String message;
public JsonElement extra;
}

HServerException(ErrBody errBody) {
Expand All @@ -26,6 +26,10 @@ public String getErrorMessage() {
return errBody.message;
}

public ErrBody getErrBody() {
return errBody;
}

public String getRawErrorBody() {
return new Gson().toJson(errBody);
}
Expand Down
19 changes: 19 additions & 0 deletions client/src/main/kotlin/io/hstream/impl/ReaderKtImpl.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package io.hstream.impl

import io.grpc.Status
import io.grpc.StatusException
import io.grpc.StatusRuntimeException
import io.hstream.HServerException
import io.hstream.HStreamDBClientException
import io.hstream.Reader
import io.hstream.ReceivedRecord
import io.hstream.Record
import io.hstream.StreamShardOffset
import io.hstream.internal.CreateShardReaderRequest
import io.hstream.internal.DeleteShardReaderRequest
import io.hstream.internal.ErrorCode
import io.hstream.internal.LookupShardReaderRequest
import io.hstream.internal.ReadShardRequest
import io.hstream.util.GrpcUtils
Expand Down Expand Up @@ -70,13 +75,27 @@ class ReaderKtImpl(
}
}
readFuture.complete(res as MutableList<ReceivedRecord>?)
} catch (e: StatusException) {
handleGrpcError(e.status, readFuture)
} catch (e: StatusRuntimeException) {
handleGrpcError(e.status, readFuture)
} catch (e: Throwable) {
readFuture.completeExceptionally(HStreamDBClientException(e))
}
}
return readFuture
}

private fun handleGrpcError(status: Status, future: CompletableFuture<MutableList<ReceivedRecord>>) {
val e = HServerException.tryToHServerException(status.description)
if (e != null && e.errBody.error == ErrorCode.ShardReaderDataLossGap.number) {
logger.warn("skip a data loss gap error, {}", status)
future.complete(mutableListOf())
} else {
future.completeExceptionally(HStreamDBClientException(status.asException()))
}
}

override fun close() {
val deleteShardReaderRequest = DeleteShardReaderRequest.newBuilder()
.setReaderId(readerId)
Expand Down
2 changes: 1 addition & 1 deletion client/src/main/proto
Submodule proto updated 1 files
+4 −0 hstream.proto

0 comments on commit 28e631d

Please sign in to comment.