Skip to content

Commit

Permalink
Fixes and effects
Browse files Browse the repository at this point in the history
  • Loading branch information
seba-aln committed Jan 9, 2024
1 parent 4e9eb3e commit f294766
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 75 deletions.
6 changes: 6 additions & 0 deletions pubnub/dtos.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ def __init__(self, channels=None, channel_groups=None, presence_enabled=None, ti
self.presence_enabled = presence_enabled
self.timetoken = timetoken

@property
def channels_with_pressence(self):
if not self.presence_enabled:
return self.channels
return [*self.channels] + [ch + '-pnpres' for ch in self.channels]


class UnsubscribeOperation(object):
def __init__(self, channels=None, channel_groups=None):
Expand Down
62 changes: 43 additions & 19 deletions pubnub/event_engine/manage_effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from typing import Optional, Union
from pubnub.endpoints.presence.heartbeat import Heartbeat
from pubnub.endpoints.presence.leave import Leave
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.enums import PNReconnectionPolicy
from pubnub.exceptions import PubNubException
Expand All @@ -20,6 +21,7 @@ class ManagedEffect:
effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]
stop_event = None
logger: logging.Logger
task: asyncio.Task

def set_pn(self, pubnub: PubNub):
self.pubnub = pubnub
Expand All @@ -42,6 +44,8 @@ def stop(self):
if self.stop_event:
self.logger.debug(f'stop_event({id(self.stop_event)}).set() called on {self.__class__.__name__}')
self.stop_event.set()
if hasattr(self, 'task') and isinstance(self.task, asyncio.Task) and not self.task.cancelled():
self.task.cancel()

def get_new_stop_event(self):
event = asyncio.Event()
Expand All @@ -60,9 +64,9 @@ def run(self):
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.handshake_async(channels=channels, groups=groups, timetoken=tt, stop_event=self.stop_event)
if loop.is_running():
loop.create_task(coro)
self.task = loop.create_task(coro)
else:
loop.run_until_complete(coro)
self.task = loop.run_until_complete(coro)
else:
# TODO: the synchronous way
pass
Expand Down Expand Up @@ -98,9 +102,9 @@ def run(self):
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.receive_messages_async(channels, groups, timetoken, region)
if loop.is_running():
loop.create_task(coro)
self.task = loop.create_task(coro)
else:
loop.run_until_complete(coro)
self.task = loop.run_until_complete(coro)
else:
# TODO: the synchronous way
pass
Expand Down Expand Up @@ -181,9 +185,9 @@ def run(self):
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.delayed_reconnect_async(delay, attempts)
if loop.is_running():
self.delayed_reconnect_coro = loop.create_task(coro)
self.task = loop.create_task(coro)
else:
self.delayed_reconnect_coro = loop.run_until_complete(coro)
self.task = loop.run_until_complete(coro)
else:
# TODO: the synchronous way
pass
Expand Down Expand Up @@ -218,9 +222,9 @@ def stop(self):
if self.stop_event:
self.logger.debug(f'stop_event({id(self.stop_event)}).set() called on {self.__class__.__name__}')
self.stop_event.set()
if self.delayed_reconnect_coro:
if self.task:
try:
self.delayed_reconnect_coro.cancel()
self.task.cancel()
except asyncio.exceptions.CancelledError:
pass

Expand Down Expand Up @@ -276,38 +280,38 @@ def run(self):
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.heartbeat(channels=channels, groups=groups, stop_event=self.stop_event)
if loop.is_running():
loop.create_task(coro)
self.task = loop.create_task(coro)
else:
loop.run_until_complete(coro)
self.task = loop.run_until_complete(coro)

async def heartbeat(self, channels, groups, stop_event):
request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
heartbeat = await request.future()

if heartbeat.status.error:
self.logger.warning(f'Heartbeat failed: {heartbeat.status.error_data.__dict__}')
self.event_engine.trigger(events.HeartbeatFailureEvent(heartbeat.status.error_data, 1))
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
reason=heartbeat.status.error_data, attempt=1))
else:
self.event_engine.trigger(events.HeartbeatSuccessEvent(channels=channels, groups=groups))


