Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-293 Add support for index create/drop statements #58

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ Packages documentation can be found [here](https://javadoc.io/doc/com.aerospike/
* UPDATE
* DELETE
* TRUNCATE TABLE
* CREATE INDEX
* DROP INDEX

See [examples](docs/examples.md) of SQL.

Expand Down
11 changes: 11 additions & 0 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,14 @@ TRUNCATE TABLE port_list;
DELETE FROM port_list;
```

## CREATE INDEX

```sql
CREATE INDEX port_idx ON port_list (port);
```

## DROP INDEX

```sql
DROP INDEX port_idx ON port_list;
```
2 changes: 1 addition & 1 deletion src/main/java/com/aerospike/jdbc/AerospikeConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public boolean isClosed() {
@Override
public DatabaseMetaData getMetaData() throws SQLException {
logger.fine(() -> "getMetaData request");
return metadataBuilder.build(url, client, this);
return metadataBuilder.build(url, this);
}

@Override
Expand Down
165 changes: 58 additions & 107 deletions src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java
Original file line number Diff line number Diff line change
@@ -1,37 +1,35 @@
package com.aerospike.jdbc;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import com.aerospike.client.policy.InfoPolicy;
import com.aerospike.client.query.IndexType;
import com.aerospike.jdbc.model.AerospikeClusterInfo;
import com.aerospike.jdbc.model.AerospikeSecondaryIndex;
import com.aerospike.jdbc.model.DataColumn;
import com.aerospike.jdbc.schema.AerospikeSchemaBuilder;
import com.aerospike.jdbc.sql.ListRecordSet;
import com.aerospike.jdbc.sql.SimpleWrapper;
import com.aerospike.jdbc.util.AerospikeUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.RowIdLifetime;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.aerospike.jdbc.util.AerospikeUtils.getIndexBinValuesRatio;
import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME;
import static com.aerospike.jdbc.util.AerospikeUtils.getCatalogIndexes;
import static com.aerospike.jdbc.util.AerospikeUtils.getClusterInfo;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
Expand All @@ -40,80 +38,61 @@
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Collections.synchronizedSet;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;

@SuppressWarnings("java:S1192")
public class AerospikeDatabaseMetadata implements DatabaseMetaData, SimpleWrapper {

private static final Logger logger = Logger.getLogger(AerospikeDatabaseMetadata.class.getName());
private static final String NEW_LINE = System.lineSeparator();

private final String url;
private final Connection connection;
private final String dbBuild;
private final String dbEdition;
private final AerospikeConnection connection;

private final List<String> catalogs;
private final Map<String, Collection<String>> tables;
private final Map<String, Collection<AerospikeSecondaryIndex>> catalogIndexes;
private final AerospikeClusterInfo clusterInfo;
private final AerospikeSchemaBuilder schemaBuilder;
private final Cache<String, ResultSetMetaData> resultSetMetaDataCache;

public AerospikeDatabaseMetadata(String url, IAerospikeClient client, AerospikeConnection connection) {
private volatile Map<String, Collection<AerospikeSecondaryIndex>> catalogIndexes;

public AerospikeDatabaseMetadata(String url, AerospikeConnection connection) {
logger.info("Init AerospikeDatabaseMetadata");
this.url = url;
this.connection = connection;

Collection<String> builds = synchronizedSet(new HashSet<>());
Collection<String> editions = synchronizedSet(new HashSet<>());
Collection<String> namespaces = synchronizedSet(new HashSet<>());
final InfoPolicy infoPolicy = client.getInfoPolicyDefault();
catalogIndexes = new ConcurrentHashMap<>();
tables = new ConcurrentHashMap<>();
Arrays.stream(client.getNodes()).parallel()
.map(node -> Info.request(infoPolicy, node, "namespaces", "sets", "sindex", "build", "edition"))
.forEach(r -> {
builds.add(r.get("build"));
editions.add(r.get("edition"));
namespaces.addAll(asList(getOrDefault(r, "namespaces", "").split(";")));
streamOfSubProperties(r, "sets").forEach(p ->
tables.computeIfAbsent(p.getProperty("ns"), s -> new HashSet<>())
.addAll(Arrays.asList(p.getProperty("set"), DEFAULT_SCHEMA_NAME))
);
streamOfSubProperties(r, "sindex")
.filter(AerospikeUtils::isSupportedIndexType)
.forEach(p -> {
String namespace = p.getProperty("ns");
String indexName = p.getProperty("indexname");
Integer binRatio = connection.getAerospikeVersion().isSIndexCardinalitySupported()
? getIndexBinValuesRatio(client, namespace, indexName)
: null;
catalogIndexes.computeIfAbsent(namespace, s -> new HashSet<>())
.add(new AerospikeSecondaryIndex(
namespace,
p.getProperty("set"),
p.getProperty("bin"),
indexName,
IndexType.valueOf(p.getProperty("type").toUpperCase(Locale.ENGLISH)),
binRatio)
);
});
});

schemaBuilder = new AerospikeSchemaBuilder(client, connection.getConfiguration().getDriverPolicy());
clusterInfo = getClusterInfo(connection.getClient());
schemaBuilder = new AerospikeSchemaBuilder(
connection.getClient(),
connection.getConfiguration().getDriverPolicy()
);
resultSetMetaDataCache = CacheBuilder.newBuilder().build();

dbBuild = join("N/A", ", ", builds);
dbEdition = join("Aerospike", ", ", editions);
catalogs = namespaces.stream().filter(n -> !"".equals(n)).collect(Collectors.toList());
}

public AerospikeSchemaBuilder getSchemaBuilder() {
return schemaBuilder;
}

public void resetCatalogIndexes() {
logger.fine(() -> "Reset secondary index information");
catalogIndexes = null;
}

public Collection<AerospikeSecondaryIndex> getSecondaryIndexes(String catalog) {
initCatalogIndexes();
return catalogIndexes.get(catalog);
}

private void initCatalogIndexes() {
if (catalogIndexes == null) {
synchronized (this) {
if (catalogIndexes == null) {
logger.info(() -> "Load secondary index information");
catalogIndexes = getCatalogIndexes(connection.getClient(), connection.getAerospikeVersion());
}
}
}
}

@Override
public boolean allProceduresAreCallable() {
return false;
Expand All @@ -130,7 +109,7 @@ public String getURL() {
}

@Override
public String getUserName() throws SQLException {
public String getUserName() {
return connection.getClientInfo().getProperty("user");
}

Expand Down Expand Up @@ -161,12 +140,12 @@ public boolean nullsAreSortedAtEnd() {

@Override
public String getDatabaseProductName() {
return dbEdition;
return clusterInfo.getEdition();
}

@Override
public String getDatabaseProductVersion() {
return dbBuild;
return clusterInfo.getBuild();
}

@Override
Expand Down Expand Up @@ -251,7 +230,7 @@ public String getSQLKeywords() {

@Override
public String getNumericFunctions() {
return "sum,sumsqs,avg,min,max,count";
return "";
}

@Override
Expand Down Expand Up @@ -481,7 +460,7 @@ public boolean supportsCatalogsInTableDefinitions() {

@Override
public boolean supportsCatalogsInIndexDefinitions() {
return false;
return true;
}

@Override
Expand Down Expand Up @@ -735,12 +714,12 @@ public ResultSet getTables(String catalog, String schemaPattern, String tableNam

final Iterable<List<?>> tablesData;
if (catalog == null) {
tablesData = tables.entrySet().stream()
tablesData = clusterInfo.getTables().entrySet().stream()
.flatMap(p -> p.getValue().stream().map(t -> asList(p.getKey(), null, t, "TABLE", null, null,
null, null, null, null)))
.collect(toList());
} else {
tablesData = tables.getOrDefault(catalog, Collections.emptyList()).stream()
tablesData = clusterInfo.getTables().getOrDefault(catalog, Collections.emptyList()).stream()
.filter(t -> tableNameRegex == null || tableNameRegex.matcher(t).matches())
.map(t -> asList(catalog, null, t, "TABLE", null, null, null, null, null, null))
.collect(toList());
Expand All @@ -759,14 +738,14 @@ public ResultSet getTables(String catalog, String schemaPattern, String tableNam
public ResultSet getSchemas() {
return new ListRecordSet(null, "system", "schemas",
systemColumns(new String[]{"TABLE_SCHEM", "TABLE_CATALOG"}, new int[]{VARCHAR, VARCHAR}),
catalogs.stream().map(ns -> Arrays.asList("", ns)).collect(toList()));
clusterInfo.getCatalogs().stream().map(ns -> Arrays.asList("", ns)).collect(toList()));
}

@Override
public ResultSet getCatalogs() {
return new ListRecordSet(null, "system", "catalogs",
systemColumns(new String[]{"TABLE_CAT"}, new int[]{VARCHAR}),
catalogs.stream().map(Collections::singletonList).collect(toList()));
clusterInfo.getCatalogs().stream().map(Collections::singletonList).collect(toList()));
}

@Override
Expand All @@ -786,11 +765,11 @@ public ResultSet getColumns(String catalog, String schemaPattern, String tableNa

final List<ResultSetMetaData> resultSetMetaDataList;
if (catalog == null) {
resultSetMetaDataList = tables.entrySet().stream()
resultSetMetaDataList = clusterInfo.getTables().entrySet().stream()
.flatMap(p -> p.getValue().stream().map(t -> getMetadata(p.getKey(), t)))
.collect(toList());
} else {
resultSetMetaDataList = tables.getOrDefault(catalog, Collections.emptyList()).stream()
resultSetMetaDataList = clusterInfo.getTables().getOrDefault(catalog, Collections.emptyList()).stream()
.filter(t -> tableNameRegex == null || tableNameRegex.matcher(t).matches())
.map(t -> getMetadata(catalog, t))
.collect(toList());
Expand Down Expand Up @@ -862,12 +841,12 @@ public ResultSet getVersionColumns(String catalog, String schema, String table)
public ResultSet getPrimaryKeys(String catalog, String schema, String table) {
final Iterable<List<?>> tablesData;
if (catalog == null) {
tablesData = tables.entrySet().stream()
tablesData = clusterInfo.getTables().entrySet().stream()
.flatMap(p -> p.getValue().stream().map(t ->
asList(p.getKey(), null, t, PRIMARY_KEY_COLUMN_NAME, 1, PRIMARY_KEY_COLUMN_NAME)))
.collect(toList());
} else {
tablesData = tables.getOrDefault(catalog, Collections.emptyList()).stream()
tablesData = clusterInfo.getTables().getOrDefault(catalog, Collections.emptyList()).stream()
.filter(t -> table == null || table.equals(t))
.map(t -> asList(catalog, null, t, PRIMARY_KEY_COLUMN_NAME, 1, PRIMARY_KEY_COLUMN_NAME))
.collect(toList());
Expand Down Expand Up @@ -959,16 +938,16 @@ public ResultSet getTypeInfo() {
@Override
public ResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate) {
logger.info(() -> format("getIndexInfo: %s, %s, %s", catalog, schema, table));
initCatalogIndexes();
Stream<AerospikeSecondaryIndex> secondaryIndexStream;
if (catalog == null) {
secondaryIndexStream = catalogIndexes.entrySet().stream()
.flatMap(p -> p.getValue().stream());
secondaryIndexStream = catalogIndexes.values().stream().flatMap(Collection::stream);
} else {
secondaryIndexStream = getOrDefault(catalogIndexes, catalog, Collections.emptyList()).stream()
.filter(i -> i.getNamespace().equals(catalog));
secondaryIndexStream = catalogIndexes.getOrDefault(catalog, Collections.emptyList()).stream()
.filter(index -> index.getNamespace().equals(catalog));
}
final Iterable<List<?>> indexData = secondaryIndexStream
.filter(i -> i.getSet().equals(table))
.filter(index -> index.getSet().equals(table))
.map(this::indexInfoAsList)
.collect(Collectors.toList());

Expand All @@ -983,10 +962,6 @@ public ResultSet getIndexInfo(String catalog, String schema, String table, boole
systemColumns(columns, sqlTypes), indexData);
}

public Collection<AerospikeSecondaryIndex> getSecondaryIndexes(String catalog) {
return catalogIndexes.get(catalog);
}

@Override
public boolean supportsResultSetType(int type) {
return false;
Expand Down Expand Up @@ -1259,30 +1234,6 @@ private List<DataColumn> systemColumns(String[] names, int[] types) {
.collect(toList());
}

private Properties initProperties(String lines) {
Properties properties = new Properties();
try {
properties.load(new StringReader(lines));
} catch (IOException e) {
logger.warning(() -> format("Expression in initProperties, lines: %s", lines));
}
return properties;
}

private Stream<Properties> streamOfSubProperties(Map<String, String> map, String key) {
return Optional.ofNullable(map.get(key)).map(s -> Arrays.stream(s.split(";"))
.map(ns -> initProperties(ns.replace(":", NEW_LINE)))).orElse(Stream.empty());
}

private <K, V> V getOrDefault(Map<K, V> map, K key, V defaultValue) {
return Optional.ofNullable(map.getOrDefault(key, defaultValue)).orElse(defaultValue);
}

@SuppressWarnings("SameParameterValue")
private String join(String defaultValue, String delimiter, Collection<String> elements) {
return elements.isEmpty() ? defaultValue : String.join(delimiter, elements);
}

private int ordinal(ResultSetMetaData md, String columnName) {
int ordinal = 0;
try {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/aerospike/jdbc/AerospikeStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.aerospike.jdbc.query.QueryPerformer;
import com.aerospike.jdbc.sql.SimpleWrapper;
import com.aerospike.jdbc.util.AuxStatementParser;
import org.apache.calcite.sql.parser.SqlParseException;

import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -26,6 +25,7 @@
public class AerospikeStatement implements Statement, SimpleWrapper {

private static final Logger logger = Logger.getLogger(AerospikeStatement.class.getName());

private static final String BATCH_NOT_SUPPORTED_MESSAGE = "Batch update is not supported";
private static final String AUTO_GENERATED_KEYS_NOT_SUPPORTED_MESSAGE = "Auto-generated keys are not supported";

Expand Down Expand Up @@ -64,8 +64,8 @@ protected AerospikeQuery parseQuery(String sql) throws SQLException {
AerospikeQuery query;
try {
query = AerospikeQuery.parse(sql);
} catch (SqlParseException e) {
query = AuxStatementParser.hack(sql);
} catch (Exception e) {
query = AuxStatementParser.parse(sql);
}
if (query.getCatalog() == null) {
query.setCatalog(catalog);
Expand Down
Loading
Loading