From 5633b4a28acf53826ea5b8a87443a736911adcac Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 14 Aug 2024 11:14:43 -0700 Subject: [PATCH] feat: Idempotent Provisioning (#1004) Provisioning is occasionally finnicky, failing on different parts of the process. Having retryable provisioning allows us to heal indexer state through retries rather than manually actioning this. To this, the provisioning step needs to have access to accurate status of provisioning in order to make the correct decisions on which non-idempotent actions to take. This PR introduces a ProvisioningState class which represents the status of provisioning, and can be used to make decisions on what parts of provisioning need to be completed. --- .../__snapshots__/hasura-client.test.ts.snap | 0 .../hasura-client/hasura-client.test.ts | 0 .../hasura-client/hasura-client.ts | 2 +- .../{ => provisioner}/hasura-client/index.ts | 3 +- runner/src/provisioner/index.ts | 1 + runner/src/provisioner/provisioner.test.ts | 142 +++++++--- runner/src/provisioner/provisioner.ts | 116 +++++--- .../provisioning-state.test.ts.snap | 250 ++++++++++++++++++ .../provisioner/provisioning-state/index.ts | 1 + .../provisioning-state.test.ts | 219 +++++++++++++++ .../provisioning-state/provisioning-state.ts | 81 ++++++ .../data-layer/data-layer-service.test.ts | 17 -- .../services/data-layer/data-layer-service.ts | 50 +--- .../redis-client/index.ts | 0 .../redis-client/redis-client.test.ts | 0 .../redis-client/redis-client.ts | 2 +- runner/src/stream-handler/worker.ts | 2 +- runner/tests/integration.test.ts | 2 +- 18 files changed, 753 insertions(+), 135 deletions(-) rename runner/src/{ => provisioner}/hasura-client/__snapshots__/hasura-client.test.ts.snap (100%) rename runner/src/{ => provisioner}/hasura-client/hasura-client.test.ts (100%) rename runner/src/{ => provisioner}/hasura-client/hasura-client.ts (99%) rename runner/src/{ => provisioner}/hasura-client/index.ts (63%) create mode 100644 runner/src/provisioner/provisioning-state/__snapshots__/provisioning-state.test.ts.snap create mode 100644 runner/src/provisioner/provisioning-state/index.ts create mode 100644 runner/src/provisioner/provisioning-state/provisioning-state.test.ts create mode 100644 runner/src/provisioner/provisioning-state/provisioning-state.ts rename runner/src/{ => stream-handler}/redis-client/index.ts (100%) rename runner/src/{ => stream-handler}/redis-client/redis-client.test.ts (100%) rename runner/src/{ => stream-handler}/redis-client/redis-client.ts (97%) diff --git a/runner/src/hasura-client/__snapshots__/hasura-client.test.ts.snap b/runner/src/provisioner/hasura-client/__snapshots__/hasura-client.test.ts.snap similarity index 100% rename from runner/src/hasura-client/__snapshots__/hasura-client.test.ts.snap rename to runner/src/provisioner/hasura-client/__snapshots__/hasura-client.test.ts.snap diff --git a/runner/src/hasura-client/hasura-client.test.ts b/runner/src/provisioner/hasura-client/hasura-client.test.ts similarity index 100% rename from runner/src/hasura-client/hasura-client.test.ts rename to runner/src/provisioner/hasura-client/hasura-client.test.ts diff --git a/runner/src/hasura-client/hasura-client.ts b/runner/src/provisioner/hasura-client/hasura-client.ts similarity index 99% rename from runner/src/hasura-client/hasura-client.ts rename to runner/src/provisioner/hasura-client/hasura-client.ts index 8b50242f2..f2f4e89c5 100644 --- a/runner/src/hasura-client/hasura-client.ts +++ b/runner/src/provisioner/hasura-client/hasura-client.ts @@ -10,7 +10,7 @@ interface SqlOptions { source?: string } -export type HasuraPermission = 'select' | 'insert' | 'update' | 'delete'; +export const HASURA_PERMISSION_TYPES = ['select', 'insert', 'update', 'delete']; interface TableDefinition { name: string diff --git a/runner/src/hasura-client/index.ts b/runner/src/provisioner/hasura-client/index.ts similarity index 63% rename from runner/src/hasura-client/index.ts rename to runner/src/provisioner/hasura-client/index.ts index fa21831b8..77a2b4aea 100644 --- a/runner/src/hasura-client/index.ts +++ b/runner/src/provisioner/hasura-client/index.ts @@ -1,2 +1,3 @@ export { default } from './hasura-client'; -export type { HasuraMetadata, HasuraSource, HasuraConfiguration, HasuraDatabaseConnectionParameters, HasuraTableMetadata, HasuraRolePermission, HasuraPermission } from './hasura-client'; +export type { HasuraMetadata, HasuraSource, HasuraConfiguration, HasuraDatabaseConnectionParameters, HasuraTableMetadata, HasuraRolePermission } from './hasura-client'; +export { HASURA_PERMISSION_TYPES } from './hasura-client'; diff --git a/runner/src/provisioner/index.ts b/runner/src/provisioner/index.ts index 5a4dbeb8d..98efce62b 100644 --- a/runner/src/provisioner/index.ts +++ b/runner/src/provisioner/index.ts @@ -1 +1,2 @@ export { default } from './provisioner'; +export { METADATA_TABLE_NAME, LOGS_TABLE_NAME } from './provisioner'; diff --git a/runner/src/provisioner/provisioner.test.ts b/runner/src/provisioner/provisioner.test.ts index 0b58c4864..425ed219a 100644 --- a/runner/src/provisioner/provisioner.test.ts +++ b/runner/src/provisioner/provisioner.test.ts @@ -1,8 +1,9 @@ import pgFormat from 'pg-format'; -import Provisioner from './provisioner'; +import Provisioner, { LOGS_TABLE_NAME, METADATA_TABLE_NAME } from './provisioner'; import IndexerConfig from '../indexer-config/indexer-config'; import { LogLevel } from '../indexer-meta/log-entry'; +import { type HasuraTableMetadata, type HasuraMetadata, type HasuraSource } from './hasura-client'; describe('Provisioner', () => { let adminPgClient: any; @@ -13,15 +14,24 @@ describe('Provisioner', () => { let indexerConfig: IndexerConfig; const tableNames = ['blocks']; + const systemTables = [METADATA_TABLE_NAME, LOGS_TABLE_NAME]; + const tableNamesWithSystemTables = ['blocks', ...systemTables]; const accountId = 'morgs.near'; const functionName = 'test-function'; const databaseSchema = 'CREATE TABLE blocks (height numeric)'; indexerConfig = new IndexerConfig('', accountId, functionName, 0, '', databaseSchema, LogLevel.INFO); + const emptyHasuraMetadata = generateDefaultHasuraMetadata(); + const hasuraMetadataWithEmptySource = generateDefaultHasuraMetadata(); + hasuraMetadataWithEmptySource.sources.push(generateSourceWithTables([], [], indexerConfig.userName(), indexerConfig.databaseName())); + const hasuraMetadataWithSystemProvisions = generateDefaultHasuraMetadata(); + hasuraMetadataWithSystemProvisions.sources.push(generateSourceWithTables([indexerConfig.schemaName()], systemTables, indexerConfig.userName(), indexerConfig.databaseName())); + const hasuraMetadataWithProvisions = generateDefaultHasuraMetadata(); + hasuraMetadataWithProvisions.sources.push(generateSourceWithTables([indexerConfig.schemaName()], tableNamesWithSystemTables, indexerConfig.userName(), indexerConfig.databaseName())); const testingRetryConfig = { maxRetries: 5, baseDelay: 10 }; - const setProvisioningStatusQuery = `INSERT INTO ${indexerConfig.schemaName()}.sys_metadata (attribute, value) VALUES ('STATUS', 'PROVISIONING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`; + const setProvisioningStatusQuery = `INSERT INTO ${indexerConfig.schemaName()}.${METADATA_TABLE_NAME} (attribute, value) VALUES ('STATUS', 'PROVISIONING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`; const logsDDL = expect.any(String); const metadataDDL = expect.any(String); const error = new Error('some error'); @@ -39,7 +49,8 @@ describe('Provisioner', () => { beforeEach(() => { hasuraClient = { - getTableNames: jest.fn().mockReturnValueOnce(tableNames), + exportMetadata: jest.fn().mockResolvedValueOnce(emptyHasuraMetadata).mockResolvedValue(hasuraMetadataWithSystemProvisions), + getTableNames: jest.fn().mockResolvedValueOnce([]).mockResolvedValue(tableNamesWithSystemTables), trackTables: jest.fn().mockReturnValueOnce(null), trackForeignKeyRelationships: jest.fn().mockReturnValueOnce(null), addPermissionsToTables: jest.fn().mockReturnValueOnce(null), @@ -179,28 +190,6 @@ describe('Provisioner', () => { }); }); - describe('isUserApiProvisioned', () => { - it('returns false if datasource doesnt exists', async () => { - hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(false); - - await expect(provisioner.isProvisioned(indexerConfig)).resolves.toBe(false); - }); - - it('returns false if datasource and schema dont exists', async () => { - hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(false); - hasuraClient.doesSchemaExist = jest.fn().mockReturnValueOnce(false); - - await expect(provisioner.isProvisioned(indexerConfig)).resolves.toBe(false); - }); - - it('returns true if datasource and schema exists', async () => { - hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(true); - hasuraClient.doesSchemaExist = jest.fn().mockReturnValueOnce(true); - - await expect(provisioner.isProvisioned(indexerConfig)).resolves.toBe(true); - }); - }); - describe('provisionUserApi', () => { it('provisions an API for the user', async () => { await provisioner.provisionUserApi(indexerConfig); @@ -227,7 +216,20 @@ describe('Provisioner', () => { expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(2, indexerConfig.userName(), indexerConfig.schemaName(), logsDDL); expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(3, indexerConfig.userName(), indexerConfig.schemaName(), databaseSchema); expect(hasuraClient.getTableNames).toBeCalledWith(indexerConfig.schemaName(), indexerConfig.databaseName()); - expect(hasuraClient.trackTables).toBeCalledWith(indexerConfig.schemaName(), tableNames, indexerConfig.databaseName()); + expect(hasuraClient.trackTables).toHaveBeenNthCalledWith(1, indexerConfig.schemaName(), [METADATA_TABLE_NAME, LOGS_TABLE_NAME], indexerConfig.databaseName()); + expect(hasuraClient.trackTables).toHaveBeenNthCalledWith(2, indexerConfig.schemaName(), tableNames, indexerConfig.databaseName()); + expect(hasuraClient.addPermissionsToTables).toBeCalledWith( + indexerConfig.schemaName(), + indexerConfig.databaseName(), + [METADATA_TABLE_NAME, LOGS_TABLE_NAME], + indexerConfig.userName(), + [ + 'select', + 'insert', + 'update', + 'delete' + ] + ); expect(hasuraClient.addPermissionsToTables).toBeCalledWith( indexerConfig.schemaName(), indexerConfig.databaseName(), @@ -243,7 +245,7 @@ describe('Provisioner', () => { }); it('skips provisioning the datasource if it already exists', async () => { - hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(true); + hasuraClient.exportMetadata = jest.fn().mockResolvedValueOnce(hasuraMetadataWithEmptySource).mockResolvedValue(hasuraMetadataWithSystemProvisions); await provisioner.provisionUserApi(indexerConfig); @@ -255,7 +257,20 @@ describe('Provisioner', () => { expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(2, indexerConfig.userName(), indexerConfig.schemaName(), logsDDL); expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(3, indexerConfig.databaseName(), indexerConfig.schemaName(), databaseSchema); expect(hasuraClient.getTableNames).toBeCalledWith(indexerConfig.schemaName(), indexerConfig.databaseName()); - expect(hasuraClient.trackTables).toBeCalledWith(indexerConfig.schemaName(), tableNames, indexerConfig.databaseName()); + expect(hasuraClient.trackTables).toHaveBeenNthCalledWith(1, indexerConfig.schemaName(), [METADATA_TABLE_NAME, LOGS_TABLE_NAME], indexerConfig.databaseName()); + expect(hasuraClient.trackTables).toHaveBeenNthCalledWith(2, indexerConfig.schemaName(), tableNames, indexerConfig.databaseName()); + expect(hasuraClient.addPermissionsToTables).toBeCalledWith( + indexerConfig.schemaName(), + indexerConfig.databaseName(), + [METADATA_TABLE_NAME, LOGS_TABLE_NAME], + indexerConfig.userName(), + [ + 'select', + 'insert', + 'update', + 'delete' + ] + ); expect(hasuraClient.addPermissionsToTables).toBeCalledWith( indexerConfig.schemaName(), indexerConfig.databaseName(), @@ -270,6 +285,22 @@ describe('Provisioner', () => { ); }); + it('skips all provisioning if all provisioning tasks already done', async () => { + hasuraClient.exportMetadata = jest.fn().mockResolvedValue(hasuraMetadataWithProvisions); + hasuraClient.getTableNames = jest.fn().mockResolvedValue(tableNamesWithSystemTables); + + await provisioner.provisionUserApi(indexerConfig); + + expect(adminPgClient.query).not.toBeCalled(); + expect(hasuraClient.addDatasource).not.toBeCalled(); + + expect(hasuraClient.createSchema).not.toBeCalled(); + expect(hasuraClient.executeSqlOnSchema).not.toBeCalled(); + expect(hasuraClient.trackTables).not.toBeCalled(); + expect(hasuraClient.trackForeignKeyRelationships).toHaveBeenCalledTimes(1); + expect(hasuraClient.addPermissionsToTables).not.toBeCalled(); + }); + it('formats user input before executing the query', async () => { await provisioner.createUserDb('morgs_near', 'pass; DROP TABLE users;--', 'databaseName UNION SELECT * FROM users --'); @@ -312,12 +343,6 @@ describe('Provisioner', () => { await expect(provisioner.runLogsSql(accountId, functionName)).rejects.toThrow('Failed to run logs script: some error'); }); - it('throws an error when it fails to fetch table names', async () => { - hasuraClient.getTableNames = jest.fn().mockRejectedValue(error); - - await expect(provisioner.provisionUserApi(indexerConfig)).rejects.toThrow('Failed to provision endpoint: Failed to fetch table names: some error'); - }); - it('throws an error when it fails to track tables', async () => { hasuraClient.trackTables = jest.fn().mockRejectedValue(error); @@ -407,3 +432,52 @@ describe('Provisioner', () => { }); }); }); + +function generateDefaultHasuraMetadata (): HasuraMetadata { + const sources: HasuraSource[] = []; + // Insert default source which has different format than the rest + sources.push({ + name: 'default', + kind: 'postgres', + tables: [], + configuration: { + connection_info: { + database_url: { from_env: 'HASURA_GRAPHQL_DATABASE_URL' }, + } + } + }); + + return { + version: 3, + sources + }; +} + +function generateSourceWithTables (schemaNames: string[], tableNames: string[], role: string, db: string): HasuraSource { + const tables: HasuraTableMetadata[] = []; + schemaNames.forEach((schemaName) => { + tableNames.forEach((tableName) => { + tables.push(generateTableConfig(schemaName, tableName, role)); + }); + }); + + return { + name: db, + kind: 'postgres', + tables, + configuration: {} as any, + }; +} + +function generateTableConfig (schemaName: string, tableName: string, role: string): HasuraTableMetadata { + return { + table: { + name: tableName, + schema: schemaName, + }, + insert_permissions: [{ role, permission: {} }], + select_permissions: [{ role, permission: {} }], + update_permissions: [{ role, permission: {} }], + delete_permissions: [{ role, permission: {} }], + }; +} diff --git a/runner/src/provisioner/provisioner.ts b/runner/src/provisioner/provisioner.ts index 237bdb1e6..398ab05f6 100644 --- a/runner/src/provisioner/provisioner.ts +++ b/runner/src/provisioner/provisioner.ts @@ -5,13 +5,14 @@ import { wrapError, wrapSpan } from '../utility'; import cryptoModule from 'crypto'; import HasuraClient, { type HasuraDatabaseConnectionParameters, -} from '../hasura-client'; +} from './hasura-client'; import { logsTableDDL } from './schemas/logs-table'; import { metadataTableDDL } from './schemas/metadata-table'; import PgClientClass, { type PostgresConnectionParams } from '../pg-client'; import { type ProvisioningConfig } from '../indexer-config/indexer-config'; import IndexerMetaClass, { METADATA_TABLE_UPSERT, MetadataFields, IndexerStatus, LogEntry } from '../indexer-meta'; import logger from '../logger'; +import ProvisioningState from './provisioning-state/provisioning-state'; const DEFAULT_PASSWORD_LENGTH = 16; @@ -58,10 +59,13 @@ const defaultRetryConfig: RetryConfig = { baseDelay: 1000 }; +export const METADATA_TABLE_NAME = 'sys_metadata'; +export const LOGS_TABLE_NAME = 'sys_logs'; + export default class Provisioner { tracer: Tracer = trace.getTracer('queryapi-runner-provisioner'); - private readonly SYSTEM_TABLES = ['sys_logs', 'sys_metadata']; + private readonly SYSTEM_TABLES = [METADATA_TABLE_NAME, LOGS_TABLE_NAME]; private readonly logger: typeof logger; constructor ( @@ -162,24 +166,6 @@ export default class Provisioner { ); } - async isProvisioned (indexerConfig: ProvisioningConfig): Promise { - const checkProvisioningSpan = this.tracer.startSpan('Check if indexer is provisioned'); - - const databaseName = indexerConfig.databaseName(); - const schemaName = indexerConfig.schemaName(); - - const sourceExists = await this.hasuraClient.doesSourceExist(databaseName); - if (!sourceExists) { - return false; - } - - const schemaExists = await this.hasuraClient.doesSchemaExist(databaseName, schemaName); - - checkProvisioningSpan.end(); - - return schemaExists; - } - async createSchema (databaseName: string, schemaName: string): Promise { return await wrapError(async () => await this.hasuraClient.createSchema(databaseName, schemaName), 'Failed to create schema'); } @@ -209,10 +195,6 @@ export default class Provisioner { return await wrapError(async () => await this.hasuraClient.executeSqlOnSchema(databaseName, schemaName, sqlScript), 'Failed to run user script'); } - async getTableNames (schemaName: string, databaseName: string): Promise { - return await wrapError(async () => await this.hasuraClient.getTableNames(schemaName, databaseName), 'Failed to fetch table names'); - } - async trackTables (schemaName: string, tableNames: string[], databaseName: string): Promise { return await wrapError(async () => await this.hasuraClient.trackTables(schemaName, tableNames, databaseName), 'Failed to track tables'); } @@ -334,15 +316,22 @@ export default class Provisioner { await wrapSpan(async () => { await wrapError(async () => { + let provisioningState: ProvisioningState; try { - await this.provisionSystemResources(indexerConfig); + provisioningState = await ProvisioningState.loadProvisioningState(this.hasuraClient, indexerConfig); + } catch (error) { + logger.error('Failed to get current state of indexer resources', error); + throw error; + } + try { + await this.provisionSystemResources(indexerConfig, provisioningState); } catch (error) { logger.error('Failed to provision system resources', error); throw error; } try { - await this.provisionUserResources(indexerConfig); + await this.provisionUserResources(indexerConfig, provisioningState); } catch (err) { const error = err as Error; @@ -364,47 +353,90 @@ export default class Provisioner { await indexerMeta.writeLogs([LogEntry.systemError(error.message)]); } - async provisionSystemResources (indexerConfig: ProvisioningConfig): Promise { + async provisionSystemResources (indexerConfig: ProvisioningConfig, provisioningState: ProvisioningState): Promise { const userName = indexerConfig.userName(); const databaseName = indexerConfig.databaseName(); const schemaName = indexerConfig.schemaName(); - if (!await this.hasuraClient.doesSourceExist(databaseName)) { + if (!provisioningState.doesSourceExist()) { const password = this.generatePassword(); await this.createUserDb(userName, password, databaseName); await this.addDatasource(userName, password, databaseName); + } else { + logger.debug('Source already exists'); + } + + if (!provisioningState.doesSchemaExist()) { + await this.createSchema(databaseName, schemaName); + } else { + logger.debug('Schema already exists'); } - await this.createSchema(databaseName, schemaName); + const createdTables = provisioningState.getCreatedTables(); - await this.createMetadataTable(databaseName, schemaName); + if (!createdTables.includes(METADATA_TABLE_NAME)) { + await this.createMetadataTable(databaseName, schemaName); + } else { + logger.debug('Metadata table already exists'); + } await this.setProvisioningStatus(userName, schemaName); - await this.setupPartitionedLogsTable(userName, databaseName, schemaName); - await this.trackTables(schemaName, this.SYSTEM_TABLES, databaseName); + if (!createdTables.includes(LOGS_TABLE_NAME)) { + await this.setupPartitionedLogsTable(userName, databaseName, schemaName); + } else { + logger.debug('Logs table already exists'); + } - await this.exponentialRetry(async () => { - await this.addPermissionsToTables(indexerConfig, this.SYSTEM_TABLES, ['select', 'insert', 'update', 'delete']); - }); + const tablesToTrack = this.SYSTEM_TABLES.filter(systemTable => !provisioningState.getTrackedTables().includes(systemTable)); + if (tablesToTrack.length > 0) { + await this.trackTables(schemaName, tablesToTrack, databaseName); + } else { + logger.debug('All system tables are already tracked'); + } + + const tablesToAddPermissions = this.SYSTEM_TABLES.filter(systemTable => !provisioningState.getTablesWithPermissions().includes(systemTable)); + if (tablesToAddPermissions.length > 0) { + await this.exponentialRetry(async () => { + await this.addPermissionsToTables(indexerConfig, tablesToAddPermissions, ['select', 'insert', 'update', 'delete']); + }); + } else { + logger.debug('All system tables already have permissions'); + } } - async provisionUserResources (indexerConfig: ProvisioningConfig): Promise { + async provisionUserResources (indexerConfig: ProvisioningConfig, provisioningState: ProvisioningState): Promise { const databaseName = indexerConfig.databaseName(); const schemaName = indexerConfig.schemaName(); - await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema); + const onlySystemTablesCreated = provisioningState.getCreatedTables().every((table) => this.SYSTEM_TABLES.includes(table)); + if (onlySystemTablesCreated) { + await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema); + } else { + logger.debug('Skipping user script execution as non system tables have already been created'); + } - const userTableNames = (await this.getTableNames(schemaName, databaseName)).filter((tableName) => !this.SYSTEM_TABLES.includes(tableName)); + await provisioningState.reload(this.hasuraClient); + const userTableNames = provisioningState.getCreatedTables().filter((tableName) => !provisioningState.getTrackedTables().includes(tableName)); - await this.trackTables(schemaName, userTableNames, databaseName); + if (userTableNames.length > 0) { + await this.trackTables(schemaName, userTableNames, databaseName); + } else { + logger.debug('No user tables to track'); + } + // Safely retryable await this.exponentialRetry(async () => { await this.trackForeignKeyRelationships(schemaName, databaseName); }); - await this.exponentialRetry(async () => { - await this.addPermissionsToTables(indexerConfig, userTableNames, ['select', 'insert', 'update', 'delete']); - }); + const tablesWithoutPermissions = userTableNames.filter((tableName) => !provisioningState.getTablesWithPermissions().includes(tableName)); + if (tablesWithoutPermissions.length > 0) { + await this.exponentialRetry(async () => { + await this.addPermissionsToTables(indexerConfig, userTableNames, ['select', 'insert', 'update', 'delete']); + }); + } else { + logger.debug('All user tables already have permissions'); + } } async exponentialRetry (fn: () => Promise): Promise { diff --git a/runner/src/provisioner/provisioning-state/__snapshots__/provisioning-state.test.ts.snap b/runner/src/provisioner/provisioning-state/__snapshots__/provisioning-state.test.ts.snap new file mode 100644 index 000000000..e4dccf844 --- /dev/null +++ b/runner/src/provisioner/provisioning-state/__snapshots__/provisioning-state.test.ts.snap @@ -0,0 +1,250 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`ProvisioiningState correctly fetch metadata for existing source and schema 1`] = ` +[ + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableA", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableB", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, +] +`; + +exports[`ProvisioiningState handles table with missing permissions 1`] = ` +[ + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableA", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableB", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableC", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permission": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableD", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, +] +`; + +exports[`ProvisioiningState reload loads metadata and created tables 1`] = ` +[ + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableA", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableB", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, +] +`; diff --git a/runner/src/provisioner/provisioning-state/index.ts b/runner/src/provisioner/provisioning-state/index.ts new file mode 100644 index 000000000..468349a65 --- /dev/null +++ b/runner/src/provisioner/provisioning-state/index.ts @@ -0,0 +1 @@ +export { default } from './provisioning-state'; diff --git a/runner/src/provisioner/provisioning-state/provisioning-state.test.ts b/runner/src/provisioner/provisioning-state/provisioning-state.test.ts new file mode 100644 index 000000000..4b9c2bd21 --- /dev/null +++ b/runner/src/provisioner/provisioning-state/provisioning-state.test.ts @@ -0,0 +1,219 @@ +import { ProvisioningConfig } from '../../indexer-config'; +import { LogLevel } from '../../indexer-meta/log-entry'; +import type HasuraClient from '../hasura-client'; +import { type HasuraTableMetadata, type HasuraConfiguration, type HasuraDatabaseConnectionParameters, type HasuraSource, type HasuraMetadata } from '../hasura-client'; +import ProvisioningState from './provisioning-state'; + +describe('ProvisioiningState', () => { + const provisioningConfig = new ProvisioningConfig( + 'account-id', + 'function-name', + 'schema', + LogLevel.INFO, + ); + + it('can create state whether source exists or not', async () => { + const metadataWithoutUser = generateHasuraMetadata(['some_schema'], ['tableA', 'tableB'], 'someAccount', 'someDb'); + const mockExportMetadata = jest.fn().mockResolvedValue(metadataWithoutUser); + const mockGetTableNames = jest.fn().mockRejectedValueOnce(new Error('{"error":"source with name "someAccount" does not exist","path":"$.args","code":"not-exists"}')); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(provisioningState.doesSourceExist()).toBe(false); + expect(provisioningState.doesSchemaExist()).toBe(false); + expect(provisioningState.getCreatedTables()).toEqual([]); + expect(provisioningState.getSourceMetadata()).toEqual(undefined); + expect(provisioningState.getMetadataForTables()).toEqual([]); + expect(provisioningState.getTrackedTables()).toEqual([]); + expect(provisioningState.getTablesWithPermissions()).toEqual([]); + }); + + it('state works with existing source', async () => { + const metadataWithUser = generateHasuraMetadata([provisioningConfig.schemaName(), 'some_schema'], ['tableA', 'tableB'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName()); + metadataWithUser.sources.push(generateSourceWithTables(['anotherSchema'], ['anotherTable'], 'anotherRole', 'anotherDb')); + const mockExportMetadata = jest.fn().mockResolvedValue(metadataWithUser); + const mockGetTableNames = jest.fn().mockResolvedValue(['tableA']); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(provisioningState.doesSourceExist()).toBe(true); + expect(provisioningState.doesSchemaExist()).toBe(true); + expect(provisioningState.getCreatedTables()).toEqual(['tableA']); + }); + + it('correctly fetch metadata for existing source and schema', async () => { + const metadataWithUser = generateHasuraMetadata([provisioningConfig.schemaName(), 'some_schema'], ['tableA', 'tableB'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName()); + metadataWithUser.sources.push(generateSourceWithTables(['anotherSchema'], ['anotherTable'], 'anotherRole', 'anotherDb')); + const mockExportMetadata = jest.fn().mockResolvedValue(metadataWithUser); + const mockGetTableNames = jest.fn().mockResolvedValue(['tableA']); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(provisioningState.getSourceMetadata()?.name).toBe(provisioningConfig.hasuraRoleName()); + expect(provisioningState.getMetadataForTables().length).toBe(2); + expect(provisioningState.getMetadataForTables()).toMatchSnapshot(); + expect(provisioningState.getTrackedTables()).toEqual(['tableA', 'tableB']); + expect(provisioningState.getTablesWithPermissions()).toEqual(['tableA', 'tableB']); + }); + + it('reload loads metadata and created tables', async () => { + const metadataWithoutUser = generateHasuraMetadata(['some_schema'], ['tableA', 'tableB'], 'someAccount', 'someDb'); + const metadataWithUser = generateHasuraMetadata([provisioningConfig.schemaName(), 'some_schema'], ['tableA', 'tableB'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName()); + metadataWithUser.sources.push(generateSourceWithTables(['anotherSchema'], ['anotherTable'], 'anotherRole', 'anotherDb')); + const mockExportMetadata = jest.fn().mockResolvedValueOnce(metadataWithoutUser).mockResolvedValue(metadataWithUser); + const mockGetTableNames = jest.fn().mockRejectedValueOnce(new Error('{"error":"source with name "someAccount" does not exist","path":"$.args","code":"not-exists"}')).mockResolvedValueOnce(['tableA']); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(provisioningState.doesSourceExist()).toBe(false); + expect(provisioningState.doesSchemaExist()).toBe(false); + expect(provisioningState.getCreatedTables()).toEqual([]); + expect(provisioningState.getSourceMetadata()).toEqual(undefined); + + await provisioningState.reload(mockHasuraClient); + expect(provisioningState.getSourceMetadata()?.name).toBe(provisioningConfig.hasuraRoleName()); + expect(provisioningState.getMetadataForTables().length).toBe(2); + expect(provisioningState.getMetadataForTables()).toMatchSnapshot(); + expect(provisioningState.getTrackedTables()).toEqual(['tableA', 'tableB']); + expect(provisioningState.getTablesWithPermissions()).toEqual(['tableA', 'tableB']); + }); + + it('handles table with missing permissions', async () => { + const metadataWithUser = generateHasuraMetadata([provisioningConfig.schemaName(), 'some_schema'], ['tableA', 'tableB'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName()); + const role = provisioningConfig.hasuraRoleName(); + const tableMissingPermissions = { + table: { + name: 'tableC', + schema: provisioningConfig.schemaName(), + }, + insert_permissions: [{ role, permission: {} }], + update_permissions: [{ role, permission: {} }], + delete_permissions: [{ role, permission: {} }], + }; + const tableWithIncorrectlyNamedPermission = { + table: { + name: 'tableD', + schema: provisioningConfig.schemaName(), + }, + select_permissions: [{ role, permission: {} }], + insert_permission: [{ role, permission: {} }], + update_permissions: [{ role, permission: {} }], + delete_permissions: [{ role, permission: {} }], + }; + metadataWithUser.sources[1].tables.push(tableMissingPermissions); // First source is a default source + metadataWithUser.sources[1].tables.push(tableWithIncorrectlyNamedPermission); + + const mockExportMetadata = jest.fn().mockResolvedValue(metadataWithUser); + const mockGetTableNames = jest.fn().mockResolvedValue(['tableA']); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(provisioningState.getSourceMetadata()?.name).toBe(provisioningConfig.hasuraRoleName()); + expect(provisioningState.getMetadataForTables().length).toBe(4); + expect(provisioningState.getMetadataForTables()).toMatchSnapshot(); + expect(provisioningState.getTrackedTables()).toEqual(['tableA', 'tableB', 'tableC', 'tableD']); + expect(provisioningState.getTablesWithPermissions()).toEqual(['tableA', 'tableB']); + }); + + it('throws error when multiple sources with same name exist', async () => { + const metadataWithUser = generateHasuraMetadata([provisioningConfig.schemaName(), 'some_schema'], ['tableA', 'tableB'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName()); + metadataWithUser.sources.push(generateSourceWithTables(['anotherSchema'], ['anotherTable'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName())); + const mockExportMetadata = jest.fn().mockResolvedValue(metadataWithUser); + const mockGetTableNames = jest.fn().mockResolvedValue(['tableA']); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(() => provisioningState.getSourceMetadata()).toThrow('Expected no more than one source'); + expect(() => provisioningState.getMetadataForTables()).toThrow('Expected no more than one source'); + expect(() => provisioningState.getTrackedTables()).toThrow('Expected no more than one source'); + }); +}); + +function generateHasuraMetadata (schemaNames: string[], tableNames: string[], role: string, db: string): HasuraMetadata { + const sources: HasuraSource[] = []; + // Insert default source which has different format than the rest + sources.push({ + name: 'default', + kind: 'postgres', + tables: [], + configuration: { + connection_info: { + database_url: { from_env: 'HASURA_GRAPHQL_DATABASE_URL' }, + } + } + }); + + sources.push(generateSourceWithTables(schemaNames, tableNames, role, db)); + + return { + version: 3, + sources + }; +} + +function generateSourceWithTables (schemaNames: string[], tableNames: string[], role: string, db: string): HasuraSource { + const tables: HasuraTableMetadata[] = []; + schemaNames.forEach((schemaName) => { + tableNames.forEach((tableName) => { + tables.push(generateTableConfig(schemaName, tableName, role)); + }); + }); + + return { + name: db, + kind: 'postgres', + tables, + configuration: generateHasuraConfiguration(role, 'password'), + }; +} + +function generateTableConfig (schemaName: string, tableName: string, role: string): HasuraTableMetadata { + return { + table: { + name: tableName, + schema: schemaName, + }, + insert_permissions: [{ role, permission: {} }], + select_permissions: [{ role, permission: {} }], + update_permissions: [{ role, permission: {} }], + delete_permissions: [{ role, permission: {} }], + }; +} + +function generateHasuraConfiguration (user: string, password: string): HasuraConfiguration { + return { + connection_info: { + database_url: { connection_parameters: generateConnectionParameter(user, password) }, + isolation_level: 'read-committed', + use_prepared_statements: false + } + }; +} + +function generateConnectionParameter (user: string, password: string): HasuraDatabaseConnectionParameters { + return { + database: user, + host: 'postgres', + password, + port: 5432, + username: user + }; +} diff --git a/runner/src/provisioner/provisioning-state/provisioning-state.ts b/runner/src/provisioner/provisioning-state/provisioning-state.ts new file mode 100644 index 000000000..412fd4b5e --- /dev/null +++ b/runner/src/provisioner/provisioning-state/provisioning-state.ts @@ -0,0 +1,81 @@ +import { type ProvisioningConfig } from '../../indexer-config'; +import { type HasuraTableMetadata, type HasuraMetadata, type HasuraSource, HASURA_PERMISSION_TYPES } from '../hasura-client'; +import type HasuraClient from '../hasura-client'; + +export default class ProvisioningState { + constructor ( + private readonly config: ProvisioningConfig, + private hasuraMetadata: HasuraMetadata, + private tablesInSource: string[], + ) {} + + static async loadProvisioningState (hasuraClient: HasuraClient, provisioningConfig: ProvisioningConfig): Promise { + const hasuraMetadata = await hasuraClient.exportMetadata(); + const tablesInSource = await hasuraClient.getTableNames(provisioningConfig.schemaName(), provisioningConfig.databaseName()).catch((err) => { + const error = err as Error; + if (error.message.includes('source with name') && error.message.includes('not-exists')) { + return []; + } + throw error; + }); + return new ProvisioningState(provisioningConfig, hasuraMetadata, tablesInSource); + } + + async reload (hasuraClient: HasuraClient): Promise { + this.hasuraMetadata = await hasuraClient.exportMetadata(); + this.tablesInSource = await hasuraClient.getTableNames(this.config.schemaName(), this.config.databaseName()).catch((err) => { + const error = err as Error; + if (error.message.includes('source with name') && error.message.includes('not-exists')) { + return []; + } + throw error; + }); + } + + doesSourceExist (): boolean { + return this.hasuraMetadata.sources.some(source => source.name === this.config.databaseName()); + } + + doesSchemaExist (): boolean { + return this.hasuraMetadata.sources.some( + source => source.name === this.config.databaseName() && + source.tables.some( + table => table.table.schema === this.config.schemaName() + ) + ); + } + + getCreatedTables (): string[] { + return this.tablesInSource; + } + + getSourceMetadata (): HasuraSource | undefined { + const matchedSource = this.hasuraMetadata.sources.filter(source => source.name === this.config.databaseName()); + if (matchedSource.length > 1) { + throw new Error(`Expected no more than one source with name ${this.config.databaseName()}. Found ${matchedSource.length}`); + }; + return matchedSource.length === 0 ? undefined : matchedSource[0]; + } + + getMetadataForTables (): HasuraTableMetadata[] { + return this.getSourceMetadata()?.tables.filter(tableMetadata => tableMetadata.table.schema === this.config.schemaName()) ?? []; + } + + getTrackedTables (): string[] { + return this.getMetadataForTables().map(tableMetadata => tableMetadata.table.name); + } + + private tableContainsAllPermissions (tableMetadata: HasuraTableMetadata): boolean { + const allPermissions: string[] = HASURA_PERMISSION_TYPES.map(permission => `${permission}_permissions`); + const metadataKeys = Object.keys(tableMetadata); + return allPermissions.every(permission => metadataKeys.includes(permission)); + } + + // Does not check for partial permissions + getTablesWithPermissions (): string[] { + const tableMetadataList = this.getMetadataForTables(); + return tableMetadataList + .filter(metadata => this.tableContainsAllPermissions(metadata)) + .map(metadata => metadata.table.name); + } +} diff --git a/runner/src/server/services/data-layer/data-layer-service.test.ts b/runner/src/server/services/data-layer/data-layer-service.test.ts index 88541de69..8989c8a06 100644 --- a/runner/src/server/services/data-layer/data-layer-service.test.ts +++ b/runner/src/server/services/data-layer/data-layer-service.test.ts @@ -67,23 +67,6 @@ describe('DataLayerService', () => { }); describe('StartProvisioningTask', () => { - it('returns FAILED_PRECONDITION if already provisioned', (done) => { - const provisioner = { - isProvisioned: jest.fn().mockResolvedValue(true) - } as unknown as Provisioner; - const tasks = {}; - const call = { - request: { accountId: 'testAccount', functionName: 'testFunction', schema: 'testSchema' } - } as unknown as ServerUnaryCall; - const callback = (error: any): void => { - expect(error.code).toBe(status.FAILED_PRECONDITION); - expect(error.details).toBe('Data Layer is already provisioned'); - done(); - }; - - createDataLayerService(provisioner, tasks).StartProvisioningTask(call, callback); - }); - it('should start a new provisioning task', (done) => { const tasks: Record = {}; const provisioner = { diff --git a/runner/src/server/services/data-layer/data-layer-service.ts b/runner/src/server/services/data-layer/data-layer-service.ts index 61a1c94e6..89332b136 100644 --- a/runner/src/server/services/data-layer/data-layer-service.ts +++ b/runner/src/server/services/data-layer/data-layer-service.ts @@ -87,43 +87,19 @@ export function createDataLayerService ( const logger = createLogger(provisioningConfig); - provisioner - .isProvisioned(provisioningConfig) - .then((isProvisioned) => { - if (isProvisioned) { - const failedPrecondition = new StatusBuilder() - .withCode(status.FAILED_PRECONDITION) - .withDetails('Data Layer is already provisioned') - .build(); - - callback(failedPrecondition); - - return; - } - - const taskId = crypto.randomUUID(); - - logger.info(`Starting provisioning task: ${taskId}`); - - tasks[taskId] = new AsyncTask( - provisioner - .provisionUserApi(provisioningConfig) - .then(() => { - logger.info('Successfully provisioned Data Layer'); - }) - ); - - callback(null, { taskId }); - }) - .catch((err) => { - logger.warn('Failed to check if Data Layer is provisioned', err); - - const internal = new StatusBuilder() - .withCode(status.INTERNAL) - .withDetails('Failed to check Data Layer provisioned status') - .build(); - callback(internal); - }); + const taskId = crypto.randomUUID(); + + logger.info(`Starting provisioning task: ${taskId}`); + + tasks[taskId] = new AsyncTask( + provisioner + .provisionUserApi(provisioningConfig) + .then(() => { + logger.info('Successfully provisioned Data Layer'); + }) + ); + + callback(null, { taskId }); }, StartDeprovisioningTask (call: ServerUnaryCall, callback: sendUnaryData): void { diff --git a/runner/src/redis-client/index.ts b/runner/src/stream-handler/redis-client/index.ts similarity index 100% rename from runner/src/redis-client/index.ts rename to runner/src/stream-handler/redis-client/index.ts diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/stream-handler/redis-client/redis-client.test.ts similarity index 100% rename from runner/src/redis-client/redis-client.test.ts rename to runner/src/stream-handler/redis-client/redis-client.test.ts diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/stream-handler/redis-client/redis-client.ts similarity index 97% rename from runner/src/redis-client/redis-client.ts rename to runner/src/stream-handler/redis-client/redis-client.ts index 645987c6b..854278cab 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/stream-handler/redis-client/redis-client.ts @@ -1,6 +1,6 @@ import { createClient, type RedisClientType } from 'redis'; -import logger from '../logger'; +import logger from '../../logger'; interface StreamMessage { id: string diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 38a6483b5..a9014b140 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -4,7 +4,7 @@ import promClient from 'prom-client'; import { Block } from '@near-lake/primitives'; import { Indexer } from '../indexer'; -import RedisClient from '../redis-client'; +import RedisClient from './redis-client'; import { METRICS } from '../metrics'; import LakeClient from '../lake-client'; import { WorkerMessageType, type WorkerMessage, ExecutionState } from './stream-handler'; diff --git a/runner/tests/integration.test.ts b/runner/tests/integration.test.ts index cca359990..8cfa1f64d 100644 --- a/runner/tests/integration.test.ts +++ b/runner/tests/integration.test.ts @@ -3,7 +3,7 @@ import { Network, type StartedNetwork } from 'testcontainers'; import { gql, GraphQLClient } from 'graphql-request'; import { Indexer } from '../src/indexer'; -import HasuraClient from '../src/hasura-client'; +import HasuraClient from '../src/provisioner/hasura-client'; import Provisioner from '../src/provisioner'; import PgClient from '../src/pg-client';