Skip to content

Commit

Permalink
Event engine (#163)
Browse files Browse the repository at this point in the history
* Implementing subscribe loop

* Handle messages

* Handle messages

* fix typing

* Fixed tests
  • Loading branch information
seba-aln authored Jul 24, 2023
1 parent 1029e22 commit a546791
Show file tree
Hide file tree
Showing 19 changed files with 419 additions and 139 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
5 changes: 4 additions & 1 deletion pubnub/endpoints/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pubnub.exceptions import PubNubException
from pubnub.models.consumer.common import PNStatus
from pubnub.models.consumer.pn_error_data import PNErrorData
from ..structures import RequestOptions, ResponseInfo
from pubnub.structures import RequestOptions, ResponseInfo

logger = logging.getLogger("pubnub")

Expand Down Expand Up @@ -148,6 +148,9 @@ def sync(self):

return envelope

def prepare_options(self):
return self.pubnub.prepare_options(self.options())

def pn_async(self, callback):
try:
self.validate_params()
Expand Down
47 changes: 35 additions & 12 deletions pubnub/event_engine/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,44 @@
from pubnub.event_engine import effects
from pubnub.event_engine.models import effects
from pubnub.event_engine import manage_effects


class Dispatcher:
def __init__(self) -> None:
_pubnub = None
_managed_effects_factory = None

def __init__(self, event_engine) -> None:
self._event_engine = event_engine
self._managed_effects = {}
self._effect_emitter = effects.EmitEffect()
self._effect_emitter = manage_effects.EmitEffect()

def set_pn(self, pubnub_instance):
self._pubnub = pubnub_instance
self._effect_emitter.set_pn(pubnub_instance)

def dispatch_effect(self, effect: effects.PNEffect):
print(f'dispatching {effect.__class__.__name__} {id(effect)}')
if not self._managed_effects_factory:
self._managed_effects_factory = manage_effects.ManagedEffectFactory(self._pubnub, self._event_engine)

if isinstance(effect, effects.PNEmittableEffect):
self._effect_emitter.emit(effect)
self.emit_effect(effect)

elif isinstance(effect, effects.PNManageableEffect):
self.dispatch_managed_effect(effect)

elif isinstance(effect, effects.PNCancelEffect):
self.dispatch_cancel_effect(effect)

def emit_effect(self, effect: effects.PNEffect):
print(f' emiting {effect.__class__.__name__} with {effect.__dict__}')
self._effect_emitter.emit(effect)

if isinstance(effect, effects.PNManageableEffect):
managed_effect = effects.ManagedEffect(effect)
managed_effect.run()
self._managed_effects[effect.__class__.__name__] = managed_effect
def dispatch_managed_effect(self, effect: effects.PNEffect):
managed_effect = self._managed_effects_factory.create(effect)
managed_effect.run()
self._managed_effects[effect.__class__.__name__] = managed_effect

if isinstance(effect, effects.PNCancelEffect):
if effect.cancel_effect in self._managed_effects:
self._managed_effects[effect.cancel_effect].stop()
del self._managed_effects[effect.cancel_effect]
def dispatch_cancel_effect(self, effect: effects.PNEffect):
if effect.cancel_effect in self._managed_effects:
self._managed_effects[effect.cancel_effect].stop()
del self._managed_effects[effect.cancel_effect]
138 changes: 138 additions & 0 deletions pubnub/event_engine/manage_effects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import asyncio

from queue import SimpleQueue
from typing import Union
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.pubnub import PubNub
from pubnub.event_engine.models import effects, events
from pubnub.models.consumer.common import PNStatus
from pubnub.workers import SubscribeMessageWorker


class ManagedEffect:
pubnub: PubNub = None
event_engine = None
effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]

def set_pn(self, pubnub: PubNub):
self.pubnub = pubnub

def __init__(self, pubnub_instance, event_engine_instance,
effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]) -> None:
self.effect = effect
self.event_engine = event_engine_instance
self.pubnub = pubnub_instance

