diff --git a/.gitmodules b/.gitmodules
index 71907ead7..ed00dea93 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,3 @@
[submodule "src/pynxtools/definitions"]
path = src/pynxtools/definitions
- url = https://github.com/FAIRmat-NFDI/nexus_definitions.git
\ No newline at end of file
+ url = https://github.com/FAIRmat-NFDI/nexus_definitions.git
diff --git a/src/pynxtools/data/NXtest.nxdl.xml b/src/pynxtools/data/NXtest.nxdl.xml
index 8695a20c9..04d9dd1b8 100644
--- a/src/pynxtools/data/NXtest.nxdl.xml
+++ b/src/pynxtools/data/NXtest.nxdl.xml
@@ -13,7 +13,9 @@
This is a dummy NXDL to test out the dataconverter.
-
+
+ This is the version of the definition.
+
diff --git a/src/pynxtools/dataconverter/convert.py b/src/pynxtools/dataconverter/convert.py
index 508071906..66aecff29 100644
--- a/src/pynxtools/dataconverter/convert.py
+++ b/src/pynxtools/dataconverter/convert.py
@@ -237,7 +237,13 @@ def convert(
)
helpers.add_default_root_attributes(data=data, filename=os.path.basename(output))
- Writer(data=data, nxdl_f_path=nxdl_f_path, output_path=output).write()
+
+ write_docs = kwargs.pop("write_docs", False)
+    docs_format = kwargs.pop("docs_format", None) or "default"
+ Writer(data=data, nxdl_f_path=nxdl_f_path, output_path=output).write(
+ write_docs=write_docs,
+ docs_format=docs_format,
+ )
logger.info(f"The output file generated: {output}.")
@@ -350,7 +356,21 @@ def main_cli():
default=None,
help="A json config file for the reader",
)
-# pylint: disable=too-many-arguments
+@click.option(
+ "--write-docs",
+ is_flag=True,
+ default=False,
+ help="Write docs for the individual NeXus concepts as HDF5 attributes.",
+)
+@click.option(
+ "--docs-format",
+ type=click.Choice(["default", "html", "html5", "xml", "pseudoxml"]),
+ default=None,
+ help=(
+ "Optionally specify the format in which the docs for the individual NeXus concepts is generated. "
+ "By default, the docs are formatted as in the NXDL file."
+ ),
+)
def convert_cli(
files: Tuple[str, ...],
input_file: Tuple[str, ...],
@@ -363,6 +383,8 @@ def convert_cli(
mapping: str,
config_file: str,
fail: bool,
+ write_docs: bool,
+ docs_format: str,
**kwargs,
):
"""This command allows you to use the converter functionality of the dataconverter."""
@@ -390,6 +412,18 @@ def convert_cli(
if config_file:
kwargs["config_file"] = config_file
+ if write_docs:
+ kwargs["write_docs"] = write_docs
+ if not docs_format:
+ kwargs["docs_format"] = "default"
+ else:
+ kwargs["docs_format"] = docs_format
+
+ elif docs_format is not None:
+ raise click.UsageError(
+ "Error: --docs-format can only be used with --write-docs."
+ )
+
file_list = []
for file in files:
if os.path.isdir(file):
diff --git a/src/pynxtools/dataconverter/helpers.py b/src/pynxtools/dataconverter/helpers.py
index e9b35dffb..87c5691f0 100644
--- a/src/pynxtools/dataconverter/helpers.py
+++ b/src/pynxtools/dataconverter/helpers.py
@@ -790,6 +790,64 @@ def get_concept_basepath(path: str) -> str:
return "/" + "/".join(concept_path)
+def get_concept_path_from_elem(elem: ET.Element) -> str:
+ """
+ Process individual XML element to generate the NeXus concept path.
+
+ Output is e.g. "NXexperiment:/NXentry/NXinstrument/NXdetector".
+ """
+
+ name = elem.attrib.get("name", "")
+ elem_type = elem.attrib.get("type", "")
+ nxdlbase = elem.attrib.get("nxdlbase", "") # .split("/")[-1]
+ nxdlbase_class = elem.attrib.get("nxdlbase_class", "")
+ nxdlpath = elem.attrib.get("nxdlpath", "")
+ category = elem.attrib.get("category", "")
+ # optional = elem.attrib.get("optional", "")
+ # extends = elem.attrib.get("extends", "")
+
+ # print(f"tag: {tag}")
+ # print(f"name: {name}")
+ # print(f"elem_type: {elem_type}")
+ # print(f"nxdlbase: {nxdlbase}")
+ # print(f"nxdlbase_class: {nxdlbase_class}")
+ # print(f"nxdlpath: {nxdlpath}")
+ # # print(f"optional: {optional}")
+ # # print(f"extends: {extends}")
+ # print("\n")
+
+ concept_path = ""
+
+ if elem.tag.endswith("group"):
+        if nxdlbase_class and nxdlbase_class == "application":
+            concept_path += nxdlbase.replace(".nxdl.xml", "").split(os.path.sep)[-1] + ":"
+            concept_path += nxdlpath  # + = f"(elem_type)"
+
+ else:
+ if nxdlbase:
+ concept_path += nxdlbase.replace(".nxdl.xml", "").split(os.path.sep)[-1]
+ concept_path += nxdlpath # + = f"(elem_type)"
+
+ elif elem.tag.endswith("field"):
+ pass
+
+ elif elem.tag.endswith("attribute"):
+ pass
+ elif elem.tag.endswith("definition"):
+ concept_path += name
+
+ return concept_path
+
+ # if nxdlpath:
+ # # Split the nxdlpath and construct the string
+ # path_parts = nxdlpath.strip("/").split("/")
+ # formatted_path = "/".join(path_parts)
+ # return f"{formatted_path}({elem_type})"
+ # else:
+ # # For elements with no path, return the name and type
+ # return f"{name}({elem_type})"
+
+
def remove_namespace_from_tag(tag):
"""Helper function to remove the namespace from an XML tag."""
diff --git a/src/pynxtools/dataconverter/writer.py b/src/pynxtools/dataconverter/writer.py
index d22307c88..83e0201e1 100644
--- a/src/pynxtools/dataconverter/writer.py
+++ b/src/pynxtools/dataconverter/writer.py
@@ -19,19 +19,22 @@
# pylint: disable=R0912
+import io
import copy
import logging
-import sys
import xml.etree.ElementTree as ET
+from typing import Optional
import h5py
import numpy as np
+from docutils.core import publish_string
from pynxtools.dataconverter import helpers
from pynxtools.dataconverter.exceptions import InvalidDictProvided
from pynxtools.definitions.dev_tools.utils.nxdl_utils import (
NxdlAttributeNotFoundError,
get_node_at_nxdl_path,
+ get_inherited_nodes,
)
logger = logging.getLogger("pynxtools") # pylint: disable=C0103
@@ -109,7 +112,7 @@ def handle_shape_entries(data, file, path):
# pylint: disable=too-many-locals, inconsistent-return-statements
-def handle_dicts_entries(data, grp, entry_name, output_path, path):
+def handle_dicts_entries(data, grp, entry_name, output_path, path, docs):
"""Handle function for dictionaries found as value of the nexus file.
Several cases can be encoutered:
@@ -118,12 +121,14 @@ def handle_dicts_entries(data, grp, entry_name, output_path, path):
- Internal links
- External links
- compression label"""
+
+ # print(data, grp, entry_name, output_path, path, docs)
if "link" in data:
file, path = split_link(data, output_path)
# generate virtual datasets from slices
if "shape" in data.keys():
layout = handle_shape_entries(data, file, path)
- grp.create_virtual_dataset(entry_name, layout)
+ dataset = grp.create_virtual_dataset(entry_name, layout)
# multiple datasets to concatenate
elif "link" in data.keys() and isinstance(data["link"], list):
total_length = 0
@@ -141,7 +146,7 @@ def handle_dicts_entries(data, grp, entry_name, output_path, path):
for vsource in sources:
layout[offset : offset + vsource.shape[0]] = vsource
offset += vsource.shape[0]
- grp.create_virtual_dataset(entry_name, layout, fillvalue=0)
+ dataset = grp.create_virtual_dataset(entry_name, layout, fillvalue=0)
# internal and external links
elif "link" in data.keys():
if ":/" not in data["link"]:
@@ -159,7 +164,7 @@ def handle_dicts_entries(data, grp, entry_name, output_path, path):
)
if accept is True:
strength = data["strength"]
- grp.create_dataset(
+ dataset = grp.create_dataset(
entry_name,
data=data["compress"],
compression="gzip",
@@ -167,13 +172,20 @@ def handle_dicts_entries(data, grp, entry_name, output_path, path):
compression_opts=strength,
)
else:
- grp.create_dataset(entry_name, data=data["compress"])
+ dataset = grp.create_dataset(entry_name, data=data["compress"])
else:
raise InvalidDictProvided(
"A dictionary was provided to the template but it didn't"
" fall into any of the know cases of handling"
" dictionaries. This occured for: " + entry_name
)
+
+ if docs:
+ try:
+ dataset.attrs["docs"] = docs
+ except NameError:
+ pass
+
# Check whether link has been stabilished or not
try:
return grp[entry_name]
@@ -198,10 +210,14 @@ class Writer:
output_nexus (h5py.File): The h5py file object to manipulate output file.
nxdl_data (dict): Stores xml data from given nxdl file to use during conversion.
nxs_namespace (str): The namespace used in the NXDL tags. Helps search for XML children.
+ write_docs (bool): Write docs for the individual NeXus concepts as HDF5 attributes.
"""
def __init__(
- self, data: dict = None, nxdl_f_path: str = None, output_path: str = None
+ self,
+ data: dict = None,
+ nxdl_f_path: str = None,
+ output_path: str = None,
):
"""Constructs the necessary objects required by the Writer class."""
self.data = data
@@ -211,6 +227,9 @@ def __init__(
self.nxdl_data = ET.parse(self.nxdl_f_path).getroot()
self.nxs_namespace = get_namespace(self.nxdl_data)
+ self.write_docs: bool = False
+ self.docs_format: str = "default"
+
def __nxdl_to_attrs(self, path: str = "/") -> dict:
"""
Return a dictionary of all the attributes at the given path in the NXDL and
@@ -237,6 +256,77 @@ def __nxdl_to_attrs(self, path: str = "/") -> dict:
return elem.attrib
+ def __nxdl_docs(self, path: str = "/") -> Optional[str]:
+ """Get the NXDL docs for a path in the data."""
+
+ def extract_and_format_docs(elem: ET.Element) -> str:
+ """Get the docstring for a given element in the NDXL tree."""
+ docs_elements = elem.findall(f"{self.nxs_namespace}doc")
+ if docs_elements:
+ docs = docs_elements[0].text
+ if self.docs_format != "default":
+ docs = publish_string(
+ docs,
+ writer_name=self.docs_format,
+ settings_overrides={"warning_stream": io.StringIO()},
+ ).decode("utf-8")
+ return docs.strip().replace("\\n", "\n")
+ return ""
+
+ docs: str = ""
+
+ if not self.write_docs:
+ return None
+
+ nxdl_path = helpers.convert_data_converter_dict_to_nxdl_path(path)
+
+ if nxdl_path == "/ENTRY":
+ # Special case for docs of application definition
+ app_def_docs = extract_and_format_docs(self.nxdl_data)
+ if app_def_docs:
+ return app_def_docs
+
+ class_path, nxdl_elem_path, elist = get_inherited_nodes(
+ nxdl_path, elem=copy.deepcopy(self.nxdl_data)
+ )
+
+ path_to_check = "/ENTRY/INSTRUMENT/ELECTRONANALYSER/energy_resolution" # /physical_quantity" # == "/ENTRY/SAMPLE/flood_gun_current_env/flood_gun"
+
+ if nxdl_path == path_to_check:
+ for thing in [
+ # path,
+ # nxdl_path,
+ # class_path,
+ # nxdl_elem_path,
+ # elist
+ ]:
+ print(thing, "\n")
+ for elem in elist:
+ if nxdl_path == path_to_check:
+ # print(elem.tag)
+ # print("\t elem.attrib:", elem.attrib.keys())
+
+ if elem.tag.endswith(("group", "field", "attribute", "definition")):
+                    concept_path = helpers.get_concept_path_from_elem(elem)
+ # print(concept_path)
+
+ if not docs:
+ # Only use docs from superclasses if they are not extended.
+ docs += extract_and_format_docs(elem)
+ # print("\n")
+
+ if not elist:
+            # Handle docs for attributes
+ (_, inherited_nodes, _) = get_inherited_nodes(
+ nxdl_path, elem=copy.deepcopy(self.nxdl_data)
+ )
+ attrs = inherited_nodes[-1].findall(f"{self.nxs_namespace}attribute")
+ for attr in attrs:
+ if attr.attrib["name"] == path.split("@")[-1]:
+ docs += extract_and_format_docs(attr)
+
+ return docs
+
def ensure_and_get_parent_node(self, path: str, undocumented_paths) -> h5py.Group:
"""Returns the parent if it exists for a given path else creates the parent group."""
parent_path = path[0 : path.rindex("/")] or "/"
@@ -249,6 +339,11 @@ def ensure_and_get_parent_node(self, path: str, undocumented_paths) -> h5py.Grou
if attrs is not None:
grp.attrs["NX_class"] = attrs["type"]
+
+ docs = self.__nxdl_docs(parent_path)
+ if docs:
+ grp.attrs["docs"] = docs
+
return grp
return self.output_nexus[parent_path_hdf5]
@@ -263,6 +358,8 @@ def add_units_key(dataset, path):
dataset.attrs["units"] = self.data[units_key]
for path, value in self.data.items():
+ docs = self.__nxdl_docs(path)
+
try:
if path[path.rindex("/") + 1 :] == "@units":
continue
@@ -279,17 +376,22 @@ def add_units_key(dataset, path):
grp = self.ensure_and_get_parent_node(
path, self.data.undocumented.keys()
)
+
if isinstance(data, dict):
if "compress" in data.keys():
dataset = handle_dicts_entries(
- data, grp, entry_name, self.output_path, path
+ data, grp, entry_name, self.output_path, path, docs
)
+
else:
hdf5_links_for_later.append(
- [data, grp, entry_name, self.output_path, path]
+ [data, grp, entry_name, self.output_path, path, docs]
)
else:
dataset = grp.create_dataset(entry_name, data=data)
+ if docs:
+ dataset.attrs["docs"] = docs
+
except InvalidDictProvided as exc:
print(str(exc))
except Exception as exc:
@@ -305,6 +407,7 @@ def add_units_key(dataset, path):
del self.data[links[-1]]
for path, value in self.data.items():
+ docs = self.__nxdl_docs(path)
try:
if path[path.rindex("/") + 1 :] == "@units":
continue
@@ -322,19 +425,28 @@ def add_units_key(dataset, path):
add_units_key(self.output_nexus[path_hdf5], path)
else:
- # consider changing the name here the lvalue can also be group!
dataset = self.ensure_and_get_parent_node(
path, self.data.undocumented.keys()
)
dataset.attrs[entry_name[1:]] = data
+ if docs:
+ # Write docs for attributes like __docs
+ dataset.attrs[f"{entry_name[1:]}_docs"] = docs
except Exception as exc:
raise IOError(
f"Unknown error occured writing the path: {path} "
f"with the following message: {str(exc)}"
) from exc
- def write(self):
- """Writes the NeXus file with previously validated data from the reader with NXDL attrs."""
+ def write(self, write_docs: bool = False, docs_format: str = "default"):
+ """
+ Writes the NeXus file with previously validated data from the reader with NXDL attrs.
+
+ Args:
+ write_docs (bool): Write docs for the individual NeXus concepts as HDF5 attributes. The default is False.
+ """
+ self.write_docs = write_docs
+ self.docs_format = docs_format
try:
self._put_data_into_hdf5()
finally:
diff --git a/tests/dataconverter/test_writer.py b/tests/dataconverter/test_writer.py
index acc84d8d5..a6502c519 100644
--- a/tests/dataconverter/test_writer.py
+++ b/tests/dataconverter/test_writer.py
@@ -59,6 +59,29 @@ def test_write(writer):
assert test_nxs["/my_entry/nxodd_name/posint_value"].shape == (3,) # pylint: disable=no-member
+def test_write_docs(writer):
+ """Test for the Writer's write_docs option. Checks whether docs are written for NeXus concepts."""
+ writer.write(write_docs=True)
+ test_nxs = h5py.File(writer.output_path, "r")
+ # print(writer.output_path)
+ assert (
+ test_nxs["/my_entry"].attrs["docs"]
+ == "This is a dummy NXDL to test out the dataconverter."
+ )
+ assert (
+ test_nxs["/my_entry/definition"].attrs["version__docs"]
+ == "This is the version of the definition."
+ )
+ assert (
+ test_nxs["/my_entry/nxodd_name/int_value"].attrs["docs"]
+ == "A dummy entry for an int value."
+ )
+ assert (
+ test_nxs["/my_entry/required_group"].attrs["docs"]
+ == "This is a required yet empty group."
+ )
+
+
def test_write_link(writer):
"""Test for the Writer's write function.