From 853992e30328b3292dbcf018df95fe9bbcbe3a31 Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Tue, 9 Jul 2024 17:45:31 +0200 Subject: [PATCH] Incremental Configuration Updates Previously, the entire configuration stored in the database was synchronized every second. With the growth of configurations in live environments on the horizon, this would simply not scale well. This brings us to incremental updates. By introducing two new columns - "changed_at" as a Unix millisecond timestamp and "deleted" as a boolean - for all tables referenced in the ConfigSet structure, SQL queries can be modified to retrieve only those rows with a more recent timestamp. The "deleted" column became necessary to detect disappearances, since the synchronization now only takes newer items into account. Some additional fields needed to be added to the ConfigSet to track relationships. Even though the codebase served well at the time, several pieces of code did almost the same thing in slightly different ways. So a huge refactoring was done. This resulted in internal generic functions that handle all synchronization, with custom callbacks for type-specific behavior. The web counterpart is being developed separately. Closes #5. 
--- internal/channel/channel.go | 18 +- .../config/baseconf/incremental_config.go | 45 +++ internal/config/channel.go | 98 ++----- internal/config/contact.go | 97 +++---- internal/config/contact_address.go | 112 -------- internal/config/group.go | 130 +++------ internal/config/incremental_sync.go | 259 +++++++++++++++++ internal/config/rule.go | 267 ++++++------------ internal/config/runtime.go | 113 +++++--- internal/config/schedule.go | 244 ++++++---------- internal/config/source.go | 129 ++------- internal/config/timeperiod.go | 133 ++++----- internal/incident/incident.go | 10 +- internal/incident/incidents_test.go | 9 +- internal/recipient/contact.go | 14 +- internal/recipient/group.go | 36 ++- internal/recipient/rotations.go | 10 +- internal/recipient/rotations_test.go | 8 +- internal/recipient/schedule.go | 17 +- internal/rule/escalation.go | 37 ++- internal/rule/rule.go | 19 +- internal/timeperiod/timeperiod.go | 24 +- schema/pgsql/schema.sql | 99 ++++++- schema/pgsql/upgrades/032.sql | 100 +++++++ 24 files changed, 1099 insertions(+), 929 deletions(-) create mode 100644 internal/config/baseconf/incremental_config.go delete mode 100644 internal/config/contact_address.go create mode 100644 internal/config/incremental_sync.go create mode 100644 schema/pgsql/upgrades/032.sql diff --git a/internal/channel/channel.go b/internal/channel/channel.go index a318ac69f..5fa5658f2 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -4,16 +4,19 @@ import ( "context" "errors" "fmt" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/icinga/icinga-notifications/internal/contracts" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/pkg/plugin" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "net/url" ) type Channel struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + Name string 
`db:"name"` Type string `db:"type"` Config string `db:"config" json:"-"` // excluded from JSON config dump as this may contain sensitive information @@ -27,6 +30,19 @@ type Channel struct { pluginCtxCancel func() } +// MarshalLogObject implements the zapcore.ObjectMarshaler interface. +func (c *Channel) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", c.ID) + encoder.AddString("name", c.Name) + encoder.AddString("type", c.Type) + return nil +} + +// IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface. +func (c *Channel) IncrementalInitAndValidate() error { + return ValidateType(c.Type) +} + // newConfig helps to store the channel's updated properties type newConfig struct { ctype string diff --git a/internal/config/baseconf/incremental_config.go b/internal/config/baseconf/incremental_config.go new file mode 100644 index 000000000..18f3a4cc2 --- /dev/null +++ b/internal/config/baseconf/incremental_config.go @@ -0,0 +1,45 @@ +package baseconf + +import ( + "github.com/icinga/icinga-go-library/types" +) + +// IncrementalDbEntry contains the changed_at and deleted columns as struct fields. +// +// This type partially implements config.IncrementalConfigurable with GetChangedAt and IsDeleted. Thus, it can be +// embedded in other types with the _`db:",inline"`_ struct tag. However, most structs might want to embed the +// IncrementalPkDbEntry struct instead. +type IncrementalDbEntry struct { + ChangedAt types.UnixMilli `db:"changed_at"` + Deleted types.Bool `db:"deleted"` +} + +// GetChangedAt returns the changed_at value of this entry from the database. +// +// It is required by the config.IncrementalConfigurable interface. +func (i IncrementalDbEntry) GetChangedAt() types.UnixMilli { + return i.ChangedAt +} + +// IsDeleted indicates if this entry is marked as deleted in the database. +// +// It is required by the config.IncrementalConfigurable interface. 
+func (i IncrementalDbEntry) IsDeleted() bool { + return i.Deleted.Valid && i.Deleted.Bool +} + +// IncrementalPkDbEntry implements a single primary key named id of a generic type next to IncrementalDbEntry. +// +// This type embeds IncrementalDbEntry and adds a single column/value id field, getting one step closer to implementing +// the config.IncrementalConfigurable interface. Thus, it needs to be embedded with the _`db:",inline"`_ struct tag. +type IncrementalPkDbEntry[PK comparable] struct { + IncrementalDbEntry `db:",inline"` + ID PK `db:"id"` +} + +// GetPrimaryKey returns the id of this entry from the database. +// +// It is required by the config.IncrementalConfigurable interface. +func (i IncrementalPkDbEntry[PK]) GetPrimaryKey() PK { + return i.ID +} diff --git a/internal/config/channel.go b/internal/config/channel.go index 769a919f3..04cc4c63f 100644 --- a/internal/config/channel.go +++ b/internal/config/channel.go @@ -3,84 +3,30 @@ package config import ( "context" "github.com/icinga/icinga-notifications/internal/channel" - "github.com/jmoiron/sqlx" "go.uber.org/zap" ) -func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error { - var channelPtr *channel.Channel - stmt := r.db.BuildSelectStmt(channelPtr, channelPtr) - r.logger.Debugf("Executing query %q", stmt) - - var channels []*channel.Channel - if err := tx.SelectContext(ctx, &channels, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - channelsById := make(map[int64]*channel.Channel) - for _, c := range channels { - channelLogger := r.logger.With( - zap.Int64("id", c.ID), - zap.String("name", c.Name), - zap.String("type", c.Type), - ) - if channelsById[c.ID] != nil { - channelLogger.Warnw("ignoring duplicate config for channel type") - } else if err := channel.ValidateType(c.Type); err != nil { - channelLogger.Errorw("Cannot load channel config", zap.Error(err)) - } else { - channelsById[c.ID] = c - - channelLogger.Debugw("loaded channel config") - } - } - - if 
r.Channels != nil { - // mark no longer existing channels for deletion - for id := range r.Channels { - if _, ok := channelsById[id]; !ok { - channelsById[id] = nil - } - } - } - - r.pending.Channels = channelsById - - return nil -} - +// applyPendingChannels synchronizes changed channels. func (r *RuntimeConfig) applyPendingChannels() { - if r.Channels == nil { - r.Channels = make(map[int64]*channel.Channel) - } - - for id, pendingChannel := range r.pending.Channels { - if pendingChannel == nil { - r.Channels[id].Logger.Info("Channel has been removed") - r.Channels[id].Stop() - - delete(r.Channels, id) - } else if currentChannel := r.Channels[id]; currentChannel != nil { - // Currently, the whole config is reloaded from the database frequently, replacing everything. - // Prevent restarting the plugin processes every time by explicitly checking for config changes. - // The if condition should no longer be necessary when https://github.com/Icinga/icinga-notifications/issues/5 - // is solved properly. 
- if currentChannel.Type != pendingChannel.Type || currentChannel.Name != pendingChannel.Name || currentChannel.Config != pendingChannel.Config { - currentChannel.Type = pendingChannel.Type - currentChannel.Name = pendingChannel.Name - currentChannel.Config = pendingChannel.Config - - currentChannel.Restart() - } - } else { - pendingChannel.Start(context.TODO(), r.logs.GetChildLogger("channel").With( - zap.Int64("id", pendingChannel.ID), - zap.String("name", pendingChannel.Name))) - - r.Channels[id] = pendingChannel - } - } - - r.pending.Channels = nil + incrementalApplyPending( + r, + &r.Channels, &r.configChange.Channels, + func(newElement *channel.Channel) error { + newElement.Start(context.TODO(), r.logs.GetChildLogger("channel").With( + zap.Int64("id", newElement.ID), + zap.String("name", newElement.Name))) + return nil + }, + func(curElement, update *channel.Channel) error { + curElement.ChangedAt = update.ChangedAt + curElement.Name = update.Name + curElement.Type = update.Type + curElement.Config = update.Config + curElement.Restart() + return nil + }, + func(delElement *channel.Channel) error { + delElement.Stop() + return nil + }) } diff --git a/internal/config/contact.go b/internal/config/contact.go index bb72efad4..9a272aec9 100644 --- a/internal/config/contact.go +++ b/internal/config/contact.go @@ -1,62 +1,57 @@ package config import ( - "context" + "fmt" "github.com/icinga/icinga-notifications/internal/recipient" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" + "slices" ) -func (r *RuntimeConfig) fetchContacts(ctx context.Context, tx *sqlx.Tx) error { - var contactPtr *recipient.Contact - stmt := r.db.BuildSelectStmt(contactPtr, contactPtr) - r.logger.Debugf("Executing query %q", stmt) - - var contacts []*recipient.Contact - if err := tx.SelectContext(ctx, &contacts, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - contactsByID := make(map[int64]*recipient.Contact) - for _, c := range contacts { - contactsByID[c.ID] = c - - 
r.logger.Debugw("loaded contact config", - zap.Int64("id", c.ID), - zap.String("name", c.FullName)) - } - - if r.Contacts != nil { - // mark no longer existing contacts for deletion - for id := range r.Contacts { - if _, ok := contactsByID[id]; !ok { - contactsByID[id] = nil +// applyPendingContacts synchronizes changed contacts +func (r *RuntimeConfig) applyPendingContacts() { + incrementalApplyPending( + r, + &r.Contacts, &r.configChange.Contacts, + nil, + func(curElement, update *recipient.Contact) error { + curElement.ChangedAt = update.ChangedAt + curElement.FullName = update.FullName + curElement.Username = update.Username + curElement.DefaultChannelID = update.DefaultChannelID + return nil + }, + nil) + + incrementalApplyPending( + r, + &r.ContactAddresses, &r.configChange.ContactAddresses, + func(newElement *recipient.Address) error { + contact, ok := r.Contacts[newElement.ContactID] + if !ok { + return fmt.Errorf("contact address refers unknown contact %d", newElement.ContactID) } - } - } - - r.pending.Contacts = contactsByID - return nil -} - -func (r *RuntimeConfig) applyPendingContacts() { - if r.Contacts == nil { - r.Contacts = make(map[int64]*recipient.Contact) - } + contact.Addresses = append(contact.Addresses, newElement) + return nil + }, + func(curElement, update *recipient.Address) error { + if curElement.ContactID != update.ContactID { + return errRemoveAndAddInstead + } - for id, pendingContact := range r.pending.Contacts { - if pendingContact == nil { - delete(r.Contacts, id) - } else if currentContact := r.Contacts[id]; currentContact != nil { - currentContact.FullName = pendingContact.FullName - currentContact.Username = pendingContact.Username - currentContact.DefaultChannelID = pendingContact.DefaultChannelID - } else { - r.Contacts[id] = pendingContact - } - } + curElement.ChangedAt = update.ChangedAt + curElement.Type = update.Type + curElement.Address = update.Address + return nil + }, + func(delElement *recipient.Address) error { + 
contact, ok := r.Contacts[delElement.ContactID] + if !ok { + return nil + } - r.pending.Contacts = nil + contact.Addresses = slices.DeleteFunc(contact.Addresses, func(address *recipient.Address) bool { + return address.ID == delElement.ID + }) + return nil + }) } diff --git a/internal/config/contact_address.go b/internal/config/contact_address.go deleted file mode 100644 index f89f82f0a..000000000 --- a/internal/config/contact_address.go +++ /dev/null @@ -1,112 +0,0 @@ -package config - -import ( - "context" - "github.com/icinga/icinga-notifications/internal/recipient" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" - "slices" -) - -func (r *RuntimeConfig) fetchContactAddresses(ctx context.Context, tx *sqlx.Tx) error { - var addressPtr *recipient.Address - stmt := r.db.BuildSelectStmt(addressPtr, addressPtr) - r.logger.Debugf("Executing query %q", stmt) - - var addresses []*recipient.Address - if err := tx.SelectContext(ctx, &addresses, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - addressesById := make(map[int64]*recipient.Address) - for _, a := range addresses { - addressesById[a.ID] = a - r.logger.Debugw("loaded contact_address config", - zap.Int64("id", a.ID), - zap.Int64("contact_id", a.ContactID), - zap.String("type", a.Type), - zap.String("address", a.Address), - ) - } - - if r.ContactAddresses != nil { - // mark no longer existing contacts for deletion - for id := range r.ContactAddresses { - if _, ok := addressesById[id]; !ok { - addressesById[id] = nil - } - } - } - - r.pending.ContactAddresses = addressesById - - return nil -} - -func (r *RuntimeConfig) applyPendingContactAddresses() { - if r.ContactAddresses == nil { - r.ContactAddresses = make(map[int64]*recipient.Address) - } - - for id, pendingAddress := range r.pending.ContactAddresses { - currentAddress := r.ContactAddresses[id] - - if pendingAddress == nil { - r.removeContactAddress(currentAddress) - } else if currentAddress != nil { - r.updateContactAddress(currentAddress, 
pendingAddress) - } else { - r.addContactAddress(pendingAddress) - } - } - - r.pending.ContactAddresses = nil -} - -func (r *RuntimeConfig) addContactAddress(addr *recipient.Address) { - contact := r.Contacts[addr.ContactID] - if contact != nil { - if i := slices.Index(contact.Addresses, addr); i < 0 { - contact.Addresses = append(contact.Addresses, addr) - - r.logger.Debugw("added new address to contact", - zap.Any("contact", contact), - zap.Any("address", addr)) - } - } - - r.ContactAddresses[addr.ID] = addr -} - -func (r *RuntimeConfig) updateContactAddress(addr, pending *recipient.Address) { - contactChanged := addr.ContactID != pending.ContactID - - if contactChanged { - r.removeContactAddress(addr) - } - - addr.ContactID = pending.ContactID - addr.Type = pending.Type - addr.Address = pending.Address - - if contactChanged { - r.addContactAddress(addr) - } - - r.logger.Debugw("updated contact address", zap.Any("address", addr)) -} - -func (r *RuntimeConfig) removeContactAddress(addr *recipient.Address) { - if contact := r.Contacts[addr.ContactID]; contact != nil { - if i := slices.Index(contact.Addresses, addr); i >= 0 { - contact.Addresses = slices.Delete(contact.Addresses, i, i+1) - - r.logger.Debugw("removed address from contact", - zap.Any("contact", contact), - zap.Any("address", addr)) - } - } - - delete(r.ContactAddresses, addr.ID) -} diff --git a/internal/config/group.go b/internal/config/group.go index 433162aaf..45bab4e75 100644 --- a/internal/config/group.go +++ b/internal/config/group.go @@ -1,100 +1,56 @@ package config import ( - "context" + "fmt" "github.com/icinga/icinga-notifications/internal/recipient" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" + "slices" ) -func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error { - var groupPtr *recipient.Group - stmt := r.db.BuildSelectStmt(groupPtr, groupPtr) - r.logger.Debugf("Executing query %q", stmt) - - var groups []*recipient.Group - if err := tx.SelectContext(ctx, &groups, 
stmt); err != nil { - r.logger.Errorln(err) - return err - } - - groupsById := make(map[int64]*recipient.Group) - for _, g := range groups { - groupsById[g.ID] = g - - r.logger.Debugw("loaded group config", - zap.Int64("id", g.ID), - zap.String("name", g.Name)) - } - - type ContactgroupMember struct { - GroupId int64 `db:"contactgroup_id"` - ContactId int64 `db:"contact_id"` - } - - var memberPtr *ContactgroupMember - stmt = r.db.BuildSelectStmt(memberPtr, memberPtr) - r.logger.Debugf("Executing query %q", stmt) - - var members []*ContactgroupMember - if err := tx.SelectContext(ctx, &members, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - for _, m := range members { - memberLogger := r.logger.With( - zap.Int64("contact_id", m.ContactId), - zap.Int64("contactgroup_id", m.GroupId), - ) - - if g := groupsById[m.GroupId]; g == nil { - memberLogger.Warnw("ignoring member for unknown contactgroup_id") - } else { - g.MemberIDs = append(g.MemberIDs, m.ContactId) - - memberLogger.Debugw("loaded contact group member", - zap.String("contactgroup_name", g.Name)) - } - } - - if r.Groups != nil { - // mark no longer existing groups for deletion - for id := range r.Groups { - if _, ok := groupsById[id]; !ok { - groupsById[id] = nil - } - } - } - - r.pending.Groups = groupsById - - return nil -} - +// applyPendingGroups synchronizes changed groups. 
func (r *RuntimeConfig) applyPendingGroups() { - if r.Groups == nil { - r.Groups = make(map[int64]*recipient.Group) - } + incrementalApplyPending( + r, + &r.Groups, &r.configChange.Groups, + nil, + func(curElement, update *recipient.Group) error { + curElement.ChangedAt = update.ChangedAt + curElement.Name = update.Name + return nil + }, + nil) + + incrementalApplyPending( + r, + &r.groupMembers, &r.configChange.groupMembers, + func(newElement *recipient.GroupMember) error { + group, ok := r.Groups[newElement.GroupId] + if !ok { + return fmt.Errorf("group member refers unknown group %d", newElement.GroupId) + } - for id, pendingGroup := range r.pending.Groups { - if pendingGroup == nil { - delete(r.Groups, id) - } else { - pendingGroup.Members = make([]*recipient.Contact, 0, len(pendingGroup.MemberIDs)) - for _, contactID := range pendingGroup.MemberIDs { - if c := r.Contacts[contactID]; c != nil { - pendingGroup.Members = append(pendingGroup.Members, c) - } + contact, ok := r.Contacts[newElement.ContactId] + if !ok { + return fmt.Errorf("group member refers unknown contact %d", newElement.ContactId) } - if currentGroup := r.Groups[id]; currentGroup != nil { - *currentGroup = *pendingGroup - } else { - r.Groups[id] = pendingGroup + group.Members = append(group.Members, contact) + return nil + }, + func(element, update *recipient.GroupMember) error { + // The only fields in GroupMember are - next to ChangedAt and Deleted - GroupId and ContactId. As those two + // fields are the primary key, changing one would result in another primary key, which is technically not an + // update anymore. 
+ return fmt.Errorf("group membership entry cannot change") + }, + func(delElement *recipient.GroupMember) error { + group, ok := r.Groups[delElement.GroupId] + if !ok { + return nil } - } - } - r.pending.Groups = nil + group.Members = slices.DeleteFunc(group.Members, func(contact *recipient.Contact) bool { + return contact.ID == delElement.ContactId + }) + return nil + }) } diff --git a/internal/config/incremental_sync.go b/internal/config/incremental_sync.go new file mode 100644 index 000000000..05711b476 --- /dev/null +++ b/internal/config/incremental_sync.go @@ -0,0 +1,259 @@ +package config + +import ( + "context" + "errors" + "fmt" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/types" + "github.com/jmoiron/sqlx" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "time" +) + +// IncrementalConfigurable specifies Getter methods required for types supporting incremental configuration loading. +type IncrementalConfigurable[PK comparable] interface { + zapcore.ObjectMarshaler + + // GetPrimaryKey returns the primary key value. + GetPrimaryKey() PK + + // GetChangedAt returns the changed_at value. + GetChangedAt() types.UnixMilli + + // IsDeleted returns if this entry was marked as deleted. + IsDeleted() bool +} + +// IncrementalConfigurableInitAndValidatable defines a single method for new and updated elements to allow both +// initialization and validation, to be used within incrementalFetch. +type IncrementalConfigurableInitAndValidatable interface { + // IncrementalInitAndValidate allows both to initialize and validates with an optional error. + // + // If an error is returned, the incrementalFetch function aborts the element in question. + IncrementalInitAndValidate() error +} + +// incrementalFetch queries all recently changed elements of BaseT and stores them in changeConfigSetField. +// +// The RuntimeConfig.configChangeTimestamps map contains the last known timestamp for each BaseT table. 
Only those +// elements where the changed_at SQL column is greater than the stored timestamp will be fetched and stored in the +// temporary RuntimeConfig.configChange ConfigSet. Later on, incrementalApplyPending merges it into the main ConfigSet. +func incrementalFetch[ + BaseT any, + PK comparable, + T interface { + *BaseT + IncrementalConfigurable[PK] + }, +](ctx context.Context, tx *sqlx.Tx, r *RuntimeConfig, changeConfigSetField *map[PK]T) error { + startTime := time.Now() + + var typePtr T + + tableName := database.TableName(typePtr) + changedAt, hasChangedAt := r.configChangeTimestamps[tableName] + + stmtLogger := r.logger.With(zap.String("table", tableName)) + + stmt := r.db.BuildSelectStmt(typePtr, typePtr) + stmtArgs := []any{} + if hasChangedAt { + stmtLogger = stmtLogger.With(zap.Time("changed_at", changedAt.Time())) + stmt += ` WHERE "changed_at" > ?` + stmtArgs = []any{changedAt} + } + + stmt = r.db.Rebind(stmt + ` ORDER BY "changed_at"`) + stmtLogger = stmtLogger.With(zap.String("query", stmt)) + + var ts []T + if err := tx.SelectContext(ctx, &ts, stmt, stmtArgs...); err != nil { + stmtLogger.Errorw("Cannot execute query to fetch incremental config updates", zap.Error(err)) + return err + } + + *changeConfigSetField = make(map[PK]T) + countDel, countErr, countLoad := 0, 0, 0 + for _, t := range ts { + r.configChangeTimestamps[tableName] = t.GetChangedAt() + + logger := r.logger.With(zap.String("table", tableName), zap.Inline(t)) + + if t.IsDeleted() { + if !hasChangedAt { + // This is a special case for the first synchronization or each run when nothing is stored in the + // database yet. Unfortunately, it is not possible to add a "WHERE "deleted" = 'n'" to the query above + // as newer deleted elements would be skipped in the first run, but being read in a subsequent run. 
+ logger.Debug("Skipping deleted element as no prior configuration is loaded") + continue + } + + countDel++ + logger.Debug("Marking entry as deleted") + (*changeConfigSetField)[t.GetPrimaryKey()] = nil + continue + } + + if t, ok := any(t).(IncrementalConfigurableInitAndValidatable); ok { + err := t.IncrementalInitAndValidate() + if err != nil { + countErr++ + logger.Errorw("Cannot validate entry, skipping element", zap.Error(err)) + continue + } + } + + countLoad++ + logger.Debug("Loaded entry") + (*changeConfigSetField)[t.GetPrimaryKey()] = t + } + + stmtLogger = stmtLogger.With("took", time.Since(startTime)) + if countDel > 0 || countErr > 0 || countLoad > 0 { + stmtLogger.Debugw("Fetched incremental configuration updates", + zap.Int("deleted_elements", countDel), + zap.Int("faulty_elements", countErr), + zap.Int("loaded_elements", countLoad)) + } else { + stmtLogger.Debug("No configuration updates are available") + } + + return nil +} + +// errRemoveAndAddInstead is a special non-error which might be expected from incrementalApplyPending's updateFn to +// signal that the current element should be updated by being deleted through the deleteFn first and added again by the +// createFn hook function. +var errRemoveAndAddInstead = errors.New("re-adding by invoking the deletion function followed by the creation function") + +// incrementalApplyPending merges the incremental change from RuntimeConfig.configChange into the main ConfigSet. +// +// The recently fetched incremental change can be of three different types: +// - Newly created elements. Therefore, the createFn callback function will be called upon it, allowing both further +// initialization and also aborting by returning an error. +// - Changed elements. The updateFn callback function receives the current and the updated element, expecting the +// implementation to synchronize the necessary changes into the current element. This hook is allowed to return an +// error as well. 
However, it might also return the special errRemoveAndAddInstead, resulting in the old element to +// be deleted first and then re-added, with optional calls to the other two callbacks included. +// - Finally, deleted elements. Additional cleanup might be performed by the deleteFn. +// +// If no specific callback action is necessary, each function can be nil. A nil updateFn results in the same behavior as +// errRemoveAndAddInstead. +func incrementalApplyPending[ + BaseT any, + PK comparable, + T interface { + *BaseT + IncrementalConfigurable[PK] + }, +]( + r *RuntimeConfig, + configSetField, changeConfigSetField *map[PK]T, + createFn func(newElement T) error, + updateFn func(curElement, update T) error, + deleteFn func(delElement T) error, +) { + startTime := time.Now() + tableName := database.TableName(T(nil)) + countErr, countDelSkip, countDel, countUpdate, countNew := 0, 0, 0, 0, 0 + + if *configSetField == nil { + *configSetField = make(map[PK]T) + } + + createAction := func(id PK, newT T) error { + if createFn != nil { + if err := createFn(newT); err != nil { + countErr++ + return fmt.Errorf("creation callback error, %w", err) + } + } + (*configSetField)[id] = newT + countNew++ + return nil + } + + deleteAction := func(id PK, oldT T) error { + defer delete(*configSetField, id) + countDel++ + if deleteFn != nil { + if err := deleteFn(oldT); err != nil { + countErr++ + return fmt.Errorf("deletion callback error, %w", err) + } + } + return nil + } + + for id, newT := range *changeConfigSetField { + oldT, oldExists := (*configSetField)[id] + + logger := r.logger.With( + zap.String("table", tableName), + zap.Any("id", id)) + + if newT == nil && !oldExists { + countDelSkip++ + logger.Warn("Skipping unknown marked as deleted configuration element") + } else if newT == nil { + logger := logger.With(zap.Object("deleted", oldT)) + if err := deleteAction(id, oldT); err != nil { + logger.Errorw("Deleting configuration element failed", zap.Error(err)) + } else { + 
logger.Debug("Deleted configuration element") + } + } else if oldExists { + logger := logger.With(zap.Object("old", oldT), zap.Object("update", newT)) + reAdd := updateFn == nil + if updateFn != nil { + if err := updateFn(oldT, newT); errors.Is(err, errRemoveAndAddInstead) { + reAdd = true + } else if err != nil { + logger.Errorw("Updating callback failed", zap.Error(err)) + countErr++ + continue + } + } + if reAdd { + logger.Debug("Invoking update by removing and re-adding element") + if err := deleteAction(id, oldT); err != nil { + logger.Errorw("Deleting the old element during re-adding failed", zap.Error(err)) + continue + } + if err := createAction(id, newT); err != nil { + logger.Errorw("Creating the new element during re-adding failed", zap.Error(err)) + continue + } + } + countUpdate++ + logger.Debug("Updated known configuration element") + } else { + logger := logger.With(zap.Object("new", newT)) + if err := createAction(id, newT); err != nil { + logger.Errorw("Creating configuration element failed", zap.Error(err)) + } else { + logger.Debug("Created new configuration element") + } + } + } + + *changeConfigSetField = nil + appliedChanges := countErr > 0 || countDelSkip > 0 || countDel > 0 || countUpdate > 0 || countNew > 0 + + logger := r.logger.With( + zap.String("table", tableName), + zap.Duration("took", time.Since(startTime))) + if appliedChanges { + logger.Infow("Applied configuration updates", + zap.Int("faulty_elements", countErr), + zap.Int("deleted_unknown_elements", countDelSkip), + zap.Int("deleted_elements", countDel), + zap.Int("updated_elements", countUpdate), + zap.Int("new_elements", countNew)) + r.configChangeAvailable = true + } else { + logger.Debug("No configuration updates available to be applied") + } +} diff --git a/internal/config/rule.go b/internal/config/rule.go index 10012cb58..59b69c943 100644 --- a/internal/config/rule.go +++ b/internal/config/rule.go @@ -1,195 +1,114 @@ package config import ( - "context" - 
"github.com/icinga/icinga-notifications/internal/filter" + "fmt" "github.com/icinga/icinga-notifications/internal/rule" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" "slices" ) -func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { - var rulePtr *rule.Rule - stmt := r.db.BuildSelectStmt(rulePtr, rulePtr) - r.logger.Debugf("Executing query %q", stmt) - - var rules []*rule.Rule - if err := tx.SelectContext(ctx, &rules, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - rulesByID := make(map[int64]*rule.Rule) - for _, ru := range rules { - ruleLogger := r.logger.With(zap.Inline(ru)) - - if ru.ObjectFilterExpr.Valid { - f, err := filter.Parse(ru.ObjectFilterExpr.String) - if err != nil { - ruleLogger.Warnw("ignoring rule as parsing object_filter failed", zap.Error(err)) - continue +// applyPendingRules synchronizes changed rules. +func (r *RuntimeConfig) applyPendingRules() { + incrementalApplyPending( + r, + &r.Rules, &r.configChange.Rules, + func(newElement *rule.Rule) error { + if newElement.TimePeriodID.Valid { + tp, ok := r.TimePeriods[newElement.TimePeriodID.Int64] + if !ok { + return fmt.Errorf("rule refers unknown time period %d", newElement.TimePeriodID.Int64) + } + newElement.TimePeriod = tp } - ru.ObjectFilter = f - } - - ru.Escalations = make(map[int64]*rule.Escalation) - - rulesByID[ru.ID] = ru - ruleLogger.Debugw("loaded rule config") - } - - var escalationPtr *rule.Escalation - stmt = r.db.BuildSelectStmt(escalationPtr, escalationPtr) - r.logger.Debugf("Executing query %q", stmt) - - var escalations []*rule.Escalation - if err := tx.SelectContext(ctx, &escalations, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - escalationsByID := make(map[int64]*rule.Escalation) - for _, escalation := range escalations { - escalationLogger := r.logger.With(zap.Inline(escalation)) - - rule := rulesByID[escalation.RuleID] - if rule == nil { - escalationLogger.Warnw("ignoring escalation for unknown rule_id") - 
continue - } - - if escalation.ConditionExpr.Valid { - cond, err := filter.Parse(escalation.ConditionExpr.String) - if err != nil { - escalationLogger.Warnw("ignoring escalation, failed to parse condition", zap.Error(err)) - continue + newElement.Escalations = make(map[int64]*rule.Escalation) + return nil + }, + func(curElement, update *rule.Rule) error { + curElement.ChangedAt = update.ChangedAt + curElement.Name = update.Name + + curElement.TimePeriodID = update.TimePeriodID + if curElement.TimePeriodID.Valid { + tp, ok := r.TimePeriods[curElement.TimePeriodID.Int64] + if !ok { + return fmt.Errorf("rule refers unknown time period %d", curElement.TimePeriodID.Int64) + } + curElement.TimePeriod = tp + } else { + curElement.TimePeriod = nil } - escalation.Condition = cond - } - - if escalation.FallbackForID.Valid { - // TODO: implement fallbacks (needs extra validation: mismatching rule_id, cycles) - escalationLogger.Warnw("ignoring fallback escalation (not yet implemented)") - continue - } - - rule.Escalations[escalation.ID] = escalation - escalationsByID[escalation.ID] = escalation - escalationLogger.Debugw("loaded escalation config") - } - - var recipientPtr *rule.EscalationRecipient - stmt = r.db.BuildSelectStmt(recipientPtr, recipientPtr) - r.logger.Debugf("Executing query %q", stmt) - - var recipients []*rule.EscalationRecipient - if err := tx.SelectContext(ctx, &recipients, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - for _, recipient := range recipients { - recipientLogger := r.logger.With( - zap.Int64("id", recipient.ID), - zap.Int64("escalation_id", recipient.EscalationID), - zap.Int64("channel_id", recipient.ChannelID.Int64)) - - escalation := escalationsByID[recipient.EscalationID] - if escalation == nil { - recipientLogger.Warnw("ignoring recipient for unknown escalation") - } else { - escalation.Recipients = append(escalation.Recipients, recipient) - recipientLogger.Debugw("loaded escalation recipient config") - } - } - - if r.Rules 
!= nil { - // mark no longer existing rules for deletion - for id := range r.Rules { - if _, ok := rulesByID[id]; !ok { - rulesByID[id] = nil + // ObjectFilter{,Expr} are being initialized by config.IncrementalConfigurableInitAndValidatable. + curElement.ObjectFilter = update.ObjectFilter + curElement.ObjectFilterExpr = update.ObjectFilterExpr + + return nil + }, + nil) + + incrementalApplyPending( + r, + &r.ruleEscalations, &r.configChange.ruleEscalations, + func(newElement *rule.Escalation) error { + elementRule, ok := r.Rules[newElement.RuleID] + if !ok { + return fmt.Errorf("rule escalation refers unknown rule %d", newElement.RuleID) } - } - } - - r.pending.Rules = rulesByID - - return nil -} - -func (r *RuntimeConfig) applyPendingRules() { - if r.Rules == nil { - r.Rules = make(map[int64]*rule.Rule) - } - - for id, pendingRule := range r.pending.Rules { - if pendingRule == nil { - delete(r.Rules, id) - } else { - ruleLogger := r.logger.With(zap.Inline(pendingRule)) - if pendingRule.TimePeriodID.Valid { - if p := r.TimePeriods[pendingRule.TimePeriodID.Int64]; p == nil { - ruleLogger.Warnw("ignoring rule with unknown timeperiod_id") - continue - } else { - pendingRule.TimePeriod = p - } + elementRule.Escalations[newElement.ID] = newElement + return nil + }, + func(curElement, update *rule.Escalation) error { + if curElement.RuleID != update.RuleID { + return errRemoveAndAddInstead } - for _, escalation := range pendingRule.Escalations { - for i, recipient := range escalation.Recipients { - recipientLogger := r.logger.With( - zap.Int64("id", recipient.ID), - zap.Int64("escalation_id", recipient.EscalationID), - zap.Int64("channel_id", recipient.ChannelID.Int64), - zap.Inline(recipient.Key)) - - if recipient.ContactID.Valid { - id := recipient.ContactID.Int64 - if c := r.Contacts[id]; c != nil { - recipient.Recipient = c - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } - } else if 
recipient.GroupID.Valid { - id := recipient.GroupID.Int64 - if g := r.Groups[id]; g != nil { - recipient.Recipient = g - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } - } else if recipient.ScheduleID.Valid { - id := recipient.ScheduleID.Int64 - if s := r.Schedules[id]; s != nil { - recipient.Recipient = s - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } - } + curElement.ChangedAt = update.ChangedAt + curElement.NameRaw = update.NameRaw + // Condition{,Expr} are being initialized by config.IncrementalConfigurableInitAndValidatable. + curElement.Condition = update.Condition + curElement.ConditionExpr = update.ConditionExpr + // TODO: synchronize Fallback{ForID,s} when implemented + + return nil + }, + func(delElement *rule.Escalation) error { + elementRule, ok := r.Rules[delElement.RuleID] + if !ok { + return nil + } - escalation.Recipients = slices.DeleteFunc(escalation.Recipients, func(r *rule.EscalationRecipient) bool { - return r == nil - }) + delete(elementRule.Escalations, delElement.ID) + return nil + }) + + incrementalApplyPending( + r, + &r.ruleEscalationRecipients, &r.configChange.ruleEscalationRecipients, + func(newElement *rule.EscalationRecipient) error { + newElement.Recipient = r.GetRecipient(newElement.Key) + if newElement.Recipient == nil { + return fmt.Errorf("rule escalation recipient is missing or unknown") } - if currentRule := r.Rules[id]; currentRule != nil { - *currentRule = *pendingRule - } else { - r.Rules[id] = pendingRule + escalation := r.GetRuleEscalation(newElement.EscalationID) + if escalation == nil { + return fmt.Errorf("rule escalation recipient refers to unknown escalation %d", newElement.EscalationID) + } + escalation.Recipients = append(escalation.Recipients, newElement) + + return nil + }, + 
nil, + func(delElement *rule.EscalationRecipient) error { + escalation := r.GetRuleEscalation(delElement.EscalationID) + if escalation == nil { + return nil } - } - } - r.pending.Rules = nil + escalation.Recipients = slices.DeleteFunc(escalation.Recipients, func(recipient *rule.EscalationRecipient) bool { + return recipient.EscalationID == delElement.EscalationID + }) + return nil + }) } diff --git a/internal/config/runtime.go b/internal/config/runtime.go index 69b38188d..8e483788a 100644 --- a/internal/config/runtime.go +++ b/internal/config/runtime.go @@ -6,11 +6,11 @@ import ( "errors" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/channel" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/timeperiod" - "github.com/jmoiron/sqlx" "go.uber.org/zap" "golang.org/x/crypto/bcrypt" "strconv" @@ -29,8 +29,13 @@ type RuntimeConfig struct { // This became necessary due to circular imports, either with the incident or icinga2 package. EventStreamLaunchFunc func(source *Source) - // pending contains changes to config objects that are to be applied to the embedded live config. - pending ConfigSet + // configChange contains incremental changes to config objects to be merged into the live configuration. + // + // It will be both created and deleted within RuntimeConfig.UpdateFromDatabase. To keep track of the known state, + // the last known timestamp of each ConfigSet type is stored within configChangeTimestamps. 
+ configChange *ConfigSet + configChangeAvailable bool + configChangeTimestamps map[string]types.UnixMilli logs *logging.Logging logger *logging.Logger @@ -48,6 +53,8 @@ func NewRuntimeConfig( return &RuntimeConfig{ EventStreamLaunchFunc: esLaunch, + configChangeTimestamps: make(map[string]types.UnixMilli), + logs: logs, logger: logs.GetChildLogger("runtime-updates"), db: db, @@ -63,19 +70,39 @@ type ConfigSet struct { Schedules map[int64]*recipient.Schedule Rules map[int64]*rule.Rule Sources map[int64]*Source + + // The following fields contain intermediate values, necessary for the incremental config synchronization. + // Furthermore, they allow accessing intermediate tables as everything is referred by pointers. + groupMembers map[recipient.GroupMemberKey]*recipient.GroupMember + timePeriodEntries map[int64]*timeperiod.Entry + scheduleRotations map[int64]*recipient.Rotation + scheduleRotationMembers map[int64]*recipient.RotationMember + ruleEscalations map[int64]*rule.Escalation + ruleEscalationRecipients map[int64]*rule.EscalationRecipient } func (r *RuntimeConfig) UpdateFromDatabase(ctx context.Context) error { - err := r.fetchFromDatabase(ctx) - if err != nil { + startTime := time.Now() + defer func() { + r.logger.Debugw("Finished configuration synchronization", zap.Duration("took", time.Since(startTime))) + }() + + r.logger.Debug("Synchronizing configuration with database") + + r.configChange = &ConfigSet{} + r.configChangeAvailable = false + defer func() { r.configChange = nil }() + + if err := r.fetchFromDatabase(ctx); err != nil { return err } r.applyPending() - - err = r.debugVerify() - if err != nil { - panic(err) + if r.configChangeAvailable { + r.logger.Debug("Synchronizing applied configuration changes, verifying state") + if err := r.debugVerify(); err != nil { + r.logger.Panicw("Newly synchronized configuration failed verification", zap.Error(err)) + } } return nil @@ -88,10 +115,8 @@ func (r *RuntimeConfig) PeriodicUpdates(ctx context.Context, 
interval time.Durat for { select { case <-ticker.C: - r.logger.Debug("periodically updating config") - err := r.UpdateFromDatabase(ctx) - if err != nil { - r.logger.Errorw("periodic config update failed, continuing with previous config", zap.Error(err)) + if err := r.UpdateFromDatabase(ctx); err != nil { + r.logger.Errorw("Periodic configuration synchronization failed", zap.Error(err)) } case <-ctx.Done(): return @@ -201,12 +226,6 @@ func (r *RuntimeConfig) GetSourceFromCredentials(user, pass string, logger *logg } func (r *RuntimeConfig) fetchFromDatabase(ctx context.Context) error { - r.logger.Debug("fetching configuration from database") - start := time.Now() - - // Reset all pending state to start from a clean state. - r.pending = ConfigSet{} - tx, err := r.db.BeginTxx(ctx, &sql.TxOptions{ Isolation: sql.LevelRepeatableRead, ReadOnly: true, @@ -217,42 +236,46 @@ func (r *RuntimeConfig) fetchFromDatabase(ctx context.Context) error { // The transaction is only used for reading, never has to be committed. 
defer func() { _ = tx.Rollback() }() - updateFuncs := []func(ctx context.Context, tx *sqlx.Tx) error{ - r.fetchChannels, - r.fetchContacts, - r.fetchContactAddresses, - r.fetchGroups, - r.fetchTimePeriods, - r.fetchSchedules, - r.fetchRules, - r.fetchSources, + fetchFns := []func() error{ + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Channels) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Contacts) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.ContactAddresses) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Groups) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.groupMembers) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Schedules) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.scheduleRotations) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.scheduleRotationMembers) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.TimePeriods) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.timePeriodEntries) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Rules) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.ruleEscalations) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.ruleEscalationRecipients) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Sources) }, } - for _, f := range updateFuncs { - if err := f(ctx, tx); err != nil { + for _, f := range fetchFns { + if err := f(); err != nil { return err } } - r.logger.Debugw("fetched configuration from database", zap.Duration("took", time.Since(start))) - return nil } +// applyPending synchronizes all changes. 
func (r *RuntimeConfig) applyPending() { r.mu.Lock() defer r.mu.Unlock() - r.logger.Debug("applying pending configuration") - start := time.Now() - - r.applyPendingChannels() - r.applyPendingContacts() - r.applyPendingContactAddresses() - r.applyPendingGroups() - r.applyPendingTimePeriods() - r.applyPendingSchedules() - r.applyPendingRules() - r.applyPendingSources() - - r.logger.Debugw("applied pending configuration", zap.Duration("took", time.Since(start))) + applyFns := []func(){ + r.applyPendingChannels, + r.applyPendingContacts, + r.applyPendingGroups, + r.applyPendingSchedules, + r.applyPendingTimePeriods, + r.applyPendingRules, + r.applyPendingSources, + } + for _, f := range applyFns { + f() + } } diff --git a/internal/config/schedule.go b/internal/config/schedule.go index af3c7557b..a1fb7db58 100644 --- a/internal/config/schedule.go +++ b/internal/config/schedule.go @@ -1,172 +1,114 @@ package config import ( - "context" + "fmt" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/timeperiod" - "github.com/jmoiron/sqlx" "go.uber.org/zap" + "slices" ) -func (r *RuntimeConfig) fetchSchedules(ctx context.Context, tx *sqlx.Tx) error { - var schedulePtr *recipient.Schedule - stmt := r.db.BuildSelectStmt(schedulePtr, schedulePtr) - r.logger.Debugf("Executing query %q", stmt) - - var schedules []*recipient.Schedule - if err := tx.SelectContext(ctx, &schedules, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - schedulesById := make(map[int64]*recipient.Schedule) - for _, g := range schedules { - schedulesById[g.ID] = g - - r.logger.Debugw("loaded schedule config", - zap.Int64("id", g.ID), - zap.String("name", g.Name)) - } - - var rotationPtr *recipient.Rotation - stmt = r.db.BuildSelectStmt(rotationPtr, rotationPtr) - r.logger.Debugf("Executing query %q", stmt) - - var rotations []*recipient.Rotation - if err := tx.SelectContext(ctx, &rotations, stmt); err != nil { - r.logger.Errorln(err) - 
return err - } - - rotationsById := make(map[int64]*recipient.Rotation) - for _, rotation := range rotations { - rotationLogger := r.logger.With(zap.Object("rotation", rotation)) - - if schedule := schedulesById[rotation.ScheduleID]; schedule == nil { - rotationLogger.Warnw("ignoring schedule rotation for unknown schedule_id") - } else { - rotationsById[rotation.ID] = rotation - schedule.Rotations = append(schedule.Rotations, rotation) - - rotationLogger.Debugw("loaded schedule rotation") - } - } - - var rotationMemberPtr *recipient.RotationMember - stmt = r.db.BuildSelectStmt(rotationMemberPtr, rotationMemberPtr) - r.logger.Debugf("Executing query %q", stmt) - - var members []*recipient.RotationMember - if err := tx.SelectContext(ctx, &members, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - rotationMembersById := make(map[int64]*recipient.RotationMember) - for _, member := range members { - memberLogger := r.logger.With(zap.Object("rotation_member", member)) - - if rotation := rotationsById[member.RotationID]; rotation == nil { - memberLogger.Warnw("ignoring rotation member for unknown rotation_member_id") - } else { - member.TimePeriodEntries = make(map[int64]*timeperiod.Entry) - rotation.Members = append(rotation.Members, member) - rotationMembersById[member.ID] = member - - memberLogger.Debugw("loaded schedule rotation member") - } - } - - var entryPtr *timeperiod.Entry - stmt = r.db.BuildSelectStmt(entryPtr, entryPtr) + " WHERE rotation_member_id IS NOT NULL" - r.logger.Debugf("Executing query %q", stmt) - - var entries []*timeperiod.Entry - if err := tx.SelectContext(ctx, &entries, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - for _, entry := range entries { - var member *recipient.RotationMember - if entry.RotationMemberID.Valid { - member = rotationMembersById[entry.RotationMemberID.Int64] - } - - if member == nil { - r.logger.Warnw("ignoring entry for unknown rotation_member_id", - zap.Int64("timeperiod_entry_id", 
entry.ID), - zap.Int64("timeperiod_id", entry.TimePeriodID)) - continue - } - - err := entry.Init() - if err != nil { - r.logger.Warnw("ignoring time period entry", - zap.Object("entry", entry), - zap.Error(err)) - continue - } - - member.TimePeriodEntries[entry.ID] = entry - } - - for _, schedule := range schedulesById { - schedule.RefreshRotations() - } - - if r.Schedules != nil { - // mark no longer existing schedules for deletion - for id := range r.Schedules { - if _, ok := schedulesById[id]; !ok { - schedulesById[id] = nil +// applyPendingSchedules synchronizes changed schedules. +func (r *RuntimeConfig) applyPendingSchedules() { + // Set of schedules (by id) which Rotation was altered and where RefreshRotations must be called. + updatedScheduleIds := make(map[int64]struct{}) + + incrementalApplyPending( + r, + &r.Schedules, &r.configChange.Schedules, + nil, + func(curElement, update *recipient.Schedule) error { + curElement.ChangedAt = update.ChangedAt + curElement.Name = update.Name + return nil + }, + nil) + + incrementalApplyPending( + r, + &r.scheduleRotations, &r.configChange.scheduleRotations, + func(newElement *recipient.Rotation) error { + schedule, ok := r.Schedules[newElement.ScheduleID] + if !ok { + return fmt.Errorf("rotation refers to unknown schedule %d", newElement.ScheduleID) } - } - } - r.pending.Schedules = schedulesById + schedule.Rotations = append(schedule.Rotations, newElement) + updatedScheduleIds[schedule.ID] = struct{}{} + return nil + }, + func(curElement, update *recipient.Rotation) error { + if curElement.ScheduleID != update.ScheduleID { + return errRemoveAndAddInstead + } - return nil -} + curElement.ChangedAt = update.ChangedAt + curElement.ActualHandoff = update.ActualHandoff + curElement.Priority = update.Priority + curElement.Name = update.Name + + updatedScheduleIds[curElement.ScheduleID] = struct{}{} + return nil + }, + func(delElement *recipient.Rotation) error { + schedule, ok := r.Schedules[delElement.ScheduleID] + if 
!ok { + return nil + } -func (r *RuntimeConfig) applyPendingSchedules() { - if r.Schedules == nil { - r.Schedules = make(map[int64]*recipient.Schedule) - } + schedule.Rotations = slices.DeleteFunc(schedule.Rotations, func(rotation *recipient.Rotation) bool { + return rotation.ID == delElement.ID + }) + updatedScheduleIds[schedule.ID] = struct{}{} + return nil + }) + + incrementalApplyPending( + r, + &r.scheduleRotationMembers, &r.configChange.scheduleRotationMembers, + func(newElement *recipient.RotationMember) error { + rotation, ok := r.scheduleRotations[newElement.RotationID] + if !ok { + return fmt.Errorf("schedule rotation member refers unknown rotation %d", newElement.RotationID) + } - for id, pendingSchedule := range r.pending.Schedules { - if pendingSchedule == nil { - delete(r.Schedules, id) - } else { - for _, rotation := range pendingSchedule.Rotations { - for _, member := range rotation.Members { - memberLogger := r.logger.With( - zap.Object("rotation", rotation), - zap.Object("rotation_member", member)) + rotation.Members = append(rotation.Members, newElement) + updatedScheduleIds[rotation.ScheduleID] = struct{}{} - if member.ContactID.Valid { - member.Contact = r.Contacts[member.ContactID.Int64] - if member.Contact == nil { - memberLogger.Warnw("rotation member has an unknown contact_id") - } - } + if newElement.ContactID.Valid { + newElement.Contact, ok = r.Contacts[newElement.ContactID.Int64] + if !ok { + return fmt.Errorf("schedule rotation member refers unknown contact %d", newElement.ContactID.Int64) + } + } - if member.ContactGroupID.Valid { - member.ContactGroup = r.Groups[member.ContactGroupID.Int64] - if member.ContactGroup == nil { - memberLogger.Warnw("rotation member has an unknown contactgroup_id") - } - } + if newElement.ContactGroupID.Valid { + newElement.ContactGroup, ok = r.Groups[newElement.ContactGroupID.Int64] + if !ok { + return fmt.Errorf("schedule rotation member refers unknown contact group %d", newElement.ContactGroupID.Int64) 
} } - if currentSchedule := r.Schedules[id]; currentSchedule != nil { - *currentSchedule = *pendingSchedule - } else { - r.Schedules[id] = pendingSchedule + newElement.TimePeriodEntries = make(map[int64]*timeperiod.Entry) + return nil + }, + nil, + func(delElement *recipient.RotationMember) error { + rotation, ok := r.scheduleRotations[delElement.RotationID] + if !ok { + return nil } - } - } - r.pending.Schedules = nil + rotation.Members = slices.DeleteFunc(rotation.Members, func(member *recipient.RotationMember) bool { + return member.ID == delElement.ID + }) + updatedScheduleIds[rotation.ScheduleID] = struct{}{} + return nil + }) + + for id := range updatedScheduleIds { + schedule := r.Schedules[id] + r.logger.Debugw("Refreshing schedule rotations", zap.Inline(schedule)) + schedule.RefreshRotations() + } } diff --git a/internal/config/source.go b/internal/config/source.go index df986859f..affb9620b 100644 --- a/internal/config/source.go +++ b/internal/config/source.go @@ -3,8 +3,8 @@ package config import ( "context" "github.com/icinga/icinga-go-library/types" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" + "github.com/icinga/icinga-notifications/internal/config/baseconf" + "go.uber.org/zap/zapcore" ) // SourceTypeIcinga2 represents the "icinga2" Source Type for Event Stream API sources. @@ -12,7 +12,8 @@ const SourceTypeIcinga2 = "icinga2" // Source entry within the ConfigSet to describe a source. type Source struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + Type string `db:"type"` Name string `db:"name"` @@ -29,109 +30,31 @@ type Source struct { Icinga2SourceCancel context.CancelFunc `db:"-" json:"-"` } -// fieldEquals checks if this Source's database fields are equal to those of another Source. 
-func (source *Source) fieldEquals(other *Source) bool { - boolEq := func(a, b types.Bool) bool { return (!a.Valid && !b.Valid) || (a == b) } - stringEq := func(a, b types.String) bool { return (!a.Valid && !b.Valid) || (a == b) } - - return source.ID == other.ID && - source.Type == other.Type && - source.Name == other.Name && - stringEq(source.ListenerPasswordHash, other.ListenerPasswordHash) && - stringEq(source.Icinga2BaseURL, other.Icinga2BaseURL) && - stringEq(source.Icinga2AuthUser, other.Icinga2AuthUser) && - stringEq(source.Icinga2AuthPass, other.Icinga2AuthPass) && - stringEq(source.Icinga2CAPem, other.Icinga2CAPem) && - stringEq(source.Icinga2CommonName, other.Icinga2CommonName) && - boolEq(source.Icinga2InsecureTLS, other.Icinga2InsecureTLS) -} - -// stop this Source's worker; currently only Icinga Event Stream API Client. -func (source *Source) stop() { - if source.Type == SourceTypeIcinga2 && source.Icinga2SourceCancel != nil { - source.Icinga2SourceCancel() - source.Icinga2SourceCancel = nil - } -} - -func (r *RuntimeConfig) fetchSources(ctx context.Context, tx *sqlx.Tx) error { - var sourcePtr *Source - stmt := r.db.BuildSelectStmt(sourcePtr, sourcePtr) - r.logger.Debugf("Executing query %q", stmt) - - var sources []*Source - if err := tx.SelectContext(ctx, &sources, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - sourcesById := make(map[int64]*Source) - for _, s := range sources { - sourceLogger := r.logger.With( - zap.Int64("id", s.ID), - zap.String("name", s.Name), - zap.String("type", s.Type), - ) - if sourcesById[s.ID] != nil { - sourceLogger.Error("Ignoring duplicate config for source ID") - continue - } - - sourcesById[s.ID] = s - sourceLogger.Debug("loaded source config") - } - - if r.Sources != nil { - // mark no longer existing sources for deletion - for id := range r.Sources { - if _, ok := sourcesById[id]; !ok { - sourcesById[id] = nil - } - } - } - - r.pending.Sources = sourcesById - +// MarshalLogObject implements the 
zapcore.ObjectMarshaler interface. +func (source *Source) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", source.ID) + encoder.AddString("type", source.Type) + encoder.AddString("name", source.Name) return nil } +// applyPendingSources synchronizes changed sources. func (r *RuntimeConfig) applyPendingSources() { - if r.Sources == nil { - r.Sources = make(map[int64]*Source) - } - - for id, pendingSource := range r.pending.Sources { - logger := r.logger.With(zap.Int64("id", id)) - currentSource := r.Sources[id] - - // Compare the pending source with an optional existing source; instruct the Event Source Client, if necessary. - if pendingSource == nil && currentSource != nil { - logger.Info("Source has been removed") - - currentSource.stop() - delete(r.Sources, id) - continue - } else if pendingSource != nil && currentSource != nil { - if currentSource.fieldEquals(pendingSource) { - continue + incrementalApplyPending( + r, + &r.Sources, &r.configChange.Sources, + func(newElement *Source) error { + if newElement.Type == SourceTypeIcinga2 { + r.EventStreamLaunchFunc(newElement) } - - logger.Info("Source has been updated") - currentSource.stop() - } else if pendingSource != nil && currentSource == nil { - logger.Info("Source has been added") - } else { - // Neither an active nor a pending source? 
- logger.Error("Cannot applying pending configuration: neither an active nor a pending source") - continue - } - - if pendingSource.Type == SourceTypeIcinga2 { - r.EventStreamLaunchFunc(pendingSource) - } - - r.Sources[id] = pendingSource - } - - r.pending.Sources = nil + return nil + }, + nil, + func(delElement *Source) error { + if delElement.Type == SourceTypeIcinga2 && delElement.Icinga2SourceCancel != nil { + delElement.Icinga2SourceCancel() + delElement.Icinga2SourceCancel = nil + } + return nil + }) } diff --git a/internal/config/timeperiod.go b/internal/config/timeperiod.go index 9263c52d4..411467a18 100644 --- a/internal/config/timeperiod.go +++ b/internal/config/timeperiod.go @@ -1,100 +1,63 @@ package config import ( - "context" "fmt" "github.com/icinga/icinga-notifications/internal/timeperiod" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" + "slices" ) -func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error { - var timePeriodPtr *timeperiod.TimePeriod - stmt := r.db.BuildSelectStmt(timePeriodPtr, timePeriodPtr) - r.logger.Debugf("Executing query %q", stmt) - - var timePeriods []*timeperiod.TimePeriod - if err := tx.SelectContext(ctx, &timePeriods, stmt); err != nil { - r.logger.Errorln(err) - return err - } - timePeriodsById := make(map[int64]*timeperiod.TimePeriod) - for _, period := range timePeriods { - timePeriodsById[period.ID] = period - } - - var entryPtr *timeperiod.Entry - stmt = r.db.BuildSelectStmt(entryPtr, entryPtr) - r.logger.Debugf("Executing query %q", stmt) - - var entries []*timeperiod.Entry - if err := tx.SelectContext(ctx, &entries, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - for _, entry := range entries { - p := timePeriodsById[entry.TimePeriodID] - if p == nil { - r.logger.Warnw("ignoring entry for unknown timeperiod_id", - zap.Int64("timeperiod_entry_id", entry.ID), - zap.Int64("timeperiod_id", entry.TimePeriodID)) - continue - } - - if p.Name == "" { - p.Name = fmt.Sprintf("Time 
Period #%d", entry.TimePeriodID) - } - - err := entry.Init() - if err != nil { - r.logger.Warnw("ignoring time period entry", - zap.Object("entry", entry), - zap.Error(err)) - continue - } - - p.Entries = append(p.Entries, entry) +// applyPendingTimePeriods synchronizes changed time periods. +func (r *RuntimeConfig) applyPendingTimePeriods() { + incrementalApplyPending( + r, + &r.TimePeriods, &r.configChange.TimePeriods, + nil, + func(curElement, update *timeperiod.TimePeriod) error { + curElement.ChangedAt = update.ChangedAt + curElement.Name = update.Name + return nil + }, + nil) + + incrementalApplyPending( + r, + &r.timePeriodEntries, &r.configChange.timePeriodEntries, + func(newElement *timeperiod.Entry) error { + period, ok := r.TimePeriods[newElement.TimePeriodID] + if !ok { + return fmt.Errorf("time period entry refers unknown time period %d", newElement.TimePeriodID) + } - r.logger.Debugw("loaded time period entry", - zap.Object("timeperiod", p), - zap.Object("entry", entry)) - } + period.Entries = append(period.Entries, newElement) - for _, p := range timePeriodsById { - if p.Name == "" { - p.Name = fmt.Sprintf("Time Period #%d (empty)", p.ID) - } - } + // rotation_member_id is nullable for future standalone timeperiods + if newElement.RotationMemberID.Valid { + rotationMember, ok := r.scheduleRotationMembers[newElement.RotationMemberID.Int64] + if !ok { + return fmt.Errorf("time period entry refers unknown rotation member %d", newElement.RotationMemberID.Int64) + } - if r.TimePeriods != nil { - // mark no longer existing time periods for deletion - for id := range r.TimePeriods { - if _, ok := timePeriodsById[id]; !ok { - timePeriodsById[id] = nil + rotationMember.TimePeriodEntries[newElement.ID] = newElement } - } - } - - r.pending.TimePeriods = timePeriodsById - - return nil -} -func (r *RuntimeConfig) applyPendingTimePeriods() { - if r.TimePeriods == nil { - r.TimePeriods = make(map[int64]*timeperiod.TimePeriod) - } + return nil + }, + nil, + 
func(delElement *timeperiod.Entry) error { + period, ok := r.TimePeriods[delElement.TimePeriodID] + if ok { + period.Entries = slices.DeleteFunc(period.Entries, func(entry *timeperiod.Entry) bool { + return entry.ID == delElement.ID + }) + } - for id, pendingTimePeriod := range r.pending.TimePeriods { - if pendingTimePeriod == nil { - delete(r.TimePeriods, id) - } else if currentTimePeriod := r.TimePeriods[id]; currentTimePeriod != nil { - *currentTimePeriod = *pendingTimePeriod - } else { - r.TimePeriods[id] = pendingTimePeriod - } - } + if delElement.RotationMemberID.Valid { + rotationMember, ok := r.scheduleRotationMembers[delElement.RotationMemberID.Int64] + if ok { + delete(rotationMember.TimePeriodEntries, delElement.ID) + } + } - r.pending.TimePeriods = nil + return nil + }) } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index ddad59956..83c90827f 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -410,10 +410,6 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 } for _, r := range i.runtimeConfig.Rules { - if !r.IsActive.Valid || !r.IsActive.Bool { - continue - } - if _, ok := i.Rules[r.ID]; !ok { matched, err := r.Eval(i.Object) if err != nil { @@ -474,8 +470,10 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, for rID := range i.Rules { r := i.runtimeConfig.Rules[rID] - - if r == nil || !r.IsActive.Valid || !r.IsActive.Bool { + if r == nil { + i.logger.Warnw("Incident refers unknown rule", + zap.String("incident", i.String()), + zap.Int64("rule_id", rID)) continue } diff --git a/internal/incident/incidents_test.go b/internal/incident/incidents_test.go index b4b35c0de..2cf4c0e07 100644 --- a/internal/incident/incidents_test.go +++ b/internal/incident/incidents_test.go @@ -23,7 +23,14 @@ func TestLoadOpenIncidents(t *testing.T) { db := testutils.GetTestDB(ctx, t) // Insert a dummy source for our test cases! 
- source := config.Source{Type: "notifications", Name: "Icinga Notifications", Icinga2InsecureTLS: types.Bool{Bool: false, Valid: true}} + source := config.Source{ + Type: "notifications", + Name: "Icinga Notifications", + Icinga2InsecureTLS: types.Bool{Bool: false, Valid: true}, + } + source.ChangedAt = types.UnixMilli(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)) + source.Deleted = types.Bool{Bool: false, Valid: true} + err := utils.RunInTx(ctx, db, func(tx *sqlx.Tx) error { id, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, source, "id"), source) require.NoError(t, err, "populating source table should not fail") diff --git a/internal/recipient/contact.go b/internal/recipient/contact.go index 82732f1f6..bf85810e9 100644 --- a/internal/recipient/contact.go +++ b/internal/recipient/contact.go @@ -2,12 +2,14 @@ package recipient import ( "database/sql" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "go.uber.org/zap/zapcore" "time" ) type Contact struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + FullName string `db:"full_name"` Username sql.NullString `db:"username"` DefaultChannelID int64 `db:"default_channel_id"` @@ -33,12 +35,20 @@ func (c *Contact) MarshalLogObject(encoder zapcore.ObjectEncoder) error { var _ Recipient = (*Contact)(nil) type Address struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + ContactID int64 `db:"contact_id"` Type string `db:"type"` Address string `db:"address"` } +// MarshalLogObject implements the zapcore.ObjectMarshaler interface. 
+func (a *Address) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", a.ID) + encoder.AddInt64("contact_id", a.ContactID) + return nil +} + func (a *Address) TableName() string { return "contact_address" } diff --git a/internal/recipient/group.go b/internal/recipient/group.go index 243dde7b9..ea355d863 100644 --- a/internal/recipient/group.go +++ b/internal/recipient/group.go @@ -1,15 +1,16 @@ package recipient import ( + "github.com/icinga/icinga-notifications/internal/config/baseconf" "go.uber.org/zap/zapcore" "time" ) type Group struct { - ID int64 `db:"id"` - Name string `db:"name"` - Members []*Contact `db:"-"` - MemberIDs []int64 `db:"-"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + + Name string `db:"name"` + Members []*Contact `db:"-"` } func (g *Group) GetContactsAt(t time.Time) []*Contact { @@ -32,4 +33,31 @@ func (g *Group) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } +// GroupMemberKey represents the combined primary key of GroupMember. +type GroupMemberKey struct { + GroupId int64 `db:"contactgroup_id"` + ContactId int64 `db:"contact_id"` +} + +// MarshalLogObject implements the zapcore.ObjectMarshaler interface. +func (g *GroupMemberKey) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("contactgroup_id", g.GroupId) + encoder.AddInt64("contact_id", g.ContactId) + return nil +} + +type GroupMember struct { + GroupMemberKey `db:",inline"` + baseconf.IncrementalDbEntry `db:",inline"` +} + +func (g *GroupMember) TableName() string { + return "contactgroup_member" +} + +// GetPrimaryKey is required by the config.IncrementalConfigurable interface. 
+func (g *GroupMember) GetPrimaryKey() GroupMemberKey { + return g.GroupMemberKey +} + var _ Recipient = (*Group)(nil) diff --git a/internal/recipient/rotations.go b/internal/recipient/rotations.go index ea28da6f7..b2e684a72 100644 --- a/internal/recipient/rotations.go +++ b/internal/recipient/rotations.go @@ -26,12 +26,16 @@ func (r *rotationResolver) update(rotations []*Rotation) { // Group sortedByHandoff by priority using a temporary map with the priority as key. prioMap := make(map[int32]*rotationsWithPriority) for _, rotation := range rotations { - p := prioMap[rotation.Priority] + if !rotation.Priority.Valid { + continue + } + + p := prioMap[rotation.Priority.Int32] if p == nil { p = &rotationsWithPriority{ - priority: rotation.Priority, + priority: rotation.Priority.Int32, } - prioMap[rotation.Priority] = p + prioMap[rotation.Priority.Int32] = p } p.sortedByHandoff = append(p.sortedByHandoff, rotation) diff --git a/internal/recipient/rotations_test.go b/internal/recipient/rotations_test.go index 82bf3640b..7c2645d88 100644 --- a/internal/recipient/rotations_test.go +++ b/internal/recipient/rotations_test.go @@ -41,7 +41,7 @@ func Test_rotationResolver_getCurrentRotations(t *testing.T) { // Weekend rotation starting 2024, alternating between contacts contactWeekend2024a and contactWeekend2024b { ActualHandoff: types.UnixMilli(parse("2024-01-01")), - Priority: 0, + Priority: sql.NullInt32{Int32: 0, Valid: true}, Members: []*RotationMember{ { Contact: contactWeekend2024a, @@ -71,7 +71,7 @@ func Test_rotationResolver_getCurrentRotations(t *testing.T) { // alternating between contacts contactWeekend2025a and contactWeekend2025b { ActualHandoff: types.UnixMilli(parse("2025-01-01")), - Priority: 0, + Priority: sql.NullInt32{Int32: 0, Valid: true}, Members: []*RotationMember{ { Contact: contactWeekend2025a, @@ -101,7 +101,7 @@ func Test_rotationResolver_getCurrentRotations(t *testing.T) { // with an override for 12 to 14 o'clock with contactWeekdayNoon. 
{ ActualHandoff: types.UnixMilli(parse("2024-01-01")), - Priority: 1, + Priority: sql.NullInt32{Int32: 1, Valid: true}, Members: []*RotationMember{ { Contact: contactWeekdayNoon, @@ -117,7 +117,7 @@ func Test_rotationResolver_getCurrentRotations(t *testing.T) { }, }, { ActualHandoff: types.UnixMilli(parse("2024-01-01")), - Priority: 2, + Priority: sql.NullInt32{Int32: 2, Valid: true}, Members: []*RotationMember{ { Contact: contactWeekday, diff --git a/internal/recipient/schedule.go b/internal/recipient/schedule.go index d66ef027d..e1ae79252 100644 --- a/internal/recipient/schedule.go +++ b/internal/recipient/schedule.go @@ -3,13 +3,15 @@ package recipient import ( "database/sql" "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/icinga/icinga-notifications/internal/timeperiod" "go.uber.org/zap/zapcore" "time" ) type Schedule struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + Name string `db:"name"` Rotations []*Rotation `db:"-"` @@ -32,10 +34,11 @@ func (s *Schedule) MarshalLogObject(encoder zapcore.ObjectEncoder) error { } type Rotation struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + ScheduleID int64 `db:"schedule_id"` ActualHandoff types.UnixMilli `db:"actual_handoff"` - Priority int32 `db:"priority"` + Priority sql.NullInt32 `db:"priority"` Name string `db:"name"` Members []*RotationMember `db:"-"` } @@ -44,13 +47,16 @@ type Rotation struct { func (r *Rotation) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("id", r.ID) encoder.AddInt64("schedule_id", r.ScheduleID) - encoder.AddInt32("priority", r.Priority) + if r.Priority.Valid { + encoder.AddInt32("priority", r.Priority.Int32) + } encoder.AddString("name", r.Name) return nil } type RotationMember struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + RotationID int64 `db:"rotation_id"` Contact *Contact `db:"-"` 
ContactID sql.NullInt64 `db:"contact_id"` @@ -59,6 +65,7 @@ type RotationMember struct { TimePeriodEntries map[int64]*timeperiod.Entry `db:"-"` } +// MarshalLogObject implements the zapcore.ObjectMarshaler interface. func (r *RotationMember) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("id", r.ID) encoder.AddInt64("rotation_id", r.RotationID) diff --git a/internal/rule/escalation.go b/internal/rule/escalation.go index 823648cf7..40e14a178 100644 --- a/internal/rule/escalation.go +++ b/internal/rule/escalation.go @@ -2,6 +2,8 @@ package rule import ( "database/sql" + "fmt" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/icinga/icinga-notifications/internal/filter" "github.com/icinga/icinga-notifications/internal/recipient" "go.uber.org/zap/zapcore" @@ -10,7 +12,8 @@ import ( ) type Escalation struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + RuleID int64 `db:"rule_id"` NameRaw sql.NullString `db:"name"` Condition filter.Filter `db:"-"` @@ -21,6 +24,25 @@ type Escalation struct { Recipients []*EscalationRecipient `db:"-"` } +// IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface. +func (e *Escalation) IncrementalInitAndValidate() error { + if e.ConditionExpr.Valid { + cond, err := filter.Parse(e.ConditionExpr.String) + if err != nil { + return err + } + + e.Condition = cond + } + + if e.FallbackForID.Valid { + // TODO: implement fallbacks (needs extra validation: mismatching rule_id, cycles) + return fmt.Errorf("ignoring fallback escalation (not yet implemented)") + } + + return nil +} + // MarshalLogObject implements the zapcore.ObjectMarshaler interface. 
// // This allows us to use `zap.Inline(escalation)` or `zap.Object("rule_escalation", escalation)` wherever @@ -93,13 +115,24 @@ func (e *Escalation) TableName() string { } type EscalationRecipient struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + EscalationID int64 `db:"rule_escalation_id"` ChannelID sql.NullInt64 `db:"channel_id"` recipient.Key `db:",inline"` Recipient recipient.Recipient `db:"-"` } +// MarshalLogObject implements the zapcore.ObjectMarshaler interface. +func (r *EscalationRecipient) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", r.ID) + encoder.AddInt64("rule_escalation_id", r.EscalationID) + if r.ChannelID.Valid { + encoder.AddInt64("channel_id", r.ChannelID.Int64) + } + return r.Key.MarshalLogObject(encoder) +} + func (r *EscalationRecipient) TableName() string { return "rule_escalation_recipient" } diff --git a/internal/rule/rule.go b/internal/rule/rule.go index 22b3c0500..8b1fbcef1 100644 --- a/internal/rule/rule.go +++ b/internal/rule/rule.go @@ -2,6 +2,7 @@ package rule import ( "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/icinga/icinga-notifications/internal/filter" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/timeperiod" @@ -10,8 +11,8 @@ import ( ) type Rule struct { - ID int64 `db:"id"` - IsActive types.Bool `db:"is_active"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + Name string `db:"name"` TimePeriod *timeperiod.TimePeriod `db:"-"` TimePeriodID types.Int `db:"timeperiod_id"` @@ -20,6 +21,20 @@ type Rule struct { Escalations map[int64]*Escalation `db:"-"` } +// IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface. 
+func (r *Rule) IncrementalInitAndValidate() error { + if r.ObjectFilterExpr.Valid { + f, err := filter.Parse(r.ObjectFilterExpr.String) + if err != nil { + return err + } + + r.ObjectFilter = f + } + + return nil +} + // MarshalLogObject implements the zapcore.ObjectMarshaler interface. func (r *Rule) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("id", r.ID) diff --git a/internal/timeperiod/timeperiod.go b/internal/timeperiod/timeperiod.go index e5718845b..431858969 100644 --- a/internal/timeperiod/timeperiod.go +++ b/internal/timeperiod/timeperiod.go @@ -2,7 +2,9 @@ package timeperiod import ( "database/sql" + "fmt" "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/pkg/errors" "github.com/teambition/rrule-go" "go.uber.org/zap/zapcore" @@ -10,11 +12,19 @@ import ( ) type TimePeriod struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + Name string `db:"-"` Entries []*Entry `db:"-"` } +func (p *TimePeriod) IncrementalInitAndValidate() error { + if p.Name == "" { + p.Name = fmt.Sprintf("Time Period #%d", p.ID) + } + return nil +} + func (p *TimePeriod) TableName() string { return "timeperiod" } @@ -57,7 +67,8 @@ func (p *TimePeriod) NextTransition(base time.Time) time.Time { } type Entry struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + TimePeriodID int64 `db:"timeperiod_id"` StartTime types.UnixMilli `db:"start_time"` EndTime types.UnixMilli `db:"end_time"` @@ -69,6 +80,11 @@ type Entry struct { rrule *rrule.RRule } +// IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface. +func (e *Entry) IncrementalInitAndValidate() error { + return e.Init() +} + // TableName implements the contracts.TableNamer interface. 
 func (e *Entry) TableName() string { return "timeperiod_entry" @@ -77,12 +93,16 @@ func (e *Entry) TableName() string { // MarshalLogObject implements the zapcore.ObjectMarshaler interface. func (e *Entry) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("id", e.ID) + encoder.AddInt64("timeperiod_id", e.TimePeriodID) encoder.AddTime("start", e.StartTime.Time()) encoder.AddTime("end", e.EndTime.Time()) encoder.AddString("timezone", e.Timezone) if e.RRule.Valid { encoder.AddString("rrule", e.RRule.String) } + if e.RotationMemberID.Valid { + encoder.AddInt64("rotation_member_id", e.RotationMemberID.Int64) + } return nil } diff --git a/schema/pgsql/schema.sql b/schema/pgsql/schema.sql index 4002b281b..f3ccc5c35 100644 --- a/schema/pgsql/schema.sql +++ b/schema/pgsql/schema.sql @@ -48,55 +48,84 @@ CREATE TABLE channel ( -- for now type determines the implementation, in the future, this will need a reference to a concrete -- implementation to allow multiple implementations of a sms channel for example, probably even user-provided ones + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_channel PRIMARY KEY (id) ); +CREATE INDEX idx_channel_changed_at ON channel(changed_at); + CREATE TABLE contact ( id bigserial, full_name citext NOT NULL, username citext, -- reference to web user default_channel_id bigint NOT NULL REFERENCES channel(id), + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_contact PRIMARY KEY (id), - UNIQUE (username) + UNIQUE (username) -- column must be NULLed for deletion via "deleted = 'y'" ); +CREATE INDEX idx_contact_changed_at ON contact(changed_at); + CREATE TABLE contact_address ( id bigserial, contact_id bigint NOT NULL REFERENCES contact(id), type text NOT NULL, -- 'phone', 'email', ... 
address text NOT NULL, -- phone number, email address, ... - CONSTRAINT pk_contact_address PRIMARY KEY (id), - UNIQUE (contact_id, type) -- constraint may be relaxed in the future to support multiple addresses per type + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + + CONSTRAINT pk_contact_address PRIMARY KEY (id) ); +CREATE INDEX idx_contact_address_changed_at ON contact_address(changed_at); + CREATE TABLE contactgroup ( id bigserial, name citext NOT NULL, + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_contactgroup PRIMARY KEY (id) ); +CREATE INDEX idx_contactgroup_changed_at ON contactgroup(changed_at); + CREATE TABLE contactgroup_member ( contactgroup_id bigint NOT NULL REFERENCES contactgroup(id), contact_id bigint NOT NULL REFERENCES contact(id), + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_contactgroup_member PRIMARY KEY (contactgroup_id, contact_id) ); +CREATE INDEX idx_contactgroup_member_changed_at ON contactgroup_member(changed_at); + CREATE TABLE schedule ( id bigserial, name citext NOT NULL, + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_schedule PRIMARY KEY (id) ); +CREATE INDEX idx_schedule_changed_at ON schedule(changed_at); + CREATE TABLE rotation ( id bigserial, schedule_id bigint NOT NULL REFERENCES schedule(id), -- the lower the more important, starting at 0, avoids the need to re-index upon addition - priority integer NOT NULL, + priority integer, name text NOT NULL, mode rotation_type NOT NULL, -- JSON with rotation-specific attributes @@ -105,32 +134,47 @@ CREATE TABLE rotation ( -- A date in the format 'YYYY-MM-DD' when the first handoff should happen. 
-- It is a string as handoffs are restricted to happen only once per day - first_handoff date NOT NULL, + first_handoff date, -- Set to the actual time of the first handoff. -- If this is in the past during creation of the rotation, it is set to the creation time. -- Used by Web to avoid showing shifts that never happened actual_handoff bigint NOT NULL, - -- each schedule can only have one rotation with a given priority starting at a given date + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + + -- Each schedule can only have one rotation with a given priority starting at a given date. + -- Column must be NULLed for deletion via "deleted = 'y'". UNIQUE (schedule_id, priority, first_handoff), + CHECK (deleted = 'y' OR priority IS NOT NULL AND first_handoff IS NOT NULL), CONSTRAINT pk_rotation PRIMARY KEY (id) ); +CREATE INDEX idx_rotation_changed_at ON rotation(changed_at); + CREATE TABLE timeperiod ( id bigserial, owned_by_rotation_id bigint REFERENCES rotation(id), -- nullable for future standalone timeperiods + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_timeperiod PRIMARY KEY (id) ); +CREATE INDEX idx_timeperiod_changed_at ON timeperiod(changed_at); + CREATE TABLE rotation_member ( id bigserial, rotation_id bigint NOT NULL REFERENCES rotation(id), contact_id bigint REFERENCES contact(id), contactgroup_id bigint REFERENCES contactgroup(id), - position integer NOT NULL, + position integer, -- column must be NULLed for deletion via "deleted = 'y'". + + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', UNIQUE (rotation_id, position), -- each position in a rotation can only be used once @@ -140,11 +184,15 @@ CREATE TABLE rotation_member ( -- ensures that each row has only non-NULL values in one of these constraints. 
UNIQUE (rotation_id, contact_id), UNIQUE (rotation_id, contactgroup_id), - CHECK (num_nonnulls(contact_id, contactgroup_id) = 1), + + CONSTRAINT ck_rotation_member_either_contact_id_or_contactgroup_id CHECK (num_nonnulls(contact_id, contactgroup_id) = 1), + CONSTRAINT ck_rotation_member_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL), CONSTRAINT pk_rotation_member PRIMARY KEY (id) ); +CREATE INDEX idx_rotation_member_changed_at ON rotation_member(changed_at); + CREATE TABLE timeperiod_entry ( id bigserial, timeperiod_id bigint NOT NULL REFERENCES timeperiod(id), @@ -156,9 +204,14 @@ CREATE TABLE timeperiod_entry ( timezone text NOT NULL, -- e.g. 'Europe/Berlin', relevant for evaluating rrule (DST changes differ between zones) rrule text, -- recurrence rule (RFC5545) + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_timeperiod_entry PRIMARY KEY (id) ); +CREATE INDEX idx_timeperiod_entry_changed_at ON timeperiod_entry(changed_at); + CREATE TABLE source ( id bigserial, -- The type "icinga2" is special and requires (at least some of) the icinga2_ prefixed columns. @@ -184,6 +237,9 @@ CREATE TABLE source ( icinga2_common_name text, icinga2_insecure_tls boolenum NOT NULL DEFAULT 'n', + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + -- The hash is a PHP password_hash with PASSWORD_DEFAULT algorithm, defaulting to bcrypt. This check roughly ensures -- that listener_password_hash can only be populated with bcrypt hashes. 
-- https://icinga.com/docs/icinga-web/latest/doc/20-Advanced-Topics/#manual-user-creation-for-database-authentication-backend @@ -193,6 +249,8 @@ CREATE TABLE source ( CONSTRAINT pk_source PRIMARY KEY (id) ); +CREATE INDEX idx_source_changed_at ON source(changed_at); + CREATE TABLE object ( id bytea NOT NULL, -- SHA256 of identifying tags and the source.id source_id bigint NOT NULL REFERENCES source(id), @@ -258,25 +316,35 @@ CREATE TABLE rule ( name citext NOT NULL, timeperiod_id bigint REFERENCES timeperiod(id), object_filter text, - is_active boolenum NOT NULL DEFAULT 'y', + + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', CONSTRAINT pk_rule PRIMARY KEY (id) ); +CREATE INDEX idx_rule_changed_at ON rule(changed_at); + CREATE TABLE rule_escalation ( id bigserial, rule_id bigint NOT NULL REFERENCES rule(id), - position integer NOT NULL, + position integer, condition text, name citext, -- if not set, recipients are used as a fallback for display purposes fallback_for bigint REFERENCES rule_escalation(id), + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_rule_escalation PRIMARY KEY (id), - UNIQUE (rule_id, position), - CHECK (NOT (condition IS NOT NULL AND fallback_for IS NOT NULL)) + UNIQUE (rule_id, position), -- column must be NULLed for deletion via "deleted = 'y'" + CONSTRAINT ck_rule_escalation_fallback_has_no_fallback CHECK (NOT (condition IS NOT NULL AND fallback_for IS NOT NULL)), + CONSTRAINT ck_rule_escalation_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL) ); +CREATE INDEX idx_rule_escalation_changed_at ON rule_escalation(changed_at); + CREATE TABLE rule_escalation_recipient ( id bigserial, rule_escalation_id bigint NOT NULL REFERENCES rule_escalation(id), @@ -285,11 +353,16 @@ CREATE TABLE rule_escalation_recipient ( schedule_id bigint REFERENCES schedule(id), channel_id bigint REFERENCES 
channel(id), + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_rule_escalation_recipient PRIMARY KEY (id), CHECK (num_nonnulls(contact_id, contactgroup_id, schedule_id) = 1) ); +CREATE INDEX idx_rule_escalation_recipient_changed_at ON rule_escalation_recipient(changed_at); + CREATE TABLE incident ( id bigserial, object_id bytea NOT NULL REFERENCES object(id), diff --git a/schema/pgsql/upgrades/032.sql b/schema/pgsql/upgrades/032.sql new file mode 100644 index 000000000..f1eb85f77 --- /dev/null +++ b/schema/pgsql/upgrades/032.sql @@ -0,0 +1,100 @@ +ALTER TABLE channel + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_channel_changed_at ON channel(changed_at); + +ALTER TABLE contact + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_contact_changed_at ON contact(changed_at); + +ALTER TABLE contact_address + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n', + DROP CONSTRAINT contact_address_contact_id_type_key; + +CREATE INDEX idx_contact_address_changed_at ON contact_address(changed_at); + +ALTER TABLE contactgroup + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_contactgroup_changed_at ON contactgroup(changed_at); + +ALTER TABLE contactgroup_member + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_contactgroup_member_changed_at ON contactgroup_member(changed_at); + +ALTER TABLE schedule + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT 
NULL DEFAULT 'n'; + +CREATE INDEX idx_schedule_changed_at ON schedule(changed_at); + +ALTER TABLE rotation + ALTER COLUMN priority DROP NOT NULL, + ALTER COLUMN first_handoff DROP NOT NULL, + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n', + ADD CHECK (deleted = 'y' OR priority IS NOT NULL AND first_handoff IS NOT NULL); + +CREATE INDEX idx_rotation_changed_at ON rotation(changed_at); + +ALTER TABLE timeperiod + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_timeperiod_changed_at ON timeperiod(changed_at); + +ALTER TABLE rotation_member + RENAME CONSTRAINT rotation_member_check TO ck_rotation_member_either_contact_id_or_contactgroup_id; + +ALTER TABLE rotation_member + ALTER COLUMN position DROP NOT NULL, + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n', + ADD CONSTRAINT ck_rotation_member_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL); + +CREATE INDEX idx_rotation_member_changed_at ON rotation_member(changed_at); + +ALTER TABLE timeperiod_entry + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_timeperiod_entry_changed_at ON timeperiod_entry(changed_at); + +ALTER TABLE source + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_source_changed_at ON source(changed_at); + +ALTER TABLE rule + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_rule_changed_at ON rule(changed_at); + +UPDATE rule SET deleted = 'y' WHERE is_active = 'n'; +ALTER TABLE rule DROP COLUMN is_active; 
+
+ALTER TABLE rule_escalation
+  RENAME CONSTRAINT rule_escalation_check TO ck_rule_escalation_fallback_has_no_fallback; -- the auto-named check is the existing condition/fallback_for one
+
+ALTER TABLE rule_escalation
+  ALTER COLUMN position DROP NOT NULL,
+  ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000,
+  ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n',
+  ADD CONSTRAINT ck_rule_escalation_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL); -- position may only be NULL for deleted rows
+
+CREATE INDEX idx_rule_escalation_changed_at ON rule_escalation(changed_at);
+
+ALTER TABLE rule_escalation_recipient
+  ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000,
+  ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n';
+
+CREATE INDEX idx_rule_escalation_recipient_changed_at ON rule_escalation_recipient(changed_at);