Skip to content

Commit

Permalink
fix(gqty): onSubscribe for connected WebSockets (#2033)
Browse files Browse the repository at this point in the history
  • Loading branch information
vicary authored Nov 10, 2024
1 parent ee30fe3 commit f681c06
Show file tree
Hide file tree
Showing 21 changed files with 204 additions and 264 deletions.
5 changes: 5 additions & 0 deletions .changeset/breezy-apes-relax.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'gqty': patch
---

fix(gqty): onSubscribe for connected WebSockets
2 changes: 1 addition & 1 deletion examples/gnt/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"eslint-config-next": "^14.2.5",
"postcss": "^8.4.41",
"tailwindcss": "^3.4.9",
"typescript": "^5.5.4",
"typescript": "^5.6.3",
"utf-8-validate": "^5.0.10"
},
"gqty": {
Expand Down
2 changes: 1 addition & 1 deletion examples/solid/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"postcss": "^8.4.38",
"solid-devtools": "^0.29.2",
"tailwindcss": "^3.4.3",
"typescript": "^5.3.3",
"typescript": "^5.6.3",
"vite": "^5.4.3",
"vite-plugin-solid": "^2.10.2"
}
Expand Down
2 changes: 1 addition & 1 deletion internal/test-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"@jest/types": "^29.6.3",
"@types/node": "^20.14.15",
"esbuild": "^0.23.0",
"typescript": "^5.5.4"
"typescript": "^5.6.3"
},
"peerDependencies": {
"graphql": "*"
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"ts-jest": "^29.2.5",
"ts-node": "^10.9.2",
"tsx": "^4.19.0",
"typescript": "^5.5.4",
"typescript": "^5.6.3",
"utf-8-validate": "^6.0.4",
"wait-on": "^7.2.0"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
"p-lazy": "^3.1.0",
"test-utils": "workspace:^",
"tmp-promise": "^3.0.3",
"typescript": "^5.5.4",
"typescript": "^5.6.3",
"wait-on": "^7.2.0"
},
"peerDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/gqty/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
"test-utils": "workspace:^",
"tsc-watch": "^6.2.0",
"type-fest": "^4.24.0",
"typescript": "^5.5.4",
"typescript": "^5.6.3",
"wait-on": "^7.2.0",
"ws": "^8.18.0"
},
Expand Down
5 changes: 4 additions & 1 deletion packages/gqty/src/Cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ export class Cache {
#data = new FrailMap<string, CacheDataContainer>();

/** Look up table for normalized objects. */
#normalizedObjects = new Map<string, NormalizedObjectShell<CacheObject>>();
#normalizedObjects = new FrailMap<
string,
NormalizedObjectShell<CacheObject>
>();

/** Temporary strong references for the WeakRefs in FrailMap. */
#dataRefs = new Set<CacheDataContainer>();
Expand Down
3 changes: 2 additions & 1 deletion packages/gqty/src/Cache/normalization.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { FrailMap } from 'frail-map';
import type { CacheNode, CacheObject } from '.';
import { GQtyError } from '../Error';
import { deepAssign } from '../Utils';
Expand All @@ -22,7 +23,7 @@ const shells = new Set<NormalizedObjectShell>();

export type NormalizatioOptions<TData extends CacheObject = CacheObject> =
CacheNormalizationHandler & {
store: Map<string, NormalizedObjectShell<TData>>;
store: FrailMap<string, NormalizedObjectShell<TData>>;
};

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/gqty/src/Cache/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ const deduplicationCache = new WeakMap<
*
* After promise resolution, the hash is removed from the map.
*
* If the cache to omitted, a global cache will be used instead.
* If the `cache` argument is omitted, a global cache will be used instead.
*/
export const dedupePromise = <
TData = Record<string, unknown>,
TExtensions = Record<string, unknown>
TExtensions = Record<string, unknown>,
>(
cache: Cache | undefined,
hash: string,
Expand Down
2 changes: 1 addition & 1 deletion packages/gqty/src/Client/compat/resolved.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { updateCaches } from '../updateCaches';
import type { CreateLegacyMethodOptions } from './client';
import { convertSelection, type LegacySelection } from './selection';

export interface LegacyFetchOptions extends Omit<RequestInit, 'body'> {}
export type LegacyFetchOptions = Omit<RequestInit, 'body'>;

export interface LegacyResolved {
<T = unknown>(fn: () => T, opts?: LegacyResolveOptions<T>): Promise<T>;
Expand Down
48 changes: 18 additions & 30 deletions packages/gqty/src/Client/resolveSelections.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import type { ExecutionResult } from 'graphql';
import type { Client as SseClient } from 'graphql-sse';
import type {
MessageType,
SubscribePayload,
Client as WsClient,
} from 'graphql-ws';
import type { MessageType, SubscribePayload } from 'graphql-ws';
import type { CloseEvent } from 'ws';
import type { FetchOptions } from '.';
import type { Cache } from '../Cache';
Expand All @@ -15,12 +11,17 @@ import { buildQuery } from '../QueryBuilder';
import type { QueryPayload } from '../Schema';
import type { Selection } from '../Selection';
import type { Debugger } from './debugger';
import { isWsClient, type GQtyWsClient } from './subscriber';

export type ResolverFetchOptions = Omit<FetchOptions, 'subscriber'> & {
subscriber?: SseClient | GQtyWsClient;
};

export type FetchSelectionsOptions = {
cache?: Cache;
debugger?: Debugger;
extensions?: Record<string, unknown>;
fetchOptions: FetchOptions;
fetchOptions: ResolverFetchOptions;
operationName?: string;
};

Expand Down Expand Up @@ -163,15 +164,7 @@ export const subscribeSelections = <

if (isWsClient(subscriber)) {
if (onSubscribe) {
const unsub = subscriber.on('message', (message) => {
switch (message.type) {
case 'connection_ack' as MessageType.ConnectionAck: {
unsub();
onSubscribe();
break;
}
}
});
subscriber.onSubscribe(onSubscribe);
}

if (debug) {
Expand All @@ -198,13 +191,11 @@ export const subscribeSelections = <
}
});
}
} else if (isSseClient(subscriber)) {
} else {
// [ ] Get id via constructor#onMessage option, this requires
// modifications to the generated client.
subscriptionId = 'EventSource';
onSubscribe?.();
} else {
throw new GQtyError(`Please specify a subscriber for subscriptions.`);
}

