From 9f1c08c2dce4819e473523456bada632003cbe5c Mon Sep 17 00:00:00 2001 From: Eugene R Date: Tue, 5 Dec 2023 09:54:13 +0200 Subject: [PATCH] FMWK-275 Expose maximum records configuration for schema builder (#49) --- .../jdbc/AerospikeDatabaseMetadata.java | 24 ++++++++----- .../jdbc/AerospikePreparedStatement.java | 34 ++++++++++++------- .../aerospike/jdbc/model/DriverPolicy.java | 8 +++++ .../jdbc/query/SelectQueryHandler.java | 12 +++---- .../jdbc/schema/AerospikeSchemaBuilder.java | 29 ++++++++-------- .../com/aerospike/jdbc/util/Constants.java | 3 -- .../com/aerospike/jdbc/ParseJdbcUrlTest.java | 2 ++ .../aerospike/jdbc/PreparedQueriesTest.java | 9 ----- 8 files changed, 65 insertions(+), 56 deletions(-) diff --git a/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java b/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java index 930e379..f236a58 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java @@ -31,7 +31,6 @@ import static com.aerospike.jdbc.util.AerospikeUtils.getIndexBinValuesRatio; import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME; import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME; -import static com.aerospike.jdbc.util.Constants.schemaScanRecords; import static com.google.common.base.Strings.isNullOrEmpty; import static java.lang.String.format; import static java.sql.Connection.TRANSACTION_NONE; @@ -50,17 +49,17 @@ public class AerospikeDatabaseMetadata implements DatabaseMetaData, SimpleWrappe private static final String NEW_LINE = System.lineSeparator(); private final String url; - private final Connection connection; + private final AerospikeConnection connection; private final String dbBuild; private final String dbEdition; private final List catalogs; private final Map> tables; private final Map> catalogIndexes; private final Map secondaryIndexes; + private final AerospikeSchemaBuilder schemaBuilder; public AerospikeDatabaseMetadata(String url, IAerospikeClient client, AerospikeConnection connection) { logger.info("Init AerospikeDatabaseMetadata"); - AerospikeSchemaBuilder.cleanSchemaCache(); this.url = url; this.connection = connection; @@ -103,11 +102,17 @@ public AerospikeDatabaseMetadata(String url, IAerospikeClient client, AerospikeC .flatMap(Collection::stream) .collect(Collectors.toMap(AerospikeSecondaryIndex::toKey, Function.identity())); + schemaBuilder = new AerospikeSchemaBuilder(client, connection.getConfiguration().getDriverPolicy()); + dbBuild = join("N/A", ", ", builds); dbEdition = join("Aerospike", ", ", editions); catalogs = namespaces.stream().filter(n -> !"".equals(n)).collect(Collectors.toList()); } + public AerospikeSchemaBuilder getSchemaBuilder() { + return schemaBuilder; + } + @Override public boolean allProceduresAreCallable() { return false; @@ -773,7 +778,7 @@ public ResultSet getTableTypes() { @Override public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) throws SQLException { - logger.info(() -> String.format("AerospikeDatabaseMetadata getColumns; %s, %s, %s, %s", catalog, + logger.info(() -> format("AerospikeDatabaseMetadata getColumns; %s, %s, %s, %s", catalog, schemaPattern, tableNamePattern, columnNamePattern)); Pattern tableNameRegex = isNullOrEmpty(tableNamePattern) ? null : Pattern.compile(tableNamePattern.replace("%", ".*")); @@ -1263,7 +1268,7 @@ private Properties initProperties(String lines) { try { properties.load(new StringReader(lines)); } catch (IOException e) { - logger.warning(() -> String.format("Expression in initProperties, lines: %s", lines)); + logger.warning(() -> format("Expression in initProperties, lines: %s", lines)); } return properties; } @@ -1293,17 +1298,18 @@ private int ordinal(ResultSetMetaData md, String columnName) { } } } catch (SQLException e) { - logger.severe(() -> String.format("Exception in ordinal, columnName: %s", columnName)); + logger.severe(() -> format("Exception in ordinal, columnName: %s", columnName)); } return ordinal; } private ResultSetMetaData getMetadata(String namespace, String table) { try (Statement statement = connection.createStatement()) { - return statement.executeQuery(format( - "select * from \"%s.%s\" limit %d", namespace, table, schemaScanRecords)).getMetaData(); + String query = format("SELECT * FROM \"%s.%s\" LIMIT %d", namespace, table, + connection.getConfiguration().getDriverPolicy().getSchemaBuilderMaxRecords()); + return statement.executeQuery(query).getMetaData(); } catch (SQLException e) { - logger.severe(() -> String.format("Exception in getMetadata, namespace: %s, table: %s", namespace, table)); + logger.severe(() -> format("Exception in getMetadata, namespace: %s, table: %s", namespace, table)); throw new IllegalArgumentException(e); } } diff --git a/src/main/java/com/aerospike/jdbc/AerospikePreparedStatement.java b/src/main/java/com/aerospike/jdbc/AerospikePreparedStatement.java index 976c59d..1d333b7 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikePreparedStatement.java +++ b/src/main/java/com/aerospike/jdbc/AerospikePreparedStatement.java @@ -4,7 +4,6 @@ import com.aerospike.client.Value; import com.aerospike.jdbc.model.AerospikeQuery; import com.aerospike.jdbc.model.DataColumn; -import com.aerospike.jdbc.schema.AerospikeSchemaBuilder; import com.aerospike.jdbc.sql.AerospikeResultSetMetaData; import com.aerospike.jdbc.sql.SimpleParameterMetaData; import com.aerospike.jdbc.sql.type.ByteArrayBlob; @@ -23,7 +22,6 @@ import java.util.Arrays; import java.util.Calendar; import java.util.List; -import java.util.Optional; import java.util.logging.Logger; import static com.aerospike.jdbc.util.PreparedStatement.parseParameters; @@ -34,22 +32,25 @@ public class AerospikePreparedStatement extends AerospikeStatement implements Pr private static final Logger logger = Logger.getLogger(AerospikePreparedStatement.class.getName()); private final String sql; - private final List columns; - private final AerospikeQuery query; + private final AerospikeConnection connection; private final Object[] parameterValues; + private final AerospikeQuery query; public AerospikePreparedStatement(IAerospikeClient client, AerospikeConnection connection, String sql) { super(client, connection); this.sql = sql; - int params = parseParameters(sql, 0).getValue(); - parameterValues = new Object[params]; - Arrays.fill(parameterValues, Optional.empty()); + this.connection = connection; + parameterValues = buildParameterValues(sql); try { query = parseQuery(sql); } catch (SQLException e) { throw new UnsupportedOperationException(e); } - columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client); + } + + private Object[] buildParameterValues(String sql) { + int params = parseParameters(sql, 0).getValue(); + return new Object[params]; } @Override @@ -159,7 +160,7 @@ public void setBinaryStream(int parameterIndex, InputStream x, int length) throw @Override public void clearParameters() { - Arrays.fill(parameterValues, Optional.empty()); + Arrays.fill(parameterValues, null); } @Override @@ -185,7 +186,7 @@ public boolean execute() throws SQLException { } private String prepareQuery() { - return String.format(this.sql.replace("?", "%s"), parameterValues); + return format(this.sql.replace("?", "%s"), parameterValues); } @Override @@ -219,7 +220,10 @@ public void setArray(int parameterIndex, Array x) throws SQLException { } @Override - public ResultSetMetaData getMetaData() { + public ResultSetMetaData getMetaData() throws SQLException { + List columns = ((AerospikeDatabaseMetadata) connection.getMetaData()) + .getSchemaBuilder() + .getSchema(query.getSchemaTable()); return new AerospikeResultSetMetaData(query.getSchema(), query.getTable(), columns); } @@ -249,7 +253,10 @@ public void setURL(int parameterIndex, URL url) throws SQLException { } @Override - public ParameterMetaData getParameterMetaData() { + public ParameterMetaData getParameterMetaData() throws SQLException { + List columns = ((AerospikeDatabaseMetadata) connection.getMetaData()) + .getSchemaBuilder() + .getSchema(query.getSchemaTable()); return new SimpleParameterMetaData(columns); } @@ -278,7 +285,8 @@ public void setClob(int parameterIndex, Reader reader, long length) throws SQLEx try { String result = IOUtils.toString(reader); if (result.length() != length) { - throw new SQLException(format("Unexpected data length: expected %s but was %d", length, result.length())); + throw new SQLException(format("Unexpected data length: expected %s but was %d", length, + result.length())); } setObject(parameterIndex, result); } catch (IOException e) { diff --git a/src/main/java/com/aerospike/jdbc/model/DriverPolicy.java b/src/main/java/com/aerospike/jdbc/model/DriverPolicy.java index 6d29996..c9e9b54 100644 --- a/src/main/java/com/aerospike/jdbc/model/DriverPolicy.java +++ b/src/main/java/com/aerospike/jdbc/model/DriverPolicy.java @@ -7,16 +7,20 @@ public class DriverPolicy { private static final int DEFAULT_CAPACITY = 256; private static final int DEFAULT_TIMEOUT_MS = 1000; private static final int DEFAULT_METADATA_CACHE_TTL_SECONDS = 3600; + private static final int DEFAULT_SCHEMA_BUILDER_MAX_RECORDS = 1000; private final int recordSetQueueCapacity; private final int recordSetTimeoutMs; private final int metadataCacheTtlSeconds; + private final int schemaBuilderMaxRecords; public DriverPolicy(Properties properties) { recordSetQueueCapacity = parseInt(properties.getProperty("recordSetQueueCapacity"), DEFAULT_CAPACITY); recordSetTimeoutMs = parseInt(properties.getProperty("recordSetTimeoutMs"), DEFAULT_TIMEOUT_MS); metadataCacheTtlSeconds = parseInt(properties.getProperty("metadataCacheTtlSeconds"), DEFAULT_METADATA_CACHE_TTL_SECONDS); + schemaBuilderMaxRecords = parseInt(properties.getProperty("schemaBuilderMaxRecords"), + DEFAULT_SCHEMA_BUILDER_MAX_RECORDS); } public int getRecordSetQueueCapacity() { @@ -31,6 +35,10 @@ public int getMetadataCacheTtlSeconds() { return metadataCacheTtlSeconds; } + public int getSchemaBuilderMaxRecords() { + return schemaBuilderMaxRecords; + } + private int parseInt(String value, int defaultValue) { if (value != null) { return Integer.parseInt(value); diff --git a/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java index ebcb6d3..add4efe 100644 --- a/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java @@ -18,7 +18,6 @@ import com.aerospike.jdbc.model.AerospikeSecondaryIndex; import com.aerospike.jdbc.model.DataColumn; import com.aerospike.jdbc.model.Pair; -import com.aerospike.jdbc.schema.AerospikeSchemaBuilder; import com.aerospike.jdbc.sql.AerospikeRecordResultSet; import java.sql.ResultSet; @@ -36,22 +35,21 @@ public class SelectQueryHandler extends BaseQueryHandler { private static final Logger logger = Logger.getLogger(SelectQueryHandler.class.getName()); - protected final Map secondaryIndexes; + protected final AerospikeDatabaseMetadata databaseMetadata; protected List columns; public SelectQueryHandler(IAerospikeClient client, Statement statement) { super(client, statement); try { - secondaryIndexes = ((AerospikeDatabaseMetadata) statement.getConnection().getMetaData()) - .getSecondaryIndexes(); + databaseMetadata = (AerospikeDatabaseMetadata) statement.getConnection().getMetaData(); } catch (SQLException e) { - throw new IllegalStateException("Failed to get secondary indexes", e); + throw new IllegalStateException("Failed to get AerospikeDatabaseMetadata", e); } } @Override public Pair execute(AerospikeQuery query) { - columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client); + columns = databaseMetadata.getSchemaBuilder().getSchema(query.getSchemaTable()); Collection keyObjects = query.getPrimaryKeys(); Optional sIndex = secondaryIndex(query); Pair result; @@ -132,7 +130,7 @@ private Pair executeQuery(AerospikeQuery query, private Optional secondaryIndex(AerospikeQuery query) { if (aerospikeVersion.isSIndexSupported() && query.isIndexable()) { - Map indexMap = secondaryIndexes; + Map indexMap = databaseMetadata.getSecondaryIndexes(); List binNames = query.getPredicate().getBinNames(); if (!binNames.isEmpty() && indexMap != null && !indexMap.isEmpty()) { if (binNames.size() == 1) { diff --git a/src/main/java/com/aerospike/jdbc/schema/AerospikeSchemaBuilder.java b/src/main/java/com/aerospike/jdbc/schema/AerospikeSchemaBuilder.java index 7cc786e..370efec 100644 --- a/src/main/java/com/aerospike/jdbc/schema/AerospikeSchemaBuilder.java +++ b/src/main/java/com/aerospike/jdbc/schema/AerospikeSchemaBuilder.java @@ -4,6 +4,7 @@ import com.aerospike.client.Value; import com.aerospike.client.policy.ScanPolicy; import com.aerospike.jdbc.model.DataColumn; +import com.aerospike.jdbc.model.DriverPolicy; import com.aerospike.jdbc.model.SchemaTableName; import java.sql.Types; @@ -16,29 +17,27 @@ import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME; import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME; -import static com.aerospike.jdbc.util.Constants.schemaCacheTTLMinutes; -import static com.aerospike.jdbc.util.Constants.schemaScanRecords; public final class AerospikeSchemaBuilder { private static final Logger logger = Logger.getLogger(AerospikeSchemaBuilder.class.getName()); - private static final Duration cacheTTL = Duration.ofMinutes(schemaCacheTTLMinutes); - private static final AerospikeSchemaCache cache = new AerospikeSchemaCache(cacheTTL); + private final IAerospikeClient client; + private final AerospikeSchemaCache schemaCache; + private final int scanMaxRecords; - private AerospikeSchemaBuilder() { + public AerospikeSchemaBuilder(IAerospikeClient client, DriverPolicy driverPolicy) { + this.client = client; + schemaCache = new AerospikeSchemaCache(Duration.ofSeconds(driverPolicy.getMetadataCacheTtlSeconds())); + scanMaxRecords = driverPolicy.getSchemaBuilderMaxRecords(); } - public static void cleanSchemaCache() { - cache.clear(); - } - - public static List getSchema(SchemaTableName schemaTableName, IAerospikeClient client) { - return cache.get(schemaTableName).orElseGet(() -> { + public List getSchema(SchemaTableName schemaTableName) { + return schemaCache.get(schemaTableName).orElseGet(() -> { logger.info(() -> "Fetching SchemaTableName: " + schemaTableName); final Map columnHandles = new TreeMap<>(String::compareToIgnoreCase); ScanPolicy policy = new ScanPolicy(client.getScanPolicyDefault()); - policy.maxRecords = schemaScanRecords; + policy.maxRecords = scanMaxRecords; // add record key column handler columnHandles.put(PRIMARY_KEY_COLUMN_NAME, @@ -65,19 +64,19 @@ public static List getSchema(SchemaTableName schemaTableName, IAeros }); List columns = new ArrayList<>(columnHandles.values()); - cache.put(schemaTableName, columns); + schemaCache.put(schemaTableName, columns); return columns; }); } - private static String toSet(String tableName) { + private String toSet(String tableName) { if (tableName.equals(DEFAULT_SCHEMA_NAME)) { return null; } return tableName; } - private static int getBinType(Object value) { + private int getBinType(Object value) { int t = 0; if (value instanceof byte[] || value instanceof Value.BytesValue || value instanceof Value.ByteSegmentValue) { t = Types.VARBINARY; diff --git a/src/main/java/com/aerospike/jdbc/util/Constants.java b/src/main/java/com/aerospike/jdbc/util/Constants.java index 3c46f71..ddb7d4e 100644 --- a/src/main/java/com/aerospike/jdbc/util/Constants.java +++ b/src/main/java/com/aerospike/jdbc/util/Constants.java @@ -5,9 +5,6 @@ public final class Constants { public static final String PRIMARY_KEY_COLUMN_NAME = "__key"; public static final String DEFAULT_SCHEMA_NAME = "__default"; - public static final long schemaScanRecords = 1000L; - public static final long schemaCacheTTLMinutes = 30L; - public static final String UNSUPPORTED_QUERY_TYPE_MESSAGE = "Unsupported query type"; private Constants() { diff --git a/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java b/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java index b20d0c4..3f4c3dc 100644 --- a/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java +++ b/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java @@ -56,12 +56,14 @@ public void testParseUrlParameters() throws Exception { update.setProperty("recordSetQueueCapacity", "1024"); update.setProperty("metadataCacheTtlSeconds", "7200"); update.setProperty("recordsPerSecond", "128"); + update.setProperty("schemaBuilderMaxRecords", "500"); connection.setClientInfo(update); assertEquals(client.getScanPolicyDefault().recordsPerSecond, 128); assertTotalTimeoutAll(client, 3000); assertSendKeyAll(client, true); assertEquals(config.getDriverPolicy().getRecordSetQueueCapacity(), 1024); assertEquals(config.getDriverPolicy().getMetadataCacheTtlSeconds(), 7200); + assertEquals(config.getDriverPolicy().getSchemaBuilderMaxRecords(), 500); connection.setClientInfo("recordSetTimeoutMs", "7000"); assertEquals(config.getDriverPolicy().getRecordSetTimeoutMs(), 7000); diff --git a/src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java b/src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java index 775819e..2174acd 100644 --- a/src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java +++ b/src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java @@ -1,9 +1,7 @@ package com.aerospike.jdbc; import com.aerospike.client.Value; -import com.aerospike.jdbc.schema.AerospikeSchemaBuilder; import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -19,13 +17,6 @@ public class PreparedQueriesTest extends JdbcBaseTest { - @BeforeClass - public void initSchemaCache() throws SQLException { - setUp(); - AerospikeSchemaBuilder.cleanSchemaCache(); - tearDown(); - } - @BeforeMethod public void setUp() throws SQLException { Value.UseBoolBin = false;