Handle primary key change events in RDS source #5186
Signed-off-by: Hai Yan <[email protected]>
pipelineEvents.add(pipelineEvent);
for (Map.Entry<OpenSearchBulkActions, List<Serializable[]>> entry : bulkActionToRowDataMap.entrySet()) {
    final OpenSearchBulkActions bulkAction = entry.getKey();
    final List<Serializable[]> rows = entry.getValue();
We should process all the events in order. It looks like INDEX and DELETE operations are processed separately here, so we might hit a race condition when the same key appears in both an INDEX and a DELETE operation.
For example, with two primary key changes, k1 -> k2 and k3 -> k1:
Index keys: k2, k1
Delete keys: k1, k3
k1 is in both the index and the delete list.
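The overlap described above can be reproduced with a small sketch. It is not the PR's code: `GroupedByActionSketch`, its nested `Action` enum, and the `{oldKey, newKey}` string pairs are hypothetical stand-ins for `OpenSearchBulkActions` and the `Serializable[]` row data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; names do not come from the project.
class GroupedByActionSketch {
    enum Action { INDEX, DELETE }

    // Each change is {oldKey, newKey}. Grouping by action discards the
    // original interleaving of deletes and indexes.
    static Map<Action, List<String>> group(List<String[]> keyChanges) {
        Map<Action, List<String>> grouped = new LinkedHashMap<>();
        grouped.put(Action.INDEX, new ArrayList<>());
        grouped.put(Action.DELETE, new ArrayList<>());
        for (String[] change : keyChanges) {
            grouped.get(Action.INDEX).add(change[1]);  // index the new key
            grouped.get(Action.DELETE).add(change[0]); // delete the old key
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> changes = List.of(
                new String[]{"k1", "k2"},
                new String[]{"k3", "k1"});
        // prints {INDEX=[k2, k1], DELETE=[k1, k3]}
        System.out.println(group(changes));
    }
}
```

Because k1 ends up in both lists, the final state of k1 depends on which group is flushed last, which is the ordering problem raised in this comment.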
Instead of a Map<OpenSearchBulkActions, List<Serializable[]>>, we should have a List whose elements carry the OpenSearchBulkActions value as a property, so that the events are processed in order.
Good point. I took your suggestion and arranged the row changes in order, putting the delete before the index for each change. So in the above case, the generated events will be: delete k1, index k2, delete k3, index k1.
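A minimal sketch of that ordering, assuming each row change is reduced to an `{oldKey, newKey}` pair. `OrderedBulkEventsSketch`, `BulkEvent`, and `toOrderedEvents` are hypothetical names for illustration, not the classes used in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical illustration only; names do not come from the project.
class OrderedBulkEventsSketch {
    enum Action { INDEX, DELETE }

    // One bulk operation that carries its action as a property,
    // so a single list can preserve the order of all operations.
    static final class BulkEvent {
        final Action action;
        final String key;

        BulkEvent(Action action, String key) {
            this.action = action;
            this.key = key;
        }

        @Override
        public String toString() {
            return action.name().toLowerCase(Locale.ROOT) + " " + key;
        }
    }

    // Each change is {oldKey, newKey}, in the order the changes occurred.
    static List<BulkEvent> toOrderedEvents(List<String[]> keyChanges) {
        List<BulkEvent> events = new ArrayList<>();
        for (String[] change : keyChanges) {
            String oldKey = change[0];
            String newKey = change[1];
            if (!oldKey.equals(newKey)) {
                // Key changed: remove the old document before indexing the new one.
                events.add(new BulkEvent(Action.DELETE, oldKey));
            }
            events.add(new BulkEvent(Action.INDEX, newKey));
        }
        return events;
    }

    public static void main(String[] args) {
        List<String[]> changes = List.of(
                new String[]{"k1", "k2"},
                new String[]{"k3", "k1"});
        // prints [delete k1, index k2, delete k3, index k1]
        System.out.println(toOrderedEvents(changes));
    }
}
```

With a single ordered list, k1 is deleted first and indexed last, so the document for k1 survives as intended.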
Changes look good to me. Can you add a unit test?
Added.
Thanks for adding the unit test!
10386cc to 84d2fe9
.collect(Collectors.toList());
final TableMetadata tableMetadata = tableMetadataMap.get(data.getTableId());
final List<OpenSearchBulkActions> bulkActions = new ArrayList<>();
final List<Serializable[]> rows = new ArrayList<>();
Nit: I think the variable names would be more readable as rows -> rowUpdates and row -> rowUpdate.
Thanks! I will make this change in upcoming PRs for the RDS source.
Description
A MySQL UPDATE statement can change the value of a primary key. When this happens, we need to generate two events: one for indexing the updated record (with the new primary key value), and another for deleting the old record. This PR adds changes to handle that case.
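One way to detect such a change is to compare the primary key columns of the row's before and after images. The sketch below is only an illustration under that assumption: `PrimaryKeyChangeSketch`, `keyOf`, `primaryKeyChanged`, and the `pkIndexes` parameter are hypothetical names, and the PR may detect the change differently.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; names do not come from the project.
class PrimaryKeyChangeSketch {
    // Collect the primary key values of a row image, given the
    // positions of the key columns (assumed to be known from table metadata).
    static List<Serializable> keyOf(Serializable[] row, int[] pkIndexes) {
        List<Serializable> key = new ArrayList<>();
        for (int index : pkIndexes) {
            key.add(row[index]);
        }
        return key;
    }

    // True when any primary key column differs between the two images.
    // Caveat: values are compared with equals(), so array-typed values
    // such as byte[] would compare by identity in this sketch.
    static boolean primaryKeyChanged(Serializable[] before, Serializable[] after, int[] pkIndexes) {
        return !keyOf(before, pkIndexes).equals(keyOf(after, pkIndexes));
    }

    public static void main(String[] args) {
        Serializable[] before = {1, "alice"};
        Serializable[] after = {2, "alice"};
        // Column 0 is the primary key here, and its value changed from 1 to 2.
        System.out.println(primaryKeyChanged(before, after, new int[]{0})); // prints true
    }
}
```

When the check returns true, the update would produce a delete for the old key followed by an index for the new key; otherwise a single index event is enough.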
Testing
Tested against an Aurora MySQL table and verified that the two expected events were generated.
Issues Resolved
Contributes to #4561
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.