const next = ({ data, errors, extensions }: ExecutionResult<TData>) => {
Expand Down Expand Up @@ -258,11 +249,15 @@ export const subscribeSelections = <
queryPayload,
{
next,
error(err) {
if (Array.isArray(err)) {
error(GQtyError.fromGraphQLErrors(err));
} else if (!isCloseEvent(err)) {
error(GQtyError.create(err));
error(e) {
if (Array.isArray(e)) {
error(GQtyError.fromGraphQLErrors(e));
} else if (!isCloseEvent(e)) {
if (e instanceof Error) {
error(GQtyError.create(e));
} else {
console.error('Unknown subscription error:', e);
}
}

this.complete();
Expand Down Expand Up @@ -433,10 +428,3 @@ export const isCloseEvent = (input: unknown): input is CloseEvent => {
].includes(error.code))
);
};

const isWsClient = (client?: SseClient | WsClient): client is WsClient => {
return client !== undefined && typeof (client as WsClient).on === 'function';
};

const isSseClient = (client?: SseClient | WsClient): client is SseClient =>
client !== undefined && !isWsClient(client);
14 changes: 11 additions & 3 deletions packages/gqty/src/Client/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
subscribeSelections,
type Unsubscribe,
} from './resolveSelections';
import { createSubscriber, isWsClient } from './subscriber';
import { updateCaches } from './updateCaches';

export type CreateResolversOptions = {
Expand Down Expand Up @@ -188,6 +189,8 @@ export type SubscribeOptions = ResolverOptions & {
* Called when a subscription is established, receives an unsubscribe
* function that immediately terminates the async generator and any pending
* promise.
*
* @deprecated Use the `unsubscribe` method returned from `subscribe()` instead.
*/
onSubscribe?: (unsubscribe: Unsubscribe) => void;
};
Expand Down Expand Up @@ -238,6 +241,10 @@ export const createResolvers = <TSchema extends BaseGeneratedSchema>({
// be updated. Along with the original client cache.
const correlatedCaches = new MultiDict<Set<unknown>, Cache>();

const subscriber = isWsClient(fetchOptions.subscriber)
? createSubscriber(fetchOptions.subscriber)
: fetchOptions.subscriber;

const createResolver: CreateResolverFn<TSchema> = ({
cachePolicy = defaultCachePolicy,
extensions,
Expand Down Expand Up @@ -530,8 +537,7 @@ export const createResolvers = <TSchema extends BaseGeneratedSchema>({
[{ data, error, extensions }],
cachePolicy !== 'no-store' && context.cache !== resolverCache
? [context.cache, resolverCache]
: [context.cache],
{ skipNotify: !context.notifyCacheUpdate }
: [context.cache]
);

if (!lastSelectionsUpdated) {
Expand All @@ -554,9 +560,11 @@ export const createResolvers = <TSchema extends BaseGeneratedSchema>({
...fetchOptions,
cachePolicy,
retryPolicy,
subscriber,
},
operationName,
onSubscribe: () => onSubscribe?.(unsubscribe),
onSubscribe: () =>
queueMicrotask(() => onSubscribe?.(unsubscribe)),
onComplete: () => resolve(),
}
);
Expand Down
52 changes: 52 additions & 0 deletions packages/gqty/src/Client/subscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import type { Client as SseClient } from 'graphql-sse';
import type { Client as WsClient } from 'graphql-ws';

export const isWsClient = (client?: SseClient | WsClient): client is WsClient =>
client !== undefined && typeof (client as WsClient).on === 'function';

export type GQtyWsClient = WsClient & {
isOnline?: boolean;

/**
* Subscription listeners to be called when the client is online, or called
* immediately when the client is already online.
*/
onSubscribe: (fn: () => void) => void;
};

/**
* Warning: If the WebSocket is already connected before this funciton is
* called, the `onSubscribe` will not be called until next connected event.
*/
export const createSubscriber = (input: WsClient): GQtyWsClient => {
const client = input as GQtyWsClient;

// Prevent double initialization
if (client.onSubscribe !== undefined) {
return client;
}

const listeners: Array<() => void> = [];

client.on('connected', () => {
if (!client.isOnline) {
client.isOnline = true;
listeners.forEach((fn) => fn());
listeners.length = 0;
}
});

client.on('closed', () => {
client.isOnline = false;
});

client.onSubscribe = (fn: () => void) => {
if (client.isOnline) {
fn();
} else {
listeners.push(fn);
}
};

return client;
};
2 changes: 1 addition & 1 deletion packages/gqty/src/QueryBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export const buildQuery = (
const key = s.alias ? `${s.alias}:${s.key}` : s.key;
const input = s.input;

if (input) {
if (input && Object.keys(input.values).length > 0) {
if (!inputDedupe.has(input)) {
const queryInputs = Object.entries(input.values)
.map(([key, value]) => {
Expand Down
3 changes: 3 additions & 0 deletions packages/gqty/src/Utils/deferred.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,8 @@ export const createDeferredIterator = <T>(): DeferredIterator<T> => {
[Symbol.asyncIterator]() {
return this;
},
async [Symbol.asyncDispose]() {
await this.return();
},
};
};
2 changes: 1 addition & 1 deletion packages/logger/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"gqty": "workspace:^",
"jest": "^29.7.0",
"test-utils": "workspace:^",
"typescript": "^5.5.4"
"typescript": "^5.6.3"
},
"peerDependencies": {
"gqty": "workspace:^"
Expand Down
2 changes: 1 addition & 1 deletion packages/react/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ export interface ReactClient<TSchema extends BaseGeneratedSchema> {
export function createReactClient<TSchema extends BaseGeneratedSchema>(
client: GQtyClient<TSchema>,
{
defaults: { suspense = false } = {},
defaults: {
initialLoadingState = false,
suspense = false,
transactionFetchPolicy = 'cache-first',
lazyFetchPolicy = 'network-only',
staleWhileRevalidate = false,
Expand Down
4 changes: 2 additions & 2 deletions packages/solid/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"date-fns": "^3.6.0",
"jsdom": "^24.0.0",
"test-utils": "workspace:^",
"typescript": "^5.4.5",
"typescript": "^5.6.3",
"vite": "^5.4.3",
"vite-plugin-solid": "^2.10.2",
"vitest": "^2.0.5"
Expand All @@ -64,7 +64,7 @@
}
},
"engines": {
"node": ">=16 <=22"
"node": ">=16 <=23"
},
"publishConfig": {
"directory": "dist"
Expand Down
2 changes: 1 addition & 1 deletion packages/subscriptions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"gqty": "workspace:^",
"graphql": "^16.9.0",
"test-utils": "workspace:^",
"typescript": "^5.5.4"
"typescript": "^5.6.3"
},
"peerDependencies": {
"gqty": "workspace:^",
Expand Down
Loading

0 comments on commit f681c06

Please sign in to comment.