Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unify models to one model instead of 3 separate ones #176

Draft
wants to merge 17 commits into
base: master
Choose a base branch
from
9 changes: 9 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## v2.2.0 🌈

### 🚀 Features

- Created a new `Task` model representing all kind of scheduled tasks.
- In future versions, `CronTask`, `ScheduledTask` and `RepeatableTask` will be removed.
- `Task` model has a `task_type` field to differentiate between the types of tasks.
- Old tasks in the database will be migrated to the new `Task` model automatically.

## v2.1.1 🌈

### 🐛 Bug Fixes
Expand Down
13 changes: 6 additions & 7 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "django-tasks-scheduler"
packages = [
{ include = "scheduler" },
]
version = "2.1.1"
version = "2.2.0"
description = "An async job scheduler for django using redis/valkey brokers"
readme = "README.md"
keywords = ["redis", "valkey", "django", "background-jobs", "job-queue", "task-queue", "redis-queue", "scheduled-jobs"]
Expand Down Expand Up @@ -47,7 +47,7 @@ croniter = ">=2.0"
click = "^8.1"
rq = "^1.16"
pyyaml = { version = "^6.0", optional = true }
valkey = { version = "^6.0.2", optional = true}
valkey = "6.0.1"

[tool.poetry.dev-dependencies]
poetry = "^1.8.3"
Expand All @@ -60,7 +60,6 @@ freezegun = "^1.5"

[tool.poetry.extras]
yaml = ["pyyaml"]
valkey = ["valkey"]

[tool.flake8]
max-line-length = 120
Expand Down
3 changes: 2 additions & 1 deletion scheduler/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .task_models import TaskAdmin # noqa: F401
from .task_models import TaskAdmin as OldTaskAdmin # noqa: F401
from .ephemeral_models import QueueAdmin, WorkerAdmin # noqa: F401
from .task_admin import TaskAdmin # noqa: F401
179 changes: 179 additions & 0 deletions scheduler/admin/task_admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import redis
import valkey
from django.contrib import admin, messages
from django.contrib.contenttypes.admin import GenericStackedInline
from django.utils.translation import gettext_lazy as _

from scheduler import tools
from scheduler.models import TaskArg, TaskKwarg, Task
from scheduler.settings import SCHEDULER_CONFIG, logger
from scheduler.tools import get_job_executions_for_task


class HiddenMixin(object):
class Media:
js = ("admin/js/jquery.init.js",)


class JobArgInline(HiddenMixin, GenericStackedInline):
model = TaskArg
extra = 0
fieldsets = ((None, dict(fields=("arg_type", "val"))),)


class JobKwargInline(HiddenMixin, GenericStackedInline):
model = TaskKwarg
extra = 0
fieldsets = ((None, dict(fields=("key", ("arg_type", "val")))),)


@admin.register(Task)
class TaskAdmin(admin.ModelAdmin):
"""TaskAdmin admin view for all task models."""

class Media:
js = (
"admin/js/jquery.init.js",
"admin/js/select-fields.js",
)

save_on_top = True
change_form_template = "admin/scheduler/change_form.html"
actions = [
"disable_selected",
"enable_selected",
"enqueue_job_now",
]
inlines = [
JobArgInline,
JobKwargInline,
]
list_filter = ("enabled",)
list_display = (
"enabled",
"name",
"job_id",
"function_string",
"is_scheduled",
"queue",
"scheduled_time",
"interval_display",
"cron_string",
"next_run",
"successful_runs",
"last_successful_run",
"failed_runs",
"last_failed_run",
)
list_display_links = ("name",)
readonly_fields = (
"job_id",
"successful_runs",
"last_successful_run",
"failed_runs",
"last_failed_run",
)
# radio_fields = {"task_type": admin.HORIZONTAL}
fieldsets = (
(
None,
dict(
fields=(
"name",
"callable",
"task_type",
("enabled", "timeout", "result_ttl"),
)
),
),
(
None,
dict(fields=("scheduled_time",), classes=("tasktype-OnceTask",)),
),
(
None,
dict(fields=("cron_string",), classes=("tasktype-CronTask",)),
),
(
None,
dict(fields=("interval", "interval_unit", "repeat"), classes=("tasktype-RepeatableTask",)),
),
(_("RQ Settings"), dict(fields=(("queue", "at_front"), "job_id"))),
(
_("Previous runs info"),
dict(fields=(("successful_runs", "last_successful_run"), ("failed_runs", "last_failed_run"))),
),
)

@admin.display(description="Next run")
def next_run(self, o: Task):
return tools.get_next_cron_time(o.cron_string)

def change_view(self, request, object_id, form_url="", extra_context=None):
extra = extra_context or {}
obj = self.get_object(request, object_id)
try:
execution_list = get_job_executions_for_task(obj.queue, obj)
except (redis.ConnectionError, valkey.ConnectionError) as e:
logger.warn(f"Could not get job executions: {e}")
execution_list = list()
paginator = self.get_paginator(request, execution_list, SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE)
page_number = request.GET.get("p", 1)
page_obj = paginator.get_page(page_number)
page_range = paginator.get_elided_page_range(page_obj.number)

extra.update(
{
"pagination_required": paginator.count > SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE,
"executions": page_obj,
"page_range": page_range,
"page_var": "p",
}
)

return super(TaskAdmin, self).change_view(request, object_id, form_url, extra_context=extra)

