From 9a374193d38ba7231c48d6eee31f7b43b6925112 Mon Sep 17 00:00:00 2001 From: Sai Ranjit Tummalapalli Date: Fri, 13 Sep 2024 16:34:15 +0530 Subject: [PATCH] refactor: added timeout in message processing Signed-off-by: Sai Ranjit Tummalapalli --- .../src/transport/HttpInboundTransport.ts | 27 ++++++++++++------- .../node/src/transport/WsInboundTransport.ts | 27 ++++++++++++------- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/packages/node/src/transport/HttpInboundTransport.ts b/packages/node/src/transport/HttpInboundTransport.ts index 34d3509a0..e3e311415 100644 --- a/packages/node/src/transport/HttpInboundTransport.ts +++ b/packages/node/src/transport/HttpInboundTransport.ts @@ -12,7 +12,7 @@ import type { Server } from 'http' import { DidCommMimeType, CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core' import express, { text } from 'express' -import { filter, first, firstValueFrom } from 'rxjs' +import { first, firstValueFrom, ReplaySubject, timeout } from 'rxjs' const supportedContentTypes: string[] = [DidCommMimeType.V0, DidCommMimeType.V1] @@ -59,7 +59,21 @@ export class HttpInboundTransport implements InboundTransport { try { const message = req.body - const encryptedMessage = JSON.parse(message) + const encryptedMessage = JSON.parse(message) as EncryptedMessage + + const observable = agent.events.observable(AgentEventTypes.AgentMessageProcessed) + const subject = new ReplaySubject(1) + + observable + .pipe( + first((e) => e.type === AgentEventTypes.AgentMessageProcessed), + timeout({ + first: 10000, // timeout after 10 seconds + meta: 'HttpInboundTransport.start', + }) + ) + .subscribe(subject) + agent.events.emit(agent.context, { type: AgentEventTypes.AgentMessageReceived, payload: { @@ -69,14 +83,7 @@ export class HttpInboundTransport implements InboundTransport { }) // Wait for message to be processed - await firstValueFrom( - agent.events.observable(AgentEventTypes.AgentMessageProcessed).pipe( - filter((e) => e.type === AgentEventTypes.AgentMessageProcessed), - filter((e) => e.payload.message.id === encryptedMessage.id), - filter((e) => e.payload.message.type === encryptedMessage.type), - first() - ) - ) + await firstValueFrom(subject) // If agent did not use session when processing message we need to send response here. if (!res.headersSent) { diff --git a/packages/node/src/transport/WsInboundTransport.ts b/packages/node/src/transport/WsInboundTransport.ts index 8765b8df6..aa4edc021 100644 --- a/packages/node/src/transport/WsInboundTransport.ts +++ b/packages/node/src/transport/WsInboundTransport.ts @@ -10,7 +10,7 @@ import type { } from '@credo-ts/core' import { CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core' -import { filter, first, firstValueFrom } from 'rxjs' +import { first, firstValueFrom, ReplaySubject, timeout } from 'rxjs' // eslint-disable-next-line import/no-named-as-default import WebSocket, { Server } from 'ws' @@ -72,7 +72,21 @@ export class WsInboundTransport implements InboundTransport { socket.addEventListener('message', async (event: any) => { this.logger.debug('WebSocket message event received.', { url: event.target.url }) try { - const encryptedMessage = JSON.parse(event.data) + const encryptedMessage = JSON.parse(event.data) as EncryptedMessage + + const observable = agent.events.observable(AgentEventTypes.AgentMessageProcessed) + const subject = new ReplaySubject(1) + + observable + .pipe( + first((e) => e.type === AgentEventTypes.AgentMessageProcessed), + timeout({ + first: 10000, // timeout after 10 seconds + meta: 'WsInboundTransport.listenOnWebSocketMessages', + }) + ) + .subscribe(subject) + agent.events.emit(agent.context, { type: AgentEventTypes.AgentMessageReceived, payload: { @@ -82,14 +96,7 @@ export class WsInboundTransport implements InboundTransport { }) // Wait for message to be processed - await firstValueFrom( - agent.events.observable(AgentEventTypes.AgentMessageProcessed).pipe( - filter((e) => e.type === AgentEventTypes.AgentMessageProcessed), - filter((e) => e.payload.message.id === encryptedMessage.id), - filter((e) => e.payload.message.type === encryptedMessage.type), - first() - ) - ) + await firstValueFrom(subject) } catch (error) { this.logger.error(`Error processing message: ${error}`) }