class ManagedHeartbeatWaitEffect(ManagedEffect):
def __init__(self, pubnub_instance, event_engine_instance,
effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]) -> None:
def __init__(self, pubnub_instance, event_engine_instance, effect: effects.HeartbeatWaitEffect) -> None:
super().__init__(pubnub_instance, event_engine_instance, effect)
self.heartbeat_interval = pubnub_instance.config.heartbeat_interval

def run(self):
if hasattr(self.pubnub, 'event_loop'):
self.stop_event = self.get_new_stop_event()
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.heartbeat_wait(self.heartbeat_interval, stop_event=self.stop_event)
coroutine = self.heartbeat_wait(self.heartbeat_interval, stop_event=self.stop_event)
if loop.is_running():
loop.create_task(coro)
self.task = loop.create_task(coroutine)
else:
loop.run_until_complete(coro)
self.task = loop.run_until_complete(coroutine)

async def heartbeat(self, wait_time: int, stop_event):
async def heartbeat_wait(self, wait_time: int, stop_event):
try:
await asyncio.sleep(wait_time)
self.event_engine.trigger(events.HeartbeatTimesUpEvent(channels=self.effect.channels,
Expand All @@ -317,7 +321,24 @@ async def heartbeat(self, wait_time: int, stop_event):


class ManagedHeartbeatLeaveEffect(ManagedEffect):
pass
def run(self):
channels = self.effect.channels
groups = self.effect.groups
if hasattr(self.pubnub, 'event_loop'):
self.stop_event = self.get_new_stop_event()
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.leave(channels=channels, groups=groups, stop_event=self.stop_event)
if loop.is_running():
self.task = loop.create_task(coro)
else:
self.task = loop.run_until_complete(coro)

async def leave(self, channels, groups, stop_event):
leave_request = Leave(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
leave = await leave_request.future()

if leave.status.error:
self.logger.warning(f'Heartbeat failed: {leave.status.error_data.__dict__}')


class ManagedHeartbeatDelayedHeartbeatEffect(ManagedEffect):
Expand All @@ -331,6 +352,9 @@ class ManagedEffectFactory:
effects.HandshakeReconnectEffect.__name__: ManagedHandshakeReconnectEffect,
effects.ReceiveReconnectEffect.__name__: ManagedReceiveReconnectEffect,
effects.HeartbeatEffect.__name__: ManagedHeartbeatEffect,
effects.HeartbeatWaitEffect.__name__: ManagedHeartbeatWaitEffect,
effects.HeartbeatDelayedEffect.__name__: ManagedHeartbeatDelayedHeartbeatEffect,
effects.HeartbeatLeaveEffect.__name__: ManagedHeartbeatLeaveEffect,
}

def __init__(self, pubnub_instance, event_engine_instance) -> None:
Expand All @@ -339,7 +363,7 @@ def __init__(self, pubnub_instance, event_engine_instance) -> None:

def create(self, effect: ManagedEffect):
if effect.__class__.__name__ not in self._managed_effects:
raise PubNubException(errormsg="Unhandled manage effect")
raise PubNubException(errormsg=f"Unhandled managed effect: {effect.__class__.__name__}")
return self._managed_effects[effect.__class__.__name__](self._pubnub, self._event_engine, effect)


Expand Down
8 changes: 5 additions & 3 deletions pubnub/event_engine/models/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None,

class HeartbeatWaitEffect(PNManageableEffect):
def __init__(self, time) -> None:
self.wait_time = time
super().__init__()


Expand All @@ -117,13 +118,14 @@ class HeartbeatCancelWaitEffect(PNCancelEffect):


class HeartbeatLeaveEffect(PNManageableEffect):
def __init__(self) -> None:
def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None, List[str]] = None) -> None:
super().__init__()
self.channels = channels
self.groups = groups


