diff --git a/src/sentry/workflow_engine/models/detector.py b/src/sentry/workflow_engine/models/detector.py index 2698d682a187a4..86c0ca38a8285c 100644 --- a/src/sentry/workflow_engine/models/detector.py +++ b/src/sentry/workflow_engine/models/detector.py @@ -16,6 +16,7 @@ from sentry.issues import grouptype from sentry.models.owner_base import OwnerModel from sentry.utils import redis +from sentry.utils.iterators import chunked from sentry.workflow_engine.models import DataPacket from sentry.workflow_engine.models.detector_state import DetectorState from sentry.workflow_engine.types import DetectorPriorityLevel @@ -126,6 +127,76 @@ def evaluate(self, data_packet: DataPacket[T]) -> list[DetectorEvaluationResult] class StatefulDetectorHandler(DetectorHandler[T], abc.ABC): + @property + @abc.abstractmethod + def counter_names(self) -> list[str]: + """ + The names of counters that this detector is going to keep track of. + """ + pass + + @abc.abstractmethod + def get_dedupe_value(self, data_packet: DataPacket[T]) -> int: + """ + Extracts the deduplication value from a passed data packet. + TODO: This might belong on the `DataPacket` instead. + """ + pass + + @abc.abstractmethod + def get_group_key_values(self, data_packet: DataPacket[T]) -> dict[str, int]: + """ + Extracts the values for all the group keys that exist in the given data packet, + and returns them as a dict keyed by group_key. + """ + pass + + def get_state_data(self, group_keys: list[str | None]) -> dict[str | None, DetectorStateData]: + """ + Fetches state data associated with this detector for the associated `group_keys`. + Returns a dict keyed by each group_key with the fetched `DetectorStateData`. + If data isn't currently stored, falls back to default values.
+ """ + group_key_detectors = self.bulk_get_detector_state(group_keys) + dedupe_keys = [self.build_dedupe_value_key(gk) for gk in group_keys] + pipeline = get_redis_client().pipeline() + for dk in dedupe_keys: + pipeline.get(dk) + group_key_dedupe_values = { + gk: int(dv) if dv else 0 for gk, dv in zip(group_keys, pipeline.execute()) + } + pipeline.reset() + counter_updates = {} + if self.counter_names: + counter_keys = [ + self.build_counter_value_key(gk, name) + for gk in group_keys + for name in self.counter_names + ] + for ck in counter_keys: + pipeline.get(ck) + vals = [int(val) if val is not None else val for val in pipeline.execute()] + counter_updates = { + gk: dict(zip(self.counter_names, values)) + for gk, values in zip(group_keys, chunked(vals, len(self.counter_names))) + } + + results = {} + for gk in group_keys: + detector_state = group_key_detectors.get(gk) + results[gk] = DetectorStateData( + group_key=gk, + active=detector_state.active if detector_state else False, + status=( + DetectorPriorityLevel(int(detector_state.state)) + if detector_state + else DetectorPriorityLevel.OK + ), + dedupe_value=group_key_dedupe_values[gk], + counter_updates=counter_updates[gk], + ) + return results + def build_dedupe_value_key(self, group_key: str | None) -> str: if group_key is None: group_key = "" @@ -137,9 +208,16 @@ def build_counter_value_key(self, group_key: str | None, counter_name: str) -> s return f"{self.detector.id}:{group_key}:{counter_name}" def bulk_get_detector_state( - self, state_updates: list[DetectorStateData] + self, group_keys: list[str | None] ) -> dict[str | None, DetectorState]: - group_keys = {update.group_key for update in state_updates} + """ + Bulk fetches detector state for the passed `group_keys`. Returns a dict keyed by each + `group_key` with the fetched `DetectorStateData`. + + If there's no `DetectorState` row for a `detector`/`group_key` pair then we'll exclude + the group_key from the returned dict. 
+ """ + # TODO: Cache this query (or individual fetches, then bulk fetch anything missing) query_filter = Q( detector_group_key__in=[group_key for group_key in group_keys if group_key is not None] ) @@ -179,7 +257,9 @@ def _bulk_commit_redis_state(self, state_updates: list[DetectorStateData]): pipeline.execute() def _bulk_commit_detector_state(self, state_updates: list[DetectorStateData]): - detector_state_lookup = self.bulk_get_detector_state(state_updates) + detector_state_lookup = self.bulk_get_detector_state( + [update.group_key for update in state_updates] + ) created_detector_states = [] updated_detector_states = [] for state_update in state_updates: diff --git a/tests/sentry/workflow_engine/models/test_detector.py b/tests/sentry/workflow_engine/models/test_detector.py index 9725cfb8c93ee6..a50b0817539a01 100644 --- a/tests/sentry/workflow_engine/models/test_detector.py +++ b/tests/sentry/workflow_engine/models/test_detector.py @@ -1,5 +1,6 @@ import unittest +from sentry.testutils.abstract import Abstract from sentry.testutils.cases import TestCase from sentry.workflow_engine.models import DataPacket, DetectorEvaluationResult from sentry.workflow_engine.models.detector import ( @@ -13,9 +14,17 @@ class MockDetectorStateHandler(StatefulDetectorHandler[dict]): + counter_names = ["test1", "test2"] + def evaluate(self, data_packet: DataPacket[dict]) -> list[DetectorEvaluationResult]: return [] + def get_dedupe_value(self, data_packet: DataPacket[dict]) -> int: + return 0 + + def get_group_key_values(self, data_packet: DataPacket[dict]) -> dict[str, int]: + return {} + class TestKeyBuilders(unittest.TestCase): def build_handler(self, detector: Detector | None = None) -> MockDetectorStateHandler: @@ -55,10 +64,57 @@ def test_different_counter_value_keys(self): ) -class TestCommitStateUpdateData(TestCase): - def setUp(self): - super().setUp() +class StatefulDetectorHandlerTestMixin(TestCase): + __test__ = Abstract(__module__, __qualname__) + + def 
build_handler(self, detector: Detector | None = None) -> MockDetectorStateHandler: + if detector is None: + detector = self.create_detector() + return MockDetectorStateHandler(detector) + + +class TestGetStateData(StatefulDetectorHandlerTestMixin): + def test_new(self): + handler = self.build_handler() + key = "test_key" + assert handler.get_state_data([key]) == { + key: DetectorStateData( + key, False, DetectorPriorityLevel.OK, 0, {"test1": None, "test2": None} + ) + } + + def test_existing(self): + handler = self.build_handler() + key = "test_key" + state_data = DetectorStateData( + key, True, DetectorPriorityLevel.OK, 10, {"test1": 5, "test2": 200} + ) + handler.commit_state_update_data([state_data]) + assert handler.get_state_data([key]) == {key: state_data} + + def test_multi(self): + handler = self.build_handler() + key_1 = "test_key_1" + state_data_1 = DetectorStateData( + key_1, True, DetectorPriorityLevel.OK, 100, {"test1": 50, "test2": 300} + ) + key_2 = "test_key_2" + state_data_2 = DetectorStateData( + key_2, True, DetectorPriorityLevel.OK, 10, {"test1": 55, "test2": 12} + ) + key_uncommitted = "test_key_uncommitted" + state_data_uncommitted = DetectorStateData( + key_uncommitted, False, DetectorPriorityLevel.OK, 0, {"test1": None, "test2": None} + ) + handler.commit_state_update_data([state_data_1, state_data_2]) + assert handler.get_state_data([key_1, key_2, key_uncommitted]) == { + key_1: state_data_1, + key_2: state_data_2, + key_uncommitted: state_data_uncommitted, + } + +class TestCommitStateUpdateData(StatefulDetectorHandlerTestMixin): def build_handler(self, detector: Detector | None = None) -> MockDetectorStateHandler: if detector is None: detector = self.create_detector()