Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Enable flow run suspend/resume with Cloud Run workers via unique Job …
Browse files Browse the repository at this point in the history
…names (#249)

* cloud run job unique names

* fix populate name test

* fix v2 job name test

* update test name

* add tests

* update changelog

* slugify job name

* slugify tests

* correct docstring

* remove changelog

* make slugify public

* formatting ugh
  • Loading branch information
kevingrismore authored Feb 22, 2024
1 parent 6bc4283 commit 72bc6ea
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 14 deletions.
1 change: 1 addition & 0 deletions prefect_gcp/cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
```
"""

from __future__ import annotations

import json
Expand Down
29 changes: 29 additions & 0 deletions prefect_gcp/utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Optional

from slugify import slugify


def slugify_name(name: str, max_length: int = 30) -> Optional[str]:
    """
    Turn arbitrary text into a slug suitable for use as a name.

    Only alphanumeric characters and dashes are kept, and the slug is capped
    at `max_length` characters.

    The default cap of 30 characters leaves room to append a uuid when
    generating a unique job name while keeping the total length within the
    63 character limit for Cloud Run job names.

    Args:
        name: The text to slugify, e.g. the name of the job.
        max_length: The maximum length of the returned slug. Defaults to 30.

    Returns:
        The slugified name, or None if the slugified name is empty.
    """
    slug = slugify(name, max_length=max_length, regex_pattern=r"[^a-zA-Z0-9-]+")

    # An empty slug is reported as None rather than as an empty string.
    return slug or None
6 changes: 5 additions & 1 deletion prefect_gcp/workers/cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import shlex
import time
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import uuid4

import anyio
import googleapiclient
Expand Down Expand Up @@ -172,6 +173,7 @@

from prefect_gcp.cloud_run import Execution, Job
from prefect_gcp.credentials import GcpCredentials
from prefect_gcp.utilities import slugify_name

if TYPE_CHECKING:
from prefect.client.schemas import FlowRun
Expand Down Expand Up @@ -338,7 +340,9 @@ def _populate_name_if_not_present(self):
"""Adds the flow run name to the job if one is not already provided."""
try:
if "name" not in self.job_body["metadata"]:
self.job_body["metadata"]["name"] = self.name
base_job_name = slugify_name(self.name)
job_name = f"{base_job_name}-{uuid4().hex}"
self.job_body["metadata"]["name"] = job_name
except KeyError:
raise ValueError("Unable to verify name due to invalid job body template.")

Expand Down
26 changes: 15 additions & 11 deletions prefect_gcp/workers/cloud_run_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import shlex
import time
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional
from uuid import uuid4

from anyio.abc import TaskStatus
from google.api_core.client_options import ClientOptions
Expand All @@ -24,12 +25,13 @@
from pydantic import VERSION as PYDANTIC_VERSION

if PYDANTIC_VERSION.startswith("2."):
from pydantic.v1 import Field, validator
from pydantic.v1 import Field, PrivateAttr, validator
else:
from pydantic import Field, validator
from pydantic import Field, validator, PrivateAttr

from prefect_gcp.credentials import GcpCredentials
from prefect_gcp.models.cloud_run_v2 import CloudRunJobV2Result, ExecutionV2, JobV2
from prefect_gcp.utilities import slugify_name

if TYPE_CHECKING:
from prefect.client.schemas import FlowRun
Expand Down Expand Up @@ -125,6 +127,7 @@ class CloudRunWorkerJobV2Configuration(BaseJobConfiguration):
"complete before raising an exception."
),
)
_job_name: str = PrivateAttr(default=None)

@property
def project(self) -> str:
Expand All @@ -137,18 +140,19 @@ def project(self) -> str:
return self.credentials.project

@property
def job_name(self):
def job_name(self) -> str:
"""
Returns the job name, if it does not exist, it creates it.
"""
pre_trim_cr_job_name = f"prefect-{self.name}"

if len(pre_trim_cr_job_name) > 40:
pre_trim_cr_job_name = pre_trim_cr_job_name[:40]
Returns the name of the job.
pre_trim_cr_job_name = pre_trim_cr_job_name.rstrip("-")
Returns:
str: The name of the job.
"""
if self._job_name is None:
base_job_name = slugify_name(self.name)
job_name = f"{base_job_name}-{uuid4().hex}"
self._job_name = job_name

return pre_trim_cr_job_name
return self._job_name

def prepare_for_flow_run(
self,
Expand Down
34 changes: 33 additions & 1 deletion tests/test_cloud_run_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from prefect.server.schemas.actions import DeploymentCreate

from prefect_gcp.credentials import GcpCredentials
from prefect_gcp.utilities import slugify_name
from prefect_gcp.workers.cloud_run import (
CloudRunWorker,
CloudRunWorkerJobConfiguration,
Expand Down Expand Up @@ -68,11 +69,28 @@ def cloud_run_worker_job_config(service_account_info, jobs_body):
)


@pytest.fixture
def cloud_run_worker_job_config_noncompliant_name(service_account_info, jobs_body):
    """Job configuration whose name contains uppercase letters and underscores."""
    credentials = GcpCredentials(service_account_info=service_account_info)
    return CloudRunWorkerJobConfiguration(
        name="MY_JOB_NAME",
        image="gcr.io//not-a/real-image",
        region="middle-earth2",
        job_body=jobs_body,
        credentials=credentials,
    )


class TestCloudRunWorkerJobConfiguration:
def test_job_name(self, cloud_run_worker_job_config):
    """A name set in the job body metadata is reported as the job name."""
    config = cloud_run_worker_job_config
    config.job_body["metadata"]["name"] = "my-job-name"
    assert config.job_name == "my-job-name"

def test_job_name_is_slug(self, cloud_run_worker_job_config_noncompliant_name):
    """The populated job name starts with the slugified configuration name."""
    config = cloud_run_worker_job_config_noncompliant_name
    config._populate_name_if_not_present()

    # Drop the trailing "-" separator and 32-character uuid4 hex suffix.
    base_name = config.job_name[:-33]
    assert base_name == slugify_name("MY_JOB_NAME")

def test_populate_envs(
self,
cloud_run_worker_job_config,
Expand Down Expand Up @@ -124,7 +142,21 @@ def test_populate_name_if_not_present(self, cloud_run_worker_job_config):
cloud_run_worker_job_config._populate_name_if_not_present()

assert "name" in metadata
assert metadata["name"] == cloud_run_worker_job_config.name
assert metadata["name"][:-33] == cloud_run_worker_job_config.name

def test_job_name_different_after_retry(self, cloud_run_worker_job_config):
    """Repopulating the name keeps the base name but changes the suffix."""
    config = cloud_run_worker_job_config
    metadata = config.job_body["metadata"]

    assert "name" not in metadata

    generated_names = []
    for _ in range(2):
        config._populate_name_if_not_present()
        # Remove the name again so the next call has to generate a new one.
        generated_names.append(metadata.pop("name"))
    first_name, second_name = generated_names

    # The last 33 characters are the "-" separator plus the uuid4 hex suffix.
    assert first_name[:-33] == second_name[:-33]
    assert first_name != second_name

def test_populate_or_format_command_doesnt_exist(self, cloud_run_worker_job_config):
container = cloud_run_worker_job_config.job_body["spec"]["template"]["spec"][
Expand Down
30 changes: 29 additions & 1 deletion tests/test_cloud_run_worker_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from prefect.utilities.dockerutils import get_prefect_image_name

from prefect_gcp.credentials import GcpCredentials
from prefect_gcp.utilities import slugify_name
from prefect_gcp.workers.cloud_run_v2 import CloudRunWorkerJobV2Configuration


Expand Down Expand Up @@ -44,12 +45,39 @@ def cloud_run_worker_v2_job_config(service_account_info, job_body):
)


@pytest.fixture
def cloud_run_worker_v2_job_config_noncompliant_name(service_account_info, job_body):
    """V2 job configuration whose name has uppercase letters and underscores."""
    credentials = GcpCredentials(service_account_info=service_account_info)
    return CloudRunWorkerJobV2Configuration(
        name="MY_JOB_NAME",
        job_body=job_body,
        credentials=credentials,
        region="us-central1",
        timeout=86400,
        env={"ENV1": "VALUE1", "ENV2": "VALUE2"},
    )


class TestCloudRunWorkerJobV2Configuration:
def test_project(self, cloud_run_worker_v2_job_config):
    """The configuration reports "my_project" as its project."""
    expected_project = "my_project"
    assert cloud_run_worker_v2_job_config.project == expected_project

def test_job_name(self, cloud_run_worker_v2_job_config):
assert cloud_run_worker_v2_job_config.job_name == "prefect-my-job-name"
assert cloud_run_worker_v2_job_config.job_name[:-33] == "my-job-name"

def test_job_name_is_slug(self, cloud_run_worker_v2_job_config_noncompliant_name):
    """The generated job name starts with the slugified configuration name."""
    config = cloud_run_worker_v2_job_config_noncompliant_name

    # Drop the trailing "-" separator and 32-character uuid4 hex suffix.
    base_name = config.job_name[:-33]
    assert base_name == slugify_name("MY_JOB_NAME")

def test_job_name_different_after_retry(self, cloud_run_worker_v2_job_config):
    """Clearing the stored name yields a new suffix on the same base name."""
    config = cloud_run_worker_v2_job_config
    first_name = config.job_name

    # Reset the stored name so the next access has to produce a new one.
    config._job_name = None
    second_name = config.job_name

    # The last 33 characters are the "-" separator plus the uuid4 hex suffix.
    assert first_name[:-33] == second_name[:-33]
    assert first_name != second_name

def test_populate_timeout(self, cloud_run_worker_v2_job_config):
cloud_run_worker_v2_job_config._populate_timeout()
Expand Down

0 comments on commit 72bc6ea

Please sign in to comment.