From dd67ffc9f85f1dca4fe29ac2a77bb6141ff559f3 Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Tue, 25 Jul 2023 15:42:25 +0200 Subject: [PATCH] Emit messages --- pubnub/event_engine/dispatcher.py | 2 -- pubnub/event_engine/manage_effects.py | 18 ++++++++++++++---- pubnub/event_engine/models/states.py | 1 - 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pubnub/event_engine/dispatcher.py b/pubnub/event_engine/dispatcher.py index d5d584b8..74340e72 100644 --- a/pubnub/event_engine/dispatcher.py +++ b/pubnub/event_engine/dispatcher.py @@ -16,7 +16,6 @@ def set_pn(self, pubnub_instance): self._effect_emitter.set_pn(pubnub_instance) def dispatch_effect(self, effect: effects.PNEffect): - print(f'dispatching {effect.__class__.__name__} {id(effect)}') if not self._managed_effects_factory: self._managed_effects_factory = manage_effects.ManagedEffectFactory(self._pubnub, self._event_engine) @@ -30,7 +29,6 @@ def dispatch_effect(self, effect: effects.PNEffect): self.dispatch_cancel_effect(effect) def emit_effect(self, effect: effects.PNEffect): - print(f' emiting {effect.__class__.__name__} with {effect.__dict__}') self._effect_emitter.emit(effect) def dispatch_managed_effect(self, effect: effects.PNEffect): diff --git a/pubnub/event_engine/manage_effects.py b/pubnub/event_engine/manage_effects.py index 63283352..faf27a0b 100644 --- a/pubnub/event_engine/manage_effects.py +++ b/pubnub/event_engine/manage_effects.py @@ -3,10 +3,11 @@ from queue import SimpleQueue from typing import Union from pubnub.endpoints.pubsub.subscribe import Subscribe +from pubnub.models.consumer.pubsub import PNMessageResult +from pubnub.models.server.subscribe import SubscribeMessage from pubnub.pubnub import PubNub from pubnub.event_engine.models import effects, events from pubnub.models.consumer.common import PNStatus -from pubnub.workers import SubscribeMessageWorker class ManagedEffect: @@ -92,7 +93,6 @@ async def receive_messages_async(self, channels, groups, timetoken, region): timetoken = cursor['t'] region = cursor['r'] messages = response.result['m'] - print(response.result) recieve_success = events.ReceiveSuccessEvent(timetoken, region=region, messages=messages) self.event_engine.trigger(recieve_success) @@ -120,7 +120,6 @@ class EmitEffect: def set_pn(self, pubnub: PubNub): self.pubnub = pubnub self.queue = SimpleQueue - self.message_worker = SubscribeMessageWorker(self.pubnub, None, None, None) def emit(self, effect: effects.PNEmittableEffect): if isinstance(effect, effects.EmitMessagesEffect): @@ -129,7 +128,18 @@ def emit(self, effect: effects.PNEmittableEffect): self.emit_status(effect) def emit_message(self, effect: effects.EmitMessagesEffect): - self.pubnub._subscription_manager._listener_manager.announce_message('foo') + for message in effect.messages: + subscribe_message = SubscribeMessage().from_json(message) + pn_message_result = PNMessageResult( + message=subscribe_message.payload, + subscription=subscribe_message.subscription_match, + channel=subscribe_message.channel, + timetoken=int(message['p']['t']), + user_metadata=subscribe_message.publish_metadata, + publisher=subscribe_message.issuing_client_id + ) + pn_message_result = 'fo' + self.pubnub._subscription_manager._listener_manager.announce_message(pn_message_result) def emit_status(self, effect: effects.EmitStatusEffect): pn_status = PNStatus() diff --git a/pubnub/event_engine/models/states.py b/pubnub/event_engine/models/states.py index 0edb0885..cb7e58d7 100644 --- a/pubnub/event_engine/models/states.py +++ b/pubnub/event_engine/models/states.py @@ -329,7 +329,6 @@ def __init__(self, context: PNContext) -> None: def on_enter(self, context: Union[None, PNContext]): super().on_enter(context) - print(self._context) return effects.ReceiveMessagesEffect(context.channels, context.groups, timetoken=self._context.timetoken, region=self._context.region)