Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process resync partitions in RDS source #5171

Merged
merged 5 commits into from
Nov 15, 2024

Conversation

oeyh
Copy link
Collaborator

@oeyh oeyh commented Nov 6, 2024

Description

Followup PR to #5168 to process the resync partitions.

Tested against some MySQL tables to verify that the source generated expected events.

Issues Resolved

Contributes to #4561

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
@oeyh
Copy link
Collaborator Author

oeyh commented Nov 12, 2024

Had to rebase to resolve merge conflicts.

chenqi0805
chenqi0805 previously approved these changes Nov 13, 2024
Comment on lines 65 to 68
String[] keySplits = resyncPartition.getPartitionKey().split("\\|");
final String database = keySplits[0];
final String table = keySplits[1];
final long eventTimestampMillis = Long.parseLong(keySplits[2]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add validation on number of key splits

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can also be move to separate data structure and be created here.

final PartitionInfo partitionInfo = parsePartitionKey();
private static class PartitionInfo {
    final String database;
    final String table;
    final long eventTimestampMillis;

    PartitionInfo(String database, String table, long eventTimestampMillis) {
        this.database = database;
        this.table = table;
        this.eventTimestampMillis = eventTimestampMillis;
    }
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the change in ResyncPartition class and added unit tests.

Comment on lines 65 to 68
String[] keySplits = resyncPartition.getPartitionKey().split("\\|");
final String database = keySplits[0];
final String table = keySplits[1];
final long eventTimestampMillis = Long.parseLong(keySplits[2]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can also be move to separate data structure and be created here.

final PartitionInfo partitionInfo = parsePartitionKey();
private static class PartitionInfo {
    final String database;
    final String table;
    final long eventTimestampMillis;

    PartitionInfo(String database, String table, long eventTimestampMillis) {
        this.database = database;
        this.table = table;
        this.eventTimestampMillis = eventTimestampMillis;
    }
}

return new ResyncWorker(resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet);
}

public void run() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code can be modular. I will have some comments below to break down

final String table = keySplits[1];
final long eventTimestampMillis = Long.parseLong(keySplits[2]);

if (resyncPartition.getProgressState().isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to separate method to validate and get progress state

final ResyncProgressState progressState = validateAndGetProgressState();
private ResyncProgressState validateAndGetProgressState() {
    return resyncPartition.getProgressState()
            .orElseThrow(() -> new IllegalStateException(
                    "ResyncPartition " + resyncPartition.getPartitionKey() + " doesn't contain progress state."));
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Comment on lines 81 to 83
queryStatement = String.format("SELECT * FROM %s WHERE %s IS NULL", database + "." + table, foreignKeyName);
} else {
queryStatement = String.format("SELECT * FROM %s WHERE %s='%s'", database + "." + table, foreignKeyName, updatedValue);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the query to static constant. You can use SQL Prepared statement which is pre-compiled SQL statement.

Move this to separate method to build and execute query

List<Map<String, Object>> rows = executeQuery(); // or executeQuery(queryStatement)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);

for (Map<String, Object> row : rows) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move rest of the code to new method to process rows

processRows(rows, partitionInfo, progressState);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

try (ResultSet resultSet = statement.executeQuery(query)) {
return convertResultSetToList(resultSet);
}
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider adding retry if there is retryable error.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

}
LOG.debug("Query statement: {}", queryStatement);

List<Map<String, Object>> rows = queryManager.selectRows(queryStatement);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These would need data type mapping.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Will add.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

Signed-off-by: Hai Yan <[email protected]>
public void run() {
LOG.info("Start running Resync Scheduler");
ResyncPartition resyncPartition = null;
while (!shutdownRequested && !Thread.currentThread().isInterrupted()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add some metrics on when the resync occurs.

processRows(rows, database, table, primaryKeys, eventTimestampMillis);
}

private void processRows(List<Map<String, Object>> rows, String database, String table, List<String> primaryKeys, long eventTimestampMillis) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add metrics when records are processed with count

@dinujoh
Copy link
Member

dinujoh commented Nov 15, 2024

Thanks for the refactoring and changes. I had some comments on adding metrics to ReSync worker. You can address that as a separate issue.

@oeyh oeyh merged commit 2e32d3d into opensearch-project:main Nov 15, 2024
43 of 47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants