From 4824d29dbcef5d7ca9e8aa9ec31c5210a1f8844c Mon Sep 17 00:00:00 2001 From: yrizhkov Date: Wed, 29 Nov 2023 19:17:58 +0200 Subject: [PATCH 1/4] FMWK-267 Create driver configuration instance per connection --- .../aerospike/jdbc/AerospikeConnection.java | 99 ++++++----- .../jdbc/AerospikeDatabaseMetadata.java | 11 +- .../jdbc/AerospikePreparedStatement.java | 8 +- .../aerospike/jdbc/AerospikeStatement.java | 9 +- .../com/aerospike/jdbc/async/RecordSet.java | 1 + .../async/RecordSetBatchSequenceListener.java | 8 +- .../RecordSetRecordSequenceListener.java | 8 +- .../jdbc/async/ScanQueryHandler.java | 16 +- .../async/SecondaryIndexQueryHandler.java | 17 +- .../DriverConfiguration.java} | 156 ++++++++++-------- .../jdbc/query/BaseQueryHandler.java | 11 ++ .../jdbc/query/DeleteQueryHandler.java | 9 +- .../jdbc/query/InsertQueryHandler.java | 11 +- .../aerospike/jdbc/query/PolicyBuilder.java | 33 ++-- .../jdbc/query/SelectQueryHandler.java | 26 ++- .../jdbc/query/UpdateQueryHandler.java | 9 +- .../jdbc/schema/AerospikeSchemaBuilder.java | 6 +- .../com/aerospike/jdbc/sql/BaseResultSet.java | 9 +- .../jdbc/sql/IndexToLabelResultSet.java | 3 + 19 files changed, 240 insertions(+), 210 deletions(-) rename src/main/java/com/aerospike/jdbc/{util/URLParser.java => model/DriverConfiguration.java} (57%) diff --git a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java index 517d30b..b101e0d 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java @@ -3,10 +3,10 @@ import com.aerospike.client.AerospikeClient; import com.aerospike.client.IAerospikeClient; import com.aerospike.client.policy.Policy; +import com.aerospike.jdbc.model.DriverConfiguration; import com.aerospike.jdbc.sql.SimpleWrapper; import com.aerospike.jdbc.sql.type.ByteArrayBlob; import com.aerospike.jdbc.sql.type.StringClob; -import com.aerospike.jdbc.util.URLParser; import java.sql.*; import java.util.Map; @@ -17,30 +17,35 @@ import java.util.logging.Logger; import static java.lang.String.format; -import static java.sql.ResultSet.*; +import static java.sql.ResultSet.CLOSE_CURSORS_AT_COMMIT; +import static java.sql.ResultSet.CONCUR_READ_ONLY; +import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT; +import static java.sql.ResultSet.TYPE_FORWARD_ONLY; import static java.util.Arrays.stream; import static java.util.Collections.emptyMap; public class AerospikeConnection implements Connection, SimpleWrapper { private static final Logger logger = Logger.getLogger(AerospikeConnection.class.getName()); + private static final String NOT_TRANSACTIONAL_MESSAGE = "Connection is not transactional"; private final String url; + private final DriverConfiguration config; private final IAerospikeClient client; + private final AtomicReference schema = new AtomicReference<>(null); // namespace private volatile boolean readOnly = false; - private final Properties clientInfo = new Properties(); private volatile Map> typeMap = emptyMap(); private volatile int holdability = HOLD_CURSORS_OVER_COMMIT; - private final AtomicReference schema = new AtomicReference<>(null); // namespace private volatile boolean closed; public AerospikeConnection(String url, Properties props) { this.url = url; - URLParser.parseUrl(url, props); + config = new DriverConfiguration(props); + config.parse(url); client = new AerospikeClient( - URLParser.getClientPolicy(), URLParser.getHosts() + config.getClientPolicy(), config.getHosts() ); - schema.set(URLParser.getSchema()); // namespace + schema.set(config.getSchema()); // namespace } @Override @@ -64,13 +69,13 @@ public String nativeSQL(String sql) throws SQLException { } @Override - public void setAutoCommit(boolean autoCommit) { - // do nothing + public boolean getAutoCommit() { + return true; } @Override - public boolean getAutoCommit() { - return true; + public void setAutoCommit(boolean autoCommit) { + // do nothing } @Override @@ -100,6 +105,11 @@ public DatabaseMetaData getMetaData() { return new AerospikeDatabaseMetadata(url, client, this); } + @Override + public boolean isReadOnly() { + return readOnly; + } + @Override public void setReadOnly(boolean readOnly) throws SQLException { if (!isValid(1)) { @@ -109,8 +119,8 @@ public void setReadOnly(boolean readOnly) throws SQLException { } @Override - public boolean isReadOnly() { - return readOnly; + public String getCatalog() { + return schema.get(); } @Override @@ -119,8 +129,8 @@ public void setCatalog(String catalog) { } @Override - public String getCatalog() { - return schema.get(); + public int getTransactionIsolation() { + return TRANSACTION_NONE; } @Override @@ -131,11 +141,6 @@ public void setTransactionIsolation(int level) throws SQLException { } } - @Override - public int getTransactionIsolation() { - return TRANSACTION_NONE; - } - @Override public SQLWarning getWarnings() { return null; @@ -143,7 +148,7 @@ public SQLWarning getWarnings() { @Override public void clearWarnings() { - // TODO make use of warnings + // TODO: make use of warnings } @Override @@ -152,7 +157,8 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency) th } @Override - public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) + throws SQLException { return prepareStatement(sql, resultSetType, resultSetConcurrency, holdability); } @@ -171,6 +177,11 @@ public void setTypeMap(Map> map) { typeMap = map; } + @Override + public int getHoldability() { + return holdability; + } + @Override public void setHoldability(int holdability) throws SQLException { if (isClosed()) { @@ -187,33 +198,29 @@ public void setHoldability(int holdability) throws SQLException { this.holdability = holdability; } - @Override - public int getHoldability() { - return holdability; - } - @Override public Savepoint setSavepoint() throws SQLException { - throw new SQLFeatureNotSupportedException("Connection is not transactional"); + throw new SQLFeatureNotSupportedException(NOT_TRANSACTIONAL_MESSAGE); } @Override public Savepoint setSavepoint(String name) throws SQLException { - throw new SQLFeatureNotSupportedException("Connection is not transactional"); + throw new SQLFeatureNotSupportedException(NOT_TRANSACTIONAL_MESSAGE); } @Override public void rollback(Savepoint savepoint) throws SQLException { - throw new SQLFeatureNotSupportedException("Connection is not transactional"); + throw new SQLFeatureNotSupportedException(NOT_TRANSACTIONAL_MESSAGE); } @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException { - throw new SQLFeatureNotSupportedException("Connection is not transactional"); + throw new SQLFeatureNotSupportedException(NOT_TRANSACTIONAL_MESSAGE); } @Override - public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { validateResultSetParameters(resultSetType, resultSetConcurrency, resultSetHoldability); return new AerospikeStatement(client, this); } @@ -225,7 +232,8 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res return new AerospikePreparedStatement(client, this, sql); } - private void validateResultSetParameters(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + private void validateResultSetParameters(int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { if (resultSetType != TYPE_FORWARD_ONLY) { throw new SQLFeatureNotSupportedException("ResultSet type other than TYPE_FORWARD_ONLY is not supported"); } @@ -287,22 +295,22 @@ public boolean isValid(int timeout) { @Override public void setClientInfo(String name, String value) { - clientInfo.setProperty(name, value); + config.put(name, value); } @Override - public void setClientInfo(Properties properties) { - clientInfo.putAll(properties); + public String getClientInfo(String name) { + return config.getClientInfo().getProperty(name); } @Override - public String getClientInfo(String name) { - return clientInfo.getProperty(name); + public Properties getClientInfo() { + return config.getClientInfo(); } @Override - public Properties getClientInfo() { - return clientInfo; + public void setClientInfo(Properties properties) { + config.putAll(properties); } @Override @@ -316,13 +324,13 @@ public Struct createStruct(String typeName, Object[] attributes) throws SQLExcep } @Override - public void setSchema(String schema) { - // do nothing + public String getSchema() throws SQLException { + return schema.get(); } @Override - public String getSchema() { - return schema.get(); + public void setSchema(String schema) { + // do nothing } @Override @@ -347,4 +355,7 @@ public int getNetworkTimeout() { return client.getReadPolicyDefault().totalTimeout; } + public DriverConfiguration getConfiguration() { + return config; + } } diff --git a/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java b/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java index fc381a8..5837b56 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java @@ -11,7 +11,6 @@ import com.aerospike.jdbc.sql.ListRecordSet; import com.aerospike.jdbc.sql.SimpleWrapper; import com.aerospike.jdbc.util.AerospikeUtils; -import com.aerospike.jdbc.util.URLParser; import java.io.IOException; import java.io.StringReader; @@ -46,15 +45,14 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; +@SuppressWarnings("java:S1192") public class AerospikeDatabaseMetadata implements DatabaseMetaData, SimpleWrapper { private static final Logger logger = Logger.getLogger(AerospikeDatabaseMetadata.class.getName()); private static final String NEW_LINE = System.lineSeparator(); private final String url; - private final Properties clientInfo; private final Connection connection; - private final InfoPolicy infoPolicy; private final String dbBuild; private final String dbEdition; private final List catalogs; @@ -65,13 +63,12 @@ public AerospikeDatabaseMetadata(String url, IAerospikeClient client, Connection logger.info("Init AerospikeDatabaseMetadata"); AerospikeSchemaBuilder.cleanSchemaCache(); this.url = url; - clientInfo = URLParser.getClientInfo(); this.connection = connection; - infoPolicy = client.getInfoPolicyDefault(); Collection builds = synchronizedSet(new HashSet<>()); Collection editions = synchronizedSet(new HashSet<>()); Collection namespaces = synchronizedSet(new HashSet<>()); + final InfoPolicy infoPolicy = client.getInfoPolicyDefault(); Arrays.stream(client.getNodes()).parallel() .map(node -> Info.request(infoPolicy, node, "namespaces", "sets", "sindex", "build", "edition")) .forEach(r -> { @@ -121,8 +118,8 @@ public String getURL() { } @Override - public String getUserName() { - return clientInfo.getProperty("user"); + public String getUserName() throws SQLException { + return connection.getClientInfo().getProperty("user"); } @Override diff --git a/src/main/java/com/aerospike/jdbc/AerospikePreparedStatement.java b/src/main/java/com/aerospike/jdbc/AerospikePreparedStatement.java index 38b6e29..c37af1a 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikePreparedStatement.java +++ b/src/main/java/com/aerospike/jdbc/AerospikePreparedStatement.java @@ -38,7 +38,7 @@ public class AerospikePreparedStatement extends AerospikeStatement implements Pr private final AerospikeQuery query; private final Object[] parameterValues; - public AerospikePreparedStatement(IAerospikeClient client, Connection connection, String sql) { + public AerospikePreparedStatement(IAerospikeClient client, AerospikeConnection connection, String sql) { super(client, connection); this.sql = sql; int params = parseParameters(sql, 0).getValue(); @@ -49,7 +49,8 @@ public AerospikePreparedStatement(IAerospikeClient client, Connection connection } catch (SQLException e) { throw new UnsupportedOperationException(e); } - columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client); + columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client, + connection.getConfiguration().getScanPolicy()); } @Override @@ -143,6 +144,9 @@ public void setAsciiStream(int parameterIndex, InputStream x, int length) throws setAsciiStream(parameterIndex, x, (long) length); } + /** + * @deprecated Use {@code setCharacterStream} + */ @Override @Deprecated public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException { diff --git a/src/main/java/com/aerospike/jdbc/AerospikeStatement.java b/src/main/java/com/aerospike/jdbc/AerospikeStatement.java index 84eeab5..106ff02 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeStatement.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeStatement.java @@ -25,6 +25,7 @@ public class AerospikeStatement implements Statement, SimpleWrapper { private static final Logger logger = Logger.getLogger(AerospikeStatement.class.getName()); + private static final String BATCH_NOT_SUPPORTED_MESSAGE = "Batch update is not supported"; protected final IAerospikeClient client; private final Connection connection; @@ -34,7 +35,7 @@ public class AerospikeStatement implements Statement, SimpleWrapper { private ResultSet resultSet; private int updateCount; - public AerospikeStatement(IAerospikeClient client, Connection connection) { + public AerospikeStatement(IAerospikeClient client, AerospikeConnection connection) { this.client = client; this.connection = connection; try { @@ -192,17 +193,17 @@ public int getResultSetType() { @Override public void addBatch(String sql) throws SQLException { - throw new SQLFeatureNotSupportedException("Batch update is not supported"); + throw new SQLFeatureNotSupportedException(BATCH_NOT_SUPPORTED_MESSAGE); } @Override public void clearBatch() throws SQLException { - throw new SQLFeatureNotSupportedException("Batch update is not supported"); + throw new SQLFeatureNotSupportedException(BATCH_NOT_SUPPORTED_MESSAGE); } @Override public int[] executeBatch() throws SQLException { - throw new SQLFeatureNotSupportedException("Batch update is not supported"); + throw new SQLFeatureNotSupportedException(BATCH_NOT_SUPPORTED_MESSAGE); } @Override diff --git a/src/main/java/com/aerospike/jdbc/async/RecordSet.java b/src/main/java/com/aerospike/jdbc/async/RecordSet.java index 9bb2165..294ee0b 100644 --- a/src/main/java/com/aerospike/jdbc/async/RecordSet.java +++ b/src/main/java/com/aerospike/jdbc/async/RecordSet.java @@ -42,6 +42,7 @@ public boolean next() interrupt(); } if (keyRecord == FAILURE) { + logger.info(() -> String.format("timeoutMs: %d", timeoutMs)); throw new AerospikeException("Aerospike asynchronous command failure"); } if (keyRecord == END) { diff --git a/src/main/java/com/aerospike/jdbc/async/RecordSetBatchSequenceListener.java b/src/main/java/com/aerospike/jdbc/async/RecordSetBatchSequenceListener.java index ed546ee..c9666df 100644 --- a/src/main/java/com/aerospike/jdbc/async/RecordSetBatchSequenceListener.java +++ b/src/main/java/com/aerospike/jdbc/async/RecordSetBatchSequenceListener.java @@ -4,16 +4,16 @@ import com.aerospike.client.BatchRead; import com.aerospike.client.listener.BatchSequenceListener; import com.aerospike.client.query.KeyRecord; -import com.aerospike.jdbc.util.URLParser; +import com.aerospike.jdbc.model.DriverPolicy; public class RecordSetBatchSequenceListener implements BatchSequenceListener { private final RecordSet recordSet; - public RecordSetBatchSequenceListener() { + public RecordSetBatchSequenceListener(DriverPolicy driverPolicy) { recordSet = new RecordSet( - URLParser.getDriverPolicy().getRecordSetQueueCapacity(), - URLParser.getDriverPolicy().getRecordSetTimeoutMs() + driverPolicy.getRecordSetQueueCapacity(), + driverPolicy.getRecordSetTimeoutMs() ); } diff --git a/src/main/java/com/aerospike/jdbc/async/RecordSetRecordSequenceListener.java b/src/main/java/com/aerospike/jdbc/async/RecordSetRecordSequenceListener.java index 50ab7b4..9155531 100644 --- a/src/main/java/com/aerospike/jdbc/async/RecordSetRecordSequenceListener.java +++ b/src/main/java/com/aerospike/jdbc/async/RecordSetRecordSequenceListener.java @@ -5,16 +5,16 @@ import com.aerospike.client.Record; import com.aerospike.client.listener.RecordSequenceListener; import com.aerospike.client.query.KeyRecord; -import com.aerospike.jdbc.util.URLParser; +import com.aerospike.jdbc.model.DriverPolicy; public class RecordSetRecordSequenceListener implements RecordSequenceListener { private final RecordSet recordSet; - public RecordSetRecordSequenceListener() { + public RecordSetRecordSequenceListener(DriverPolicy driverPolicy) { recordSet = new RecordSet( - URLParser.getDriverPolicy().getRecordSetQueueCapacity(), - URLParser.getDriverPolicy().getRecordSetTimeoutMs() + driverPolicy.getRecordSetQueueCapacity(), + driverPolicy.getRecordSetTimeoutMs() ); } diff --git a/src/main/java/com/aerospike/jdbc/async/ScanQueryHandler.java b/src/main/java/com/aerospike/jdbc/async/ScanQueryHandler.java index 2a4b112..4ff555f 100644 --- a/src/main/java/com/aerospike/jdbc/async/ScanQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/async/ScanQueryHandler.java @@ -8,33 +8,31 @@ import com.aerospike.client.policy.ScanPolicy; import com.aerospike.client.query.PartitionFilter; import com.aerospike.jdbc.model.AerospikeQuery; +import com.aerospike.jdbc.model.DriverConfiguration; import java.util.Objects; -import java.util.logging.Logger; import static com.aerospike.jdbc.util.Constants.defaultKeyName; public class ScanQueryHandler { - private static final Logger logger = Logger.getLogger(ScanQueryHandler.class.getName()); - private final IAerospikeClient client; private RecordSetRecordSequenceListener listener; private int currentPartition; private int count; - private final ScanCallback callback = ((key, record) -> { - listener.onRecord(key, record); + private final ScanCallback callback = ((key, rec) -> { + listener.onRecord(key, rec); count++; }); - public ScanQueryHandler(IAerospikeClient client) { + public ScanQueryHandler(IAerospikeClient client, DriverConfiguration config) { this.client = client; - this.listener = new RecordSetRecordSequenceListener(); + this.listener = new RecordSetRecordSequenceListener(config.getDriverPolicy()); } - public static ScanQueryHandler create(IAerospikeClient client) { - return new ScanQueryHandler(client); + public static ScanQueryHandler create(IAerospikeClient client, DriverConfiguration config) { + return new ScanQueryHandler(client, config); } public RecordSet execute(ScanPolicy scanPolicy, AerospikeQuery query) { diff --git a/src/main/java/com/aerospike/jdbc/async/SecondaryIndexQueryHandler.java b/src/main/java/com/aerospike/jdbc/async/SecondaryIndexQueryHandler.java index 37c7e63..429ca42 100644 --- a/src/main/java/com/aerospike/jdbc/async/SecondaryIndexQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/async/SecondaryIndexQueryHandler.java @@ -4,33 +4,32 @@ import com.aerospike.client.policy.QueryPolicy; import com.aerospike.jdbc.model.AerospikeQuery; import com.aerospike.jdbc.model.AerospikeSecondaryIndex; -import com.aerospike.jdbc.util.URLParser; +import com.aerospike.jdbc.model.DriverConfiguration; import java.util.Objects; import java.util.Optional; -import java.util.logging.Logger; public class SecondaryIndexQueryHandler { - private static final Logger logger = Logger.getLogger(SecondaryIndexQueryHandler.class.getName()); - private final IAerospikeClient client; + private final DriverConfiguration config; private final RecordSetRecordSequenceListener listener; - public SecondaryIndexQueryHandler(IAerospikeClient client) { + public SecondaryIndexQueryHandler(IAerospikeClient client, DriverConfiguration config) { this.client = client; - this.listener = new RecordSetRecordSequenceListener(); + this.config = config; + this.listener = new RecordSetRecordSequenceListener(config.getDriverPolicy()); } - public static SecondaryIndexQueryHandler create(IAerospikeClient client) { - return new SecondaryIndexQueryHandler(client); + public static SecondaryIndexQueryHandler create(IAerospikeClient client, DriverConfiguration config) { + return new SecondaryIndexQueryHandler(client, config); } public RecordSet execute(QueryPolicy queryPolicy, AerospikeQuery query, AerospikeSecondaryIndex secondaryIndex) { com.aerospike.client.query.Statement statement = new com.aerospike.client.query.Statement(); Optional.ofNullable(query.getLimit()).ifPresent(statement::setMaxRecords); - statement.setRecordsPerSecond(URLParser.getScanPolicy().recordsPerSecond); + statement.setRecordsPerSecond(config.getScanPolicy().recordsPerSecond); statement.setIndexName(secondaryIndex.getIndexName()); statement.setNamespace(query.getSchema()); diff --git a/src/main/java/com/aerospike/jdbc/util/URLParser.java b/src/main/java/com/aerospike/jdbc/model/DriverConfiguration.java similarity index 57% rename from src/main/java/com/aerospike/jdbc/util/URLParser.java rename to src/main/java/com/aerospike/jdbc/model/DriverConfiguration.java index db2001d..cfd96da 100644 --- a/src/main/java/com/aerospike/jdbc/util/URLParser.java +++ b/src/main/java/com/aerospike/jdbc/model/DriverConfiguration.java @@ -1,4 +1,4 @@ -package com.aerospike.jdbc.util; +package com.aerospike.jdbc.model; import com.aerospike.client.Host; import com.aerospike.client.Value; @@ -9,7 +9,6 @@ import com.aerospike.client.policy.TlsPolicy; import com.aerospike.client.policy.WritePolicy; import com.aerospike.jdbc.async.EventLoopProvider; -import com.aerospike.jdbc.model.DriverPolicy; import com.aerospike.jdbc.tls.AerospikeTLSPolicyBuilder; import com.aerospike.jdbc.tls.AerospikeTLSPolicyConfig; @@ -18,85 +17,61 @@ import java.util.Locale; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; -public final class URLParser { +public final class DriverConfiguration { - private static final Logger logger = Logger.getLogger(URLParser.class.getName()); + private static final Logger logger = Logger.getLogger(DriverConfiguration.class.getName()); private static final String DEFAULT_AEROSPIKE_PORT = "3000"; private static final Pattern AS_JDBC_URL = Pattern.compile("^jdbc:aerospike:(?://)?([^/?]+)"); private static final Pattern AS_JDBC_SCHEMA = Pattern.compile("/([^?]+)"); - private static volatile Host[] hosts; - private static volatile String schema; - private static volatile Properties clientInfo; - private static volatile ClientPolicy clientPolicy; - private static volatile WritePolicy writePolicy; - private static volatile ScanPolicy scanPolicy; - private static volatile QueryPolicy queryPolicy; - private static volatile DriverPolicy driverPolicy; - - private URLParser() { - } - - public static Host[] getHosts() { - return hosts; - } - - public static String getSchema() { - return schema; + private final ConcurrentHashMap clientInfo = new ConcurrentHashMap<>(); + private Host[] hosts; + private String schema; + private volatile ClientPolicy clientPolicy; + private volatile WritePolicy writePolicy; + private volatile ScanPolicy scanPolicy; + private volatile QueryPolicy queryPolicy; + private volatile DriverPolicy driverPolicy; + + public DriverConfiguration(Properties props) { + logger.info(() -> "Configuration properties: " + props); + clientInfo.putAll(props); } - public static Properties getClientInfo() { - return clientInfo; - } - - public static ClientPolicy getClientPolicy() { - return clientPolicy; - } - - public static WritePolicy getWritePolicy() { - return writePolicy; - } - - public static ScanPolicy getScanPolicy() { - return scanPolicy; - } - - public static QueryPolicy getQueryPolicy() { - return queryPolicy; - } - - public static DriverPolicy getDriverPolicy() { - return driverPolicy; + @SuppressWarnings("java:S2696") + public void parse(String url) { + schema = parseSchema(url); + updateClientInfo(url); + hosts = parseHosts(url, Optional.ofNullable(clientInfo.get("tlsName")) + .map(Object::toString).orElse(null)); + resetPolicies(); + Value.UseBoolBin = Optional.ofNullable(clientInfo.get("useBoolBin")) + .map(Object::toString).map(Boolean::parseBoolean).orElse(true); + logger.info(() -> "Value.UseBoolBin = " + Value.UseBoolBin); } - public static void parseUrl(String url, Properties props) { - logger.info(() -> "URL properties: " + props); - schema = parseSchema(url); - clientInfo = parseClientInfo(url, props); - hosts = parseHosts(url, clientInfo.getProperty("tlsName")); - clientPolicy = copy(clientInfo, new ClientPolicy()); + private void resetPolicies() { + clientPolicy = copy(new ClientPolicy()); clientPolicy.eventLoops = EventLoopProvider.getEventLoops(); - clientPolicy.tlsPolicy = parseTlsPolicy(clientInfo); - - writePolicy = copy(clientInfo, new WritePolicy()); - scanPolicy = copy(clientInfo, new ScanPolicy()); - queryPolicy = copy(clientInfo, new QueryPolicy()); - Value.UseBoolBin = Optional.ofNullable(clientInfo.getProperty("useBoolBin")) - .map(Boolean::parseBoolean).orElse(true); - driverPolicy = new DriverPolicy(clientInfo); - logger.info(() -> "Value.UseBoolBin = " + Value.UseBoolBin); + clientPolicy.tlsPolicy = buildTlsPolicy(); + + writePolicy = copy(new WritePolicy()); + scanPolicy = copy(new ScanPolicy()); + queryPolicy = copy(new QueryPolicy()); + driverPolicy = new DriverPolicy(getClientInfo()); } - public static T copy(Properties props, T object) { + private T copy(T object) { @SuppressWarnings("unchecked") Class clazz = (Class) object.getClass(); - props.forEach((key, value) -> { + clientInfo.forEach((key, value) -> { try { Field field = clazz.getField((String) key); if (field.getType().equals(Integer.TYPE)) { @@ -117,12 +92,12 @@ public static T copy(Properties props, T object) { return object; } - private static TlsPolicy parseTlsPolicy(Properties props) { - AerospikeTLSPolicyConfig config = AerospikeTLSPolicyConfig.fromProperties(props); + private TlsPolicy buildTlsPolicy() { + AerospikeTLSPolicyConfig config = AerospikeTLSPolicyConfig.fromProperties(getClientInfo()); return new AerospikeTLSPolicyBuilder(config).build(); } - private static Host[] parseHosts(String url, final String tlsName) { + private Host[] parseHosts(String url, final String tlsName) { Matcher m = AS_JDBC_URL.matcher(url); if (!m.find()) { throw new IllegalArgumentException("Cannot parse URL " + url); @@ -134,23 +109,64 @@ private static Host[] parseHosts(String url, final String tlsName) { .toArray(Host[]::new); } - private static String parseSchema(String url) { + private String parseSchema(String url) { Matcher m = AS_JDBC_SCHEMA.matcher(url); return m.find() ? m.group(1) : null; } - private static Properties parseClientInfo(String url, Properties props) { - Properties all = new Properties(); - all.putAll(props); + private void updateClientInfo(String url) { int questionPos = url.indexOf('?'); if (questionPos > 0 && questionPos < url.length() - 1) { Arrays.stream(url.substring(questionPos + 1).split("&")).forEach(p -> { String[] kv = p.split("="); if (kv.length > 1) { - all.setProperty(kv[0], kv[1]); + clientInfo.put(kv[0], kv[1]); } }); } - return all; + } + + public Host[] getHosts() { + return hosts; + } + + public String getSchema() { + return schema; + } + + public Properties getClientInfo() { + Properties properties = new Properties(); + properties.putAll(clientInfo); + return properties; + } + + public void put(String name, String value) { + clientInfo.put(name, value); + resetPolicies(); + } + + public void putAll(Properties properties) { + clientInfo.putAll(properties); + resetPolicies(); + } + + public ClientPolicy getClientPolicy() { + return clientPolicy; + } + + public WritePolicy getWritePolicy() { + return writePolicy; + } + + public ScanPolicy getScanPolicy() { + return scanPolicy; + } + + public QueryPolicy getQueryPolicy() { + return queryPolicy; + } + + public DriverPolicy getDriverPolicy() { + return driverPolicy; } } diff --git a/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java index 9a987c8..83590fa 100644 --- a/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java @@ -3,9 +3,12 @@ import com.aerospike.client.Bin; import com.aerospike.client.IAerospikeClient; import com.aerospike.client.Value; +import com.aerospike.jdbc.AerospikeConnection; import com.aerospike.jdbc.model.AerospikeQuery; +import com.aerospike.jdbc.model.DriverConfiguration; import com.aerospike.jdbc.sql.ListRecordSet; +import java.sql.SQLException; import java.sql.Statement; import java.util.List; @@ -15,10 +18,18 @@ public abstract class BaseQueryHandler implements QueryHandler { protected final IAerospikeClient client; protected final Statement statement; + protected final DriverConfiguration config; + protected final PolicyBuilder policyBuilder; protected BaseQueryHandler(IAerospikeClient client, Statement statement) { this.client = client; this.statement = statement; + try { + config = ((AerospikeConnection) statement.getConnection()).getConfiguration(); + } catch (SQLException e) { + throw new IllegalStateException("Failed to get connection"); + } + policyBuilder = new PolicyBuilder(config); } protected Bin[] getBins(AerospikeQuery query) { diff --git a/src/main/java/com/aerospike/jdbc/query/DeleteQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/DeleteQueryHandler.java index 19ab3e3..f2bdd43 100644 --- a/src/main/java/com/aerospike/jdbc/query/DeleteQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/DeleteQueryHandler.java @@ -19,9 +19,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; -import static com.aerospike.jdbc.query.PolicyBuilder.buildScanPolicy; -import static com.aerospike.jdbc.query.PolicyBuilder.buildWritePolicy; - public class DeleteQueryHandler extends BaseQueryHandler { private static final Logger logger = Logger.getLogger(DeleteQueryHandler.class.getName()); @@ -33,7 +30,7 @@ public DeleteQueryHandler(IAerospikeClient client, Statement statement) { @Override public Pair execute(AerospikeQuery query) { Collection keyObjects = query.getPrimaryKeys(); - final WritePolicy writePolicy = buildWritePolicy(query); + final WritePolicy writePolicy = policyBuilder.buildWritePolicy(query); if (!keyObjects.isEmpty()) { logger.info("DELETE primary key"); FutureDeleteListener listener = new FutureDeleteListener(keyObjects.size()); @@ -53,8 +50,8 @@ public Pair execute(AerospikeQuery query) { } } else { logger.info("DELETE scan"); - RecordSetRecordSequenceListener listener = new RecordSetRecordSequenceListener(); - ScanPolicy scanPolicy = buildScanPolicy(query); + RecordSetRecordSequenceListener listener = new RecordSetRecordSequenceListener(config.getDriverPolicy()); + ScanPolicy scanPolicy = policyBuilder.buildScanPolicy(query); scanPolicy.includeBinData = false; client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(), query.getSetName()); diff --git a/src/main/java/com/aerospike/jdbc/query/InsertQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/InsertQueryHandler.java index 06ab07b..888e0f9 100644 --- a/src/main/java/com/aerospike/jdbc/query/InsertQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/InsertQueryHandler.java @@ -22,7 +22,6 @@ import java.util.logging.Logger; import java.util.stream.Collectors; -import static com.aerospike.jdbc.query.PolicyBuilder.buildCreateOnlyPolicy; import static com.aerospike.jdbc.util.Constants.defaultKeyName; public class InsertQueryHandler extends BaseQueryHandler { @@ -48,10 +47,10 @@ public Pair putConsecutively(AerospikeQuery query) { .collect(Collectors.toList()); FutureWriteListener listener = new FutureWriteListener(query.getValues().size()); - WritePolicy writePolicy = buildCreateOnlyPolicy(); - for (Object record : query.getValues()) { + WritePolicy writePolicy = policyBuilder.buildCreateOnlyPolicy(); + for (Object aerospikeRecord : query.getValues()) { @SuppressWarnings("unchecked") - List values = (List) record; + List values = (List) aerospikeRecord; Value recordKey = extractInsertKey(query, values); Key key = new Key(query.getSchema(), query.getSetName(), recordKey); Bin[] bins = buildBinArray(binNames, values); @@ -81,9 +80,9 @@ public Pair putBatch(AerospikeQuery query) { batchWritePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY; batchWritePolicy.sendKey = true; - for (Object record : query.getValues()) { + for (Object aerospikeRecord : query.getValues()) { @SuppressWarnings("unchecked") - List values = (List) record; + List values = (List) aerospikeRecord; Value recordKey = extractInsertKey(query, values); Key key = new Key(query.getSchema(), query.getSetName(), recordKey); batchRecords.add( diff --git a/src/main/java/com/aerospike/jdbc/query/PolicyBuilder.java b/src/main/java/com/aerospike/jdbc/query/PolicyBuilder.java index b27f605..4e9beb9 100644 --- a/src/main/java/com/aerospike/jdbc/query/PolicyBuilder.java +++ b/src/main/java/com/aerospike/jdbc/query/PolicyBuilder.java @@ -7,17 +7,20 @@ import com.aerospike.client.policy.ScanPolicy; import com.aerospike.client.policy.WritePolicy; import com.aerospike.jdbc.model.AerospikeQuery; -import com.aerospike.jdbc.util.URLParser; +import com.aerospike.jdbc.model.DriverConfiguration; import java.util.Objects; -public final class PolicyBuilder { +public class PolicyBuilder { - private PolicyBuilder() { + protected final DriverConfiguration config; + + public PolicyBuilder(DriverConfiguration config) { + this.config = config; } - public static ScanPolicy buildScanPolicy(AerospikeQuery query) { - ScanPolicy scanPolicy = new ScanPolicy(URLParser.getScanPolicy()); + public ScanPolicy buildScanPolicy(AerospikeQuery query) { + ScanPolicy scanPolicy = new ScanPolicy(config.getScanPolicy()); scanPolicy.maxRecords = Objects.isNull(query.getLimit()) ? 0 : query.getLimit(); scanPolicy.filterExp = Objects.isNull(query.getPredicate()) ? null : Exp.build(query.getPredicate().toFilterExpression()); @@ -25,43 +28,43 @@ public static ScanPolicy buildScanPolicy(AerospikeQuery query) { return scanPolicy; } - public static QueryPolicy buildQueryPolicy(AerospikeQuery query) { - QueryPolicy queryPolicy = new QueryPolicy(URLParser.getQueryPolicy()); + public QueryPolicy buildQueryPolicy(AerospikeQuery query) { + QueryPolicy queryPolicy = new QueryPolicy(config.getQueryPolicy()); queryPolicy.filterExp = Objects.isNull(query.getPredicate()) ? null : Exp.build(query.getPredicate().toFilterExpression()); return queryPolicy; } - public static ScanPolicy buildScanNoBinDataPolicy(AerospikeQuery query) { + public ScanPolicy buildScanNoBinDataPolicy(AerospikeQuery query) { ScanPolicy scanPolicy = buildScanPolicy(query); scanPolicy.includeBinData = false; return scanPolicy; } - public static WritePolicy buildWritePolicy(AerospikeQuery query) { - WritePolicy writePolicy = new WritePolicy(URLParser.getWritePolicy()); + public WritePolicy buildWritePolicy(AerospikeQuery query) { + WritePolicy writePolicy = new WritePolicy(config.getWritePolicy()); writePolicy.filterExp = Objects.isNull(query.getPredicate()) ? null : Exp.build(query.getPredicate().toFilterExpression()); writePolicy.sendKey = true; return writePolicy; } - public static BatchReadPolicy buildBatchReadPolicy(AerospikeQuery query) { + public BatchReadPolicy buildBatchReadPolicy(AerospikeQuery query) { BatchReadPolicy policy = new BatchReadPolicy(); policy.filterExp = Objects.isNull(query.getPredicate()) ? null : Exp.build(query.getPredicate().toFilterExpression()); return policy; } - public static WritePolicy buildCreateOnlyPolicy() { - WritePolicy writePolicy = new WritePolicy(URLParser.getWritePolicy()); + public WritePolicy buildCreateOnlyPolicy() { + WritePolicy writePolicy = new WritePolicy(config.getWritePolicy()); writePolicy.sendKey = true; writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY; return writePolicy; } - public static WritePolicy buildUpdateOnlyPolicy() { - WritePolicy writePolicy = new WritePolicy(URLParser.getWritePolicy()); + public WritePolicy buildUpdateOnlyPolicy() { + WritePolicy writePolicy = new WritePolicy(config.getWritePolicy()); writePolicy.sendKey = true; writePolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY; return writePolicy; diff --git a/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java index b86f88c..fd6ade3 100644 --- a/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java @@ -19,7 +19,6 @@ import com.aerospike.jdbc.model.Pair; import com.aerospike.jdbc.schema.AerospikeSchemaBuilder; import com.aerospike.jdbc.sql.AerospikeRecordResultSet; -import com.aerospike.jdbc.util.URLParser; import com.aerospike.jdbc.util.VersionUtils; import java.sql.ResultSet; @@ -30,10 +29,6 @@ import java.util.logging.Logger; import java.util.stream.Collectors; -import static com.aerospike.jdbc.query.PolicyBuilder.buildBatchReadPolicy; -import static com.aerospike.jdbc.query.PolicyBuilder.buildQueryPolicy; -import static com.aerospike.jdbc.query.PolicyBuilder.buildScanNoBinDataPolicy; -import static com.aerospike.jdbc.query.PolicyBuilder.buildScanPolicy; import static com.aerospike.jdbc.util.AerospikeUtils.getTableRecordsNumber; public class SelectQueryHandler extends BaseQueryHandler { @@ -48,7 +43,7 @@ public SelectQueryHandler(IAerospikeClient client, Statement statement) { @Override public Pair execute(AerospikeQuery query) { - columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client); + columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client, config.getScanPolicy()); Collection keyObjects = query.getPrimaryKeys(); Optional sIndex = secondaryIndex(query); Pair result; @@ -70,8 +65,8 @@ private Pair executeCountQuery(AerospikeQuery query) { if (Objects.isNull(query.getPredicate())) { recordNumber = getTableRecordsNumber(client, query.getSchema(), query.getTable()); } else { - ScanPolicy policy = buildScanNoBinDataPolicy(query); - RecordSet recordSet = ScanQueryHandler.create(client).execute(policy, query); + ScanPolicy policy = policyBuilder.buildScanNoBinDataPolicy(query); + RecordSet recordSet = ScanQueryHandler.create(client, config).execute(policy, query); final AtomicInteger count = new AtomicInteger(); recordSet.forEach(r -> count.incrementAndGet()); @@ -80,7 +75,7 @@ private Pair executeCountQuery(AerospikeQuery query) { com.aerospike.client.Record aeroRecord = new com.aerospike.client.Record(Collections.singletonMap( countLabel, recordNumber), 1, 0); - RecordSet recordSet = new RecordSet(2, URLParser.getDriverPolicy().getRecordSetTimeoutMs()); + RecordSet recordSet = new RecordSet(2, config.getDriverPolicy().getRecordSetTimeoutMs()); recordSet.put(new KeyRecord(null, aeroRecord)); recordSet.close(); @@ -93,12 +88,12 @@ private Pair executeCountQuery(AerospikeQuery query) { private Pair executeSelectByPrimaryKey(AerospikeQuery query, Collection keyObjects) { logger.info(() -> "SELECT primary key"); - final BatchReadPolicy policy = buildBatchReadPolicy(query); + final BatchReadPolicy policy = policyBuilder.buildBatchReadPolicy(query); List batchReadList = keyObjects.stream() .map(k -> new BatchRead(policy, new Key(query.getSchema(), query.getSetName(), Value.get(k)), true)) .collect(Collectors.toList()); - RecordSetBatchSequenceListener listener = new RecordSetBatchSequenceListener(); + RecordSetBatchSequenceListener listener = new RecordSetBatchSequenceListener(config.getDriverPolicy()); client.get(EventLoopProvider.getEventLoop(), listener, null, batchReadList); return new Pair<>(new AerospikeRecordResultSet(listener.getRecordSet(), statement, query.getSchema(), @@ -108,8 +103,8 @@ private Pair executeSelectByPrimaryKey(AerospikeQuery query, private Pair executeScan(AerospikeQuery query) { logger.info(() -> "SELECT scan " + (Objects.nonNull(query.getOffset()) ? "partition" : "all")); - ScanPolicy policy = buildScanPolicy(query); - RecordSet recordSet = ScanQueryHandler.create(client).execute(policy, query); + ScanPolicy policy = policyBuilder.buildScanPolicy(query); + RecordSet recordSet = ScanQueryHandler.create(client, config).execute(policy, query); return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getSchema(), query.getTable(), filterColumns(columns, query.getBinNames())), -1); @@ -119,8 +114,9 @@ private Pair executeQuery(AerospikeQuery query, AerospikeSecondaryIndex secondaryIndex) { logger.info(() -> "SELECT secondary index query for column: " + secondaryIndex.getBinName()); - QueryPolicy policy = buildQueryPolicy(query); - RecordSet recordSet = SecondaryIndexQueryHandler.create(client).execute(policy, query, secondaryIndex); + QueryPolicy policy = policyBuilder.buildQueryPolicy(query); + RecordSet recordSet = SecondaryIndexQueryHandler.create(client, config) + .execute(policy, query, secondaryIndex); return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getSchema(), query.getTable(), filterColumns(columns, query.getBinNames())), -1); diff --git a/src/main/java/com/aerospike/jdbc/query/UpdateQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/UpdateQueryHandler.java index 8784028..87056bc 100644 --- a/src/main/java/com/aerospike/jdbc/query/UpdateQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/UpdateQueryHandler.java @@ -20,9 +20,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; -import static com.aerospike.jdbc.query.PolicyBuilder.buildScanPolicy; -import static com.aerospike.jdbc.query.PolicyBuilder.buildUpdateOnlyPolicy; - public class UpdateQueryHandler extends BaseQueryHandler { private static final Logger logger = Logger.getLogger(UpdateQueryHandler.class.getName()); @@ -35,7 +32,7 @@ public UpdateQueryHandler(IAerospikeClient client, Statement statement) { public Pair execute(AerospikeQuery query) { Collection keyObjects = query.getPrimaryKeys(); final Bin[] bins = getBins(query); - final WritePolicy writePolicy = buildUpdateOnlyPolicy(); + final WritePolicy writePolicy = policyBuilder.buildUpdateOnlyPolicy(); if (!keyObjects.isEmpty()) { logger.info("UPDATE primary key"); FutureWriteListener listener = new FutureWriteListener(keyObjects.size()); @@ -55,8 +52,8 @@ public Pair execute(AerospikeQuery query) { } } else { logger.info("UPDATE scan"); - RecordSetRecordSequenceListener listener = new RecordSetRecordSequenceListener(); - ScanPolicy scanPolicy = buildScanPolicy(query); + RecordSetRecordSequenceListener listener = new RecordSetRecordSequenceListener(config.getDriverPolicy()); + ScanPolicy scanPolicy = policyBuilder.buildScanPolicy(query); scanPolicy.includeBinData = false; client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(), query.getSetName()); diff --git a/src/main/java/com/aerospike/jdbc/schema/AerospikeSchemaBuilder.java b/src/main/java/com/aerospike/jdbc/schema/AerospikeSchemaBuilder.java index b047276..7b46503 100644 --- a/src/main/java/com/aerospike/jdbc/schema/AerospikeSchemaBuilder.java +++ b/src/main/java/com/aerospike/jdbc/schema/AerospikeSchemaBuilder.java @@ -5,7 +5,6 @@ import com.aerospike.client.policy.ScanPolicy; import com.aerospike.jdbc.model.DataColumn; import com.aerospike.jdbc.model.SchemaTableName; -import com.aerospike.jdbc.util.URLParser; import java.sql.Types; import java.time.Duration; @@ -34,11 +33,12 @@ public static void cleanSchemaCache() { cache.clear(); } - public static List getSchema(SchemaTableName schemaTableName, IAerospikeClient client) { + public static List getSchema(SchemaTableName schemaTableName, IAerospikeClient client, + ScanPolicy scanPolicy) { return cache.get(schemaTableName).orElseGet(() -> { logger.info(() -> "Fetching SchemaTableName: " + schemaTableName); final Map columnHandles = new TreeMap<>(String::compareToIgnoreCase); - ScanPolicy policy = new ScanPolicy(URLParser.getScanPolicy()); + ScanPolicy policy = new ScanPolicy(scanPolicy); policy.maxRecords = schemaScanRecords; // add record key column handler diff --git a/src/main/java/com/aerospike/jdbc/sql/BaseResultSet.java b/src/main/java/com/aerospike/jdbc/sql/BaseResultSet.java index b53bb48..8465a58 100644 --- a/src/main/java/com/aerospike/jdbc/sql/BaseResultSet.java +++ b/src/main/java/com/aerospike/jdbc/sql/BaseResultSet.java @@ -12,7 +12,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.logging.Logger; import java.util.stream.IntStream; import static java.lang.String.format; @@ -20,16 +19,14 @@ public abstract class BaseResultSet implements ResultSet, IndexToLabelResultSet, UpdateResultSet, SimpleWrapper { - private static final Logger logger = Logger.getLogger(BaseResultSet.class.getName()); - protected final String schema; protected final String table; protected final List columns; private final Statement statement; private final ResultSetMetaData metadata; protected volatile int index; - protected boolean afterLast = false; - private boolean wasNull = false; + protected boolean afterLast; + private boolean wasNull; private volatile boolean closed; protected BaseResultSet(Statement statement, String schema, String table, List columns) { @@ -232,7 +229,7 @@ public Ref getRef(String columnLabel) throws SQLException { @Override public Blob getBlob(String columnLabel) throws SQLException { - throw new SQLFeatureNotSupportedException("getBlob"); // TODO check cast + throw new SQLFeatureNotSupportedException("getBlob"); } @Override diff --git a/src/main/java/com/aerospike/jdbc/sql/IndexToLabelResultSet.java b/src/main/java/com/aerospike/jdbc/sql/IndexToLabelResultSet.java index acf2aa6..9607b0f 100644 --- a/src/main/java/com/aerospike/jdbc/sql/IndexToLabelResultSet.java +++ b/src/main/java/com/aerospike/jdbc/sql/IndexToLabelResultSet.java @@ -81,6 +81,9 @@ default InputStream getAsciiStream(int columnIndex) throws SQLException { return getAsciiStream(getColumnLabel(columnIndex)); } + /** + * @deprecated use getCharacterStream in place of getUnicodeStream + */ @Override @Deprecated default InputStream getUnicodeStream(int columnIndex) throws SQLException { From 064bfa562082547c00177c3cfd5649b848db123e Mon Sep 17 00:00:00 2001 From: yrizhkov Date: Thu, 30 Nov 2023 09:49:33 +0200 Subject: [PATCH 2/4] use ClientPolicy factory method --- .../com/aerospike/jdbc/AerospikeConnection.java | 4 +--- .../jdbc/model/DriverConfiguration.java | 16 ++++++++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java index b101e0d..997cd09 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java @@ -42,9 +42,7 @@ public AerospikeConnection(String url, Properties props) { this.url = url; config = new DriverConfiguration(props); config.parse(url); - client = new AerospikeClient( - config.getClientPolicy(), config.getHosts() - ); + client = new AerospikeClient(config.getClientPolicy(), config.getHosts()); schema.set(config.getSchema()); // namespace } diff --git a/src/main/java/com/aerospike/jdbc/model/DriverConfiguration.java b/src/main/java/com/aerospike/jdbc/model/DriverConfiguration.java index cfd96da..089a835 100644 --- a/src/main/java/com/aerospike/jdbc/model/DriverConfiguration.java +++ b/src/main/java/com/aerospike/jdbc/model/DriverConfiguration.java @@ -32,8 +32,8 @@ public final class DriverConfiguration { private static final Pattern AS_JDBC_SCHEMA = Pattern.compile("/([^?]+)"); private final ConcurrentHashMap clientInfo = new ConcurrentHashMap<>(); - private Host[] hosts; - private String schema; + private volatile Host[] hosts; + private volatile String schema; private volatile ClientPolicy clientPolicy; private volatile WritePolicy writePolicy; private volatile ScanPolicy scanPolicy; @@ -51,17 +51,21 @@ public void parse(String url) { updateClientInfo(url); hosts = parseHosts(url, Optional.ofNullable(clientInfo.get("tlsName")) .map(Object::toString).orElse(null)); + clientPolicy = buildClientPolicy(); resetPolicies(); Value.UseBoolBin = Optional.ofNullable(clientInfo.get("useBoolBin")) .map(Object::toString).map(Boolean::parseBoolean).orElse(true); logger.info(() -> "Value.UseBoolBin = " + Value.UseBoolBin); } - private void resetPolicies() { - clientPolicy = copy(new ClientPolicy()); - clientPolicy.eventLoops = EventLoopProvider.getEventLoops(); - clientPolicy.tlsPolicy = buildTlsPolicy(); + private ClientPolicy buildClientPolicy() { + ClientPolicy policy = copy(new ClientPolicy()); + policy.eventLoops = EventLoopProvider.getEventLoops(); + policy.tlsPolicy = buildTlsPolicy(); + return policy; + } + private void resetPolicies() { writePolicy = copy(new WritePolicy()); scanPolicy = copy(new ScanPolicy()); queryPolicy = copy(new QueryPolicy()); From 69822d683adf9c9dfa4b4a272ff0fe3e5d82308e Mon Sep 17 00:00:00 2001 From: yrizhkov Date: Thu, 30 Nov 2023 09:50:39 +0200 Subject: [PATCH 3/4] add test coverage for configuration properties --- .../com/aerospike/jdbc/ParseJdbcUrlTest.java | 53 +++++++++++++++++-- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java b/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java index aaf372f..8d925fb 100644 --- a/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java +++ b/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java @@ -1,8 +1,16 @@ package com.aerospike.jdbc; +import com.aerospike.client.Value; +import com.aerospike.client.policy.AuthMode; +import com.aerospike.jdbc.model.DriverConfiguration; import org.testng.annotations.Test; import java.sql.DriverManager; +import java.util.Properties; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; public class ParseJdbcUrlTest { @@ -10,9 +18,48 @@ public class ParseJdbcUrlTest { public void testParseUrlParameters() throws Exception { Class.forName("com.aerospike.jdbc.AerospikeDriver").newInstance(); String url = String.format( - "jdbc:aerospike:%s:%d/%s?timeout=%d&useServicesAlternate=%b&maxRecords=%d&authMode=%s", - "localhost", 3000, "test", 512, true, 64L, "external_insecure" + "jdbc:aerospike:%s:%d/%s?%s=%d&%s=%d&%s=%b&%s=%s&%s=%d&%s=%b", + "localhost", 3000, "test", + "timeout", 512, + "totalTimeout", 2000, + "useServicesAlternate", true, + "authMode", "external_insecure", + "recordSetTimeoutMs", 5000, + "useBoolBin", false ); - DriverManager.getConnection(url); + AerospikeConnection connection = (AerospikeConnection) DriverManager.getConnection(url); + Properties properties = connection.getClientInfo(); + + assertEquals(properties.getProperty("timeout"), "512"); + assertEquals(properties.getProperty("totalTimeout"), "2000"); + assertEquals(properties.getProperty("useServicesAlternate"), "true"); + assertEquals(properties.getProperty("authMode"), "external_insecure"); + assertEquals(properties.getProperty("recordSetTimeoutMs"), "5000"); + assertEquals(properties.getProperty("useBoolBin"), "false"); + + DriverConfiguration config = connection.getConfiguration(); + assertEquals(config.getClientPolicy().timeout, 512); + assertEquals(config.getQueryPolicy().totalTimeout, 2000); + assertEquals(config.getWritePolicy().totalTimeout, 2000); + assertEquals(config.getScanPolicy().totalTimeout, 2000); + assertTrue(config.getClientPolicy().useServicesAlternate); + assertEquals(config.getClientPolicy().authMode, AuthMode.EXTERNAL_INSECURE); + assertEquals(config.getDriverPolicy().getRecordSetTimeoutMs(), 5000); + assertFalse(Value.UseBoolBin); + assertFalse(config.getQueryPolicy().sendKey); + + Properties update = new Properties(); + update.setProperty("totalTimeout", "3000"); + update.setProperty("sendKey", "true"); + connection.setClientInfo(update); + assertEquals(config.getQueryPolicy().totalTimeout, 3000); + assertEquals(config.getWritePolicy().totalTimeout, 3000); + assertEquals(config.getScanPolicy().totalTimeout, 3000); + assertTrue(config.getQueryPolicy().sendKey); + assertTrue(config.getWritePolicy().sendKey); + assertTrue(config.getScanPolicy().sendKey); + + connection.setClientInfo("recordSetTimeoutMs", "7000"); + assertEquals(config.getDriverPolicy().getRecordSetTimeoutMs(), 7000); } } From 8e80277b90fb6a380aa14eb3789275779071b419 Mon Sep 17 00:00:00 2001 From: yrizhkov Date: Thu, 30 Nov 2023 09:59:18 +0200 Subject: [PATCH 4/4] add cause to exception --- src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java index 83590fa..a52aec3 100644 --- a/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java @@ -27,7 +27,7 @@ protected BaseQueryHandler(IAerospikeClient client, Statement statement) { try { config = ((AerospikeConnection) statement.getConnection()).getConfiguration(); } catch (SQLException e) { - throw new IllegalStateException("Failed to get connection"); + throw new IllegalStateException("Failed to get configuration", e); } policyBuilder = new PolicyBuilder(config); }