Commit 5fe83b9
rewrite metadata collection section
disable reorder-imports hook, as it conflicts with newest version of ruff
rettigl committed Mar 2, 2024
1 parent 79189fa commit 5fe83b9
Showing 3 changed files with 114 additions and 116 deletions.
12 changes: 6 additions & 6 deletions .pre-commit-config.yaml
@@ -13,7 +13,7 @@ repos:

- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.1.7
rev: v0.3.0
hooks:
# Run the formatter.
- id: ruff-format
@@ -23,11 +23,11 @@ repos:
rev: v1.7.1
hooks:
- id: mypy
- repo: https://github.com/asottile/reorder_python_imports
rev: v3.8.2
hooks:
- id: reorder-python-imports
args: [--application-directories, '.:src', --py36-plus]
#- repo: https://github.com/asottile/reorder_python_imports
# rev: v3.8.2
# hooks:
# - id: reorder-python-imports
# args: [--application-directories, '.:src', --py36-plus]
- repo: https://github.com/asottile/pyupgrade
rev: v2.37.3
hooks:
2 changes: 1 addition & 1 deletion specsscan/core.py
@@ -266,7 +266,7 @@ def load_scan(
self._scan_info,
self.config,
dim,
metadata=metadata,
metadata=copy.deepcopy(metadata),
collect_metadata=collect_metadata,
),
**{"loader": loader_dict},
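
Why the deepcopy matters: handle_meta writes into the metadata dict it receives, so passing the caller's dict by reference would leak those writes back into subsequent load_scan calls. A minimal sketch of the pitfall; mutating_helper is a stand-in for handle_meta, not the actual implementation:

import copy

def mutating_helper(metadata: dict) -> dict:
    # stand-in for handle_meta, which writes into the dict it receives
    metadata.setdefault("scan_info", {})["energy_scan_mode"] = "fixed"
    return metadata

caller_meta = {"scan_info": {"user": "A. Example"}}
mutating_helper(caller_meta)  # passed by reference...
assert caller_meta["scan_info"]["energy_scan_mode"] == "fixed"  # ...caller mutated

fresh_meta = {"scan_info": {"user": "A. Example"}}
mutating_helper(copy.deepcopy(fresh_meta))  # the deep copy shields the caller
assert "energy_scan_mode" not in fresh_meta["scan_info"]
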
216 changes: 107 additions & 109 deletions specsscan/helpers.py
@@ -350,7 +350,7 @@ def handle_meta(
scan_info: dict,
config: dict,
dim: str,
metadata: dict = {},
metadata: dict = None,
collect_metadata: bool = False,
) -> dict:
"""Helper function for the handling metadata from different files
@@ -367,10 +367,14 @@
archiver needed for NeXus conversion. Defaults to False.
Returns:
dict: metadata_dict: metadata dictionary containing additional metadata from the EPICS
dict: metadata dictionary containing additional metadata from the EPICS
archive.
"""

if metadata is None:
metadata = {}

print("Gathering metadata from different locations")
# get metadata from LUT dataframe
lut_meta = {}
energy_scan_mode = "fixed"
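
The signature change from metadata: dict = {} to metadata: dict = None, together with the None check above, avoids Python's shared-mutable-default pitfall. A minimal self-contained sketch, independent of the specsscan code:

def broken(results: dict = {}) -> dict:
    results[len(results)] = "entry"  # mutates the single dict shared by every call
    return results

print(broken())  # {0: 'entry'}
print(broken())  # {0: 'entry', 1: 'entry'} -- state leaked between calls

def fixed(results: dict = None) -> dict:
    if results is None:
        results = {}  # fresh dict on each call
    results[len(results)] = "entry"
    return results

print(fixed())  # {0: 'entry'}
print(fixed())  # {0: 'entry'} -- no leakage
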
@@ -386,138 +390,132 @@
if len(set(kinetic_energy)) > 1 and scan_info["ScanType"] == "voltage":
energy_scan_mode = "sweep"

scan_meta = complete_dictionary(lut_meta, scan_info) # merging two dictionaries
metadata["scan_info"] = complete_dictionary(
metadata.get("scan_info", {}),
complete_dictionary(lut_meta, scan_info),
) # merging dictionaries

# Get metadata from Epics archive, if not present already
if collect_metadata:
metadata_dict = get_archive_meta(
scan_meta,
metadata,
config,
)
print("Collecting time stamps...")
if "time" in metadata["scan_info"]:
time_list = [metadata["scan_info"]["time"][0], metadata["scan_info"]["time"][-1]]
elif "StartTime" in metadata["scan_info"]:
time_list = [metadata["scan_info"]["StartTime"]]
else:
metadata_dict = {"scan_info": scan_meta}
raise ValueError("Could not find timestamps in scan info.")

dt_list_iso = [time.replace(".", "-").replace(" ", "T") for time in time_list]
datetime_list = [dt.datetime.fromisoformat(dt_iso) for dt_iso in dt_list_iso]
ts_from = dt.datetime.timestamp(datetime_list[0]) # POSIX timestamp
ts_to = dt.datetime.timestamp(datetime_list[-1]) # POSIX timestamp
metadata["timing"] = {
"acquisition_start": dt.datetime.utcfromtimestamp(ts_from)
.replace(tzinfo=dt.timezone.utc)
.isoformat(),
"acquisition_stop": dt.datetime.utcfromtimestamp(ts_to)
.replace(tzinfo=dt.timezone.utc)
.isoformat(),
"acquisition_duration": int(ts_to - ts_from),
"collection_time": float(ts_to - ts_from),
}

if collect_metadata:
# Get metadata from Epics archive if not present already
start = dt.datetime.utcfromtimestamp(ts_from).isoformat()

# replace metadata names by epics channels
try:
replace_dict = config["epics_channels"]
for key in list(metadata["scan_info"]):
if key.lower() in replace_dict:
metadata["scan_info"][replace_dict[key.lower()]] = metadata["scan_info"][key]
metadata["scan_info"].pop(key)
epics_channels = replace_dict.values()
except KeyError:
epics_channels = []

channels_missing = set(epics_channels) - set(metadata["scan_info"].keys())
if channels_missing:
print("Collecting data from the EPICS archive...")
for channel in channels_missing:
try:
_, vals = get_archiver_data(
archiver_url=config.get("archiver_url"),
archiver_channel=channel,
ts_from=ts_from,
ts_to=ts_to,
)
metadata["scan_info"][f"{channel}"] = np.mean(vals)

metadata_dict = complete_dictionary(metadata_dict, metadata) # merging two dictionaries
except IndexError:
metadata["scan_info"][f"{channel}"] = np.nan
print(
f"Data for channel {channel} doesn't exist for time {start}",
)
except HTTPError as exc:
print(
f"Incorrect URL for the archive channel {channel}. "
"Make sure that the channel name and file start and end times are "
"correct.",
)
print("Error code: ", exc)
except URLError as exc:
print(
f"Cannot access the archive URL for channel {channel}. "
f"Make sure that you are within the FHI network."
f"Skipping over channels {channels_missing}.",
)
print("Error code: ", exc)
break

metadata_dict["scan_info"]["energy_scan_mode"] = energy_scan_mode
metadata["scan_info"]["energy_scan_mode"] = energy_scan_mode

lens_modes_all = {
"real": config["spa_params"]["calib2d_dict"]["supported_space_modes"],
"reciprocal": config["spa_params"]["calib2d_dict"]["supported_angle_modes"],
}
lens_mode = scan_meta["LensMode"]
lens_mode = metadata["scan_info"]["LensMode"]
for projection, mode_list in lens_modes_all.items():
if lens_mode in mode_list:
metadata_dict["scan_info"]["projection"] = projection
metadata["scan_info"]["projection"] = projection
fast = "Angle" if projection == "reciprocal" else "Position"
metadata_dict["scan_info"]["scheme"] = (
metadata["scan_info"]["scheme"] = (
"angular dispersive" if projection == "reciprocal" else "spatial dispersive"
)

metadata_dict["scan_info"]["slow_axes"] = dim
metadata_dict["scan_info"]["fast_axes"] = ["Ekin", fast]
metadata["scan_info"]["slow_axes"] = dim
metadata["scan_info"]["fast_axes"] = ["Ekin", fast]

print("Done!")

return metadata_dict
return metadata


def get_archive_meta(
scan_meta: dict,
metadata: dict,
config: dict,
) -> dict:
"""Function to collect the EPICS archive metadata for the handle_meta function.
def get_archiver_data(
archiver_url: str,
archiver_channel: str,
ts_from: float,
ts_to: float,
) -> tuple[np.ndarray, np.ndarray]:
"""Extract time stamps and corresponding data from and EPICS archiver instance
Args:
scan_meta (dict): scan_metadata class dict
metadata (dict): external metadata dict
config (dict): config dictionary
Raises:
ValueError: Raised if time stamps cannot be found
archiver_url (str): URL of the archiver data extraction interface
archiver_channel (str): EPICS channel to extract data for
ts_from (float): starting time stamp of the range of interest
ts_to (float): ending time stamp of the range of interest
Returns:
dict: metadata dict completed with archiver meta data
tuple[np.ndarray, np.ndarray]: The extracted time stamps and corresponding data
"""

metadata_dict = {}
if "time" in scan_meta:
time_list = [scan_meta["time"][0], scan_meta["time"][-1]]
elif "StartTime" in scan_meta:
time_list = [scan_meta["StartTime"]]
else:
raise ValueError("Could not find timestamps in scan info.")

dt_list_iso = [time.replace(".", "-").replace(" ", "T") for time in time_list]
datetime_list = [dt.datetime.fromisoformat(dt_iso) for dt_iso in dt_list_iso]
ts_from = dt.datetime.timestamp(datetime_list[0]) # POSIX timestamp
ts_to = dt.datetime.timestamp(datetime_list[-1]) # POSIX timestamp
metadata_dict["timing"] = {
"acquisition_start": dt.datetime.utcfromtimestamp(ts_from)
.replace(tzinfo=dt.timezone.utc)
.isoformat(),
"acquisition_stop": dt.datetime.utcfromtimestamp(ts_to)
.replace(tzinfo=dt.timezone.utc)
.isoformat(),
"acquisition_duration": int(ts_to - ts_from),
"collection_time": float(ts_to - ts_from),
}
filestart = dt.datetime.utcfromtimestamp(ts_from).isoformat() # Epics time in UTC?
fileend = dt.datetime.utcfromtimestamp(ts_to).isoformat()

scan_meta = complete_dictionary(scan_meta, metadata.get("scan_info", {}))

try:
replace_dict = config["epics_channels"]
for key in list(scan_meta):
if key.lower() in replace_dict:
scan_meta[replace_dict[key.lower()]] = scan_meta[key]
scan_meta.pop(key)
epics_channels = replace_dict.values()
except KeyError:
epics_channels = []
metadata_dict["scan_info"] = scan_meta

channels_missing = set(epics_channels) - set(scan_meta.keys())
if channels_missing:
print("Collecting data from the EPICS archive...")
for channel in channels_missing:
try:
req_str = (
"http://aa0.fhi-berlin.mpg.de:17668/retrieval/"
+ "data/getData.json?pv="
+ channel
+ "&from="
+ filestart
+ "Z&to="
+ fileend
+ "Z"
)
with urlopen(req_str) as req:
data = json.load(req)
vals = [x["val"] for x in data[0]["data"]]
metadata_dict["scan_info"][f"{channel}"] = sum(vals) / len(vals)
except (IndexError, ZeroDivisionError):
metadata_dict["scan_info"][f"{channel}"] = np.nan
print(f"Data for channel {channel} doesn't exist for time {filestart}")
except HTTPError as error:
print(
f"Incorrect URL for the archive channel {channel}. "
"Make sure that the channel name, file start "
"and end times are correct.",
)
print("Error code: ", error)
except URLError as error:
print(
f"Cannot access the archive URL for channel {channel}. "
f"Make sure that you are within the FHI network."
f"Skipping over channels {channels_missing}.",
)
print("Error code: ", error)
break
return metadata_dict
iso_from = dt.datetime.utcfromtimestamp(ts_from).isoformat()
iso_to = dt.datetime.utcfromtimestamp(ts_to).isoformat()
req_str = archiver_url + archiver_channel + "&from=" + iso_from + "Z&to=" + iso_to + "Z"
with urlopen(req_str) as req:
data = json.load(req)
secs = [x["secs"] + x["nanos"] * 1e-9 for x in data[0]["data"]]
vals = [x["val"] for x in data[0]["data"]]

return (np.asarray(secs), np.asarray(vals))
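
A hypothetical usage sketch for the new get_archiver_data: the channel name is invented, and archiver_url is assumed to carry the getData.json?pv= query prefix that the removed hard-coded request string used:

import datetime as dt
from specsscan.helpers import get_archiver_data

archiver_url = "http://aa0.fhi-berlin.mpg.de:17668/retrieval/data/getData.json?pv="
ts_to = dt.datetime.now(tz=dt.timezone.utc).timestamp()
ts_from = ts_to - 3600  # the preceding hour

secs, vals = get_archiver_data(
    archiver_url=archiver_url,
    archiver_channel="trARPES:ExampleChannel",  # hypothetical channel name
    ts_from=ts_from,
    ts_to=ts_to,
)
print(vals.mean())  # averaged reading, as handle_meta stores it via np.mean
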


def find_scan(path: Path, scan: int) -> list[Path]:
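
For reference, the timestamp handling that handle_meta now performs inline (see the timing block above) boils down to the following conversion; the "YYYY.MM.DD HH:MM:SS" input format is an assumption implied by the two .replace() calls:

import datetime as dt

raw = "2024.03.02 12:30:00"  # assumed scan-info time format
iso = raw.replace(".", "-").replace(" ", "T")  # -> "2024-03-02T12:30:00"
ts = dt.datetime.fromisoformat(iso).timestamp()  # naive parse, local-time POSIX stamp

# handle_meta then re-renders the stamp as an explicit UTC ISO string:
start = dt.datetime.utcfromtimestamp(ts).replace(tzinfo=dt.timezone.utc).isoformat()
print(start)  # e.g. "2024-03-02T11:30:00+00:00" when local time is CET
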
