From eab0a220ef7864a8f418386fd61aeee5d03c48c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Wed, 16 Oct 2024 19:06:09 +0200 Subject: [PATCH 1/3] Added needed machinery to debug in an effective way workflow materialization codepaths --- wfexs_backend/__main__.py | 53 ++++++++++++++++++++++++++++++++-- wfexs_backend/wfexs_backend.py | 47 +++++++++++++++++++++--------- wfexs_backend/workflow.py | 53 ++++++++++++++++++++++++++++++++++ 3 files changed, 137 insertions(+), 16 deletions(-) diff --git a/wfexs_backend/__main__.py b/wfexs_backend/__main__.py index a38fbef..b781409 100644 --- a/wfexs_backend/__main__.py +++ b/wfexs_backend/__main__.py @@ -133,6 +133,10 @@ class WfExS_Commands(StrDocEnum): "re-stage", "Prepare a new staging (working) directory for workflow execution, repeating the fetch of dependencies and contents", ) + TryStage = ( + "try-stage", + "Prepare a workflow in a new staging (working) directory for workflow execution, fetching dependencies but no input", + ) MountWorkDir = ( "mount-workdir", "Mount the encrypted staging directory on secure staging scenarios", @@ -342,7 +346,20 @@ def genParserSub( help="Force secured working directory", ) - if preStageParams or exportParams or command == WfExS_Commands.ReStage: + if command == WfExS_Commands.TryStage: + ap_.add_argument( + "--workflow-uri", + dest="workflowURI", + required=True, + type=str, + help="URI of the workflow to be tried", + ) + + if ( + preStageParams + or exportParams + or command in (WfExS_Commands.ReStage, WfExS_Commands.TryStage) + ): ap_.add_argument( "-Z", "--creds-config", @@ -362,7 +379,7 @@ def genParserSub( if ( preStageParams and command not in (WfExS_Commands.ConfigValidate,) - ) or command == WfExS_Commands.ReStage: + ) or command in (WfExS_Commands.ReStage, WfExS_Commands.TryStage): ap_.add_argument( "-n", "--nickname-prefix", @@ -1306,6 +1323,7 @@ def _get_wfexs_argparse_internal( ap_ll = genParserSub(sp, WfExS_Commands.ListLicences) 
ap_cv = genParserSub(sp, WfExS_Commands.ConfigValidate, preStageParams=True) + ap_try = genParserSub(sp, WfExS_Commands.TryStage) ap_s = genParserSub(sp, WfExS_Commands.Stage, preStageParams=True) ap_r_s = genParserSub( @@ -1568,7 +1586,10 @@ def main() -> None: file=sys.stderr, ) sys.exit(1) - elif command != WfExS_Commands.Import and not args.workflowConfigFilename: + elif ( + command not in (WfExS_Commands.Import, WfExS_Commands.TryStage) + and not args.workflowConfigFilename + ): print("[ERROR] Workflow config was not provided! Stopping.", file=sys.stderr) sys.exit(1) elif command == WfExS_Commands.ConfigValidate: @@ -1587,6 +1608,12 @@ def main() -> None: orcids=op_orcids, paranoidMode=args.secure, ) + elif command == WfExS_Commands.TryStage: + wfInstance = wfBackend.tryWorkflowURI( + args.workflowURI, + args.securityContextsConfigFilename, + nickname_prefix=args.nickname_prefix, + ) elif command == WfExS_Commands.Import: wfInstance = wfBackend.fromPreviousROCrate( args.workflowROCrateFilenameOrURI, @@ -1678,6 +1705,26 @@ def main() -> None: or not isinstance(wfInstance.stageMarshalled, datetime.datetime) else 0 ) + elif command == WfExS_Commands.TryStage: + print( + "\t Instance {} (nickname '{}') (to be inspected later)".format( + wfSetup.instance_id, wfSetup.nickname + ) + ) + stagedSetup = wfInstance.tryStageWorkflow() + print( + "\t- Instance {} (nickname '{}') is {} ready".format( + wfSetup.instance_id, + wfSetup.nickname, + "NOT" if stagedSetup.is_damaged else "now", + ) + ) + sys.exit( + 1 + if stagedSetup.is_damaged + or not isinstance(wfInstance.stageMarshalled, datetime.datetime) + else 0 + ) # Depending on the parameters, it might not exist if getattr(args, "doMaterializedROCrate", None): diff --git a/wfexs_backend/wfexs_backend.py b/wfexs_backend/wfexs_backend.py index 98f51bf..822b7a4 100644 --- a/wfexs_backend/wfexs_backend.py +++ b/wfexs_backend/wfexs_backend.py @@ -1345,6 +1345,19 @@ def getDefaultParanoidMode(self) -> "bool": def 
enableDefaultParanoidMode(self) -> None: self.defaultParanoidMode = True + def tryWorkflowURI( + self, + workflow_uri: "str", + securityContextsConfigFilename: "Optional[pathlib.Path]" = None, + nickname_prefix: "Optional[str]" = None, + ) -> "WF": + return WF.TryWorkflowURI( + self, + workflow_uri, + securityContextsConfigFilename=securityContextsConfigFilename, + nickname_prefix=nickname_prefix, + ) + def fromFiles( self, workflowMetaFilename: "pathlib.Path", @@ -2122,10 +2135,12 @@ def cacheWorkflow( web_url=guessedRepo.web_url, ) else: + repoRelPath: "Optional[str]" = None ( i_workflow, cached_putative_path, metadata_array, + repoRelPath, ) = self.getWorkflowBundleFromURI( cast("URIType", workflow_id), offline=offline, @@ -2134,17 +2149,17 @@ def cacheWorkflow( if i_workflow is None: repoDir = cached_putative_path - repoRelPath: "Optional[str]" = None - if repoDir.is_dir(): - if len(parsedRepoURL.fragment) > 0: - frag_qs = urllib.parse.parse_qs(parsedRepoURL.fragment) - subDirArr = frag_qs.get("subdirectory", []) - if len(subDirArr) > 0: - repoRelPath = subDirArr[0] - elif len(metadata_array) > 0: - # Let's try getting a pretty filename - # when the workflow is a single file - repoRelPath = metadata_array[0].preferredName + if not repoRelPath: + if repoDir.is_dir(): + if len(parsedRepoURL.fragment) > 0: + frag_qs = urllib.parse.parse_qs(parsedRepoURL.fragment) + subDirArr = frag_qs.get("subdirectory", []) + if len(subDirArr) > 0: + repoRelPath = subDirArr[0] + elif len(metadata_array) > 0: + # Let's try getting a pretty filename + # when the workflow is a single file + repoRelPath = metadata_array[0].preferredName # It can be either a relative path to a directory or to a file # It could be even empty! 
@@ -2418,6 +2433,7 @@ def getWorkflowRepoFromTRS( i_workflow, self.cacheROCrateFilename, metadata_array, + _, ) = self.getWorkflowBundleFromURI( roCrateURL, expectedEngineDesc=self.RECOGNIZED_TRS_DESCRIPTORS[ @@ -2607,7 +2623,7 @@ def getWorkflowBundleFromURI( offline: "bool" = False, ignoreCache: "bool" = False, registerInCache: "bool" = True, - ) -> "Tuple[Optional[IdentifiedWorkflow], pathlib.Path, Sequence[URIWithMetadata]]": + ) -> "Tuple[Optional[IdentifiedWorkflow], pathlib.Path, Sequence[URIWithMetadata], Optional[RelPath]]": try: cached_content = self.cacheFetch( remote_url, @@ -2649,16 +2665,21 @@ def getWorkflowBundleFromURI( roCrateFile, ) + identified_workflow = self.getWorkflowRepoFromROCrateFile( + roCrateFile, expectedEngineDesc + ) return ( - self.getWorkflowRepoFromROCrateFile(roCrateFile, expectedEngineDesc), + identified_workflow, roCrateFile, cached_content.metadata_array, + identified_workflow.remote_repo.rel_path, ) else: return ( None, cached_content.path, cached_content.metadata_array, + None, ) def getWorkflowRepoFromROCrateFile( diff --git a/wfexs_backend/workflow.py b/wfexs_backend/workflow.py index edb3a5e..524c1dd 100644 --- a/wfexs_backend/workflow.py +++ b/wfexs_backend/workflow.py @@ -1329,6 +1329,32 @@ def FromWorkDir( fail_ok=fail_ok, ) + @classmethod + def TryWorkflowURI( + cls, + wfexs: "WfExSBackend", + workflow_uri: "str", + securityContextsConfigFilename: "Optional[pathlib.Path]" = None, + nickname_prefix: "Optional[str]" = None, + ) -> "WF": + """ + This class method creates a new staged working directory + """ + + workflow_meta = { + "workflow_id": workflow_uri, + "workflow_config": {"secure": False}, + "params": {}, + } + + return cls.FromStagedRecipe( + wfexs, + workflow_meta, + securityContextsConfigFilename=securityContextsConfigFilename, + nickname_prefix=nickname_prefix, + reproducibility_level=ReproducibilityLevel.Minimal, + ) + @classmethod def FromFiles( cls, @@ -3574,6 +3600,33 @@ def fetchInputs( return 
theInputs, lastInput, the_failed_uris + def tryStageWorkflow( + self, offline: "bool" = False, ignoreCache: "bool" = False + ) -> "StagedSetup": + """ + This method is here to try materializing and identifying a workflow + """ + + # Inputs should be materialized before materializing the workflow itself + # because some workflow systems could need them in order to describe + # some of its internal details. + # + # But as we are trying to materialize a bare workflow, no input + # is going to be provided + + # This method is called from within setupEngine + # self.fetchWorkflow(self.id, self.version_id, self.trs_endpoint, self.descriptor_type) + # This method is called from within materializeWorkflowAndContainers + # self.setupEngine(offline=offline) + self.materializeWorkflowAndContainers( + offline=offline, + ignoreCache=ignoreCache, + ) + + self.marshallStage() + + return self.getStagedSetup() + def stageWorkDir( self, offline: "bool" = False, ignoreCache: "bool" = False ) -> "StagedSetup": From aa51d88af615092c8a19b63b5b3b5b55efd94e48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Thu, 17 Oct 2024 01:20:20 +0200 Subject: [PATCH 2/3] Added heuristic to (sometimes) properly obtain the relative path of the workflow entrypoint. It should help with some Workflow RO-Crate cases where bug #128 is triggered. 
--- wfexs_backend/utils/rocrate.py | 18 ++++++++++++++++++ wfexs_backend/workflow.py | 10 ++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/wfexs_backend/utils/rocrate.py b/wfexs_backend/utils/rocrate.py index 6543d01..68777e3 100644 --- a/wfexs_backend/utils/rocrate.py +++ b/wfexs_backend/utils/rocrate.py @@ -2491,6 +2491,24 @@ def extractWorkflowMetadata( if langrow.workflow_alternate_name is not None: repo_relpath = str(langrow.workflow_alternate_name) + # A fallback + if repo_relpath is None: + self.logger.warning( + f"Deriving relative path of workflow entry point from entry point location in RO-Crate metadata" + ) + main_entity_uri = str(main_entity) + main_entity_parsed_uri = urllib.parse.urlparse(main_entity_uri) + use_main_entity = ( + main_entity_parsed_uri.scheme == self.RELATIVE_ROCRATE_SCHEME + ) + + if use_main_entity: + entity_path = urllib.parse.unquote(main_entity_parsed_uri.path) + if entity_path.startswith("/"): + entity_path = entity_path[1:] + + repo_relpath = entity_path + repo_web_url: "Optional[str]" = None if langrow.workflow_url is not None: repo_web_url = str(langrow.workflow_url) diff --git a/wfexs_backend/workflow.py b/wfexs_backend/workflow.py index 524c1dd..46f23fb 100644 --- a/wfexs_backend/workflow.py +++ b/wfexs_backend/workflow.py @@ -2016,6 +2016,7 @@ def fetchWorkflow( offline=offline, meta_dir=self.metaDir, ) + self.logger.error(repo) self.remote_repo = repo # These are kept for compatibility @@ -2078,15 +2079,15 @@ def fetchWorkflow( ) self.logger.info( "materialized workflow repository (checkout {}): {}".format( - self.repoEffectiveCheckout, self.workflowDir + self.repoEffectiveCheckout, localWorkflow.dir ) ) - if self.repoRelPath is not None: - if not (self.workflowDir / self.repoRelPath).exists(): + if localWorkflow.relPath is not None: + if not (localWorkflow.dir / localWorkflow.relPath).exists(): raise WFException( "Relative path {} cannot be found in materialized workflow repository {}".format( - 
self.repoRelPath, self.workflowDir + localWorkflow.relPath, localWorkflow.dir ) ) # A valid engine must be identified from the fetched content @@ -2126,6 +2127,7 @@ def fetchWorkflow( self.logger.debug("Fixed engine " + self.engineDesc.trs_descriptor) engine = self.wfexs.instantiateEngine(self.engineDesc, self.staged_setup) engineVer, candidateLocalWorkflow = engine.identifyWorkflow(localWorkflow) + self.logger.error(localWorkflow) if engineVer is None: raise WFException( "Engine {} did not recognize a workflow at {}".format( From 7714606ae935ec5901896293f15599ff8916f484 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Thu, 17 Oct 2024 01:49:57 +0200 Subject: [PATCH 3/3] Removed a couple of traces using self.logger.error --- wfexs_backend/workflow.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/wfexs_backend/workflow.py b/wfexs_backend/workflow.py index 46f23fb..5a982ab 100644 --- a/wfexs_backend/workflow.py +++ b/wfexs_backend/workflow.py @@ -2016,7 +2016,6 @@ def fetchWorkflow( offline=offline, meta_dir=self.metaDir, ) - self.logger.error(repo) self.remote_repo = repo # These are kept for compatibility @@ -2127,7 +2126,6 @@ def fetchWorkflow( self.logger.debug("Fixed engine " + self.engineDesc.trs_descriptor) engine = self.wfexs.instantiateEngine(self.engineDesc, self.staged_setup) engineVer, candidateLocalWorkflow = engine.identifyWorkflow(localWorkflow) - self.logger.error(localWorkflow) if engineVer is None: raise WFException( "Engine {} did not recognize a workflow at {}".format(