diff --git a/README.md b/README.md index 130d69a..b46e758 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,8 @@ Packages documentation can be found [here](https://javadoc.io/doc/com.aerospike/ * UPDATE * DELETE * TRUNCATE TABLE +* CREATE INDEX +* DROP INDEX See [examples](docs/examples.md) of SQL. diff --git a/docs/examples.md b/docs/examples.md index 782a1f7..4eda708 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -314,3 +314,14 @@ TRUNCATE TABLE port_list; DELETE FROM port_list; ``` +## CREATE INDEX + +```sql +CREATE INDEX port_idx ON port_list (port); +``` + +## DROP INDEX + +```sql +DROP INDEX port_idx ON port_list; +``` diff --git a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java index 9991179..a49625c 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java @@ -107,7 +107,7 @@ public boolean isClosed() { @Override public DatabaseMetaData getMetaData() throws SQLException { logger.fine(() -> "getMetaData request"); - return metadataBuilder.build(url, client, this); + return metadataBuilder.build(url, this); } @Override diff --git a/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java b/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java index a895e85..2477bfa 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java @@ -1,20 +1,14 @@ package com.aerospike.jdbc; -import com.aerospike.client.IAerospikeClient; -import com.aerospike.client.Info; -import com.aerospike.client.policy.InfoPolicy; -import com.aerospike.client.query.IndexType; +import com.aerospike.jdbc.model.AerospikeClusterInfo; import com.aerospike.jdbc.model.AerospikeSecondaryIndex; import com.aerospike.jdbc.model.DataColumn; import com.aerospike.jdbc.schema.AerospikeSchemaBuilder; import com.aerospike.jdbc.sql.ListRecordSet; import com.aerospike.jdbc.sql.SimpleWrapper; -import com.aerospike.jdbc.util.AerospikeUtils; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import java.io.IOException; -import java.io.StringReader; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; @@ -22,16 +16,20 @@ import java.sql.RowIdLifetime; import java.sql.SQLException; import java.sql.Statement; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.logging.Logger; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; -import static com.aerospike.jdbc.util.AerospikeUtils.getIndexBinValuesRatio; -import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME; +import static com.aerospike.jdbc.util.AerospikeUtils.getCatalogIndexes; +import static com.aerospike.jdbc.util.AerospikeUtils.getClusterInfo; import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME; import static com.google.common.base.Strings.isNullOrEmpty; import static java.lang.String.format; @@ -40,7 +38,6 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static java.util.Collections.synchronizedSet; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; @@ -48,72 +45,54 @@ public class AerospikeDatabaseMetadata implements DatabaseMetaData, SimpleWrapper { private static final Logger logger = Logger.getLogger(AerospikeDatabaseMetadata.class.getName()); - private static final String NEW_LINE = System.lineSeparator(); private final String url; - private final Connection connection; - private final String dbBuild; - private final String dbEdition; + private final AerospikeConnection connection; - private final List catalogs; - private final Map> tables; - private final Map> catalogIndexes; + private final AerospikeClusterInfo clusterInfo; private final AerospikeSchemaBuilder schemaBuilder; private final Cache resultSetMetaDataCache; - public AerospikeDatabaseMetadata(String url, IAerospikeClient client, AerospikeConnection connection) { + private volatile Map> catalogIndexes; + + public AerospikeDatabaseMetadata(String url, AerospikeConnection connection) { logger.info("Init AerospikeDatabaseMetadata"); this.url = url; this.connection = connection; - Collection builds = synchronizedSet(new HashSet<>()); - Collection editions = synchronizedSet(new HashSet<>()); - Collection namespaces = synchronizedSet(new HashSet<>()); - final InfoPolicy infoPolicy = client.getInfoPolicyDefault(); - catalogIndexes = new ConcurrentHashMap<>(); - tables = new ConcurrentHashMap<>(); - Arrays.stream(client.getNodes()).parallel() - .map(node -> Info.request(infoPolicy, node, "namespaces", "sets", "sindex", "build", "edition")) - .forEach(r -> { - builds.add(r.get("build")); - editions.add(r.get("edition")); - namespaces.addAll(asList(getOrDefault(r, "namespaces", "").split(";"))); - streamOfSubProperties(r, "sets").forEach(p -> - tables.computeIfAbsent(p.getProperty("ns"), s -> new HashSet<>()) - .addAll(Arrays.asList(p.getProperty("set"), DEFAULT_SCHEMA_NAME)) - ); - streamOfSubProperties(r, "sindex") - .filter(AerospikeUtils::isSupportedIndexType) - .forEach(p -> { - String namespace = p.getProperty("ns"); - String indexName = p.getProperty("indexname"); - Integer binRatio = connection.getAerospikeVersion().isSIndexCardinalitySupported() - ? getIndexBinValuesRatio(client, namespace, indexName) - : null; - catalogIndexes.computeIfAbsent(namespace, s -> new HashSet<>()) - .add(new AerospikeSecondaryIndex( - namespace, - p.getProperty("set"), - p.getProperty("bin"), - indexName, - IndexType.valueOf(p.getProperty("type").toUpperCase(Locale.ENGLISH)), - binRatio) - ); - }); - }); - - schemaBuilder = new AerospikeSchemaBuilder(client, connection.getConfiguration().getDriverPolicy()); + clusterInfo = getClusterInfo(connection.getClient()); + schemaBuilder = new AerospikeSchemaBuilder( + connection.getClient(), + connection.getConfiguration().getDriverPolicy() + ); resultSetMetaDataCache = CacheBuilder.newBuilder().build(); - - dbBuild = join("N/A", ", ", builds); - dbEdition = join("Aerospike", ", ", editions); - catalogs = namespaces.stream().filter(n -> !"".equals(n)).collect(Collectors.toList()); } public AerospikeSchemaBuilder getSchemaBuilder() { return schemaBuilder; } + public void resetCatalogIndexes() { + logger.fine(() -> "Reset secondary index information"); + catalogIndexes = null; + } + + public Collection getSecondaryIndexes(String catalog) { + initCatalogIndexes(); + return catalogIndexes.get(catalog); + } + + private void initCatalogIndexes() { + if (catalogIndexes == null) { + synchronized (this) { + if (catalogIndexes == null) { + logger.info(() -> "Load secondary index information"); + catalogIndexes = getCatalogIndexes(connection.getClient(), connection.getAerospikeVersion()); + } + } + } + } + @Override public boolean allProceduresAreCallable() { return false; @@ -130,7 +109,7 @@ public String getURL() { } @Override - public String getUserName() throws SQLException { + public String getUserName() { return connection.getClientInfo().getProperty("user"); } @@ -161,12 +140,12 @@ public boolean nullsAreSortedAtEnd() { @Override public String getDatabaseProductName() { - return dbEdition; + return clusterInfo.getEdition(); } @Override public String getDatabaseProductVersion() { - return dbBuild; + return clusterInfo.getBuild(); } @Override @@ -251,7 +230,7 @@ public String getSQLKeywords() { @Override public String getNumericFunctions() { - return "sum,sumsqs,avg,min,max,count"; + return ""; } @Override @@ -481,7 +460,7 @@ public boolean supportsCatalogsInTableDefinitions() { @Override public boolean supportsCatalogsInIndexDefinitions() { - return false; + return true; } @Override @@ -735,12 +714,12 @@ public ResultSet getTables(String catalog, String schemaPattern, String tableNam final Iterable> tablesData; if (catalog == null) { - tablesData = tables.entrySet().stream() + tablesData = clusterInfo.getTables().entrySet().stream() .flatMap(p -> p.getValue().stream().map(t -> asList(p.getKey(), null, t, "TABLE", null, null, null, null, null, null))) .collect(toList()); } else { - tablesData = tables.getOrDefault(catalog, Collections.emptyList()).stream() + tablesData = clusterInfo.getTables().getOrDefault(catalog, Collections.emptyList()).stream() .filter(t -> tableNameRegex == null || tableNameRegex.matcher(t).matches()) .map(t -> asList(catalog, null, t, "TABLE", null, null, null, null, null, null)) .collect(toList()); @@ -759,14 +738,14 @@ public ResultSet getTables(String catalog, String schemaPattern, String tableNam public ResultSet getSchemas() { return new ListRecordSet(null, "system", "schemas", systemColumns(new String[]{"TABLE_SCHEM", "TABLE_CATALOG"}, new int[]{VARCHAR, VARCHAR}), - catalogs.stream().map(ns -> Arrays.asList("", ns)).collect(toList())); + clusterInfo.getCatalogs().stream().map(ns -> Arrays.asList("", ns)).collect(toList())); } @Override public ResultSet getCatalogs() { return new ListRecordSet(null, "system", "catalogs", systemColumns(new String[]{"TABLE_CAT"}, new int[]{VARCHAR}), - catalogs.stream().map(Collections::singletonList).collect(toList())); + clusterInfo.getCatalogs().stream().map(Collections::singletonList).collect(toList())); } @Override @@ -786,11 +765,11 @@ public ResultSet getColumns(String catalog, String schemaPattern, String tableNa final List resultSetMetaDataList; if (catalog == null) { - resultSetMetaDataList = tables.entrySet().stream() + resultSetMetaDataList = clusterInfo.getTables().entrySet().stream() .flatMap(p -> p.getValue().stream().map(t -> getMetadata(p.getKey(), t))) .collect(toList()); } else { - resultSetMetaDataList = tables.getOrDefault(catalog, Collections.emptyList()).stream() + resultSetMetaDataList = clusterInfo.getTables().getOrDefault(catalog, Collections.emptyList()).stream() .filter(t -> tableNameRegex == null || tableNameRegex.matcher(t).matches()) .map(t -> getMetadata(catalog, t)) .collect(toList()); @@ -862,12 +841,12 @@ public ResultSet getVersionColumns(String catalog, String schema, String table) public ResultSet getPrimaryKeys(String catalog, String schema, String table) { final Iterable> tablesData; if (catalog == null) { - tablesData = tables.entrySet().stream() + tablesData = clusterInfo.getTables().entrySet().stream() .flatMap(p -> p.getValue().stream().map(t -> asList(p.getKey(), null, t, PRIMARY_KEY_COLUMN_NAME, 1, PRIMARY_KEY_COLUMN_NAME))) .collect(toList()); } else { - tablesData = tables.getOrDefault(catalog, Collections.emptyList()).stream() + tablesData = clusterInfo.getTables().getOrDefault(catalog, Collections.emptyList()).stream() .filter(t -> table == null || table.equals(t)) .map(t -> asList(catalog, null, t, PRIMARY_KEY_COLUMN_NAME, 1, PRIMARY_KEY_COLUMN_NAME)) .collect(toList()); @@ -959,16 +938,16 @@ public ResultSet getTypeInfo() { @Override public ResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate) { logger.info(() -> format("getIndexInfo: %s, %s, %s", catalog, schema, table)); + initCatalogIndexes(); Stream secondaryIndexStream; if (catalog == null) { - secondaryIndexStream = catalogIndexes.entrySet().stream() - .flatMap(p -> p.getValue().stream()); + secondaryIndexStream = catalogIndexes.values().stream().flatMap(Collection::stream); } else { - secondaryIndexStream = getOrDefault(catalogIndexes, catalog, Collections.emptyList()).stream() - .filter(i -> i.getNamespace().equals(catalog)); + secondaryIndexStream = catalogIndexes.getOrDefault(catalog, Collections.emptyList()).stream() + .filter(index -> index.getNamespace().equals(catalog)); } final Iterable> indexData = secondaryIndexStream - .filter(i -> i.getSet().equals(table)) + .filter(index -> index.getSet().equals(table)) .map(this::indexInfoAsList) .collect(Collectors.toList()); @@ -983,10 +962,6 @@ public ResultSet getIndexInfo(String catalog, String schema, String table, boole systemColumns(columns, sqlTypes), indexData); } - public Collection getSecondaryIndexes(String catalog) { - return catalogIndexes.get(catalog); - } - @Override public boolean supportsResultSetType(int type) { return false; @@ -1259,30 +1234,6 @@ private List systemColumns(String[] names, int[] types) { .collect(toList()); } - private Properties initProperties(String lines) { - Properties properties = new Properties(); - try { - properties.load(new StringReader(lines)); - } catch (IOException e) { - logger.warning(() -> format("Expression in initProperties, lines: %s", lines)); - } - return properties; - } - - private Stream streamOfSubProperties(Map map, String key) { - return Optional.ofNullable(map.get(key)).map(s -> Arrays.stream(s.split(";")) - .map(ns -> initProperties(ns.replace(":", NEW_LINE)))).orElse(Stream.empty()); - } - - private V getOrDefault(Map map, K key, V defaultValue) { - return Optional.ofNullable(map.getOrDefault(key, defaultValue)).orElse(defaultValue); - } - - @SuppressWarnings("SameParameterValue") - private String join(String defaultValue, String delimiter, Collection elements) { - return elements.isEmpty() ? defaultValue : String.join(delimiter, elements); - } - private int ordinal(ResultSetMetaData md, String columnName) { int ordinal = 0; try { diff --git a/src/main/java/com/aerospike/jdbc/AerospikeStatement.java b/src/main/java/com/aerospike/jdbc/AerospikeStatement.java index 3157ca3..d0bcaa0 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeStatement.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeStatement.java @@ -7,7 +7,6 @@ import com.aerospike.jdbc.query.QueryPerformer; import com.aerospike.jdbc.sql.SimpleWrapper; import com.aerospike.jdbc.util.AuxStatementParser; -import org.apache.calcite.sql.parser.SqlParseException; import java.sql.Connection; import java.sql.ResultSet; @@ -26,6 +25,7 @@ public class AerospikeStatement implements Statement, SimpleWrapper { private static final Logger logger = Logger.getLogger(AerospikeStatement.class.getName()); + private static final String BATCH_NOT_SUPPORTED_MESSAGE = "Batch update is not supported"; private static final String AUTO_GENERATED_KEYS_NOT_SUPPORTED_MESSAGE = "Auto-generated keys are not supported"; @@ -64,8 +64,8 @@ protected AerospikeQuery parseQuery(String sql) throws SQLException { AerospikeQuery query; try { query = AerospikeQuery.parse(sql); - } catch (SqlParseException e) { - query = AuxStatementParser.hack(sql); + } catch (Exception e) { + query = AuxStatementParser.parse(sql); } if (query.getCatalog() == null) { query.setCatalog(catalog); diff --git a/src/main/java/com/aerospike/jdbc/model/AerospikeClusterInfo.java b/src/main/java/com/aerospike/jdbc/model/AerospikeClusterInfo.java new file mode 100644 index 0000000..bfbc3a5 --- /dev/null +++ b/src/main/java/com/aerospike/jdbc/model/AerospikeClusterInfo.java @@ -0,0 +1,36 @@ +package com.aerospike.jdbc.model; + +import java.util.Collection; +import java.util.Map; + +public class AerospikeClusterInfo { + + private final String build; + private final String edition; + private final Collection catalogs; + private final Map> tables; + + public AerospikeClusterInfo(String build, String edition, Collection catalogs, + Map> tables) { + this.build = build; + this.edition = edition; + this.catalogs = catalogs; + this.tables = tables; + } + + public String getBuild() { + return build; + } + + public String getEdition() { + return edition; + } + + public Collection getCatalogs() { + return catalogs; + } + + public Map> getTables() { + return tables; + } +} diff --git a/src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java b/src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java index ff832e7..05da48b 100644 --- a/src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java +++ b/src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java @@ -35,6 +35,7 @@ public class AerospikeQuery { private QueryType queryType; private Integer offset; private Integer limit; + private String index; private QueryPredicate predicate; private List values; @@ -112,6 +113,14 @@ public void setLimit(int limit) { this.limit = limit; } + public String getIndex() { + return index; + } + + public void setIndex(String index) { + this.index = index; + } + public QueryPredicate getPredicate() { return predicate; } diff --git a/src/main/java/com/aerospike/jdbc/model/QueryType.java b/src/main/java/com/aerospike/jdbc/model/QueryType.java index f1892ff..844c925 100644 --- a/src/main/java/com/aerospike/jdbc/model/QueryType.java +++ b/src/main/java/com/aerospike/jdbc/model/QueryType.java @@ -7,6 +7,8 @@ public enum QueryType { SHOW_COLUMNS, DROP_SCHEMA, DROP_TABLE, + CREATE_INDEX, + DROP_INDEX, SELECT, INSERT, UPDATE, diff --git a/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java index 93a7534..a31a9c8 100644 --- a/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java @@ -5,6 +5,7 @@ import com.aerospike.client.IAerospikeClient; import com.aerospike.client.Value; import com.aerospike.jdbc.AerospikeConnection; +import com.aerospike.jdbc.AerospikeDatabaseMetadata; import com.aerospike.jdbc.model.AerospikeQuery; import com.aerospike.jdbc.model.DriverConfiguration; import com.aerospike.jdbc.sql.ListRecordSet; @@ -29,6 +30,7 @@ public abstract class BaseQueryHandler implements QueryHandler { protected final PolicyBuilder policyBuilder; protected final DriverConfiguration config; protected final AerospikeVersion aerospikeVersion; + protected final AerospikeDatabaseMetadata databaseMetadata; protected BaseQueryHandler(IAerospikeClient client, Statement statement) { this.client = client; @@ -36,6 +38,7 @@ protected BaseQueryHandler(IAerospikeClient client, Statement statement) { policyBuilder = new PolicyBuilder(client); config = getConfiguration(); aerospikeVersion = getAerospikeVersion(); + databaseMetadata = getDatabaseMetadata(); } protected Bin[] getBins(AerospikeQuery query) { @@ -82,4 +85,12 @@ private AerospikeVersion getAerospikeVersion() { throw new IllegalStateException("Failed to get AerospikeVersion", e); } } + + private AerospikeDatabaseMetadata getDatabaseMetadata() { + try { + return (AerospikeDatabaseMetadata) statement.getConnection().getMetaData(); + } catch (SQLException e) { + throw new IllegalStateException("Failed to get AerospikeDatabaseMetadata", e); + } + } } diff --git a/src/main/java/com/aerospike/jdbc/query/IndexCreateHandler.java b/src/main/java/com/aerospike/jdbc/query/IndexCreateHandler.java new file mode 100644 index 0000000..3de6934 --- /dev/null +++ b/src/main/java/com/aerospike/jdbc/query/IndexCreateHandler.java @@ -0,0 +1,68 @@ +package com.aerospike.jdbc.query; + +import com.aerospike.client.IAerospikeClient; +import com.aerospike.client.query.IndexType; +import com.aerospike.jdbc.model.AerospikeQuery; +import com.aerospike.jdbc.model.CatalogTableName; +import com.aerospike.jdbc.model.Pair; + +import java.sql.ResultSet; +import java.sql.Statement; +import java.sql.Types; +import java.util.logging.Logger; + +import static java.lang.String.format; + +public class IndexCreateHandler extends BaseQueryHandler { + + private static final Logger logger = Logger.getLogger(IndexCreateHandler.class.getName()); + + protected IndexCreateHandler(IAerospikeClient client, Statement statement) { + super(client, statement); + } + + @Override + public Pair execute(AerospikeQuery query) { + logger.info("CREATE INDEX statement"); + if (query.getColumns().size() != 1) { + throw new UnsupportedOperationException( + format("Multi-column index is not supported, got: %s", query.getColumns())); + } else { + client.createIndex( + null, + query.getCatalog(), + query.getTable(), + query.getIndex(), + query.getColumns().get(0), + getIndexType(query)); + } + databaseMetadata.resetCatalogIndexes(); + return new Pair<>(emptyRecordSet(query), 1); + } + + private IndexType getIndexType(AerospikeQuery query) { + final CatalogTableName catalogTableName = new CatalogTableName( + query.getCatalog(), + query.getTable() + ); + final String columnName = query.getColumns().get(0); + return databaseMetadata.getSchemaBuilder().getSchema(catalogTableName).stream() + .filter(dataColumn -> dataColumn.getName().equals(columnName)) + .findFirst() + .map(dataColumn -> { + int columnType = dataColumn.getType(); + switch (columnType) { + case Types.VARCHAR: + return IndexType.STRING; + case Types.BIGINT: + case Types.INTEGER: + return IndexType.NUMERIC; + default: + throw new UnsupportedOperationException( + format("Secondary index is not supported for type: %d", columnType)); + } + }) + .orElseThrow(() -> new IllegalArgumentException(format("Column %s not found in %s", + columnName, query.getTable()))); + } +} diff --git a/src/main/java/com/aerospike/jdbc/query/IndexDropHandler.java b/src/main/java/com/aerospike/jdbc/query/IndexDropHandler.java new file mode 100644 index 0000000..556fbcd --- /dev/null +++ b/src/main/java/com/aerospike/jdbc/query/IndexDropHandler.java @@ -0,0 +1,27 @@ +package com.aerospike.jdbc.query; + +import com.aerospike.client.IAerospikeClient; +import com.aerospike.jdbc.model.AerospikeQuery; +import com.aerospike.jdbc.model.Pair; + +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.logging.Logger; + +public class IndexDropHandler extends BaseQueryHandler { + + private static final Logger logger = Logger.getLogger(IndexDropHandler.class.getName()); + + protected IndexDropHandler(IAerospikeClient client, Statement statement) { + super(client, statement); + } + + @Override + public Pair execute(AerospikeQuery query) { + logger.info("DROP INDEX statement"); + client.dropIndex(null, query.getCatalog(), query.getTable(), query.getIndex()); + + databaseMetadata.resetCatalogIndexes(); + return new Pair<>(emptyRecordSet(query), 1); + } +} diff --git a/src/main/java/com/aerospike/jdbc/query/QueryPerformer.java b/src/main/java/com/aerospike/jdbc/query/QueryPerformer.java index 6409b31..a696b38 100644 --- a/src/main/java/com/aerospike/jdbc/query/QueryPerformer.java +++ b/src/main/java/com/aerospike/jdbc/query/QueryPerformer.java @@ -41,6 +41,14 @@ public static Pair executeQuery( queryHandler = new TruncateQueryHandler(client, statement); return queryHandler.execute(query); + case CREATE_INDEX: + queryHandler = new IndexCreateHandler(client, statement); + return queryHandler.execute(query); + + case DROP_INDEX: + queryHandler = new IndexDropHandler(client, statement); + return queryHandler.execute(query); + default: throw new UnsupportedOperationException(UNSUPPORTED_QUERY_TYPE_MESSAGE); } diff --git a/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java index 65a1158..0ff867c 100644 --- a/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java @@ -8,7 +8,6 @@ import com.aerospike.client.policy.QueryPolicy; import com.aerospike.client.policy.ScanPolicy; import com.aerospike.client.query.KeyRecord; -import com.aerospike.jdbc.AerospikeDatabaseMetadata; import com.aerospike.jdbc.async.EventLoopProvider; import com.aerospike.jdbc.async.RecordSet; import com.aerospike.jdbc.async.RecordSetBatchSequenceListener; @@ -21,7 +20,6 @@ import com.aerospike.jdbc.sql.AerospikeRecordResultSet; import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.Collection; @@ -40,16 +38,10 @@ public class SelectQueryHandler extends BaseQueryHandler { private static final Logger logger = Logger.getLogger(SelectQueryHandler.class.getName()); - protected final AerospikeDatabaseMetadata databaseMetadata; protected List columns; public SelectQueryHandler(IAerospikeClient client, Statement statement) { super(client, statement); - try { - databaseMetadata = (AerospikeDatabaseMetadata) statement.getConnection().getMetaData(); - } catch (SQLException e) { - throw new IllegalStateException("Failed to get AerospikeDatabaseMetadata", e); - } } @Override diff --git a/src/main/java/com/aerospike/jdbc/util/AerospikeUtils.java b/src/main/java/com/aerospike/jdbc/util/AerospikeUtils.java index bb0737c..f80fdc2 100644 --- a/src/main/java/com/aerospike/jdbc/util/AerospikeUtils.java +++ b/src/main/java/com/aerospike/jdbc/util/AerospikeUtils.java @@ -5,20 +5,32 @@ import com.aerospike.client.cluster.Node; import com.aerospike.client.policy.InfoPolicy; import com.aerospike.client.query.IndexType; +import com.aerospike.jdbc.model.AerospikeClusterInfo; +import com.aerospike.jdbc.model.AerospikeSecondaryIndex; import com.google.common.base.Splitter; -import java.util.Arrays; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; +import java.io.IOException; +import java.io.StringReader; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME; +import static java.lang.String.format; +import static java.lang.String.join; +import static java.util.Collections.synchronizedSet; public final class AerospikeUtils { private static final Logger logger = Logger.getLogger(AerospikeUtils.class.getName()); + private static final String NEW_LINE = System.lineSeparator(); + private static final String NOT_AVAILABLE = "NA"; + private AerospikeUtils() { } @@ -54,6 +66,90 @@ public static int getTableRecordsNumber(IAerospikeClient client, String ns, Stri return (int) Math.floor((double) allRecords / replicationFactor); } + public static AerospikeClusterInfo getClusterInfo(IAerospikeClient client) { + final Collection builds = synchronizedSet(new HashSet<>()); + final Collection editions = synchronizedSet(new HashSet<>()); + final Collection namespaces = synchronizedSet(new HashSet<>()); + final Map> tables = new ConcurrentHashMap<>(); + try { + Arrays.stream(client.getNodes()).parallel() + .map(node -> Info.request(client.getInfoPolicyDefault(), node, + "namespaces", "sets", "sindex", "build", "edition")) + .forEach(info -> { + builds.add(info.get("build")); + editions.add(info.get("edition")); + Optional.ofNullable(info.get("namespaces")) + .map(ns -> Arrays.stream(ns.split(";")).filter(n -> !n.isEmpty()) + .collect(Collectors.toList())) + .ifPresent(namespaces::addAll); + + streamSubProperties(info, "sets").forEach(p -> + tables.computeIfAbsent(p.getProperty("ns"), s -> new HashSet<>()) + .addAll(Arrays.asList(p.getProperty("set"), DEFAULT_SCHEMA_NAME)) + ); + }); + } catch (Exception e) { + logger.log(Level.WARNING, "Exception in getClusterInfo", e); + } + return new AerospikeClusterInfo( + builds.isEmpty() ? NOT_AVAILABLE : join(", ", builds), + editions.isEmpty() ? "Aerospike" : join(", ", editions), + Collections.unmodifiableCollection(namespaces), + Collections.unmodifiableMap(tables) + ); + } + + public static Map> getCatalogIndexes( + IAerospikeClient client, + AerospikeVersion aerospikeVersion + ) { + final InfoPolicy infoPolicy = client.getInfoPolicyDefault(); + final Map> catalogIndexes = new HashMap<>(); + try { + String indexInfo = Info.request(infoPolicy, client.getCluster().getRandomNode(), "sindex"); + streamSubProperties(indexInfo).filter(AerospikeUtils::isSupportedIndexType) + .forEach(index -> { + String namespace = index.getProperty("ns"); + String indexName = index.getProperty("indexname"); + Integer binRatio = aerospikeVersion.isSIndexCardinalitySupported() + ? getIndexBinValuesRatio(client, namespace, indexName) + : null; + catalogIndexes.computeIfAbsent(namespace, s -> new HashSet<>()) + .add(new AerospikeSecondaryIndex( + namespace, + index.getProperty("set"), + index.getProperty("bin"), + indexName, + IndexType.valueOf(index.getProperty("type").toUpperCase(Locale.ENGLISH)), + binRatio) + ); + }); + } catch (Exception e) { + logger.log(Level.WARNING, "Exception in getCatalogIndexes", e); + } + return Collections.unmodifiableMap(catalogIndexes); + } + + @SuppressWarnings("SameParameterValue") + private static Stream streamSubProperties(Map info, String key) { + return streamSubProperties(Optional.ofNullable(info.get(key)).orElse("")); + } + + private static Stream streamSubProperties(String info) { + return Arrays.stream(info.split(";")) + .filter(str -> !str.isEmpty()) + .map(str -> str.replace(":", NEW_LINE)) + .map(str -> { + Properties properties = new Properties(); + try { + properties.load(new StringReader(str)); + } catch (IOException e) { + logger.log(Level.WARNING, format("Failed to load properties: %s", str), e); + } + return properties; + }); + } + public static boolean isSupportedIndexType(Properties properties) { String indexType = properties.getProperty("type"); return indexType.equalsIgnoreCase(IndexType.NUMERIC.toString()) @@ -63,14 +159,14 @@ public static boolean isSupportedIndexType(Properties properties) { public static Integer getIndexBinValuesRatio(IAerospikeClient client, String namespace, String indexName) { try { String indexStatData = Info.request(client.getInfoPolicyDefault(), client.getCluster().getRandomNode(), - String.format("sindex-stat:ns=%s;indexname=%s", namespace, indexName)); + format("sindex-stat:ns=%s;indexname=%s", namespace, indexName)); return Integer.valueOf(Splitter.on(";").trimResults().splitToList(indexStatData).stream() .map(stat -> Splitter.on("=").trimResults().splitToList(stat)) .collect(Collectors.toMap(t -> t.get(0), t -> t.get(1))) .get("entries_per_bval")); } catch (Exception e) { - logger.warning(() -> String.format("Failed to fetch secondary index %s cardinality", indexName)); + logger.log(Level.WARNING, format("Failed to fetch secondary index %s cardinality", indexName), e); return null; } } diff --git a/src/main/java/com/aerospike/jdbc/util/AuxStatementParser.java b/src/main/java/com/aerospike/jdbc/util/AuxStatementParser.java index 34cf6e2..fc95482 100644 --- a/src/main/java/com/aerospike/jdbc/util/AuxStatementParser.java +++ b/src/main/java/com/aerospike/jdbc/util/AuxStatementParser.java @@ -3,7 +3,9 @@ import com.aerospike.jdbc.model.AerospikeQuery; import com.aerospike.jdbc.model.QueryType; +import java.sql.SQLDataException; import java.sql.SQLException; +import java.util.Arrays; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -12,22 +14,32 @@ public final class AuxStatementParser { private static final Pattern truncateTablePattern; + private static final Pattern createIndexPattern; + private static final Pattern dropIndexPattern; static { - truncateTablePattern = Pattern.compile("truncate table (.*)", Pattern.CASE_INSENSITIVE); + truncateTablePattern = Pattern.compile( + "truncate\\s+table\\s+\"?([^\\s^;\"]+)\"?[\\s;]*", + Pattern.CASE_INSENSITIVE); + createIndexPattern = Pattern.compile( + "create\\s+index\\s+(\\S+)\\s+on\\s+\"?([^\\s^;\"]+)\"?\\s*\\((.+)\\)[\\s;]*", + Pattern.CASE_INSENSITIVE); + dropIndexPattern = Pattern.compile( + "drop\\s+index\\s+(\\S+)\\s+on\\s*\"?([^\\s^;\"]+)\"?[\\s;]*", + Pattern.CASE_INSENSITIVE); } private AuxStatementParser() { } /** - * An auxiliary method to parse queries which are currently not supported by the parser. + * An auxiliary method to parse queries which are not supported by the main parser. * * @param sql the original SQL query string. * @return an {@link com.aerospike.jdbc.model.AerospikeQuery} * @throws SQLException if no match. */ - public static AerospikeQuery hack(String sql) throws SQLException { + public static AerospikeQuery parse(String sql) throws SQLException { Matcher m = truncateTablePattern.matcher(sql); if (m.find()) { AerospikeQuery query = new AerospikeQuery(); @@ -36,6 +48,25 @@ public static AerospikeQuery hack(String sql) throws SQLException { return query; } - throw new SQLException(UNSUPPORTED_QUERY_TYPE_MESSAGE); + m = createIndexPattern.matcher(sql); + if (m.find()) { + AerospikeQuery query = new AerospikeQuery(); + query.setQueryType(QueryType.CREATE_INDEX); + query.setIndex(m.group(1)); + query.setTable(m.group(2)); + query.setColumns(Arrays.asList(m.group(3).trim().split(","))); + return query; + } + + m = dropIndexPattern.matcher(sql); + if (m.find()) { + AerospikeQuery query = new AerospikeQuery(); + query.setQueryType(QueryType.DROP_INDEX); + query.setIndex(m.group(1)); + query.setTable(m.group(2)); + return query; + } + + throw new SQLDataException(UNSUPPORTED_QUERY_TYPE_MESSAGE); } } diff --git a/src/main/java/com/aerospike/jdbc/util/DatabaseMetadataBuilder.java b/src/main/java/com/aerospike/jdbc/util/DatabaseMetadataBuilder.java index 7744cf1..d899fb5 100644 --- a/src/main/java/com/aerospike/jdbc/util/DatabaseMetadataBuilder.java +++ b/src/main/java/com/aerospike/jdbc/util/DatabaseMetadataBuilder.java @@ -1,6 +1,5 @@ package com.aerospike.jdbc.util; -import com.aerospike.client.IAerospikeClient; import com.aerospike.jdbc.AerospikeConnection; import com.aerospike.jdbc.AerospikeDatabaseMetadata; import com.aerospike.jdbc.model.DriverPolicy; @@ -21,10 +20,10 @@ public DatabaseMetadataBuilder(DriverPolicy driverPolicy) { .build(); } - public AerospikeDatabaseMetadata build(String url, IAerospikeClient client, AerospikeConnection connection) + public AerospikeDatabaseMetadata build(String url, AerospikeConnection connection) throws SQLException { try { - return metadataCache.get(url, () -> new AerospikeDatabaseMetadata(url, client, connection)); + return metadataCache.get(url, () -> new AerospikeDatabaseMetadata(url, connection)); } catch (ExecutionException e) { throw new SQLException(e); } diff --git a/src/test/java/com/aerospike/jdbc/DatabaseMetadataTest.java b/src/test/java/com/aerospike/jdbc/DatabaseMetadataTest.java index f938c02..fd86650 100644 --- a/src/test/java/com/aerospike/jdbc/DatabaseMetadataTest.java +++ b/src/test/java/com/aerospike/jdbc/DatabaseMetadataTest.java @@ -1,6 +1,5 @@ package com.aerospike.jdbc; -import com.aerospike.client.Value; import com.aerospike.jdbc.util.TestUtil; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -22,7 +21,6 @@ public class DatabaseMetadataTest extends JdbcBaseTest { @BeforeClass public void setUp() throws SQLException { - Value.UseBoolBin = false; Objects.requireNonNull(connection, "connection is null"); PreparedStatement statement = null; int count; @@ -41,16 +39,15 @@ public void setUp() throws SQLException { public void tearDown() throws SQLException { Objects.requireNonNull(connection, "connection is null"); PreparedStatement statement = null; - ResultSet resultSet = null; String query = format("delete from %s", tableName); try { statement = connection.prepareStatement(query); - resultSet = statement.executeQuery(); - resultSet.next(); + boolean result = statement.execute(); + assertFalse(result); } finally { closeQuietly(statement); - closeQuietly(resultSet); } + assertTrue(statement.getUpdateCount() > 0); } @Test diff --git a/src/test/java/com/aerospike/jdbc/IndexQueriesTest.java b/src/test/java/com/aerospike/jdbc/IndexQueriesTest.java new file mode 100644 index 0000000..aca53da --- /dev/null +++ b/src/test/java/com/aerospike/jdbc/IndexQueriesTest.java @@ -0,0 +1,106 @@ +package com.aerospike.jdbc; + +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Objects; + +import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME; +import static com.aerospike.jdbc.util.TestUtil.closeQuietly; +import static java.lang.String.format; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; + +public class IndexQueriesTest extends JdbcBaseTest { + + @BeforeClass + public void setUp() throws SQLException { + Objects.requireNonNull(connection, "connection is null"); + Statement statement = null; + int count; + String query = format( + "INSERT INTO %s (%s, bin1, int1, str1, bool1) VALUES (\"key1\", 11100, 1, \"bar\", true)", + tableName, + PRIMARY_KEY_COLUMN_NAME + ); + try { + statement = connection.createStatement(); + count = statement.executeUpdate(query); + } finally { + closeQuietly(statement); + } + assertEquals(count, 1); + } + + @AfterClass + public void tearDown() throws SQLException { + Objects.requireNonNull(connection, "connection is null"); + Statement statement = null; + String query = format("TRUNCATE TABLE %s", tableName); + try { + statement = connection.createStatement(); + boolean result = statement.execute(query); + sleep(100L); + assertFalse(result); + } finally { + closeQuietly(statement); + } + assertEquals(statement.getUpdateCount(), 1); + } + + @Test + public void testIndexCreateSuccess() throws SQLException { + Statement statement = null; + int count; + String query = format("CREATE INDEX str1_idx ON %s (str1);", tableName); + try { + statement = connection.createStatement(); + count = statement.executeUpdate(query); + } finally { + closeQuietly(statement); + } + assertEquals(count, 1); + } + + @Test + public void testIndexCreateMultiColumn() throws SQLException { + String query = format("CREATE INDEX multi_idx ON %s (str1, int1)", tableName); + final Statement statement = connection.createStatement(); + assertThrows(UnsupportedOperationException.class, () -> statement.executeUpdate(query)); + closeQuietly(statement); + } + + @Test + public void testIndexCreateUnsupportedType() throws SQLException { + String query = format("CREATE INDEX bool1_idx ON %s (bool1)", tableName); + final Statement statement = connection.createStatement(); + assertThrows(UnsupportedOperationException.class, () -> statement.executeUpdate(query)); + closeQuietly(statement); + } + + @Test + public void testIndexCreateNonExistentColumn() throws SQLException { + String query = format("CREATE INDEX ne_idx ON %s (ne)", tableName); + final Statement statement = connection.createStatement(); + assertThrows(IllegalArgumentException.class, () -> statement.executeUpdate(query)); + closeQuietly(statement); + } + + @Test + public void testIndexDropSuccess() throws SQLException { + Statement statement = null; + int count; + String query = format("DROP INDEX str1_idx ON %s;", tableName); + try { + statement = connection.createStatement(); + count = statement.executeUpdate(query); + } finally { + closeQuietly(statement); + } + assertEquals(count, 1); + } +} diff --git a/src/test/java/com/aerospike/jdbc/JdbcBaseTest.java b/src/test/java/com/aerospike/jdbc/JdbcBaseTest.java index 35d66cd..6fa3412 100644 --- a/src/test/java/com/aerospike/jdbc/JdbcBaseTest.java +++ b/src/test/java/com/aerospike/jdbc/JdbcBaseTest.java @@ -12,6 +12,7 @@ import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public abstract class JdbcBaseTest { @@ -42,7 +43,7 @@ public static void connectionClose() throws SQLException { protected void assertAllByColumnLabel(ResultSet resultSet) throws SQLException { assertEquals(resultSet.getString(PRIMARY_KEY_COLUMN_NAME), "key1"); assertEquals(resultSet.getInt("bin1"), 11100); - assertEquals(resultSet.getInt("bool1"), 1); + assertTrue(resultSet.getBoolean("bool1")); assertEquals(resultSet.getInt("int1"), 1); assertEquals(resultSet.getString("str1"), "bar"); } @@ -50,8 +51,16 @@ protected void assertAllByColumnLabel(ResultSet resultSet) throws SQLException { protected void assertAllByColumnIndex(ResultSet resultSet) throws SQLException { assertEquals(resultSet.getString(1), "key1"); assertEquals(resultSet.getInt(2), 11100); - assertEquals(resultSet.getInt(3), 1); + assertTrue(resultSet.getBoolean(3)); assertEquals(resultSet.getInt(4), 1); assertEquals(resultSet.getString(5), "bar"); } + + @SuppressWarnings("all") + protected void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + } + } } diff --git a/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java b/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java index 3f4c3dc..94228a4 100644 --- a/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java +++ b/src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java @@ -49,6 +49,7 @@ public void testParseUrlParameters() throws Exception { assertEquals(config.getClientPolicy().authMode, AuthMode.EXTERNAL_INSECURE); assertEquals(config.getDriverPolicy().getRecordSetTimeoutMs(), 5000); assertFalse(Value.UseBoolBin); + Value.UseBoolBin = true; Properties update = new Properties(); update.setProperty("totalTimeout", "3000"); diff --git a/src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java b/src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java index e497fb5..a1e3e9f 100644 --- a/src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java +++ b/src/test/java/com/aerospike/jdbc/PreparedQueriesTest.java @@ -1,6 +1,5 @@ package com.aerospike.jdbc; -import com.aerospike.client.Value; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -21,7 +20,6 @@ public class PreparedQueriesTest extends JdbcBaseTest { @BeforeMethod public void setUp() throws SQLException { - Value.UseBoolBin = false; Objects.requireNonNull(connection, "connection is null"); PreparedStatement statement = null; int count; diff --git a/src/test/java/com/aerospike/jdbc/QueryCustomParserTest.java b/src/test/java/com/aerospike/jdbc/QueryCustomParserTest.java new file mode 100644 index 0000000..4388634 --- /dev/null +++ b/src/test/java/com/aerospike/jdbc/QueryCustomParserTest.java @@ -0,0 +1,85 @@ +package com.aerospike.jdbc; + +import com.aerospike.jdbc.model.AerospikeQuery; +import com.aerospike.jdbc.model.QueryType; +import com.aerospike.jdbc.util.AuxStatementParser; +import org.testng.annotations.Test; + +import java.sql.SQLException; + +import static java.lang.String.format; +import static org.testng.Assert.assertEquals; + +public class QueryCustomParserTest { + + private static final String tableName = "jdbc"; + + @Test + public void testIndexCreateQuery() throws SQLException { + AerospikeQuery query; + + String lowCaseQuery = format("create index str1_idx on %s (str1);", tableName); + query = AuxStatementParser.parse(lowCaseQuery); + assertIndexCreateQuery(query); + + String quotedTableQuery = format("create index str1_idx on \"%s\" (str1);", tableName); + query = AuxStatementParser.parse(quotedTableQuery); + assertIndexCreateQuery(query); + + String whiteSpacesQuery = format("create index str1_idx on %s( str1 ) ;", tableName); + query = AuxStatementParser.parse(whiteSpacesQuery); + assertIndexCreateQuery(query); + } + + private void assertIndexCreateQuery(AerospikeQuery query) { + assertEquals(query.getQueryType(), QueryType.CREATE_INDEX); + assertEquals(query.getTable(), tableName); + assertEquals(query.getIndex(), "str1_idx"); + assertEquals(query.getColumns().get(0), "str1"); + } + + @Test + public void testIndexDropQuery() throws SQLException { + AerospikeQuery query; + + String lowCaseQuery = format("drop index str1_idx on %s;", tableName); + query = AuxStatementParser.parse(lowCaseQuery); + assertIndexDropQuery(query); + + String quotedTableQuery = format("drop index str1_idx on \"%s\";", tableName); + query = AuxStatementParser.parse(quotedTableQuery); + assertIndexDropQuery(query); + + String whiteSpacesQuery = format("drop index str1_idx on %s ;", tableName); + query = AuxStatementParser.parse(whiteSpacesQuery); + assertIndexDropQuery(query); + } + + private void assertIndexDropQuery(AerospikeQuery query) { + assertEquals(query.getQueryType(), QueryType.DROP_INDEX); + assertEquals(query.getTable(), tableName); + assertEquals(query.getIndex(), "str1_idx"); + } + + @Test + public void testTruncateTableQuery() throws SQLException { + AerospikeQuery query; + + String lowCaseQuery = format("truncate table %s", tableName); + query = AuxStatementParser.parse(lowCaseQuery); + assertTruncateTableQuery(query); + + String quotedTableQuery = format("truncate table \"%s\"", tableName); + query = AuxStatementParser.parse(quotedTableQuery); + assertTruncateTableQuery(query); + + String whiteSpacesQuery = format("truncate table %s ;", tableName); + query = AuxStatementParser.parse(whiteSpacesQuery); + assertTruncateTableQuery(query); + } + + private void assertTruncateTableQuery(AerospikeQuery query) { + assertEquals(query.getQueryType(), QueryType.DROP_TABLE); + assertEquals(query.getTable(), tableName); + } +} diff --git a/src/test/java/com/aerospike/jdbc/SimpleQueriesTest.java b/src/test/java/com/aerospike/jdbc/SimpleQueriesTest.java index f9fe289..9251109 100644 --- a/src/test/java/com/aerospike/jdbc/SimpleQueriesTest.java +++ b/src/test/java/com/aerospike/jdbc/SimpleQueriesTest.java @@ -1,6 +1,5 @@ package com.aerospike.jdbc; -import com.aerospike.client.Value; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -21,7 +20,6 @@ public class SimpleQueriesTest extends JdbcBaseTest { @BeforeMethod public void setUp() throws SQLException { - Value.UseBoolBin = false; Objects.requireNonNull(connection, "connection is null"); Statement statement = null; int count;