From 7d68f789b1be5fc3208a3cb1247d8020cef7608c Mon Sep 17 00:00:00 2001
From: Hai Yan
Date: Tue, 12 Nov 2024 09:35:35 -0600
Subject: [PATCH 1/3] Handle primary key change events

Signed-off-by: Hai Yan
---
 .../rds/stream/BinlogEventListener.java       | 86 +++++++++++--------
 1 file changed, 51 insertions(+), 35 deletions(-)

diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
index af8892ffdf..c5b2a5465b 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
@@ -36,7 +36,6 @@
 import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHelper;
 import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;
 import org.opensearch.dataprepper.plugins.source.rds.model.ParentTable;
-import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
 import org.opensearch.dataprepper.plugins.source.rds.resync.CascadingActionDetector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -267,7 +266,7 @@ void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) {
             return;
         }
 
-        handleRowChangeEvent(event, data.getTableId(), data.getRows(), OpenSearchBulkActions.INDEX);
+        handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.INDEX, data.getRows()));
     }
 
     void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
@@ -281,12 +280,25 @@ void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
         // Check if a cascade action is involved
         cascadeActionDetector.detectCascadingUpdates(event, parentTableMap, tableMetadataMap.get(data.getTableId()));
 
-        // updatedRow contains data before update as key and data after update as value
-        final List<Serializable[]> rows = data.getRows().stream()
-                .map(Map.Entry::getValue)
-                .collect(Collectors.toList());
+        final TableMetadata tableMetadata = tableMetadataMap.get(data.getTableId());
+        List<Serializable[]> rowsToUpdate = new ArrayList<>();
+        // Delete is needed when primary key columns have value change
+        List<Serializable[]> rowsToDelete = new ArrayList<>();
+
+        // row map contains data before update as key and data after update as value
+        for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
+            for (int i = 0; i < row.getKey().length; i++) {
+                if (tableMetadata.getPrimaryKeys().contains(tableMetadata.getColumnNames().get(i)) &&
+                        !row.getKey()[i].equals(row.getValue()[i])) {
+                    LOG.debug("Primary keys were updated");
+                    rowsToDelete.add(row.getKey());
+                    break;
+                }
+            }
+            rowsToUpdate.add(row.getValue());
+        }
 
-        handleRowChangeEvent(event, data.getTableId(), rows, OpenSearchBulkActions.INDEX);
+        handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.DELETE, rowsToDelete, OpenSearchBulkActions.INDEX, rowsToUpdate));
     }
 
     void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
@@ -300,7 +312,7 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
         // Check if a cascade action is involved
         cascadeActionDetector.detectCascadingDeletes(event, parentTableMap, tableMetadataMap.get(data.getTableId()));
 
-        handleRowChangeEvent(event, data.getTableId(), data.getRows(), OpenSearchBulkActions.DELETE);
+        handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.DELETE, data.getRows()));
     }
 
     private boolean isValidTableId(long tableId) {
@@ -318,9 +330,8 @@ private boolean isValidTableId(long tableId) {
     }
 
     private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
-                                      long tableId,
-                                      List<Serializable[]> rows,
-                                      OpenSearchBulkActions bulkAction) {
+                                      long tableId,
+                                      Map<OpenSearchBulkActions, List<Serializable[]>> bulkActionToRowDataMap) {
 
         // Update binlog coordinate after it's first assigned in rotate event handler
         if (currentBinlogCoordinate != null) {
@@ -343,31 +354,36 @@ private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Eve
         final long eventTimestampMillis = event.getHeader().getTimestamp();
 
         final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
 
-        for (Object[] rowDataArray : rows) {
-            final Map<String, Object> rowDataMap = new HashMap<>();
-            for (int i = 0; i < rowDataArray.length; i++) {
-                final Map<String, String> tbColumnDatatypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(tableMetadata.getFullTableName());
-                final String columnDataType = tbColumnDatatypeMap.get(columnNames.get(i));
-                final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i),
-                        rowDataArray[i], tableMetadata);
-                rowDataMap.put(columnNames.get(i), data);
-            }
-            final Event dataPrepperEvent = JacksonEvent.builder()
-                    .withEventType(DATA_PREPPER_EVENT_TYPE)
-                    .withData(rowDataMap)
-                    .build();
-
-            final Event pipelineEvent = recordConverter.convert(
-                    dataPrepperEvent,
-                    tableMetadata.getDatabaseName(),
-                    tableMetadata.getTableName(),
-                    bulkAction,
-                    primaryKeys,
-                    eventTimestampMillis,
-                    eventTimestampMillis,
-                    event.getHeader().getEventType());
-            pipelineEvents.add(pipelineEvent);
+        for (Map.Entry<OpenSearchBulkActions, List<Serializable[]>> entry : bulkActionToRowDataMap.entrySet()) {
+            final OpenSearchBulkActions bulkAction = entry.getKey();
+            final List<Serializable[]> rows = entry.getValue();
+            for (Object[] rowDataArray : rows) {
+                final Map<String, Object> rowDataMap = new HashMap<>();
+                for (int i = 0; i < rowDataArray.length; i++) {
+                    final Map<String, String> tbColumnDatatypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(tableMetadata.getFullTableName());
+                    final String columnDataType = tbColumnDatatypeMap.get(columnNames.get(i));
+                    final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i),
+                            rowDataArray[i], tableMetadata);
+                    rowDataMap.put(columnNames.get(i), data);
+                }
+
+                final Event dataPrepperEvent = JacksonEvent.builder()
+                        .withEventType(DATA_PREPPER_EVENT_TYPE)
+                        .withData(rowDataMap)
+                        .build();
+
+                final Event pipelineEvent = recordConverter.convert(
+                        dataPrepperEvent,
+                        tableMetadata.getDatabaseName(),
+                        tableMetadata.getTableName(),
+                        bulkAction,
+                        primaryKeys,
+                        eventTimestampMillis,
+                        eventTimestampMillis,
+                        event.getHeader().getEventType());
+                pipelineEvents.add(pipelineEvent);
+            }
         }
 
         writeToBuffer(bufferAccumulator, acknowledgementSet);
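For illustration only (this note and sketch are not part of the patch series): the core idea of the first commit is that when any primary-key column differs between the before- and after-image of an updated row, the document indexed under the old key must be deleted and the new row indexed. The minimal, self-contained Java sketch below condenses that decision. The names PrimaryKeyChangeSketch, planRowChange, and Action are hypothetical stand-ins, not Data Prepper APIs; Action stands in for OpenSearchBulkActions.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; not part of the Data Prepper codebase.
class PrimaryKeyChangeSketch {

    enum Action { INDEX, DELETE }

    // For one before/after row pair from an update event, append a DELETE for the
    // old row when any primary-key column changed, then an INDEX for the new row.
    static void planRowChange(Map.Entry<Serializable[], Serializable[]> row,
                              List<String> columnNames,
                              List<String> primaryKeys,
                              List<Serializable[]> rowsOut,
                              List<Action> actionsOut) {
        final Serializable[] before = row.getKey();
        final Serializable[] after = row.getValue();
        for (int i = 0; i < before.length; i++) {
            if (primaryKeys.contains(columnNames.get(i)) && !before[i].equals(after[i])) {
                // The document stored under the old key would otherwise be orphaned,
                // so emit a delete for the old row data first.
                rowsOut.add(before);
                actionsOut.add(Action.DELETE);
                break;
            }
        }
        rowsOut.add(after);
        actionsOut.add(Action.INDEX);
    }

    public static void main(String[] args) {
        final List<Serializable[]> rows = new ArrayList<>();
        final List<Action> actions = new ArrayList<>();
        // Primary key "id" changes from 1 to 2, so we expect [DELETE, INDEX].
        planRowChange(Map.entry(new Serializable[]{1, "a"}, new Serializable[]{2, "a"}),
                List.of("id", "value"), List.of("id"), rows, actions);
        System.out.println(actions);
    }
}

Running main prints [DELETE, INDEX] for a row whose key changes from 1 to 2; a row whose key is unchanged would produce only [INDEX].
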
From 84d2fe9e8f0da591814454bc09d47e2cfb473def Mon Sep 17 00:00:00 2001
From: Hai Yan
Date: Wed, 13 Nov 2024 14:51:31 -0600
Subject: [PATCH 2/3] Preserve delete and index event order

Signed-off-by: Hai Yan
---
 .../rds/stream/BinlogEventListener.java       | 82 ++++++++++---------
 1 file changed, 43 insertions(+), 39 deletions(-)

diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
index c5b2a5465b..c7935a5420 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
@@ -266,7 +266,7 @@ void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) {
             return;
         }
 
-        handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.INDEX, data.getRows()));
+        handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.INDEX));
     }
 
     void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
@@ -281,24 +281,28 @@ void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
         cascadeActionDetector.detectCascadingUpdates(event, parentTableMap, tableMetadataMap.get(data.getTableId()));
 
         final TableMetadata tableMetadata = tableMetadataMap.get(data.getTableId());
-        List<Serializable[]> rowsToUpdate = new ArrayList<>();
-        // Delete is needed when primary key columns have value change
-        List<Serializable[]> rowsToDelete = new ArrayList<>();
+        final List<OpenSearchBulkActions> bulkActions = new ArrayList<>();
+        final List<Serializable[]> rows = new ArrayList<>();
+        for (int rowNum = 0; rowNum < data.getRows().size(); rowNum++) {
+            // `row` contains data before update as key and data after update as value
+            Map.Entry<Serializable[], Serializable[]> row = data.getRows().get(rowNum);
 
-        // row map contains data before update as key and data after update as value
-        for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
             for (int i = 0; i < row.getKey().length; i++) {
                 if (tableMetadata.getPrimaryKeys().contains(tableMetadata.getColumnNames().get(i)) &&
                         !row.getKey()[i].equals(row.getValue()[i])) {
                     LOG.debug("Primary keys were updated");
-                    rowsToDelete.add(row.getKey());
+                    // add delete event for the old row data
+                    rows.add(row.getKey());
+                    bulkActions.add(OpenSearchBulkActions.DELETE);
                     break;
                 }
             }
-            rowsToUpdate.add(row.getValue());
+            // add index event for the new row data
+            rows.add(row.getValue());
+            bulkActions.add(OpenSearchBulkActions.INDEX);
         }
 
-        handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.DELETE, rowsToDelete, OpenSearchBulkActions.INDEX, rowsToUpdate));
+        handleRowChangeEvent(event, data.getTableId(), rows, bulkActions);
     }
 
     void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
@@ -312,7 +316,7 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
         // Check if a cascade action is involved
         cascadeActionDetector.detectCascadingDeletes(event, parentTableMap, tableMetadataMap.get(data.getTableId()));
 
-        handleRowChangeEvent(event, data.getTableId(), Map.of(OpenSearchBulkActions.DELETE, data.getRows()));
+        handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.DELETE));
    }
 
     private boolean isValidTableId(long tableId) {
@@ -331,7 +335,8 @@ private boolean isValidTableId(long tableId) {
 
     private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
                                       long tableId,
-                                      Map<OpenSearchBulkActions, List<Serializable[]>> bulkActionToRowDataMap) {
+                                      List<Serializable[]> rows,
+                                      List<OpenSearchBulkActions> bulkActions) {
 
         // Update binlog coordinate after it's first assigned in rotate event handler
         if (currentBinlogCoordinate != null) {
@@ -355,35 +360,34 @@ private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Eve
 
         final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
 
-        for (Map.Entry<OpenSearchBulkActions, List<Serializable[]>> entry : bulkActionToRowDataMap.entrySet()) {
-            final OpenSearchBulkActions bulkAction = entry.getKey();
-            final List<Serializable[]> rows = entry.getValue();
-            for (Object[] rowDataArray : rows) {
-                final Map<String, Object> rowDataMap = new HashMap<>();
-                for (int i = 0; i < rowDataArray.length; i++) {
-                    final Map<String, String> tbColumnDatatypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(tableMetadata.getFullTableName());
-                    final String columnDataType = tbColumnDatatypeMap.get(columnNames.get(i));
-                    final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i),
-                            rowDataArray[i], tableMetadata);
-                    rowDataMap.put(columnNames.get(i), data);
-                }
-
-                final Event dataPrepperEvent = JacksonEvent.builder()
-                        .withEventType(DATA_PREPPER_EVENT_TYPE)
-                        .withData(rowDataMap)
-                        .build();
-
-                final Event pipelineEvent = recordConverter.convert(
-                        dataPrepperEvent,
-                        tableMetadata.getDatabaseName(),
-                        tableMetadata.getTableName(),
-                        bulkAction,
-                        primaryKeys,
-                        eventTimestampMillis,
-                        eventTimestampMillis,
-                        event.getHeader().getEventType());
-                pipelineEvents.add(pipelineEvent);
-            }
+        for (int rowNum = 0; rowNum < rows.size(); rowNum++) {
+            final Object[] rowDataArray = rows.get(rowNum);
+            final OpenSearchBulkActions bulkAction = bulkActions.get(rowNum);
+
+            final Map<String, Object> rowDataMap = new HashMap<>();
+            for (int i = 0; i < rowDataArray.length; i++) {
+                final Map<String, String> tbColumnDatatypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(tableMetadata.getFullTableName());
+                final String columnDataType = tbColumnDatatypeMap.get(columnNames.get(i));
+                final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i),
+                        rowDataArray[i], tableMetadata);
+                rowDataMap.put(columnNames.get(i), data);
+            }
+
+            final Event dataPrepperEvent = JacksonEvent.builder()
+                    .withEventType(DATA_PREPPER_EVENT_TYPE)
+                    .withData(rowDataMap)
+                    .build();
+
+            final Event pipelineEvent = recordConverter.convert(
+                    dataPrepperEvent,
+                    tableMetadata.getDatabaseName(),
+                    tableMetadata.getTableName(),
+                    bulkAction,
+                    primaryKeys,
+                    eventTimestampMillis,
+                    eventTimestampMillis,
+                    event.getHeader().getEventType());
+            pipelineEvents.add(pipelineEvent);
         }
 
         writeToBuffer(bufferAccumulator, acknowledgementSet);
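For illustration only (not part of the patch series): the second commit replaces the Map keyed by bulk action with parallel rows/bulkActions lists so that each row's DELETE of its old key is emitted before later INDEX operations. When primary-key values are reused across rows, as in the unit test added in the next commit (one row's key changes 1 -> 2 while another's changes 3 -> 1), grouping all INDEX rows apart from all DELETE rows can emit the DELETE of key 1 after the INDEX of key 1 and silently drop a live document. The toy in-memory index below is a hypothetical sketch of that hazard, not Data Prepper code.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: a toy "index" keyed by primary key.
class EventOrderSketch {

    enum Action { INDEX, DELETE }

    static void apply(Map<Integer, String> index, Action action, int key, String value) {
        if (action == Action.INDEX) {
            index.put(key, value);
        } else {
            index.remove(key);
        }
    }

    public static void main(String[] args) {
        // Per-row interleaving: each DELETE of an old key precedes the INDEX of the new key.
        Map<Integer, String> ordered = new LinkedHashMap<>();
        apply(ordered, Action.DELETE, 1, null);   // row 1: drop old key 1
        apply(ordered, Action.INDEX, 2, "a");     // row 1: index new key 2
        apply(ordered, Action.DELETE, 3, null);   // row 2: drop old key 3
        apply(ordered, Action.INDEX, 1, "b");     // row 2: index new key 1
        System.out.println(ordered);              // {2=a, 1=b} -- both rows present

        // Grouped ordering, as iterating a Map<action, rows> can produce: the DELETE of
        // key 1 from row 1 now lands after the INDEX of key 1 from row 2 and removes it.
        Map<Integer, String> grouped = new LinkedHashMap<>();
        apply(grouped, Action.INDEX, 2, "a");
        apply(grouped, Action.INDEX, 1, "b");
        apply(grouped, Action.DELETE, 1, null);
        apply(grouped, Action.DELETE, 3, null);
        System.out.println(grouped);              // {2=a} -- row 2's document is lost
    }
}

The interleaved ordering leaves documents under keys 2 and 1, matching the database state after the update; the grouped ordering loses the document under key 1.
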
From e725ce6e892e1eb2d8c6edc7b2bd1c3a6bc3c3ad Mon Sep 17 00:00:00 2001
From: Hai Yan
Date: Thu, 14 Nov 2024 12:35:23 -0600
Subject: [PATCH 3/3] Add unit tests

Signed-off-by: Hai Yan
---
 .../rds/stream/BinlogEventListener.java       | 12 ++--
 .../rds/stream/BinlogEventListenerTest.java   | 63 +++++++++++++++++++
 2 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
index c7935a5420..4491a7c643 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
@@ -319,7 +319,8 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
         handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.DELETE));
     }
 
-    private boolean isValidTableId(long tableId) {
+    // Visible For Testing
+    boolean isValidTableId(long tableId) {
         if (!tableMetadataMap.containsKey(tableId)) {
             LOG.debug("Cannot find table metadata, the event is likely not from a table of interest or the table metadata was not read");
             return false;
         }
@@ -333,10 +334,11 @@ private boolean isValidTableId(long tableId) {
         return true;
     }
 
-    private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
-                                      long tableId,
-                                      List<Serializable[]> rows,
-                                      List<OpenSearchBulkActions> bulkActions) {
+    // Visible For Testing
+    void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
+                              long tableId,
+                              List<Serializable[]> rows,
+                              List<OpenSearchBulkActions> bulkActions) {
 
         // Update binlog coordinate after it's first assigned in rotate event handler
         if (currentBinlogCoordinate != null) {
diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java
index 92738b971e..95035f4501 100644
--- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java
+++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java
@@ -7,6 +7,7 @@
 
 import com.github.shyiko.mysql.binlog.BinaryLogClient;
 import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
 import io.micrometer.core.instrument.Metrics;
 import io.micrometer.core.instrument.Timer;
 import org.junit.jupiter.api.BeforeEach;
@@ -24,22 +25,32 @@
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
 import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
 import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
+import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
 import org.opensearch.dataprepper.plugins.source.rds.resync.CascadingActionDetector;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -152,6 +163,58 @@ void test_given_UpdateRows_event_then_calls_correct_handler(EventType eventType)
         verify(objectUnderTest).handleUpdateEvent(binlogEvent);
     }
 
+    @ParameterizedTest
+    @EnumSource(names = {"UPDATE_ROWS", "EXT_UPDATE_ROWS"})
+    void test_given_UpdateRows_event_when_primary_key_changes_then_generate_correct_events(EventType eventType) throws NoSuchFieldException, IllegalAccessException {
+        final UpdateRowsEventData data = mock(UpdateRowsEventData.class);
+        final Serializable[] oldCol1Data = new Serializable[]{1, "a"};
+        final Serializable[] newCol1Data = new Serializable[]{2, "a"};
+        final Serializable[] oldCol2Data = new Serializable[]{3, "b"};
+        final Serializable[] newCol2Data = new Serializable[]{1, "b"};
+        final List<Map.Entry<Serializable[], Serializable[]>> rows = List.of(
+                Map.entry(oldCol1Data, newCol1Data),
+                Map.entry(oldCol2Data, newCol2Data)
+        );
+        final long tableId = 1234L;
+        when(binlogEvent.getHeader().getEventType()).thenReturn(eventType);
+        when(binlogEvent.getData()).thenReturn(data);
+        when(data.getTableId()).thenReturn(tableId);
+        when(objectUnderTest.isValidTableId(tableId)).thenReturn(true);
+        when(data.getRows()).thenReturn(rows);
+
+        // Set tableMetadataMap reflectively
+        final TableMetadata tableMetadata = mock(TableMetadata.class);
+        final Map<Long, TableMetadata> tableMetadataMap = Map.of(tableId, tableMetadata);
+        Field tableMetadataMapField = BinlogEventListener.class.getDeclaredField("tableMetadataMap");
+        tableMetadataMapField.setAccessible(true);
+        tableMetadataMapField.set(objectUnderTest, tableMetadataMap);
+        when(tableMetadata.getPrimaryKeys()).thenReturn(List.of("col1"));
+        when(tableMetadata.getColumnNames()).thenReturn(List.of("col1", "col2"));
+
+        objectUnderTest.onEvent(binlogEvent);
+
+        verifyHandlerCallHelper();
+        verify(objectUnderTest).handleUpdateEvent(binlogEvent);
+
+        // verify rowList and bulkActionList that were sent to handleRowChangeEvent() were correct
+        ArgumentCaptor<List<Serializable[]>> rowListArgumentCaptor = ArgumentCaptor.forClass(List.class);
+        ArgumentCaptor<List<OpenSearchBulkActions>> bulkActionListArgumentCaptor = ArgumentCaptor.forClass(List.class);
+        verify(objectUnderTest).handleRowChangeEvent(eq(binlogEvent), eq(tableId), rowListArgumentCaptor.capture(), bulkActionListArgumentCaptor.capture());
+        List<Serializable[]> rowList = rowListArgumentCaptor.getValue();
+        List<OpenSearchBulkActions> bulkActionList = bulkActionListArgumentCaptor.getValue();
+
+        assertThat(rowList.size(), is(4));
+        assertThat(bulkActionList.size(), is(4));
+        assertThat(rowList.get(0), is(oldCol1Data));
+        assertThat(bulkActionList.get(0), is(OpenSearchBulkActions.DELETE));
+        assertThat(rowList.get(1), is(newCol1Data));
+        assertThat(bulkActionList.get(1), is(OpenSearchBulkActions.INDEX));
+        assertThat(rowList.get(2), is(oldCol2Data));
+        assertThat(bulkActionList.get(2), is(OpenSearchBulkActions.DELETE));
+        assertThat(rowList.get(3), is(newCol2Data));
+        assertThat(bulkActionList.get(3), is(OpenSearchBulkActions.INDEX));
+    }
+
     @ParameterizedTest
     @EnumSource(names = {"DELETE_ROWS", "EXT_DELETE_ROWS"})
     void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType) {