Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-275 Expose maximum records configuration for schema builder #49

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import static com.aerospike.jdbc.util.AerospikeUtils.getIndexBinValuesRatio;
import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.schemaScanRecords;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.sql.Connection.TRANSACTION_NONE;
Expand All @@ -50,17 +49,17 @@ public class AerospikeDatabaseMetadata implements DatabaseMetaData, SimpleWrappe
private static final String NEW_LINE = System.lineSeparator();

private final String url;
private final Connection connection;
private final AerospikeConnection connection;
private final String dbBuild;
private final String dbEdition;
private final List<String> catalogs;
private final Map<String, Collection<String>> tables;
private final Map<String, Collection<AerospikeSecondaryIndex>> catalogIndexes;
private final Map<String, AerospikeSecondaryIndex> secondaryIndexes;
private final AerospikeSchemaBuilder schemaBuilder;

public AerospikeDatabaseMetadata(String url, IAerospikeClient client, AerospikeConnection connection) {
logger.info("Init AerospikeDatabaseMetadata");
AerospikeSchemaBuilder.cleanSchemaCache();
this.url = url;
this.connection = connection;

Expand Down Expand Up @@ -103,11 +102,17 @@ public AerospikeDatabaseMetadata(String url, IAerospikeClient client, AerospikeC
.flatMap(Collection::stream)
.collect(Collectors.toMap(AerospikeSecondaryIndex::toKey, Function.identity()));

schemaBuilder = new AerospikeSchemaBuilder(client, connection.getConfiguration().getDriverPolicy());

dbBuild = join("N/A", ", ", builds);
dbEdition = join("Aerospike", ", ", editions);
catalogs = namespaces.stream().filter(n -> !"".equals(n)).collect(Collectors.toList());
}

public AerospikeSchemaBuilder getSchemaBuilder() {
return schemaBuilder;
}

@Override
public boolean allProceduresAreCallable() {
return false;
Expand Down Expand Up @@ -773,7 +778,7 @@ public ResultSet getTableTypes() {
@Override
public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern,
String columnNamePattern) throws SQLException {
logger.info(() -> String.format("AerospikeDatabaseMetadata getColumns; %s, %s, %s, %s", catalog,
logger.info(() -> format("AerospikeDatabaseMetadata getColumns; %s, %s, %s, %s", catalog,
schemaPattern, tableNamePattern, columnNamePattern));
Pattern tableNameRegex = isNullOrEmpty(tableNamePattern) ? null
: Pattern.compile(tableNamePattern.replace("%", ".*"));
Expand Down Expand Up @@ -1263,7 +1268,7 @@ private Properties initProperties(String lines) {
try {
properties.load(new StringReader(lines));
} catch (IOException e) {
logger.warning(() -> String.format("Expression in initProperties, lines: %s", lines));
logger.warning(() -> format("Expression in initProperties, lines: %s", lines));
}
return properties;
}
Expand Down Expand Up @@ -1293,17 +1298,18 @@ private int ordinal(ResultSetMetaData md, String columnName) {
}
}
} catch (SQLException e) {
logger.severe(() -> String.format("Exception in ordinal, columnName: %s", columnName));
logger.severe(() -> format("Exception in ordinal, columnName: %s", columnName));
}
return ordinal;
}

private ResultSetMetaData getMetadata(String namespace, String table) {
try (Statement statement = connection.createStatement()) {
return statement.executeQuery(format(
"select * from \"%s.%s\" limit %d", namespace, table, schemaScanRecords)).getMetaData();
String query = format("SELECT * FROM \"%s.%s\" LIMIT %d", namespace, table,
connection.getConfiguration().getDriverPolicy().getSchemaBuilderMaxRecords());
return statement.executeQuery(query).getMetaData();
} catch (SQLException e) {
logger.severe(() -> String.format("Exception in getMetadata, namespace: %s, table: %s", namespace, table));
logger.severe(() -> format("Exception in getMetadata, namespace: %s, table: %s", namespace, table));
throw new IllegalArgumentException(e);
}
}
Expand Down
34 changes: 21 additions & 13 deletions src/main/java/com/aerospike/jdbc/AerospikePreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.aerospike.client.Value;
import com.aerospike.jdbc.model.AerospikeQuery;
import com.aerospike.jdbc.model.DataColumn;
import com.aerospike.jdbc.schema.AerospikeSchemaBuilder;
import com.aerospike.jdbc.sql.AerospikeResultSetMetaData;
import com.aerospike.jdbc.sql.SimpleParameterMetaData;
import com.aerospike.jdbc.sql.type.ByteArrayBlob;
Expand All @@ -23,7 +22,6 @@
import java.util.Arrays;
import java.util.Calendar;
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;

import static com.aerospike.jdbc.util.PreparedStatement.parseParameters;
Expand All @@ -34,22 +32,25 @@ public class AerospikePreparedStatement extends AerospikeStatement implements Pr
private static final Logger logger = Logger.getLogger(AerospikePreparedStatement.class.getName());

private final String sql;
private final List<DataColumn> columns;
private final AerospikeQuery query;
private final AerospikeConnection connection;
private final Object[] parameterValues;
private final AerospikeQuery query;

public AerospikePreparedStatement(IAerospikeClient client, AerospikeConnection connection, String sql) {
super(client, connection);
this.sql = sql;
int params = parseParameters(sql, 0).getValue();
parameterValues = new Object[params];
Arrays.fill(parameterValues, Optional.empty());
this.connection = connection;
parameterValues = buildParameterValues(sql);
try {
query = parseQuery(sql);
} catch (SQLException e) {
throw new UnsupportedOperationException(e);
}
columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client);
}

private Object[] buildParameterValues(String sql) {
int params = parseParameters(sql, 0).getValue();
return new Object[params];
}

@Override
Expand Down Expand Up @@ -159,7 +160,7 @@ public void setBinaryStream(int parameterIndex, InputStream x, int length) throw

@Override
public void clearParameters() {
Arrays.fill(parameterValues, Optional.empty());
Arrays.fill(parameterValues, null);
}

@Override
Expand All @@ -185,7 +186,7 @@ public boolean execute() throws SQLException {
}

private String prepareQuery() {
return String.format(this.sql.replace("?", "%s"), parameterValues);
return format(this.sql.replace("?", "%s"), parameterValues);
}

@Override
Expand Down Expand Up @@ -219,7 +220,10 @@ public void setArray(int parameterIndex, Array x) throws SQLException {
}

@Override
public ResultSetMetaData getMetaData() {
public ResultSetMetaData getMetaData() throws SQLException {
List<DataColumn> columns = ((AerospikeDatabaseMetadata) connection.getMetaData())
.getSchemaBuilder()
.getSchema(query.getSchemaTable());
return new AerospikeResultSetMetaData(query.getSchema(), query.getTable(), columns);
}

Expand Down Expand Up @@ -249,7 +253,10 @@ public void setURL(int parameterIndex, URL url) throws SQLException {
}

@Override
public ParameterMetaData getParameterMetaData() {
public ParameterMetaData getParameterMetaData() throws SQLException {
List<DataColumn> columns = ((AerospikeDatabaseMetadata) connection.getMetaData())
.getSchemaBuilder()
.getSchema(query.getSchemaTable());
return new SimpleParameterMetaData(columns);
}

Expand Down Expand Up @@ -278,7 +285,8 @@ public void setClob(int parameterIndex, Reader reader, long length) throws SQLEx
try {
String result = IOUtils.toString(reader);
if (result.length() != length) {
throw new SQLException(format("Unexpected data length: expected %s but was %d", length, result.length()));
throw new SQLException(format("Unexpected data length: expected %s but was %d", length,
result.length()));
}
setObject(parameterIndex, result);
} catch (IOException e) {
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/aerospike/jdbc/model/DriverPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ public class DriverPolicy {
private static final int DEFAULT_CAPACITY = 256;
private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final int DEFAULT_METADATA_CACHE_TTL_SECONDS = 3600;
private static final int DEFAULT_SCHEMA_BUILDER_MAX_RECORDS = 1000;

private final int recordSetQueueCapacity;
private final int recordSetTimeoutMs;
private final int metadataCacheTtlSeconds;
private final int schemaBuilderMaxRecords;

public DriverPolicy(Properties properties) {
recordSetQueueCapacity = parseInt(properties.getProperty("recordSetQueueCapacity"), DEFAULT_CAPACITY);
recordSetTimeoutMs = parseInt(properties.getProperty("recordSetTimeoutMs"), DEFAULT_TIMEOUT_MS);
metadataCacheTtlSeconds = parseInt(properties.getProperty("metadataCacheTtlSeconds"),
DEFAULT_METADATA_CACHE_TTL_SECONDS);
schemaBuilderMaxRecords = parseInt(properties.getProperty("schemaBuilderMaxRecords"),
DEFAULT_SCHEMA_BUILDER_MAX_RECORDS);
}

public int getRecordSetQueueCapacity() {
Expand All @@ -31,6 +35,10 @@ public int getMetadataCacheTtlSeconds() {
return metadataCacheTtlSeconds;
}

public int getSchemaBuilderMaxRecords() {
return schemaBuilderMaxRecords;
}

private int parseInt(String value, int defaultValue) {
if (value != null) {
return Integer.parseInt(value);
Expand Down
12 changes: 5 additions & 7 deletions src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.aerospike.jdbc.model.AerospikeSecondaryIndex;
import com.aerospike.jdbc.model.DataColumn;
import com.aerospike.jdbc.model.Pair;
import com.aerospike.jdbc.schema.AerospikeSchemaBuilder;
import com.aerospike.jdbc.sql.AerospikeRecordResultSet;

import java.sql.ResultSet;
Expand All @@ -36,22 +35,21 @@ public class SelectQueryHandler extends BaseQueryHandler {

private static final Logger logger = Logger.getLogger(SelectQueryHandler.class.getName());

protected final Map<String, AerospikeSecondaryIndex> secondaryIndexes;
protected final AerospikeDatabaseMetadata databaseMetadata;
protected List<DataColumn> columns;

public SelectQueryHandler(IAerospikeClient client, Statement statement) {
super(client, statement);
try {
secondaryIndexes = ((AerospikeDatabaseMetadata) statement.getConnection().getMetaData())
.getSecondaryIndexes();
databaseMetadata = (AerospikeDatabaseMetadata) statement.getConnection().getMetaData();
} catch (SQLException e) {
throw new IllegalStateException("Failed to get secondary indexes", e);
throw new IllegalStateException("Failed to get AerospikeDatabaseMetadata", e);
}
}

@Override
public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client);
columns = databaseMetadata.getSchemaBuilder().getSchema(query.getSchemaTable());
Collection<Object> keyObjects = query.getPrimaryKeys();
Optional<AerospikeSecondaryIndex> sIndex = secondaryIndex(query);
Pair<ResultSet, Integer> result;
Expand Down Expand Up @@ -132,7 +130,7 @@ private Pair<ResultSet, Integer> executeQuery(AerospikeQuery query,

private Optional<AerospikeSecondaryIndex> secondaryIndex(AerospikeQuery query) {
if (aerospikeVersion.isSIndexSupported() && query.isIndexable()) {
Map<String, AerospikeSecondaryIndex> indexMap = secondaryIndexes;
Map<String, AerospikeSecondaryIndex> indexMap = databaseMetadata.getSecondaryIndexes();
List<String> binNames = query.getPredicate().getBinNames();
if (!binNames.isEmpty() && indexMap != null && !indexMap.isEmpty()) {
if (binNames.size() == 1) {
Expand Down
29 changes: 14 additions & 15 deletions src/main/java/com/aerospike/jdbc/schema/AerospikeSchemaBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.aerospike.client.Value;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.jdbc.model.DataColumn;
import com.aerospike.jdbc.model.DriverPolicy;
import com.aerospike.jdbc.model.SchemaTableName;

import java.sql.Types;
Expand All @@ -16,29 +17,27 @@

import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.schemaCacheTTLMinutes;
import static com.aerospike.jdbc.util.Constants.schemaScanRecords;

public final class AerospikeSchemaBuilder {

private static final Logger logger = Logger.getLogger(AerospikeSchemaBuilder.class.getName());

private static final Duration cacheTTL = Duration.ofMinutes(schemaCacheTTLMinutes);
private static final AerospikeSchemaCache cache = new AerospikeSchemaCache(cacheTTL);
private final IAerospikeClient client;
private final AerospikeSchemaCache schemaCache;
private final int scanMaxRecords;

private AerospikeSchemaBuilder() {
public AerospikeSchemaBuilder(IAerospikeClient client, DriverPolicy driverPolicy) {
this.client = client;
schemaCache = new AerospikeSchemaCache(Duration.ofSeconds(driverPolicy.getMetadataCacheTtlSeconds()));
scanMaxRecords = driverPolicy.getSchemaBuilderMaxRecords();
}

public static void cleanSchemaCache() {
cache.clear();
}

public static List<DataColumn> getSchema(SchemaTableName schemaTableName, IAerospikeClient client) {
return cache.get(schemaTableName).orElseGet(() -> {
public List<DataColumn> getSchema(SchemaTableName schemaTableName) {
return schemaCache.get(schemaTableName).orElseGet(() -> {
logger.info(() -> "Fetching SchemaTableName: " + schemaTableName);
final Map<String, DataColumn> columnHandles = new TreeMap<>(String::compareToIgnoreCase);
ScanPolicy policy = new ScanPolicy(client.getScanPolicyDefault());
policy.maxRecords = schemaScanRecords;
policy.maxRecords = scanMaxRecords;

// add record key column handler
columnHandles.put(PRIMARY_KEY_COLUMN_NAME,
Expand All @@ -65,19 +64,19 @@ public static List<DataColumn> getSchema(SchemaTableName schemaTableName, IAeros
});

List<DataColumn> columns = new ArrayList<>(columnHandles.values());
cache.put(schemaTableName, columns);
schemaCache.put(schemaTableName, columns);
return columns;
});
}

private static String toSet(String tableName) {
private String toSet(String tableName) {
if (tableName.equals(DEFAULT_SCHEMA_NAME)) {
return null;
}
return tableName;
}

private static int getBinType(Object value) {
private int getBinType(Object value) {
int t = 0;
if (value instanceof byte[] || value instanceof Value.BytesValue || value instanceof Value.ByteSegmentValue) {
t = Types.VARBINARY;
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/com/aerospike/jdbc/util/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ public final class Constants {
public static final String PRIMARY_KEY_COLUMN_NAME = "__key";
public static final String DEFAULT_SCHEMA_NAME = "__default";

public static final long schemaScanRecords = 1000L;
public static final long schemaCacheTTLMinutes = 30L;

public static final String UNSUPPORTED_QUERY_TYPE_MESSAGE = "Unsupported query type";

private Constants() {
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ public void testParseUrlParameters() throws Exception {
update.setProperty("recordSetQueueCapacity", "1024");
update.setProperty("metadataCacheTtlSeconds", "7200");
update.setProperty("recordsPerSecond", "128");
update.setProperty("schemaBuilderMaxRecords", "500");
connection.setClientInfo(update);
assertEquals(client.getScanPolicyDefault().recordsPerSecond, 128);
assertTotalTimeoutAll(client, 3000);
assertSendKeyAll(client, true);
assertEquals(config.getDriverPolicy().getRecordSetQueueCapacity(), 1024);
assertEquals(config.getDriverPolicy().getMetadataCacheTtlSeconds(), 7200);
assertEquals(config.getDriverPolicy().getSchemaBuilderMaxRecords(), 500);

connection.setClientInfo("recordSetTimeoutMs", "7000");
assertEquals(config.getDriverPolicy().getRecordSetTimeoutMs(), 7000);
Expand Down
9 changes: 0 additions & 9 deletions src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.aerospike.jdbc;

import com.aerospike.client.Value;
import com.aerospike.jdbc.schema.AerospikeSchemaBuilder;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand All @@ -19,13 +17,6 @@

public class PreparedQueriesTest extends JdbcBaseTest {

@BeforeClass
public void initSchemaCache() throws SQLException {
setUp();
AerospikeSchemaBuilder.cleanSchemaCache();
tearDown();
}

@BeforeMethod
public void setUp() throws SQLException {
Value.UseBoolBin = false;
Expand Down
Loading