Skip to content

Commit

Permalink
Emit messages
Browse files Browse the repository at this point in the history
  • Loading branch information
seba-aln committed Jul 25, 2023
1 parent a546791 commit dd67ffc
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
2 changes: 0 additions & 2 deletions pubnub/event_engine/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def set_pn(self, pubnub_instance):
self._effect_emitter.set_pn(pubnub_instance)

def dispatch_effect(self, effect: effects.PNEffect):
print(f'dispatching {effect.__class__.__name__} {id(effect)}')
if not self._managed_effects_factory:
self._managed_effects_factory = manage_effects.ManagedEffectFactory(self._pubnub, self._event_engine)

Expand All @@ -30,7 +29,6 @@ def dispatch_effect(self, effect: effects.PNEffect):
self.dispatch_cancel_effect(effect)

def emit_effect(self, effect: effects.PNEffect):
print(f' emiting {effect.__class__.__name__} with {effect.__dict__}')
self._effect_emitter.emit(effect)

def dispatch_managed_effect(self, effect: effects.PNEffect):
Expand Down
18 changes: 14 additions & 4 deletions pubnub/event_engine/manage_effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from queue import SimpleQueue
from typing import Union
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.models.consumer.pubsub import PNMessageResult
from pubnub.models.server.subscribe import SubscribeMessage
from pubnub.pubnub import PubNub
from pubnub.event_engine.models import effects, events
from pubnub.models.consumer.common import PNStatus
from pubnub.workers import SubscribeMessageWorker


class ManagedEffect:
Expand Down Expand Up @@ -92,7 +93,6 @@ async def receive_messages_async(self, channels, groups, timetoken, region):
timetoken = cursor['t']
region = cursor['r']
messages = response.result['m']
print(response.result)
recieve_success = events.ReceiveSuccessEvent(timetoken, region=region, messages=messages)
self.event_engine.trigger(recieve_success)

Expand Down Expand Up @@ -120,7 +120,6 @@ class EmitEffect:
def set_pn(self, pubnub: PubNub):
self.pubnub = pubnub
self.queue = SimpleQueue
self.message_worker = SubscribeMessageWorker(self.pubnub, None, None, None)

def emit(self, effect: effects.PNEmittableEffect):
if isinstance(effect, effects.EmitMessagesEffect):
Expand All @@ -129,7 +128,18 @@ def emit(self, effect: effects.PNEmittableEffect):
self.emit_status(effect)

def emit_message(self, effect: effects.EmitMessagesEffect):
self.pubnub._subscription_manager._listener_manager.announce_message('foo')
for message in effect.messages:
subscribe_message = SubscribeMessage().from_json(message)
pn_message_result = PNMessageResult(
message=subscribe_message.payload,
subscription=subscribe_message.subscription_match,
channel=subscribe_message.channel,
timetoken=int(message['p']['t']),
user_metadata=subscribe_message.publish_metadata,
publisher=subscribe_message.issuing_client_id
)
pn_message_result = 'fo'
self.pubnub._subscription_manager._listener_manager.announce_message(pn_message_result)

def emit_status(self, effect: effects.EmitStatusEffect):
pn_status = PNStatus()
Expand Down
1 change: 0 additions & 1 deletion pubnub/event_engine/models/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ def __init__(self, context: PNContext) -> None:

def on_enter(self, context: Union[None, PNContext]):
super().on_enter(context)
print(self._context)
return effects.ReceiveMessagesEffect(context.channels, context.groups, timetoken=self._context.timetoken,
region=self._context.region)

Expand Down

0 comments on commit dd67ffc

Please sign in to comment.