Skip to content

Commit

Permalink
feat(workflow-engine): Add function to fetch state data for group keys (
Browse files Browse the repository at this point in the history
#79657)

This implements the ability to get state data for group keys that is
associated with the detector. This will be used when we process results
and need to check the existing state.
  • Loading branch information
wedamija authored Oct 28, 2024
1 parent f8777a1 commit 0e81905
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 6 deletions.
86 changes: 83 additions & 3 deletions src/sentry/workflow_engine/models/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from sentry.issues import grouptype
from sentry.models.owner_base import OwnerModel
from sentry.utils import redis
from sentry.utils.iterators import chunked
from sentry.workflow_engine.models import DataPacket
from sentry.workflow_engine.models.detector_state import DetectorState
from sentry.workflow_engine.types import DetectorPriorityLevel
Expand Down Expand Up @@ -126,6 +127,76 @@ def evaluate(self, data_packet: DataPacket[T]) -> list[DetectorEvaluationResult]


class StatefulDetectorHandler(DetectorHandler[T], abc.ABC):
@property
@abc.abstractmethod
def counter_names(self) -> list[str]:
    """
    The names of counters that this detector is going to keep track of.

    Used to build the redis keys for per-group counter values; subclasses
    return a (possibly empty) list of counter names.
    """
    pass

@abc.abstractmethod
def get_dedupe_value(self, data_packet: DataPacket[T]) -> int:
    """
    Extracts the deduplication value from a passed data packet.

    TODO: This might belong on the `DataPacket` instead.
    """
    pass

@abc.abstractmethod
def get_group_key_values(self, data_packet: DataPacket[T]) -> dict[str, int]:
    """
    Extracts the values for all the group keys that exist in the given data packet,
    and returns them as a dict keyed by group_key.
    """
    pass

def get_state_data(self, group_keys: list[str | None]) -> dict[str | None, DetectorStateData]:
    """
    Fetches state data associated with this detector for the passed `group_keys`.

    Returns a dict keyed by each group_key with the fetched `DetectorStateData`.
    If data isn't currently stored for a key, falls back to default values
    (inactive, `DetectorPriorityLevel.OK`, dedupe value 0, unset counters).
    """
    group_key_detectors = self.bulk_get_detector_state(group_keys)

    # Bulk fetch dedupe values via a redis pipeline; absent keys default to 0.
    dedupe_keys = [self.build_dedupe_value_key(gk) for gk in group_keys]
    pipeline = get_redis_client().pipeline()
    for dk in dedupe_keys:
        pipeline.get(dk)
    group_key_dedupe_values = {
        gk: int(dv) if dv else 0 for gk, dv in zip(group_keys, pipeline.execute())
    }
    pipeline.reset()

    # Pre-seed every group key with an empty counter mapping so detectors with
    # no counters still get an entry. (Previously this started as a bare `{}`
    # and the `counter_updates[gk]` lookup below raised KeyError whenever
    # `counter_names` was empty.)
    counter_updates: dict[str | None, dict[str, int | None]] = {gk: {} for gk in group_keys}
    if self.counter_names:
        counter_keys = [
            self.build_counter_value_key(gk, name)
            for gk in group_keys
            for name in self.counter_names
        ]
        for ck in counter_keys:
            pipeline.get(ck)
        # Missing counters stay `None` so callers can distinguish "unset" from 0.
        vals = [int(val) if val is not None else val for val in pipeline.execute()]
        counter_updates = {
            gk: dict(zip(self.counter_names, values))
            for gk, values in zip(group_keys, chunked(vals, len(self.counter_names)))
        }

    results = {}
    for gk in group_keys:
        detector_state = group_key_detectors.get(gk)
        results[gk] = DetectorStateData(
            group_key=gk,
            # No stored DetectorState row means this key has never been active.
            active=detector_state.active if detector_state else False,
            status=(
                DetectorPriorityLevel(int(detector_state.state))
                if detector_state
                else DetectorPriorityLevel.OK
            ),
            dedupe_value=group_key_dedupe_values[gk],
            counter_updates=counter_updates[gk],
        )
    return results

def build_dedupe_value_key(self, group_key: str | None) -> str:
if group_key is None:
group_key = ""
Expand All @@ -137,9 +208,16 @@ def build_counter_value_key(self, group_key: str | None, counter_name: str) -> s
return f"{self.detector.id}:{group_key}:{counter_name}"

def bulk_get_detector_state(
self, state_updates: list[DetectorStateData]
self, group_keys: list[str | None]
) -> dict[str | None, DetectorState]:
group_keys = {update.group_key for update in state_updates}
"""
Bulk fetches detector state for the passed `group_keys`. Returns a dict keyed by each
`group_key` with the fetched `DetectorStateData`.
If there's no `DetectorState` row for a `detector`/`group_key` pair then we'll exclude
the group_key from the returned dict.
"""
# TODO: Cache this query (or individual fetches, then bulk fetch anything missing)
query_filter = Q(
detector_group_key__in=[group_key for group_key in group_keys if group_key is not None]
)
Expand Down Expand Up @@ -179,7 +257,9 @@ def _bulk_commit_redis_state(self, state_updates: list[DetectorStateData]):
pipeline.execute()

def _bulk_commit_detector_state(self, state_updates: list[DetectorStateData]):
detector_state_lookup = self.bulk_get_detector_state(state_updates)
detector_state_lookup = self.bulk_get_detector_state(
[update.group_key for update in state_updates]
)
created_detector_states = []
updated_detector_states = []
for state_update in state_updates:
Expand Down
62 changes: 59 additions & 3 deletions tests/sentry/workflow_engine/models/test_detector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import unittest

from sentry.testutils.abstract import Abstract
from sentry.testutils.cases import TestCase
from sentry.workflow_engine.models import DataPacket, DetectorEvaluationResult
from sentry.workflow_engine.models.detector import (
Expand All @@ -13,9 +14,17 @@


class MockDetectorStateHandler(StatefulDetectorHandler[dict]):
    """Minimal concrete handler used to exercise the stateful base class in tests."""

    # Two fake counters so counter-related code paths are exercised.
    counter_names = ["test1", "test2"]

    def get_dedupe_value(self, data_packet: DataPacket[dict]) -> int:
        """Stub: dedupe behavior is irrelevant to these tests."""
        return 0

    def get_group_key_values(self, data_packet: DataPacket[dict]) -> dict[str, int]:
        """Stub: extracts no group keys."""
        return {}

    def evaluate(self, data_packet: DataPacket[dict]) -> list[DetectorEvaluationResult]:
        """Stub: never produces evaluation results."""
        return []


class TestKeyBuilders(unittest.TestCase):
def build_handler(self, detector: Detector | None = None) -> MockDetectorStateHandler:
Expand Down Expand Up @@ -55,10 +64,57 @@ def test_different_counter_value_keys(self):
)


class TestCommitStateUpdateData(TestCase):
def setUp(self):
super().setUp()
class StatefulDetectorHandlerTestMixin(TestCase):
    # NOTE(review): `Abstract` presumably marks this TestCase so the runner
    # skips it and only collects concrete subclasses — confirm against
    # sentry.testutils.abstract.
    __test__ = Abstract(__module__, __qualname__)

    def build_handler(self, detector: Detector | None = None) -> MockDetectorStateHandler:
        """Build a MockDetectorStateHandler, creating a fresh detector if none is passed."""
        if detector is None:
            detector = self.create_detector()
        return MockDetectorStateHandler(detector)


class TestGetStateData(StatefulDetectorHandlerTestMixin):
    def test_new(self):
        """With nothing stored, defaults come back: inactive, OK, dedupe 0, unset counters."""
        handler = self.build_handler()
        key = "test_key"
        assert handler.get_state_data([key]) == {
            key: DetectorStateData(
                key, False, DetectorPriorityLevel.OK, 0, {"test1": None, "test2": None}
            )
        }

    def test_existing(self):
        """Committed state round-trips through get_state_data unchanged."""
        handler = self.build_handler()
        key = "test_key"
        state_data = DetectorStateData(
            key, True, DetectorPriorityLevel.OK, 10, {"test1": 5, "test2": 200}
        )
        handler.commit_state_update_data([state_data])
        assert handler.get_state_data([key]) == {key: state_data}

    def test_multi(self):
        """Mixed fetch: two committed keys return stored state, an uncommitted key returns defaults."""
        handler = self.build_handler()
        key_1 = "test_key_1"
        state_data_1 = DetectorStateData(
            key_1, True, DetectorPriorityLevel.OK, 100, {"test1": 50, "test2": 300}
        )
        key_2 = "test_key_2"
        state_data_2 = DetectorStateData(
            key_2, True, DetectorPriorityLevel.OK, 10, {"test1": 55, "test2": 12}
        )
        key_uncommitted = "test_key_uncommitted"
        state_data_uncommitted = DetectorStateData(
            key_uncommitted, False, DetectorPriorityLevel.OK, 0, {"test1": None, "test2": None}
        )
        handler.commit_state_update_data([state_data_1, state_data_2])
        assert handler.get_state_data([key_1, key_2, key_uncommitted]) == {
            key_1: state_data_1,
            key_2: state_data_2,
            key_uncommitted: state_data_uncommitted,
        }


class TestCommitStateUpdateData(StatefulDetectorHandlerTestMixin):
def build_handler(self, detector: Detector | None = None) -> MockDetectorStateHandler:
if detector is None:
detector = self.create_detector()
Expand Down

0 comments on commit 0e81905

Please sign in to comment.