Issue #115 only collect assets of main subjob for crossbackend job
soxofaan committed Sep 11, 2023
1 parent 56bf3f5 commit f402fa6
Showing 3 changed files with 99 additions and 4 deletions.
src/openeo_aggregator/partitionedjobs/crossbackend.py (3 changes: 2 additions & 1 deletion)
@@ -82,6 +82,7 @@ def split_streaming(
self,
process_graph: FlatPG,
get_replacement: GetReplacementCallable = _default_get_replacement,
main_subgraph_id: SubGraphId = "main",
) -> Iterator[Tuple[SubGraphId, SubJob, List[SubGraphId]]]:
"""
Split given process graph in sub-process graphs and return these as an iterator
@@ -113,7 +114,7 @@ def split_streaming(
secondary_backends = {b for b in backend_usage if b != primary_backend}
_log.info(f"Backend split: {primary_backend=} {secondary_backends=}")

primary_id = "main"
primary_id = main_subgraph_id
primary_pg = {}
primary_has_load_collection = False
primary_dependencies = []
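
Note: a minimal usage sketch of the extended split_streaming() signature follows; the CrossBackendSplitter constructor argument and the process graph are illustrative assumptions, not part of this commit.

# Hypothetical usage of split_streaming() with the new main_subgraph_id parameter.
from openeo_aggregator.partitionedjobs.crossbackend import CrossBackendSplitter

# Assumption: the splitter is constructed with a collection-to-backend mapping callable.
splitter = CrossBackendSplitter(backend_for_collection=lambda collection_id: "b1")

process_graph = {
    "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
    "sr1": {"process_id": "save_result", "arguments": {"data": {"from_node": "lc1"}}, "result": True},
}

for subgraph_id, subjob, dependencies in splitter.split_streaming(
    process_graph=process_graph,
    main_subgraph_id="main",  # id under which the primary sub-job is yielded
):
    print(subgraph_id, dependencies)
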
src/openeo_aggregator/partitionedjobs/tracking.py (24 changes: 21 additions & 3 deletions)
@@ -32,6 +32,8 @@
from openeo_aggregator.partitionedjobs.zookeeper import ZooKeeperPartitionedJobDB
from openeo_aggregator.utils import _UNSET, Clock, PGWithMetadata, timestamp_to_rfc3339

PJOB_METADATA_FIELD_RESULT_JOBS = "result_jobs"

_log = logging.getLogger(__name__)


@@ -79,12 +81,16 @@ def create_crossbackend_pjob(
before we have finalised sub-processgraphs, whose metadata can then be persisted in the ZooKeeperPartitionedJobDB
"""
# Start with reserving a new partitioned job id based on initial metadata
main_subgraph_id = "main"
pjob_node_value = self._db.serialize(
user_id=user_id,
created=Clock.time(),
process=process,
metadata=metadata,
job_options=job_options,
**{
PJOB_METADATA_FIELD_RESULT_JOBS: [main_subgraph_id],
},
)
pjob_id = self._db.obtain_new_pjob_id(user_id=user_id, initial_value=pjob_node_value)
self._db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_INSERTED, create=True)
@@ -107,7 +113,7 @@ def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
}

for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
process_graph=process["process_graph"], get_replacement=get_replacement
process_graph=process["process_graph"], get_replacement=get_replacement, main_subgraph_id=main_subgraph_id
):
subjobs[sjob_id] = subjob
dependencies[sjob_id] = subjob_dependencies
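
For reference, a partial sketch of the persisted partitioned-job metadata after this change, mirroring the test expectations further below (values are illustrative; additional fields such as user id and creation time are omitted, and the exact node layout is defined by ZooKeeperPartitionedJobDB):

# Illustrative (partial) shape of the persisted partitioned-job metadata.
pjob_metadata = {
    "process": {"process_graph": {"lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}, "result": True}}},
    "metadata": {},
    "job_options": {"split_strategy": "crossbackend"},
    "result_jobs": ["main"],  # new field: ids of sub-jobs whose assets form the job result
}
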
@@ -380,9 +386,21 @@ def get_assets(self, user_id: str, pjob_id: str, flask_request: flask.Request) -
# TODO: do a sync if latest sync is too long ago?
pjob_metadata = self._db.get_pjob_metadata(user_id=user_id, pjob_id=pjob_id)
sjobs = self._db.list_subjobs(user_id=user_id, pjob_id=pjob_id)
if pjob_metadata.get(PJOB_METADATA_FIELD_RESULT_JOBS):
result_jobs = set(pjob_metadata[PJOB_METADATA_FIELD_RESULT_JOBS])
result_sjob_ids = [s for s in sjobs if s in result_jobs]
log_msg = f"Collect {pjob_id} subjob assets: subset {result_sjob_ids} (from {len(sjobs)})"
else:
# Collect results of all subjobs by default
result_sjob_ids = list(sjobs.keys())
log_msg = f"Collect {pjob_id} subjob assets: all {len(sjobs)})"

assets = []
with TimingLogger(title=f"Collect assets of {pjob_id} ({len(sjobs)} sub-jobs)", logger=_log):
for sjob_id, sjob_metadata in sjobs.items():
with TimingLogger(title=log_msg, logger=_log):
for sjob_id in result_sjob_ids:
sjob_metadata = sjobs[sjob_id]

# TODO: skip subjobs that are just dependencies for a main/grouping job
sjob_status = self._db.get_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id)["status"]
if sjob_status in {STATUS_INSERTED, STATUS_CREATED, STATUS_RUNNING}:
raise JobNotFinishedException
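
The key behavioural change in get_assets() is the selection of which sub-jobs to collect assets from; a standalone, simplified sketch of that selection logic (with a hypothetical dependency sub-job id) is:

# Simplified sketch of the new sub-job selection: only sub-jobs listed under the
# "result_jobs" metadata field are collected; when the field is absent (e.g. for
# partitioned jobs created before this change), all sub-jobs are collected.
from typing import Dict, List

PJOB_METADATA_FIELD_RESULT_JOBS = "result_jobs"

def select_result_sjob_ids(pjob_metadata: dict, sjobs: Dict[str, dict]) -> List[str]:
    if pjob_metadata.get(PJOB_METADATA_FIELD_RESULT_JOBS):
        result_jobs = set(pjob_metadata[PJOB_METADATA_FIELD_RESULT_JOBS])
        return [s for s in sjobs if s in result_jobs]
    return list(sjobs.keys())

# "dep-0" is a hypothetical id of a dependency sub-job.
sjobs = {"main": {}, "dep-0": {}}
assert select_result_sjob_ids({"result_jobs": ["main"]}, sjobs) == ["main"]
assert select_result_sjob_ids({}, sjobs) == ["main", "dep-0"]
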
tests/partitionedjobs/test_api.py (76 changes: 76 additions & 0 deletions)
@@ -44,6 +44,7 @@ def __init__(self, date: str):

@pytest.fixture
def dummy1(backend1, requests_mock) -> DummyBackend:
# TODO: rename this fixture to dummy_backed1
dummy = DummyBackend(requests_mock=requests_mock, backend_url=backend1, job_id_template="1-jb-{i}")
dummy.setup_basic_requests_mocks()
dummy.register_user(bearer_token=TEST_USER_BEARER_TOKEN, user_id=TEST_USER)
@@ -651,6 +652,7 @@ def test_create_job_simple(self, flask_app, api100, zk_db, dummy1):
"process": {"process_graph": pg},
"metadata": {},
"job_options": {"split_strategy": "crossbackend"},
"result_jobs": ["main"],
}

assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == {
@@ -722,6 +724,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
"process": {"process_graph": pg},
"metadata": {},
"job_options": {"split_strategy": "crossbackend"},
"result_jobs": ["main"],
}

assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == {
@@ -797,3 +800,76 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
"result": True,
},
}

@now.mock
def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1):
"""Run the jobs and get results"""
api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)

pg = {
"lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
"lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
"merge": {
"process_id": "merge_cubes",
"arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
"result": True,
},
}

res = api100.post(
"/jobs",
json={
"process": {"process_graph": pg},
"job_options": {"split_strategy": "crossbackend"},
},
).assert_status_code(201)

pjob_id = "pj-20220119-123456"
expected_job_id = f"agg-{pjob_id}"
assert res.headers["OpenEO-Identifier"] == expected_job_id

res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
assert res.json == {
"id": expected_job_id,
"process": {"process_graph": pg},
"status": "created",
"created": self.now.rfc3339,
"progress": 0,
}

# start job
api100.post(f"/jobs/{expected_job_id}/results").assert_status_code(202)
dummy1.set_job_status(TEST_USER, "1-jb-0", status="running")
dummy1.set_job_status(TEST_USER, "1-jb-1", status="queued")
res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
assert res.json == DictSubSet({"id": expected_job_id, "status": "running", "progress": 0})

# First job is ready
dummy1.set_job_status(TEST_USER, "1-jb-0", status="finished")
dummy1.setup_assets(job_id=f"1-jb-0", assets=["1-jb-0-result.tif"])
dummy1.set_job_status(TEST_USER, "1-jb-1", status="running")
res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
assert res.json == DictSubSet({"id": expected_job_id, "status": "running", "progress": 50})

# Main job is ready too
dummy1.set_job_status(TEST_USER, "1-jb-1", status="finished")
dummy1.setup_assets(job_id=f"1-jb-1", assets=["1-jb-1-result.tif"])
res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
assert res.json == DictSubSet({"id": expected_job_id, "status": "finished", "progress": 100})

# Get results
res = api100.get(f"/jobs/{expected_job_id}/results").assert_status_code(200)
assert res.json == DictSubSet(
{
"id": expected_job_id,
"assets": {
"main-1-jb-1-result.tif": {
"file:nodata": [None],
"href": "https://b1.test/v1/jobs/1-jb-1/results/1-jb-1-result.tif",
"roles": ["data"],
"title": "main-1-jb-1-result.tif",
"type": "application/octet-stream",
},
},
}
)
