Skip to content

Commit

Permalink
Preserve delete and index event order
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh committed Nov 13, 2024
1 parent 7d68f78 commit 10386cc
Showing 1 changed file with 46 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,13 @@ private Map<String, String[]> getStrValuesMap(final TableMapEventData eventData,
List<String> columnNames = eventData.getEventMetadata().getColumnNames();
List<String[]> strValues = getStrValues(eventData, columnType);

LOG.debug("Getting tbMetadata for table {}.{}", eventData.getDatabase(), eventData.getTable());
final Map<String, String> tbMetadata = dbTableMetadata.getTableColumnDataTypeMap()
.get(eventData.getDatabase() + SEPARATOR + eventData.getTable());

LOG.debug("tbMetadata: {}", tbMetadata);
for (int i = 0, j=0; i < columnNames.size(); i++) {
LOG.debug("Column name: {}, Column type: {}", columnNames.get(i), columnType);
final String dataType = tbMetadata.get(columnNames.get(i));
if (MySQLDataType.byDataType(dataType) == columnType) {
strValuesMap.put(columnNames.get(i), strValues.get(j++));
Expand Down Expand Up @@ -266,7 +269,7 @@ void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) {
return;
}

handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.INDEX, data.getRows()));
handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.INDEX));
}

void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
Expand All @@ -281,24 +284,28 @@ void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
cascadeActionDetector.detectCascadingUpdates(event, parentTableMap, tableMetadataMap.get(data.getTableId()));

final TableMetadata tableMetadata = tableMetadataMap.get(data.getTableId());
List<Serializable[]> rowsToUpdate = new ArrayList<>();
// Delete is needed when primary key columns have value change
List<Serializable[]> rowsToDelete = new ArrayList<>();
final List<OpenSearchBulkActions> bulkActions = new ArrayList<>();
final List<Serializable[]> rows = new ArrayList<>();
for (int rowNum = 0; rowNum < data.getRows().size(); rowNum++) {
// `row` contains data before update as key and data after update as value
Map.Entry<Serializable[], Serializable[]> row = data.getRows().get(rowNum);

// row map contains data before update as key and data after update as value
for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
for (int i = 0; i < row.getKey().length; i++) {
if (tableMetadata.getPrimaryKeys().contains(tableMetadata.getColumnNames().get(i)) &&
!row.getKey()[i].equals(row.getValue()[i])) {
LOG.debug("Primary keys were updated");
rowsToDelete.add(row.getKey());
// add delete event for the old row data
rows.add(row.getKey());
bulkActions.add(OpenSearchBulkActions.DELETE);
break;
}
}
rowsToUpdate.add(row.getValue());
// add index event for the new row data
rows.add(row.getValue());
bulkActions.add(OpenSearchBulkActions.INDEX);
}

handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.DELETE, rowsToDelete, OpenSearchBulkActions.INDEX, rowsToUpdate));
handleRowChangeEvent(event, data.getTableId(), rows, bulkActions);
}

void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
Expand All @@ -312,7 +319,7 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
// Check if a cascade action is involved
cascadeActionDetector.detectCascadingDeletes(event, parentTableMap, tableMetadataMap.get(data.getTableId()));

handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.DELETE, data.getRows()));
handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.DELETE));
}

private boolean isValidTableId(long tableId) {
Expand All @@ -331,7 +338,8 @@ private boolean isValidTableId(long tableId) {

private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
long tableId,
Map<OpenSearchBulkActions, List<Serializable[]>> bulkActionToRowDataMap) {
List<Serializable[]> rows,
List<OpenSearchBulkActions> bulkActions) {

// Update binlog coordinate after it's first assigned in rotate event handler
if (currentBinlogCoordinate != null) {
Expand All @@ -355,35 +363,34 @@ private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event eve

final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);

for (Map.Entry<OpenSearchBulkActions, List<Serializable[]>> entry : bulkActionToRowDataMap.entrySet()) {
final OpenSearchBulkActions bulkAction = entry.getKey();
final List<Serializable[]> rows = entry.getValue();
for (Object[] rowDataArray : rows) {
final Map<String, Object> rowDataMap = new HashMap<>();
for (int i = 0; i < rowDataArray.length; i++) {
final Map<String, String> tbColumnDatatypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(tableMetadata.getFullTableName());
final String columnDataType = tbColumnDatatypeMap.get(columnNames.get(i));
final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i),
rowDataArray[i], tableMetadata);
rowDataMap.put(columnNames.get(i), data);
}

final Event dataPrepperEvent = JacksonEvent.builder()
.withEventType(DATA_PREPPER_EVENT_TYPE)
.withData(rowDataMap)
.build();

final Event pipelineEvent = recordConverter.convert(
dataPrepperEvent,
tableMetadata.getDatabaseName(),
tableMetadata.getTableName(),
bulkAction,
primaryKeys,
eventTimestampMillis,
eventTimestampMillis,
event.getHeader().getEventType());
pipelineEvents.add(pipelineEvent);
for (int rowNum = 0; rowNum < rows.size(); rowNum++) {
final Object[] rowDataArray = rows.get(rowNum);
final OpenSearchBulkActions bulkAction = bulkActions.get(rowNum);

final Map<String, Object> rowDataMap = new HashMap<>();
for (int i = 0; i < rowDataArray.length; i++) {
final Map<String, String> tbColumnDatatypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(tableMetadata.getFullTableName());
final String columnDataType = tbColumnDatatypeMap.get(columnNames.get(i));
final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i),
rowDataArray[i], tableMetadata);
rowDataMap.put(columnNames.get(i), data);
}

final Event dataPrepperEvent = JacksonEvent.builder()
.withEventType(DATA_PREPPER_EVENT_TYPE)
.withData(rowDataMap)
.build();

final Event pipelineEvent = recordConverter.convert(
dataPrepperEvent,
tableMetadata.getDatabaseName(),
tableMetadata.getTableName(),
bulkAction,
primaryKeys,
eventTimestampMillis,
eventTimestampMillis,
event.getHeader().getEventType());
pipelineEvents.add(pipelineEvent);
}

writeToBuffer(bufferAccumulator, acknowledgementSet);
Expand Down

0 comments on commit 10386cc

Please sign in to comment.