diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index bb6f8cda..5567fda7 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -78,6 +78,7 @@ jobs: mkdir tests/acceptance/encryption/assets/ cp sdk-specifications/features/encryption/assets/* tests/acceptance/encryption/assets/ cp sdk-specifications/features/subscribe/event-engine/happy-path.feature tests/acceptance/subscribe/happy-path.feature + cp sdk-specifications/features/presence/event-engine/presence-engine.feature tests/acceptance/subscribe/presence-engine.feature sudo pip3 install -r requirements-dev.txt behave --junit tests/acceptance/pam diff --git a/.pubnub.yml b/.pubnub.yml index 1dd9ab96..189b92ae 100644 --- a/.pubnub.yml +++ b/.pubnub.yml @@ -1,5 +1,5 @@ name: python -version: 7.3.2 +version: 7.4.0 schema: 1 scm: github.com/pubnub/python sdks: @@ -18,7 +18,7 @@ sdks: distributions: - distribution-type: library distribution-repository: package - package-name: pubnub-7.3.2 + package-name: pubnub-7.4.0 location: https://pypi.org/project/pubnub/ supported-platforms: supported-operating-systems: @@ -97,8 +97,8 @@ sdks: - distribution-type: library distribution-repository: git release - package-name: pubnub-7.3.2 - location: https://github.com/pubnub/python/releases/download/v7.3.2/pubnub-7.3.2.tar.gz + package-name: pubnub-7.4.0 + location: https://github.com/pubnub/python/releases/download/v7.4.0/pubnub-7.4.0.tar.gz supported-platforms: supported-operating-systems: Linux: @@ -169,6 +169,11 @@ sdks: license-url: https://github.com/aio-libs/aiohttp/blob/master/LICENSE.txt is-required: Required changelog: + - date: 2024-02-08 + version: v7.4.0 + changes: + - type: feature + text: "Optional Event Engine for Subscribe Loop." - date: 2023-11-27 version: v7.3.2 changes: diff --git a/CHANGELOG.md b/CHANGELOG.md index a702235d..175054ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## v7.4.0 +February 08 2024 + +#### Added +- Optional Event Engine for Subscribe Loop. + ## v7.3.2 November 27 2023 diff --git a/examples/cli_chat.py b/examples/cli_chat.py new file mode 100644 index 00000000..81f77013 --- /dev/null +++ b/examples/cli_chat.py @@ -0,0 +1,63 @@ +import argparse +import asyncio + +from os import getenv +from pubnub.callbacks import SubscribeCallback +from pubnub.pubnub_asyncio import EventEngineSubscriptionManager, PubNubAsyncio +from pubnub.pnconfiguration import PNConfiguration + +parser = argparse.ArgumentParser(description="Chat with others using PubNub") +parser.add_argument("-n", metavar="name", help="Your name", default=None, required=False) +parser.add_argument("-c", metavar="channel", help="The channel you want to join", default=None, required=False) +args = parser.parse_args() + + +class ExampleCallback(SubscribeCallback): + def message(self, pubnub, message): + print(f"{message.publisher}> {message.message}\n") + + def presence(self, pubnub, presence): + print(f"-- {presence.uuid} {'joined' if presence.event == 'join' else 'left'} \n") + + def status(self, pubnub, status): + if status.is_error(): + print(f"! Error: {status.error_data}") + else: + print(f"* Status: {status.category.name}") + + +async def async_input(): + print() + await asyncio.sleep(0.1) + return (await asyncio.get_event_loop().run_in_executor(None, input)) + + +async def main(): + name = args.name if hasattr(args, "name") else input("Enter your name: ") + channel = args.channel if hasattr(args, "channel") else input("Enter the channel you want to join: ") + + print("Welcome to the chat room. Type 'exit' to leave the chat.") + + config = PNConfiguration() + config.subscribe_key = getenv("PN_KEY_SUBSCRIBE") + config.publish_key = getenv("PN_KEY_PUBLISH") + config.uuid = name + + pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager) + pubnub.add_listener(ExampleCallback()) + + pubnub.subscribe().channels(channel).with_presence().execute() + + while True: + message = await async_input() + print("\x1b[2K") + if message == "exit": + print("Goodbye!") + break + + await pubnub.publish().channel(channel).message(message).future() + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) diff --git a/pubnub/dtos.py b/pubnub/dtos.py index ae0220b0..047714a0 100644 --- a/pubnub/dtos.py +++ b/pubnub/dtos.py @@ -10,6 +10,18 @@ def __init__(self, channels=None, channel_groups=None, presence_enabled=None, ti self.presence_enabled = presence_enabled self.timetoken = timetoken + @property + def channels_with_pressence(self): + if not self.presence_enabled: + return self.channels + return self.channels + [ch + '-pnpres' for ch in self.channels] + + @property + def groups_with_pressence(self): + if not self.presence_enabled: + return self.channel_groups + return self.channel_groups + [ch + '-pnpres' for ch in self.channel_groups] + class UnsubscribeOperation(object): def __init__(self, channels=None, channel_groups=None): @@ -19,6 +31,18 @@ def __init__(self, channels=None, channel_groups=None): self.channels = channels self.channel_groups = channel_groups + def get_subscribed_channels(self, channels, with_presence=False) -> list: + result = [ch for ch in channels if ch not in self.channels and not ch.endswith('-pnpres')] + if not with_presence: + return result + return result + [ch + '-pnpres' for ch in result] + + def get_subscribed_channel_groups(self, channel_groups, with_presence=False) -> list: + result = [grp for grp in channel_groups if grp not in self.channel_groups and not grp.endswith('-pnpres')] + if not with_presence: + return result + return result + [grp + '-pnpres' for grp in result] + class StateOperation(object): def __init__(self, channels=None, channel_groups=None, state=None): diff --git a/pubnub/endpoints/presence/heartbeat.py b/pubnub/endpoints/presence/heartbeat.py index 20ea60e8..f8bb42e2 100644 --- a/pubnub/endpoints/presence/heartbeat.py +++ b/pubnub/endpoints/presence/heartbeat.py @@ -52,6 +52,9 @@ def custom_params(self): if self._state is not None and len(self._state) > 0: params['state'] = utils.url_write(self._state) + if hasattr(self.pubnub, '_subscription_manager'): + params.update(self.pubnub._subscription_manager.get_custom_params()) + return params def create_response(self, envelope): diff --git a/pubnub/endpoints/presence/leave.py b/pubnub/endpoints/presence/leave.py index 0023a859..113150e8 100644 --- a/pubnub/endpoints/presence/leave.py +++ b/pubnub/endpoints/presence/leave.py @@ -36,6 +36,9 @@ def custom_params(self): if len(self._groups) > 0: params['channel-group'] = utils.join_items(self._groups) + if hasattr(self.pubnub, '_subscription_manager'): + params.update(self.pubnub._subscription_manager.get_custom_params()) + return params def build_path(self): diff --git a/pubnub/endpoints/pubsub/subscribe.py b/pubnub/endpoints/pubsub/subscribe.py index 5f5300bd..7209aa42 100644 --- a/pubnub/endpoints/pubsub/subscribe.py +++ b/pubnub/endpoints/pubsub/subscribe.py @@ -18,6 +18,7 @@ def __init__(self, pubnub): self._filter_expression = None self._timetoken = None self._with_presence = None + self._state = None def channels(self, channels): utils.extend_list(self._channels, channels) @@ -44,6 +45,10 @@ def region(self, region): return self + def state(self, state): + self._state = state + return self + def http_method(self): return HttpMethod.GET @@ -75,6 +80,12 @@ def custom_params(self): if not self.pubnub.config.heartbeat_default_values: params['heartbeat'] = self.pubnub.config.presence_timeout + if self._state is not None and len(self._state) > 0: + params['state'] = utils.url_write(self._state) + + if hasattr(self.pubnub, '_subscription_manager'): + params.update(self.pubnub._subscription_manager.get_custom_params()) + return params def create_response(self, envelope): diff --git a/pubnub/enums.py b/pubnub/enums.py index 5dddd2c6..7603fb68 100644 --- a/pubnub/enums.py +++ b/pubnub/enums.py @@ -19,7 +19,7 @@ def string(cls, method): return "PATCH" -class PNStatusCategory(object): +class PNStatusCategory(Enum): PNUnknownCategory = 1 PNAcknowledgmentCategory = 2 PNAccessDeniedCategory = 3 diff --git a/pubnub/event_engine/containers.py b/pubnub/event_engine/containers.py new file mode 100644 index 00000000..7f53708c --- /dev/null +++ b/pubnub/event_engine/containers.py @@ -0,0 +1,15 @@ +class PresenceStateContainer: + channel_states: dict + + def __init__(self): + self.channel_states = {} + + def register_state(self, state: dict, channels: list): + for channel in channels: + self.channel_states[channel] = state + + def get_state(self, channels: list): + return {channel: self.channel_states[channel] for channel in channels if channel in self.channel_states} + + def get_channels_states(self, channels: list): + return {channel: self.channel_states[channel] for channel in channels if channel in self.channel_states} diff --git a/pubnub/event_engine/dispatcher.py b/pubnub/event_engine/dispatcher.py index 74340e72..5424b809 100644 --- a/pubnub/event_engine/dispatcher.py +++ b/pubnub/event_engine/dispatcher.py @@ -1,42 +1,42 @@ -from pubnub.event_engine.models import effects -from pubnub.event_engine import manage_effects +from pubnub.event_engine.models import invocations +from pubnub.event_engine import effects class Dispatcher: _pubnub = None - _managed_effects_factory = None + _effects_factory = None def __init__(self, event_engine) -> None: self._event_engine = event_engine self._managed_effects = {} - self._effect_emitter = manage_effects.EmitEffect() + self._effect_emitter = effects.EmitEffect() def set_pn(self, pubnub_instance): self._pubnub = pubnub_instance self._effect_emitter.set_pn(pubnub_instance) - def dispatch_effect(self, effect: effects.PNEffect): - if not self._managed_effects_factory: - self._managed_effects_factory = manage_effects.ManagedEffectFactory(self._pubnub, self._event_engine) + def dispatch_effect(self, invocation: invocations.PNInvocation): + if not self._effects_factory: + self._effects_factory = effects.EffectFactory(self._pubnub, self._event_engine) - if isinstance(effect, effects.PNEmittableEffect): - self.emit_effect(effect) + if isinstance(invocation, invocations.PNEmittableInvocation): + self.emit_effect(invocation) - elif isinstance(effect, effects.PNManageableEffect): - self.dispatch_managed_effect(effect) + elif isinstance(invocation, invocations.PNManageableInvocation): + self.dispatch_managed_effect(invocation) - elif isinstance(effect, effects.PNCancelEffect): - self.dispatch_cancel_effect(effect) + elif isinstance(invocation, invocations.PNCancelInvocation): + self.dispatch_cancel_effect(invocation) - def emit_effect(self, effect: effects.PNEffect): + def emit_effect(self, effect: invocations.PNInvocation): self._effect_emitter.emit(effect) - def dispatch_managed_effect(self, effect: effects.PNEffect): - managed_effect = self._managed_effects_factory.create(effect) - managed_effect.run() - self._managed_effects[effect.__class__.__name__] = managed_effect + def dispatch_managed_effect(self, invocation: invocations.PNInvocation): + effect = self._effects_factory.create(invocation) + effect.run() + self._managed_effects[invocation.__class__.__name__] = effect - def dispatch_cancel_effect(self, effect: effects.PNEffect): - if effect.cancel_effect in self._managed_effects: - self._managed_effects[effect.cancel_effect].stop() - del self._managed_effects[effect.cancel_effect] + def dispatch_cancel_effect(self, invocation: invocations.PNInvocation): + if invocation.cancel_effect in self._managed_effects: + self._managed_effects[invocation.cancel_effect].stop() + del self._managed_effects[invocation.cancel_effect] diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py new file mode 100644 index 00000000..a6a7ce43 --- /dev/null +++ b/pubnub/event_engine/effects.py @@ -0,0 +1,431 @@ +import asyncio +import logging +import math + +from typing import Optional, Union +from pubnub.endpoints.presence.heartbeat import Heartbeat +from pubnub.endpoints.presence.leave import Leave +from pubnub.endpoints.pubsub.subscribe import Subscribe +from pubnub.enums import PNReconnectionPolicy +from pubnub.exceptions import PubNubException +from pubnub.features import feature_enabled +from pubnub.models.server.subscribe import SubscribeMessage +from pubnub.pubnub import PubNub +from pubnub.event_engine.models import events, invocations +from pubnub.models.consumer.common import PNStatus +from pubnub.workers import BaseMessageWorker + + +class Effect: + pubnub: PubNub = None + event_engine = None + invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation] + stop_event = None + logger: logging.Logger + task: asyncio.Task + + def set_pn(self, pubnub: PubNub): + self.pubnub = pubnub + + def __init__(self, pubnub_instance, event_engine_instance, + invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None: + self.invocation = invocation + self.event_engine = event_engine_instance + self.pubnub = pubnub_instance + + self.logger = logging.getLogger("pubnub") + + def run(self): + pass + + def run_async(self, coro): + loop: asyncio.AbstractEventLoop = self.pubnub.event_loop + if loop.is_running(): + self.task = loop.create_task(coro, name=self.__class__.__name__) + else: + self.task = loop.run_until_complete(coro) + + def stop(self): + if self.stop_event: + self.logger.debug(f'stop_event({id(self.stop_event)}).set() called on {self.__class__.__name__}') + self.stop_event.set() + if hasattr(self, 'task') and isinstance(self.task, asyncio.Task) and not self.task.cancelled(): + self.task.cancel() + + def get_new_stop_event(self): + event = asyncio.Event() + self.logger.debug(f'creating new stop_event({id(event)}) for {self.__class__.__name__}') + return event + + def calculate_reconnection_delay(self, attempts): + if self.reconnection_policy is PNReconnectionPolicy.LINEAR: + delay = self.interval + + elif self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: + delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1) + return delay + + +class HandshakeEffect(Effect): + def run(self): + channels = self.invocation.channels + groups = self.invocation.groups + tt = self.invocation.timetoken or 0 + if hasattr(self.pubnub, 'event_loop'): + self.stop_event = self.get_new_stop_event() + self.run_async(self.handshake_async(channels=channels, + groups=groups, + timetoken=tt, + stop_event=self.stop_event)) + + async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0): + request = Subscribe(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) + + if feature_enabled('PN_MAINTAIN_PRESENCE_STATE') and hasattr(self.pubnub, 'state_container'): + state_container = self.pubnub.state_container + request.state(state_container.get_state(channels)) + + request.timetoken(0) + response = await request.future() + + if isinstance(response, PubNubException): + self.logger.warning(f'Handshake failed: {str(response)}') + handshake_failure = events.HandshakeFailureEvent(str(response), 1, timetoken=timetoken) + self.event_engine.trigger(handshake_failure) + elif response.status.error: + self.logger.warning(f'Handshake failed: {response.status.error_data.__dict__}') + handshake_failure = events.HandshakeFailureEvent(response.status.error_data, 1, timetoken=timetoken) + self.event_engine.trigger(handshake_failure) + else: + cursor = response.result['t'] + timetoken = timetoken if timetoken > 0 else cursor['t'] + region = cursor['r'] + handshake_success = events.HandshakeSuccessEvent(timetoken, region) + self.event_engine.trigger(handshake_success) + + +class ReceiveMessagesEffect(Effect): + invocation: invocations.ReceiveMessagesInvocation + + def run(self): + channels = self.invocation.channels + groups = self.invocation.groups + timetoken = self.invocation.timetoken + region = self.invocation.region + + if hasattr(self.pubnub, 'event_loop'): + self.stop_event = self.get_new_stop_event() + self.run_async(self.receive_messages_async(channels, groups, timetoken, region)) + + async def receive_messages_async(self, channels, groups, timetoken, region): + request = Subscribe(self.pubnub) + if channels: + request.channels(channels) + if groups: + request.channel_groups(groups) + if timetoken: + request.timetoken(timetoken) + if region: + request.region(region) + + request.cancellation_event(self.stop_event) + response = await request.future() + + if response.status is None and response.result is None: + self.logger.warning('Recieve messages failed: Empty response') + recieve_failure = events.ReceiveFailureEvent('Empty response', 1, timetoken=timetoken) + self.event_engine.trigger(recieve_failure) + elif response.status.error: + self.logger.warning(f'Recieve messages failed: {response.status.error_data.__dict__}') + recieve_failure = events.ReceiveFailureEvent(response.status.error_data, 1, timetoken=timetoken) + self.event_engine.trigger(recieve_failure) + else: + cursor = response.result['t'] + timetoken = cursor['t'] + region = cursor['r'] + messages = response.result['m'] + recieve_success = events.ReceiveSuccessEvent(timetoken, region=region, messages=messages) + self.event_engine.trigger(recieve_success) + self.stop_event.set() + + +class ReconnectEffect(Effect): + invocation: invocations.ReconnectInvocation + reconnection_policy: PNReconnectionPolicy + + def __init__(self, pubnub_instance, event_engine_instance, + invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None: + super().__init__(pubnub_instance, event_engine_instance, invocation) + self.reconnection_policy = pubnub_instance.config.reconnect_policy + self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries + self.interval = pubnub_instance.config.RECONNECTION_INTERVAL + self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF + self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF + + def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0): + self.logger.error(f"GiveUp called on Unspecific event. Reason: {reason}, Attempt: {attempt} TT:{timetoken}") + raise PubNubException('Unspecified Invocation') + + def failure(self, reason: PubNubException, attempt: int, timetoken: int = 0): + self.logger.error(f"Failure called on Unspecific event. Reason: {reason}, Attempt: {attempt} TT:{timetoken}") + raise PubNubException('Unspecified Invocation') + + def success(self, timetoken: str, region: Optional[int] = None, **kwargs): + self.logger.error(f"Success called on Unspecific event. TT:{timetoken}, Reg: {region}, KWARGS: {kwargs.keys()}") + raise PubNubException('Unspecified Invocation') + + def run(self): + if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: + self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts) + else: + attempts = self.invocation.attempts + delay = self.calculate_reconnection_delay(attempts) + self.logger.warning(f'will reconnect in {delay}s') + if hasattr(self.pubnub, 'event_loop'): + self.run_async(self.delayed_reconnect_async(delay, attempts)) + + async def delayed_reconnect_async(self, delay, attempt): + self.stop_event = self.get_new_stop_event() + await asyncio.sleep(delay) + + request = Subscribe(self.pubnub).timetoken(self.get_timetoken()).cancellation_event(self.stop_event) + + if self.invocation.channels: + request.channels(self.invocation.channels) + if self.invocation.groups: + request.channel_groups(self.invocation.groups) + + if self.invocation.region: + request.region(self.invocation.region) + + if feature_enabled('PN_MAINTAIN_PRESENCE_STATE') and hasattr(self.pubnub, 'state_container'): + state_container = self.pubnub.state_container + request.state(state_container.get_state(self.invocation.channels)) + + response = await request.future() + + if isinstance(response, PubNubException): + self.logger.warning(f'Reconnect failed: {str(response)}') + self.failure(str(response), attempt, self.get_timetoken()) + + elif response.status.error: + self.logger.warning(f'Reconnect failed: {response.status.error_data.__dict__}') + self.failure(response.status.error_data, attempt, self.get_timetoken()) + else: + cursor = response.result['t'] + timetoken = int(self.invocation.timetoken) if self.invocation.timetoken else cursor['t'] + region = cursor['r'] + messages = response.result['m'] + self.success(timetoken=timetoken, region=region, messages=messages) + + def stop(self): + self.logger.debug(f'stop called on {self.__class__.__name__}') + if self.stop_event: + self.logger.debug(f'stop_event({id(self.stop_event)}).set() called on {self.__class__.__name__}') + self.stop_event.set() + if self.task: + try: + self.task.cancel() + except asyncio.exceptions.CancelledError: + pass + + +class HandshakeReconnectEffect(ReconnectEffect): + def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0): + self.event_engine.trigger( + events.HandshakeReconnectGiveupEvent(reason, attempt, timetoken) + ) + + def failure(self, reason: PubNubException, attempt: int, timetoken: int = 0): + self.event_engine.trigger( + events.HandshakeReconnectFailureEvent(reason, attempt, timetoken) + ) + + def success(self, timetoken: str, region: Optional[int] = None, **kwargs): + self.event_engine.trigger( + events.HandshakeReconnectSuccessEvent(timetoken, region) + ) + + def get_timetoken(self): + return 0 + + +class ReceiveReconnectEffect(ReconnectEffect): + def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0): + self.event_engine.trigger( + events.ReceiveReconnectGiveupEvent(reason, attempt, timetoken) + ) + + def failure(self, reason: PubNubException, attempt: int, timetoken: int = 0): + self.event_engine.trigger( + events.ReceiveReconnectFailureEvent(reason, attempt, timetoken) + ) + + def success(self, timetoken: str, region: Optional[int] = None, messages=None): + + self.event_engine.trigger( + events.ReceiveReconnectSuccessEvent(timetoken=timetoken, region=region, messages=messages) + ) + + def get_timetoken(self): + return int(self.invocation.timetoken) + + +class HeartbeatEffect(Effect): + def run(self): + channels = self.invocation.channels + groups = self.invocation.groups + if hasattr(self.pubnub, 'event_loop'): + self.stop_event = self.get_new_stop_event() + self.run_async(self.heartbeat(channels=channels, groups=groups, stop_event=self.stop_event)) + + async def heartbeat(self, channels, groups, stop_event): + request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) + + if feature_enabled('PN_MAINTAIN_PRESENCE_STATE') and hasattr(self.pubnub, 'state_container'): + state_container = self.pubnub.state_container + request.state(state_container.get_state(self.invocation.channels)) + + response = await request.future() + + if isinstance(response, PubNubException): + self.logger.warning(f'Heartbeat failed: {str(response)}') + self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, + reason=response.status.error_data, attempt=1)) + elif response.status.error: + self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}') + self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, + reason=response.status.error_data, attempt=1)) + else: + self.event_engine.trigger(events.HeartbeatSuccessEvent(channels=channels, groups=groups)) + + +class HeartbeatWaitEffect(Effect): + def __init__(self, pubnub_instance, event_engine_instance, invocation: invocations.HeartbeatWaitInvocation) -> None: + super().__init__(pubnub_instance, event_engine_instance, invocation) + self.heartbeat_interval = pubnub_instance.config.heartbeat_interval + + def run(self): + if hasattr(self.pubnub, 'event_loop'): + self.stop_event = self.get_new_stop_event() + self.run_async(self.heartbeat_wait(self.heartbeat_interval, stop_event=self.stop_event)) + + async def heartbeat_wait(self, wait_time: int, stop_event): + try: + await asyncio.sleep(wait_time) + self.event_engine.trigger(events.HeartbeatTimesUpEvent()) + except asyncio.CancelledError: + pass + + +class HeartbeatLeaveEffect(Effect): + def run(self): + channels = self.invocation.channels + groups = self.invocation.groups + if hasattr(self.pubnub, 'event_loop'): + self.stop_event = self.get_new_stop_event() + self.run_async(self.leave(channels=channels, groups=groups, stop_event=self.stop_event)) + + async def leave(self, channels, groups, stop_event): + leave_request = Leave(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) + leave = await leave_request.future() + + if leave.status.error: + self.logger.warning(f'Heartbeat failed: {leave.status.error_data.__dict__}') + + +class HeartbeatDelayedEffect(Effect): + def __init__(self, pubnub_instance, event_engine_instance, + invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None: + super().__init__(pubnub_instance, event_engine_instance, invocation) + self.reconnection_policy = pubnub_instance.config.reconnect_policy + self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries + self.interval = pubnub_instance.config.RECONNECTION_INTERVAL + self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF + self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF + + def run(self): + if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: + self.event_engine.trigger(events.HeartbeatGiveUpEvent(channels=self.invocation.channels, + groups=self.invocation.groups, + reason=self.invocation.reason, + attempt=self.invocation.attempts)) + + if hasattr(self.pubnub, 'event_loop'): + self.stop_event = self.get_new_stop_event() + self.run_async(self.heartbeat(channels=self.invocation.channels, groups=self.invocation.groups, + attempt=self.invocation.attempts, stop_event=self.stop_event)) + + async def heartbeat(self, channels, groups, attempt, stop_event): + if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: + self.event_engine.trigger(events.HeartbeatGiveUpEvent(channels=self.invocation.channels, + groups=self.invocation.groups, + reason=self.invocation.reason, + attempt=self.invocation.attempts)) + request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) + delay = self.calculate_reconnection_delay(attempt) + self.logger.warning(f'Will retry to Heartbeat in {delay}s') + await asyncio.sleep(delay) + + response = await request.future() + if isinstance(response, PubNubException): + self.logger.warning(f'Heartbeat failed: {str(response)}') + self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, + reason=response.status.error_data, + attempt=attempt)) + elif response.status.error: + self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}') + self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, + reason=response.status.error_data, + attempt=attempt)) + else: + self.event_engine.trigger(events.HeartbeatSuccessEvent(channels=channels, groups=groups)) + + +class EffectFactory: + _managed_invocations = { + invocations.HandshakeInvocation.__name__: HandshakeEffect, + invocations.ReceiveMessagesInvocation.__name__: ReceiveMessagesEffect, + invocations.HandshakeReconnectInvocation.__name__: HandshakeReconnectEffect, + invocations.ReceiveReconnectInvocation.__name__: ReceiveReconnectEffect, + invocations.HeartbeatInvocation.__name__: HeartbeatEffect, + invocations.HeartbeatWaitInvocation.__name__: HeartbeatWaitEffect, + invocations.HeartbeatDelayedHeartbeatInvocation.__name__: HeartbeatDelayedEffect, + invocations.HeartbeatLeaveInvocation.__name__: HeartbeatLeaveEffect, + } + + def __init__(self, pubnub_instance, event_engine_instance) -> None: + self._pubnub = pubnub_instance + self._event_engine = event_engine_instance + + def create(self, invocation: invocations.PNInvocation) -> Effect: + if invocation.__class__.__name__ not in self._managed_invocations: + raise PubNubException(errormsg=f"Unhandled Invocation: {invocation.__class__.__name__}") + return self._managed_invocations[invocation.__class__.__name__](self._pubnub, self._event_engine, invocation) + + +class EmitEffect: + pubnub: PubNub + message_worker: BaseMessageWorker + + def set_pn(self, pubnub: PubNub): + self.pubnub = pubnub + self.message_worker = BaseMessageWorker(pubnub) + + def emit(self, invocation: invocations.PNEmittableInvocation): + if isinstance(invocation, invocations.EmitMessagesInvocation): + self.emit_message(invocation) + if isinstance(invocation, invocations.EmitStatusInvocation): + self.emit_status(invocation) + + def emit_message(self, invocation: invocations.EmitMessagesInvocation): + self.message_worker._listener_manager = self.pubnub._subscription_manager._listener_manager + for message in invocation.messages: + subscribe_message = SubscribeMessage().from_json(message) + self.message_worker._process_incoming_payload(subscribe_message) + + def emit_status(self, invocation: invocations.EmitStatusInvocation): + pn_status = PNStatus() + pn_status.category = invocation.status + pn_status.error = False + self.pubnub._subscription_manager._listener_manager.announce_status(pn_status) diff --git a/pubnub/event_engine/manage_effects.py b/pubnub/event_engine/manage_effects.py deleted file mode 100644 index 00746205..00000000 --- a/pubnub/event_engine/manage_effects.py +++ /dev/null @@ -1,315 +0,0 @@ -import asyncio -import logging -import math - -from typing import Optional, Union -from pubnub.endpoints.pubsub.subscribe import Subscribe -from pubnub.enums import PNReconnectionPolicy -from pubnub.exceptions import PubNubException -from pubnub.models.consumer.pubsub import PNMessageResult -from pubnub.models.server.subscribe import SubscribeMessage -from pubnub.pubnub import PubNub -from pubnub.event_engine.models import effects, events -from pubnub.models.consumer.common import PNStatus - - -class ManagedEffect: - pubnub: PubNub = None - event_engine = None - effect: Union[effects.PNManageableEffect, effects.PNCancelEffect] - stop_event = None - logger: logging.Logger - - def set_pn(self, pubnub: PubNub): - self.pubnub = pubnub - - def __init__(self, pubnub_instance, event_engine_instance, - effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]) -> None: - self.effect = effect - self.event_engine = event_engine_instance - self.pubnub = pubnub_instance - - self.logger = logging.getLogger("pubnub") - - def run(self): - pass - - def run_async(self): - pass - - def stop(self): - if self.stop_event: - self.logger.debug(f'stop_event({id(self.stop_event)}).set() called on {self.__class__.__name__}') - self.stop_event.set() - - def get_new_stop_event(self): - event = asyncio.Event() - self.logger.debug(f'creating new stop_event({id(event)}) for {self.__class__.__name__}') - return event - - -class ManageHandshakeEffect(ManagedEffect): - def run(self): - channels = self.effect.channels - groups = self.effect.groups - tt = self.effect.timetoken or 0 - if hasattr(self.pubnub, 'event_loop'): - self.stop_event = self.get_new_stop_event() - - loop: asyncio.AbstractEventLoop = self.pubnub.event_loop - coro = self.handshake_async(channels=channels, groups=groups, timetoken=tt, stop_event=self.stop_event) - if loop.is_running(): - loop.create_task(coro) - else: - loop.run_until_complete(coro) - else: - # TODO: the synchronous way - pass - - async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0): - request = Subscribe(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) - request.timetoken(0) - handshake = await request.future() - - if handshake.status.error: - self.logger.warning(f'Handshake failed: {handshake.status.error_data.__dict__}') - handshake_failure = events.HandshakeFailureEvent(handshake.status.error_data, 1, timetoken=timetoken) - self.event_engine.trigger(handshake_failure) - else: - cursor = handshake.result['t'] - timetoken = timetoken if timetoken > 0 else cursor['t'] - region = cursor['r'] - handshake_success = events.HandshakeSuccessEvent(timetoken, region) - self.event_engine.trigger(handshake_success) - - -class ManagedReceiveMessagesEffect(ManagedEffect): - effect: effects.ReceiveMessagesEffect - - def run(self): - channels = self.effect.channels - groups = self.effect.groups - timetoken = self.effect.timetoken - region = self.effect.region - - if hasattr(self.pubnub, 'event_loop'): - self.stop_event = self.get_new_stop_event() - loop: asyncio.AbstractEventLoop = self.pubnub.event_loop - coro = self.receive_messages_async(channels, groups, timetoken, region) - if loop.is_running(): - loop.create_task(coro) - else: - loop.run_until_complete(coro) - else: - # TODO: the synchronous way - pass - - async def receive_messages_async(self, channels, groups, timetoken, region): - subscribe = Subscribe(self.pubnub) - if channels: - subscribe.channels(channels) - if groups: - subscribe.channel_groups(groups) - if timetoken: - subscribe.timetoken(timetoken) - if region: - subscribe.region(region) - - subscribe.cancellation_event(self.stop_event) - response = await subscribe.future() - - if response.status is None and response.result is None: - self.logger.warning('Recieve messages failed: Empty response') - recieve_failure = events.ReceiveFailureEvent('Empty response', 1, timetoken=timetoken) - self.event_engine.trigger(recieve_failure) - elif response.status.error: - self.logger.warning(f'Recieve messages failed: {response.status.error_data.__dict__}') - recieve_failure = events.ReceiveFailureEvent(response.status.error_data, 1, timetoken=timetoken) - self.event_engine.trigger(recieve_failure) - else: - cursor = response.result['t'] - timetoken = cursor['t'] - region = cursor['r'] - messages = response.result['m'] - recieve_success = events.ReceiveSuccessEvent(timetoken, region=region, messages=messages) - self.event_engine.trigger(recieve_success) - self.stop_event.set() - - -class ManagedReconnectEffect(ManagedEffect): - effect: effects.ReconnectEffect - reconnection_policy: PNReconnectionPolicy - - def __init__(self, pubnub_instance, event_engine_instance, - effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]) -> None: - super().__init__(pubnub_instance, event_engine_instance, effect) - self.reconnection_policy = pubnub_instance.config.reconnect_policy - self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries - self.interval = pubnub_instance.config.RECONNECTION_INTERVAL - self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF - self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF - - def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0): - self.logger.error(f"GiveUp called on Unspecific event. Reason: {reason}, Attempt: {attempt} TT:{timetoken}") - raise PubNubException('Unspecified Effect') - - def failure(self, reason: PubNubException, attempt: int, timetoken: int = 0): - self.logger.error(f"Failure called on Unspecific event. Reason: {reason}, Attempt: {attempt} TT:{timetoken}") - raise PubNubException('Unspecified Effect') - - def success(self, timetoken: str, region: Optional[int] = None, **kwargs): - self.logger.error(f"Success called on Unspecific event. TT:{timetoken}, Reg: {region}, KWARGS: {kwargs.keys()}") - raise PubNubException('Unspecified Effect') - - def calculate_reconnection_delay(self, attempts): - if self.reconnection_policy is PNReconnectionPolicy.LINEAR: - delay = self.interval - - elif self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: - delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1) - return delay - - def run(self): - if self.reconnection_policy is PNReconnectionPolicy.NONE or self.effect.attempts > self.max_retry_attempts: - self.give_up(reason=self.effect.reason, attempt=self.effect.attempts) - else: - attempts = self.effect.attempts - delay = self.calculate_reconnection_delay(attempts) - self.logger.warning(f'will reconnect in {delay}s') - if hasattr(self.pubnub, 'event_loop'): - loop: asyncio.AbstractEventLoop = self.pubnub.event_loop - coro = self.delayed_reconnect_async(delay, attempts) - if loop.is_running(): - self.delayed_reconnect_coro = loop.create_task(coro) - else: - self.delayed_reconnect_coro = loop.run_until_complete(coro) - else: - # TODO: the synchronous way - pass - - async def delayed_reconnect_async(self, delay, attempt): - self.stop_event = self.get_new_stop_event() - await asyncio.sleep(delay) - - request = Subscribe(self.pubnub) \ - .channels(self.effect.channels) \ - .channel_groups(self.effect.groups) \ - .timetoken(self.get_timetoken()) \ - .cancellation_event(self.stop_event) - - if self.effect.region: - request.region(self.effect.region) - - reconnect = await request.future() - - if reconnect.status.error: - self.logger.warning(f'Reconnect failed: {reconnect.status.error_data.__dict__}') - self.failure(reconnect.status.error_data, attempt, self.get_timetoken()) - else: - cursor = reconnect.result['t'] - timetoken = int(self.effect.timetoken) if self.effect.timetoken else cursor['t'] - region = cursor['r'] - messages = reconnect.result['m'] - self.success(timetoken=timetoken, region=region, messages=messages) - - def stop(self): - self.logger.debug(f'stop called on {self.__class__.__name__}') - if self.stop_event: - self.logger.debug(f'stop_event({id(self.stop_event)}).set() called on {self.__class__.__name__}') - self.stop_event.set() - if self.delayed_reconnect_coro: - try: - self.delayed_reconnect_coro.cancel() - except asyncio.exceptions.CancelledError: - pass - - -class ManagedHandshakeReconnectEffect(ManagedReconnectEffect): - def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0): - self.event_engine.trigger( - events.HandshakeReconnectGiveupEvent(reason, attempt, timetoken) - ) - - def failure(self, reason: PubNubException, attempt: int, timetoken: int = 0): - self.event_engine.trigger( - events.HandshakeReconnectFailureEvent(reason, attempt, timetoken) - ) - - def success(self, timetoken: str, region: Optional[int] = None, **kwargs): - self.event_engine.trigger( - events.HandshakeReconnectSuccessEvent(timetoken, region) - ) - - def get_timetoken(self): - return 0 - - -class ManagedReceiveReconnectEffect(ManagedReconnectEffect): - def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0): - self.event_engine.trigger( - events.ReceiveReconnectGiveupEvent(reason, attempt, timetoken) - ) - - def failure(self, reason: PubNubException, attempt: int, timetoken: int = 0): - self.event_engine.trigger( - events.ReceiveReconnectFailureEvent(reason, attempt, timetoken) - ) - - def success(self, timetoken: str, region: Optional[int] = None, messages=None): - - self.event_engine.trigger( - events.ReceiveReconnectSuccessEvent(timetoken=timetoken, region=region, messages=messages) - ) - - def get_timetoken(self): - return int(self.effect.timetoken) - - -class ManagedEffectFactory: - _managed_effects = { - effects.HandshakeEffect.__name__: ManageHandshakeEffect, - effects.ReceiveMessagesEffect.__name__: ManagedReceiveMessagesEffect, - effects.HandshakeReconnectEffect.__name__: ManagedHandshakeReconnectEffect, - effects.ReceiveReconnectEffect.__name__: ManagedReceiveReconnectEffect, - } - - def __init__(self, pubnub_instance, event_engine_instance) -> None: - self._pubnub = pubnub_instance - self._event_engine = event_engine_instance - - def create(self, effect: ManagedEffect): - if effect.__class__.__name__ not in self._managed_effects: - raise PubNubException(errormsg="Unhandled manage effect") - return self._managed_effects[effect.__class__.__name__](self._pubnub, self._event_engine, effect) - - -class EmitEffect: - pubnub: PubNub - - def set_pn(self, pubnub: PubNub): - self.pubnub = pubnub - - def emit(self, effect: effects.PNEmittableEffect): - if isinstance(effect, effects.EmitMessagesEffect): - self.emit_message(effect) - if isinstance(effect, effects.EmitStatusEffect): - self.emit_status(effect) - - def emit_message(self, effect: effects.EmitMessagesEffect): - for message in effect.messages: - subscribe_message = SubscribeMessage().from_json(message) - pn_message_result = PNMessageResult( - message=subscribe_message.payload, - subscription=subscribe_message.subscription_match, - channel=subscribe_message.channel, - timetoken=int(message['p']['t']), - user_metadata=subscribe_message.publish_metadata, - publisher=subscribe_message.issuing_client_id - ) - self.pubnub._subscription_manager._listener_manager.announce_message(pn_message_result) - - def emit_status(self, effect: effects.EmitStatusEffect): - pn_status = PNStatus() - pn_status.category = effect.status - pn_status.error = False - self.pubnub._subscription_manager._listener_manager.announce_status(pn_status) diff --git a/pubnub/event_engine/models/effects.py b/pubnub/event_engine/models/effects.py deleted file mode 100644 index 3112584c..00000000 --- a/pubnub/event_engine/models/effects.py +++ /dev/null @@ -1,95 +0,0 @@ -from typing import List, Union -from pubnub.exceptions import PubNubException -from pubnub.enums import PNStatusCategory - - -class PNEffect: - pass - - -class PNManageableEffect(PNEffect): - pass - - -class PNCancelEffect(PNEffect): - cancel_effect: str - - -class HandshakeEffect(PNManageableEffect): - def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None, List[str]] = None, - timetoken: Union[None, int] = None) -> None: - super().__init__() - self.channels = channels - self.groups = groups - self.timetoken = timetoken - - -class CancelHandshakeEffect(PNCancelEffect): - cancel_effect = HandshakeEffect.__name__ - - -class ReceiveMessagesEffect(PNManageableEffect): - def __init__(self, - channels: Union[None, List[str]] = None, - groups: Union[None, List[str]] = None, - timetoken: Union[None, str] = None, - region: Union[None, int] = None - ) -> None: - super().__init__() - self.channels = channels - self.groups = groups - self.timetoken = timetoken - self.region = region - - -class CancelReceiveMessagesEffect(PNCancelEffect): - cancel_effect = ReceiveMessagesEffect.__name__ - - -class ReconnectEffect(PNManageableEffect): - def __init__(self, - channels: Union[None, List[str]] = None, - groups: Union[None, List[str]] = None, - timetoken: Union[None, str] = None, - region: Union[None, int] = None, - attempts: Union[None, int] = None, - reason: Union[None, PubNubException] = None - ) -> None: - self.channels = channels - self.groups = groups - self.attempts = attempts - self.reason = reason - self.timetoken = timetoken - self.region = region - - -class HandshakeReconnectEffect(ReconnectEffect): - pass - - -class CancelHandshakeReconnectEffect(PNCancelEffect): - cancel_effect = HandshakeReconnectEffect.__name__ - - -class ReceiveReconnectEffect(ReconnectEffect): - pass - - -class CancelReceiveReconnectEffect(PNCancelEffect): - cancel_effect = ReceiveReconnectEffect.__name__ - - -class PNEmittableEffect(PNEffect): - pass - - -class EmitMessagesEffect(PNEmittableEffect): - def __init__(self, messages: Union[None, List[str]]) -> None: - super().__init__() - self.messages = messages - - -class EmitStatusEffect(PNEmittableEffect): - def __init__(self, status: Union[None, PNStatusCategory]) -> None: - super().__init__() - self.status = status diff --git a/pubnub/event_engine/models/events.py b/pubnub/event_engine/models/events.py index 35821f82..6b926337 100644 --- a/pubnub/event_engine/models/events.py +++ b/pubnub/event_engine/models/events.py @@ -28,14 +28,17 @@ def __init__(self, channels: List[str], groups: List[str]) -> None: class SubscriptionChangedEvent(PNChannelGroupsEvent): - def __init__(self, channels: List[str], groups: List[str]) -> None: + def __init__(self, channels: List[str], groups: List[str], with_presence: Optional[bool] = None) -> None: PNChannelGroupsEvent.__init__(self, channels, groups) + self.with_presence = with_presence class SubscriptionRestoredEvent(PNCursorEvent, PNChannelGroupsEvent): - def __init__(self, timetoken: str, channels: List[str], groups: List[str], region: Optional[int] = None) -> None: + def __init__(self, timetoken: str, channels: List[str], groups: List[str], region: Optional[int] = None, + with_presence: Optional[bool] = None) -> None: PNCursorEvent.__init__(self, timetoken, region) PNChannelGroupsEvent.__init__(self, channels, groups) + self.with_presence = with_presence class HandshakeSuccessEvent(PNCursorEvent): @@ -97,3 +100,52 @@ class DisconnectEvent(PNEvent): class ReconnectEvent(PNEvent): pass + + +""" + Presence Events +""" + + +class HeartbeatJoinedEvent(PNChannelGroupsEvent): + pass + + +class HeartbeatReconnectEvent(PNEvent): + pass + + +class HeartbeatLeftAllEvent(PNEvent): + pass + + +class HeartbeatLeftEvent(PNChannelGroupsEvent): + def __init__(self, channels: List[str], groups: List[str], suppress_leave: bool = False) -> None: + PNChannelGroupsEvent.__init__(self, channels, groups) + self.suppress_leave = suppress_leave + + +class HeartbeatDisconnectEvent(PNChannelGroupsEvent): + pass + + +class HeartbeatSuccessEvent(PNChannelGroupsEvent): + pass + + +class HeartbeatFailureEvent(PNChannelGroupsEvent, PNFailureEvent): + def __init__(self, channels: List[str], groups: List[str], reason: PubNubException, attempt: int, + timetoken: int = 0) -> None: + PNChannelGroupsEvent.__init__(self, channels, groups) + PNFailureEvent.__init__(self, reason, attempt, timetoken) + + +class HeartbeatTimesUpEvent(PNEvent): + pass + + +class HeartbeatGiveUpEvent(PNChannelGroupsEvent, PNFailureEvent): + def __init__(self, channels: List[str], groups: List[str], reason: PubNubException, attempt: int, + timetoken: int = 0) -> None: + PNChannelGroupsEvent.__init__(self, channels, groups) + PNFailureEvent.__init__(self, reason, attempt, timetoken) diff --git a/pubnub/event_engine/models/invocations.py b/pubnub/event_engine/models/invocations.py new file mode 100644 index 00000000..6793739e --- /dev/null +++ b/pubnub/event_engine/models/invocations.py @@ -0,0 +1,143 @@ +from typing import List, Union +from pubnub.exceptions import PubNubException +from pubnub.enums import PNStatusCategory + + +class PNInvocation: + pass + + +class PNManageableInvocation(PNInvocation): + pass + + +class PNCancelInvocation(PNInvocation): + cancel_effect: str + + +class HandshakeInvocation(PNManageableInvocation): + def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None, List[str]] = None, + timetoken: Union[None, int] = None) -> None: + super().__init__() + self.channels = channels + self.groups = groups + self.timetoken = timetoken + + +class CancelHandshakeInvocation(PNCancelInvocation): + cancel_effect = HandshakeInvocation.__name__ + + +class ReceiveMessagesInvocation(PNManageableInvocation): + def __init__(self, + channels: Union[None, List[str]] = None, + groups: Union[None, List[str]] = None, + timetoken: Union[None, str] = None, + region: Union[None, int] = None + ) -> None: + super().__init__() + self.channels = channels + self.groups = groups + self.timetoken = timetoken + self.region = region + + +class CancelReceiveMessagesInvocation(PNCancelInvocation): + cancel_effect = ReceiveMessagesInvocation.__name__ + + +class ReconnectInvocation(PNManageableInvocation): + def __init__(self, + channels: Union[None, List[str]] = None, + groups: Union[None, List[str]] = None, + timetoken: Union[None, str] = None, + region: Union[None, int] = None, + attempts: Union[None, int] = None, + reason: Union[None, PubNubException] = None + ) -> None: + self.channels = channels + self.groups = groups + self.attempts = attempts + self.reason = reason + self.timetoken = timetoken + self.region = region + + +class HandshakeReconnectInvocation(ReconnectInvocation): + pass + + +class CancelHandshakeReconnectInvocation(PNCancelInvocation): + cancel_effect = HandshakeReconnectInvocation.__name__ + + +class ReceiveReconnectInvocation(ReconnectInvocation): + pass + + +class CancelReceiveReconnectInvocation(PNCancelInvocation): + cancel_effect = ReceiveReconnectInvocation.__name__ + + +class PNEmittableInvocation(PNInvocation): + pass + + +class EmitMessagesInvocation(PNEmittableInvocation): + def __init__(self, messages: Union[None, List[str]]) -> None: + super().__init__() + self.messages = messages + + +class EmitStatusInvocation(PNEmittableInvocation): + def __init__(self, status: Union[None, PNStatusCategory]) -> None: + super().__init__() + self.status = status + + +""" + Presence Effects +""" + + +class HeartbeatInvocation(PNManageableInvocation): + def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None, List[str]] = None) -> None: + super().__init__() + self.channels = channels + self.groups = groups + + +class HeartbeatWaitInvocation(PNManageableInvocation): + def __init__(self, time) -> None: + self.wait_time = time + super().__init__() + + +class HeartbeatCancelWaitInvocation(PNCancelInvocation): + cancel_effect = HeartbeatWaitInvocation.__name__ + + +class HeartbeatLeaveInvocation(PNManageableInvocation): + def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None, List[str]] = None, + suppress_leave: bool = False) -> None: + super().__init__() + self.channels = channels + self.groups = groups + self.suppress_leave = suppress_leave + + +class HeartbeatDelayedHeartbeatInvocation(PNManageableInvocation): + def __init__(self, + channels: Union[None, List[str]] = None, + groups: Union[None, List[str]] = None, + attempts: Union[None, int] = None, + reason: Union[None, PubNubException] = None): + super().__init__() + self.channels = channels + self.groups = groups + self.attempts = attempts + self.reason = reason + + +class HeartbeatCancelDelayedHeartbeatInvocation(PNCancelInvocation): + cancel_effect = HeartbeatDelayedHeartbeatInvocation.__name__ diff --git a/pubnub/event_engine/models/states.py b/pubnub/event_engine/models/states.py index dc5b65e7..72acdfcd 100644 --- a/pubnub/event_engine/models/states.py +++ b/pubnub/event_engine/models/states.py @@ -1,6 +1,6 @@ from pubnub.enums import PNStatusCategory -from pubnub.event_engine.models import effects -from pubnub.event_engine.models.effects import PNEffect +from pubnub.event_engine.models import invocations +from pubnub.event_engine.models.invocations import PNInvocation from pubnub.event_engine.models import events from pubnub.exceptions import PubNubException from typing import List, Union @@ -13,6 +13,7 @@ class PNContext(dict): timetoken: str attempt: int reason: PubNubException + with_presence: bool = False def update(self, context): super().update(context.__dict__) @@ -41,16 +42,16 @@ def get_context(self) -> PNContext: class PNTransition: context: PNContext state: PNState - effect: Union[None, List[PNEffect]] + invocation: Union[None, List[PNInvocation]] def __init__(self, state: PNState, context: Union[None, PNContext] = None, - effect: Union[None, List[PNEffect]] = None, + invocation: Union[None, List[PNInvocation]] = None, ) -> None: self.context = context self.state = state - self.effect = effect + self.invocation = invocation class UnsubscribedState(PNState): @@ -67,6 +68,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -78,6 +80,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -101,16 +104,19 @@ def __init__(self, context: PNContext) -> None: def on_enter(self, context: Union[None, PNContext]): self._context.update(context) super().on_enter(self._context) - return effects.HandshakeEffect(self._context.channels, self._context.groups, self._context.timetoken or 0) + return invocations.HandshakeInvocation(self._context.channels, + self._context.groups, + self._context.timetoken or 0) def on_exit(self): super().on_exit() - return effects.CancelHandshakeEffect() + return invocations.CancelHandshakeInvocation() def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -122,6 +128,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.region = event.region if self._context.timetoken == 0: self._context.timetoken = event.timetoken @@ -161,7 +168,7 @@ def handshaking_success(self, event: events.HandshakeSuccessEvent, context: PNCo return PNTransition( state=ReceivingState, context=self._context, - effect=effects.EmitStatusEffect(PNStatusCategory.PNConnectedCategory) + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNConnectedCategory) ) @@ -180,15 +187,15 @@ def __init__(self, context: PNContext) -> None: def on_enter(self, context: Union[None, PNContext]): self._context.update(context) super().on_enter(self._context) - return effects.HandshakeReconnectEffect(self._context.channels, - self._context.groups, - attempts=self._context.attempt, - reason=self._context.reason, - timetoken=self._context.timetoken) + return invocations.HandshakeReconnectInvocation(self._context.channels, + self._context.groups, + attempts=self._context.attempt, + reason=self._context.reason, + timetoken=self._context.timetoken) def on_exit(self): super().on_exit() - return effects.CancelHandshakeReconnectEffect() + return invocations.CancelHandshakeReconnectInvocation() def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition: self._context.update(context) @@ -202,6 +209,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -227,13 +235,14 @@ def give_up(self, event: events.HandshakeReconnectGiveupEvent, context: PNContex return PNTransition( state=HandshakeFailedState, context=self._context, - effect=effects.EmitStatusEffect(PNStatusCategory.PNDisconnectedCategory) + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory) ) def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -250,7 +259,7 @@ def success(self, event: events.HandshakeReconnectSuccessEvent, context: PNConte return PNTransition( state=ReceivingState, context=self._context, - effect=effects.EmitStatusEffect(PNStatusCategory.PNConnectedCategory, ) + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNConnectedCategory, ) ) @@ -267,6 +276,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -286,6 +296,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -329,18 +340,19 @@ def __init__(self, context: PNContext) -> None: def on_enter(self, context: Union[None, PNContext]): super().on_enter(context) - return effects.ReceiveMessagesEffect(context.channels, context.groups, timetoken=self._context.timetoken, - region=self._context.region) + return invocations.ReceiveMessagesInvocation(context.channels, context.groups, + timetoken=self._context.timetoken, region=self._context.region) def on_exit(self): super().on_exit() - return effects.CancelReceiveMessagesEffect() + return invocations.CancelReceiveMessagesInvocation() def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups - self._context.timetoken = 0 + self._context.with_presence = event.with_presence + # self._context.timetoken = 0 # why we don't reset timetoken here? return PNTransition( state=self.__class__, @@ -351,6 +363,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -367,7 +380,7 @@ def receiving_success(self, event: events.ReceiveSuccessEvent, context: PNContex return PNTransition( state=self.__class__, context=self._context, - effect=effects.EmitMessagesEffect(messages=event.messages), + invocation=invocations.EmitMessagesInvocation(messages=event.messages), ) def receiving_failure(self, event: events.ReceiveFailureEvent, context: PNContext) -> PNTransition: @@ -386,7 +399,7 @@ def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTra return PNTransition( state=ReceiveStoppedState, context=self._context, - effect=effects.EmitStatusEffect(PNStatusCategory.PNDisconnectedCategory) + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory) ) def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition: @@ -413,12 +426,16 @@ def __init__(self, context: PNContext) -> None: def on_enter(self, context: Union[None, PNContext]): self._context.update(context) super().on_enter(self._context) - return effects.ReceiveReconnectEffect(self._context.channels, self._context.groups, self._context.timetoken, - self._context.region, self._context.attempt, self._context.reason) + return invocations.ReceiveReconnectInvocation(self._context.channels, + self._context.groups, + self._context.timetoken, + self._context.region, + self._context.attempt, + self._context.reason) def on_exit(self): super().on_exit() - return effects.CancelReceiveReconnectEffect() + return invocations.CancelReceiveReconnectInvocation() def reconnect_failure(self, event: events.ReceiveReconnectFailureEvent, context: PNContext) -> PNTransition: self._context.update(context) @@ -434,6 +451,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -457,7 +475,7 @@ def give_up(self, event: events.ReceiveReconnectGiveupEvent, context: PNContext) return PNTransition( state=ReceiveFailedState, context=self._context, - effect=effects.EmitStatusEffect(PNStatusCategory.PNDisconnectedCategory) + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory) ) def reconnect_success(self, event: events.ReceiveReconnectSuccessEvent, context: PNContext) -> PNTransition: @@ -468,13 +486,14 @@ def reconnect_success(self, event: events.ReceiveReconnectSuccessEvent, context: return PNTransition( state=ReceivingState, context=self._context, - effect=effects.EmitMessagesEffect(event.messages) + invocation=invocations.EmitMessagesInvocation(event.messages) ) def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -506,6 +525,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -525,6 +545,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -550,3 +571,450 @@ def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTrans state=ReceiveReconnectingState, context=self._context ) + + +""" +Presence states +""" + + +class HeartbeatInactiveState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + + self._transitions = { + events.HeartbeatJoinedEvent.__name__: self.joined + } + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.channels = event.channels + self._context.groups = event.groups + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + +class HeartbeatStoppedState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + + self._transitions = { + events.HeartbeatReconnectEvent.__name__: self.reconnect, + events.HeartbeatLeftAllEvent.__name__: self.left_all, + events.HeartbeatJoinedEvent.__name__: self.joined, + events.HeartbeatLeftEvent.__name__: self.left + } + + def reconnect(self, event: events.HeartbeatReconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.channels = [] + self._context.groups = [] + + return PNTransition( + state=HeartbeatInactiveState, + context=self._context + ) + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context + ) + + def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: + self._context.update(context) + for channel in event.channels: + self._context.channels.remove(channel) + + for group in event.groups: + self._context.groups.remove(group) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context + ) + + +class HeartbeatFailedState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + + self._transitions = { + events.HeartbeatJoinedEvent.__name__: self.joined, + events.HeartbeatLeftEvent.__name__: self.left, + events.HeartbeatReconnectEvent.__name__: self.reconnect, + events.HeartbeatDisconnectEvent.__name__: self.disconnect, + events.HeartbeatLeftAllEvent.__name__: self.left_all + } + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: + self._context.update(context) + for channel in event.channels: + self._context.channels.remove(channel) + + for group in event.groups: + self._context.groups.remove(group) + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatingState, + context=self._context, + invocation=invocation + ) + + def reconnect(self, event: events.HeartbeatReconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context, + invocation=invocation + ) + + def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.channels = [] + self._context.groups = [] + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatInactiveState, + context=self._context, + invocation=invocation + ) + + +class HeartbeatingState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + self._transitions = { + events.HeartbeatFailureEvent.__name__: self.failure, + events.HeartbeatDisconnectEvent.__name__: self.disconnect, + events.HeartbeatLeftAllEvent.__name__: self.left_all, + events.HeartbeatJoinedEvent.__name__: self.joined, + events.HeartbeatLeftEvent.__name__: self.left, + events.HeartbeatSuccessEvent.__name__: self.success + } + + def on_enter(self, context: Union[None, PNContext]): + self._context.update(context) + super().on_enter(self._context) + return invocations.HeartbeatInvocation(channels=self._context.channels, groups=self._context.groups) + + def failure(self, event: events.HeartbeatFailureEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.attempt = event.attempt + self._context.reason = event.reason + + return PNTransition( + state=HeartbeatReconnectingState, + context=self._context + ) + + def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context, + invocation=invocation + ) + + def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.channels = [] + self._context.groups = [] + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatInactiveState, + context=self._context, + invocation=invocation + ) + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: + self._context.update(context) + for channel in event.channels: + self._context.channels.remove(channel) + + for group in event.groups: + self._context.groups.remove(group) + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatingState, + context=self._context, + invocation=invocation + ) + + def success(self, event: events.HeartbeatSuccessEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.attempt = 0 + + return PNTransition( + state=HeartbeatCooldownState, + context=self._context + ) + + +class HeartbeatCooldownState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + self._transitions = { + events.HeartbeatJoinedEvent.__name__: self.joined, + events.HeartbeatLeftEvent.__name__: self.left, + events.HeartbeatTimesUpEvent.__name__: self.times_up, + events.HeartbeatDisconnectEvent.__name__: self.disconnect, + events.HeartbeatLeftAllEvent.__name__: self.left_all, + + } + + def on_enter(self, context: PNContext): + self._context.update(context) + super().on_enter(self._context) + return invocations.HeartbeatWaitInvocation(self._context) + + def on_exit(self): + super().on_exit() + return invocations.HeartbeatCancelWaitInvocation() + + def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context, + invocation=invocation + ) + + def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.channels = [] + self._context.groups = [] + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatInactiveState, + context=self._context, + invocation=invocation + ) + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: + self._context.update(context) + for channel in event.channels: + self._context.channels.remove(channel) + + for group in event.groups: + self._context.groups.remove(group) + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatingState, + context=self._context, + invocation=invocation + ) + + def times_up(self, event: events.HeartbeatTimesUpEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + +class HeartbeatReconnectingState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + self._transitions = { + events.HeartbeatFailureEvent.__name__: self.failure, + events.HeartbeatJoinedEvent.__name__: self.joined, + events.HeartbeatLeftEvent.__name__: self.left, + events.HeartbeatSuccessEvent.__name__: self.success, + events.HeartbeatGiveUpEvent.__name__: self.give_up, + events.HeartbeatDisconnectEvent.__name__: self.disconnect, + events.HeartbeatLeftAllEvent.__name__: self.left_all + } + + def on_enter(self, context: PNContext): + self._context.update(context) + super().on_enter(self._context) + + return invocations.HeartbeatDelayedHeartbeatInvocation(channels=self._context.channels, + groups=self._context.groups, + attempts=self._context.attempt, + reason=None) + + def on_exit(self): + super().on_exit() + return invocations.HeartbeatCancelDelayedHeartbeatInvocation() + + def failure(self, event: events.HeartbeatFailureEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.attempt = event.attempt + 1 + self._context.reason = event.reason + + return PNTransition( + state=HeartbeatReconnectingState, + context=self._context + ) + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: + self._context.update(context) + for channel in event.channels: + self._context.channels.remove(channel) + + for group in event.groups: + self._context.groups.remove(group) + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatingState, + context=self._context, + invocation=invocation + ) + + def success(self, event: events.HeartbeatSuccessEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.attempt = 0 + + return PNTransition( + state=HeartbeatCooldownState, + context=self._context + ) + + def give_up(self, event: events.HeartbeatGiveUpEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.attempt = event.attempt + self._context.reason = event.reason + + return PNTransition( + state=HeartbeatFailedState, + context=self._context + ) + + def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context, + invocation=invocation + ) + + def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.channels = [] + self._context.groups = [] + + invocation = None + if not event.suppress_leave: + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) + + return PNTransition( + state=HeartbeatInactiveState, + context=self._context, + invocation=invocation + ) diff --git a/pubnub/event_engine/statemachine.py b/pubnub/event_engine/statemachine.py index 4373bf9d..41c0b327 100644 --- a/pubnub/event_engine/statemachine.py +++ b/pubnub/event_engine/statemachine.py @@ -2,26 +2,28 @@ from typing import List, Optional -from pubnub.event_engine.models import effects, events, states +from pubnub.event_engine.models import events, invocations, states from pubnub.event_engine.dispatcher import Dispatcher class StateMachine: _current_state: states.PNState _context: states.PNContext - _effect_list: List[effects.PNEffect] + _invocations: List[invocations.PNInvocation] _enabled: bool - def __init__(self, initial_state: states.PNState, dispatcher_class: Optional[Dispatcher] = None) -> None: + def __init__(self, initial_state: states.PNState, dispatcher_class: Optional[Dispatcher] = None, + name: Optional[str] = None) -> None: self._context = states.PNContext() self._current_state = initial_state(self._context) self._listeners = {} - self._effect_list = [] + self._invocations = [] if dispatcher_class is None: dispatcher_class = Dispatcher self._dispatcher = dispatcher_class(self) self._enabled = True - self.logger = logging.getLogger("pubnub") + self._name = name + self.logger = logging.getLogger("pubnub" if not name else f"pubnub.{name}") def __del__(self): self.logger.debug('Shutting down StateMachine') @@ -37,6 +39,7 @@ def get_dispatcher(self) -> Dispatcher: return self._dispatcher def trigger(self, event: events.PNEvent) -> states.PNTransition: + self.logger.debug(f'Current State: {self.get_state_name()}') self.logger.debug(f'Triggered event: {event.__class__.__name__}({event.__dict__}) on {self.get_state_name()}') if not self._enabled: @@ -44,46 +47,48 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition: return False if event.get_name() in self._current_state._transitions: - self._effect_list.clear() - effect = self._current_state.on_exit() + self._invocations.clear() + invocation = self._current_state.on_exit() - if effect: - self.logger.debug(f'Invoke effect: {effect.__class__.__name__} {effect.__dict__}') - self._effect_list.append(effect) + if invocation: + self.logger.debug(f'Invoke effect: {invocation.__class__.__name__}') + self._invocations.append(invocation) transition: states.PNTransition = self._current_state.on(event, self._context) self._current_state = transition.state(self._current_state.get_context()) self._context = transition.context - if transition.effect: - if isinstance(transition.effect, list): + if transition.invocation: + if isinstance(transition.invocation, list): self.logger.debug('unpacking list') - for effect in transition.effect: - self.logger.debug(f'Invoke effect: {effect.__class__.__name__}') - self._effect_list.append(effect) + for invocation in transition.invocation: + self.logger.debug(f'Invoke effect: {invocation.__class__.__name__}') + self._invocations.append(invocation) else: - self.logger.debug(f'Invoke effect: {transition.effect.__class__.__name__}{effect.__dict__}') - self._effect_list.append(transition.effect) + self.logger.debug(f'Invoke effect: {transition.invocation.__class__.__name__}') + self._invocations.append(transition.invocation) - effect = self._current_state.on_enter(self._context) + invocation = self._current_state.on_enter(self._context) - if effect: - self.logger.debug(f'Invoke effect: {effect.__class__.__name__} StateMachine ({id(self)})') - self._effect_list.append(effect) + if invocation: + self.logger.debug(f'Invoke effect: {invocation.__class__.__name__}') + self._invocations.append(invocation) else: - message = f'Unhandled event: {event.__class__.__name__} in {self._current_state.__class__.__name__}' - self.logger.warning(message) - self.stop() + self.logger.warning(f'Unhandled event: {event.get_name()} in {self.get_state_name()}') self.dispatch_effects() def dispatch_effects(self): - for effect in self._effect_list: - self.logger.debug(f'dispatching {effect.__class__.__name__} {id(effect)}') - self._dispatcher.dispatch_effect(effect) + for invocation in self._invocations: + self.logger.debug(f'Dispatching {invocation.__class__.__name__} {id(invocation)}') + self._dispatcher.dispatch_effect(invocation) - self._effect_list.clear() + self._invocations.clear() def stop(self): self._enabled = False + + @property + def name(self): + return self._name diff --git a/pubnub/features.py b/pubnub/features.py index 95d5fc7e..d0e8c333 100644 --- a/pubnub/features.py +++ b/pubnub/features.py @@ -2,7 +2,9 @@ from pubnub.exceptions import PubNubException flags = { - 'PN_ENABLE_ENTITIES': getenv('PN_ENABLE_ENTITIES', False) + 'PN_ENABLE_ENTITIES': getenv('PN_ENABLE_ENTITIES', False), + 'PN_ENABLE_EVENT_ENGINE': getenv('PN_ENABLE_EVENT_ENGINE', False), + 'PN_MAINTAIN_PRESENCE_STATE': getenv('PN_MAINTAIN_PRESENCE_STATE', False), } @@ -18,3 +20,7 @@ def inner(method): return not_implemented return method return inner + + +def feature_enabled(flag): + return flags[flag] diff --git a/pubnub/managers.py b/pubnub/managers.py index 181e122d..785b75e4 100644 --- a/pubnub/managers.py +++ b/pubnub/managers.py @@ -365,6 +365,9 @@ def _handle_endpoint_call(self, raw_result, status): def _register_heartbeat_timer(self): self._stop_heartbeat_timer() + def get_custom_params(self): + return {} + class TelemetryManager: TIMESTAMP_DIVIDER = 1000 diff --git a/pubnub/pubnub_asyncio.py b/pubnub/pubnub_asyncio.py index 450c3efb..d47eb40c 100644 --- a/pubnub/pubnub_asyncio.py +++ b/pubnub/pubnub_asyncio.py @@ -8,6 +8,7 @@ from asyncio import Event, Queue, Semaphore from yarl import URL +from pubnub.event_engine.containers import PresenceStateContainer from pubnub.event_engine.models import events, states from pubnub.models.consumer.common import PNStatus @@ -16,6 +17,7 @@ from pubnub.endpoints.presence.heartbeat import Heartbeat from pubnub.endpoints.presence.leave import Leave from pubnub.endpoints.pubsub.subscribe import Subscribe +from pubnub.features import feature_enabled from pubnub.pubnub_core import PubNubCore from pubnub.workers import SubscribeMessageWorker from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager, TelemetryManager @@ -47,7 +49,9 @@ def __init__(self, config, custom_event_loop=None, subscription_manager=None): self._connector = aiohttp.TCPConnector(verify_ssl=True, loop=self.event_loop) if not subscription_manager: - subscription_manager = AsyncioSubscriptionManager + subscription_manager = ( + EventEngineSubscriptionManager if feature_enabled('PN_ENABLE_EVENT_ENGINE') + else AsyncioSubscriptionManager) if self.config.enable_subscribe: self._subscription_manager = subscription_manager(self) @@ -56,10 +60,6 @@ def __init__(self, config, custom_event_loop=None, subscription_manager=None): self._telemetry_manager = AsyncioTelemetryManager() - def __del__(self): - if self.event_loop.is_running(): - self.event_loop.create_task(self.close_session()) - async def close_pending_tasks(self, tasks): await asyncio.gather(*tasks) await asyncio.sleep(0.1) @@ -87,9 +87,9 @@ async def set_connector(self, cn): ) async def stop(self): - await self.close_session() if self._subscription_manager: self._subscription_manager.stop() + await self.close_session() def sdk_platform(self): return "-Asyncio" @@ -558,14 +558,20 @@ class EventEngineSubscriptionManager(SubscriptionManager): loop: asyncio.AbstractEventLoop def __init__(self, pubnub_instance): - self.event_engine = StateMachine(states.UnsubscribedState) + self.state_container = PresenceStateContainer() + self.event_engine = StateMachine(states.UnsubscribedState, + name="subscribe") + self.presence_engine = StateMachine(states.HeartbeatInactiveState, + name="presence") self.event_engine.get_dispatcher().set_pn(pubnub_instance) + self.presence_engine.get_dispatcher().set_pn(pubnub_instance) self.loop = asyncio.new_event_loop() - + pubnub_instance.state_container = self.state_container super().__init__(pubnub_instance) def stop(self): self.event_engine.stop() + self.presence_engine.stop() def adapt_subscribe_builder(self, subscribe_operation: SubscribeOperation): if not isinstance(subscribe_operation, SubscribeOperation): @@ -573,22 +579,52 @@ def adapt_subscribe_builder(self, subscribe_operation: SubscribeOperation): if subscribe_operation.timetoken: subscription_event = events.SubscriptionRestoredEvent( - channels=subscribe_operation.channels, - groups=subscribe_operation.channel_groups, - timetoken=subscribe_operation.timetoken + channels=subscribe_operation.channels_with_pressence, + groups=subscribe_operation.groups_with_pressence, + timetoken=subscribe_operation.timetoken, + with_presence=subscribe_operation.presence_enabled ) else: subscription_event = events.SubscriptionChangedEvent( - channels=subscribe_operation.channels, - groups=subscribe_operation.channel_groups + channels=subscribe_operation.channels_with_pressence, + groups=subscribe_operation.groups_with_pressence, + with_presence=subscribe_operation.presence_enabled ) self.event_engine.trigger(subscription_event) + if self._pubnub.config._heartbeat_interval > 0: + self.presence_engine.trigger(events.HeartbeatJoinedEvent( + channels=subscribe_operation.channels, + groups=subscribe_operation.channel_groups + )) def adapt_unsubscribe_builder(self, unsubscribe_operation): if not isinstance(unsubscribe_operation, UnsubscribeOperation): raise PubNubException('Invalid Unsubscribe Operation') - event = events.SubscriptionChangedEvent(None, None) - self.event_engine.trigger(event) + + channels = unsubscribe_operation.get_subscribed_channels( + self.event_engine.get_context().channels, + self.event_engine.get_context().with_presence) + + groups = unsubscribe_operation.get_subscribed_channel_groups( + self.event_engine.get_context().groups, + self.event_engine.get_context().with_presence) + + self.event_engine.trigger(events.SubscriptionChangedEvent(channels=channels, groups=groups)) + + self.presence_engine.trigger(event=events.HeartbeatLeftEvent( + channels=unsubscribe_operation.channels, + groups=unsubscribe_operation.channel_groups, + suppress_leave=self._pubnub.config.suppress_leave_events + )) + + def adapt_state_builder(self, state_operation): + self.state_container.register_state(state_operation.state, + state_operation.channels, + state_operation.channel_groups) + return super().adapt_state_builder(state_operation) + + def get_custom_params(self): + return {'ee': 1} class AsyncioSubscribeMessageWorker(SubscribeMessageWorker): diff --git a/pubnub/pubnub_core.py b/pubnub/pubnub_core.py index fc55059b..d6ef1a10 100644 --- a/pubnub/pubnub_core.py +++ b/pubnub/pubnub_core.py @@ -85,14 +85,13 @@ class PubNubCore: """A base class for PubNub Python API implementations""" - SDK_VERSION = "7.3.2" + SDK_VERSION = "7.4.0" SDK_NAME = "PubNub-Python" TIMESTAMP_DIVIDER = 1000 MAX_SEQUENCE = 65535 __metaclass__ = ABCMeta - _plugins = [] __crypto = None def __init__(self, config): diff --git a/pubnub/workers.py b/pubnub/workers.py index 81eb5b78..70a18d30 100644 --- a/pubnub/workers.py +++ b/pubnub/workers.py @@ -1,48 +1,38 @@ import logging from abc import abstractmethod - -from .enums import PNStatusCategory, PNOperationType -from .models.consumer.common import PNStatus -from .models.consumer.objects_v2.channel import PNChannelMetadataResult -from .models.consumer.objects_v2.memberships import PNMembershipResult -from .models.consumer.objects_v2.uuid import PNUUIDMetadataResult -from .models.consumer.pn_error_data import PNErrorData -from .utils import strip_right -from .models.consumer.pubsub import ( +from typing import Union + +from pubnub.enums import PNStatusCategory, PNOperationType +from pubnub.managers import ListenerManager +from pubnub.models.consumer.common import PNStatus +from pubnub.models.consumer.objects_v2.channel import PNChannelMetadataResult +from pubnub.models.consumer.objects_v2.memberships import PNMembershipResult +from pubnub.models.consumer.objects_v2.uuid import PNUUIDMetadataResult +from pubnub.models.consumer.pn_error_data import PNErrorData +from pubnub.utils import strip_right +from pubnub.models.consumer.pubsub import ( PNPresenceEventResult, PNMessageResult, PNSignalMessageResult, PNMessageActionResult, PNFileMessageResult ) -from .models.server.subscribe import SubscribeMessage, PresenceEnvelope -from .endpoints.file_operations.get_file_url import GetFileDownloadUrl +from pubnub.models.server.subscribe import SubscribeMessage, PresenceEnvelope +from pubnub.endpoints.file_operations.get_file_url import GetFileDownloadUrl logger = logging.getLogger("pubnub") -class SubscribeMessageWorker(object): +class BaseMessageWorker: + # _pubnub: PubNub + _listener_manager: Union[ListenerManager, None] = None + TYPE_MESSAGE = 0 TYPE_SIGNAL = 1 TYPE_OBJECT = 2 TYPE_MESSAGE_ACTION = 3 TYPE_FILE_MESSAGE = 4 - def __init__(self, pubnub_instance, listener_manager_instance, queue_instance, event): - # assert isinstance(pubnub_instnace, PubNubCore) - # assert isinstance(listener_manager_instance, ListenerManager) - # assert isinstance(queue_instance, utils.Queue) - + def __init__(self, pubnub_instance) -> None: self._pubnub = pubnub_instance - self._listener_manager = listener_manager_instance - self._queue = queue_instance - self._is_running = None - self._event = event - - def run(self): - self._take_message() - - @abstractmethod - def _take_message(self): - pass def _get_url_for_file_event_message(self, channel, extracted_message): return GetFileDownloadUrl(self._pubnub)\ @@ -55,10 +45,7 @@ def _process_message(self, message_input): return message_input, None else: try: - return self._pubnub.config.crypto.decrypt( - self._pubnub.config.cipher_key, - message_input - ), None + return self._pubnub.crypto.decrypt(message_input), None except Exception as exception: logger.warning("could not decrypt message: \"%s\", due to error %s" % (message_input, str(exception))) @@ -67,10 +54,41 @@ def _process_message(self, message_input): pn_status.error_data = PNErrorData(str(exception), exception) pn_status.error = True pn_status.operation = PNOperationType.PNSubscribeOperation - self._listener_manager.announce_status(pn_status) + self.announce(pn_status) return message_input, exception - def _process_incoming_payload(self, message): + def announce(self, result): + if not self._listener_manager: + return + + if isinstance(result, PNStatus): + self._listener_manager.announce_status(result) + + elif isinstance(result, PNPresenceEventResult): + self._listener_manager.announce_presence(result) + + elif isinstance(result, PNChannelMetadataResult): + self._listener_manager.announce_channel(result) + + elif isinstance(result, PNUUIDMetadataResult): + self._listener_manager.announce_uuid(result) + + elif isinstance(result, PNMembershipResult): + self._listener_manager.announce_membership(result) + + elif isinstance(result, PNFileMessageResult): + self._listener_manager.announce_file_message(result) + + elif isinstance(result, PNSignalMessageResult): + self._listener_manager.announce_signal(result) + + elif isinstance(result, PNMessageActionResult): + self._listener_manager.announce_message_action(result) + + elif isinstance(result, PNMessageResult): + self._listener_manager.announce_message(result) + + def _process_incoming_payload(self, message: SubscribeMessage): assert isinstance(message, SubscribeMessage) channel = message.channel @@ -105,26 +123,35 @@ def _process_incoming_payload(self, message): leave=message.payload.get('leave', None), timeout=message.payload.get('timeout', None) ) - self._listener_manager.announce_presence(pn_presence_event_result) + + self.announce(pn_presence_event_result) + return pn_presence_event_result + elif message.type == SubscribeMessageWorker.TYPE_OBJECT: if message.payload['type'] == 'channel': channel_result = PNChannelMetadataResult( event=message.payload['event'], data=message.payload['data'] ) - self._listener_manager.announce_channel(channel_result) + self.announce(channel_result) + return channel_result + elif message.payload['type'] == 'uuid': uuid_result = PNUUIDMetadataResult( event=message.payload['event'], data=message.payload['data'] ) - self._listener_manager.announce_uuid(uuid_result) + self.announce(uuid_result) + return uuid_result + elif message.payload['type'] == 'membership': membership_result = PNMembershipResult( event=message.payload['event'], data=message.payload['data'] ) - self._listener_manager.announce_membership(membership_result) + self.announce(membership_result) + return membership_result + elif message.type == SubscribeMessageWorker.TYPE_FILE_MESSAGE: extracted_message, _ = self._process_message(message.payload) download_url = self._get_url_for_file_event_message(channel, extracted_message) @@ -139,8 +166,8 @@ def _process_incoming_payload(self, message): file_id=extracted_message["file"]["id"], file_name=extracted_message["file"]["name"] ) - - self._listener_manager.announce_file_message(pn_file_result) + self.announce(pn_file_result) + return pn_file_result else: extracted_message, error = self._process_message(message.payload) @@ -157,7 +184,8 @@ def _process_incoming_payload(self, message): timetoken=publish_meta_data.publish_timetoken, publisher=publisher ) - self._listener_manager.announce_signal(pn_signal_result) + self.announce(pn_signal_result) + return pn_signal_result elif message.type == SubscribeMessageWorker.TYPE_MESSAGE_ACTION: message_action = extracted_message['data'] @@ -176,4 +204,24 @@ def _process_incoming_payload(self, message): publisher=publisher, error=error ) - self._listener_manager.announce_message(pn_message_result) + self.announce(pn_message_result) + return pn_message_result + + +class SubscribeMessageWorker(BaseMessageWorker): + def __init__(self, pubnub_instance, listener_manager_instance, queue_instance, event): + # assert isinstance(pubnub_instnace, PubNubCore) + # assert isinstance(listener_manager_instance, ListenerManager) + # assert isinstance(queue_instance, utils.Queue) + super().__init__(pubnub_instance) + self._listener_manager = listener_manager_instance + self._queue = queue_instance + self._is_running = None + self._event = event + + def run(self): + self._take_message() + + @abstractmethod + def _take_message(self): + pass diff --git a/setup.py b/setup.py index cf20f2d2..60d39266 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='pubnub', - version='7.3.2', + version='7.4.0', description='PubNub Real-time push service in the cloud', author='PubNub', author_email='support@pubnub.com', diff --git a/tests/acceptance/subscribe/environment.py b/tests/acceptance/subscribe/environment.py index 4700ef12..dea2c0c7 100644 --- a/tests/acceptance/subscribe/environment.py +++ b/tests/acceptance/subscribe/environment.py @@ -1,3 +1,4 @@ +import asyncio import requests from behave.runner import Context @@ -40,7 +41,10 @@ def before_scenario(context: Context, feature): def after_scenario(context: Context, feature): - context.pubnub.unsubscribe_all() + loop = asyncio.get_event_loop() + loop.run_until_complete(context.pubnub.stop()) + loop.run_until_complete(asyncio.sleep(0.1)) + for tag in feature.tags: if "contract" in tag: response = requests.get(MOCK_SERVER_URL + CONTRACT_EXPECT_ENDPOINT) @@ -48,5 +52,5 @@ def after_scenario(context: Context, feature): response_json = response.json() - assert not response_json["expectations"]["failed"] - assert not response_json["expectations"]["pending"] + assert not response_json["expectations"]["failed"], str(response_json["expectations"]["failed"]) + assert not response_json["expectations"]["pending"], str(response_json["expectations"]["pending"]) diff --git a/tests/acceptance/subscribe/steps/given_steps.py b/tests/acceptance/subscribe/steps/given_steps.py index f33905a0..9f5e6b9d 100644 --- a/tests/acceptance/subscribe/steps/given_steps.py +++ b/tests/acceptance/subscribe/steps/given_steps.py @@ -11,7 +11,7 @@ @given("the demo keyset with event engine enabled") def step_impl(context: PNContext): context.log_stream = StringIO() - logger = logging.getLogger('pubnub') + logger = logging.getLogger('pubnub').getChild('subscribe') logger.setLevel(logging.DEBUG) logger.handlers = [] logger.addHandler(logging.StreamHandler(context.log_stream)) @@ -19,6 +19,7 @@ def step_impl(context: PNContext): context.pn_config = pnconf_env_acceptance_copy() context.pn_config.enable_subscribe = True context.pn_config.reconnect_policy = PNReconnectionPolicy.NONE + context.pn_config.set_presence_timeout(0) context.pubnub = PubNubAsyncio(context.pn_config, subscription_manager=EventEngineSubscriptionManager) context.callback = AcceptanceCallback() @@ -29,3 +30,42 @@ def step_impl(context: PNContext): def step_impl(context: PNContext, max_retries: str): context.pubnub.config.reconnect_policy = PNReconnectionPolicy.LINEAR context.pubnub.config.maximum_reconnection_retries = int(max_retries) + + +""" +Presence engine step definitions +""" + + +@given("the demo keyset with Presence EE enabled") +def step_impl(context: PNContext): + context.log_stream_pubnub = StringIO() + logger = logging.getLogger('pubnub') + logger.setLevel(logging.DEBUG) + logger.handlers = [] + logger.addHandler(logging.StreamHandler(context.log_stream_pubnub)) + + context.log_stream = StringIO() + logger = logging.getLogger('pubnub').getChild('presence') + logger.setLevel(logging.DEBUG) + logger.handlers = [] + logger.addHandler(logging.StreamHandler(context.log_stream)) + + context.pn_config = pnconf_env_acceptance_copy() + context.pn_config.enable_subscribe = True + context.pn_config.enable_presence_heartbeat = True + context.pn_config.reconnect_policy = PNReconnectionPolicy.LINEAR + context.pn_config.subscribe_request_timeout = 10 + context.pn_config.RECONNECTION_INTERVAL = 2 + context.pn_config.set_presence_timeout(3) + context.pubnub = PubNubAsyncio(context.pn_config, subscription_manager=EventEngineSubscriptionManager) + + context.callback = AcceptanceCallback() + context.pubnub.add_listener(context.callback) + + +@given("heartbeatInterval set to '{interval}', timeout set to '{timeout}'" + " and suppressLeaveEvents set to '{suppress_leave}'") +def step_impl(context: PNContext, interval: str, timeout: str, suppress_leave: str): + context.pn_config.set_presence_timeout_with_custom_interval(int(timeout), int(interval)) + context.pn_config.suppress_leave_events = True if suppress_leave == 'true' else False diff --git a/tests/acceptance/subscribe/steps/then_steps.py b/tests/acceptance/subscribe/steps/then_steps.py index 522c0775..26c84c63 100644 --- a/tests/acceptance/subscribe/steps/then_steps.py +++ b/tests/acceptance/subscribe/steps/then_steps.py @@ -1,3 +1,4 @@ +import asyncio import re import busypie @@ -11,60 +12,122 @@ @then("I receive the message in my subscribe response") @async_run_until_complete -async def step_impl(context: PNContext): - try: - await busypie.wait() \ - .at_most(15) \ - .poll_delay(1) \ - .poll_interval(1) \ - .until_async(lambda: context.callback.message_result) - except Exception: - import ipdb - ipdb.set_trace() - - response = context.callback.message_result +async def step_impl(ctx: PNContext): + await busypie.wait() \ + .at_most(15) \ + .poll_delay(1) \ + .poll_interval(1) \ + .until_async(lambda: ctx.callback.message_result) + + response = ctx.callback.message_result assert isinstance(response, PNMessageResult) assert response.message is not None - await context.pubnub.stop() + await ctx.pubnub.stop() @then("I observe the following") @async_run_until_complete -async def step_impl(context): +async def step_impl(ctx): def parse_log_line(line: str): line_type = 'event' if line.startswith('Triggered event') else 'invocation' - m = re.search('([A-Za-z])+(Event|Effect)', line) - name = m.group(0).replace('Effect', '').replace('Event', '') - name = name.replace('Effect', '').replace('Event', '') + m = re.search('([A-Za-z])+(Event|Invocation)', line) + name = m.group(0).replace('Invocation', '').replace('Event', '') + name = name.replace('Invocation', '').replace('Event', '') name = re.sub(r'([A-Z])', r'_\1', name).upper().lstrip('_') return (line_type, name) normalized_log = [parse_log_line(log_line) for log_line in list(filter( lambda line: line.startswith('Triggered event') or line.startswith('Invoke effect'), - context.log_stream.getvalue().splitlines() + ctx.log_stream.getvalue().splitlines() ))] - try: - for index, expected in enumerate(context.table): - logged_type, logged_name = normalized_log[index] - expected_type, expected_name = expected - assert expected_type == logged_type, f'on line {index + 1} => {expected_type} != {logged_type}' - assert expected_name == logged_name, f'on line {index + 1} => {expected_name} != {logged_name}' - except Exception as e: - import ipdb - ipdb.set_trace() - raise e + for index, expected in enumerate(ctx.table): + logged_type, logged_name = normalized_log[index] + expected_type, expected_name = expected + assert expected_type == logged_type, f'on line {index + 1} => {expected_type} != {logged_type}' + assert expected_name == logged_name, f'on line {index + 1} => {expected_name} != {logged_name}' @then("I receive an error in my subscribe response") @async_run_until_complete -async def step_impl(context: PNContext): +async def step_impl(ctx: PNContext): await busypie.wait() \ .at_most(15) \ .poll_delay(1) \ .poll_interval(1) \ - .until_async(lambda: context.callback.status_result) + .until_async(lambda: ctx.callback.status_result) - status = context.callback.status_result + status = ctx.callback.status_result assert isinstance(status, PNStatus) assert status.category == PNStatusCategory.PNDisconnectedCategory - await context.pubnub.stop() + await ctx.pubnub.stop() + + +""" +Presence engine step definitions +""" + + +@then("I wait '{wait_time}' seconds") +@async_run_until_complete +async def step_impl(ctx: PNContext, wait_time: str): + await asyncio.sleep(int(wait_time)) + + +@then(u'I observe the following Events and Invocations of the Presence EE') +@async_run_until_complete +async def step_impl(ctx): + def parse_log_line(line: str): + line_type = 'event' if line.startswith('Triggered event') else 'invocation' + m = re.search('([A-Za-z])+(Event|Invocation)', line) + name = m.group(0).replace('Invocation', '').replace('Event', '') + name = name.replace('Invocation', '').replace('Event', '').replace('GiveUp', 'Giveup') + name = re.sub(r'([A-Z])', r'_\1', name).upper().lstrip('_') + + if name not in ['HEARTBEAT', 'HEARTBEAT_FAILURE', 'HEARTBEAT_SUCCESS', 'HEARTBEAT_GIVEUP']: + name = name.replace('HEARTBEAT_', '') + return (line_type, name) + + normalized_log = [parse_log_line(log_line) for log_line in list(filter( + lambda line: line.startswith('Triggered event') or line.startswith('Invoke effect'), + ctx.log_stream.getvalue().splitlines() + ))] + + assert len(normalized_log) >= len(list(ctx.table)), f'Log lenght mismatch!' \ + f'Expected {len(list(ctx.table))}, but got {len(normalized_log)}:\n {normalized_log}' + + for index, expected in enumerate(ctx.table): + logged_type, logged_name = normalized_log[index] + expected_type, expected_name = expected + assert expected_type == logged_type, f'on line {index + 1} => {expected_type} != {logged_type}' + assert expected_name == logged_name, f'on line {index + 1} => {expected_name} != {logged_name}' + + +@then(u'I wait for getting Presence joined events') +@async_run_until_complete +async def step_impl(context: PNContext): + await busypie.wait() \ + .at_most(15) \ + .poll_delay(1) \ + .poll_interval(1) \ + .until_async(lambda: context.callback.presence_result) + + +@then(u'I receive an error in my heartbeat response') +@async_run_until_complete +async def step_impl(ctx): + await busypie.wait() \ + .at_most(20) \ + .poll_delay(3) \ + .until_async(lambda: 'HeartbeatGiveUpEvent' in ctx.log_stream.getvalue()) + + +@then("I leave '{channel1}' and '{channel2}' channels with presence") +@async_run_until_complete +async def step_impl(context, channel1, channel2): + context.pubnub.unsubscribe().channels([channel1, channel2]).execute() + + +@then(u'I don\'t observe any Events and Invocations of the Presence EE') +@async_run_until_complete +async def step_impl(context): + assert len(context.log_stream.getvalue().splitlines()) == 0 diff --git a/tests/acceptance/subscribe/steps/when_steps.py b/tests/acceptance/subscribe/steps/when_steps.py index b48f1187..63f4ffab 100644 --- a/tests/acceptance/subscribe/steps/when_steps.py +++ b/tests/acceptance/subscribe/steps/when_steps.py @@ -1,16 +1,32 @@ from behave import when +from behave.api.async_step import async_run_until_complete from tests.acceptance.subscribe.environment import PNContext, AcceptanceCallback @when('I subscribe') def step_impl(context: PNContext): - print(f'WHEN I subscribe {id(context.pubnub)}') context.pubnub.subscribe().channels('foo').execute() @when('I subscribe with timetoken {timetoken}') def step_impl(context: PNContext, timetoken: str): # noqa F811 - print(f'WHEN I subscribe with TT {id(context.pubnub)}') callback = AcceptanceCallback() context.pubnub.add_listener(callback) context.pubnub.subscribe().channels('foo').with_timetoken(int(timetoken)).execute() + + +""" +Presence engine step definitions +""" + + +@when("I join '{channel1}', '{channel2}', '{channel3}' channels") +@async_run_until_complete +async def step_impl(context, channel1, channel2, channel3): + context.pubnub.subscribe().channels([channel1, channel2, channel3]).execute() + + +@when("I join '{channel1}', '{channel2}', '{channel3}' channels with presence") +@async_run_until_complete +async def step_impl(context, channel1, channel2, channel3): + context.pubnub.subscribe().channels([channel1, channel2, channel3]).with_presence().execute() diff --git a/tests/functional/event_engine/test_emitable_effect.py b/tests/functional/event_engine/test_emitable_effect.py index 92c764be..0469e589 100644 --- a/tests/functional/event_engine/test_emitable_effect.py +++ b/tests/functional/event_engine/test_emitable_effect.py @@ -1,20 +1,20 @@ from unittest.mock import patch -from pubnub.event_engine import manage_effects -from pubnub.event_engine.models import effects +from pubnub.event_engine import effects +from pubnub.event_engine.models import invocations from pubnub.event_engine.dispatcher import Dispatcher from pubnub.event_engine.models.states import UnsubscribedState from pubnub.event_engine.statemachine import StateMachine def test_dispatch_emit_messages_effect(): - with patch.object(manage_effects.EmitEffect, 'emit_message') as mocked_emit_message: + with patch.object(effects.EmitEffect, 'emit_message') as mocked_emit_message: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) - dispatcher.dispatch_effect(effects.EmitMessagesEffect(['chan'])) + dispatcher.dispatch_effect(invocations.EmitMessagesInvocation(['chan'])) mocked_emit_message.assert_called() def test_dispatch_emit_status_effect(): - with patch.object(manage_effects.EmitEffect, 'emit_status') as mocked_emit_status: + with patch.object(effects.EmitEffect, 'emit_status') as mocked_emit_status: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) - dispatcher.dispatch_effect(effects.EmitStatusEffect(['chan'])) + dispatcher.dispatch_effect(invocations.EmitStatusInvocation(['chan'])) mocked_emit_status.assert_called() diff --git a/tests/functional/event_engine/test_managed_effect.py b/tests/functional/event_engine/test_managed_effect.py index 26c46530..c59049d2 100644 --- a/tests/functional/event_engine/test_managed_effect.py +++ b/tests/functional/event_engine/test_managed_effect.py @@ -1,10 +1,16 @@ +import pytest +import asyncio + from unittest.mock import patch from pubnub.enums import PNReconnectionPolicy -from pubnub.event_engine import manage_effects -from pubnub.event_engine.models import effects +from pubnub.event_engine import effects +from pubnub.event_engine.models import invocations from pubnub.event_engine.dispatcher import Dispatcher +from pubnub.event_engine.models import states from pubnub.event_engine.models.states import UnsubscribedState from pubnub.event_engine.statemachine import StateMachine +from pubnub.pubnub_asyncio import PubNubAsyncio +from tests.helper import pnconf_env_copy class FakeConfig: @@ -21,64 +27,76 @@ def __init__(self) -> None: def test_dispatch_run_handshake_effect(): - with patch.object(manage_effects.ManageHandshakeEffect, 'run') as mocked_run: + with patch.object(effects.HandshakeEffect, 'run') as mocked_run: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) - dispatcher.dispatch_effect(effects.HandshakeEffect(['chan'])) + dispatcher.dispatch_effect(invocations.HandshakeInvocation(['chan'])) mocked_run.assert_called() def test_dispatch_stop_handshake_effect(): - with patch.object(manage_effects.ManageHandshakeEffect, 'stop') as mocked_stop: + with patch.object(effects.HandshakeEffect, 'stop') as mocked_stop: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) - dispatcher.dispatch_effect(effects.HandshakeEffect(['chan'])) - dispatcher.dispatch_effect(effects.CancelHandshakeEffect()) + dispatcher.dispatch_effect(invocations.HandshakeInvocation(['chan'])) + dispatcher.dispatch_effect(invocations.CancelHandshakeInvocation()) mocked_stop.assert_called() def test_dispatch_run_receive_effect(): - with patch.object(manage_effects.ManagedReceiveMessagesEffect, 'run') as mocked_run: + with patch.object(effects.ReceiveMessagesEffect, 'run') as mocked_run: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) - dispatcher.dispatch_effect(effects.ReceiveMessagesEffect(['chan'])) + dispatcher.dispatch_effect(invocations.ReceiveMessagesInvocation(['chan'])) mocked_run.assert_called() def test_dispatch_stop_receive_effect(): - with patch.object(manage_effects.ManagedReceiveMessagesEffect, 'stop', ) as mocked_stop: + with patch.object(effects.ReceiveMessagesEffect, 'stop', ) as mocked_stop: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) - dispatcher.dispatch_effect(effects.ReceiveMessagesEffect(['chan'])) - dispatcher.dispatch_effect(effects.CancelReceiveMessagesEffect()) + dispatcher.dispatch_effect(invocations.ReceiveMessagesInvocation(['chan'])) + dispatcher.dispatch_effect(invocations.CancelReceiveMessagesInvocation()) mocked_stop.assert_called() def test_dispatch_run_handshake_reconnect_effect(): - with patch.object(manage_effects.ManagedHandshakeReconnectEffect, 'run') as mocked_run: + with patch.object(effects.HandshakeReconnectEffect, 'run') as mocked_run: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) dispatcher.set_pn(FakePN()) - dispatcher.dispatch_effect(effects.HandshakeReconnectEffect(['chan'])) + dispatcher.dispatch_effect(invocations.HandshakeReconnectInvocation(['chan'])) mocked_run.assert_called() def test_dispatch_stop_handshake_reconnect_effect(): - with patch.object(manage_effects.ManagedHandshakeReconnectEffect, 'stop') as mocked_stop: + with patch.object(effects.HandshakeReconnectEffect, 'stop') as mocked_stop: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) dispatcher.set_pn(FakePN()) - dispatcher.dispatch_effect(effects.HandshakeReconnectEffect(['chan'])) - dispatcher.dispatch_effect(effects.CancelHandshakeReconnectEffect()) + dispatcher.dispatch_effect(invocations.HandshakeReconnectInvocation(['chan'])) + dispatcher.dispatch_effect(invocations.CancelHandshakeReconnectInvocation()) mocked_stop.assert_called() def test_dispatch_run_receive_reconnect_effect(): - with patch.object(manage_effects.ManagedReceiveReconnectEffect, 'run') as mocked_run: + with patch.object(effects.ReceiveReconnectEffect, 'run') as mocked_run: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) dispatcher.set_pn(FakePN()) - dispatcher.dispatch_effect(effects.ReceiveReconnectEffect(['chan'])) + dispatcher.dispatch_effect(invocations.ReceiveReconnectInvocation(['chan'])) mocked_run.assert_called() def test_dispatch_stop_receive_reconnect_effect(): - with patch.object(manage_effects.ManagedReceiveReconnectEffect, 'stop') as mocked_stop: + with patch.object(effects.ReceiveReconnectEffect, 'stop') as mocked_stop: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) dispatcher.set_pn(FakePN()) - dispatcher.dispatch_effect(effects.ReceiveReconnectEffect(['chan'])) - dispatcher.dispatch_effect(effects.CancelReceiveReconnectEffect()) + dispatcher.dispatch_effect(invocations.ReceiveReconnectInvocation(['chan'])) + dispatcher.dispatch_effect(invocations.CancelReceiveReconnectInvocation()) mocked_stop.assert_called() + + +@pytest.mark.asyncio +async def test_cancel_effect(): + pubnub = PubNubAsyncio(pnconf_env_copy()) + event_engine = StateMachine(states.HeartbeatInactiveState, name="presence") + managed_effects_factory = effects.EffectFactory(pubnub, event_engine) + managed_wait_effect = managed_effects_factory.create(invocation=invocations.HeartbeatWaitInvocation(10)) + managed_wait_effect.run() + await asyncio.sleep(1) + managed_wait_effect.stop() + await pubnub.stop() diff --git a/tests/functional/event_engine/test_state_container.py b/tests/functional/event_engine/test_state_container.py new file mode 100644 index 00000000..d0b7af7d --- /dev/null +++ b/tests/functional/event_engine/test_state_container.py @@ -0,0 +1,16 @@ +from pubnub.event_engine.containers import PresenceStateContainer + + +def test_set_state(): + container = PresenceStateContainer() + container.register_state(state={'state': 'active'}, channels=['c1', 'c2']) + assert container.get_channels_states(['c1', 'c2']) == {'c1': {'state': 'active'}, 'c2': {'state': 'active'}} + assert container.get_state(['c1']) == {'c1': {'state': 'active'}} + + +def test_set_state_with_overwrite(): + container = PresenceStateContainer() + container.register_state(state={'state': 'active'}, channels=['c1']) + container.register_state(state={'state': 'inactive'}, channels=['c1']) + assert container.get_channels_states(['c1']) == {'c1': {'state': 'inactive'}} + assert container.get_state(['c1', 'c2']) == {'c1': {'state': 'inactive'}} diff --git a/tests/functional/event_engine/test_subscribe.py b/tests/functional/event_engine/test_subscribe.py index 37fbaf50..588c60e8 100644 --- a/tests/functional/event_engine/test_subscribe.py +++ b/tests/functional/event_engine/test_subscribe.py @@ -62,6 +62,7 @@ async def test_subscribe(): message_callback.assert_called() pubnub.unsubscribe_all() pubnub._subscription_manager.stop() + await pubnub.stop() async def delayed_publish(channel, message, delay): @@ -84,6 +85,7 @@ async def test_handshaking(): assert pubnub._subscription_manager.event_engine.get_state_name() == states.ReceivingState.__name__ status_callback.assert_called() pubnub._subscription_manager.stop() + await pubnub.stop() @pytest.mark.asyncio @@ -112,7 +114,7 @@ def is_state(state): assert pubnub._subscription_manager.event_engine.get_state_name() == states.HandshakeFailedState.__name__ pubnub._subscription_manager.stop() - await pubnub.close_session() + await pubnub.stop() @pytest.mark.asyncio @@ -141,3 +143,4 @@ def is_state(state): .until_async(lambda: is_state(states.HandshakeReconnectingState.__name__)) assert pubnub._subscription_manager.event_engine.get_state_name() == states.HandshakeReconnectingState.__name__ pubnub._subscription_manager.stop() + await pubnub.stop()