Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
oxzi committed May 7, 2024
1 parent 4c482cf commit b39adec
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 100 deletions.
61 changes: 43 additions & 18 deletions internal/config/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,43 @@ import (
"context"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/icinga/icingadb/pkg/types"
dbutils "github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
"slices"
)

func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error {
var groupPtr *recipient.Group
changedAt := r.pendingLastChange[dbutils.TableName(groupPtr)]
type ContactgroupMember struct {
GroupId int64 `db:"contactgroup_id"`
ContactId int64 `db:"contact_id"`

ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"`
IsDeleted types.Bool `db:"deleted" json:"deleted"`
}

var (
groupPtr *recipient.Group
memberPtr *ContactgroupMember

groupChangedAt = r.pendingLastChange[dbutils.TableName(groupPtr)]
memberChangedAt = r.pendingLastChange[dbutils.TableName(memberPtr)]
)

stmt := r.buildSelectStmtWhereChangedAt(groupPtr)
r.logger.Debugw("Executing query to fetch groups",
zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time()))
zap.String("query", stmt), zap.Time("changed_at_after", groupChangedAt.Time()))

var groups []*recipient.Group
if err := tx.SelectContext(ctx, &groups, stmt, changedAt); err != nil {
if err := tx.SelectContext(ctx, &groups, stmt, groupChangedAt); err != nil {
r.logger.Errorln(err)
return err
}

groupsById := make(map[int64]*recipient.Group)
for _, g := range groups {
changedAt = g.ChangedAt
groupChangedAt = g.ChangedAt

groupLogger := r.logger.With(
zap.Int64("id", g.ID),
Expand All @@ -40,46 +56,55 @@ func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error {
}
}

type ContactgroupMember struct {
GroupId int64 `db:"contactgroup_id"`
ContactId int64 `db:"contact_id"`
}

var members []*ContactgroupMember
err := r.selectRelationshipTableEntries(
ctx,
tx,
new(ContactgroupMember),
memberPtr,
"contactgroup_id",
utils.MapKeys(groupsById),
r.Groups == nil,
memberChangedAt,
&members)
if err != nil {
r.logger.Errorln(err)
return err
}

for _, m := range members {
memberChangedAt = m.ChangedAt

memberLogger := r.logger.With(
zap.Int64("contact_id", m.ContactId),
zap.Int64("contactgroup_id", m.GroupId),
)

g, ok := groupsById[m.GroupId]
if !ok {
group, newGroupOk := groupsById[m.GroupId]
cachedGroup, cachedGroupOk := r.Groups[m.GroupId]
if !newGroupOk && !cachedGroupOk {
memberLogger.Warn("Ignoring member for unknown contactgroup_id")
continue
} else if g == nil {
} else if !newGroupOk {
groupsById[m.ContactId] = cachedGroup
group = cachedGroup
}

if group == nil {
memberLogger.Debug("Skipping deleted member for unknown contactgroup_id")
continue
}

g.MemberIDs = append(g.MemberIDs, m.ContactId)
memberLogger.Info("Loaded contact group member", zap.String("contactgroup_name", g.Name))
if m.IsDeleted.Valid && m.IsDeleted.Bool {
slices.DeleteFunc(group.MemberIDs, func(id int64) bool { return id == m.ContactId })
memberLogger.Debug("Deleted contact group member")
} else {
group.MemberIDs = append(group.MemberIDs, m.ContactId)
memberLogger.Debug("Loaded contact group member")
}
}

r.pending.Groups = groupsById
r.pendingLastChange[dbutils.TableName(groupPtr)] = changedAt
r.pendingLastChange[dbutils.TableName(groupPtr)] = groupChangedAt
r.pendingLastChange[dbutils.TableName(memberPtr)] = memberChangedAt

return nil
}
Expand Down
94 changes: 77 additions & 17 deletions internal/config/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,34 @@ import (
dbutils "github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
"slices"
)

func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {
var rulePtr *rule.Rule
changedAt := r.pendingLastChange[dbutils.TableName(rulePtr)]
var (
rulePtr *rule.Rule
escalationPtr *rule.Escalation
recipientPtr *rule.EscalationRecipient

ruleChangedAt = r.pendingLastChange[dbutils.TableName(rulePtr)]
escalationChangedAt = r.pendingLastChange[dbutils.TableName(escalationPtr)]
recipientChangedAt = r.pendingLastChange[dbutils.TableName(recipientPtr)]
)

stmt := r.buildSelectStmtWhereChangedAt(rulePtr)
r.logger.Debugw("Executing query to fetch rules",
zap.String("query", stmt),
zap.Time("changed_at_after", changedAt.Time()))
zap.Time("changed_at_after", ruleChangedAt.Time()))

var rules []*rule.Rule
if err := tx.SelectContext(ctx, &rules, stmt, changedAt); err != nil {
if err := tx.SelectContext(ctx, &rules, stmt, ruleChangedAt); err != nil {
r.logger.Errorln(err)
return err
}

rulesByID := make(map[int64]*rule.Rule)
for _, ru := range rules {
changedAt = ru.ChangedAt
ruleChangedAt = ru.ChangedAt

ruleLogger := r.logger.With(
zap.Int64("id", ru.ID),
Expand Down Expand Up @@ -61,10 +70,10 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {
err := r.selectRelationshipTableEntries(
ctx,
tx,
new(rule.Escalation),
escalationPtr,
"rule_id",
utils.MapKeys(rulesByID),
r.Rules == nil,
escalationChangedAt,
&escalations)
if err != nil {
r.logger.Errorln(err)
Expand All @@ -73,6 +82,8 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {

escalationsByID := make(map[int64]*rule.Escalation)
for _, escalation := range escalations {
escalationChangedAt = escalation.ChangedAt

escalationLogger := r.logger.With(
zap.Int64("id", escalation.ID),
zap.Int64("rule_id", escalation.RuleID),
Expand All @@ -81,15 +92,28 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {
zap.Int64("fallback_for", escalation.FallbackForID.Int64),
)

r, ok := rulesByID[escalation.RuleID]
if !ok {
rule, newRuleOk := rulesByID[escalation.RuleID]
cachedRule, cachedRuleOk := r.Rules[escalation.RuleID]
if !newRuleOk && !cachedRuleOk {
escalationLogger.Warn("Ignoring escalation for unknown rule_id")
continue
} else if r == nil {
} else if !newRuleOk {
rulesByID[escalation.RuleID] = cachedRule
rule = cachedRule
}

if rule == nil {
escalationLogger.Debug("Skipping deleted escalation for unknown rule_id")
continue
}

if escalation.IsDeleted.Valid && escalation.IsDeleted.Bool {
rule.Escalations[escalation.ID] = nil
escalationsByID[escalation.ID] = nil
escalationLogger.Info("Deleted escalation config")
continue
}

if escalation.ConditionExpr.Valid {
cond, err := filter.Parse(escalation.ConditionExpr.String)
if err != nil {
Expand All @@ -110,43 +134,79 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {
escalation.Name = escalation.NameRaw.String
}

r.Escalations[escalation.ID] = escalation
rule.Escalations[escalation.ID] = escalation
escalationsByID[escalation.ID] = escalation
escalationLogger.Debugw("loaded escalation config")
escalationLogger.Debug("Loaded escalation config")
}

var recipients []*rule.EscalationRecipient
err = r.selectRelationshipTableEntries(
ctx,
tx,
new(rule.EscalationRecipient),
recipientPtr,
"rule_escalation_id",
utils.MapKeys(escalationsByID),
r.Rules == nil,
recipientChangedAt,
&recipients)
if err != nil {
r.logger.Errorln(err)
return err
}

for _, recipient := range recipients {
recipientChangedAt = recipient.ChangedAt

recipientLogger := r.logger.With(
zap.Int64("id", recipient.ID),
zap.Int64("escalation_id", recipient.EscalationID),
zap.Int64("channel_id", recipient.ChannelID.Int64))

escalation := escalationsByID[recipient.EscalationID]
// In contrary to similar code snippets, this should not be able to contain nil elements.
escalation, escalationOk := escalationsByID[recipient.EscalationID]
if !escalationOk {
for _, cachedRule := range r.Rules {
if ruleEscalation, ok := cachedRule.Escalations[recipient.EscalationID]; ok {
rule, ruleOk := rulesByID[cachedRule.ID]
if !ruleOk {
// Rule does not exist from this run; load from previous run.
rulesByID[rule.ID] = cachedRule
rule = cachedRule
} else {
// Populate current rule with cached escalation.
rule.Escalations[ruleEscalation.ID] = ruleEscalation
}

escalationsByID[ruleEscalation.ID] = ruleEscalation
escalation = ruleEscalation
break
}
}

if escalation == nil {
recipientLogger.Error("Failed to load escalation")
continue
}
}

if escalation == nil {
recipientLogger.Warn("Ignoring recipient for unknown escalation")
continue
}

if recipient.IsDeleted.Valid && recipient.IsDeleted.Bool {
slices.DeleteFunc(escalation.Recipients, func(cmpRecipient *rule.EscalationRecipient) bool {
return cmpRecipient.ID == recipient.ID
})
recipientLogger.Debug("Removed escalation recipient config")
} else {
escalation.Recipients = append(escalation.Recipients, recipient)
recipientLogger.Debug("Loaded escalation recipient config")
}
}

r.pending.Rules = rulesByID
r.pendingLastChange[dbutils.TableName(rulePtr)] = changedAt
r.pendingLastChange[dbutils.TableName(rulePtr)] = ruleChangedAt
r.pendingLastChange[dbutils.TableName(escalationPtr)] = escalationChangedAt
r.pendingLastChange[dbutils.TableName(recipientPtr)] = recipientChangedAt

return nil
}
Expand Down
49 changes: 36 additions & 13 deletions internal/config/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,31 @@ import (
dbutils "github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
"slices"
)

func (r *RuntimeConfig) fetchSchedules(ctx context.Context, tx *sqlx.Tx) error {
var schedulePtr *recipient.Schedule
changedAt := r.pendingLastChange[dbutils.TableName(schedulePtr)]
var (
schedulePtr *recipient.Schedule
memberPtr *recipient.ScheduleMemberRow

scheduleChangedAt = r.pendingLastChange[dbutils.TableName(schedulePtr)]
memberChangedAt = r.pendingLastChange[dbutils.TableName(memberPtr)]
)

stmt := r.buildSelectStmtWhereChangedAt(schedulePtr)
r.logger.Debugw("Executing query to fetch schedule",
zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time()))
zap.String("query", stmt), zap.Time("changed_at_after", scheduleChangedAt.Time()))

var schedules []*recipient.Schedule
if err := tx.SelectContext(ctx, &schedules, stmt, changedAt); err != nil {
if err := tx.SelectContext(ctx, &schedules, stmt, scheduleChangedAt); err != nil {
r.logger.Errorln(err)
return err
}

schedulesById := make(map[int64]*recipient.Schedule)
for _, s := range schedules {
changedAt = s.ChangedAt
scheduleChangedAt = s.ChangedAt

scheduleLogger := r.logger.With(
zap.Int64("id", s.ID),
Expand All @@ -44,34 +51,50 @@ func (r *RuntimeConfig) fetchSchedules(ctx context.Context, tx *sqlx.Tx) error {
err := r.selectRelationshipTableEntries(
ctx,
tx,
new(recipient.ScheduleMemberRow),
memberPtr,
"schedule_id",
utils.MapKeys(schedulesById),
r.Schedules == nil,
memberChangedAt,
&members)
if err != nil {
r.logger.Errorln(err)
return err
}

for _, member := range members {
memberChangedAt = member.ChangedAt

memberLogger := makeScheduleMemberLogger(r.logger.SugaredLogger, member)

s, ok := schedulesById[member.ScheduleID]
if !ok {
schedule, newScheduleOk := schedulesById[member.ScheduleID]
cachedSchedule, cachedScheduleOk := r.Schedules[member.ScheduleID]
if !newScheduleOk && !cachedScheduleOk {
memberLogger.Warn("Ignoring entry for unknown schedule_id")
continue
} else if s == nil {
} else if !newScheduleOk {
schedulesById[member.ScheduleID] = cachedSchedule
schedule = cachedSchedule
}

if schedule == nil {
memberLogger.Debug("Skipping deleted entry for unknown schedule_id")
continue
}

s.MemberRows = append(s.MemberRows, member)
memberLogger.Debug("Member")
if member.IsDeleted.Valid && member.IsDeleted.Bool {
slices.DeleteFunc(schedule.MemberRows, func(row *recipient.ScheduleMemberRow) bool {
return row.ScheduleID == member.ScheduleID
})
memberLogger.Debug("Deleted schedule member")
} else {
schedule.MemberRows = append(schedule.MemberRows, member)
memberLogger.Debug("Loaded schedule member")
}
}

r.pending.Schedules = schedulesById
r.pendingLastChange[dbutils.TableName(schedulePtr)] = changedAt
r.pendingLastChange[dbutils.TableName(schedulePtr)] = scheduleChangedAt
r.pendingLastChange[dbutils.TableName(memberPtr)] = memberChangedAt

return nil
}
Expand Down
Loading

0 comments on commit b39adec

Please sign in to comment.