Skip to content

Commit

Permalink
FMWK-271 Create new policies based on client default settings (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Dec 3, 2023
1 parent a9d08a4 commit d612f92
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 98 deletions.
11 changes: 8 additions & 3 deletions src/main/java/com/aerospike/jdbc/AerospikeConnection.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.aerospike.jdbc;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.policy.Policy;
import com.aerospike.jdbc.model.DriverConfiguration;
Expand Down Expand Up @@ -41,10 +40,10 @@ public class AerospikeConnection implements Connection, SimpleWrapper {
private volatile boolean closed;

public AerospikeConnection(String url, Properties props) {
logger.info("Init AerospikeConnection");
this.url = url;
config = new DriverConfiguration(props);
config.parse(url);
client = new AerospikeClient(config.getClientPolicy(), config.getHosts());
client = config.parse(url);
metadataBuilder = new DatabaseMetadataBuilder(config.getDriverPolicy());
schema.set(config.getSchema()); // namespace
}
Expand Down Expand Up @@ -296,6 +295,7 @@ public boolean isValid(int timeout) {

@Override
public void setClientInfo(String name, String value) {
logger.info(() -> format("Set client info: %s -> %s", name, value));
config.put(name, value);
}

Expand All @@ -311,6 +311,7 @@ public Properties getClientInfo() {

@Override
public void setClientInfo(Properties properties) {
logger.info(() -> format("Set client info: %s", properties));
config.putAll(properties);
}

Expand Down Expand Up @@ -359,4 +360,8 @@ public int getNetworkTimeout() {
public DriverConfiguration getConfiguration() {
return config;
}

public IAerospikeClient getClient() {
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public AerospikePreparedStatement(IAerospikeClient client, AerospikeConnection c
} catch (SQLException e) {
throw new UnsupportedOperationException(e);
}
columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client,
connection.getConfiguration().getScanPolicy());
columns = AerospikeSchemaBuilder.getSchema(query.getSchemaTable(), client);
}

@Override
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/aerospike/jdbc/async/ScanQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.query.PartitionFilter;
import com.aerospike.jdbc.model.AerospikeQuery;
import com.aerospike.jdbc.model.DriverConfiguration;
import com.aerospike.jdbc.model.DriverPolicy;

import java.util.Objects;

Expand All @@ -26,13 +26,13 @@ public class ScanQueryHandler {
count++;
});

public ScanQueryHandler(IAerospikeClient client, DriverConfiguration config) {
public ScanQueryHandler(IAerospikeClient client, DriverPolicy driverPolicy) {
this.client = client;
this.listener = new RecordSetRecordSequenceListener(config.getDriverPolicy());
this.listener = new RecordSetRecordSequenceListener(driverPolicy);
}

public static ScanQueryHandler create(IAerospikeClient client, DriverConfiguration config) {
return new ScanQueryHandler(client, config);
public static ScanQueryHandler create(IAerospikeClient client, DriverPolicy driverPolicy) {
return new ScanQueryHandler(client, driverPolicy);
}

public RecordSet execute(ScanPolicy scanPolicy, AerospikeQuery query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,30 @@
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.jdbc.model.AerospikeQuery;
import com.aerospike.jdbc.model.AerospikeSecondaryIndex;
import com.aerospike.jdbc.model.DriverConfiguration;
import com.aerospike.jdbc.model.DriverPolicy;

import java.util.Objects;
import java.util.Optional;

public class SecondaryIndexQueryHandler {

private final IAerospikeClient client;
private final DriverConfiguration config;
private final RecordSetRecordSequenceListener listener;

public SecondaryIndexQueryHandler(IAerospikeClient client, DriverConfiguration config) {
public SecondaryIndexQueryHandler(IAerospikeClient client, DriverPolicy driverPolicy) {
this.client = client;
this.config = config;
this.listener = new RecordSetRecordSequenceListener(config.getDriverPolicy());
this.listener = new RecordSetRecordSequenceListener(driverPolicy);
}

public static SecondaryIndexQueryHandler create(IAerospikeClient client, DriverConfiguration config) {
return new SecondaryIndexQueryHandler(client, config);
public static SecondaryIndexQueryHandler create(IAerospikeClient client, DriverPolicy driverPolicy) {
return new SecondaryIndexQueryHandler(client, driverPolicy);
}

public RecordSet execute(QueryPolicy queryPolicy, AerospikeQuery query,
AerospikeSecondaryIndex secondaryIndex) {
com.aerospike.client.query.Statement statement = new com.aerospike.client.query.Statement();
Optional.ofNullable(query.getLimit()).ifPresent(statement::setMaxRecords);
statement.setRecordsPerSecond(config.getScanPolicy().recordsPerSecond);
statement.setRecordsPerSecond(client.getScanPolicyDefault().recordsPerSecond);

statement.setIndexName(secondaryIndex.getIndexName());
statement.setNamespace(query.getSchema());
Expand Down
62 changes: 29 additions & 33 deletions src/main/java/com/aerospike/jdbc/model/DriverConfiguration.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.aerospike.jdbc.model;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Value;
import com.aerospike.client.policy.AuthMode;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.policy.TlsPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.jdbc.async.EventLoopProvider;
import com.aerospike.jdbc.tls.AerospikeTLSPolicyBuilder;
import com.aerospike.jdbc.tls.AerospikeTLSPolicyConfig;
Expand All @@ -23,6 +22,9 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public final class DriverConfiguration {

private static final Logger logger = Logger.getLogger(DriverConfiguration.class.getName());
Expand All @@ -33,30 +35,32 @@ public final class DriverConfiguration {
private static final Pattern AS_JDBC_SCHEMA = Pattern.compile("/([^?]+)");

private final Map<Object, Object> clientInfo = new ConcurrentHashMap<>();
private volatile Host[] hosts;
private volatile IAerospikeClient client;
private volatile String schema;
private volatile ClientPolicy clientPolicy;
private volatile WritePolicy writePolicy;
private volatile ScanPolicy scanPolicy;
private volatile QueryPolicy queryPolicy;
private volatile DriverPolicy driverPolicy;

public DriverConfiguration(Properties props) {
logger.info(() -> "Configuration properties: " + props);
logger.info(() -> format("Init DriverConfiguration with properties: %s", props));
clientInfo.putAll(props);
}

@SuppressWarnings("java:S2696")
public void parse(String url) {
public IAerospikeClient parse(String url) {
logger.info(() -> format("Parse URL: %s", url));
schema = parseSchema(url);
updateClientInfo(url);
hosts = parseHosts(url, Optional.ofNullable(clientInfo.get("tlsName"))
.map(Object::toString).orElse(null));
clientPolicy = buildClientPolicy();
resetPolicies();

Value.UseBoolBin = Optional.ofNullable(clientInfo.get("useBoolBin"))
.map(Object::toString).map(Boolean::parseBoolean).orElse(true);
logger.info(() -> "Value.UseBoolBin = " + Value.UseBoolBin);
logger.info(() -> format("Value.UseBoolBin = %b", Value.UseBoolBin));

clientPolicy = buildClientPolicy();
Host[] hosts = parseHosts(url, Optional.ofNullable(clientInfo.get("tlsName"))
.map(Object::toString).orElse(null));
client = new AerospikeClient(clientPolicy, hosts);
resetPolicies();
return client;
}

private ClientPolicy buildClientPolicy() {
Expand All @@ -67,9 +71,15 @@ private ClientPolicy buildClientPolicy() {
}

private void resetPolicies() {
writePolicy = copy(new WritePolicy());
scanPolicy = copy(new ScanPolicy());
queryPolicy = copy(new QueryPolicy());
logger.fine(() -> "resetPolicies call");
if (client != null) {
copy(client.getReadPolicyDefault());
copy(client.getWritePolicyDefault());
copy(client.getScanPolicyDefault());
copy(client.getQueryPolicyDefault());
copy(client.getBatchPolicyDefault());
copy(client.getInfoPolicyDefault());
}
driverPolicy = new DriverPolicy(getClientInfo());
}

Expand Down Expand Up @@ -131,10 +141,6 @@ private void updateClientInfo(String url) {
}
}

public Host[] getHosts() {
return hosts;
}

public String getSchema() {
return schema;
}
Expand All @@ -156,22 +162,12 @@ public void putAll(Properties properties) {
}

public ClientPolicy getClientPolicy() {
requireNonNull(clientPolicy, "clientPolicy is null");
return clientPolicy;
}

public WritePolicy getWritePolicy() {
return writePolicy;
}

public ScanPolicy getScanPolicy() {
return scanPolicy;
}

public QueryPolicy getQueryPolicy() {
return queryPolicy;
}

public DriverPolicy getDriverPolicy() {
requireNonNull(driverPolicy, "driverPolicy is null");
return driverPolicy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ protected BaseQueryHandler(IAerospikeClient client, Statement statement) {
} catch (SQLException e) {
throw new IllegalStateException("Failed to get configuration", e);
}
policyBuilder = new PolicyBuilder(config);
policyBuilder = new PolicyBuilder(client);
}

protected Bin[] getBins(AerospikeQuery query) {
Expand Down
15 changes: 5 additions & 10 deletions src/main/java/com/aerospike/jdbc/query/InsertQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.aerospike.client.*;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.BatchWritePolicy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.jdbc.async.EventLoopProvider;
import com.aerospike.jdbc.async.FutureBatchOperateListListener;
Expand Down Expand Up @@ -75,10 +74,7 @@ public Pair<ResultSet, Integer> putBatch(AerospikeQuery query) {

FutureBatchOperateListListener listener = new FutureBatchOperateListListener();
List<BatchRecord> batchRecords = new ArrayList<>();

BatchWritePolicy batchWritePolicy = new BatchWritePolicy();
batchWritePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
batchWritePolicy.sendKey = true;
BatchWritePolicy batchWritePolicy = policyBuilder.buildBatchCreateOnlyPolicy();

for (Object aerospikeRecord : query.getValues()) {
@SuppressWarnings("unchecked")
Expand All @@ -89,14 +85,13 @@ public Pair<ResultSet, Integer> putBatch(AerospikeQuery query) {
new BatchWrite(
batchWritePolicy,
key,
Arrays.stream(buildBinArray(binNames, values)).map(Operation::put).toArray(Operation[]::new)
Arrays.stream(buildBinArray(binNames, values))
.map(Operation::put)
.toArray(Operation[]::new)
)
);
}
BatchPolicy batchPolicy = new BatchPolicy();
batchPolicy.sendKey = true;
batchPolicy.maxConcurrentThreads = batchRecords.size() / 100 + 1;

BatchPolicy batchPolicy = client.getBatchPolicyDefault();
try {
client.operate(EventLoopProvider.getEventLoop(), listener, batchPolicy, batchRecords);
} catch (AerospikeException e) {
Expand Down
37 changes: 21 additions & 16 deletions src/main/java/com/aerospike/jdbc/query/PolicyBuilder.java
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
package com.aerospike.jdbc.query;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.policy.BatchReadPolicy;
import com.aerospike.client.policy.BatchWritePolicy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.jdbc.model.AerospikeQuery;
import com.aerospike.jdbc.model.DriverConfiguration;

import java.util.Objects;

public class PolicyBuilder {

protected final DriverConfiguration config;
protected final IAerospikeClient client;

public PolicyBuilder(DriverConfiguration config) {
this.config = config;
public PolicyBuilder(IAerospikeClient client) {
this.client = client;
}

public ScanPolicy buildScanPolicy(AerospikeQuery query) {
ScanPolicy scanPolicy = new ScanPolicy(config.getScanPolicy());
ScanPolicy scanPolicy = new ScanPolicy(client.getScanPolicyDefault());
scanPolicy.maxRecords = Objects.isNull(query.getLimit()) ? 0 : query.getLimit();
scanPolicy.filterExp = Objects.isNull(query.getPredicate())
? null : Exp.build(query.getPredicate().toFilterExpression());
scanPolicy.sendKey = true;
return scanPolicy;
}

public QueryPolicy buildQueryPolicy(AerospikeQuery query) {
QueryPolicy queryPolicy = new QueryPolicy(config.getQueryPolicy());
QueryPolicy queryPolicy = new QueryPolicy(client.getQueryPolicyDefault());
queryPolicy.filterExp = Objects.isNull(query.getPredicate())
? null : Exp.build(query.getPredicate().toFilterExpression());
return queryPolicy;
Expand All @@ -42,30 +42,35 @@ public ScanPolicy buildScanNoBinDataPolicy(AerospikeQuery query) {
}

public WritePolicy buildWritePolicy(AerospikeQuery query) {
WritePolicy writePolicy = new WritePolicy(config.getWritePolicy());
WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault());
writePolicy.filterExp = Objects.isNull(query.getPredicate())
? null : Exp.build(query.getPredicate().toFilterExpression());
writePolicy.sendKey = true;
return writePolicy;
}

public BatchReadPolicy buildBatchReadPolicy(AerospikeQuery query) {
BatchReadPolicy policy = new BatchReadPolicy();
policy.filterExp = Objects.isNull(query.getPredicate())
BatchReadPolicy batchReadPolicy = new BatchReadPolicy();
batchReadPolicy.filterExp = Objects.isNull(query.getPredicate())
? null : Exp.build(query.getPredicate().toFilterExpression());
return policy;
return batchReadPolicy;
}

public BatchWritePolicy buildBatchCreateOnlyPolicy() {
BatchWritePolicy batchWritePolicy = new BatchWritePolicy();
batchWritePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
batchWritePolicy.sendKey = client.getBatchPolicyDefault().sendKey;
batchWritePolicy.expiration = client.getBatchWritePolicyDefault().expiration;
return batchWritePolicy;
}

public WritePolicy buildCreateOnlyPolicy() {
WritePolicy writePolicy = new WritePolicy(config.getWritePolicy());
writePolicy.sendKey = true;
WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault());
writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
return writePolicy;
}

public WritePolicy buildUpdateOnlyPolicy() {
WritePolicy writePolicy = new WritePolicy(config.getWritePolicy());
writePolicy.sendKey = true;
WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault());
writePolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
return writePolicy;
}
Expand Down
Loading

0 comments on commit d612f92

Please sign in to comment.