Skip to content

Commit

Permalink
Merge pull request #1890 from ably/DTP-949/init-objects-pool-from-sync
Browse files Browse the repository at this point in the history
[DTP-949] Initialise LiveObjects pool from state sync sequence
  • Loading branch information
VeskeR authored Oct 22, 2024
2 parents d620d8c + 1cc2bc2 commit c836ed1
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 14 deletions.
24 changes: 18 additions & 6 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import { LiveObject, LiveObjectData } from './liveobject';
import { StateValue } from './statemessage';
import { LiveObjects } from './liveobjects';
import { MapSemantics, StateValue } from './statemessage';

export interface ObjectIdStateData {
/**
* A reference to another state object, used to support composable state objects.
*/
/** A reference to another state object, used to support composable state objects. */
objectId: string;
}

export interface ValueStateData {
/**
* A concrete leaf value in the state object graph.
* The encoding the client should use to interpret the value.
* Analogous to the `encoding` field on the `Message` and `PresenceMessage` types.
*/
encoding?: string;
/** A concrete leaf value in the state object graph. */
value: StateValue;
}

export type StateData = ObjectIdStateData | ValueStateData;

export interface MapEntry {
// TODO: add tombstone, timeserial
tombstone: boolean;
timeserial: string;
data: StateData;
}

Expand All @@ -27,6 +30,15 @@ export interface LiveMapData extends LiveObjectData {
}

