diff --git a/runner/src/hasura-client/__snapshots__/hasura-client.test.ts.snap b/runner/src/provisioner/hasura-client/__snapshots__/hasura-client.test.ts.snap similarity index 100% rename from runner/src/hasura-client/__snapshots__/hasura-client.test.ts.snap rename to runner/src/provisioner/hasura-client/__snapshots__/hasura-client.test.ts.snap diff --git a/runner/src/hasura-client/hasura-client.test.ts b/runner/src/provisioner/hasura-client/hasura-client.test.ts similarity index 100% rename from runner/src/hasura-client/hasura-client.test.ts rename to runner/src/provisioner/hasura-client/hasura-client.test.ts diff --git a/runner/src/hasura-client/hasura-client.ts b/runner/src/provisioner/hasura-client/hasura-client.ts similarity index 99% rename from runner/src/hasura-client/hasura-client.ts rename to runner/src/provisioner/hasura-client/hasura-client.ts index 8b50242f2..f2f4e89c5 100644 --- a/runner/src/hasura-client/hasura-client.ts +++ b/runner/src/provisioner/hasura-client/hasura-client.ts @@ -10,7 +10,7 @@ interface SqlOptions { source?: string } -export type HasuraPermission = 'select' | 'insert' | 'update' | 'delete'; +export const HASURA_PERMISSION_TYPES = ['select', 'insert', 'update', 'delete']; interface TableDefinition { name: string diff --git a/runner/src/hasura-client/index.ts b/runner/src/provisioner/hasura-client/index.ts similarity index 63% rename from runner/src/hasura-client/index.ts rename to runner/src/provisioner/hasura-client/index.ts index fa21831b8..77a2b4aea 100644 --- a/runner/src/hasura-client/index.ts +++ b/runner/src/provisioner/hasura-client/index.ts @@ -1,2 +1,3 @@ export { default } from './hasura-client'; -export type { HasuraMetadata, HasuraSource, HasuraConfiguration, HasuraDatabaseConnectionParameters, HasuraTableMetadata, HasuraRolePermission, HasuraPermission } from './hasura-client'; +export type { HasuraMetadata, HasuraSource, HasuraConfiguration, HasuraDatabaseConnectionParameters, HasuraTableMetadata, HasuraRolePermission } from './hasura-client'; +export { HASURA_PERMISSION_TYPES } from './hasura-client'; diff --git a/runner/src/provisioner/index.ts b/runner/src/provisioner/index.ts index 5a4dbeb8d..98efce62b 100644 --- a/runner/src/provisioner/index.ts +++ b/runner/src/provisioner/index.ts @@ -1 +1,2 @@ export { default } from './provisioner'; +export { METADATA_TABLE_NAME, LOGS_TABLE_NAME } from './provisioner'; diff --git a/runner/src/provisioner/provisioner.test.ts b/runner/src/provisioner/provisioner.test.ts index 0b58c4864..425ed219a 100644 --- a/runner/src/provisioner/provisioner.test.ts +++ b/runner/src/provisioner/provisioner.test.ts @@ -1,8 +1,9 @@ import pgFormat from 'pg-format'; -import Provisioner from './provisioner'; +import Provisioner, { LOGS_TABLE_NAME, METADATA_TABLE_NAME } from './provisioner'; import IndexerConfig from '../indexer-config/indexer-config'; import { LogLevel } from '../indexer-meta/log-entry'; +import { type HasuraTableMetadata, type HasuraMetadata, type HasuraSource } from './hasura-client'; describe('Provisioner', () => { let adminPgClient: any; @@ -13,15 +14,24 @@ describe('Provisioner', () => { let indexerConfig: IndexerConfig; const tableNames = ['blocks']; + const systemTables = [METADATA_TABLE_NAME, LOGS_TABLE_NAME]; + const tableNamesWithSystemTables = ['blocks', ...systemTables]; const accountId = 'morgs.near'; const functionName = 'test-function'; const databaseSchema = 'CREATE TABLE blocks (height numeric)'; indexerConfig = new IndexerConfig('', accountId, functionName, 0, '', databaseSchema, LogLevel.INFO); + const emptyHasuraMetadata = generateDefaultHasuraMetadata(); + const hasuraMetadataWithEmptySource = generateDefaultHasuraMetadata(); + hasuraMetadataWithEmptySource.sources.push(generateSourceWithTables([], [], indexerConfig.userName(), indexerConfig.databaseName())); + const hasuraMetadataWithSystemProvisions = generateDefaultHasuraMetadata(); + hasuraMetadataWithSystemProvisions.sources.push(generateSourceWithTables([indexerConfig.schemaName()], systemTables, indexerConfig.userName(), indexerConfig.databaseName())); + const hasuraMetadataWithProvisions = generateDefaultHasuraMetadata(); + hasuraMetadataWithProvisions.sources.push(generateSourceWithTables([indexerConfig.schemaName()], tableNamesWithSystemTables, indexerConfig.userName(), indexerConfig.databaseName())); const testingRetryConfig = { maxRetries: 5, baseDelay: 10 }; - const setProvisioningStatusQuery = `INSERT INTO ${indexerConfig.schemaName()}.sys_metadata (attribute, value) VALUES ('STATUS', 'PROVISIONING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`; + const setProvisioningStatusQuery = `INSERT INTO ${indexerConfig.schemaName()}.${METADATA_TABLE_NAME} (attribute, value) VALUES ('STATUS', 'PROVISIONING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`; const logsDDL = expect.any(String); const metadataDDL = expect.any(String); const error = new Error('some error'); @@ -39,7 +49,8 @@ describe('Provisioner', () => { beforeEach(() => { hasuraClient = { - getTableNames: jest.fn().mockReturnValueOnce(tableNames), + exportMetadata: jest.fn().mockResolvedValueOnce(emptyHasuraMetadata).mockResolvedValue(hasuraMetadataWithSystemProvisions), + getTableNames: jest.fn().mockResolvedValueOnce([]).mockResolvedValue(tableNamesWithSystemTables), trackTables: jest.fn().mockReturnValueOnce(null), trackForeignKeyRelationships: jest.fn().mockReturnValueOnce(null), addPermissionsToTables: jest.fn().mockReturnValueOnce(null), @@ -179,28 +190,6 @@ describe('Provisioner', () => { }); }); - describe('isUserApiProvisioned', () => { - it('returns false if datasource doesnt exists', async () => { - hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(false); - - await expect(provisioner.isProvisioned(indexerConfig)).resolves.toBe(false); - }); - - it('returns false if datasource and schema dont exists', async () => { - hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(false); - hasuraClient.doesSchemaExist = jest.fn().mockReturnValueOnce(false); - - await expect(provisioner.isProvisioned(indexerConfig)).resolves.toBe(false); - }); - - it('returns true if datasource and schema exists', async () => { - hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(true); - hasuraClient.doesSchemaExist = jest.fn().mockReturnValueOnce(true); - - await expect(provisioner.isProvisioned(indexerConfig)).resolves.toBe(true); - }); - }); - describe('provisionUserApi', () => { it('provisions an API for the user', async () => { await provisioner.provisionUserApi(indexerConfig); @@ -227,7 +216,20 @@ describe('Provisioner', () => { expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(2, indexerConfig.userName(), indexerConfig.schemaName(), logsDDL); expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(3, indexerConfig.userName(), indexerConfig.schemaName(), databaseSchema); expect(hasuraClient.getTableNames).toBeCalledWith(indexerConfig.schemaName(), indexerConfig.databaseName()); - expect(hasuraClient.trackTables).toBeCalledWith(indexerConfig.schemaName(), tableNames, indexerConfig.databaseName()); + expect(hasuraClient.trackTables).toHaveBeenNthCalledWith(1, indexerConfig.schemaName(), [METADATA_TABLE_NAME, LOGS_TABLE_NAME], indexerConfig.databaseName()); + expect(hasuraClient.trackTables).toHaveBeenNthCalledWith(2, indexerConfig.schemaName(), tableNames, indexerConfig.databaseName()); + expect(hasuraClient.addPermissionsToTables).toBeCalledWith( + indexerConfig.schemaName(), + indexerConfig.databaseName(), + [METADATA_TABLE_NAME, LOGS_TABLE_NAME], + indexerConfig.userName(), + [ + 'select', + 'insert', + 'update', + 'delete' + ] + ); expect(hasuraClient.addPermissionsToTables).toBeCalledWith( indexerConfig.schemaName(), indexerConfig.databaseName(), @@ -243,7 +245,7 @@ describe('Provisioner', () => { }); it('skips provisioning the datasource if it already exists', async () => { - hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(true); + hasuraClient.exportMetadata = jest.fn().mockResolvedValueOnce(hasuraMetadataWithEmptySource).mockResolvedValue(hasuraMetadataWithSystemProvisions); await provisioner.provisionUserApi(indexerConfig); @@ -255,7 +257,20 @@ describe('Provisioner', () => { expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(2, indexerConfig.userName(), indexerConfig.schemaName(), logsDDL); expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(3, indexerConfig.databaseName(), indexerConfig.schemaName(), databaseSchema); expect(hasuraClient.getTableNames).toBeCalledWith(indexerConfig.schemaName(), indexerConfig.databaseName()); - expect(hasuraClient.trackTables).toBeCalledWith(indexerConfig.schemaName(), tableNames, indexerConfig.databaseName()); + expect(hasuraClient.trackTables).toHaveBeenNthCalledWith(1, indexerConfig.schemaName(), [METADATA_TABLE_NAME, LOGS_TABLE_NAME], indexerConfig.databaseName()); + expect(hasuraClient.trackTables).toHaveBeenNthCalledWith(2, indexerConfig.schemaName(), tableNames, indexerConfig.databaseName()); + expect(hasuraClient.addPermissionsToTables).toBeCalledWith( + indexerConfig.schemaName(), + indexerConfig.databaseName(), + [METADATA_TABLE_NAME, LOGS_TABLE_NAME], + indexerConfig.userName(), + [ + 'select', + 'insert', + 'update', + 'delete' + ] + ); expect(hasuraClient.addPermissionsToTables).toBeCalledWith( indexerConfig.schemaName(), indexerConfig.databaseName(), @@ -270,6 +285,22 @@ describe('Provisioner', () => { ); }); + it('skips all provisioning if all provisioning tasks already done', async () => { + hasuraClient.exportMetadata = jest.fn().mockResolvedValue(hasuraMetadataWithProvisions); + hasuraClient.getTableNames = jest.fn().mockResolvedValue(tableNamesWithSystemTables); + + await provisioner.provisionUserApi(indexerConfig); + + expect(adminPgClient.query).not.toBeCalled(); + expect(hasuraClient.addDatasource).not.toBeCalled(); + + expect(hasuraClient.createSchema).not.toBeCalled(); + expect(hasuraClient.executeSqlOnSchema).not.toBeCalled(); + expect(hasuraClient.trackTables).not.toBeCalled(); + expect(hasuraClient.trackForeignKeyRelationships).toHaveBeenCalledTimes(1); + expect(hasuraClient.addPermissionsToTables).not.toBeCalled(); + }); + it('formats user input before executing the query', async () => { await provisioner.createUserDb('morgs_near', 'pass; DROP TABLE users;--', 'databaseName UNION SELECT * FROM users --'); @@ -312,12 +343,6 @@ describe('Provisioner', () => { await expect(provisioner.runLogsSql(accountId, functionName)).rejects.toThrow('Failed to run logs script: some error'); }); - it('throws an error when it fails to fetch table names', async () => { - hasuraClient.getTableNames = jest.fn().mockRejectedValue(error); - - await expect(provisioner.provisionUserApi(indexerConfig)).rejects.toThrow('Failed to provision endpoint: Failed to fetch table names: some error'); - }); - it('throws an error when it fails to track tables', async () => { hasuraClient.trackTables = jest.fn().mockRejectedValue(error); @@ -407,3 +432,52 @@ describe('Provisioner', () => { }); }); }); + +function generateDefaultHasuraMetadata (): HasuraMetadata { + const sources: HasuraSource[] = []; + // Insert default source which has different format than the rest + sources.push({ + name: 'default', + kind: 'postgres', + tables: [], + configuration: { + connection_info: { + database_url: { from_env: 'HASURA_GRAPHQL_DATABASE_URL' }, + } + } + }); + + return { + version: 3, + sources + }; +} + +function generateSourceWithTables (schemaNames: string[], tableNames: string[], role: string, db: string): HasuraSource { + const tables: HasuraTableMetadata[] = []; + schemaNames.forEach((schemaName) => { + tableNames.forEach((tableName) => { + tables.push(generateTableConfig(schemaName, tableName, role)); + }); + }); + + return { + name: db, + kind: 'postgres', + tables, + configuration: {} as any, + }; +} + +function generateTableConfig (schemaName: string, tableName: string, role: string): HasuraTableMetadata { + return { + table: { + name: tableName, + schema: schemaName, + }, + insert_permissions: [{ role, permission: {} }], + select_permissions: [{ role, permission: {} }], + update_permissions: [{ role, permission: {} }], + delete_permissions: [{ role, permission: {} }], + }; +} diff --git a/runner/src/provisioner/provisioner.ts b/runner/src/provisioner/provisioner.ts index 237bdb1e6..398ab05f6 100644 --- a/runner/src/provisioner/provisioner.ts +++ b/runner/src/provisioner/provisioner.ts @@ -5,13 +5,14 @@ import { wrapError, wrapSpan } from '../utility'; import cryptoModule from 'crypto'; import HasuraClient, { type HasuraDatabaseConnectionParameters, -} from '../hasura-client'; +} from './hasura-client'; import { logsTableDDL } from './schemas/logs-table'; import { metadataTableDDL } from './schemas/metadata-table'; import PgClientClass, { type PostgresConnectionParams } from '../pg-client'; import { type ProvisioningConfig } from '../indexer-config/indexer-config'; import IndexerMetaClass, { METADATA_TABLE_UPSERT, MetadataFields, IndexerStatus, LogEntry } from '../indexer-meta'; import logger from '../logger'; +import ProvisioningState from './provisioning-state/provisioning-state'; const DEFAULT_PASSWORD_LENGTH = 16; @@ -58,10 +59,13 @@ const defaultRetryConfig: RetryConfig = { baseDelay: 1000 }; +export const METADATA_TABLE_NAME = 'sys_metadata'; +export const LOGS_TABLE_NAME = 'sys_logs'; + export default class Provisioner { tracer: Tracer = trace.getTracer('queryapi-runner-provisioner'); - private readonly SYSTEM_TABLES = ['sys_logs', 'sys_metadata']; + private readonly SYSTEM_TABLES = [METADATA_TABLE_NAME, LOGS_TABLE_NAME]; private readonly logger: typeof logger; constructor ( @@ -162,24 +166,6 @@ export default class Provisioner { ); } - async isProvisioned (indexerConfig: ProvisioningConfig): Promise { - const checkProvisioningSpan = this.tracer.startSpan('Check if indexer is provisioned'); - - const databaseName = indexerConfig.databaseName(); - const schemaName = indexerConfig.schemaName(); - - const sourceExists = await this.hasuraClient.doesSourceExist(databaseName); - if (!sourceExists) { - return false; - } - - const schemaExists = await this.hasuraClient.doesSchemaExist(databaseName, schemaName); - - checkProvisioningSpan.end(); - - return schemaExists; - } - async createSchema (databaseName: string, schemaName: string): Promise { return await wrapError(async () => await this.hasuraClient.createSchema(databaseName, schemaName), 'Failed to create schema'); } @@ -209,10 +195,6 @@ export default class Provisioner { return await wrapError(async () => await this.hasuraClient.executeSqlOnSchema(databaseName, schemaName, sqlScript), 'Failed to run user script'); } - async getTableNames (schemaName: string, databaseName: string): Promise { - return await wrapError(async () => await this.hasuraClient.getTableNames(schemaName, databaseName), 'Failed to fetch table names'); - } - async trackTables (schemaName: string, tableNames: string[], databaseName: string): Promise { return await wrapError(async () => await this.hasuraClient.trackTables(schemaName, tableNames, databaseName), 'Failed to track tables'); } @@ -334,15 +316,22 @@ export default class Provisioner { await wrapSpan(async () => { await wrapError(async () => { + let provisioningState: ProvisioningState; try { - await this.provisionSystemResources(indexerConfig); + provisioningState = await ProvisioningState.loadProvisioningState(this.hasuraClient, indexerConfig); + } catch (error) { + logger.error('Failed to get current state of indexer resources', error); + throw error; + } + try { + await this.provisionSystemResources(indexerConfig, provisioningState); } catch (error) { logger.error('Failed to provision system resources', error); throw error; } try { - await this.provisionUserResources(indexerConfig); + await this.provisionUserResources(indexerConfig, provisioningState); } catch (err) { const error = err as Error; @@ -364,47 +353,90 @@ export default class Provisioner { await indexerMeta.writeLogs([LogEntry.systemError(error.message)]); } - async provisionSystemResources (indexerConfig: ProvisioningConfig): Promise { + async provisionSystemResources (indexerConfig: ProvisioningConfig, provisioningState: ProvisioningState): Promise { const userName = indexerConfig.userName(); const databaseName = indexerConfig.databaseName(); const schemaName = indexerConfig.schemaName(); - if (!await this.hasuraClient.doesSourceExist(databaseName)) { + if (!provisioningState.doesSourceExist()) { const password = this.generatePassword(); await this.createUserDb(userName, password, databaseName); await this.addDatasource(userName, password, databaseName); + } else { + logger.debug('Source already exists'); + } + + if (!provisioningState.doesSchemaExist()) { + await this.createSchema(databaseName, schemaName); + } else { + logger.debug('Schema already exists'); } - await this.createSchema(databaseName, schemaName); + const createdTables = provisioningState.getCreatedTables(); - await this.createMetadataTable(databaseName, schemaName); + if (!createdTables.includes(METADATA_TABLE_NAME)) { + await this.createMetadataTable(databaseName, schemaName); + } else { + logger.debug('Metadata table already exists'); + } await this.setProvisioningStatus(userName, schemaName); - await this.setupPartitionedLogsTable(userName, databaseName, schemaName); - await this.trackTables(schemaName, this.SYSTEM_TABLES, databaseName); + if (!createdTables.includes(LOGS_TABLE_NAME)) { + await this.setupPartitionedLogsTable(userName, databaseName, schemaName); + } else { + logger.debug('Logs table already exists'); + } - await this.exponentialRetry(async () => { - await this.addPermissionsToTables(indexerConfig, this.SYSTEM_TABLES, ['select', 'insert', 'update', 'delete']); - }); + const tablesToTrack = this.SYSTEM_TABLES.filter(systemTable => !provisioningState.getTrackedTables().includes(systemTable)); + if (tablesToTrack.length > 0) { + await this.trackTables(schemaName, tablesToTrack, databaseName); + } else { + logger.debug('All system tables are already tracked'); + } + + const tablesToAddPermissions = this.SYSTEM_TABLES.filter(systemTable => !provisioningState.getTablesWithPermissions().includes(systemTable)); + if (tablesToAddPermissions.length > 0) { + await this.exponentialRetry(async () => { + await this.addPermissionsToTables(indexerConfig, tablesToAddPermissions, ['select', 'insert', 'update', 'delete']); + }); + } else { + logger.debug('All system tables already have permissions'); + } } - async provisionUserResources (indexerConfig: ProvisioningConfig): Promise { + async provisionUserResources (indexerConfig: ProvisioningConfig, provisioningState: ProvisioningState): Promise { const databaseName = indexerConfig.databaseName(); const schemaName = indexerConfig.schemaName(); - await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema); + const onlySystemTablesCreated = provisioningState.getCreatedTables().every((table) => this.SYSTEM_TABLES.includes(table)); + if (onlySystemTablesCreated) { + await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema); + } else { + logger.debug('Skipping user script execution as non system tables have already been created'); + } - const userTableNames = (await this.getTableNames(schemaName, databaseName)).filter((tableName) => !this.SYSTEM_TABLES.includes(tableName)); + await provisioningState.reload(this.hasuraClient); + const userTableNames = provisioningState.getCreatedTables().filter((tableName) => !provisioningState.getTrackedTables().includes(tableName)); - await this.trackTables(schemaName, userTableNames, databaseName); + if (userTableNames.length > 0) { + await this.trackTables(schemaName, userTableNames, databaseName); + } else { + logger.debug('No user tables to track'); + } + // Safely retryable await this.exponentialRetry(async () => { await this.trackForeignKeyRelationships(schemaName, databaseName); }); - await this.exponentialRetry(async () => { - await this.addPermissionsToTables(indexerConfig, userTableNames, ['select', 'insert', 'update', 'delete']); - }); + const tablesWithoutPermissions = userTableNames.filter((tableName) => !provisioningState.getTablesWithPermissions().includes(tableName)); + if (tablesWithoutPermissions.length > 0) { + await this.exponentialRetry(async () => { + await this.addPermissionsToTables(indexerConfig, userTableNames, ['select', 'insert', 'update', 'delete']); + }); + } else { + logger.debug('All user tables already have permissions'); + } } async exponentialRetry (fn: () => Promise): Promise { diff --git a/runner/src/provisioner/provisioning-state/__snapshots__/provisioning-state.test.ts.snap b/runner/src/provisioner/provisioning-state/__snapshots__/provisioning-state.test.ts.snap new file mode 100644 index 000000000..e4dccf844 --- /dev/null +++ b/runner/src/provisioner/provisioning-state/__snapshots__/provisioning-state.test.ts.snap @@ -0,0 +1,250 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`ProvisioiningState correctly fetch metadata for existing source and schema 1`] = ` +[ + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableA", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableB", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, +] +`; + +exports[`ProvisioiningState handles table with missing permissions 1`] = ` +[ + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableA", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableB", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableC", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permission": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableD", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, +] +`; + +exports[`ProvisioiningState reload loads metadata and created tables 1`] = ` +[ + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableA", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, + { + "delete_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "insert_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "select_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + "table": { + "name": "tableB", + "schema": "account_id_function_name", + }, + "update_permissions": [ + { + "permission": {}, + "role": "account_id", + }, + ], + }, +] +`; diff --git a/runner/src/provisioner/provisioning-state/index.ts b/runner/src/provisioner/provisioning-state/index.ts new file mode 100644 index 000000000..468349a65 --- /dev/null +++ b/runner/src/provisioner/provisioning-state/index.ts @@ -0,0 +1 @@ +export { default } from './provisioning-state'; diff --git a/runner/src/provisioner/provisioning-state/provisioning-state.test.ts b/runner/src/provisioner/provisioning-state/provisioning-state.test.ts new file mode 100644 index 000000000..4b9c2bd21 --- /dev/null +++ b/runner/src/provisioner/provisioning-state/provisioning-state.test.ts @@ -0,0 +1,219 @@ +import { ProvisioningConfig } from '../../indexer-config'; +import { LogLevel } from '../../indexer-meta/log-entry'; +import type HasuraClient from '../hasura-client'; +import { type HasuraTableMetadata, type HasuraConfiguration, type HasuraDatabaseConnectionParameters, type HasuraSource, type HasuraMetadata } from '../hasura-client'; +import ProvisioningState from './provisioning-state'; + +describe('ProvisioiningState', () => { + const provisioningConfig = new ProvisioningConfig( + 'account-id', + 'function-name', + 'schema', + LogLevel.INFO, + ); + + it('can create state whether source exists or not', async () => { + const metadataWithoutUser = generateHasuraMetadata(['some_schema'], ['tableA', 'tableB'], 'someAccount', 'someDb'); + const mockExportMetadata = jest.fn().mockResolvedValue(metadataWithoutUser); + const mockGetTableNames = jest.fn().mockRejectedValueOnce(new Error('{"error":"source with name "someAccount" does not exist","path":"$.args","code":"not-exists"}')); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(provisioningState.doesSourceExist()).toBe(false); + expect(provisioningState.doesSchemaExist()).toBe(false); + expect(provisioningState.getCreatedTables()).toEqual([]); + expect(provisioningState.getSourceMetadata()).toEqual(undefined); + expect(provisioningState.getMetadataForTables()).toEqual([]); + expect(provisioningState.getTrackedTables()).toEqual([]); + expect(provisioningState.getTablesWithPermissions()).toEqual([]); + }); + + it('state works with existing source', async () => { + const metadataWithUser = generateHasuraMetadata([provisioningConfig.schemaName(), 'some_schema'], ['tableA', 'tableB'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName()); + metadataWithUser.sources.push(generateSourceWithTables(['anotherSchema'], ['anotherTable'], 'anotherRole', 'anotherDb')); + const mockExportMetadata = jest.fn().mockResolvedValue(metadataWithUser); + const mockGetTableNames = jest.fn().mockResolvedValue(['tableA']); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(provisioningState.doesSourceExist()).toBe(true); + expect(provisioningState.doesSchemaExist()).toBe(true); + expect(provisioningState.getCreatedTables()).toEqual(['tableA']); + }); + + it('correctly fetch metadata for existing source and schema', async () => { + const metadataWithUser = generateHasuraMetadata([provisioningConfig.schemaName(), 'some_schema'], ['tableA', 'tableB'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName()); + metadataWithUser.sources.push(generateSourceWithTables(['anotherSchema'], ['anotherTable'], 'anotherRole', 'anotherDb')); + const mockExportMetadata = jest.fn().mockResolvedValue(metadataWithUser); + const mockGetTableNames = jest.fn().mockResolvedValue(['tableA']); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(provisioningState.getSourceMetadata()?.name).toBe(provisioningConfig.hasuraRoleName()); + expect(provisioningState.getMetadataForTables().length).toBe(2); + expect(provisioningState.getMetadataForTables()).toMatchSnapshot(); + expect(provisioningState.getTrackedTables()).toEqual(['tableA', 'tableB']); + expect(provisioningState.getTablesWithPermissions()).toEqual(['tableA', 'tableB']); + }); + + it('reload loads metadata and created tables', async () => { + const metadataWithoutUser = generateHasuraMetadata(['some_schema'], ['tableA', 'tableB'], 'someAccount', 'someDb'); + const metadataWithUser = generateHasuraMetadata([provisioningConfig.schemaName(), 'some_schema'], ['tableA', 'tableB'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName()); + metadataWithUser.sources.push(generateSourceWithTables(['anotherSchema'], ['anotherTable'], 'anotherRole', 'anotherDb')); + const mockExportMetadata = jest.fn().mockResolvedValueOnce(metadataWithoutUser).mockResolvedValue(metadataWithUser); + const mockGetTableNames = jest.fn().mockRejectedValueOnce(new Error('{"error":"source with name "someAccount" does not exist","path":"$.args","code":"not-exists"}')).mockResolvedValueOnce(['tableA']); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(provisioningState.doesSourceExist()).toBe(false); + expect(provisioningState.doesSchemaExist()).toBe(false); + expect(provisioningState.getCreatedTables()).toEqual([]); + expect(provisioningState.getSourceMetadata()).toEqual(undefined); + + await provisioningState.reload(mockHasuraClient); + expect(provisioningState.getSourceMetadata()?.name).toBe(provisioningConfig.hasuraRoleName()); + expect(provisioningState.getMetadataForTables().length).toBe(2); + expect(provisioningState.getMetadataForTables()).toMatchSnapshot(); + expect(provisioningState.getTrackedTables()).toEqual(['tableA', 'tableB']); + expect(provisioningState.getTablesWithPermissions()).toEqual(['tableA', 'tableB']); + }); + + it('handles table with missing permissions', async () => { + const metadataWithUser = generateHasuraMetadata([provisioningConfig.schemaName(), 'some_schema'], ['tableA', 'tableB'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName()); + const role = provisioningConfig.hasuraRoleName(); + const tableMissingPermissions = { + table: { + name: 'tableC', + schema: provisioningConfig.schemaName(), + }, + insert_permissions: [{ role, permission: {} }], + update_permissions: [{ role, permission: {} }], + delete_permissions: [{ role, permission: {} }], + }; + const tableWithIncorrectlyNamedPermission = { + table: { + name: 'tableD', + schema: provisioningConfig.schemaName(), + }, + select_permissions: [{ role, permission: {} }], + insert_permission: [{ role, permission: {} }], + update_permissions: [{ role, permission: {} }], + delete_permissions: [{ role, permission: {} }], + }; + metadataWithUser.sources[1].tables.push(tableMissingPermissions); // First source is a default source + metadataWithUser.sources[1].tables.push(tableWithIncorrectlyNamedPermission); + + const mockExportMetadata = jest.fn().mockResolvedValue(metadataWithUser); + const mockGetTableNames = jest.fn().mockResolvedValue(['tableA']); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(provisioningState.getSourceMetadata()?.name).toBe(provisioningConfig.hasuraRoleName()); + expect(provisioningState.getMetadataForTables().length).toBe(4); + expect(provisioningState.getMetadataForTables()).toMatchSnapshot(); + expect(provisioningState.getTrackedTables()).toEqual(['tableA', 'tableB', 'tableC', 'tableD']); + expect(provisioningState.getTablesWithPermissions()).toEqual(['tableA', 'tableB']); + }); + + it('throws error when multiple sources with same name exist', async () => { + const metadataWithUser = generateHasuraMetadata([provisioningConfig.schemaName(), 'some_schema'], ['tableA', 'tableB'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName()); + metadataWithUser.sources.push(generateSourceWithTables(['anotherSchema'], ['anotherTable'], provisioningConfig.hasuraRoleName(), provisioningConfig.databaseName())); + const mockExportMetadata = jest.fn().mockResolvedValue(metadataWithUser); + const mockGetTableNames = jest.fn().mockResolvedValue(['tableA']); + const mockHasuraClient = { + exportMetadata: mockExportMetadata, + getTableNames: mockGetTableNames, + } as unknown as HasuraClient; + + const provisioningState = await ProvisioningState.loadProvisioningState(mockHasuraClient, provisioningConfig); + expect(() => provisioningState.getSourceMetadata()).toThrow('Expected no more than one source'); + expect(() => provisioningState.getMetadataForTables()).toThrow('Expected no more than one source'); + expect(() => provisioningState.getTrackedTables()).toThrow('Expected no more than one source'); + }); +}); + +function generateHasuraMetadata (schemaNames: string[], tableNames: string[], role: string, db: string): HasuraMetadata { + const sources: HasuraSource[] = []; + // Insert default source which has different format than the rest + sources.push({ + name: 'default', + kind: 'postgres', + tables: [], + configuration: { + connection_info: { + database_url: { from_env: 'HASURA_GRAPHQL_DATABASE_URL' }, + } + } + }); + + sources.push(generateSourceWithTables(schemaNames, tableNames, role, db)); + + return { + version: 3, + sources + }; +} + +function generateSourceWithTables (schemaNames: string[], tableNames: string[], role: string, db: string): HasuraSource { + const tables: HasuraTableMetadata[] = []; + schemaNames.forEach((schemaName) => { + tableNames.forEach((tableName) => { + tables.push(generateTableConfig(schemaName, tableName, role)); + }); + }); + + return { + name: db, + kind: 'postgres', + tables, + configuration: generateHasuraConfiguration(role, 'password'), + }; +} + +function generateTableConfig (schemaName: string, tableName: string, role: string): HasuraTableMetadata { + return { + table: { + name: tableName, + schema: schemaName, + }, + insert_permissions: [{ role, permission: {} }], + select_permissions: [{ role, permission: {} }], + update_permissions: [{ role, permission: {} }], + delete_permissions: [{ role, permission: {} }], + }; +} + +function generateHasuraConfiguration (user: string, password: string): HasuraConfiguration { + return { + connection_info: { + database_url: { connection_parameters: generateConnectionParameter(user, password) }, + isolation_level: 'read-committed', + use_prepared_statements: false + } + }; +} + +function generateConnectionParameter (user: string, password: string): HasuraDatabaseConnectionParameters { + return { + database: user, + host: 'postgres', + password, + port: 5432, + username: user + }; +} diff --git a/runner/src/provisioner/provisioning-state/provisioning-state.ts b/runner/src/provisioner/provisioning-state/provisioning-state.ts new file mode 100644 index 000000000..412fd4b5e --- /dev/null +++ b/runner/src/provisioner/provisioning-state/provisioning-state.ts @@ -0,0 +1,81 @@ +import { type ProvisioningConfig } from '../../indexer-config'; +import { type HasuraTableMetadata, type HasuraMetadata, type HasuraSource, HASURA_PERMISSION_TYPES } from '../hasura-client'; +import type HasuraClient from '../hasura-client'; + +export default class ProvisioningState { + constructor ( + private readonly config: ProvisioningConfig, + private hasuraMetadata: HasuraMetadata, + private tablesInSource: string[], + ) {} + + static async loadProvisioningState (hasuraClient: HasuraClient, provisioningConfig: ProvisioningConfig): Promise { + const hasuraMetadata = await hasuraClient.exportMetadata(); + const tablesInSource = await hasuraClient.getTableNames(provisioningConfig.schemaName(), provisioningConfig.databaseName()).catch((err) => { + const error = err as Error; + if (error.message.includes('source with name') && error.message.includes('not-exists')) { + return []; + } + throw error; + }); + return new ProvisioningState(provisioningConfig, hasuraMetadata, tablesInSource); + } + + async reload (hasuraClient: HasuraClient): Promise { + this.hasuraMetadata = await hasuraClient.exportMetadata(); + this.tablesInSource = await hasuraClient.getTableNames(this.config.schemaName(), this.config.databaseName()).catch((err) => { + const error = err as Error; + if (error.message.includes('source with name') && error.message.includes('not-exists')) { + return []; + } + throw error; + }); + } + + doesSourceExist (): boolean { + return this.hasuraMetadata.sources.some(source => source.name === this.config.databaseName()); + } + + doesSchemaExist (): boolean { + return this.hasuraMetadata.sources.some( + source => source.name === this.config.databaseName() && + source.tables.some( + table => table.table.schema === this.config.schemaName() + ) + ); + } + + getCreatedTables (): string[] { + return this.tablesInSource; + } + + getSourceMetadata (): HasuraSource | undefined { + const matchedSource = this.hasuraMetadata.sources.filter(source => source.name === this.config.databaseName()); + if (matchedSource.length > 1) { + throw new Error(`Expected no more than one source with name ${this.config.databaseName()}. Found ${matchedSource.length}`); + }; + return matchedSource.length === 0 ? undefined : matchedSource[0]; + } + + getMetadataForTables (): HasuraTableMetadata[] { + return this.getSourceMetadata()?.tables.filter(tableMetadata => tableMetadata.table.schema === this.config.schemaName()) ?? []; + } + + getTrackedTables (): string[] { + return this.getMetadataForTables().map(tableMetadata => tableMetadata.table.name); + } + + private tableContainsAllPermissions (tableMetadata: HasuraTableMetadata): boolean { + const allPermissions: string[] = HASURA_PERMISSION_TYPES.map(permission => `${permission}_permissions`); + const metadataKeys = Object.keys(tableMetadata); + return allPermissions.every(permission => metadataKeys.includes(permission)); + } + + // Does not check for partial permissions + getTablesWithPermissions (): string[] { + const tableMetadataList = this.getMetadataForTables(); + return tableMetadataList + .filter(metadata => this.tableContainsAllPermissions(metadata)) + .map(metadata => metadata.table.name); + } +} diff --git a/runner/src/server/services/data-layer/data-layer-service.test.ts b/runner/src/server/services/data-layer/data-layer-service.test.ts index 88541de69..8989c8a06 100644 --- a/runner/src/server/services/data-layer/data-layer-service.test.ts +++ b/runner/src/server/services/data-layer/data-layer-service.test.ts @@ -67,23 +67,6 @@ describe('DataLayerService', () => { }); describe('StartProvisioningTask', () => { - it('returns FAILED_PRECONDITION if already provisioned', (done) => { - const provisioner = { - isProvisioned: jest.fn().mockResolvedValue(true) - } as unknown as Provisioner; - const tasks = {}; - const call = { - request: { accountId: 'testAccount', functionName: 'testFunction', schema: 'testSchema' } - } as unknown as ServerUnaryCall; - const callback = (error: any): void => { - expect(error.code).toBe(status.FAILED_PRECONDITION); - expect(error.details).toBe('Data Layer is already provisioned'); - done(); - }; - - createDataLayerService(provisioner, tasks).StartProvisioningTask(call, callback); - }); - it('should start a new provisioning task', (done) => { const tasks: Record = {}; const provisioner = { diff --git a/runner/src/server/services/data-layer/data-layer-service.ts b/runner/src/server/services/data-layer/data-layer-service.ts index 61a1c94e6..89332b136 100644 --- a/runner/src/server/services/data-layer/data-layer-service.ts +++ b/runner/src/server/services/data-layer/data-layer-service.ts @@ -87,43 +87,19 @@ export function createDataLayerService ( const logger = createLogger(provisioningConfig); - provisioner - .isProvisioned(provisioningConfig) - .then((isProvisioned) => { - if (isProvisioned) { - const failedPrecondition = new StatusBuilder() - .withCode(status.FAILED_PRECONDITION) - .withDetails('Data Layer is already provisioned') - .build(); - - callback(failedPrecondition); - - return; - } - - const taskId = crypto.randomUUID(); - - logger.info(`Starting provisioning task: ${taskId}`); - - tasks[taskId] = new AsyncTask( - provisioner - .provisionUserApi(provisioningConfig) - .then(() => { - logger.info('Successfully provisioned Data Layer'); - }) - ); - - callback(null, { taskId }); - }) - .catch((err) => { - logger.warn('Failed to check if Data Layer is provisioned', err); - - const internal = new StatusBuilder() - .withCode(status.INTERNAL) - .withDetails('Failed to check Data Layer provisioned status') - .build(); - callback(internal); - }); + const taskId = crypto.randomUUID(); + + logger.info(`Starting provisioning task: ${taskId}`); + + tasks[taskId] = new AsyncTask( + provisioner + .provisionUserApi(provisioningConfig) + .then(() => { + logger.info('Successfully provisioned Data Layer'); + }) + ); + + callback(null, { taskId }); }, StartDeprovisioningTask (call: ServerUnaryCall, callback: sendUnaryData): void { diff --git a/runner/src/redis-client/index.ts b/runner/src/stream-handler/redis-client/index.ts similarity index 100% rename from runner/src/redis-client/index.ts rename to runner/src/stream-handler/redis-client/index.ts diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/stream-handler/redis-client/redis-client.test.ts similarity index 100% rename from runner/src/redis-client/redis-client.test.ts rename to runner/src/stream-handler/redis-client/redis-client.test.ts diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/stream-handler/redis-client/redis-client.ts similarity index 97% rename from runner/src/redis-client/redis-client.ts rename to runner/src/stream-handler/redis-client/redis-client.ts index 645987c6b..854278cab 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/stream-handler/redis-client/redis-client.ts @@ -1,6 +1,6 @@ import { createClient, type RedisClientType } from 'redis'; -import logger from '../logger'; +import logger from '../../logger'; interface StreamMessage { id: string diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 38a6483b5..a9014b140 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -4,7 +4,7 @@ import promClient from 'prom-client'; import { Block } from '@near-lake/primitives'; import { Indexer } from '../indexer'; -import RedisClient from '../redis-client'; +import RedisClient from './redis-client'; import { METRICS } from '../metrics'; import LakeClient from '../lake-client'; import { WorkerMessageType, type WorkerMessage, ExecutionState } from './stream-handler'; diff --git a/runner/tests/integration.test.ts b/runner/tests/integration.test.ts index cca359990..8cfa1f64d 100644 --- a/runner/tests/integration.test.ts +++ b/runner/tests/integration.test.ts @@ -3,7 +3,7 @@ import { Network, type StartedNetwork } from 'testcontainers'; import { gql, GraphQLClient } from 'graphql-request'; import { Indexer } from '../src/indexer'; -import HasuraClient from '../src/hasura-client'; +import HasuraClient from '../src/provisioner/hasura-client'; import Provisioner from '../src/provisioner'; import PgClient from '../src/pg-client';