Skip to content

Commit

Permalink
FMWK-290 Align Aerospike namespace naming with JDBC catalog (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Dec 18, 2023
1 parent 4da86f6 commit 367f1eb
Show file tree
Hide file tree
Showing 25 changed files with 99 additions and 114 deletions.
8 changes: 4 additions & 4 deletions src/main/java/com/aerospike/jdbc/AerospikeConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class AerospikeConnection implements Connection, SimpleWrapper {
private final IAerospikeClient client;
private final DatabaseMetadataBuilder metadataBuilder;
private final AerospikeVersion aerospikeVersion;
private final AtomicReference<String> schema = new AtomicReference<>(null); // namespace
private final AtomicReference<String> catalog = new AtomicReference<>(null);

private volatile boolean readOnly = false;
private volatile Map<String, Class<?>> typeMap = emptyMap();
Expand All @@ -49,7 +49,7 @@ public AerospikeConnection(String url, Properties props) {
client = config.parse(url);
metadataBuilder = new DatabaseMetadataBuilder(config.getDriverPolicy());
aerospikeVersion = new AerospikeVersion(client);
schema.set(config.getSchema()); // namespace
catalog.set(config.getCatalog()); // namespace
}

@Override
Expand Down Expand Up @@ -125,12 +125,12 @@ public void setReadOnly(boolean readOnly) throws SQLException {

@Override
public String getCatalog() {
return schema.get();
return catalog.get();
}

@Override
public void setCatalog(String catalog) {
schema.set(catalog);
this.catalog.set(catalog);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ public ResultSetMetaData getMetaData() throws SQLException {
AerospikeQuery query = parseQuery(prepareQueryString());
List<DataColumn> columns = ((AerospikeDatabaseMetadata) connection.getMetaData())
.getSchemaBuilder()
.getSchema(query.getSchemaTable());
return new AerospikeResultSetMetaData(query.getSchema(), query.getTable(), columns);
.getSchema(query.getCatalogTable());
return new AerospikeResultSetMetaData(query.getCatalog(), query.getTable(), columns);
}

@Override
Expand Down Expand Up @@ -265,7 +265,7 @@ public ParameterMetaData getParameterMetaData() throws SQLException {
AerospikeQuery query = parseQuery(prepareQueryString());
List<DataColumn> columns = ((AerospikeDatabaseMetadata) connection.getMetaData())
.getSchemaBuilder()
.getSchema(query.getSchemaTable());
.getSchema(query.getCatalogTable());
return new SimpleParameterMetaData(columns);
}

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/aerospike/jdbc/AerospikeStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class AerospikeStatement implements Statement, SimpleWrapper {
protected final IAerospikeClient client;
protected final AerospikeConnection connection;

protected String schema;
protected String catalog;
protected ResultSet resultSet;
protected int updateCount;

Expand All @@ -42,7 +42,7 @@ public class AerospikeStatement implements Statement, SimpleWrapper {
public AerospikeStatement(IAerospikeClient client, AerospikeConnection connection) {
this.client = client;
this.connection = connection;
this.schema = connection.getCatalog();
this.catalog = connection.getCatalog();
}

@Override
Expand All @@ -67,8 +67,8 @@ protected AerospikeQuery parseQuery(String sql) throws SQLException {
} catch (SqlParseException e) {
query = AuxStatementParser.hack(sql);
}
if (query.getSchema() == null) {
query.setSchema(schema);
if (query.getCatalog() == null) {
query.setCatalog(catalog);
}
return query;
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/aerospike/jdbc/async/ScanQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ public RecordSet execute(ScanPolicy scanPolicy, AerospikeQuery query) {
long maxRecords = scanPolicy.maxRecords;
PartitionFilter filter = getPartitionFilter(query);
while (isScanRequired(maxRecords)) {
client.scanPartitions(scanPolicy, filter, query.getSchema(), query.getSetName(),
client.scanPartitions(scanPolicy, filter, query.getCatalog(), query.getSetName(),
callback, query.columnBins());
scanPolicy.maxRecords = maxRecords > 0 ? maxRecords - count : maxRecords;
filter = PartitionFilter.id(++currentPartition);
}
listener.onSuccess();
} else {
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(),
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getCatalog(),
query.getSetName(), query.columnBins());
}
return listener.getRecordSet();
}

private PartitionFilter getPartitionFilter(AerospikeQuery query) {
Key key = new Key(query.getSchema(), query.getSetName(), query.getOffset());
Key key = new Key(query.getCatalog(), query.getSetName(), query.getOffset());
currentPartition = Partition.getPartitionId(key.digest);
return PartitionFilter.after(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public RecordSet execute(QueryPolicy queryPolicy, AerospikeQuery query,
statement.setRecordsPerSecond(client.getScanPolicyDefault().recordsPerSecond);

statement.setIndexName(secondaryIndex.getIndexName());
statement.setNamespace(query.getSchema());
statement.setNamespace(query.getCatalog());
statement.setSetName(query.getTable());
statement.setBinNames(query.columnBins());

Expand Down
21 changes: 3 additions & 18 deletions src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class AerospikeQuery {
private static final String ASTERISK = "*";

private String catalog;
private String schema;
private String table;
private QueryType queryType;
private Integer offset;
Expand Down Expand Up @@ -59,29 +58,15 @@ public void setCatalog(String catalog) {
this.catalog = catalog;
}

public String getSchema() {
return schema;
}

public void setSchema(String schema) {
this.catalog = schema; // TODO ?
this.schema = schema;
}

public String getTable() {
return table;
}

public void setTable(String table) {
String[] spec = table.split("\\.");
switch (spec.length) {
case 3:
this.catalog = spec[0];
this.schema = spec[1];
this.table = spec[2];
break;
case 2:
this.schema = spec[0];
this.catalog = spec[0];
this.table = spec[1];
break;
case 1:
Expand All @@ -99,8 +84,8 @@ public String getSetName() {
return table;
}

public SchemaTableName getSchemaTable() {
return new SchemaTableName(schema, table);
public CatalogTableName getCatalogTable() {
return new CatalogTableName(catalog, table);
}

public QueryType getQueryType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class AerospikeSecondaryIndex {
private final IndexType indexType;

@Order(3)
@JsonProperty("schema_name")
@JsonProperty("catalog_name")
private final String namespace;

@Order(4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public AerospikeQuery visit(SqlCall sqlCall) {
query.setQueryType(QueryType.DROP_TABLE);
} else if (sqlCall instanceof SqlDropSchema) {
SqlDropSchema sql = (SqlDropSchema) sqlCall;
query.setSchema(requireNonNull(sql.name).toString());
query.setCatalog(requireNonNull(sql.name).toString());
query.setQueryType(QueryType.DROP_SCHEMA);
} else if (sqlCall instanceof SqlOrderBy) {
SqlOrderBy sql = (SqlOrderBy) sqlCall;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

import java.util.Objects;

public class SchemaTableName {
public class CatalogTableName {

private final String schemaName;
private final String catalogName;
private final String tableName;

public SchemaTableName(String schemaName, String tableName) {
this.schemaName = schemaName;
public CatalogTableName(String catalogName, String tableName) {
this.catalogName = catalogName;
this.tableName = tableName;
}

public String getSchemaName() {
return schemaName;
public String getCatalogName() {
return catalogName;
}

public String getTableName() {
Expand All @@ -24,18 +24,18 @@ public String getTableName() {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SchemaTableName that = (SchemaTableName) o;
return Objects.equals(schemaName, that.schemaName) &&
CatalogTableName that = (CatalogTableName) o;
return Objects.equals(catalogName, that.catalogName) &&
Objects.equals(tableName, that.tableName);
}

@Override
public int hashCode() {
return Objects.hash(schemaName, tableName);
return Objects.hash(catalogName, tableName);
}

@Override
public String toString() {
return String.format("%s(%s, %s)", getClass().getSimpleName(), schemaName, tableName);
return String.format("%s(%s, %s)", getClass().getSimpleName(), catalogName, tableName);
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/aerospike/jdbc/model/DataColumn.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public int getType() {
return type;
}

public SchemaTableName getSchemaTableName() {
return new SchemaTableName(catalog, table);
public CatalogTableName getSchemaTableName() {
return new CatalogTableName(catalog, table);
}

@Override
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/com/aerospike/jdbc/model/DriverConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ public final class DriverConfiguration {

private static final String DEFAULT_AEROSPIKE_PORT = "3000";

private static final Pattern AS_JDBC_URL = Pattern.compile("^jdbc:aerospike:(?://)?([^/?]+)");
private static final Pattern AS_JDBC_SCHEMA = Pattern.compile("/([^?]+)");
private static final Pattern AEROSPIKE_JDBC_URL = Pattern.compile("^jdbc:aerospike:(?://)?([^/?]+)");
private static final Pattern AEROSPIKE_JDBC_CATALOG = Pattern.compile("/([^?]+)");

private final Map<Object, Object> clientInfo = new ConcurrentHashMap<>();
private volatile IAerospikeClient client;
private volatile String schema;
private volatile String catalog;
private volatile ClientPolicy clientPolicy;
private volatile DriverPolicy driverPolicy;

Expand All @@ -48,7 +48,7 @@ public DriverConfiguration(Properties props) {
@SuppressWarnings("java:S2696")
public IAerospikeClient parse(String url) {
logger.info(() -> format("Parse URL: %s", url));
schema = parseSchema(url);
catalog = parseCatalog(url);
updateClientInfo(url);

Value.UseBoolBin = Optional.ofNullable(clientInfo.get("useBoolBin"))
Expand Down Expand Up @@ -113,7 +113,7 @@ private TlsPolicy buildTlsPolicy() {
}

private Host[] parseHosts(String url, final String tlsName) {
Matcher m = AS_JDBC_URL.matcher(url);
Matcher m = AEROSPIKE_JDBC_URL.matcher(url);
if (!m.find()) {
throw new IllegalArgumentException("Cannot parse URL " + url);
}
Expand All @@ -124,8 +124,8 @@ private Host[] parseHosts(String url, final String tlsName) {
.toArray(Host[]::new);
}

private String parseSchema(String url) {
Matcher m = AS_JDBC_SCHEMA.matcher(url);
private String parseCatalog(String url) {
Matcher m = AEROSPIKE_JDBC_CATALOG.matcher(url);
return m.find() ? m.group(1) : null;
}

Expand All @@ -141,8 +141,8 @@ private void updateClientInfo(String url) {
}
}

public String getSchema() {
return schema;
public String getCatalog() {
return catalog;
}

public Properties getClientInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected Bin[] getBins(AerospikeQuery query) {
}

protected ListRecordSet emptyRecordSet(AerospikeQuery query) {
return new ListRecordSet(statement, query.getSchema(), query.getTable(),
return new ListRecordSet(statement, query.getCatalog(), query.getTable(),
emptyList(), emptyList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
logger.info("DELETE primary key");
FutureDeleteListener listener = new FutureDeleteListener(keyObjects.size());
for (Object keyObject : keyObjects) {
Key key = new Key(query.getSchema(), query.getSetName(), Value.get(keyObject));
Key key = new Key(query.getCatalog(), query.getSetName(), Value.get(keyObject));
try {
client.delete(EventLoopProvider.getEventLoop(), listener, writePolicy, key);
} catch (AerospikeException e) {
Expand All @@ -49,7 +49,7 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
ScanPolicy scanPolicy = policyBuilder.buildScanPolicy(query);
scanPolicy.includeBinData = false;

client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(),
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getCatalog(),
query.getSetName());

final WritePolicy deletePolicy = policyBuilder.buildDeleteWritePolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Pair<ResultSet, Integer> putConsecutively(AerospikeQuery query) {
@SuppressWarnings("unchecked")
List<Object> values = (List<Object>) aerospikeRecord;
Value recordKey = extractInsertKey(query, values);
Key key = new Key(query.getSchema(), query.getSetName(), recordKey);
Key key = new Key(query.getCatalog(), query.getSetName(), recordKey);
Bin[] bins = buildBinArray(binNames, values);

try {
Expand All @@ -73,7 +73,7 @@ public Pair<ResultSet, Integer> putBatch(AerospikeQuery query) {
@SuppressWarnings("unchecked")
List<Object> values = (List<Object>) aerospikeRecord;
Value recordKey = extractInsertKey(query, values);
Key key = new Key(query.getSchema(), query.getSetName(), recordKey);
Key key = new Key(query.getCatalog(), query.getSetName(), recordKey);
batchRecords.add(
new BatchWrite(
batchWritePolicy,
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public SelectQueryHandler(IAerospikeClient client, Statement statement) {

@Override
public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
columns = databaseMetadata.getSchemaBuilder().getSchema(query.getSchemaTable());
columns = databaseMetadata.getSchemaBuilder().getSchema(query.getCatalogTable());
Collection<Object> keyObjects = query.getPrimaryKeys();
Optional<AerospikeSecondaryIndex> sIndex = secondaryIndex(query);
Pair<ResultSet, Integer> result;
Expand All @@ -74,7 +74,7 @@ private Pair<ResultSet, Integer> executeCountQuery(AerospikeQuery query) {
String countLabel = query.getColumns().get(0);
int recordNumber;
if (Objects.isNull(query.getPredicate())) {
recordNumber = getTableRecordsNumber(client, query.getSchema(), query.getTable());
recordNumber = getTableRecordsNumber(client, query.getCatalog(), query.getTable());
} else {
ScanPolicy policy = policyBuilder.buildScanNoBinDataPolicy(query);
RecordSet recordSet = ScanQueryHandler.create(client, config.getDriverPolicy())
Expand All @@ -91,7 +91,7 @@ private Pair<ResultSet, Integer> executeCountQuery(AerospikeQuery query) {
recordSet.put(new KeyRecord(null, aeroRecord));
recordSet.close();

columns = Collections.singletonList(new DataColumn(query.getSchema(), query.getTable(),
columns = Collections.singletonList(new DataColumn(query.getCatalog(), query.getTable(),
Types.INTEGER, countLabel, countLabel));

return queryResult(recordSet, query);
Expand All @@ -102,7 +102,7 @@ private Pair<ResultSet, Integer> executeSelectByPrimaryKey(AerospikeQuery query,
final BatchReadPolicy policy = policyBuilder.buildBatchReadPolicy(query);
List<BatchRead> batchReadList = keyObjects.stream()
.map(k -> {
Key key = new Key(query.getSchema(), query.getSetName(), Value.get(k));
Key key = new Key(query.getCatalog(), query.getSetName(), Value.get(k));
return new BatchRead(policy, key, true);
})
.collect(Collectors.toList());
Expand Down Expand Up @@ -135,7 +135,7 @@ private Pair<ResultSet, Integer> executeQuery(AerospikeQuery query,

private Optional<AerospikeSecondaryIndex> secondaryIndex(AerospikeQuery query) {
if (aerospikeVersion.isSIndexSupported() && query.isIndexable()) {
Collection<AerospikeSecondaryIndex> indexes = databaseMetadata.getSecondaryIndexes(query.getSchema());
Collection<AerospikeSecondaryIndex> indexes = databaseMetadata.getSecondaryIndexes(query.getCatalog());
List<String> binNames = query.getPredicate().getBinNames();
if (!binNames.isEmpty() && indexes != null && !indexes.isEmpty()) {
List<AerospikeSecondaryIndex> indexList = indexes.stream()
Expand All @@ -154,7 +154,7 @@ private Optional<AerospikeSecondaryIndex> secondaryIndex(AerospikeQuery query) {
}

private Pair<ResultSet, Integer> queryResult(RecordSet recordSet, AerospikeQuery query) {
return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getSchema(),
return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getCatalog(),
query.getTable(), filterColumns(query)), -1);
}

Expand Down
Loading

0 comments on commit 367f1eb

Please sign in to comment.