Source code for ewoksbm08.io.read_xas

import logging
import re

import h5py
import numpy
from blissdata.h5api.dynamic_hdf5 import Group

from .types import XdiData

logger = logging.getLogger(__name__)

GroupType = Group | h5py.Group


[docs] def read_xas_hdf5( h5scan: GroupType, mono_counter: str, crystal_motor: str, counters: list[str], mca_counters: list[str], livetime_normalization: float | None = None, skip_mca: dict[str, list[int | None]] = None, mono_edge_theoretical: float | None = None, mono_edge_experimental: float | None = None, ) -> XdiData: """ :param h5scan: NXentry group of one Bliss scan. :param mono_counter: energy or theta counter. :param crystal_motor: defines the mono crystal that is selected. :param counters: map name in the measurement group to name in XDI. :param mca_counters: counters the be extracted for all MCA detector. Use `skip_mca` to skip some. :param livetime_normalization: normalize `mca_counters` to a common live-time. :param skip_mca: scan MCA detector. :param mono_edge_theoretical: theoretical edge position in `mono_counter` units. :param mono_edge_experimental: experimental edge position in `mono_counter` units. :returns: XAS scan object """ xdi_metadata = { "Beamline": {}, } # Scan start_time = h5scan["start_time"][()].decode() end_time = h5scan["end_time"][()].decode() xdi_metadata["Scan"] = {"start_time": start_time, "end_time": end_time} # Facility instrument = h5scan["instrument"] if "machine" in instrument: current = instrument["machine/current"][()] filling_mode = instrument["machine/filling_mode"][()].decode() else: current = None filling_mode = None xdi_metadata["Facility"] = { "current": current, "fillingMode": filling_mode, } # Mono si_hkl = _get_si_hkl(h5scan, crystal_motor) d_spacing = _mono_d_spacing(*si_hkl) si_hkl_string = "".join(map(str, si_hkl)) xdi_metadata["Mono"] = {"name": f"Si {si_hkl_string}", "d_spacing": d_spacing} # Energy in eV from monochromator counter measurement = h5scan["measurement"] if mono_counter not in measurement: raise ValueError( f"{mono_counter!r} is not in {measurement.file.filename}::{measurement.name}" ) mono_dset = measurement[mono_counter] energy = _convert_to_energy( mono_dset, d_spacing, mono_edge_theoretical, mono_edge_experimental ) column_names = ["energy"] column_data = [energy] # Regular counters if counters: ctr_column_names, ctr_column_data = _get_counter_columns(h5scan, counters) column_names += ctr_column_names column_data += ctr_column_data # MCA counters if mca_counters: mca_column_names, mca_column_data = _get_mca_columns( h5scan, mca_counters, livetime_normalization, skip_mca or {} ) column_names += mca_column_names column_data += mca_column_data # Ensure all columns have the same length lengths = [len(values) for values in column_data] if len(set(lengths)) > 1: n = min(lengths) column_data = [values[:n] for values in column_data] return XdiData( column_names=column_names, column_data=column_data, xdi_metadata=xdi_metadata )
def _convert_to_energy( mono_dset: h5py.Dataset, d_spacing: float, mono_edge_theoretical: float | None, mono_edge_experimental: float | None, ) -> numpy.ndarray: """Energy in eV from theta in radians or energy in keV or eV.""" mono_values = mono_dset[()] if mono_edge_theoretical is not None and mono_edge_experimental is not None: mono_values += round(mono_edge_theoretical - mono_edge_experimental, 7) # TODO: should this be fixed in blissdata? When there are no units, it waits retry_timeout # seconds and then returns, even when the scan is over. mono_units = dict(mono_dset.attrs).get("units", "").lower() if mono_units == "ev": return mono_values if mono_units == "kev": return mono_values * 1000 if mono_units in ("deg", "degree", "degrees"): mono_values = numpy.radians(mono_values) mono_units = "" if mono_units in ("", "rad", "radian", "radians"): return _theta_to_energy(mono_values, d_spacing) raise ValueError(f"Unknown monochromator counter units {mono_units!r}") def _mono_d_spacing(si_h: int, si_k: int, si_l: int) -> float: """D-spacing from Si HKL planes in Å. Cubic lattice: dhkl(Å) = A0(Å) / sqrt(h^2+k^2+l^2) """ si_lattice_constant = 5.42944537 # Å return si_lattice_constant / numpy.sqrt(si_h**2 + si_k**2 + si_l**2) def _theta_to_energy(theta_angle: float | numpy.ndarray, d_spacing: float) -> float: """Converts the monochromator angle in radians into energy given a monochromator crystal d-spacing. Bragg's Law: E(eV) = (h * c) / (2 * dhkl(Å) * sinθ) """ hc = 12398.42014541 # Å/eV sin_theta = numpy.sin(theta_angle) return numpy.round(hc / (2 * d_spacing * sin_theta), 3) def _get_counter_columns( h5scan: GroupType, counters: list[str] ) -> tuple[list[str], list[numpy.ndarray]]: measurement = h5scan["measurement"] column_names = [] column_data = [] for name in counters: if name in measurement: column_names.append(name) column_data.append(measurement[name][()]) else: logger.warning( "Skip: regular counter %r not in %r", name, f"{measurement.file.filename}::{measurement.name}", ) return column_names, column_data def _get_mca_columns( h5scan: GroupType, mca_counters: list[str], livetime_normalization: float | None, skip_mca: dict[str, list[int]], ) -> tuple[list[str], list[numpy.ndarray]]: instrument = h5scan["instrument"] mca_detectors = _get_mca_detectors(instrument) if not mca_detectors: logger.warning( "No MCA detector found in %s::%s", h5scan.file.filename, instrument.name ) return [], [] if livetime_normalization is not None and livetime_normalization <= 0: livetime_normalization = _get_count_time(h5scan) or -1 column_short_names = [] column_prefixes = [] column_data = [] for (controller_name, detector_index), detector in mca_detectors.items(): skip_indices = skip_mca.get(controller_name, []) if detector_index in skip_indices: continue for counter_name in mca_counters: values = _mca_counter_values(detector, counter_name, livetime_normalization) if values is None: continue column_short_name = f"{counter_name}_{detector_index}" column_short_names.append(column_short_name) column_prefixes.append(controller_name) column_data.append(values) if len(column_short_names) != len(set(column_short_names)): column_names = [ f"{prefix}_{name}" for prefix, name in zip(column_prefixes, column_short_names) ] else: column_names = column_short_names return column_names, column_data def _get_mca_detectors(instrument: GroupType) -> dict[tuple[str, int], GroupType]: """Find all HDF5 groups that belong to a detector index of an MCA controller.""" groups = {} for name in instrument: item = instrument[name] if not isinstance(item, GroupType): continue if "type" in item: detector_type = item["type"][()].decode() else: continue if detector_type == "mca": match = re.match(r"^(.*?)_det(\d+)$", name) if match: controller_name = match.group(1) detector_index = int(match.group(2)) groups[(controller_name, detector_index)] = item return groups def _mca_counter_values( detector: GroupType, counter_name: str, livetime_normalization: float | None, ) -> numpy.ndarray | None: """Return the counter values (if present) for a detector index of an MCA controller.""" if counter_name not in detector: logger.warning( "Skip: MCA counter %r not in %r", counter_name, f"{detector.file.filename}::{detector.name}", ) return None values = detector[counter_name][()] # Extract live-time to normalize to from elapsed time per point if livetime_normalization is not None and livetime_normalization <= 0: if "elapsed_time" in detector: elapsed_time = detector["elapsed_time"][()] livetime_normalization = numpy.nanmedian(elapsed_time) logger.info( "Median elapsed time per scan point: %s seconds", livetime_normalization ) else: livetime_normalization = None logger.warning( "Skip live-time normalization of MCA counter %r since %r is not in %r", counter_name, "elapsed_time", f"{detector.file.filename}::{detector.name}", ) # Normalize counts to a common live-time if livetime_normalization is not None: if "live_time" in detector: effective_livetime = detector["live_time"][()] if values.size != effective_livetime.size: n = min(values.size, effective_livetime.size) values = values[:n] effective_livetime = effective_livetime[:n] values = values / effective_livetime * livetime_normalization else: logger.warning( "Skip live-time normalization of MCA counter %r since %r is not in %r", counter_name, "live_time", f"{detector.file.filename}::{detector.name}", ) return values def _get_count_time(h5scan: GroupType) -> float | None: if "scan_parameters" in h5scan: scan_parameters = h5scan["scan_parameters"] if "count_time" in scan_parameters: count_time = scan_parameters["count_time"][()] logger.info("User preset count time: %s seconds", count_time) return count_time def _get_si_hkl(h5scan: GroupType, crystal_motor: str) -> tuple[int, int, int]: positioners = h5scan["instrument/positioners"] if crystal_motor not in positioners: raise ValueError( f"{crystal_motor!r} is not in {positioners.file.filename}::{positioners.name}" ) if positioners[crystal_motor][()] > 0: return 3, 1, 1 else: return 1, 1, 1