def run(self):
pass

def run_async(self):
pass

def stop(self):
pass


class ManageHandshakeEffect(ManagedEffect):
def run(self):
channels = self.effect.channels
groups = self.effect.groups
if hasattr(self.pubnub, 'event_loop'):
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
if loop.is_running():
loop.create_task(self.handshake_async(channels, groups))
else:
loop.run_until_complete(self.handshake_async(channels, groups))
else:
# TODO: the synchronous way
pass

def stop(self):
pass

async def handshake_async(self, channels, groups):
handshake = await Subscribe(self.pubnub).channels(channels).channel_groups(groups).future()
if not handshake.status.error:
cursor = handshake.result['t']
timetoken = cursor['t']
region = cursor['r']
handshake_success = events.HandshakeSuccessEvent(timetoken, region)
self.event_engine.trigger(handshake_success)


class ManagedReceiveMessagesEffect(ManagedEffect):
effect: effects.ReceiveMessagesEffect

def run(self):
channels = self.effect.channels
groups = self.effect.groups
timetoken = self.effect.timetoken
region = self.effect.region

if hasattr(self.pubnub, 'event_loop'):
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.receive_messages_async(channels, groups, timetoken, region)
if loop.is_running():
loop.create_task(coro)
else:
loop.run_until_complete(coro)
else:
# TODO: the synchronous way
pass

def stop(self):
pass

async def receive_messages_async(self, channels, groups, timetoken, region):
response = await Subscribe(self.pubnub).channels(channels).channel_groups(groups).timetoken(timetoken) \
.region(region).future()

if not response.status.error:
cursor = response.result['t']
timetoken = cursor['t']
region = cursor['r']
messages = response.result['m']
print(response.result)
recieve_success = events.ReceiveSuccessEvent(timetoken, region=region, messages=messages)
self.event_engine.trigger(recieve_success)


class ManagedEffectFactory:
_managed_effects = {
effects.HandshakeEffect.__name__: ManageHandshakeEffect,
effects.ReceiveMessagesEffect.__name__: ManagedReceiveMessagesEffect,
}

def __init__(self, pubnub_instance, event_engine_instance) -> None:
self._pubnub = pubnub_instance
self._event_engine = event_engine_instance

def create(self, effect: ManagedEffect):
if effect.__class__.__name__ not in self._managed_effects:
# TODO replace below with raise unsupported managed effect exception
return ManagedEffect(self._pubnub, self._event_engine, effect)
return self._managed_effects[effect.__class__.__name__](self._pubnub, self._event_engine, effect)


class EmitEffect:
pubnub: PubNub

def set_pn(self, pubnub: PubNub):
self.pubnub = pubnub
self.queue = SimpleQueue
self.message_worker = SubscribeMessageWorker(self.pubnub, None, None, None)

def emit(self, effect: effects.PNEmittableEffect):
if isinstance(effect, effects.EmitMessagesEffect):
self.emit_message(effect)
if isinstance(effect, effects.EmitStatusEffect):
self.emit_status(effect)

def emit_message(self, effect: effects.EmitMessagesEffect):
self.pubnub._subscription_manager._listener_manager.announce_message('foo')

def emit_status(self, effect: effects.EmitStatusEffect):
pn_status = PNStatus()
pn_status.category = effect.status
pn_status.error = False
self.pubnub._subscription_manager._listener_manager.announce_status(pn_status)
Empty file.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import List, Union
from pubnub.exceptions import PubNubException
from pubnub.enums import PNStatusCategory
from pubnub.pubnub import PubNub


class PNEffect:
Expand Down Expand Up @@ -97,39 +96,3 @@ class EmitStatusEffect(PNEmittableEffect):
def __init__(self, status: Union[None, PNStatusCategory]) -> None:
super().__init__()
self.status = status


class ManagedEffect:
pubnub: PubNub
effect: Union[PNManageableEffect, PNCancelEffect]

def set_pn(pubnub: PubNub):
pubnub = pubnub

