Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/listeners #186

Merged
merged 22 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions pubnub/builders.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from abc import ABCMeta, abstractmethod

from pubnub.models.subscription import PubNubChannel, PubNubChannelGroup
from . import utils


Expand Down Expand Up @@ -41,15 +43,17 @@ def with_timetoken(self, timetoken):
return self

def channel_subscriptions(self):
return self._channel_subscriptions
return [PubNubChannel(self._pubnub, channel).subscription(self._presence_enabled)
for channel in self._channel_subscriptions]

def channel_group_subscriptions(self):
return self._channel_group_subscriptions
return [PubNubChannelGroup(self._pubnub, group).subscription(self._presence_enabled)
for group in self._channel_group_subscriptions]

def execute(self):
subscription = self._pubnub.subscription_set(self.channel_subscriptions(), self.channel_group_subscriptions())
subscription = self._pubnub.subscription_set(self.channel_subscriptions() + self.channel_group_subscriptions())

subscription.subscribe(with_presence=self._presence_enabled, timetoken=self._timetoken)
subscription.subscribe(timetoken=self._timetoken)


class UnsubscribeBuilder(PubSubBuilder):
Expand Down
32 changes: 21 additions & 11 deletions pubnub/dtos.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ def groups_with_pressence(self):
return self.channel_groups
return self.channel_groups + [ch + '-pnpres' for ch in self.channel_groups]

@property
def channels_without_presence(self):
return list(filter(lambda ch: not ch.endswith('-pnpres'), self.channels))

@property
def channel_groups_without_presence(self):
return list(filter(lambda gr: not gr.endswith('-pnpres'), self.channel_groups))


class UnsubscribeOperation(object):
def __init__(self, channels=None, channel_groups=None):
Expand All @@ -31,17 +39,19 @@ def __init__(self, channels=None, channel_groups=None):
self.channels = channels
self.channel_groups = channel_groups

def get_subscribed_channels(self, channels, with_presence=False) -> list:
result = [ch for ch in channels if ch not in self.channels and not ch.endswith('-pnpres')]
if not with_presence:
return result
return result + [ch + '-pnpres' for ch in result]

def get_subscribed_channel_groups(self, channel_groups, with_presence=False) -> list:
result = [grp for grp in channel_groups if grp not in self.channel_groups and not grp.endswith('-pnpres')]
if not with_presence:
return result
return result + [grp + '-pnpres' for grp in result]
def get_subscribed_channels(self, channels) -> list:
seba-aln marked this conversation as resolved.
Show resolved Hide resolved
return [ch for ch in channels if ch not in self.channels]

def get_subscribed_channel_groups(self, channel_groups) -> list:
return [grp for grp in channel_groups if grp not in self.channel_groups]

@property
def channels_without_presence(self):
return list(filter(lambda ch: not ch.endswith('-pnpres'), self.channels))

@property
def channel_groups_without_presence(self):
return list(filter(lambda gr: not gr.endswith('-pnpres'), self.channel_groups))


class StateOperation(object):
Expand Down
9 changes: 7 additions & 2 deletions pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,9 @@ def get_timetoken(self):

class HeartbeatEffect(Effect):
def run(self):
channels = self.invocation.channels
groups = self.invocation.groups
channels = list(filter(lambda ch: not ch.endswith('-pnpres'), self.invocation.channels))
groups = list(filter(lambda gr: not gr.endswith('-pnpres'), self.invocation.groups))

if hasattr(self.pubnub, 'event_loop'):
self.stop_event = self.get_new_stop_event()
self.run_async(self.heartbeat(channels=channels, groups=groups, stop_event=self.stop_event))
Expand Down Expand Up @@ -362,6 +363,10 @@ async def heartbeat(self, channels, groups, attempt, stop_event):
groups=self.invocation.groups,
reason=self.invocation.reason,
attempt=self.invocation.attempts))

channels = list(filter(lambda ch: not ch.endswith('-pnpres'), self.invocation.channels))
groups = list(filter(lambda gr: not gr.endswith('-pnpres'), self.invocation.groups))

request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
delay = self.calculate_reconnection_delay(attempt)
self.logger.warning(f'Will retry to Heartbeat in {delay}s')
Expand Down
6 changes: 4 additions & 2 deletions pubnub/event_engine/models/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,10 +982,12 @@ def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTr
def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition:
self._context.update(context)
for channel in event.channels:
self._context.channels.remove(channel)
if channel in self._context.channels:
self._context.channels.remove(channel)

for group in event.groups:
self._context.groups.remove(group)
if group in self._context.groups:
self._context.groups.remove(group) or None

invocation = None
if not event.suppress_leave:
Expand Down
Loading
Loading