Skip to content

Commit

Permalink
Add taskworker registry
Browse files Browse the repository at this point in the history
  • Loading branch information
john-z-yang committed Oct 24, 2024
1 parent 4862dc2 commit 020fb98
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 0 deletions.
151 changes: 151 additions & 0 deletions src/sentry/taskworker/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
from __future__ import annotations

import logging
import time
from collections.abc import Mapping
from datetime import timedelta
from typing import Any
from uuid import uuid4

import orjson
from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.types import Topic as ArroyoTopic
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.sentry.v1alpha.taskworker_pb2 import RetryState, TaskActivation

from sentry.conf.types.kafka_definition import Topic
from sentry.taskworker.retry import FALLBACK_RETRY, Retry
from sentry.taskworker.task import Task
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

logger = logging.getLogger(__name__)


class TaskNamespace:
"""
Task namespaces link topics, config and default retry mechanics together
All tasks within a namespace are stored in the same topic and run by shared
worker pool.
"""

__registered_tasks: dict[str, Task]
__producer: KafkaProducer | None = None

def __init__(self, name: str, topic: str, deadletter_topic: str, retry: Retry | None):
self.name = name
self.topic = topic
self.deadletter_topic = deadletter_topic
self.default_retry = retry
self.__registered_tasks = {}

@property
def producer(self) -> KafkaProducer:
if self.__producer:
return self.__producer
cluster_name = get_topic_definition(Topic.HACKWEEK)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
self.__producer = KafkaProducer(producer_config)

return self.__producer

def get(self, name: str) -> Task:
if name not in self.__registered_tasks:
raise KeyError(f"No task registered with the name {name}. Check your imports")
return self.__registered_tasks[name]

def contains(self, name: str) -> bool:
return name in self.__registered_tasks

def register(
self,
name: str,
idempotent: bool | None = None,
deadline: timedelta | int | None = None,
retry: Retry | None = None,
):
"""register a task, used as a decorator"""

def wrapped(func):
task = Task(
name=name,
func=func,
namespace=self,
idempotent=idempotent,
deadline=deadline,
retry=retry,
)
# TODO tasks should be registered into the registry
# so that we can ensure task names are globally unique
self.__registered_tasks[name] = task
return task

return wrapped

def retry_task(self, taskdata: TaskActivation) -> None:
self.producer.produce(
ArroyoTopic(name=self.topic),
KafkaPayload(key=None, value=taskdata.SerializeToString(), headers=[]),
)

def send_task(self, task: Task, args, kwargs) -> None:
task_message = self._serialize_task_call(task, args, kwargs)
# TODO this could use an RPC instead of appending to the topic directly
# TODO callback handling
self.producer.produce(
ArroyoTopic(name=self.topic),
KafkaPayload(key=None, value=task_message, headers=[]),
)

def _serialize_task_call(self, task: Task, args: list[Any], kwargs: Mapping[Any, Any]) -> bytes:
retry = task.retry or self.default_retry or FALLBACK_RETRY

retry_state = RetryState(
attempts=retry.initial_state().attempts,
kind=retry.initial_state().kind,
discard_after_attempt=retry.initial_state().discard_after_attempt,
deadletter_after_attempt=retry.initial_state().deadletter_after_attempt,
)
pending_task_payload = TaskActivation(
id=uuid4().hex,
namespace=self.name,
taskname=task.name,
parameters=orjson.dumps({"args": args, "kwargs": kwargs}),
retry_state=retry_state,
received_at=Timestamp(seconds=int(time.time())),
).SerializeToString()

return pending_task_payload


class TaskRegistry:
"""Registry of all namespaces"""

__namespaces: dict[str, TaskNamespace]

def __init__(self):
self.__namespaces = {}

def get(self, name: str) -> TaskNamespace:
if name not in self.__namespaces:
raise KeyError(f"No task namespace with the name {name}")
return self.__namespaces[name]

def get_task(self, namespace: str, task: str) -> Task:
return self.get(namespace).get(task)

