Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-949] Initialise LiveObjects pool from state sync sequence #1890

Merged
merged 2 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import { LiveObject, LiveObjectData } from './liveobject';
import { StateValue } from './statemessage';
import { LiveObjects } from './liveobjects';
import { MapSemantics, StateValue } from './statemessage';

export interface ObjectIdStateData {
/**
* A reference to another state object, used to support composable state objects.
*/
/** A reference to another state object, used to support composable state objects. */
objectId: string;
}

export interface ValueStateData {
/**
* A concrete leaf value in the state object graph.
* The encoding the client should use to interpret the value.
* Analogous to the `encoding` field on the `Message` and `PresenceMessage` types.
*/
encoding?: string;
/** A concrete leaf value in the state object graph. */
value: StateValue;
}

export type StateData = ObjectIdStateData | ValueStateData;

export interface MapEntry {
// TODO: add tombstone, timeserial
tombstone: boolean;
timeserial: string;
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
data: StateData;
}

Expand All @@ -27,6 +30,15 @@ export interface LiveMapData extends LiveObjectData {
}

export class LiveMap extends LiveObject<LiveMapData> {
constructor(
liveObjects: LiveObjects,
private _semantics: MapSemantics,
initialData?: LiveMapData | null,
objectId?: string,
) {
super(liveObjects, initialData, objectId);
}

/**
* Returns the value associated with the specified key in the underlying Map object.
* If no element is associated with the specified key, undefined is returned.
Expand Down
17 changes: 13 additions & 4 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export class LiveObjects {
return this._liveObjectsPool;
}

/**
* @internal
*/
getChannel(): RealtimeChannel {
return this._channel;
}

/**
* @internal
*/
Expand All @@ -53,7 +60,7 @@ export class LiveObjects {
this._startNewSync(syncId, syncCursor);
}

// TODO: delegate state messages to _syncLiveObjectsDataPool and create new live and data objects
this._syncLiveObjectsDataPool.applyStateMessages(stateMessages);

// if this is the last (or only) message in a sequence of sync updates, end the sync
if (!syncCursor) {
Expand Down Expand Up @@ -154,17 +161,19 @@ export class LiveObjects {
}

let newObject: LiveObject;
switch (entry.objectType) {
// assign to a variable so TS doesn't complain about 'never' type in the default case
const objectType = entry.objectType;
switch (objectType) {
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
case 'LiveCounter':
newObject = new LiveCounter(this, entry.objectData, objectId);
break;

case 'LiveMap':
newObject = new LiveMap(this, entry.objectData, objectId);
newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId);
break;

default:
throw new this._client.ErrorInfo(`Unknown live object type: ${entry.objectType}`, 40000, 400);
throw new this._client.ErrorInfo(`Unknown live object type: ${objectType}`, 40000, 400);
}
newObject.setRegionalTimeserial(entry.regionalTimeserial);

Expand Down
3 changes: 2 additions & 1 deletion src/plugins/liveobjects/liveobjectspool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type BaseClient from 'common/lib/client/baseclient';
import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
import { LiveObjects } from './liveobjects';
import { MapSemantics } from './statemessage';

export const ROOT_OBJECT_ID = 'root';

Expand Down Expand Up @@ -41,7 +42,7 @@ export class LiveObjectsPool {

private _getInitialPool(): Map<string, LiveObject> {
const pool = new Map<string, LiveObject>();
const root = new LiveMap(this._liveObjects, null, ROOT_OBJECT_ID);
const root = new LiveMap(this._liveObjects, MapSemantics.LWW, null, ROOT_OBJECT_ID);
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
pool.set(root.getObjectId(), root);
return pool;
}
Expand Down
107 changes: 104 additions & 3 deletions src/plugins/liveobjects/syncliveobjectsdatapool.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
import type BaseClient from 'common/lib/client/baseclient';
import RealtimeChannel from 'common/lib/client/realtimechannel';
import { LiveCounterData } from './livecounter';
import { LiveMapData, MapEntry, ObjectIdStateData, StateData, ValueStateData } from './livemap';
import { LiveObjectData } from './liveobject';
import { LiveObjects } from './liveobjects';
import { MapSemantics, StateMessage, StateObject } from './statemessage';

export interface LiveObjectDataEntry {
objectData: LiveObjectData;
regionalTimeserial: string;
objectType: 'LiveMap' | 'LiveCounter';
}

export interface LiveCounterDataEntry extends LiveObjectDataEntry {
created: boolean;
objectType: 'LiveCounter';
}

export interface LiveMapDataEntry extends LiveObjectDataEntry {
objectType: 'LiveMap';
semantics: MapSemantics;
}

export type AnyDataEntry = LiveCounterDataEntry | LiveMapDataEntry;

/**
* @internal
*/
export class SyncLiveObjectsDataPool {
private _pool: Map<string, LiveObjectDataEntry>;
private _client: BaseClient;
private _channel: RealtimeChannel;
private _pool: Map<string, AnyDataEntry>;

constructor(private _liveObjects: LiveObjects) {
this._pool = new Map<string, LiveObjectDataEntry>();
this._client = this._liveObjects.getClient();
this._channel = this._liveObjects.getChannel();
this._pool = new Map<string, AnyDataEntry>();
}

entries() {
Expand All @@ -30,6 +51,86 @@ export class SyncLiveObjectsDataPool {
}

reset(): void {
this._pool = new Map<string, LiveObjectDataEntry>();
this._pool = new Map<string, AnyDataEntry>();
}

applyStateMessages(stateMessages: StateMessage[]): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.object) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()',
`state message is received during SYNC without 'object' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

const stateObject = stateMessage.object;

if (stateObject.counter) {
this._pool.set(stateObject.objectId, this._createLiveCounterDataEntry(stateObject));
} else if (stateObject.map) {
this._pool.set(stateObject.objectId, this._createLiveMapDataEntry(stateObject));
} else {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MINOR,
'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()',
`received unsupported state object message during SYNC, expected 'counter' or 'map' to be present; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
}
}

private _createLiveCounterDataEntry(stateObject: StateObject): LiveCounterDataEntry {
const counter = stateObject.counter!;

const objectData: LiveCounterData = {
data: counter.count ?? 0,
};
const newEntry: LiveCounterDataEntry = {
created: counter.created,
objectData,
objectType: 'LiveCounter',
regionalTimeserial: stateObject.regionalTimeserial,
};

return newEntry;
}

private _createLiveMapDataEntry(stateObject: StateObject): LiveMapDataEntry {
const map = stateObject.map!;

const objectData: LiveMapData = {
data: new Map<string, MapEntry>(),
};
// need to iterate over entries manually to work around optional parameters from state object entries type
Object.entries(map.entries ?? {}).forEach(([key, entryFromMessage]) => {
let liveData: StateData;
if (typeof entryFromMessage.data.objectId !== 'undefined') {
liveData = { objectId: entryFromMessage.data.objectId } as ObjectIdStateData;
} else {
liveData = { encoding: entryFromMessage.data.encoding, value: entryFromMessage.data.value } as ValueStateData;
}

const liveDataEntry: MapEntry = {
...entryFromMessage,
// true only if we received explicit true. otherwise always false
tombstone: entryFromMessage.tombstone === true,
data: liveData,
};

objectData.data.set(key, liveDataEntry);
});

const newEntry: LiveMapDataEntry = {
objectData,
objectType: 'LiveMap',
regionalTimeserial: stateObject.regionalTimeserial,
semantics: map.semantics ?? MapSemantics.LWW,
};

return newEntry;
}
}
Loading