diff --git a/.gitmodules b/.gitmodules index 71907ead7..ed00dea93 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "src/pynxtools/definitions"] path = src/pynxtools/definitions - url = https://github.com/FAIRmat-NFDI/nexus_definitions.git \ No newline at end of file + url = https://github.com/FAIRmat-NFDI/nexus_definitions.git diff --git a/src/pynxtools/data/NXtest.nxdl.xml b/src/pynxtools/data/NXtest.nxdl.xml index 8695a20c9..04d9dd1b8 100644 --- a/src/pynxtools/data/NXtest.nxdl.xml +++ b/src/pynxtools/data/NXtest.nxdl.xml @@ -13,7 +13,9 @@ This is a dummy NXDL to test out the dataconverter. - + + This is the version of the definition. + diff --git a/src/pynxtools/dataconverter/convert.py b/src/pynxtools/dataconverter/convert.py index 508071906..66aecff29 100644 --- a/src/pynxtools/dataconverter/convert.py +++ b/src/pynxtools/dataconverter/convert.py @@ -237,7 +237,13 @@ def convert( ) helpers.add_default_root_attributes(data=data, filename=os.path.basename(output)) - Writer(data=data, nxdl_f_path=nxdl_f_path, output_path=output).write() + + write_docs = kwargs.pop("write_docs", False) + docs_format = kwargs.pop("docs_format", None) + Writer(data=data, nxdl_f_path=nxdl_f_path, output_path=output).write( + write_docs=write_docs, + docs_format=docs_format, + ) logger.info(f"The output file generated: {output}.") @@ -350,7 +356,21 @@ def main_cli(): default=None, help="A json config file for the reader", ) -# pylint: disable=too-many-arguments +@click.option( + "--write-docs", + is_flag=True, + default=False, + help="Write docs for the individual NeXus concepts as HDF5 attributes.", +) +@click.option( + "--docs-format", + type=click.Choice(["default", "html", "html5", "xml", "pseudoxml"]), + default=None, + help=( + "Optionally specify the format in which the docs for the individual NeXus concepts is generated. " + "By default, the docs are formatted as in the NXDL file." 
+ ), +) def convert_cli( files: Tuple[str, ...], input_file: Tuple[str, ...], @@ -363,6 +383,8 @@ def convert_cli( mapping: str, config_file: str, fail: bool, + write_docs: bool, + docs_format: str, **kwargs, ): """This command allows you to use the converter functionality of the dataconverter.""" @@ -390,6 +412,18 @@ def convert_cli( if config_file: kwargs["config_file"] = config_file + if write_docs: + kwargs["write_docs"] = write_docs + if not docs_format: + kwargs["docs_format"] = "default" + else: + kwargs["docs_format"] = docs_format + + elif docs_format is not None: + raise click.UsageError( + "Error: --docs-format can only be used with --write-docs." + ) + file_list = [] for file in files: if os.path.isdir(file): diff --git a/src/pynxtools/dataconverter/helpers.py b/src/pynxtools/dataconverter/helpers.py index e9b35dffb..87c5691f0 100644 --- a/src/pynxtools/dataconverter/helpers.py +++ b/src/pynxtools/dataconverter/helpers.py @@ -790,6 +790,64 @@ def get_concept_basepath(path: str) -> str: return "/" + "/".join(concept_path) +def get_concept_path_from_elem(elem: ET.Element) -> str: + """ + Process individual XML element to generate the NeXus concept path. + + Output is e.g. "NXexperiment:/NXentry/NXinstrument/NXdetector". 
+ """ + + name = elem.attrib.get("name", "") + elem_type = elem.attrib.get("type", "") + nxdlbase = elem.attrib.get("nxdlbase", "") # .split("/")[-1] + nxdlbase_class = elem.attrib.get("nxdlbase_class", "") + nxdlpath = elem.attrib.get("nxdlpath", "") + category = elem.attrib.get("category", "") + # optional = elem.attrib.get("optional", "") + # extends = elem.attrib.get("extends", "") + + # print(f"tag: {tag}") + # print(f"name: {name}") + # print(f"elem_type: {elem_type}") + # print(f"nxdlbase: {nxdlbase}") + # print(f"nxdlbase_class: {nxdlbase_class}") + # print(f"nxdlpath: {nxdlpath}") + # # print(f"optional: {optional}") + # # print(f"extends: {extends}") + # print("\n") + + concept_path = "" + + if elem.tag.endswith("group"): + if nxdlbase_class and nxdlbase_class == "application": + concept_path += "NXmpes:" + concept_path += nxdlpath # + = f"(elem_type)" + + else: + if nxdlbase: + concept_path += nxdlbase.replace(".nxdl.xml", "").split(os.path.sep)[-1] + concept_path += nxdlpath # + = f"(elem_type)" + + elif elem.tag.endswith("field"): + pass + + elif elem.tag.endswith("attribute"): + pass + elif elem.tag.endswith("definition"): + concept_path += name + + return concept_path + + # if nxdlpath: + # # Split the nxdlpath and construct the string + # path_parts = nxdlpath.strip("/").split("/") + # formatted_path = "/".join(path_parts) + # return f"{formatted_path}({elem_type})" + # else: + # # For elements with no path, return the name and type + # return f"{name}({elem_type})" + + def remove_namespace_from_tag(tag): """Helper function to remove the namespace from an XML tag.""" diff --git a/src/pynxtools/dataconverter/writer.py b/src/pynxtools/dataconverter/writer.py index d22307c88..83e0201e1 100644 --- a/src/pynxtools/dataconverter/writer.py +++ b/src/pynxtools/dataconverter/writer.py @@ -19,19 +19,22 @@ # pylint: disable=R0912 +import io import copy import logging -import sys import xml.etree.ElementTree as ET +from typing import Optional import h5py import 
numpy as np +from docutils.core import publish_string from pynxtools.dataconverter import helpers from pynxtools.dataconverter.exceptions import InvalidDictProvided from pynxtools.definitions.dev_tools.utils.nxdl_utils import ( NxdlAttributeNotFoundError, get_node_at_nxdl_path, + get_inherited_nodes, ) logger = logging.getLogger("pynxtools") # pylint: disable=C0103 @@ -109,7 +112,7 @@ def handle_shape_entries(data, file, path): # pylint: disable=too-many-locals, inconsistent-return-statements -def handle_dicts_entries(data, grp, entry_name, output_path, path): +def handle_dicts_entries(data, grp, entry_name, output_path, path, docs): """Handle function for dictionaries found as value of the nexus file. Several cases can be encoutered: @@ -118,12 +121,14 @@ def handle_dicts_entries(data, grp, entry_name, output_path, path): - Internal links - External links - compression label""" + + # print(data, grp, entry_name, output_path, path, docs) if "link" in data: file, path = split_link(data, output_path) # generate virtual datasets from slices if "shape" in data.keys(): layout = handle_shape_entries(data, file, path) - grp.create_virtual_dataset(entry_name, layout) + dataset = grp.create_virtual_dataset(entry_name, layout) # multiple datasets to concatenate elif "link" in data.keys() and isinstance(data["link"], list): total_length = 0 @@ -141,7 +146,7 @@ def handle_dicts_entries(data, grp, entry_name, output_path, path): for vsource in sources: layout[offset : offset + vsource.shape[0]] = vsource offset += vsource.shape[0] - grp.create_virtual_dataset(entry_name, layout, fillvalue=0) + dataset = grp.create_virtual_dataset(entry_name, layout, fillvalue=0) # internal and external links elif "link" in data.keys(): if ":/" not in data["link"]: @@ -159,7 +164,7 @@ def handle_dicts_entries(data, grp, entry_name, output_path, path): ) if accept is True: strength = data["strength"] - grp.create_dataset( + dataset = grp.create_dataset( entry_name, data=data["compress"], 
compression="gzip", @@ -167,13 +172,20 @@ def handle_dicts_entries(data, grp, entry_name, output_path, path): compression_opts=strength, ) else: - grp.create_dataset(entry_name, data=data["compress"]) + dataset = grp.create_dataset(entry_name, data=data["compress"]) else: raise InvalidDictProvided( "A dictionary was provided to the template but it didn't" " fall into any of the know cases of handling" " dictionaries. This occured for: " + entry_name ) + + if docs: + try: + dataset.attrs["docs"] = docs + except NameError: + pass + # Check whether link has been stabilished or not try: return grp[entry_name] @@ -198,10 +210,14 @@ class Writer: output_nexus (h5py.File): The h5py file object to manipulate output file. nxdl_data (dict): Stores xml data from given nxdl file to use during conversion. nxs_namespace (str): The namespace used in the NXDL tags. Helps search for XML children. + write_docs (bool): Write docs for the individual NeXus concepts as HDF5 attributes. """ def __init__( - self, data: dict = None, nxdl_f_path: str = None, output_path: str = None + self, + data: dict = None, + nxdl_f_path: str = None, + output_path: str = None, ): """Constructs the necessary objects required by the Writer class.""" self.data = data @@ -211,6 +227,9 @@ def __init__( self.nxdl_data = ET.parse(self.nxdl_f_path).getroot() self.nxs_namespace = get_namespace(self.nxdl_data) + self.write_docs: bool = False + self.docs_format: str = "default" + def __nxdl_to_attrs(self, path: str = "/") -> dict: """ Return a dictionary of all the attributes at the given path in the NXDL and @@ -237,6 +256,77 @@ def __nxdl_to_attrs(self, path: str = "/") -> dict: return elem.attrib + def __nxdl_docs(self, path: str = "/") -> Optional[str]: + """Get the NXDL docs for a path in the data.""" + + def extract_and_format_docs(elem: ET.Element) -> str: + """Get the docstring for a given element in the NDXL tree.""" + docs_elements = elem.findall(f"{self.nxs_namespace}doc") + if docs_elements: + docs = 
docs_elements[0].text + if self.docs_format != "default": + docs = publish_string( + docs, + writer_name=self.docs_format, + settings_overrides={"warning_stream": io.StringIO()}, + ).decode("utf-8") + return docs.strip().replace("\\n", "\n") + return "" + + docs: str = "" + + if not self.write_docs: + return None + + nxdl_path = helpers.convert_data_converter_dict_to_nxdl_path(path) + + if nxdl_path == "/ENTRY": + # Special case for docs of application definition + app_def_docs = extract_and_format_docs(self.nxdl_data) + if app_def_docs: + return app_def_docs + + _, _, elist = get_inherited_nodes( + nxdl_path, elem=copy.deepcopy(self.nxdl_data) + ) + + for elem in elist: + if not docs: + # Only use docs from superclasses if they are not extended. 
+ docs += extract_and_format_docs(elem) + # print("\n") + + if not elist: + # Handle docs for attributeS + (_, inherited_nodes, _) = get_inherited_nodes( + nxdl_path, elem=copy.deepcopy(self.nxdl_data) + ) + attrs = inherited_nodes[-1].findall(f"{self.nxs_namespace}attribute") + for attr in attrs: + if attr.attrib["name"] == path.split("@")[-1]: + docs += extract_and_format_docs(attr) + + return docs + def ensure_and_get_parent_node(self, path: str, undocumented_paths) -> h5py.Group: """Returns the parent if it exists for a given path else creates the parent group.""" parent_path = path[0 : path.rindex("/")] or "/" @@ -249,6 +339,11 @@ def ensure_and_get_parent_node(self, path: str, undocumented_paths) -> h5py.Grou if attrs is not None: grp.attrs["NX_class"] = attrs["type"] + + docs = self.__nxdl_docs(parent_path) + if docs: + grp.attrs["docs"] = docs + return grp return self.output_nexus[parent_path_hdf5] @@ -263,6 +358,8 @@ def add_units_key(dataset, path): dataset.attrs["units"] = self.data[units_key] for path, value in self.data.items(): + docs = self.__nxdl_docs(path) + try: if path[path.rindex("/") + 1 :] == "@units": continue @@ -279,17 +376,22 @@ def add_units_key(dataset, path): grp = self.ensure_and_get_parent_node( path, self.data.undocumented.keys() ) + if isinstance(data, dict): if "compress" in data.keys(): dataset = handle_dicts_entries( - data, grp, entry_name, self.output_path, path + data, grp, entry_name, self.output_path, path, docs ) + else: hdf5_links_for_later.append( - [data, grp, entry_name, self.output_path, path] + [data, grp, entry_name, self.output_path, path, docs] ) else: dataset = grp.create_dataset(entry_name, data=data) + if docs: + dataset.attrs["docs"] = docs + except InvalidDictProvided as exc: print(str(exc)) except Exception as exc: @@ -305,6 +407,7 @@ def add_units_key(dataset, path): del self.data[links[-1]] for path, value in self.data.items(): + docs = self.__nxdl_docs(path) try: if path[path.rindex("/") + 1 :] == 
"@units": continue @@ -322,19 +425,28 @@ def add_units_key(dataset, path): add_units_key(self.output_nexus[path_hdf5], path) else: - # consider changing the name here the lvalue can also be group! dataset = self.ensure_and_get_parent_node( path, self.data.undocumented.keys() ) dataset.attrs[entry_name[1:]] = data + if docs: + # Write docs for attributes like __docs + dataset.attrs[f"{entry_name[1:]}_docs"] = docs except Exception as exc: raise IOError( f"Unknown error occured writing the path: {path} " f"with the following message: {str(exc)}" ) from exc - def write(self): - """Writes the NeXus file with previously validated data from the reader with NXDL attrs.""" + def write(self, write_docs: bool = False, docs_format: str = "default"): + """ + Writes the NeXus file with previously validated data from the reader with NXDL attrs. + + Args: + write_docs (bool): Write docs for the individual NeXus concepts as HDF5 attributes. The default is False. + """ + self.write_docs = write_docs + self.docs_format = docs_format try: self._put_data_into_hdf5() finally: diff --git a/tests/dataconverter/test_writer.py b/tests/dataconverter/test_writer.py index acc84d8d5..a6502c519 100644 --- a/tests/dataconverter/test_writer.py +++ b/tests/dataconverter/test_writer.py @@ -59,6 +59,29 @@ def test_write(writer): assert test_nxs["/my_entry/nxodd_name/posint_value"].shape == (3,) # pylint: disable=no-member +def test_write_docs(writer): + """Test for the Writer's write_docs option. Checks whether docs are written for NeXus concepts.""" + writer.write(write_docs=True) + test_nxs = h5py.File(writer.output_path, "r") + # print(writer.output_path) + assert ( + test_nxs["/my_entry"].attrs["docs"] + == "This is a dummy NXDL to test out the dataconverter." + ) + assert ( + test_nxs["/my_entry/definition"].attrs["version__docs"] + == "This is the version of the definition." + ) + assert ( + test_nxs["/my_entry/nxodd_name/int_value"].attrs["docs"] + == "A dummy entry for an int value." 
+ ) + assert ( + test_nxs["/my_entry/required_group"].attrs["docs"] + == "This is a required yet empty group." + ) + + def test_write_link(writer): """Test for the Writer's write function.