export class LiveMap extends LiveObject<LiveMapData> {
constructor(
liveObjects: LiveObjects,
private _semantics: MapSemantics,
initialData?: LiveMapData | null,
objectId?: string,
) {
super(liveObjects, initialData, objectId);
}

/**
* Returns the value associated with the specified key in the underlying Map object.
* If no element is associated with the specified key, undefined is returned.
Expand Down
17 changes: 13 additions & 4 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export class LiveObjects {
return this._liveObjectsPool;
}

/**
* @internal
*/
getChannel(): RealtimeChannel {
return this._channel;
}

/**
* @internal
*/
Expand All @@ -53,7 +60,7 @@ export class LiveObjects {
this._startNewSync(syncId, syncCursor);
}

// TODO: delegate state messages to _syncLiveObjectsDataPool and create new live and data objects
this._syncLiveObjectsDataPool.applyStateMessages(stateMessages);

// if this is the last (or only) message in a sequence of sync updates, end the sync
if (!syncCursor) {
Expand Down Expand Up @@ -154,17 +161,19 @@ export class LiveObjects {
}

let newObject: LiveObject;
switch (entry.objectType) {
// assign to a variable so TS doesn't complain about 'never' type in the default case
const objectType = entry.objectType;
switch (objectType) {
case 'LiveCounter':
newObject = new LiveCounter(this, entry.objectData, objectId);
break;

case 'LiveMap':
newObject = new LiveMap(this, entry.objectData, objectId);
newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId);
break;

default:
throw new this._client.ErrorInfo(`Unknown live object type: ${entry.objectType}`, 40000, 400);
throw new this._client.ErrorInfo(`Unknown live object type: ${objectType}`, 40000, 400);
}
newObject.setRegionalTimeserial(entry.regionalTimeserial);

Expand Down
3 changes: 2 additions & 1 deletion src/plugins/liveobjects/liveobjectspool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type BaseClient from 'common/lib/client/baseclient';
import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
import { LiveObjects } from './liveobjects';
import { MapSemantics } from './statemessage';

export const ROOT_OBJECT_ID = 'root';

Expand Down Expand Up @@ -41,7 +42,7 @@ export class LiveObjectsPool {

private _getInitialPool(): Map<string, LiveObject> {
const pool = new Map<string, LiveObject>();
const root = new LiveMap(this._liveObjects, null, ROOT_OBJECT_ID);
const root = new LiveMap(this._liveObjects, MapSemantics.LWW, null, ROOT_OBJECT_ID);
pool.set(root.getObjectId(), root);
return pool;
}
Expand Down
107 changes: 104 additions & 3 deletions src/plugins/liveobjects/syncliveobjectsdatapool.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
import type BaseClient from 'common/lib/client/baseclient';
import RealtimeChannel from 'common/lib/client/realtimechannel';
import { LiveCounterData } from './livecounter';
import { LiveMapData, MapEntry, ObjectIdStateData, StateData, ValueStateData } from './livemap';
import { LiveObjectData } from './liveobject';
import { LiveObjects } from './liveobjects';
import { MapSemantics, StateMessage, StateObject } from './statemessage';

export interface LiveObjectDataEntry {
objectData: LiveObjectData;
regionalTimeserial: string;
objectType: 'LiveMap' | 'LiveCounter';
}

export interface LiveCounterDataEntry extends LiveObjectDataEntry {
created: boolean;
objectType: 'LiveCounter';
}

export interface LiveMapDataEntry extends LiveObjectDataEntry {
objectType: 'LiveMap';
semantics: MapSemantics;
}

export type AnyDataEntry = LiveCounterDataEntry | LiveMapDataEntry;

/**
* @internal
*/
export class SyncLiveObjectsDataPool {
private _pool: Map<string, LiveObjectDataEntry>;
private _client: BaseClient;
private _channel: RealtimeChannel;
private _pool: Map<string, AnyDataEntry>;

constructor(private _liveObjects: LiveObjects) {
this._pool = new Map<string, LiveObjectDataEntry>();
this._client = this._liveObjects.getClient();
this._channel = this._liveObjects.getChannel();
this._pool = new Map<string, AnyDataEntry>();
}

entries() {
Expand All @@ -30,6 +51,86 @@ export class SyncLiveObjectsDataPool {
}

reset(): void {
this._pool = new Map<string, LiveObjectDataEntry>();
this._pool = new Map<string, AnyDataEntry>();
}

applyStateMessages(stateMessages: StateMessage[]): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.object) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()',
`state message is received during SYNC without 'object' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

const stateObject = stateMessage.object;

if (stateObject.counter) {
this._pool.set(stateObject.objectId, this._createLiveCounterDataEntry(stateObject));
} else if (stateObject.map) {
this._pool.set(stateObject.objectId, this._createLiveMapDataEntry(stateObject));
} else {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MINOR,
'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()',
`received unsupported state object message during SYNC, expected 'counter' or 'map' to be present; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
}
}

private _createLiveCounterDataEntry(stateObject: StateObject): LiveCounterDataEntry {
const counter = stateObject.counter!;

const objectData: LiveCounterData = {
data: counter.count ?? 0,
};
const newEntry: LiveCounterDataEntry = {
created: counter.created,
objectData,
objectType: 'LiveCounter',
regionalTimeserial: stateObject.regionalTimeserial,
};

return newEntry;
}

private _createLiveMapDataEntry(stateObject: StateObject): LiveMapDataEntry {
const map = stateObject.map!;

const objectData: LiveMapData = {
data: new Map<string, MapEntry>(),
};
// need to iterate over entries manually to work around optional parameters from state object entries type
Object.entries(map.entries ?? {}).forEach(([key, entryFromMessage]) => {
let liveData: StateData;
if (typeof entryFromMessage.data.objectId !== 'undefined') {
liveData = { objectId: entryFromMessage.data.objectId } as ObjectIdStateData;
} else {
liveData = { encoding: entryFromMessage.data.encoding, value: entryFromMessage.data.value } as ValueStateData;
}

const liveDataEntry: MapEntry = {
...entryFromMessage,
// true only if we received explicit true. otherwise always false
tombstone: entryFromMessage.tombstone === true,
data: liveData,
};

objectData.data.set(key, liveDataEntry);
});

const newEntry: LiveMapDataEntry = {
objectData,
objectType: 'LiveMap',
regionalTimeserial: stateObject.regionalTimeserial,
semantics: map.semantics ?? MapSemantics.LWW,
};

return newEntry;
}
}

0 comments on commit c836ed1

Please sign in to comment.