diff --git a/packages/core/src/modules/connections/ConnectionEvents.ts b/packages/core/src/modules/connections/ConnectionEvents.ts index c9f1064bab..bd8f98989f 100644 --- a/packages/core/src/modules/connections/ConnectionEvents.ts +++ b/packages/core/src/modules/connections/ConnectionEvents.ts @@ -4,6 +4,7 @@ import type { BaseEvent } from '../../agent/Events' export enum ConnectionEventTypes { ConnectionStateChanged = 'ConnectionStateChanged', + ConnectionDidRotated = 'ConnectionDidRotated', } export interface ConnectionStateChangedEvent extends BaseEvent { @@ -13,3 +14,19 @@ export interface ConnectionStateChangedEvent extends BaseEvent { previousState: DidExchangeState | null } } + +export interface ConnectionDidRotatedEvent extends BaseEvent { + type: typeof ConnectionEventTypes.ConnectionDidRotated + payload: { + connectionRecord: ConnectionRecord + + ourDid?: { + from: string + to: string + } + theirDid?: { + from: string + to: string + } + } +} diff --git a/packages/core/src/modules/connections/__tests__/did-rotate.test.ts b/packages/core/src/modules/connections/__tests__/did-rotate.test.ts index a6f885cb5f..66db4f26f4 100644 --- a/packages/core/src/modules/connections/__tests__/did-rotate.test.ts +++ b/packages/core/src/modules/connections/__tests__/did-rotate.test.ts @@ -1,7 +1,5 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ -import type { ConnectionRecord } from '../repository' - import { ReplaySubject, first, firstValueFrom, timeout } from 'rxjs' import { MessageSender } from '../../..//agent/MessageSender' @@ -11,6 +9,8 @@ import { makeConnection, waitForAgentMessageProcessedEvent, waitForBasicMessage, + waitForConnectionRecord, + waitForDidRotate, } from '../../../../tests/helpers' import { Agent } from '../../../agent/Agent' import { getOutboundMessageContext } from '../../../agent/getOutboundMessageContext' @@ -20,6 +20,8 @@ import { BasicMessage } from '../../basic-messages' import { createPeerDidDocumentFromServices } from '../../dids' import { ConnectionsModule } from '../ConnectionsModule' import { DidRotateProblemReportMessage, HangupMessage, DidRotateAckMessage } from '../messages' +import { DidExchangeState } from '../models' +import { ConnectionRecord } from '../repository' import { InMemoryDidRegistry } from './InMemoryDidRegistry' @@ -233,11 +235,33 @@ describe('Rotation E2E tests', () => { didDocument, }) + const waitForAllDidRotate = Promise.all([waitForDidRotate(aliceAgent, {}), waitForDidRotate(bobAgent, {})]) + // Do did rotate await aliceAgent.connections.rotate({ connectionId: aliceBobConnection!.id, toDid: did }) // Wait for acknowledge await waitForAgentMessageProcessedEvent(aliceAgent, { messageType: DidRotateAckMessage.type.messageTypeUri }) + const [firstRotate, secondRotate] = await waitForAllDidRotate + + const preRotateDid = aliceBobConnection!.did + expect(firstRotate).toEqual({ + connectionRecord: expect.any(ConnectionRecord), + ourDid: { + from: preRotateDid, + to: did, + }, + theirDid: undefined, + }) + + expect(secondRotate).toEqual({ + connectionRecord: expect.any(ConnectionRecord), + ourDid: undefined, + theirDid: { + from: preRotateDid, + to: did, + }, + }) // Send message to previous did await bobAgent.dependencyManager.resolve(MessageSender).sendMessage(messageToPreviousDid) @@ -323,6 +347,16 @@ describe('Rotation E2E tests', () => { connectionRecord: bobAliceConnection!.clone(), }) + const connectionsAbandoned = Promise.all([ + waitForConnectionRecord(aliceAgent, { + state: DidExchangeState.Abandoned, + threadId: aliceBobConnection?.threadId, + }), + waitForConnectionRecord(bobAgent, { + state: DidExchangeState.Abandoned, + threadId: aliceBobConnection?.threadId, + }), + ]) await aliceAgent.connections.hangup({ connectionId: aliceBobConnection!.id }) // Wait for hangup @@ -330,6 +364,16 @@ describe('Rotation E2E tests', () => { messageType: HangupMessage.type.messageTypeUri, }) + const [aliceAbandoned, bobAbandoned] = await connectionsAbandoned + expect(aliceAbandoned).toMatchObject({ + state: DidExchangeState.Abandoned, + errorMessage: 'Connection hangup by us', + }) + expect(bobAbandoned).toMatchObject({ + state: DidExchangeState.Abandoned, + errorMessage: 'Connection hangup by other party', + }) + // If Bob attempts to send a message to Alice after they received the hangup, framework should reject it expect(bobAgent.basicMessages.sendMessage(bobAliceConnection!.id, 'Message after hangup')).rejects.toThrowError() @@ -358,7 +402,7 @@ describe('Rotation E2E tests', () => { await aliceAgent.connections.hangup({ connectionId: aliceBobConnection!.id, deleteAfterHangup: true }) // Verify that alice connection has been effectively deleted - expect(aliceAgent.connections.getById(aliceBobConnection!.id)).rejects.toThrowError(RecordNotFoundError) + expect(aliceAgent.connections.getById(aliceBobConnection!.id)).rejects.toThrow(RecordNotFoundError) // Wait for hangup await waitForAgentMessageProcessedEvent(bobAgent, { diff --git a/packages/core/src/modules/connections/services/DidRotateService.ts b/packages/core/src/modules/connections/services/DidRotateService.ts index d02812a61c..8fcfc98e6a 100644 --- a/packages/core/src/modules/connections/services/DidRotateService.ts +++ b/packages/core/src/modules/connections/services/DidRotateService.ts @@ -1,8 +1,10 @@ import type { Routing } from './ConnectionService' import type { AgentContext } from '../../../agent' import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext' +import type { ConnectionDidRotatedEvent, ConnectionStateChangedEvent } from '../ConnectionEvents' import type { ConnectionRecord } from '../repository/ConnectionRecord' +import { EventEmitter } from '../../../agent/EventEmitter' import { OutboundMessageContext } from '../../../agent/models' import { InjectionSymbols } from '../../../constants' import { CredoError } from '../../../error' @@ -18,8 +20,10 @@ import { isValidPeerDid, } from '../../dids' import { getMediationRecordForDidDocument } from '../../routing/services/helpers' +import { ConnectionEventTypes } from '../ConnectionEvents' import { ConnectionsModuleConfig } from '../ConnectionsModuleConfig' import { DidRotateMessage, DidRotateAckMessage, DidRotateProblemReportMessage, HangupMessage } from '../messages' +import { DidExchangeState } from '../models' import { ConnectionMetadataKeys } from '../repository/ConnectionMetadataTypes' import { ConnectionService } from './ConnectionService' @@ -29,10 +33,16 @@ import { createPeerDidFromServices, getDidDocumentForCreatedDid, routingToServic export class DidRotateService { private didResolverService: DidResolverService private logger: Logger + private eventEmitter: EventEmitter - public constructor(didResolverService: DidResolverService, @inject(InjectionSymbols.Logger) logger: Logger) { + public constructor( + didResolverService: DidResolverService, + @inject(InjectionSymbols.Logger) logger: Logger, + eventEmitter: EventEmitter + ) { this.didResolverService = didResolverService this.logger = logger + this.eventEmitter = eventEmitter } public async createRotate( @@ -95,9 +105,13 @@ export class DidRotateService { connection.previousDids = [...connection.previousDids, connection.did] } + const previousState = connection.state connection.did = undefined + connection.state = DidExchangeState.Abandoned + connection.errorMessage = 'Connection hangup by us' await agentContext.dependencyManager.resolve(ConnectionService).update(agentContext, connection) + this.emitStateChangedEvent(agentContext, connection, previousState) return message } @@ -119,9 +133,13 @@ export class DidRotateService { connection.previousTheirDids = [...connection.previousTheirDids, connection.theirDid] } + const previousState = connection.state connection.theirDid = undefined + connection.state = DidExchangeState.Abandoned + connection.errorMessage = 'Connection hangup by other party' await agentContext.dependencyManager.resolve(ConnectionService).update(agentContext, connection) + this.emitStateChangedEvent(agentContext, connection, previousState) } /** @@ -197,9 +215,13 @@ export class DidRotateService { connection.previousTheirDids = [...connection.previousTheirDids, connection.theirDid] } + const previousTheirDid = connection.theirDid connection.theirDid = newDid await agentContext.dependencyManager.resolve(ConnectionService).update(agentContext, connection) + this.emitDidRotatedEvent(agentContext, connection, { + previousTheirDid, + }) return outboundMessageContext } @@ -225,11 +247,15 @@ export class DidRotateService { // Store previous did in order to still accept out-of-order messages that arrived later using it if (connection.did) connection.previousDids = [...connection.previousDids, connection.did] + const previousOurDid = connection.did connection.did = didRotateMetadata.did connection.mediatorId = didRotateMetadata.mediatorId connection.metadata.delete(ConnectionMetadataKeys.DidRotate) await agentContext.dependencyManager.resolve(ConnectionService).update(agentContext, connection) + this.emitDidRotatedEvent(agentContext, connection, { + previousOurDid, + }) } /** @@ -271,4 +297,49 @@ export class DidRotateService { await agentContext.dependencyManager.resolve(ConnectionService).update(agentContext, connection) } + + private emitDidRotatedEvent( + agentContext: AgentContext, + connectionRecord: ConnectionRecord, + { previousOurDid, previousTheirDid }: { previousOurDid?: string; previousTheirDid?: string } + ) { + this.eventEmitter.emit(agentContext, { + type: ConnectionEventTypes.ConnectionDidRotated, + payload: { + // Connection record in event should be static + connectionRecord: connectionRecord.clone(), + + ourDid: + previousOurDid && connectionRecord.did + ? { + from: previousOurDid, + to: connectionRecord.did, + } + : undefined, + + theirDid: + previousTheirDid && connectionRecord.theirDid + ? { + from: previousTheirDid, + to: connectionRecord.theirDid, + } + : undefined, + }, + }) + } + + private emitStateChangedEvent( + agentContext: AgentContext, + connectionRecord: ConnectionRecord, + previousState: DidExchangeState | null + ) { + this.eventEmitter.emit(agentContext, { + type: ConnectionEventTypes.ConnectionStateChanged, + payload: { + // Connection record in event should be static + connectionRecord: connectionRecord.clone(), + previousState, + }, + }) + } } diff --git a/packages/core/tests/helpers.ts b/packages/core/tests/helpers.ts index f26aa51f24..e217b935a9 100644 --- a/packages/core/tests/helpers.ts +++ b/packages/core/tests/helpers.ts @@ -18,6 +18,7 @@ import type { AgentMessageProcessedEvent, RevocationNotificationReceivedEvent, KeyDidCreateOptions, + ConnectionDidRotatedEvent, } from '../src' import type { AgentModulesInput, EmptyModuleMap } from '../src/agent/AgentModules' import type { TrustPingReceivedEvent, TrustPingResponseReceivedEvent } from '../src/modules/connections/TrustPingEvents' @@ -28,7 +29,7 @@ import type { Observable } from 'rxjs' import { readFileSync } from 'fs' import path from 'path' import { lastValueFrom, firstValueFrom, ReplaySubject } from 'rxjs' -import { catchError, filter, map, take, timeout } from 'rxjs/operators' +import { catchError, filter, map, take, tap, timeout } from 'rxjs/operators' import { InMemoryWalletModule } from '../../../tests/InMemoryWalletModule' import { agentDependencies } from '../../node/src' @@ -231,6 +232,8 @@ const isCredentialStateChangedEvent = (e: BaseEvent): e is CredentialStateChange e.type === CredentialEventTypes.CredentialStateChanged const isConnectionStateChangedEvent = (e: BaseEvent): e is ConnectionStateChangedEvent => e.type === ConnectionEventTypes.ConnectionStateChanged +const isConnectionDidRotatedEvent = (e: BaseEvent): e is ConnectionDidRotatedEvent => + e.type === ConnectionEventTypes.ConnectionDidRotated const isTrustPingReceivedEvent = (e: BaseEvent): e is TrustPingReceivedEvent => e.type === TrustPingEventTypes.TrustPingReceivedEvent const isTrustPingResponseReceivedEvent = (e: BaseEvent): e is TrustPingResponseReceivedEvent => @@ -455,6 +458,38 @@ export async function waitForCredentialRecord( return waitForCredentialRecordSubject(observable, options) } +export function waitForDidRotateSubject( + subject: ReplaySubject | Observable, + { + threadId, + state, + timeoutMs = 15000, // sign and store credential in W3c credential protocols take several seconds + }: { + threadId?: string + state?: DidExchangeState + previousState?: DidExchangeState | null + timeoutMs?: number + } +) { + const observable = subject instanceof ReplaySubject ? subject.asObservable() : subject + + return firstValueFrom( + observable.pipe( + filter(isConnectionDidRotatedEvent), + filter((e) => threadId === undefined || e.payload.connectionRecord.threadId === threadId), + filter((e) => state === undefined || e.payload.connectionRecord.state === state), + timeout(timeoutMs), + catchError(() => { + throw new Error(`ConnectionDidRotated event not emitted within specified timeout: { + threadId: ${threadId}, + state: ${state} +}`) + }), + map((e) => e.payload) + ) + ) +} + export function waitForConnectionRecordSubject( subject: ReplaySubject | Observable, { @@ -480,10 +515,10 @@ export function waitForConnectionRecordSubject( timeout(timeoutMs), catchError(() => { throw new Error(`ConnectionStateChanged event not emitted within specified timeout: { - previousState: ${previousState}, - threadId: ${threadId}, - state: ${state} -}`) + previousState: ${previousState}, + threadId: ${threadId}, + state: ${state} + }`) }), map((e) => e.payload.connectionRecord) ) @@ -503,6 +538,18 @@ export async function waitForConnectionRecord( return waitForConnectionRecordSubject(observable, options) } +export async function waitForDidRotate( + agent: Agent, + options: { + threadId?: string + state?: DidExchangeState + timeoutMs?: number + } +) { + const observable = agent.events.observable(ConnectionEventTypes.ConnectionDidRotated) + return waitForDidRotateSubject(observable, options) +} + export async function waitForBasicMessage( agent: Agent, { content, connectionId }: { content?: string; connectionId?: string }