-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resume support #87
Labels
Comments
Postponed until rsocket protocol resume changes |
POC Public API to support resume 2.0: public interface ResumeResolver : Closeable {
/** Whether the request-response carrying [payload] should be resumed; `false` unless overridden. */
public suspend fun shouldResumeRequestResponse(payload: Payload): Boolean = false
/** Whether the request-stream carrying [payload] should be resumed; `false` unless overridden. */
public suspend fun shouldResumeRequestStream(payload: Payload): Boolean = false
/** Whether the request-channel carrying [payload] should be resumed; `false` unless overridden. */
public suspend fun shouldResumeRequestChannel(payload: Payload): Boolean = false
/** Does nothing by default. */
override fun close(): Unit = Unit
}
/**
 * Storage of frames used for resumption. Each member notes the point in the
 * connection lifecycle at which the proposal expects it to be called.
 */
public interface ResumeFrameStorage : Closeable {
/** Returns the current [ResumeState]; called when a keep-alive is sent. */
public fun state(): ResumeState
/** Called when [frame] of stream [streamId] is sent to the connection. */
public fun onFrameSend(streamId: Int, frame: ByteReadPacket)
/** Called when a frame of stream [streamId] is received from the connection. */
public fun onFrameReceive(streamId: Int)
/** Returns the frames of stream [streamId] to use on reconnect. */
public fun resumeFrames(streamId: Int): List<ByteReadPacket>
/**
 * Called when a keep-alive is received.
 * NOTE(review): presumably frames up to [impliedPosition] can be dropped — confirm against the resume spec.
 */
public fun releaseFrames(streamId: Int, impliedPosition: Long)
/** Called on full removal of stream [streamId]. */
public fun remove(streamId: Int)
}
public class ResumeStrategy(
public val resumeResolver: ResumeResolver,
public val resumeFrameStorage: ResumeFrameStorage
) : Closeable Server side API: RSocketServer {
resume {
    strategy { config: ConnectionConfig ->
        // Example resume strategy configuration: resumability of a request can be decided
        // from the request payload (which contains e.g. the route), from the storage
        // capacity, or from any other parameter.
        val storage = InMemoryResumeFrameStorage("SERVER")
        // Some check based on the setup payload.
        val resumeStream = config.setupPayload.data.readText().contains("RESUME:STREAM")
        ResumeStrategy(
            resumeFrameStorage = storage,
            resumeResolver = object : ResumeResolver {
                override suspend fun shouldResumeRequestStream(payload: Payload): Boolean {
                    // First routing tag of the request metadata, or null when there is no metadata/tag.
                    val route: String? = payload.metadata?.read(RoutingMetadata)?.tags?.firstOrNull()
                    return resumeStream && route == "resumable_route" && storage.hasEnoughSpaceForResumingStream()
                }
            }
        )
    }
}
} Connector side API change: RSocketConnector {
// Reconnect configuration used for resume.
reconnectable(10)
resume {
// Resume token supplied as a packet.
token {
buildPacket {
// Placeholder: any generated string token.
writeText(generateStringToken())
}
}
// resumeResolver is omitted, so the client-side Responder will not resume any stream by default.
// NOTE(review): ResumeStrategy as declared in this proposal has no default value for
// resumeResolver, so this call assumes one is added — confirm.
strategy { config: ConnectionConfig ->
ResumeStrategy(
resumeFrameStorage = InMemoryResumeFrameStorage("CLIENT")
)
}
}
} |
whyoleg
modified the milestone:
0.17.0 - Lease and Resume support, configuration API rework
Nov 24, 2022
whyoleg
removed this from the
0.17.0 - Lease and Resume support, configuration API rework milestone
May 3, 2023
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
No description provided.
The text was updated successfully, but these errors were encountered: