diff --git a/binlog_streamer.go b/binlog_streamer.go index 3349d9b0..942810d8 100644 --- a/binlog_streamer.go +++ b/binlog_streamer.go @@ -188,6 +188,7 @@ func (s *BinlogStreamer) Run() { isEventPositionResumable := false isEventPositionValid := true + skipEvent := false switch e := ev.Event.(type) { case *replication.RotateEvent: @@ -267,6 +268,31 @@ func (s *BinlogStreamer) Run() { // or the end of the current/next transaction. As such, the query will be // reset following the next RowsQueryEvent before the corresponding RowsEvent(s) query = nil + case *replication.TableMapEvent: + skipEvent = true//we can skip this event + case *replication.GenericEvent: + if ev.Header.Flags == replication.LOG_EVENT_IGNORABLE_F { + skipEvent = true//we can skip this event + } + case *replication.QueryEvent: + // This event is published in binlog when not using ROW binlog_format + // or when MariaDB sees that replica can't handle annotations + // these always contain "BEGIN" or "# Dummy event replacing event type %u that slave cannot handle." + // this means that we can check for # as this is a comment in a query + queryEventQuery := ev.Event.(*replication.QueryEvent).Query + queryString := string(queryEventQuery) + if queryString == "BEGIN" { + skipEvent = true//we can skip this event + } else if queryEventQuery[0] == 35 {//35 is # + skipEvent = true//we can skip this event + } else { + err := fmt.Errorf("failed to handle query event query: %s", queryString) + s.logger.WithError(err).Error("failed to handle query event") + s.ErrorHandler.Fatal("binlog_streamer", err) + } + } + if skipEvent { + continue } if isEventPositionValid {