def create_namespace(
self, name: str, topic: str, deadletter_topic: str, retry: Retry | None = None
):
# TODO So much
# - validate topic names
# - validate deadletter topic
# - do topic : cluster resolution
namespace = TaskNamespace(
name=name, topic=topic, deadletter_topic=deadletter_topic, retry=retry
)
self.__namespaces[name] = namespace

return namespace


taskregistry = TaskRegistry()
55 changes: 55 additions & 0 deletions src/sentry/taskworker/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from __future__ import annotations

from collections.abc import Sequence
from multiprocessing.context import TimeoutError

from sentry_protos.sentry.v1alpha.taskworker_pb2 import RetryState


class Retry:
__times: int
__on: Sequence[type] | None
__ignore: Sequence[type] | None
__deadletter: bool | None
__discard: bool | None

"""Used with tasks to define the retry policy for a task"""

def __init__(
self,
times: int = 1,
on: Sequence[type] | None = None,
ignore: Sequence[type] | None = None,
deadletter: bool | None = None,
discard: bool | None = None,
):
self.__times = times
self.__on = on
self.__ignore = ignore
self.__deadletter = deadletter
self.__discard = discard

def should_retry(self, state: RetryState, exc: Exception) -> bool:
# No more attempts left
if state.attempts >= self.__times:
return False
# No retries for types on the ignore list
if self.__ignore and isinstance(exc, self.__ignore):
return False
# In the retry allow list or processing deadline is exceeded
# When processing deadline is exceeded, the subprocess raises a TimeoutError
if (self.__on and isinstance(exc, self.__on)) or isinstance(exc, TimeoutError):
return True
# TODO add logging/assertion for no funny business
return False

def initial_state(self) -> RetryState:
return RetryState(
attempts=0,
discard_after_attempt=self.__times if self.__discard else None,
deadletter_after_attempt=self.__times if self.__deadletter else None,
kind="sentry.taskworker.retry.Retry",
)


FALLBACK_RETRY = Retry(times=3, deadletter=True)
77 changes: 77 additions & 0 deletions src/sentry/taskworker/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from __future__ import annotations

from collections.abc import Callable
from datetime import timedelta
from functools import update_wrapper
from typing import TYPE_CHECKING, Any

from django.utils import timezone

if TYPE_CHECKING:
from sentry.taskworker.registry import TaskNamespace
from sentry.taskworker.retry import Retry, RetryState


class Task:
name: str
__func: Callable[..., Any]
__namespace: TaskNamespace
__idempotent: bool | None
__deadline: timedelta | int | None

def __init__(
self,
name: str,
func: Callable[..., Any],
namespace: TaskNamespace,
retry: Retry | None,
idempotent: bool | None = None,
deadline: timedelta | int | None = None,
):
self.name = name
self.__func = func
self.__namespace = namespace
self.__retry = retry
self.__idempotent = idempotent
self.__deadline = deadline
update_wrapper(self, func)

@property
def retry(self) -> Retry | None:
return self.__retry

@property
def idempotent(self) -> bool:
return self.__idempotent or False

@property
def deadline_timestamp(self) -> int | None:
# TODO add namespace/default deadlines
if not self.__deadline:
return None
if isinstance(self.__deadline, int):
return int(timezone.now().timestamp() + self.__deadline)
if isinstance(self.__deadline, timedelta):
return int(timezone.now().timestamp() + self.__deadline.total_seconds())
raise ValueError(f"unknown type for Task.deadline {self.__deadline}")

def __call__(self, *args, **kwargs) -> None:
return self.__func(*args, **kwargs)

def delay(self, *args, **kwargs):
return self.apply_async(*args, **kwargs)

def apply_async(self, *args, **kwargs):
from django.conf import settings

if settings.TASK_WORKER_ALWAYS_EAGER:
self.__func(*args, **kwargs)
else:
self.__namespace.send_task(self, args, kwargs)

def should_retry(self, state: RetryState, exc: Exception) -> bool:
# No retry policy means no retries.
retry = self.retry
if not retry:
return False
return retry.should_retry(state, exc)

0 comments on commit 020fb98

Please sign in to comment.