class HeartbeatDelayedEffect(PNManageableEffect):
def __init__(self) -> None:
super().__init__()
pass


class HeartbeatCancelDelayedEffect(PNCancelEffect):
Expand Down
40 changes: 20 additions & 20 deletions pubnub/event_engine/models/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,8 @@ def __init__(self, context: PNContext) -> None:
}

def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition:
self._context.channels = event.channels
self._context.groups = event.groups
self._context.update(context)

return PNTransition(
Expand Down Expand Up @@ -644,7 +646,7 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi
return PNTransition(
state=HeartbeatingState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)

def reconnect(self, event: events.HeartbeatReconnectEvent, context: PNContext) -> PNTransition:
Expand All @@ -661,7 +663,7 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext)
return PNTransition(
state=HeartbeatStoppedState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)

def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition:
Expand All @@ -670,7 +672,7 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P
return PNTransition(
state=HeartbeatInactiveState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)


Expand Down Expand Up @@ -705,7 +707,7 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext)
return PNTransition(
state=HeartbeatStoppedState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)

def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition:
Expand All @@ -714,7 +716,7 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P
return PNTransition(
state=HeartbeatInactiveState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)

def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition:
Expand All @@ -731,7 +733,7 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi
return PNTransition(
state=HeartbeatingState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)

def success(self, event: events.HeartbeatSuccessEvent, context: PNContext) -> PNTransition:
Expand Down Expand Up @@ -760,18 +762,17 @@ def on_enter(self, context: PNContext):
super().on_enter(self._context)
return effects.HeartbeatWaitEffect(self._context)

def on_exit(self, context: PNContext):
self._context.update(context)
super().on_exit(self._context)
return effects.HeartbeatCancelWaitEffect(self._context)
def on_exit(self):
super().on_exit()
return effects.HeartbeatCancelWaitEffect()

def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition:
self._context.update(context)

return PNTransition(
state=HeartbeatStoppedState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)

def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition:
Expand All @@ -780,7 +781,7 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P
return PNTransition(
state=HeartbeatInactiveState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)

def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition:
Expand All @@ -797,7 +798,7 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi
return PNTransition(
state=HeartbeatingState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)

def times_up(self, event: events.HeartbeatTimesUpEvent, context: PNContext) -> PNTransition:
Expand Down Expand Up @@ -827,10 +828,9 @@ def on_enter(self, context: PNContext):
super().on_enter(self._context)
return effects.HeartbeatDelayedEffect(self._context)

def on_exit(self, context: PNContext):
self._context.update(context)
super().on_exit(self._context)
return effects.HeartbeatCancelDelayedEffect(self._context)
def on_exit(self):
super().on_exit()
return effects.HeartbeatCancelDelayedEffect()

def failure(self, event: events.HeartbeatFailureEvent, context: PNContext) -> PNTransition:
self._context.update(context)
Expand All @@ -854,7 +854,7 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi
return PNTransition(
state=HeartbeatingState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)

def success(self, event: events.HeartbeatSuccessEvent, context: PNContext) -> PNTransition:
Expand All @@ -879,7 +879,7 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext)
return PNTransition(
state=HeartbeatStoppedState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)

def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition:
Expand All @@ -888,5 +888,5 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P
return PNTransition(
state=HeartbeatInactiveState,
context=self._context,
effect=effects.HeartbeatLeaveEffect()
effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups)
)
5 changes: 5 additions & 0 deletions pubnub/event_engine/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, initial_state: states.PNState, dispatcher_class: Optional[Dis
dispatcher_class = Dispatcher
self._dispatcher = dispatcher_class(self)
self._enabled = True
self._name = name
self.logger = logging.getLogger("pubnub" if not name else f"pubnub.{name}")

def __del__(self):
Expand Down Expand Up @@ -88,3 +89,7 @@ def dispatch_effects(self):

def stop(self):
self._enabled = False

@property
def name(self):
return self._name
Loading

0 comments on commit f294766

Please sign in to comment.