Process resync partitions in RDS source #5171

Merged: 5 commits, Nov 15, 2024
RdsService.java
@@ -24,7 +24,9 @@
 import org.opensearch.dataprepper.plugins.source.rds.leader.RdsApiStrategy;
 import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata;
 import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
+import org.opensearch.dataprepper.plugins.source.rds.resync.ResyncScheduler;
 import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
+import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
 import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
 import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory;
 import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler;
@@ -64,6 +66,7 @@ public class RdsService {
     private ExportScheduler exportScheduler;
     private DataFileScheduler dataFileScheduler;
     private StreamScheduler streamScheduler;
+    private ResyncScheduler resyncScheduler;
 
     public RdsService(final EnhancedSourceCoordinator sourceCoordinator,
                       final RdsSourceConfig sourceConfig,
@@ -129,6 +132,10 @@ public void start(Buffer<Record<Event>> buffer) {
             streamScheduler = new StreamScheduler(
                     sourceCoordinator, sourceConfig, s3PathPrefix, binaryLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
             runnableList.add(streamScheduler);
+
+            resyncScheduler = new ResyncScheduler(
+                    sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
+            runnableList.add(resyncScheduler);
         }
 
         executor = Executors.newFixedThreadPool(runnableList.size());
@@ -158,14 +165,26 @@ public void shutdown() {
 
     private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
         final ConnectionManager connectionManager = new ConnectionManager(
-                dbMetadata.getHostName(),
+                dbMetadata.getEndpoint(),
                 dbMetadata.getPort(),
                 sourceConfig.getAuthenticationConfig().getUsername(),
                 sourceConfig.getAuthenticationConfig().getPassword(),
                 sourceConfig.isTlsEnabled());
         return new SchemaManager(connectionManager);
     }
 
+    private QueryManager getQueryManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
+        final String readerEndpoint = dbMetadata.getReaderEndpoint() != null ? dbMetadata.getReaderEndpoint() : dbMetadata.getEndpoint();
+        final int readerPort = dbMetadata.getReaderPort() == 0 ? dbMetadata.getPort() : dbMetadata.getReaderPort();
+        final ConnectionManager readerConnectionManager = new ConnectionManager(
+                readerEndpoint,
+                readerPort,
+                sourceConfig.getAuthenticationConfig().getUsername(),
+                sourceConfig.getAuthenticationConfig().getPassword(),
+                sourceConfig.isTlsEnabled());
+        return new QueryManager(readerConnectionManager);
+    }
+
     private String getS3PathPrefix() {
         final String s3UserPathPrefix;
         if (sourceConfig.getS3Prefix() != null && !sourceConfig.getS3Prefix().isBlank()) {
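A note on the new getQueryManager(): resync queries are routed to the reader endpoint when one was discovered, and fall back to the writer otherwise, while the schema path stays on the writer. A minimal sketch of that selection logic under hypothetical values (nothing below is part of the PR itself):

```java
// Hypothetical metadata for an Aurora cluster with a reader endpoint.
DbMetadata cluster = DbMetadata.builder()
        .dbIdentifier("demo-cluster")
        .endpoint("demo.cluster-abc.us-east-1.rds.amazonaws.com")
        .port(3306)
        .readerEndpoint("demo.cluster-ro-abc.us-east-1.rds.amazonaws.com")
        .readerPort(3306)
        .build();

// The same ternaries used in getQueryManager():
String endpoint = cluster.getReaderEndpoint() != null
        ? cluster.getReaderEndpoint()       // chosen here: the reader endpoint
        : cluster.getEndpoint();
int port = cluster.getReaderPort() == 0
        ? cluster.getPort()                 // 0 means "no reader port discovered"
        : cluster.getReaderPort();          // chosen here: 3306

// For an instance with no read replica, readerEndpoint stays null and
// readerPort stays 0, so both expressions fall back to the writer.
```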
ResyncPartition.java
@@ -5,6 +5,7 @@
 
 package org.opensearch.dataprepper.plugins.source.rds.coordination.partition;
 
+import lombok.Getter;
 import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
 import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState;
@@ -18,21 +19,23 @@ public class ResyncPartition extends EnhancedSourcePartition<ResyncProgressState> {
     private final String database;
     private final String table;
     private final long timestamp;
+    private final PartitionKeyInfo partitionKeyInfo;
     private final ResyncProgressState state;
 
     public ResyncPartition(String database, String table, long timestamp, ResyncProgressState state) {
         this.database = database;
         this.table = table;
         this.timestamp = timestamp;
+        partitionKeyInfo = new PartitionKeyInfo(database, table, timestamp);
         this.state = state;
     }
 
     public ResyncPartition(SourcePartitionStoreItem sourcePartitionStoreItem) {
         setSourcePartitionStoreItem(sourcePartitionStoreItem);
-        String[] keySplits = sourcePartitionStoreItem.getSourcePartitionKey().split("\\|");
-        database = keySplits[0];
-        table = keySplits[1];
-        timestamp = Long.parseLong(keySplits[2]);
+        partitionKeyInfo = PartitionKeyInfo.fromString(sourcePartitionStoreItem.getSourcePartitionKey());
+        database = partitionKeyInfo.getDatabase();
+        table = partitionKeyInfo.getTable();
+        timestamp = partitionKeyInfo.getTimestamp();
         state = convertStringToPartitionProgressState(ResyncProgressState.class, sourcePartitionStoreItem.getPartitionProgressState());
     }
 
@@ -43,7 +46,7 @@ public String getPartitionType() {
 
     @Override
     public String getPartitionKey() {
-        return database + "|" + table + "|" + timestamp;
+        return partitionKeyInfo.toString();
     }
 
     @Override
@@ -53,4 +56,34 @@ public Optional<ResyncProgressState> getProgressState() {
         }
         return Optional.empty();
     }
+
+    public PartitionKeyInfo getPartitionKeyInfo() {
+        return partitionKeyInfo;
+    }
+
+    @Getter
+    public static class PartitionKeyInfo {
+        private final String database;
+        private final String table;
+        private final long timestamp;
+
+        private PartitionKeyInfo(String database, String table, long timestamp) {
+            this.database = database;
+            this.table = table;
+            this.timestamp = timestamp;
+        }
+
+        private static PartitionKeyInfo fromString(String partitionKey) {
+            String[] keySplits = partitionKey.split("\\|");
+            if (keySplits.length != 3) {
+                throw new IllegalArgumentException("Invalid partition key: " + partitionKey);
+            }
+            return new PartitionKeyInfo(keySplits[0], keySplits[1], Long.parseLong(keySplits[2]));
+        }
+
+        @Override
+        public String toString() {
+            return database + "|" + table + "|" + timestamp;
+        }
+    }
 }
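PartitionKeyInfo makes the `database|table|timestamp` key format single-sourced: getPartitionKey() and the store-item constructor now compose and parse through the same class, and a malformed key fails fast with IllegalArgumentException rather than an ArrayIndexOutOfBoundsException from an unchecked split. A rough round-trip sketch with made-up values:

```java
// Illustrative values only; progress state is left null for brevity.
ResyncPartition partition = new ResyncPartition("shop", "orders", 1731628800000L, null);
String key = partition.getPartitionKey();   // "shop|orders|1731628800000"

// Rebuilding from the coordination store parses the same key back through
// PartitionKeyInfo.fromString(), which requires exactly three '|'-separated parts:
//   "shop|orders"         -> IllegalArgumentException: Invalid partition key: ...
//   "shop|orders|123|x"   -> IllegalArgumentException as well
```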
ClusterApiStrategy.java
@@ -39,7 +39,13 @@ public DbMetadata describeDb(String dbIdentifier) {
         try {
             final DescribeDbClustersResponse response = rdsClient.describeDBClusters(request);
             final DBCluster dbCluster = response.dbClusters().get(0);
-            return new DbMetadata(dbIdentifier, dbCluster.endpoint(), dbCluster.port());
+            return DbMetadata.builder()
+                    .dbIdentifier(dbIdentifier)
+                    .endpoint(dbCluster.endpoint())
+                    .port(dbCluster.port())
+                    .readerEndpoint(dbCluster.readerEndpoint())
+                    .readerPort(dbCluster.port())
+                    .build();
         } catch (Exception e) {
             throw new RuntimeException("Failed to describe DB " + dbIdentifier, e);
         }
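Unlike the instance path below, DBCluster in the RDS API exposes a single port() that applies to both the cluster (writer) endpoint and the reader endpoint, which is why readerPort is filled from dbCluster.port(). With hypothetical values:

```java
// What ClusterApiStrategy now produces for an Aurora cluster (values made up):
// dbCluster.endpoint()       -> "demo.cluster-abc.us-east-1.rds.amazonaws.com"
// dbCluster.readerEndpoint() -> "demo.cluster-ro-abc.us-east-1.rds.amazonaws.com"
// dbCluster.port()           -> 3306, reused for both port and readerPort
```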
InstanceApiStrategy.java
@@ -33,14 +33,28 @@ public InstanceApiStrategy(final RdsClient rdsClient) {
 
     @Override
     public DbMetadata describeDb(String dbIdentifier) {
-        final DescribeDbInstancesRequest request = DescribeDbInstancesRequest.builder()
-                .dbInstanceIdentifier(dbIdentifier)
-                .build();
-
         try {
+            final DescribeDbInstancesRequest request = DescribeDbInstancesRequest.builder()
+                    .dbInstanceIdentifier(dbIdentifier)
+                    .build();
+
             final DescribeDbInstancesResponse response = rdsClient.describeDBInstances(request);
             final DBInstance dbInstance = response.dbInstances().get(0);
-            return new DbMetadata(dbIdentifier, dbInstance.endpoint().address(), dbInstance.endpoint().port());
+            DbMetadata.DbMetadataBuilder dbMetadataBuilder = DbMetadata.builder()
+                    .dbIdentifier(dbIdentifier)
+                    .endpoint(dbInstance.endpoint().address())
+                    .port(dbInstance.endpoint().port());
+
+            if (dbInstance.hasReadReplicaDBInstanceIdentifiers()) {
+                final DescribeDbInstancesRequest readerInstanceRequest = DescribeDbInstancesRequest.builder()
+                        .dbInstanceIdentifier(dbInstance.readReplicaDBInstanceIdentifiers().get(0))
+                        .build();
+                final DescribeDbInstancesResponse readerInstanceResponse = rdsClient.describeDBInstances(readerInstanceRequest);
+                final DBInstance readerInstance = readerInstanceResponse.dbInstances().get(0);
+                dbMetadataBuilder.readerEndpoint(readerInstance.endpoint().address())
+                        .readerPort(readerInstance.endpoint().port());
+            }
+            return dbMetadataBuilder.build();
         } catch (Exception e) {
             throw new RuntimeException("Failed to describe DB " + dbIdentifier, e);
         }
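Only the first entry of readReplicaDBInstanceIdentifiers is consulted; an instance with no replicas leaves the reader fields unset, so RdsService falls back to the writer. A hedged sketch of how the replica branch could be unit-tested (Mockito-style, in line with this repo's test conventions; all identifiers are invented):

```java
// Assumed static imports: org.mockito.Mockito.*, org.hamcrest.MatcherAssert.assertThat,
// org.hamcrest.Matchers.equalTo; rdsClient is a @Mock RdsClient.
DBInstance primary = DBInstance.builder()
        .endpoint(Endpoint.builder().address("writer.example.com").port(3306).build())
        .readReplicaDBInstanceIdentifiers("demo-replica-1")
        .build();
DBInstance replica = DBInstance.builder()
        .endpoint(Endpoint.builder().address("reader.example.com").port(3306).build())
        .build();

// First call describes the primary, second call describes the replica.
when(rdsClient.describeDBInstances(any(DescribeDbInstancesRequest.class)))
        .thenReturn(DescribeDbInstancesResponse.builder().dbInstances(primary).build())
        .thenReturn(DescribeDbInstancesResponse.builder().dbInstances(replica).build());

DbMetadata metadata = new InstanceApiStrategy(rdsClient).describeDb("demo-instance");
assertThat(metadata.getReaderEndpoint(), equalTo("reader.example.com"));
assertThat(metadata.getReaderPort(), equalTo(3306));
```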
DbMetadata.java
@@ -5,48 +5,49 @@
 
 package org.opensearch.dataprepper.plugins.source.rds.model;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
+@Getter
+@AllArgsConstructor
+@Builder
 public class DbMetadata {
 
-    private static final String DB_IDENTIFIER_KEY = "dbIdentifier";
-    private static final String HOST_NAME_KEY = "hostName";
-    private static final String PORT_KEY = "port";
+    static final String DB_IDENTIFIER_KEY = "dbIdentifier";
+    static final String ENDPOINT_KEY = "endpoint";
+    static final String PORT_KEY = "port";
+    static final String READER_ENDPOINT_KEY = "readerEndpoint";
+    static final String READER_PORT_KEY = "readerPort";
 
     private final String dbIdentifier;
-    private final String hostName;
+    private final String endpoint;
     private final int port;
 
-    public DbMetadata(final String dbIdentifier, final String hostName, final int port) {
-        this.dbIdentifier = dbIdentifier;
-        this.hostName = hostName;
-        this.port = port;
-    }
-
-    public String getDbIdentifier() {
-        return dbIdentifier;
-    }
-
-    public String getHostName() {
-        return hostName;
-    }
-
-    public int getPort() {
-        return port;
-    }
+    private final String readerEndpoint;
+    private final int readerPort;
 
     public Map<String, Object> toMap() {
-        return Map.of(
-            DB_IDENTIFIER_KEY, dbIdentifier,
-            HOST_NAME_KEY, hostName,
-            PORT_KEY, port
-        );
+        Map<String, Object> map = new HashMap<>();
+        map.put(DB_IDENTIFIER_KEY, dbIdentifier);
+        map.put(ENDPOINT_KEY, endpoint);
+        map.put(PORT_KEY, port);
+        map.put(READER_ENDPOINT_KEY, readerEndpoint);
+        map.put(READER_PORT_KEY, readerPort);
+
+        return Collections.unmodifiableMap(map);
     }
 
     public static DbMetadata fromMap(Map<String, Object> map) {
         return new DbMetadata(
             (String) map.get(DB_IDENTIFIER_KEY),
-            (String) map.get(HOST_NAME_KEY),
-            ((Integer) map.get(PORT_KEY))
+            (String) map.get(ENDPOINT_KEY),
+            (Integer) map.get(PORT_KEY),
+            (String) map.get(READER_ENDPOINT_KEY),
+            (Integer) map.get(READER_PORT_KEY)
         );
     }
 }
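The switch from Map.of(...) to a HashMap wrapped in Collections.unmodifiableMap() is load-bearing: Map.of() throws NullPointerException on null values, and readerEndpoint is legitimately null whenever no reader was discovered. A small sketch (hypothetical values) of the round trip that now works:

```java
// An instance with no read replica: readerEndpoint == null, readerPort == 0.
DbMetadata metadata = DbMetadata.builder()
        .dbIdentifier("demo-instance")
        .endpoint("writer.example.com")
        .port(3306)
        .build();

Map<String, Object> map = metadata.toMap();      // ok: HashMap accepts the null value
DbMetadata restored = DbMetadata.fromMap(map);   // null readerEndpoint and 0 readerPort
                                                 // survive the round trip
```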