def __init__(self, effect: Union[PNManageableEffect, PNCancelEffect]) -> None:
self.effect = effect

def run(self):
pass

def stop(self):
pass


class EmitEffect:
pubnub: PubNub

def set_pn(pubnub: PubNub):
pubnub = pubnub

def emit(self, effect: PNEmittableEffect):
if isinstance(effect, EmitMessagesEffect):
self.emit_message(effect)
if isinstance(effect, EmitStatusEffect):
self.emit_status(effect)

def emit_message(self, effect: EmitMessagesEffect):
pass

def emit_status(self, effect: EmitStatusEffect):
pass
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pubnub.exceptions import PubNubException
from typing import List
from typing import List, Optional


class PNEvent:
Expand All @@ -13,7 +13,7 @@ def __init__(self, reason: PubNubException, attempt: int) -> None:


class PNCursorEvent(PNEvent):
def __init__(self, timetoken: str, region: int) -> None:
def __init__(self, timetoken: str, region: Optional[int] = None) -> None:
self.timetoken = timetoken
self.region = region

Expand All @@ -30,19 +30,17 @@ def __init__(self, channels: List[str], groups: List[str]) -> None:


class SubscriptionRestoredEvent(PNCursorEvent, PNChannelGroupsEvent):
def __init__(self, timetoken: str, region: int, channels: List[str], groups: List[str]) -> None:
def __init__(self, timetoken: str, channels: List[str], groups: List[str], region: Optional[int] = None) -> None:
PNCursorEvent.__init__(self, timetoken, region)
PNChannelGroupsEvent.__init__(self, channels, groups)


class HandshakeSuccessEvent(PNCursorEvent):
def __init__(self, attempt: int, reason: PubNubException) -> None:
self.attempt = attempt
self.reason = reason
def __init__(self, timetoken: str, region: Optional[int] = None) -> None:
super().__init__(timetoken, region)


class HandshakeFailureEvent(PNFailureEvent):

pass


Expand All @@ -63,7 +61,7 @@ class HandshakeReconnectRetryEvent(PNEvent):


class ReceiveSuccessEvent(PNCursorEvent):
def __init__(self, timetoken: str, region: int, messages: list) -> None:
def __init__(self, timetoken: str, messages: list, region: Optional[int] = None) -> None:
PNCursorEvent.__init__(self, timetoken, region)
self.messages = messages

Expand All @@ -73,7 +71,7 @@ class ReceiveFailureEvent(PNFailureEvent):


class ReceiveReconnectSuccessEvent(PNCursorEvent):
def __init__(self, timetoken: str, region: int, messages: list) -> None:
def __init__(self, timetoken: str, messages: list, region: Optional[int] = None) -> None:
PNCursorEvent.__init__(self, timetoken, region)
self.messages = messages

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pubnub.enums import PNStatusCategory
from pubnub.event_engine import effects, events
from pubnub.event_engine.effects import PNEffect
from pubnub.event_engine.models import effects
from pubnub.event_engine.models.effects import PNEffect
from pubnub.event_engine.models import events
from pubnub.exceptions import PubNubException
from typing import List, Union

Expand Down Expand Up @@ -328,7 +329,9 @@ def __init__(self, context: PNContext) -> None:

def on_enter(self, context: Union[None, PNContext]):
super().on_enter(context)
return effects.ReceiveMessagesEffect(context.channels, context.groups)
print(self._context)
return effects.ReceiveMessagesEffect(context.channels, context.groups, timetoken=self._context.timetoken,
region=self._context.region)

def on_exit(self):
super().on_exit()
Expand Down Expand Up @@ -387,6 +390,13 @@ def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTra
effect=effects.EmitStatusEffect(PNStatusCategory.PNDisconnectedCategory)
)

def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
self._context.update(context)
return PNTransition(
state=ReceivingState,
context=self._context
)


class ReceiveReconnectingState(PNState):
def __init__(self, context: PNContext) -> None:
Expand Down
Loading

0 comments on commit a546791

Please sign in to comment.