feat(crons): Record stats for volume history at clock tick (#79574)
This adds a function `_evaluate_tick_decision` which looks back at the last
MONITOR_VOLUME_RETENTION (30 days) of history and compares the minute we just
ticked past against that data.

We record three metrics from this comparison:

- `z_score`: The number of standard deviations the past minute's volume sits
from the historic mean
- `pct_deviation`: The percentage the past minute's volume deviates from the
historic mean
- `count`: The number of historic data points we're considering

The z_score and pct_deviation will be most helpful in deciding whether we've
entered an "incident" state; see the sketch below.
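
A minimal standalone sketch of how these statistics are derived (the numbers
are illustrative only, not part of the change):

    import statistics

    # Check-in counts observed for this same minute on prior days
    historic_volume = [170, 150, 130, 210, 154]
    # Check-in count for the minute we just ticked past
    past_minute_volume = 165

    historic_mean = statistics.mean(historic_volume)
    historic_stdev = statistics.stdev(historic_volume)
    z_score = (past_minute_volume - historic_mean) / historic_stdev
    pct_deviation = abs(past_minute_volume - historic_mean) / historic_mean * 100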

Part of #79328
evanpurkhiser authored Oct 24, 2024
1 parent e667a2a commit 3a23ad2
Showing 3 changed files with 258 additions and 1 deletion.
104 changes: 104 additions & 0 deletions src/sentry/monitors/clock_dispatch.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import statistics
from collections import Counter
from collections.abc import Sequence
from datetime import datetime, timedelta, timezone
@@ -11,6 +12,7 @@
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick

from sentry import options
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.utils import metrics, redis
from sentry.utils.arroyo_producer import SingletonProducer
@@ -27,6 +29,18 @@
# This key is used to record historical data about the volume of check-ins.
MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{}"

# When fetching historic volume data to decide whether we have lost data,
# this value determines the spacing between the historic volume data points
# we fetch within the MONITOR_VOLUME_RETENTION window. It is important to
# consider the expected uniformity of the volume at different steps.
#
# For example, since we tend to have a much larger volume of check-ins
# on-the-hour, it may not make sense to look at each minute as a data point.
# This is why this is set to 1 day: it should closely match the harmonics of
# how check-ins are sent (people like to check in on the hour, and there are
# probably more check-ins at midnight than at 3pm).
MONITOR_VOLUME_DECISION_STEP = timedelta(days=1)

# We record 30 days' worth of historical data for each minute of check-ins.
MONITOR_VOLUME_RETENTION = timedelta(days=30)
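
# With a 1 day decision step across the 30 day retention window, each tick
# decision considers roughly 30 historic data points: the same minute on each
# of the prior 30 days.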

@@ -85,6 +99,94 @@ def _make_reference_ts(ts: datetime):
return int(ts.replace(second=0, microsecond=0).timestamp())


def _evaluate_tick_decision(tick: datetime):
"""
When the clock is ticking, we may decide this tick is invalid and should
result in unknown misses and marking all in-progress check-ins as having an
unknown result.
We do this by looking at the historic volume of check-ins for the
particular minute boundary we just crossed.
XXX(epurkhiser): This is currently in development and no decision is made
to mark unknowns, instead we are only recording metrics for each clock tick
"""
if not options.get("crons.tick_volume_anomaly_detection"):
return

redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

    # The clock has just ticked to the next minute. Look at the volume for
    # the previous minute across the retention window.
past_ts = tick - timedelta(minutes=1)
start_ts = past_ts - MONITOR_VOLUME_RETENTION

# Generate previous timestamps to fetch. The first past_ts timestamp is
# also included in this query
historic_timestamps: list[datetime] = [past_ts]
historic_ts = past_ts

while historic_ts > start_ts:
historic_ts = historic_ts - MONITOR_VOLUME_DECISION_STEP
historic_timestamps.append(historic_ts)

# Bulk fetch volume counts
volumes = redis_client.mget(
MONITOR_VOLUME_HISTORY.format(_make_reference_ts(ts)) for ts in historic_timestamps
)

past_minute_volume = _int_or_none(volumes.pop(0))
historic_volume: list[int] = [int(v) for v in volumes if v is not None]

# Can't make any decisions if we didn't have data for the past minute
if past_minute_volume is None:
return

    # Can't make any decisions without at least two historic data points
    # (statistics.stdev requires a minimum of two)
    if len(historic_volume) < 2:
        return

    # Record some statistics about the past_minute_volume in comparison to
    # the historic_volume data

historic_mean = statistics.mean(historic_volume)
historic_stdev = statistics.stdev(historic_volume)

historic_stdev_pct = (historic_stdev / historic_mean) * 100

    # Calculate the z-score of our past minute's volume in comparison to the
    # historic volume data. The z-score is measured in terms of standard
    # deviations from the mean
z_score = (past_minute_volume - historic_mean) / historic_stdev

    # Percentage deviation from the mean for our past minute's volume
pct_deviation = (abs(past_minute_volume - historic_mean) / historic_mean) * 100

metrics.gauge(
"monitors.task.clock_tick.historic_volume_stdev_pct",
historic_stdev_pct,
sample_rate=1.0,
)
metrics.gauge("monitors.task.volume_history.count", len(historic_volume), sample_rate=1.0)
metrics.gauge("monitors.task.volume_history.z_score", z_score, sample_rate=1.0)
metrics.gauge("monitors.task.volume_history.pct_deviation", pct_deviation, sample_rate=1.0)

# XXX(epurkhiser): We're not actually making any decisions with this data
# just yet.
logger.info(
"monitors.clock_dispatch.volume_history",
extra={
"reference_datetime": str(tick),
"evaluation_minute": past_ts.strftime("%H:%M"),
"history_count": len(historic_volume),
"z_score": z_score,
"pct_deviation": pct_deviation,
"historic_mean": historic_mean,
"historic_stdev": historic_stdev,
},
)


def update_check_in_volume(ts_list: Sequence[datetime]):
"""
Increment counters for a list of check-in timestamps. Each timestamp will be
@@ -180,7 +282,9 @@ def try_monitor_clock_tick(ts: datetime, partition: int):
extra = {"reference_datetime": str(backfill_tick)}
logger.info("monitors.consumer.clock_tick_backfill", extra=extra)

_evaluate_tick_decision(backfill_tick)
_dispatch_tick(backfill_tick)
backfill_tick = backfill_tick + timedelta(minutes=1)

_evaluate_tick_decision(tick)
_dispatch_tick(tick)
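
For reference, a minimal sketch (assuming the two constants above; not part
of the commit) of the timestamp sampling `_evaluate_tick_decision` performs
for a single tick:

    from datetime import datetime, timedelta, timezone

    MONITOR_VOLUME_DECISION_STEP = timedelta(days=1)  # mirrors the constant above
    MONITOR_VOLUME_RETENTION = timedelta(days=30)     # mirrors the constant above

    tick = datetime(2024, 10, 24, 12, 0, tzinfo=timezone.utc)
    past_ts = tick - timedelta(minutes=1)  # the minute under evaluation

    timestamps = [past_ts]
    ts = past_ts
    while ts > past_ts - MONITOR_VOLUME_RETENTION:
        ts -= MONITOR_VOLUME_DECISION_STEP
        timestamps.append(ts)

    # 31 timestamps: past_ts itself plus the same minute on each of the prior
    # 30 days. The first supplies past_minute_volume; the rest, when present
    # in Redis, form historic_volume.
    assert len(timestamps) == 31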
7 changes: 7 additions & 0 deletions src/sentry/options/defaults.py
@@ -2025,6 +2025,13 @@
# Killswitch for monitor check-ins
register("crons.organization.disable-check-in", type=Sequence, default=[])

# Enables anomaly detection based on the volume of check-ins being processed
register(
"crons.tick_volume_anomaly_detection",
default=False,
flags=FLAG_BOOL | FLAG_AUTOMATOR_MODIFIABLE,
)

# Sets the timeout for webhooks
register(
"sentry-apps.webhook.timeout.sec",
148 changes: 147 additions & 1 deletion tests/sentry/monitors/test_clock_dispatch.py
@@ -1,4 +1,6 @@
from datetime import timedelta
import itertools
from collections.abc import Sequence
from datetime import datetime, timedelta
from unittest import mock

from arroyo import Topic
@@ -8,15 +10,20 @@
from django.utils import timezone

from sentry.monitors.clock_dispatch import (
MONITOR_VOLUME_DECISION_STEP,
MONITOR_VOLUME_HISTORY,
MONITOR_VOLUME_RETENTION,
_dispatch_tick,
_evaluate_tick_decision,
try_monitor_clock_tick,
update_check_in_volume,
)
from sentry.testutils.helpers.options import override_options
from sentry.utils import json, redis


@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
@override_options({"crons.tick_volume_anomaly_detection": False})
def test_monitor_task_trigger(dispatch_tick):
now = timezone.now().replace(second=0, microsecond=0)

@@ -45,6 +52,7 @@ def test_monitor_task_trigger(dispatch_tick):


@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
@override_options({"crons.tick_volume_anomaly_detection": False})
def test_monitor_task_trigger_partition_desync(dispatch_tick):
"""
When consumer partitions are not completely synchronized we may read
@@ -76,6 +84,7 @@ def test_monitor_task_trigger_partition_desync(dispatch_tick):


@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
@override_options({"crons.tick_volume_anomaly_detection": False})
def test_monitor_task_trigger_partition_sync(dispatch_tick):
"""
When the kafka topic has multiple partitions we want to only tick our clock
@@ -104,6 +113,7 @@ def test_monitor_task_trigger_partition_sync(dispatch_tick):


@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
@override_options({"crons.tick_volume_anomaly_detection": False})
def test_monitor_task_trigger_partition_tick_skip(dispatch_tick):
"""
In a scenario where all partitions move multiple ticks past the slowest
@@ -141,6 +151,7 @@
@override_settings(KAFKA_TOPIC_OVERRIDES={"monitors-clock-tick": "clock-tick-test-topic"})
@override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
@mock.patch("sentry.monitors.clock_dispatch._clock_tick_producer")
@override_options({"crons.tick_volume_anomaly_detection": False})
def test_dispatch_to_kafka(clock_tick_producer_mock):
now = timezone.now().replace(second=0, microsecond=0)
_dispatch_tick(now)
@@ -177,3 +188,138 @@ def make_key(offset: timedelta) -> str:
assert minute_1 == "1"
assert minute_2 is None
assert minute_3 == "1"


def fill_historic_volume(
start: datetime, length: timedelta, step: timedelta, counts: Sequence[int]
):
"""
Creates a volume history starting at the `start` and going back `length`,
where each bucket is spaced by `step`s apart.
`count` Is a list of counts for each step. This value is a list that will
be cycled through, it must be a division of the number of steps between the
start and length.
"""
aligned_start = start.replace(second=0, microsecond=0)

    # The number of steps must be evenly divisible by the length of counts
steps = length // step
assert steps % len(counts) == 0

counts_cycle = itertools.cycle(counts)
ts = aligned_start
end = aligned_start - length

ts_list = []
while ts >= end:
count = next(counts_cycle)
ts_list.extend([ts] * count)
ts = ts - step

update_check_in_volume(ts_list)


@mock.patch("sentry.monitors.clock_dispatch.logger")
@mock.patch("sentry.monitors.clock_dispatch.metrics")
@override_options({"crons.tick_volume_anomaly_detection": True})
def test_evaluate_tick_decision_simple(metrics, logger):
tick = timezone.now().replace(second=0, microsecond=0)

# This is the timestamp we're looking at just after the tick
past_ts = tick - timedelta(minutes=1)

    # Fill historic volume data for earlier minutes.
fill_historic_volume(
start=past_ts - MONITOR_VOLUME_DECISION_STEP,
length=MONITOR_VOLUME_RETENTION,
step=MONITOR_VOLUME_DECISION_STEP,
counts=[170, 150, 130, 210, 154],
)

    # Record a volume of 165 for the timestamp we are considering
update_check_in_volume([past_ts] * 165)

_evaluate_tick_decision(tick)

logger.info.assert_called_with(
"monitors.clock_dispatch.volume_history",
extra={
"reference_datetime": str(tick),
"evaluation_minute": past_ts.strftime("%H:%M"),
"history_count": 30,
"z_score": 0.08064694302168258,
"pct_deviation": 1.3513513513513442,
"historic_mean": 162.8,
"historic_stdev": 27.279397303484902,
},
)

metrics.gauge.assert_any_call(
"monitors.task.volume_history.count",
30,
sample_rate=1.0,
)
metrics.gauge.assert_any_call(
"monitors.task.volume_history.z_score",
0.08064694302168258,
sample_rate=1.0,
)
metrics.gauge.assert_any_call(
"monitors.task.volume_history.pct_deviation",
1.3513513513513442,
sample_rate=1.0,
)


@mock.patch("sentry.monitors.clock_dispatch.logger")
@mock.patch("sentry.monitors.clock_dispatch.metrics")
@override_options({"crons.tick_volume_anomaly_detection": True})
def test_evaluate_tick_decision_volume_drop(metrics, logger):
tick = timezone.now().replace(second=0, microsecond=0)

# This is the timestamp we're looking at just after the tick
past_ts = tick - timedelta(minutes=1)

    # Fill historic volume data for earlier minutes.
fill_historic_volume(
start=past_ts - MONITOR_VOLUME_DECISION_STEP,
length=MONITOR_VOLUME_RETENTION,
step=MONITOR_VOLUME_DECISION_STEP,
counts=[13_000, 12_000, 12_500, 12_400, 12_600],
)

# Record a volume much lower than what we had been recording previously
update_check_in_volume([past_ts] * 6_000)

_evaluate_tick_decision(tick)

    # Note that the pct_deviation and z_score are extreme
logger.info.assert_called_with(
"monitors.clock_dispatch.volume_history",
extra={
"reference_datetime": str(tick),
"evaluation_minute": past_ts.strftime("%H:%M"),
"history_count": 30,
"z_score": -19.816869917656856,
"pct_deviation": 52.0,
"historic_mean": 12500,
"historic_stdev": 328.0033641543204,
},
)

metrics.gauge.assert_any_call(
"monitors.task.volume_history.count",
30,
sample_rate=1.0,
)
metrics.gauge.assert_any_call(
"monitors.task.volume_history.z_score",
-19.816869917656856,
sample_rate=1.0,
)
metrics.gauge.assert_any_call(
"monitors.task.volume_history.pct_deviation",
52.0,
sample_rate=1.0,
)
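
As a sanity check, the expected statistics in the volume-drop test can be
reproduced by hand; the 30 fetched data points are six full cycles of the
five counts:

    import statistics

    historic = [13_000, 12_000, 12_500, 12_400, 12_600] * 6  # 30 data points
    mean = statistics.mean(historic)                # 12500
    stdev = statistics.stdev(historic)              # ~328.0034
    z_score = (6_000 - mean) / stdev                # ~-19.8169
    pct_deviation = abs(6_000 - mean) / mean * 100  # 52.0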
