Skip to content

Commit

Permalink
Added binlog events from MariaDB that can be skipped
Browse files Browse the repository at this point in the history
resolves Shopify#296
  • Loading branch information
HemeraOne committed Jul 8, 2021
1 parent 459b862 commit 577d353
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (s *BinlogStreamer) Run() {

isEventPositionResumable := false
isEventPositionValid := true
skipEvent := false

switch e := ev.Event.(type) {
case *replication.RotateEvent:
Expand Down Expand Up @@ -267,6 +268,31 @@ func (s *BinlogStreamer) Run() {
// or the end of the current/next transaction. As such, the query will be
// reset following the next RowsQueryEvent before the corresponding RowsEvent(s)
query = nil
case *replication.TableMapEvent:
skipEvent = true//we can skip this event
case *replication.GenericEvent:
if ev.Header.Flags == replication.LOG_EVENT_IGNORABLE_F {
skipEvent = true//we can skip this event
}
case *replication.QueryEvent:
// This event is published in binlog when not using ROW binlog_format
// or when MariaDB sees that replica can't handle annotations
// these always contain "BEGIN" or "# Dummy event replacing event type %u that slave cannot handle."
// this means that we can check for # as this is a comment in a query
queryEventQuery := ev.Event.(*replication.QueryEvent).Query
queryString := string(queryEventQuery)
if queryString == "BEGIN" {
skipEvent = true//we can skip this event
} else if queryEventQuery[0] == 35 {//35 is #
skipEvent = true//we can skip this event
} else {
err := fmt.Errorf("failed to handle query event query: %s", queryString)
s.logger.WithError(err).Error("failed to handle query event")
s.ErrorHandler.Fatal("binlog_streamer", err)
}
}
if skipEvent {
continue
}

if isEventPositionValid {
Expand Down

0 comments on commit 577d353

Please sign in to comment.