Skip to content

Commit

Permalink
Merge branch 'master' into build/clen-1387/gha-switch-to-large-runners
Browse files Browse the repository at this point in the history
  • Loading branch information
parfeon authored Aug 8, 2023
2 parents 824a7eb + e1ba518 commit d2e8c9a
Show file tree
Hide file tree
Showing 20 changed files with 421 additions and 141 deletions.
4 changes: 2 additions & 2 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
* @seba-aln @kleewho @marek-lewandowski
.github/* @parfeon @seba-aln @kleewho
* @seba-aln @kleewho @Xavrax @jguz-pubnub
.github/* @parfeon @seba-aln @kleewho @Xavrax @jguz-pubnub
README.md @techwritermat
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
5 changes: 4 additions & 1 deletion pubnub/endpoints/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pubnub.exceptions import PubNubException
from pubnub.models.consumer.common import PNStatus
from pubnub.models.consumer.pn_error_data import PNErrorData
from ..structures import RequestOptions, ResponseInfo
from pubnub.structures import RequestOptions, ResponseInfo

logger = logging.getLogger("pubnub")

Expand Down Expand Up @@ -148,6 +148,9 @@ def sync(self):

return envelope

def prepare_options(self):
return self.pubnub.prepare_options(self.options())

def pn_async(self, callback):
try:
self.validate_params()
Expand Down
47 changes: 35 additions & 12 deletions pubnub/event_engine/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,44 @@
from pubnub.event_engine import effects
from pubnub.event_engine.models import effects
from pubnub.event_engine import manage_effects


class Dispatcher:
def __init__(self) -> None:
_pubnub = None
_managed_effects_factory = None

def __init__(self, event_engine) -> None:
self._event_engine = event_engine
self._managed_effects = {}
self._effect_emitter = effects.EmitEffect()
self._effect_emitter = manage_effects.EmitEffect()

def set_pn(self, pubnub_instance):
self._pubnub = pubnub_instance
self._effect_emitter.set_pn(pubnub_instance)

def dispatch_effect(self, effect: effects.PNEffect):
print(f'dispatching {effect.__class__.__name__} {id(effect)}')
if not self._managed_effects_factory:
self._managed_effects_factory = manage_effects.ManagedEffectFactory(self._pubnub, self._event_engine)

if isinstance(effect, effects.PNEmittableEffect):
self._effect_emitter.emit(effect)
self.emit_effect(effect)

elif isinstance(effect, effects.PNManageableEffect):
self.dispatch_managed_effect(effect)

elif isinstance(effect, effects.PNCancelEffect):
self.dispatch_cancel_effect(effect)

def emit_effect(self, effect: effects.PNEffect):
print(f' emiting {effect.__class__.__name__} with {effect.__dict__}')
self._effect_emitter.emit(effect)

if isinstance(effect, effects.PNManageableEffect):
managed_effect = effects.ManagedEffect(effect)
managed_effect.run()
self._managed_effects[effect.__class__.__name__] = managed_effect
def dispatch_managed_effect(self, effect: effects.PNEffect):
managed_effect = self._managed_effects_factory.create(effect)
managed_effect.run()
self._managed_effects[effect.__class__.__name__] = managed_effect

if isinstance(effect, effects.PNCancelEffect):
if effect.cancel_effect in self._managed_effects:
self._managed_effects[effect.cancel_effect].stop()
del self._managed_effects[effect.cancel_effect]
def dispatch_cancel_effect(self, effect: effects.PNEffect):
if effect.cancel_effect in self._managed_effects:
self._managed_effects[effect.cancel_effect].stop()
del self._managed_effects[effect.cancel_effect]
138 changes: 138 additions & 0 deletions pubnub/event_engine/manage_effects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import asyncio

from queue import SimpleQueue
from typing import Union
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.pubnub import PubNub
from pubnub.event_engine.models import effects, events
from pubnub.models.consumer.common import PNStatus
from pubnub.workers import SubscribeMessageWorker


class ManagedEffect:
pubnub: PubNub = None
event_engine = None
effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]

def set_pn(self, pubnub: PubNub):
self.pubnub = pubnub

def __init__(self, pubnub_instance, event_engine_instance,
effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]) -> None:
self.effect = effect
self.event_engine = event_engine_instance
self.pubnub = pubnub_instance

def run(self):
pass

def run_async(self):
pass

def stop(self):
pass


class ManageHandshakeEffect(ManagedEffect):
def run(self):
channels = self.effect.channels
groups = self.effect.groups
if hasattr(self.pubnub, 'event_loop'):
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
if loop.is_running():
loop.create_task(self.handshake_async(channels, groups))
else:
loop.run_until_complete(self.handshake_async(channels, groups))
else:
# TODO: the synchronous way
pass

def stop(self):
pass

async def handshake_async(self, channels, groups):
handshake = await Subscribe(self.pubnub).channels(channels).channel_groups(groups).future()
if not handshake.status.error:
cursor = handshake.result['t']
timetoken = cursor['t']
region = cursor['r']
handshake_success = events.HandshakeSuccessEvent(timetoken, region)
self.event_engine.trigger(handshake_success)


class ManagedReceiveMessagesEffect(ManagedEffect):
effect: effects.ReceiveMessagesEffect

def run(self):
channels = self.effect.channels
groups = self.effect.groups
timetoken = self.effect.timetoken
region = self.effect.region

if hasattr(self.pubnub, 'event_loop'):
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.receive_messages_async(channels, groups, timetoken, region)
if loop.is_running():
loop.create_task(coro)
else:
loop.run_until_complete(coro)
else:
# TODO: the synchronous way
pass

def stop(self):
pass

async def receive_messages_async(self, channels, groups, timetoken, region):
response = await Subscribe(self.pubnub).channels(channels).channel_groups(groups).timetoken(timetoken) \
.region(region).future()

if not response.status.error:
cursor = response.result['t']
timetoken = cursor['t']
region = cursor['r']
messages = response.result['m']
print(response.result)
recieve_success = events.ReceiveSuccessEvent(timetoken, region=region, messages=messages)
self.event_engine.trigger(recieve_success)


class ManagedEffectFactory:
_managed_effects = {
effects.HandshakeEffect.__name__: ManageHandshakeEffect,
effects.ReceiveMessagesEffect.__name__: ManagedReceiveMessagesEffect,
}

def __init__(self, pubnub_instance, event_engine_instance) -> None:
self._pubnub = pubnub_instance
self._event_engine = event_engine_instance

def create(self, effect: ManagedEffect):
if effect.__class__.__name__ not in self._managed_effects:
# TODO replace below with raise unsupported managed effect exception
return ManagedEffect(self._pubnub, self._event_engine, effect)
return self._managed_effects[effect.__class__.__name__](self._pubnub, self._event_engine, effect)


class EmitEffect:
pubnub: PubNub

def set_pn(self, pubnub: PubNub):
self.pubnub = pubnub
self.queue = SimpleQueue
self.message_worker = SubscribeMessageWorker(self.pubnub, None, None, None)

def emit(self, effect: effects.PNEmittableEffect):
if isinstance(effect, effects.EmitMessagesEffect):
self.emit_message(effect)
if isinstance(effect, effects.EmitStatusEffect):
self.emit_status(effect)

def emit_message(self, effect: effects.EmitMessagesEffect):
self.pubnub._subscription_manager._listener_manager.announce_message('foo')

def emit_status(self, effect: effects.EmitStatusEffect):
pn_status = PNStatus()
pn_status.category = effect.status
pn_status.error = False
self.pubnub._subscription_manager._listener_manager.announce_status(pn_status)
Empty file.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import List, Union
from pubnub.exceptions import PubNubException
from pubnub.enums import PNStatusCategory
from pubnub.pubnub import PubNub


class PNEffect:
Expand Down Expand Up @@ -97,39 +96,3 @@ class EmitStatusEffect(PNEmittableEffect):
def __init__(self, status: Union[None, PNStatusCategory]) -> None:
super().__init__()
self.status = status


class ManagedEffect:
pubnub: PubNub
effect: Union[PNManageableEffect, PNCancelEffect]

def set_pn(pubnub: PubNub):
pubnub = pubnub

def __init__(self, effect: Union[PNManageableEffect, PNCancelEffect]) -> None:
self.effect = effect

def run(self):
pass

def stop(self):
pass


class EmitEffect:
pubnub: PubNub

def set_pn(pubnub: PubNub):
pubnub = pubnub

def emit(self, effect: PNEmittableEffect):
if isinstance(effect, EmitMessagesEffect):
self.emit_message(effect)
if isinstance(effect, EmitStatusEffect):
self.emit_status(effect)

def emit_message(self, effect: EmitMessagesEffect):
pass

def emit_status(self, effect: EmitStatusEffect):
pass
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pubnub.exceptions import PubNubException
from typing import List
from typing import List, Optional


class PNEvent:
Expand All @@ -13,7 +13,7 @@ def __init__(self, reason: PubNubException, attempt: int) -> None:


class PNCursorEvent(PNEvent):
def __init__(self, timetoken: str, region: int) -> None:
def __init__(self, timetoken: str, region: Optional[int] = None) -> None:
self.timetoken = timetoken
self.region = region

Expand All @@ -30,19 +30,17 @@ def __init__(self, channels: List[str], groups: List[str]) -> None:


class SubscriptionRestoredEvent(PNCursorEvent, PNChannelGroupsEvent):
def __init__(self, timetoken: str, region: int, channels: List[str], groups: List[str]) -> None:
def __init__(self, timetoken: str, channels: List[str], groups: List[str], region: Optional[int] = None) -> None:
PNCursorEvent.__init__(self, timetoken, region)
PNChannelGroupsEvent.__init__(self, channels, groups)


class HandshakeSuccessEvent(PNCursorEvent):
def __init__(self, attempt: int, reason: PubNubException) -> None:
self.attempt = attempt
self.reason = reason
def __init__(self, timetoken: str, region: Optional[int] = None) -> None:
super().__init__(timetoken, region)


class HandshakeFailureEvent(PNFailureEvent):

pass


Expand All @@ -63,7 +61,7 @@ class HandshakeReconnectRetryEvent(PNEvent):


class ReceiveSuccessEvent(PNCursorEvent):
def __init__(self, timetoken: str, region: int, messages: list) -> None:
def __init__(self, timetoken: str, messages: list, region: Optional[int] = None) -> None:
PNCursorEvent.__init__(self, timetoken, region)
self.messages = messages

Expand All @@ -73,7 +71,7 @@ class ReceiveFailureEvent(PNFailureEvent):


class ReceiveReconnectSuccessEvent(PNCursorEvent):
def __init__(self, timetoken: str, region: int, messages: list) -> None:
def __init__(self, timetoken: str, messages: list, region: Optional[int] = None) -> None:
PNCursorEvent.__init__(self, timetoken, region)
self.messages = messages

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pubnub.enums import PNStatusCategory
from pubnub.event_engine import effects, events
from pubnub.event_engine.effects import PNEffect
from pubnub.event_engine.models import effects
from pubnub.event_engine.models.effects import PNEffect
from pubnub.event_engine.models import events
from pubnub.exceptions import PubNubException
from typing import List, Union

Expand Down Expand Up @@ -328,7 +329,9 @@ def __init__(self, context: PNContext) -> None:

def on_enter(self, context: Union[None, PNContext]):
super().on_enter(context)
return effects.ReceiveMessagesEffect(context.channels, context.groups)
print(self._context)
return effects.ReceiveMessagesEffect(context.channels, context.groups, timetoken=self._context.timetoken,
region=self._context.region)

def on_exit(self):
super().on_exit()
Expand Down Expand Up @@ -387,6 +390,13 @@ def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTra
effect=effects.EmitStatusEffect(PNStatusCategory.PNDisconnectedCategory)
)

def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
self._context.update(context)
return PNTransition(
state=ReceivingState,
context=self._context
)


class ReceiveReconnectingState(PNState):
def __init__(self, context: PNContext) -> None:
Expand Down
Loading

0 comments on commit d2e8c9a

Please sign in to comment.