Skip to content

Commit

Permalink
FMWK-277 Set Aerospike client logger callback (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Dec 6, 2023
1 parent 9406b48 commit c3325ab
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 31 deletions.
12 changes: 11 additions & 1 deletion src/main/java/com/aerospike/jdbc/AerospikeDriver.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package com.aerospike.jdbc;

import com.aerospike.client.Log;
import com.aerospike.jdbc.util.AerospikeClientLogger;

import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.Properties;
import java.util.logging.Logger;

import static java.util.stream.Collectors.toList;
Expand All @@ -19,6 +26,9 @@ public class AerospikeDriver implements Driver {
} catch (SQLException e) {
throw new ExceptionInInitializerError("Can not register AerospikeDriver");
}
logger.info("Set callback for Java client logs");
Log.Callback asLoggerCallback = new AerospikeClientLogger();
Log.setCallback(asLoggerCallback);
}

public Connection connect(String url, Properties info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
import com.aerospike.client.query.KeyRecord;
import com.aerospike.jdbc.model.DriverPolicy;

import java.util.logging.Logger;

public class RecordSetBatchSequenceListener implements BatchSequenceListener {

private static final Logger logger = Logger.getLogger(RecordSetBatchSequenceListener.class.getName());

private final RecordSet recordSet;

public RecordSetBatchSequenceListener(DriverPolicy driverPolicy) {
Expand All @@ -31,6 +35,7 @@ public void onSuccess() {

@Override
public void onFailure(AerospikeException e) {
logger.warning(e::getMessage);
recordSet.abort();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
import com.aerospike.client.query.KeyRecord;
import com.aerospike.jdbc.model.DriverPolicy;

import java.util.logging.Logger;

public class RecordSetRecordSequenceListener implements RecordSequenceListener {

private static final Logger logger = Logger.getLogger(RecordSetRecordSequenceListener.class.getName());

private final RecordSet recordSet;

public RecordSetRecordSequenceListener(DriverPolicy driverPolicy) {
Expand All @@ -30,6 +34,7 @@ public void onSuccess() {

@Override
public void onFailure(AerospikeException exception) {
logger.warning(exception::getMessage);
recordSet.abort();
}

Expand Down
22 changes: 22 additions & 0 deletions src/main/java/com/aerospike/jdbc/query/BaseQueryHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.aerospike.jdbc.query;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Value;
Expand All @@ -12,11 +13,17 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

import static java.util.Collections.emptyList;

public abstract class BaseQueryHandler implements QueryHandler {

private static final Logger logger = Logger.getLogger(BaseQueryHandler.class.getName());

protected final IAerospikeClient client;
protected final Statement statement;
protected final PolicyBuilder policyBuilder;
Expand Down Expand Up @@ -45,6 +52,21 @@ protected ListRecordSet emptyRecordSet(AerospikeQuery query) {
emptyList(), emptyList());
}

protected Integer getUpdateCount(Future<Integer> updateCountFuture) {
try {
return updateCountFuture.get();
} catch (ExecutionException e) {
logger.log(Level.SEVERE, "Future computation failure", e.getCause());
} catch (InterruptedException e) {
logger.log(Level.SEVERE, "Thread was interrupted", e);
}
return 0;
}

protected void logAerospikeException(AerospikeException e) {
logger.log(Level.SEVERE, "Aerospike operation failure", e);
}

private DriverConfiguration getConfiguration() {
try {
return ((AerospikeConnection) statement.getConnection()).getConfiguration();
Expand Down
13 changes: 4 additions & 9 deletions src/main/java/com/aerospike/jdbc/query/DeleteQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

Expand All @@ -39,15 +38,11 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
try {
client.delete(EventLoopProvider.getEventLoop(), listener, writePolicy, key);
} catch (AerospikeException e) {
logger.warning("Error on database call: " + e.getMessage());
logAerospikeException(e);
listener.onFailure(e);
}
}
try {
return new Pair<>(emptyRecordSet(query), listener.getTotal().get());
} catch (InterruptedException | ExecutionException e) {
return new Pair<>(emptyRecordSet(query), 0);
}
return new Pair<>(emptyRecordSet(query), getUpdateCount(listener.getTotal()));
} else {
logger.info("DELETE scan");
RecordSetRecordSequenceListener listener = new RecordSetRecordSequenceListener(config.getDriverPolicy());
Expand All @@ -63,8 +58,8 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
try {
if (client.delete(deletePolicy, r.key))
count.incrementAndGet();
} catch (Exception e) {
logger.warning("Failed to delete record: " + e.getMessage());
} catch (AerospikeException e) {
logAerospikeException(e);
}
});

Expand Down
17 changes: 4 additions & 13 deletions src/main/java/com/aerospike/jdbc/query/InsertQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -56,15 +55,11 @@ public Pair<ResultSet, Integer> putConsecutively(AerospikeQuery query) {
try {
client.put(EventLoopProvider.getEventLoop(), listener, writePolicy, key, bins);
} catch (AerospikeException e) {
logger.severe("Error on database call: " + e.getMessage());
logAerospikeException(e);
listener.onFailure(e);
}
}
try {
return new Pair<>(emptyRecordSet(query), listener.getTotal().get());
} catch (InterruptedException | ExecutionException e) {
return new Pair<>(emptyRecordSet(query), 0);
}
return new Pair<>(emptyRecordSet(query), getUpdateCount(listener.getTotal()));
}

public Pair<ResultSet, Integer> putBatch(AerospikeQuery query) {
Expand Down Expand Up @@ -93,15 +88,11 @@ public Pair<ResultSet, Integer> putBatch(AerospikeQuery query) {
try {
client.operate(EventLoopProvider.getEventLoop(), listener, batchPolicy, batchRecords);
} catch (AerospikeException e) {
logger.severe("Error on database call: " + e.getMessage());
// no error log as this completes the future exceptionally
listener.onFailure(e);
}

try {
return new Pair<>(emptyRecordSet(query), listener.getTotal().get());
} catch (InterruptedException | ExecutionException e) {
return new Pair<>(emptyRecordSet(query), 0);
}
return new Pair<>(emptyRecordSet(query), getUpdateCount(listener.getTotal()));
}

protected Bin[] buildBinArray(List<String> binNames, List<Object> values) {
Expand Down
11 changes: 3 additions & 8 deletions src/main/java/com/aerospike/jdbc/query/UpdateQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

Expand All @@ -41,15 +40,11 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
try {
client.put(EventLoopProvider.getEventLoop(), listener, writePolicy, key, bins);
} catch (AerospikeException e) {
logger.warning("Error on database call: " + e.getMessage());
logAerospikeException(e);
listener.onFailure(e);
}
}
try {
return new Pair<>(emptyRecordSet(query), listener.getTotal().get());
} catch (InterruptedException | ExecutionException e) {
return new Pair<>(emptyRecordSet(query), 0);
}
return new Pair<>(emptyRecordSet(query), getUpdateCount(listener.getTotal()));
} else {
logger.info("UPDATE scan");
RecordSetRecordSequenceListener listener = new RecordSetRecordSequenceListener(config.getDriverPolicy());
Expand All @@ -64,7 +59,7 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
client.put(writePolicy, r.key, bins);
count.incrementAndGet();
} catch (AerospikeException e) {
logger.warning("Failed to update record: " + e.getMessage());
logAerospikeException(e);
}
});

Expand Down
37 changes: 37 additions & 0 deletions src/main/java/com/aerospike/jdbc/util/AerospikeClientLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.aerospike.jdbc.util;

import com.aerospike.client.Log;

import java.util.logging.Logger;

import static java.lang.String.format;

public class AerospikeClientLogger implements Log.Callback {

private static final Logger logger = Logger.getLogger(AerospikeClientLogger.class.getName());

@Override
public void log(Log.Level level, String message) {
switch (level) {
case DEBUG:
logger.fine(message);
break;

case INFO:
logger.info(message);
break;

case WARN:
logger.warning(message);
break;

case ERROR:
logger.severe(message);
break;

default:
logger.warning(() -> format("Unexpected Aerospike client log level %s. Message: %s",
level, message));
}
}
}

0 comments on commit c3325ab

Please sign in to comment.