Process resync partitions in RDS source #5171
Signed-off-by: Hai Yan <[email protected]>
Had to rebase to resolve merge conflicts.
...urce/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/ResyncScheduler.java
String[] keySplits = resyncPartition.getPartitionKey().split("\\|");
final String database = keySplits[0];
final String table = keySplits[1];
final long eventTimestampMillis = Long.parseLong(keySplits[2]);
Add validation on the number of key splits before indexing into the array.
This can also be moved to a separate data structure and be created here.
final PartitionInfo partitionInfo = parsePartitionKey();

private static class PartitionInfo {
    final String database;
    final String table;
    final long eventTimestampMillis;

    PartitionInfo(String database, String table, long eventTimestampMillis) {
        this.database = database;
        this.table = table;
        this.eventTimestampMillis = eventTimestampMillis;
    }
}
I made the change in ResyncPartition class and added unit tests.
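For reference, a minimal sketch of what validated key parsing could look like. This is not the code merged in this PR: the class name `PartitionKeyExample`, the `parsePartitionKey(String)` signature, and the assumption that a key has exactly three `|`-separated parts (database, table, timestamp) are illustrative only.

```java
import java.util.Objects;

final class PartitionKeyExample {

    /** Immutable holder for the three parts of a resync partition key. */
    static final class PartitionInfo {
        final String database;
        final String table;
        final long eventTimestampMillis;

        PartitionInfo(String database, String table, long eventTimestampMillis) {
            this.database = database;
            this.table = table;
            this.eventTimestampMillis = eventTimestampMillis;
        }
    }

    /** Parses "database|table|timestampMillis" and rejects malformed keys. */
    static PartitionInfo parsePartitionKey(String partitionKey) {
        Objects.requireNonNull(partitionKey, "partitionKey");
        // A limit of -1 keeps trailing empty strings, so "db|table|" still yields three parts.
        final String[] keySplits = partitionKey.split("\\|", -1);
        if (keySplits.length != 3) {
            throw new IllegalArgumentException(
                    "Expected database|table|timestamp but got: " + partitionKey);
        }
        if (keySplits[0].isEmpty() || keySplits[1].isEmpty()) {
            throw new IllegalArgumentException(
                    "Database and table must not be empty: " + partitionKey);
        }
        final long eventTimestampMillis;
        try {
            eventTimestampMillis = Long.parseLong(keySplits[2]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Timestamp is not a valid long: " + partitionKey, e);
        }
        return new PartitionInfo(keySplits[0], keySplits[1], eventTimestampMillis);
    }

    public static void main(String[] args) {
        PartitionInfo info = parsePartitionKey("db1|orders|1700000000000");
        System.out.println(info.database + "." + info.table + " @ " + info.eventTimestampMillis);
    }
}
```

Failing fast with `IllegalArgumentException` keeps a malformed key from surfacing later as an `ArrayIndexOutOfBoundsException` or `NumberFormatException` inside the worker.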
return new ResyncWorker(resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet);
}

public void run() {
This code can be more modular. I will leave some comments below on how to break it down.
final String table = keySplits[1];
final long eventTimestampMillis = Long.parseLong(keySplits[2]);

if (resyncPartition.getProgressState().isEmpty()) {
Move this to a separate method that validates and returns the progress state.
final ResyncProgressState progressState = validateAndGetProgressState();

private ResyncProgressState validateAndGetProgressState() {
    return resyncPartition.getProgressState()
            .orElseThrow(() -> new IllegalStateException(
                    "ResyncPartition " + resyncPartition.getPartitionKey() + " doesn't contain progress state."));
}
Done.
queryStatement = String.format("SELECT * FROM %s WHERE %s IS NULL", database + "." + table, foreignKeyName);
} else {
queryStatement = String.format("SELECT * FROM %s WHERE %s='%s'", database + "." + table, foreignKeyName, updatedValue);
Move the query to a static constant. You can use a SQL prepared statement, which is a pre-compiled SQL statement, instead of formatting the value into the query string.
Move this to a separate method that builds and executes the query.
List<Map<String, Object>> rows = executeQuery(); // or executeQuery(queryStatement)
Done.
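A rough sketch of the prepared-statement suggestion, for illustration only. It assumes the `IS NULL` branch is taken when `updatedValue` is null; the class `ResyncQueryExample`, its method names, and the conservative identifier pattern are hypothetical and not taken from this PR. Only standard `java.sql` calls are used. Table and column names cannot be bound as `?` parameters, so they are validated and backtick-quoted (MySQL style), while the value is bound.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

final class ResyncQueryExample {
    static final String QUERY_NULL = "SELECT * FROM %s WHERE %s IS NULL";
    static final String QUERY_NOT_NULL = "SELECT * FROM %s WHERE %s = ?";

    // Assumption: only simple identifiers are allowed; real schemas may need a looser rule.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_$]*");

    /** Validates an identifier and wraps it in MySQL backticks. */
    static String quote(String identifier) {
        if (identifier == null || !IDENTIFIER.matcher(identifier).matches()) {
            throw new IllegalArgumentException("Unsupported identifier: " + identifier);
        }
        return "`" + identifier + "`";
    }

    /** Builds the SQL text; the value itself is never concatenated into it. */
    static String buildSql(String database, String table, String foreignKeyName, Object updatedValue) {
        final String fullTableName = quote(database) + "." + quote(table);
        final String template = (updatedValue == null) ? QUERY_NULL : QUERY_NOT_NULL;
        return String.format(template, fullTableName, quote(foreignKeyName));
    }

    /** Executes the query and returns each row as a column-label-to-value map. */
    static List<Map<String, Object>> selectRows(Connection connection, String database, String table,
                                                String foreignKeyName, Object updatedValue) throws SQLException {
        final String sql = buildSql(database, table, foreignKeyName, updatedValue);
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            if (updatedValue != null) {
                statement.setObject(1, updatedValue); // bound by the driver, not string-formatted
            }
            try (ResultSet resultSet = statement.executeQuery()) {
                final ResultSetMetaData metaData = resultSet.getMetaData();
                final int columnCount = metaData.getColumnCount();
                final List<Map<String, Object>> rows = new ArrayList<>();
                while (resultSet.next()) {
                    final Map<String, Object> row = new LinkedHashMap<>();
                    for (int i = 1; i <= columnCount; i++) {
                        row.put(metaData.getColumnLabel(i), resultSet.getObject(i));
                    }
                    rows.add(row);
                }
                return rows;
            }
        }
    }
}
```

Binding the value avoids the quoting problems of `'%s'` formatting, for example when the foreign key value itself contains a single quote.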
BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);

for (Map<String, Object> row : rows) {
Move the rest of the code to a new method that processes the rows.
processRows(rows, partitionInfo, progressState);
Done.
try (ResultSet resultSet = statement.executeQuery(query)) {
    return convertResultSetToList(resultSet);
}
} catch (Exception e) {
We should consider adding a retry when the error is retryable.
Added.
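One way the retry could be shaped, as a sketch rather than the code that was added in this PR. The class `RetryExample`, the choice of which exceptions count as retryable (`SQLTransientException` and `SQLRecoverableException` from `java.sql`), and the doubling backoff are all assumptions made for illustration.

```java
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.concurrent.Callable;

final class RetryExample {

    /** Assumption: only JDBC's transient and recoverable exception families are retried. */
    static boolean isRetryable(Exception e) {
        return e instanceof SQLTransientException || e instanceof SQLRecoverableException;
    }

    /**
     * Runs the action, retrying retryable failures with exponential backoff.
     * Non-retryable failures, and the last failed attempt, are rethrown unchanged.
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long initialDelayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delayMillis = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (!isRetryable(e) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(delayMillis); // an interrupt here propagates to the caller
                delayMillis *= 2;
            }
        }
    }
}
```

A caller could wrap the query in it, for example `callWithRetry(() -> queryManager.selectRows(queryStatement), 3, 100L)`, where the attempt count and delay are placeholder values.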
}
LOG.debug("Query statement: {}", queryStatement);

List<Map<String, Object>> rows = queryManager.selectRows(queryStatement);
These would need data type mapping.
Good catch. Will add.
Added.
public void run() {
    LOG.info("Start running Resync Scheduler");
    ResyncPartition resyncPartition = null;
    while (!shutdownRequested && !Thread.currentThread().isInterrupted()) {
We should add some metrics on when the resync occurs.
processRows(rows, database, table, primaryKeys, eventTimestampMillis);
}

private void processRows(List<Map<String, Object>> rows, String database, String table, List<String> primaryKeys, long eventTimestampMillis) {
Add metrics when records are processed with count
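To make the two metrics comments concrete, here is a small sketch of where the counters could be incremented. It is an assumption-heavy illustration: `LongAdder` fields stand in for whatever metrics library the plugin actually uses, and the class `ResyncMetricsExample`, the counter names, and the `sink` callback are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

final class ResyncMetricsExample {
    // Stand-ins for real metric counters; names are illustrative only.
    final LongAdder resyncsStarted = new LongAdder();
    final LongAdder recordsProcessed = new LongAdder();
    final LongAdder recordsFailed = new LongAdder();

    /** Sends each row to the sink and counts successes and failures. */
    void processRows(List<Map<String, Object>> rows, Consumer<Map<String, Object>> sink) {
        resyncsStarted.increment(); // one increment per resync run
        for (Map<String, Object> row : rows) {
            try {
                sink.accept(row);
                recordsProcessed.increment();
            } catch (RuntimeException e) {
                // Count the failure and continue with the remaining rows.
                recordsFailed.increment();
            }
        }
    }
}
```

Counting per row rather than per batch keeps the processed total accurate even when only some rows in a resync fail.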
Thanks for the refactoring and changes. I had some comments on adding metrics to the resync worker. You can address those as a separate issue.
Description
Follow-up PR to #5168 to process the resync partitions.
Tested against some MySQL tables to verify that the source generates the expected events.
Issues Resolved
Contributes to #4561
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.