Handle primary key change events in RDS source #5186

Merged: 3 commits, Nov 15, 2024
Changes from 1 commit
@@ -36,7 +36,6 @@
import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHelper;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.model.ParentTable;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.resync.CascadingActionDetector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -267,7 +266,7 @@ void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) {
return;
}

handleRowChangeEvent(event, data.getTableId(), data.getRows(), OpenSearchBulkActions.INDEX);
handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.INDEX, data.getRows()));
}

void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
@@ -281,12 +280,25 @@ void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
// Check if a cascade action is involved
cascadeActionDetector.detectCascadingUpdates(event, parentTableMap, tableMetadataMap.get(data.getTableId()));

// updatedRow contains data before update as key and data after update as value
final List<Serializable[]> rows = data.getRows().stream()
.map(Map.Entry::getValue)
.collect(Collectors.toList());
final TableMetadata tableMetadata = tableMetadataMap.get(data.getTableId());
List<Serializable[]> rowsToUpdate = new ArrayList<>();
// Delete is needed when primary key columns have value change
List<Serializable[]> rowsToDelete = new ArrayList<>();

// row map contains data before update as key and data after update as value
for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
for (int i = 0; i < row.getKey().length; i++) {
if (tableMetadata.getPrimaryKeys().contains(tableMetadata.getColumnNames().get(i)) &&
!row.getKey()[i].equals(row.getValue()[i])) {
LOG.debug("Primary keys were updated");
rowsToDelete.add(row.getKey());
break;
}
}
rowsToUpdate.add(row.getValue());
}

handleRowChangeEvent(event, data.getTableId(), rows, OpenSearchBulkActions.INDEX);
handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.DELETE, rowsToDelete, OpenSearchBulkActions.INDEX, rowsToUpdate));
}

void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
@@ -300,7 +312,7 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
// Check if a cascade action is involved
cascadeActionDetector.detectCascadingDeletes(event, parentTableMap, tableMetadataMap.get(data.getTableId()));

handleRowChangeEvent(event, data.getTableId(), data.getRows(), OpenSearchBulkActions.DELETE);
handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.DELETE, data.getRows()));
}

private boolean isValidTableId(long tableId) {
@@ -318,9 +330,8 @@ private boolean isValidTableId(long tableId) {
}

private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
long tableId,
List<Serializable[]> rows,
OpenSearchBulkActions bulkAction) {
long tableId,
Map<OpenSearchBulkActions, List<Serializable[]>> bulkActionToRowDataMap) {

// Update binlog coordinate after it's first assigned in rotate event handler
if (currentBinlogCoordinate != null) {
@@ -343,31 +354,36 @@ private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
final long eventTimestampMillis = event.getHeader().getTimestamp();

final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
for (Object[] rowDataArray : rows) {
final Map<String, Object> rowDataMap = new HashMap<>();
for (int i = 0; i < rowDataArray.length; i++) {
final Map<String, String> tbColumnDatatypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(tableMetadata.getFullTableName());
final String columnDataType = tbColumnDatatypeMap.get(columnNames.get(i));
final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i),
rowDataArray[i], tableMetadata);
rowDataMap.put(columnNames.get(i), data);
}

final Event dataPrepperEvent = JacksonEvent.builder()
.withEventType(DATA_PREPPER_EVENT_TYPE)
.withData(rowDataMap)
.build();

final Event pipelineEvent = recordConverter.convert(
dataPrepperEvent,
tableMetadata.getDatabaseName(),
tableMetadata.getTableName(),
bulkAction,
primaryKeys,
eventTimestampMillis,
eventTimestampMillis,
event.getHeader().getEventType());
pipelineEvents.add(pipelineEvent);
for (Map.Entry<OpenSearchBulkActions, List<Serializable[]>> entry : bulkActionToRowDataMap.entrySet()) {
final OpenSearchBulkActions bulkAction = entry.getKey();
final List<Serializable[]> rows = entry.getValue();

[Inline review thread on this change]

Review comment (Member):
We should process all the events in order. Here it looks like we process the INDEX and DELETE operations separately, so we might have a race condition where the same key appears in both the INDEX and the DELETE operation.

k1-> k2; k3->k1

Index keys: k2, k1
Delete keys: k1, k3

k1 is in both index and delete.
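
A toy sketch of this concern (the class name and the keys are hypothetical, not the plugin's code): grouping row changes by bulk action, as Map.of(DELETE, ..., INDEX, ...) in this diff does, discards the original binlog order, and java.util.Map.of gives no iteration-order guarantee, so the DELETE batch may be applied before or after the INDEX batch.

    import java.util.List;
    import java.util.Map;

    // Illustration only; keys k1..k3 and this class are hypothetical.
    public class GroupingLosesOrder {
        public static void main(String[] args) {
            // Binlog order: UPDATE k1 -> k2, then UPDATE k3 -> k1.
            // Grouping by action leaves k1 in both lists and loses the relative order.
            Map<String, List<String>> grouped = Map.of(
                    "INDEX", List.of("k2", "k1"),
                    "DELETE", List.of("k1", "k3"));
            // Map.of makes no promise about iteration order, so whichever action is
            // applied last for k1 wins, which may drop the re-keyed row.
            grouped.forEach((action, keys) -> System.out.println(action + " -> " + keys));
        }
    }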

Review comment (Member):
Instead of Map<OpenSearchBulkActions, List<Serializable[]>>, we should have a List whose elements carry the OpenSearchBulkAction as a property, so that the events are processed in order.
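
A minimal sketch of that shape (Action and RowChange are stand-ins for the plugin's OpenSearchBulkActions and row payload, not real types): a single ordered list whose elements carry their own bulk action.

    import java.util.ArrayList;
    import java.util.List;

    // Stand-in types for illustration only.
    public class OrderedRowChanges {
        enum Action { INDEX, DELETE }
        record RowChange(Action action, String key) {}

        public static void main(String[] args) {
            // One list keeps every change in binlog order, each tagged with its action.
            List<RowChange> changes = new ArrayList<>();
            changes.add(new RowChange(Action.DELETE, "k1"));   // old primary key removed first
            changes.add(new RowChange(Action.INDEX, "k2"));    // then the re-keyed row is indexed
            changes.forEach(System.out::println);
        }
    }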

Reply from @oeyh (Collaborator, author), Nov 13, 2024:

Good point. I took your suggestion and arranged the row changes in order, putting delete before index. So in the case above, the generated events will be: delete k1, index k2, delete k3, index k1.
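
A hedged sketch of the ordering described here, using stand-in types rather than the actual handler code: walk the before/after row pairs in binlog order and, for each pair, emit a delete for the old key only when the primary key changed, followed by an index for the new row.

    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Illustration only; Action, Change, and the keys are hypothetical stand-ins.
    public class PrimaryKeyChangeOrdering {
        enum Action { INDEX, DELETE }
        record Change(Action action, String key) {}

        public static void main(String[] args) {
            // Before/after pairs in binlog order: k1 -> k2, then k3 -> k1.
            List<Map.Entry<String, String>> updates = List.of(
                    new AbstractMap.SimpleEntry<>("k1", "k2"),
                    new AbstractMap.SimpleEntry<>("k3", "k1"));

            List<Change> ordered = new ArrayList<>();
            for (Map.Entry<String, String> row : updates) {
                if (!row.getKey().equals(row.getValue())) {            // primary key changed
                    ordered.add(new Change(Action.DELETE, row.getKey()));
                }
                ordered.add(new Change(Action.INDEX, row.getValue()));
            }
            // Prints: DELETE k1, INDEX k2, DELETE k3, INDEX k1
            ordered.forEach(c -> System.out.println(c.action() + " " + c.key()));
        }
    }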

Review comment (Member):
Changes look good to me. Can you add a unit test?

Reply (Collaborator, author):
Added.

Review comment (Member):

Thanks for adding the unit test!

[End of inline review thread; the diff resumes below.]

for (Object[] rowDataArray : rows) {
final Map<String, Object> rowDataMap = new HashMap<>();
for (int i = 0; i < rowDataArray.length; i++) {
final Map<String, String> tbColumnDatatypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(tableMetadata.getFullTableName());
final String columnDataType = tbColumnDatatypeMap.get(columnNames.get(i));
final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i),
rowDataArray[i], tableMetadata);
rowDataMap.put(columnNames.get(i), data);
}

final Event dataPrepperEvent = JacksonEvent.builder()
.withEventType(DATA_PREPPER_EVENT_TYPE)
.withData(rowDataMap)
.build();

final Event pipelineEvent = recordConverter.convert(
dataPrepperEvent,
tableMetadata.getDatabaseName(),
tableMetadata.getTableName(),
bulkAction,
primaryKeys,
eventTimestampMillis,
eventTimestampMillis,
event.getHeader().getEventType());
pipelineEvents.add(pipelineEvent);
}
}

writeToBuffer(bufferAccumulator, acknowledgementSet);