-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support sharded pubsub commands #2498
base: main
Are you sure you want to change the base?
Conversation
My main concern, however, is around shard rebalancing; I don't see anything that handles this currently; presumably we'd need to detect shard movement, and validate that subscriptions are still against the correct server - right now I don't see that happening; we may or may not also need to do this during resubscribe against an existing connection? |
My bad; found them |
var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal); | ||
Trace("MESSAGE: " + channel); | ||
RedisChannel channel; | ||
if (items[0].IsEqual(message)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
feels like we're testing this a lot; can we hoist this to a bool
somewhere? maybe:
if (items.Length >= 3 && IsSimpleMessage(items[0], out bool isSharded))
then just use isSharded
in all the places? for example channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded);
or alternatively, using the PatternMode
approach, maybe we can out var patternMode
?
@@ -485,6 +489,7 @@ private bool UnregisterSubscription(in RedisChannel channel, Action<RedisChannel | |||
return false; | |||
} | |||
|
|||
// TODO: We need a new api to support SUNSUBSCRIBE all. Calling this now would unsubscribe both sharded and unsharded channels. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we actually need such an API?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currently UnsubscribeAll will unsubscribe both unsharded and sharded channels. Alternatively, we need to pass some flag to indicate whether we want to unsubscribe all sharded channels to match the SUNSUBSCRIBE semantics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand that. I just don't know that it is obvious whether that represents a problem. What genuinely likely scenario are we thinking of where this nuance is needed?
@@ -1325,7 +1325,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes | |||
{ | |||
case ResultType.MultiBulk: | |||
var final = result.ToArray( | |||
(in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode), | |||
(in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode, isSharded: false), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to support PUBSUB SHARDCHANNELS
(see SubscriptionChannels[Async]
) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto SHARDNUMSUB
- see SubscriptionPatternCount
/ SubscriptionSubscriberCount
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto ssub
in CLIENT
- see ClientInfo.TryParse
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, plan to add it in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ssub handled in my extensions to this PR
good idea for leveraging the pattern mode to save one additional bool variable. Thanks for bringing up the shard movement concern, which is critical. Based on my testing, the subscriber gets a SUNSUBSCRIBE message reply from Redis when the shard is moved, which triggers a re-subscription. The new subscription against the old server will get a MOVED message, which makes it to resubscribe to the right server. Does this make sense to you? |
@mgravell No pressure, but any idea when we might expect this feature to be merged? Thanks |
Sorry, took my eyes off this one; let's get this done! if you're not still eager, I can probably take it over; but! 1: I really don't like these constructors: + StackExchange.Redis.RedisChannel.RedisChannel(string! value, bool isSharded) -> void
+ StackExchange.Redis.RedisChannel.RedisChannel(byte[]? value, bool isSharded) -> void can we instead use (to mirror public static RedisChannel Sharded(string value) => ...
public static RedisChannel Sharded(byte[] value) => ... ? To me that feels more consistent. 2: I still think we should squash the 2 |
I've taken the liberty of pushing the merge fixes; I've also proposed the single-field refactor here: vandyvilla#1 - would love to get your input / thoughts here @vandyvilla , although I guess I can push it through either way; sorry this has been delayed |
right, I think that's everything - @vandyvilla if you want to merge vandyvilla#1, then I should be able to pull in the combined effort (preserving your credit, etc) |
important TODOs before final merge:
|
hi @NickCraver Could you please review this merge request at your earliest convenience? |
@mgravell is this mr still active? |
Support SPUBLISH and SSUBSCRIBE, SUNSUBSCRIBE commands.
We tested this PR internally and will add more tests to this PR.
Would like to gather some early feedback. Thanks!
cc @NickCraver @mgravell