From 72bc6eab3fe7bcc01f79b5461678d9cb9a08ed4c Mon Sep 17 00:00:00 2001 From: Kevin Grismore <146098880+kevingrismore@users.noreply.github.com> Date: Thu, 22 Feb 2024 17:46:18 -0600 Subject: [PATCH] Enable flow run suspend/resume with Cloud Run workers via unique Job names (#249) * cloud run job unique names * fix populate name test * fix v2 job name test * update test name * add tests * update changelog * slugify job name * slugify tests * correct docstring * remove changelog * make slugify public * formatting ugh --- prefect_gcp/cloud_run.py | 1 + prefect_gcp/utilities.py | 29 ++++++++++++++++++++++++ prefect_gcp/workers/cloud_run.py | 6 ++++- prefect_gcp/workers/cloud_run_v2.py | 26 ++++++++++++---------- tests/test_cloud_run_worker.py | 34 ++++++++++++++++++++++++++++- tests/test_cloud_run_worker_v2.py | 30 ++++++++++++++++++++++++- 6 files changed, 112 insertions(+), 14 deletions(-) create mode 100644 prefect_gcp/utilities.py diff --git a/prefect_gcp/cloud_run.py b/prefect_gcp/cloud_run.py index fd4c0ba7..17fdbc38 100644 --- a/prefect_gcp/cloud_run.py +++ b/prefect_gcp/cloud_run.py @@ -27,6 +27,7 @@ ``` """ + from __future__ import annotations import json diff --git a/prefect_gcp/utilities.py b/prefect_gcp/utilities.py new file mode 100644 index 00000000..2fba9ee7 --- /dev/null +++ b/prefect_gcp/utilities.py @@ -0,0 +1,29 @@ +from typing import Optional + +from slugify import slugify + + +def slugify_name(name: str, max_length: int = 30) -> Optional[str]: + """ + Slugify text for use as a name. + + Keeps only alphanumeric characters and dashes, and caps the length + of the slug at 30 chars. + + The 30 character length allows room to add a uuid for generating a unique + name for the job while keeping the total length of a name below 63 characters, + which is the limit for Cloud Run job names. + + Args: + name: The name of the job + + Returns: + The slugified job name or None if the slugified name is empty + """ + slug = slugify( + name, + max_length=max_length, + regex_pattern=r"[^a-zA-Z0-9-]+", + ) + + return slug if slug else None diff --git a/prefect_gcp/workers/cloud_run.py b/prefect_gcp/workers/cloud_run.py index e1f0263b..ebfb9a3f 100644 --- a/prefect_gcp/workers/cloud_run.py +++ b/prefect_gcp/workers/cloud_run.py @@ -145,6 +145,7 @@ import shlex import time from typing import TYPE_CHECKING, Any, Dict, Optional +from uuid import uuid4 import anyio import googleapiclient @@ -172,6 +173,7 @@ from prefect_gcp.cloud_run import Execution, Job from prefect_gcp.credentials import GcpCredentials +from prefect_gcp.utilities import slugify_name if TYPE_CHECKING: from prefect.client.schemas import FlowRun @@ -338,7 +340,9 @@ def _populate_name_if_not_present(self): """Adds the flow run name to the job if one is not already provided.""" try: if "name" not in self.job_body["metadata"]: - self.job_body["metadata"]["name"] = self.name + base_job_name = slugify_name(self.name) + job_name = f"{base_job_name}-{uuid4().hex}" + self.job_body["metadata"]["name"] = job_name except KeyError: raise ValueError("Unable to verify name due to invalid job body template.") diff --git a/prefect_gcp/workers/cloud_run_v2.py b/prefect_gcp/workers/cloud_run_v2.py index 4906ab55..6237bede 100644 --- a/prefect_gcp/workers/cloud_run_v2.py +++ b/prefect_gcp/workers/cloud_run_v2.py @@ -2,6 +2,7 @@ import shlex import time from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional +from uuid import uuid4 from anyio.abc import TaskStatus from google.api_core.client_options import ClientOptions @@ -24,12 +25,13 @@ from pydantic import VERSION as PYDANTIC_VERSION if PYDANTIC_VERSION.startswith("2."): - from pydantic.v1 import Field, validator + from pydantic.v1 import Field, PrivateAttr, validator else: - from pydantic import Field, validator + from pydantic import Field, validator, PrivateAttr from prefect_gcp.credentials import GcpCredentials from prefect_gcp.models.cloud_run_v2 import CloudRunJobV2Result, ExecutionV2, JobV2 +from prefect_gcp.utilities import slugify_name if TYPE_CHECKING: from prefect.client.schemas import FlowRun @@ -125,6 +127,7 @@ class CloudRunWorkerJobV2Configuration(BaseJobConfiguration): "complete before raising an exception." ), ) + _job_name: str = PrivateAttr(default=None) @property def project(self) -> str: @@ -137,18 +140,19 @@ def project(self) -> str: return self.credentials.project @property - def job_name(self): + def job_name(self) -> str: """ - Returns the job name, if it does not exist, it creates it. - """ - pre_trim_cr_job_name = f"prefect-{self.name}" - - if len(pre_trim_cr_job_name) > 40: - pre_trim_cr_job_name = pre_trim_cr_job_name[:40] + Returns the name of the job. - pre_trim_cr_job_name = pre_trim_cr_job_name.rstrip("-") + Returns: + str: The name of the job. + """ + if self._job_name is None: + base_job_name = slugify_name(self.name) + job_name = f"{base_job_name}-{uuid4().hex}" + self._job_name = job_name - return pre_trim_cr_job_name + return self._job_name def prepare_for_flow_run( self, diff --git a/tests/test_cloud_run_worker.py b/tests/test_cloud_run_worker.py index 6d5fa742..7c62514a 100644 --- a/tests/test_cloud_run_worker.py +++ b/tests/test_cloud_run_worker.py @@ -17,6 +17,7 @@ from prefect.server.schemas.actions import DeploymentCreate from prefect_gcp.credentials import GcpCredentials +from prefect_gcp.utilities import slugify_name from prefect_gcp.workers.cloud_run import ( CloudRunWorker, CloudRunWorkerJobConfiguration, @@ -68,11 +69,28 @@ def cloud_run_worker_job_config(service_account_info, jobs_body): ) +@pytest.fixture +def cloud_run_worker_job_config_noncompliant_name(service_account_info, jobs_body): + return CloudRunWorkerJobConfiguration( + name="MY_JOB_NAME", + image="gcr.io//not-a/real-image", + region="middle-earth2", + job_body=jobs_body, + credentials=GcpCredentials(service_account_info=service_account_info), + ) + + class TestCloudRunWorkerJobConfiguration: def test_job_name(self, cloud_run_worker_job_config): cloud_run_worker_job_config.job_body["metadata"]["name"] = "my-job-name" assert cloud_run_worker_job_config.job_name == "my-job-name" + def test_job_name_is_slug(self, cloud_run_worker_job_config_noncompliant_name): + cloud_run_worker_job_config_noncompliant_name._populate_name_if_not_present() + assert cloud_run_worker_job_config_noncompliant_name.job_name[ + :-33 + ] == slugify_name("MY_JOB_NAME") + def test_populate_envs( self, cloud_run_worker_job_config, @@ -124,7 +142,21 @@ def test_populate_name_if_not_present(self, cloud_run_worker_job_config): cloud_run_worker_job_config._populate_name_if_not_present() assert "name" in metadata - assert metadata["name"] == cloud_run_worker_job_config.name + assert metadata["name"][:-33] == cloud_run_worker_job_config.name + + def test_job_name_different_after_retry(self, cloud_run_worker_job_config): + metadata = cloud_run_worker_job_config.job_body["metadata"] + + assert "name" not in metadata + + cloud_run_worker_job_config._populate_name_if_not_present() + job_name_1 = metadata.pop("name") + + cloud_run_worker_job_config._populate_name_if_not_present() + job_name_2 = metadata.pop("name") + + assert job_name_1[:-33] == job_name_2[:-33] + assert job_name_1 != job_name_2 def test_populate_or_format_command_doesnt_exist(self, cloud_run_worker_job_config): container = cloud_run_worker_job_config.job_body["spec"]["template"]["spec"][ diff --git a/tests/test_cloud_run_worker_v2.py b/tests/test_cloud_run_worker_v2.py index 23468d93..e7025ed8 100644 --- a/tests/test_cloud_run_worker_v2.py +++ b/tests/test_cloud_run_worker_v2.py @@ -2,6 +2,7 @@ from prefect.utilities.dockerutils import get_prefect_image_name from prefect_gcp.credentials import GcpCredentials +from prefect_gcp.utilities import slugify_name from prefect_gcp.workers.cloud_run_v2 import CloudRunWorkerJobV2Configuration @@ -44,12 +45,39 @@ def cloud_run_worker_v2_job_config(service_account_info, job_body): ) +@pytest.fixture +def cloud_run_worker_v2_job_config_noncompliant_name(service_account_info, job_body): + return CloudRunWorkerJobV2Configuration( + name="MY_JOB_NAME", + job_body=job_body, + credentials=GcpCredentials(service_account_info=service_account_info), + region="us-central1", + timeout=86400, + env={"ENV1": "VALUE1", "ENV2": "VALUE2"}, + ) + + class TestCloudRunWorkerJobV2Configuration: def test_project(self, cloud_run_worker_v2_job_config): assert cloud_run_worker_v2_job_config.project == "my_project" def test_job_name(self, cloud_run_worker_v2_job_config): - assert cloud_run_worker_v2_job_config.job_name == "prefect-my-job-name" + assert cloud_run_worker_v2_job_config.job_name[:-33] == "my-job-name" + + def test_job_name_is_slug(self, cloud_run_worker_v2_job_config_noncompliant_name): + assert cloud_run_worker_v2_job_config_noncompliant_name.job_name[ + :-33 + ] == slugify_name("MY_JOB_NAME") + + def test_job_name_different_after_retry(self, cloud_run_worker_v2_job_config): + job_name_1 = cloud_run_worker_v2_job_config.job_name + + cloud_run_worker_v2_job_config._job_name = None + + job_name_2 = cloud_run_worker_v2_job_config.job_name + + assert job_name_1[:-33] == job_name_2[:-33] + assert job_name_1 != job_name_2 def test_populate_timeout(self, cloud_run_worker_v2_job_config): cloud_run_worker_v2_job_config._populate_timeout()