def delete_queryset(self, request, queryset):
for job in queryset:
job.unschedule()
super(TaskAdmin, self).delete_queryset(request, queryset)

def delete_model(self, request, obj):
obj.unschedule()
super(TaskAdmin, self).delete_model(request, obj)

@admin.action(description=_("Disable selected %(verbose_name_plural)s"), permissions=("change",))
def disable_selected(self, request, queryset):
rows_updated = 0
for obj in queryset.filter(enabled=True).iterator():
obj.enabled = False
obj.unschedule()
rows_updated += 1

message_bit = "1 job was" if rows_updated == 1 else f"{rows_updated} jobs were"

level = messages.WARNING if not rows_updated else messages.INFO
self.message_user(request, f"{message_bit} successfully disabled and unscheduled.", level=level)

@admin.action(description=_("Enable selected %(verbose_name_plural)s"), permissions=("change",))
def enable_selected(self, request, queryset):
rows_updated = 0
for obj in queryset.filter(enabled=False).iterator():
obj.enabled = True
obj.save()
rows_updated += 1

message_bit = "1 job was" if rows_updated == 1 else f"{rows_updated} jobs were"
level = messages.WARNING if not rows_updated else messages.INFO
self.message_user(request, f"{message_bit} successfully enabled and scheduled.", level=level)

@admin.action(description="Enqueue now", permissions=("change",))
def enqueue_job_now(self, request, queryset):
task_names = []
for task in queryset:
task.enqueue_to_run()
task_names.append(task.name)
self.message_user(
request,
f"The following jobs have been enqueued: {', '.join(task_names)}",
)
2 changes: 1 addition & 1 deletion scheduler/management/commands/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def handle(self, *args, **options):
if options.get("format") == "json":
import json

click.echo(json.dumps(res, indent=2), file=file)
click.echo(json.dumps(res, indent=2, default=str), file=file)
return

if options.get("format") == "yaml":
Expand Down
34 changes: 24 additions & 10 deletions scheduler/management/commands/import.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from django.core.management.base import BaseCommand
from django.utils import timezone

from scheduler.models import TaskArg, TaskKwarg
from scheduler.models import TaskArg, TaskKwarg, Task
from scheduler.models.task import TaskType
from scheduler.tools import MODEL_NAMES


Expand All @@ -18,17 +19,30 @@ def job_model_str(model_str: str) -> str:
return model_str


def create_job_from_dict(job_dict: Dict[str, Any], update):
model = apps.get_model(app_label="scheduler", model_name=job_model_str(job_dict["model"]))
existing_job = model.objects.filter(name=job_dict["name"]).first()
def get_task_type(model_str: str) -> TaskType:
model_str = job_model_str(model_str)
if model_str not in MODEL_NAMES:
raise ValueError(f"Invalid model {model_str}")
if model_str == "CronTask":
return TaskType.CRON
elif model_str == "RepeatableTask":
return TaskType.REPEATABLE
elif model_str == "ScheduledTask":
return TaskType.ONCE


def create_task_from_dict(task_dict: Dict[str, Any], update):
existing_job = Task.objects.filter(name=task_dict["name"]).first()
task_type = get_task_type(task_dict["model"])
if existing_job:
if update:
click.echo(f'Found existing job "{existing_job}, removing it to be reinserted"')
existing_job.delete()
else:
click.echo(f'Found existing job "{existing_job}", skipping')
return
kwargs = dict(job_dict)
kwargs = dict(task_dict)
kwargs["task_type"] = task_type
del kwargs["model"]
del kwargs["callable_args"]
del kwargs["callable_kwargs"]
Expand All @@ -37,21 +51,21 @@ def create_job_from_dict(job_dict: Dict[str, Any], update):
if not settings.USE_TZ and not timezone.is_naive(target):
target = timezone.make_naive(target)
kwargs["scheduled_time"] = target
model_fields = set(map(lambda field: field.attname, model._meta.get_fields()))
model_fields = set(map(lambda field: field.attname, Task._meta.get_fields()))
keys_to_ignore = list(filter(lambda _k: _k not in model_fields, kwargs.keys()))
for k in keys_to_ignore:
del kwargs[k]
scheduled_job = model.objects.create(**kwargs)
scheduled_job = Task.objects.create(**kwargs)
click.echo(f"Created job {scheduled_job}")
content_type = ContentType.objects.get_for_model(scheduled_job)

for arg in job_dict["callable_args"]:
for arg in task_dict["callable_args"]:
TaskArg.objects.create(
content_type=content_type,
object_id=scheduled_job.id,
**arg,
)
for arg in job_dict["callable_kwargs"]:
for arg in task_dict["callable_kwargs"]:
TaskKwarg.objects.create(
content_type=content_type,
object_id=scheduled_job.id,
Expand Down Expand Up @@ -125,4 +139,4 @@ def handle(self, *args, **options):
model.objects.all().delete()

for job in jobs:
create_job_from_dict(job, update=options.get("update"))
create_task_from_dict(job, update=options.get("update"))
5 changes: 1 addition & 4 deletions scheduler/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,7 @@ def handle(self, **options):

try:
# Instantiate a worker
w = create_worker(
*queues,
**init_options
)
w = create_worker(*queues, **init_options)

# Close any opened DB connection before any fork
reset_db_connections()
Expand Down
Loading
Loading