From 7668b28df6e033f55c49a2f0ae96eaea7ac3ab85 Mon Sep 17 00:00:00 2001 From: Peter Chang Date: Fri, 16 Jun 2023 11:25:41 +0100 Subject: [PATCH 1/3] Refactor nexus.py To separate out functions and classes not related to reading NeXus files --- pynxtools/nexus/nexus.py | 981 +-------------------------------- pynxtools/nexus/nxdl_utils.py | 988 ++++++++++++++++++++++++++++++++++ 2 files changed, 990 insertions(+), 979 deletions(-) create mode 100644 pynxtools/nexus/nxdl_utils.py diff --git a/pynxtools/nexus/nexus.py b/pynxtools/nexus/nexus.py index ac1d8b36c..cce7b0f14 100644 --- a/pynxtools/nexus/nexus.py +++ b/pynxtools/nexus/nexus.py @@ -3,990 +3,13 @@ """ import os -import xml.etree.ElementTree as ET -from functools import lru_cache -from glob import glob + import sys import logging -import textwrap import h5py import click - -class NxdlAttributeError(Exception): - """An exception for throwing an error when an Nxdl attribute is not found.""" - - -def get_app_defs_names(): - """Returns all the AppDef names without their extension: .nxdl.xml""" - app_def_path_glob = f"{get_nexus_definitions_path()}{os.sep}applications{os.sep}*.nxdl*" - contrib_def_path_glob = (f"{get_nexus_definitions_path()}{os.sep}" - f"contributed_definitions{os.sep}*.nxdl*") - files = sorted(glob(app_def_path_glob)) + sorted(glob(contrib_def_path_glob)) - return [os.path.basename(file).split(".")[0] for file in files] + ["NXroot"] - - -@lru_cache(maxsize=None) -def get_xml_root(file_path): - """Reducing I/O time by caching technique""" - - return ET.parse(file_path).getroot() - - -def get_nexus_definitions_path(): - """Check NEXUS_DEF_PATH variable. -If it is empty, this function is filling it""" - try: # either given by sys env - return os.environ['NEXUS_DEF_PATH'] - except KeyError: # or it should be available locally under the dir 'definitions' - local_dir = os.path.abspath(os.path.dirname(__file__)) - return os.path.join(local_dir, f"..{os.sep}definitions") - - -def get_hdf_root(hdf_node): - """Get the root HDF5 node""" - node = hdf_node - while node.name != '/': - node = node.parent - return node - - -def get_hdf_parent(hdf_info): - """Get the parent of an hdf_node in an hdf_info""" - if 'hdf_path' not in hdf_info: - return hdf_info['hdf_node'].parent - node = get_hdf_root(hdf_info['hdf_node']) if 'hdf_root' not in hdf_info \ - else hdf_info['hdf_root'] - for child_name in hdf_info['hdf_path'].split('/'): - node = node[child_name] - return node - - -def get_parent_path(hdf_name): - """Get parent path""" - return '/'.join(hdf_name.split('/')[:-1]) - - -def get_hdf_info_parent(hdf_info): - """Get the hdf_info for the parent of an hdf_node in an hdf_info""" - if 'hdf_path' not in hdf_info: - return {'hdf_node': hdf_info['hdf_node'].parent} - node = get_hdf_root(hdf_info['hdf_node']) if 'hdf_root' not in hdf_info \ - else hdf_info['hdf_root'] - for child_name in hdf_info['hdf_path'].split('/')[1:-1]: - node = node[child_name] - return {'hdf_node': node, 'hdf_path': get_parent_path(hdf_info['hdf_path'])} - - -def get_nx_class_path(hdf_info): - """Get the full path of an HDF5 node using nexus classes -in case of a field, end with the field name""" - hdf_node = hdf_info['hdf_node'] - if hdf_node.name == '/': - return '' - if isinstance(hdf_node, h5py.Group): - return get_nx_class_path(get_hdf_info_parent(hdf_info)) + '/' + \ - (hdf_node.attrs['NX_class'] if 'NX_class' in hdf_node.attrs.keys() else - hdf_node.name.split('/')[-1]) - if isinstance(hdf_node, h5py.Dataset): - return get_nx_class_path( - get_hdf_info_parent(hdf_info)) + '/' + hdf_node.name.split('/')[-1] - return '' - - -def get_nxdl_entry(hdf_info): - """Get the nxdl application definition for an HDF5 node""" - entry = hdf_info - while isinstance(entry['hdf_node'], h5py.Dataset) or \ - 'NX_class' not in entry['hdf_node'].attrs.keys() or \ - entry['hdf_node'].attrs['NX_class'] != 'NXentry': - entry = get_hdf_info_parent(entry) - if entry['hdf_node'].name == '/': - return 'NO NXentry found' - try: - nxdef = entry['hdf_node']['definition'][()] - return nxdef.decode() - except KeyError: # 'NO Definition referenced' - return "NXentry" - - -def get_nx_class(nxdl_elem): - """Get the nexus class for a NXDL node""" - if 'category' in nxdl_elem.attrib.keys(): - return None - try: - return nxdl_elem.attrib['type'] - except KeyError: - return 'NX_CHAR' - - -def get_nx_namefit(hdf_name, name, name_any=False): - """Checks if an HDF5 node name corresponds to a child of the NXDL element -uppercase letters in front can be replaced by arbitraty name, but -uppercase to lowercase match is preferred, -so such match is counted as a measure of the fit""" - if name == hdf_name: - return len(name) * 2 - # count leading capitals - counting = 0 - while counting < len(name) and name[counting].upper() == name[counting]: - counting += 1 - if name_any or counting == len(name) or \ - (counting > 0 and hdf_name.endswith(name[counting:])): # if potential fit - # count the matching chars - fit = 0 - for i in range(min(counting, len(hdf_name))): - if hdf_name[i].upper() == name[i]: - fit += 1 - else: - break - if fit == min(counting, len(hdf_name)): # accept only full fits as better fits - return fit - return 0 - return -1 # no fit - - -def get_nx_classes(): - """Read base classes from the NeXus definition folder. -Check each file in base_classes, applications, contributed_definitions. -If its category attribute is 'base', then it is added to the list. """ - base_classes = sorted(glob(os.path.join(get_nexus_definitions_path(), - 'base_classes', '*.nxdl.xml'))) - applications = sorted(glob(os.path.join(get_nexus_definitions_path(), - 'applications', '*.nxdl.xml'))) - contributed = sorted(glob(os.path.join(get_nexus_definitions_path(), - 'contributed_definitions', '*.nxdl.xml'))) - nx_clss = [] - for nexus_file in base_classes + applications + contributed: - root = get_xml_root(nexus_file) - if root.attrib['category'] == 'base': - nx_clss.append(str(nexus_file[nexus_file.rindex(os.sep) + 1:])[:-9]) - nx_clss = sorted(nx_clss) - return nx_clss - - -def get_nx_units(): - """Read unit kinds from the NeXus definition/nxdlTypes.xsd file""" - filepath = f"{get_nexus_definitions_path()}{os.sep}nxdlTypes.xsd" - root = get_xml_root(filepath) - units_and_type_list = [] - for child in root: - for i in child.attrib.values(): - units_and_type_list.append(i) - flag = False - for line in units_and_type_list: - if line == 'anyUnitsAttr': - flag = True - nx_units = [] - elif 'NX' in line and flag is True: - nx_units.append(line) - elif line == 'primitiveType': - flag = False - else: - pass - return nx_units - - -def get_nx_attribute_type(): - """Read attribute types from the NeXus definition/nxdlTypes.xsd file""" - filepath = get_nexus_definitions_path() + '/nxdlTypes.xsd' - root = get_xml_root(filepath) - units_and_type_list = [] - for child in root: - for i in child.attrib.values(): - units_and_type_list.append(i) - flag = False - for line in units_and_type_list: - if line == 'primitiveType': - flag = True - nx_types = [] - elif 'NX' in line and flag is True: - nx_types.append(line) - elif line == 'anyUnitsAttr': - flag = False - else: - pass - return nx_types - - -def get_node_name(node): - '''Node - xml node. Returns html documentation name. - Either as specified by the 'name' or taken from the type (nx_class). - Note that if only class name is available, the NX prefix is removed and - the string is converted to UPPER case.''' - if 'name' in node.attrib.keys(): - name = node.attrib['name'] - else: - name = node.attrib['type'] - if name.startswith('NX'): - name = name[2:].upper() - return name - - -def belongs_to(nxdl_elem, child, name, class_type=None, hdf_name=None): - """Checks if an HDF5 node name corresponds to a child of the NXDL element -uppercase letters in front can be replaced by arbitraty name, but -uppercase to lowercase match is preferred""" - if class_type and get_nx_class(child) != class_type: - return False - act_htmlname = get_node_name(child) - chk_name = hdf_name or name - if act_htmlname == chk_name: - return True - if not hdf_name: # search for name fits is only allowed for hdf_nodes - return False - try: # check if nameType allows different name - name_any = bool(child.attrib['nameType'] == "any") - except KeyError: - name_any = False - params = [act_htmlname, chk_name, name_any, nxdl_elem, child, name] - return belongs_to_capital(params) - - -def belongs_to_capital(params): - """Checking continues for Upper case""" - (act_htmlname, chk_name, name_any, nxdl_elem, child, name) = params - # or starts with capital and no reserved words used - if (name_any or 'A' <= act_htmlname[0] <= 'Z') and \ - name != 'doc' and name != 'enumeration': - fit = get_nx_namefit(chk_name, act_htmlname, name_any) # check if name fits - if fit < 0: - return False - for child2 in nxdl_elem: - if get_local_name_from_xml(child) != \ - get_local_name_from_xml(child2) or get_node_name(child2) == act_htmlname: - continue - # check if the name of another sibling fits better - name_any2 = "nameType" in child2.attrib.keys() and child2.attrib["nameType"] == "any" - fit2 = get_nx_namefit(chk_name, get_node_name(child2), name_any2) - if fit2 > fit: - return False - # accept this fit - return True - return False - - -def get_local_name_from_xml(element): - """Helper function to extract the element tag without the namespace.""" - return element.tag[element.tag.rindex("}") + 1:] - - -def get_own_nxdl_child_reserved_elements(child, name, nxdl_elem): - """checking reserved elements, like doc, enumeration""" - if get_local_name_from_xml(child) == 'doc' and name == 'doc': - if nxdl_elem.get('nxdlbase'): - child.set('nxdlbase', nxdl_elem.get('nxdlbase')) - child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) - child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/doc') - return child - if get_local_name_from_xml(child) == 'enumeration' and name == 'enumeration': - if nxdl_elem.get('nxdlbase'): - child.set('nxdlbase', nxdl_elem.get('nxdlbase')) - child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) - child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/enumeration') - return child - return False - - -def get_own_nxdl_child_base_types(child, class_type, nxdl_elem, name, hdf_name): - """checking base types of group, field,m attribute""" - if get_local_name_from_xml(child) == 'group': - if (class_type is None or (class_type and get_nx_class(child) == class_type)) and \ - belongs_to(nxdl_elem, child, name, class_type, hdf_name): - if nxdl_elem.get('nxdlbase'): - child.set('nxdlbase', nxdl_elem.get('nxdlbase')) - child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) - child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/' + get_node_name(child)) - return child - if get_local_name_from_xml(child) == 'field' and \ - belongs_to(nxdl_elem, child, name, None, hdf_name): - if nxdl_elem.get('nxdlbase'): - child.set('nxdlbase', nxdl_elem.get('nxdlbase')) - child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) - child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/' + get_node_name(child)) - return child - if get_local_name_from_xml(child) == 'attribute' and \ - belongs_to(nxdl_elem, child, name, None, hdf_name): - if nxdl_elem.get('nxdlbase'): - child.set('nxdlbase', nxdl_elem.get('nxdlbase')) - child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) - child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/' + get_node_name(child)) - return child - return False - - -def get_own_nxdl_child(nxdl_elem, name, class_type=None, hdf_name=None, nexus_type=None): - """Checks if an NXDL child node fits to the specific name (either nxdl or hdf) - name - nxdl name - class_type - nxdl type or hdf classname (for groups, it is obligatory) - hdf_name - hdf name""" - for child in nxdl_elem: - if 'name' in child.attrib and child.attrib['name'] == name: - if nxdl_elem.get('nxdlbase'): - child.set('nxdlbase', nxdl_elem.get('nxdlbase')) - child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) - child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/' + get_node_name(child)) - return child - for child in nxdl_elem: - if "name" in child.attrib and child.attrib["name"] == name: - child.set('nxdlbase', nxdl_elem.get('nxdlbase')) - return child - - for child in nxdl_elem: - result = get_own_nxdl_child_reserved_elements(child, name, nxdl_elem) - if result is not False: - return result - if nexus_type and get_local_name_from_xml(child) != nexus_type: - continue - result = get_own_nxdl_child_base_types(child, class_type, nxdl_elem, name, hdf_name) - if result is not False: - return result - return None - - -def find_definition_file(bc_name): - """find the nxdl file corresponding to the name. - Note that it first checks in contributed and goes beyond only if no contributed found""" - bc_filename = None - for nxdl_folder in ['contributed_definitions', 'base_classes', 'applications']: - if os.path.exists(f"{get_nexus_definitions_path()}{os.sep}" - f"{nxdl_folder}{os.sep}{bc_name}.nxdl.xml"): - bc_filename = f"{get_nexus_definitions_path()}{os.sep}" \ - f"{nxdl_folder}{os.sep}{bc_name}.nxdl.xml" - break - return bc_filename - - -def get_nxdl_child(nxdl_elem, name, class_type=None, hdf_name=None, nexus_type=None, go_base=True): # pylint: disable=too-many-arguments - """Get the NXDL child node corresponding to a specific name -(e.g. of an HDF5 node,or of a documentation) note that if child is not found in application -definition, it also checks for the base classes""" - # search for possible fits for hdf_nodes : skipped - # only exact hits are returned when searching an nxdl child - own_child = get_own_nxdl_child(nxdl_elem, name, class_type, hdf_name, nexus_type) - if own_child is not None: - return own_child - if not go_base: - return None - bc_name = get_nx_class(nxdl_elem) # check in the base class, app def or contributed - if bc_name[2] == '_': # filter primitive types - return None - if bc_name == "group": # Check if it is the root element. Then send to NXroot.nxdl.xml - bc_name = "NXroot" - bc_filename = find_definition_file(bc_name) - if not bc_filename: - raise ValueError('nxdl file not found in definitions folder!') - bc_obj = ET.parse(bc_filename).getroot() - bc_obj.set('nxdlbase', bc_filename) - if 'category' in bc_obj.attrib: - bc_obj.set('nxdlbase_class', bc_obj.attrib['category']) - bc_obj.set('nxdlpath', '') - return get_own_nxdl_child(bc_obj, name, class_type, hdf_name, nexus_type) - - -def get_required_string(nxdl_elem): - """Check for being REQUIRED, RECOMMENDED, OPTIONAL, NOT IN SCHEMA""" - if nxdl_elem is None: - return "<>" - is_optional = 'optional' in nxdl_elem.attrib.keys() \ - and nxdl_elem.attrib['optional'] == "true" - is_minoccurs = 'minOccurs' in nxdl_elem.attrib.keys() \ - and nxdl_elem.attrib['minOccurs'] == "0" - is_recommended = 'recommended' in nxdl_elem.attrib.keys() \ - and nxdl_elem.attrib['recommended'] == "true" - - if is_recommended: - return "<>" - if is_optional or is_minoccurs: - return "<>" - # default optionality: in BASE CLASSES is true; in APPLICATIONS is false - try: - if nxdl_elem.get('nxdlbase_class') == 'base': - return "<>" - except TypeError: - return "<>" - return "<>" - - -def chk_nxdataaxis_v2(hdf_node, name, logger): - """Check if dataset is an axis""" - own_signal = hdf_node.attrs.get('signal') # check for being a Signal - if own_signal is str and own_signal == "1": - logger.debug("Dataset referenced (v2) as NXdata SIGNAL") - own_axes = hdf_node.attrs.get('axes') # check for being an axis - if own_axes is str: - axes = own_axes.split(':') - for i in len(axes): - if axes[i] and name == axes[i]: - logger.debug("Dataset referenced (v2) as NXdata AXIS #%d", i) - return None - ownpaxis = hdf_node.attrs.get('primary') - own_axis = hdf_node.attrs.get('axis') - if own_axis is int: - # also convention v1 - if ownpaxis is int and ownpaxis == 1: - logger.debug("Dataset referenced (v2) as NXdata AXIS #%d", own_axis - 1) - else: - logger.debug( - "Dataset referenced (v2) as NXdata (primary/alternative) AXIS #%d", own_axis - 1) - return None - - -def chk_nxdataaxis(hdf_node, name, logger): - """NEXUS Data Plotting Standard v3: new version from 2014""" - if not isinstance(hdf_node, h5py.Dataset): # check if it is a field in an NXdata node - return None - parent = hdf_node.parent - if not parent or (parent and not parent.attrs.get('NX_class') == "NXdata"): - return None - signal = parent.attrs.get('signal') # chk for Signal - if signal and name == signal: - logger.debug("Dataset referenced as NXdata SIGNAL") - return None - axes = parent.attrs.get('axes') # check for default Axes - if axes is str: - if name == axes: - logger.debug("Dataset referenced as NXdata AXIS") - return None - elif axes is not None: - for i, j in enumerate(axes): - if name == j: - indices = parent.attrs.get(j + '_indices') - if indices is int: - logger.debug(f"Dataset referenced as NXdata AXIS #{indices}") - else: - logger.debug(f"Dataset referenced as NXdata AXIS #{i}") - return None - indices = parent.attrs.get(name + '_indices') # check for alternative Axes - if indices is int: - logger.debug(f"Dataset referenced as NXdata alternative AXIS #{indices}") - return chk_nxdataaxis_v2(hdf_node, name, logger) # check for older conventions - - -# below there are some functions used in get_nxdl_doc function: -def write_doc_string(logger, doc, attr): - """Simple function that prints a line in the logger if doc exists""" - if doc: - logger.debug("@" + attr + ' [NX_CHAR]') - return logger, doc, attr - - -def try_find_units(logger, elem, nxdl_path, doc, attr): - """Try to find if units is defined inside the field in the NXDL element, - otherwise try to find if units is defined as a child of the NXDL element.""" - try: # try to find if units is defined inside the field in the NXDL element - unit = elem.attrib[attr] - if doc: - logger.debug(get_node_concept_path(elem) + "@" + attr + ' [' + unit + ']') - elem = None - nxdl_path.append(attr) - except KeyError: # otherwise try to find if units is defined as a child of the NXDL element - orig_elem = elem - elem = get_nxdl_child(elem, attr, nexus_type='attribute') - if elem is not None: - if doc: - logger.debug(get_node_concept_path(orig_elem) - + "@" + attr + ' - [' + get_nx_class(elem) + ']') - nxdl_path.append(elem) - else: # if no units category were defined in NXDL: - if doc: - logger.debug(get_node_concept_path(orig_elem) - + "@" + attr + " - REQUIRED, but undefined unit category") - nxdl_path.append(attr) - return logger, elem, nxdl_path, doc, attr - - -def check_attr_name_nxdl(param): - """Check for ATTRIBUTENAME_units in NXDL (normal). -If not defined, check for ATTRIBUTENAME to see if the ATTRIBUTE -is in the SCHEMA, but no units category were defined. """ - (logger, elem, nxdl_path, doc, attr, req_str) = param - orig_elem = elem - elem2 = get_nxdl_child(elem, attr, nexus_type='attribute') - if elem2 is not None: # check for ATTRIBUTENAME_units in NXDL (normal) - elem = elem2 - if doc: - logger.debug(get_node_concept_path(orig_elem) - + "@" + attr + ' - [' + get_nx_class(elem) + ']') - nxdl_path.append(elem) - else: - # if not defined, check for ATTRIBUTENAME to see if the ATTRIBUTE - # is in the SCHEMA, but no units category were defined - elem2 = get_nxdl_child(elem, attr[:-6], nexus_type='attribute') - if elem2 is not None: - req_str = '<>' - if doc: - logger.debug(get_node_concept_path(orig_elem) - + "@" + attr + " - RECOMMENDED, but undefined unit category") - nxdl_path.append(attr) - else: # otherwise: NOT IN SCHEMA - elem = elem2 - if doc: - logger.debug(get_node_concept_path(orig_elem) + "@" + attr + " - IS NOT IN SCHEMA") - return logger, elem, nxdl_path, doc, attr, req_str - - -def try_find_default(logger, orig_elem, elem, nxdl_path, doc, attr): # pylint: disable=too-many-arguments - """Try to find if default is defined as a child of the NXDL element """ - if elem is not None: - if doc: - logger.debug(get_node_concept_path(orig_elem) - + "@" + attr + ' - [' + get_nx_class(elem) + ']') - nxdl_path.append(elem) - else: # if no default category were defined in NXDL: - if doc: - logger.debug(get_node_concept_path(orig_elem) + "@" + attr + " - [NX_CHAR]") - nxdl_path.append(attr) - return logger, elem, nxdl_path, doc, attr - - -def other_attrs(logger, orig_elem, elem, nxdl_path, doc, attr): # pylint: disable=too-many-arguments - """Handle remaining attributes """ - if elem is not None: - if doc: - logger.debug(get_node_concept_path(orig_elem) - + "@" + attr + ' - [' + get_nx_class(elem) + ']') - nxdl_path.append(elem) - else: - if doc: - logger.debug(get_node_concept_path(orig_elem) + "@" + attr + " - IS NOT IN SCHEMA") - return logger, elem, nxdl_path, doc, attr - - -def check_deprecation_enum_axis(variables, doc, elist, attr, hdf_node): - """Check for several attributes. - deprecation - enums - nxdataaxis """ - logger, elem, path = variables - dep_str = elem.attrib.get('deprecated') # check for deprecation - if dep_str: - if doc: - logger.debug("DEPRECATED - " + dep_str) - for base_elem in elist if not attr else [elem]: # check for enums - sdoc = get_nxdl_child(base_elem, 'enumeration', go_base=False) - if sdoc is not None: - if doc: - logger.debug("enumeration (" + get_node_concept_path(base_elem) + "):") - for item in sdoc: - if get_local_name_from_xml(item) == 'item': - if doc: - logger.debug("-> " + item.attrib['value']) - chk_nxdataaxis(hdf_node, path.split('/')[-1], logger) # look for NXdata reference (axes/signal) - for base_elem in elist if not attr else [elem]: # check for doc - sdoc = get_nxdl_child(base_elem, 'doc', go_base=False) - if doc: - logger.debug("documentation (" + get_node_concept_path(base_elem) + "):") - logger.debug(sdoc.text if sdoc is not None else "") - return logger, elem, path, doc, elist, attr, hdf_node - - -def get_node_concept_path(elem): - """get the short version of nxdlbase:nxdlpath""" - return str(elem.get('nxdlbase').split('/')[-1] + ":" + elem.get('nxdlpath')) - - -def get_nxdl_attr_doc( # pylint: disable=too-many-arguments,too-many-locals - elem, elist, attr, hdf_node, logger, doc, nxdl_path, req_str, path, hdf_info): - """Get nxdl documentation for an attribute""" - new_elem = [] - old_elem = elem - for elem_index, act_elem1 in enumerate(elist): - act_elem = act_elem1 - # NX_class is a compulsory attribute for groups in a nexus file - # which should match the type of the corresponding NXDL element - if attr == 'NX_class' and not isinstance(hdf_node, h5py.Dataset) and elem_index == 0: - elem = None - logger, doc, attr = write_doc_string(logger, doc, attr) - new_elem = elem - break - # units category is a compulsory attribute for any fields - if attr == 'units' and isinstance(hdf_node, h5py.Dataset): - req_str = "<>" - logger, act_elem, nxdl_path, doc, attr = try_find_units(logger, - act_elem, - nxdl_path, - doc, - attr) - # units for attributes can be given as ATTRIBUTENAME_units - elif attr.endswith('_units'): - logger, act_elem, nxdl_path, doc, attr, req_str = check_attr_name_nxdl((logger, - act_elem, - nxdl_path, - doc, - attr, - req_str)) - # default is allowed for groups - elif attr == 'default' and not isinstance(hdf_node, h5py.Dataset): - req_str = "<>" - # try to find if default is defined as a child of the NXDL element - act_elem = get_nxdl_child(act_elem, attr, nexus_type='attribute', go_base=False) - logger, act_elem, nxdl_path, doc, attr = try_find_default(logger, - act_elem1, - act_elem, - nxdl_path, - doc, - attr) - else: # other attributes - act_elem = get_nxdl_child(act_elem, attr, nexus_type='attribute', go_base=False) - if act_elem is not None: - logger, act_elem, nxdl_path, doc, attr = \ - other_attrs(logger, act_elem1, act_elem, nxdl_path, doc, attr) - if act_elem is not None: - new_elem.append(act_elem) - if req_str is None: - req_str = get_required_string(act_elem) # check for being required - if doc: - logger.debug(req_str) - variables = [logger, act_elem, path] - logger, elem, path, doc, elist, attr, hdf_node = check_deprecation_enum_axis(variables, - doc, - elist, - attr, - hdf_node) - elem = old_elem - if req_str is None and doc: - if attr != 'NX_class': - logger.debug("@" + attr + " - IS NOT IN SCHEMA") - logger.debug("") - return (req_str, get_nxdl_entry(hdf_info), nxdl_path) - - -def get_nxdl_doc(hdf_info, logger, doc, attr=False): - """Get nxdl documentation for an HDF5 node (or its attribute)""" - hdf_node = hdf_info['hdf_node'] - # new way: retrieve multiple inherited base classes - (class_path, nxdl_path, elist) = \ - get_inherited_nodes(None, nx_name=get_nxdl_entry(hdf_info), hdf_node=hdf_node, - hdf_path=hdf_info['hdf_path'] if 'hdf_path' in hdf_info else None, - hdf_root=hdf_info['hdf_root'] if 'hdf_root' in hdf_info else None) - elem = elist[0] if class_path and elist else None - if doc: - logger.debug("classpath: " + str(class_path)) - logger.debug("NOT IN SCHEMA" if elem is None else - "classes:\n" + "\n".join - (get_node_concept_path(e) for e in elist)) - # old solution with a single elem instead of using elist - path = get_nx_class_path(hdf_info) - req_str = None - if elem is None: - if doc: - logger.debug("") - return ('None', None, None) - if attr: - return get_nxdl_attr_doc(elem, elist, attr, hdf_node, logger, doc, nxdl_path, - req_str, path, hdf_info) - req_str = get_required_string(elem) # check for being required - if doc: - logger.debug(req_str) - variables = [logger, elem, path] - logger, elem, path, doc, elist, attr, hdf_node = check_deprecation_enum_axis(variables, - doc, - elist, - attr, - hdf_node) - return (req_str, get_nxdl_entry(hdf_info), nxdl_path) - - -def get_doc(node, ntype, nxhtml, nxpath): - """Get documentation""" - # URL for html documentation - anchor = '' - for n_item in nxpath: - anchor += n_item.lower() + "-" - anchor = ('https://manual.nexusformat.org/classes/', - nxhtml + "#" + anchor.replace('_', '-') + ntype) - if not ntype: - anchor = anchor[:-1] - doc = "" # RST documentation from the field 'doc' - doc_field = node.find("doc") - if doc_field is not None: - doc = doc_field.text - (index, enums) = get_enums(node) # enums - if index: - enum_str = "\n " + ("Possible values:" - if len(enums.split(',')) > 1 - else "Obligatory value:") + "\n " + enums + "\n" - else: - enum_str = "" - return anchor, doc + enum_str - - -def print_doc(node, ntype, level, nxhtml, nxpath): - """Print documentation""" - anchor, doc = get_doc(node, ntype, nxhtml, nxpath) - print(" " * (level + 1) + anchor) - preferred_width = 80 + level * 2 - wrapper = textwrap.TextWrapper(initial_indent=' ' * (level + 1), width=preferred_width, - subsequent_indent=' ' * (level + 1), expand_tabs=False, - tabsize=0) - if doc is not None: - for par in doc.split('\n'): - print(wrapper.fill(par)) - - -def get_namespace(element): - """Extracts the namespace for elements in the NXDL""" - return element.tag[element.tag.index("{"):element.tag.rindex("}") + 1] - - -def get_enums(node): - """Makes list of enumerations, if node contains any. - Returns comma separated STRING of enumeration values, if there are enum tag, - otherwise empty string.""" - # collect item values from enumeration tag, if any - namespace = get_namespace(node) - enums = [] - for enumeration in node.findall(f"{namespace}enumeration"): - for item in enumeration.findall(f"{namespace}item"): - enums.append(item.attrib["value"]) - enums = ','.join(enums) - if enums != "": - return (True, '[' + enums + ']') - return (False, "") # if there is no enumeration tag, returns empty string - - -def add_base_classes(elist, nx_name=None, elem: ET.Element = None): - """Add the base classes corresponding to the last eleme in elist to the list. Note that if -elist is empty, a nxdl file with the name of nx_name or a rather room elem is used if provided""" - if elist and nx_name is None: - nx_name = get_nx_class(elist[-1]) - # to support recursive defintions, like NXsample in NXsample, the following test is removed - # if elist and nx_name and f"{nx_name}.nxdl.xml" in (e.get('nxdlbase') for e in elist): - # return - if elem is None: - if not nx_name: - return - nxdl_file_path = find_definition_file(nx_name) - if nxdl_file_path is None: - nxdl_file_path = f"{nx_name}.nxdl.xml" - elem = ET.parse(nxdl_file_path).getroot() - elem.set('nxdlbase', nxdl_file_path) - else: - elem.set('nxdlbase', '') - if 'category' in elem.attrib: - elem.set('nxdlbase_class', elem.attrib['category']) - elem.set('nxdlpath', '') - elist.append(elem) - # add inherited base class - if 'extends' in elem.attrib and elem.attrib['extends'] != 'NXobject': - add_base_classes(elist, elem.attrib['extends']) - else: - add_base_classes(elist) - - -def set_nxdlpath(child, nxdl_elem): - """ - Setting up child nxdlbase, nxdlpath and nxdlbase_class from nxdl_element. - """ - if nxdl_elem.get('nxdlbase'): - child.set('nxdlbase', nxdl_elem.get('nxdlbase')) - child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) - child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/' + get_node_name(child)) - return child - - -def get_direct_child(nxdl_elem, html_name): - """ returns the child of nxdl_elem which has a name - corresponding to the the html documentation name html_name""" - for child in nxdl_elem: - if get_local_name_from_xml(child) in ('group', 'field', 'attribute') and \ - html_name == get_node_name(child): - decorated_child = set_nxdlpath(child, nxdl_elem) - return decorated_child - return None - - -def get_field_child(nxdl_elem, html_name): - """ returns the child of nxdl_elem which has a name - corresponding to the html documentation name html_name""" - data_child = None - for child in nxdl_elem: - if get_local_name_from_xml(child) != 'field': - continue - if get_node_name(child) == html_name: - data_child = set_nxdlpath(child, nxdl_elem) - break - return data_child - - -def get_best_nxdata_child(nxdl_elem, hdf_node, hdf_name): - """ returns the child of an NXdata nxdl_elem which has a name - corresponding to the hdf_name""" - nxdata = hdf_node.parent - signals = [] - if 'signal' in nxdata.attrs.keys(): - signals.append(nxdata.attrs.get("signal")) - if "auxiliary_signals" in nxdata.attrs.keys(): - for aux_signal in nxdata.attrs.get("auxiliary_signals"): - signals.append(aux_signal) - data_child = get_field_child(nxdl_elem, 'DATA') - data_error_child = get_field_child(nxdl_elem, 'FIELDNAME_errors') - for signal in signals: - if signal == hdf_name: - return (data_child, 100) - if hdf_name.endswith('_errors') and signal == hdf_name[:-7]: - return (data_error_child, 100) - axes = [] - if "axes" in nxdata.attrs.keys(): - for axis in nxdata.attrs.get("axes"): - axes.append(axis) - axis_child = get_field_child(nxdl_elem, 'AXISNAME') - for axis in axes: - if axis == hdf_name: - return (axis_child, 100) - return (None, 0) - - -def get_best_child(nxdl_elem, hdf_node, hdf_name, hdf_class_name, nexus_type): - """ returns the child of nxdl_elem which has a name - corresponding to the the html documentation name html_name""" - bestfit = -1 - bestchild = None - if 'name' in nxdl_elem.attrib.keys() and nxdl_elem.attrib['name'] == 'NXdata' and \ - hdf_node is not None and hdf_node.parent is not None and \ - hdf_node.parent.attrs.get('NX_class') == 'NXdata': - (fnd_child, fit) = get_best_nxdata_child(nxdl_elem, hdf_node, hdf_name) - if fnd_child is not None: - return (fnd_child, fit) - for child in nxdl_elem: - fit = -2 - if get_local_name_from_xml(child) == nexus_type and \ - (nexus_type != 'group' or get_nx_class(child) == hdf_class_name): - name_any = "nameType" in nxdl_elem.attrib.keys() and \ - nxdl_elem.attrib["nameType"] == "any" - fit = get_nx_namefit(hdf_name, get_node_name(child), name_any) - if fit > bestfit: - bestfit = fit - bestchild = set_nxdlpath(child, nxdl_elem) - return (bestchild, bestfit) - - -def walk_elist(elist, html_name): - """Handle elist from low priority inheritance classes to higher""" - for ind in range(len(elist) - 1, -1, -1): - child = get_direct_child(elist[ind], html_name) - if child is None: - # check for names fitting to a superclas definition - main_child = None - for potential_direct_parent in elist: - main_child = get_direct_child(potential_direct_parent, html_name) - if main_child is not None: - (fitting_child, _) = get_best_child(elist[ind], None, html_name, - get_nx_class(main_child), - get_local_name_from_xml(main_child)) - if fitting_child is not None: - child = fitting_child - break - elist[ind] = child - if elist[ind] is None: - del elist[ind] - continue - # override: remove low priority inheritance classes if class_type is overriden - if len(elist) > ind + 1 and get_nx_class(elist[ind]) != get_nx_class(elist[ind + 1]): - del elist[ind + 1:] - # add new base class(es) if new element brings such (and not a primitive type) - if len(elist) == ind + 1 and get_nx_class(elist[ind])[0:3] != 'NX_': - add_base_classes(elist) - return elist, html_name - - -def helper_get_inherited_nodes(hdf_info2, elist, pind, attr): - """find the best fitting name in all children""" - hdf_path, hdf_node, hdf_class_path = hdf_info2 - hdf_name = hdf_path[pind] - hdf_class_name = hdf_class_path[pind] - if pind < len(hdf_path) - (2 if attr else 1): - act_nexus_type = 'group' - elif pind == len(hdf_path) - 1 and attr: - act_nexus_type = 'attribute' - else: - act_nexus_type = 'field' if isinstance(hdf_node, h5py.Dataset) else 'group' - # find the best fitting name in all children - bestfit = -1 - html_name = None - for ind in range(len(elist) - 1, -1, -1): - newelem, fit = get_best_child(elist[ind], - hdf_node, - hdf_name, - hdf_class_name, - act_nexus_type) - if fit >= bestfit and newelem is not None: - html_name = get_node_name(newelem) - return hdf_path, hdf_node, hdf_class_path, elist, pind, attr, html_name - - -def get_hdf_path(hdf_info): - """Get the hdf_path from an hdf_info""" - if 'hdf_path' in hdf_info: - return hdf_info['hdf_path'].split('/')[1:] - return hdf_info['hdf_node'].name.split('/')[1:] - - -@lru_cache(maxsize=None) -def get_inherited_nodes(nxdl_path: str = None, # pylint: disable=too-many-arguments,too-many-locals - nx_name: str = None, elem: ET.Element = None, - hdf_node=None, hdf_path=None, hdf_root=None, attr=False): - """Returns a list of ET.Element for the given path.""" - # let us start with the given definition file - elist = [] # type: ignore[var-annotated] - add_base_classes(elist, nx_name, elem) - nxdl_elem_path = [elist[0]] - - class_path = [] # type: ignore[var-annotated] - if hdf_node is not None: - hdf_info = {'hdf_node': hdf_node} - if hdf_path: - hdf_info['hdf_path'] = hdf_path - if hdf_root: - hdf_root['hdf_root'] = hdf_root - hdf_node = hdf_info['hdf_node'] - hdf_path = get_hdf_path(hdf_info) - hdf_class_path = get_nx_class_path(hdf_info).split('/')[1:] - if attr: - hdf_path.append(attr) - hdf_class_path.append(attr) - path = hdf_path - else: - html_path = nxdl_path.split('/')[1:] - path = html_path - for pind in range(len(path)): - if hdf_node is not None: - hdf_info2 = [hdf_path, hdf_node, hdf_class_path] - [hdf_path, hdf_node, hdf_class_path, elist, - pind, attr, html_name] = helper_get_inherited_nodes(hdf_info2, elist, - pind, attr) - if html_name is None: # return if NOT IN SCHEMA - return (class_path, nxdl_elem_path, None) - else: - html_name = html_path[pind] - elist, html_name = walk_elist(elist, html_name) - if elist: - class_path.append(get_nx_class(elist[0])) - nxdl_elem_path.append(elist[0]) - return (class_path, nxdl_elem_path, elist) - - -def get_node_at_nxdl_path(nxdl_path: str = None, - nx_name: str = None, elem: ET.Element = None, - exc: bool = True): - """Returns an ET.Element for the given path. - This function either takes the name for the NeXus Application Definition - we are looking for or the root elem from a previously loaded NXDL file - and finds the corresponding XML element with the needed attributes.""" - try: - (class_path, nxdlpath, elist) = get_inherited_nodes(nxdl_path, nx_name, elem) - except ValueError as value_error: - if exc: - raise NxdlAttributeError(f"Attributes were not found for {nxdl_path}. " - "Please check this entry in the template dictionary.") \ - from value_error - return None - if class_path and nxdlpath and elist: - elem = elist[0] - else: - elem = None - if exc: - raise NxdlAttributeError(f"Attributes were not found for {nxdl_path}. " - "Please check this entry in the template dictionary.") - return elem +from .nxdl_utils import * def process_node(hdf_node, hdf_path, parser, logger, doc=True): diff --git a/pynxtools/nexus/nxdl_utils.py b/pynxtools/nexus/nxdl_utils.py new file mode 100644 index 000000000..6a8182cac --- /dev/null +++ b/pynxtools/nexus/nxdl_utils.py @@ -0,0 +1,988 @@ +# pylint: disable=too-many-lines +"""Parse NeXus definition files +""" + +import os +import xml.etree.ElementTree as ET +from functools import lru_cache +from glob import glob +import sys +import logging +import textwrap + + +class NxdlAttributeError(Exception): + """An exception for throwing an error when an Nxdl attribute is not found.""" + + +def get_app_defs_names(): + """Returns all the AppDef names without their extension: .nxdl.xml""" + app_def_path_glob = f"{get_nexus_definitions_path()}{os.sep}applications{os.sep}*.nxdl*" + contrib_def_path_glob = (f"{get_nexus_definitions_path()}{os.sep}" + f"contributed_definitions{os.sep}*.nxdl*") + files = sorted(glob(app_def_path_glob)) + sorted(glob(contrib_def_path_glob)) + return [os.path.basename(file).split(".")[0] for file in files] + ["NXroot"] + + +@lru_cache(maxsize=None) +def get_xml_root(file_path): + """Reducing I/O time by caching technique""" + + return ET.parse(file_path).getroot() + + +def get_nexus_definitions_path(): + """Check NEXUS_DEF_PATH variable. +If it is empty, this function is filling it""" + try: # either given by sys env + return os.environ['NEXUS_DEF_PATH'] + except KeyError: # or it should be available locally under the dir 'definitions' + local_dir = os.path.abspath(os.path.dirname(__file__)) + return os.path.join(local_dir, f"..{os.sep}definitions") + + +def get_hdf_root(hdf_node): + """Get the root HDF5 node""" + node = hdf_node + while node.name != '/': + node = node.parent + return node + + +def get_hdf_parent(hdf_info): + """Get the parent of an hdf_node in an hdf_info""" + if 'hdf_path' not in hdf_info: + return hdf_info['hdf_node'].parent + node = get_hdf_root(hdf_info['hdf_node']) if 'hdf_root' not in hdf_info \ + else hdf_info['hdf_root'] + for child_name in hdf_info['hdf_path'].split('/'): + node = node[child_name] + return node + + +def get_parent_path(hdf_name): + """Get parent path""" + return '/'.join(hdf_name.split('/')[:-1]) + + +def get_hdf_info_parent(hdf_info): + """Get the hdf_info for the parent of an hdf_node in an hdf_info""" + if 'hdf_path' not in hdf_info: + return {'hdf_node': hdf_info['hdf_node'].parent} + node = get_hdf_root(hdf_info['hdf_node']) if 'hdf_root' not in hdf_info \ + else hdf_info['hdf_root'] + for child_name in hdf_info['hdf_path'].split('/')[1:-1]: + node = node[child_name] + return {'hdf_node': node, 'hdf_path': get_parent_path(hdf_info['hdf_path'])} + + +def get_nx_class_path(hdf_info): + """Get the full path of an HDF5 node using nexus classes +in case of a field, end with the field name""" + hdf_node = hdf_info['hdf_node'] + if hdf_node.name == '/': + return '' + if isinstance(hdf_node, h5py.Group): + return get_nx_class_path(get_hdf_info_parent(hdf_info)) + '/' + \ + (hdf_node.attrs['NX_class'] if 'NX_class' in hdf_node.attrs.keys() else + hdf_node.name.split('/')[-1]) + if isinstance(hdf_node, h5py.Dataset): + return get_nx_class_path( + get_hdf_info_parent(hdf_info)) + '/' + hdf_node.name.split('/')[-1] + return '' + + +def get_nxdl_entry(hdf_info): + """Get the nxdl application definition for an HDF5 node""" + entry = hdf_info + while isinstance(entry['hdf_node'], h5py.Dataset) or \ + 'NX_class' not in entry['hdf_node'].attrs.keys() or \ + entry['hdf_node'].attrs['NX_class'] != 'NXentry': + entry = get_hdf_info_parent(entry) + if entry['hdf_node'].name == '/': + return 'NO NXentry found' + try: + nxdef = entry['hdf_node']['definition'][()] + return nxdef.decode() + except KeyError: # 'NO Definition referenced' + return "NXentry" + + +def get_nx_class(nxdl_elem): + """Get the nexus class for a NXDL node""" + if 'category' in nxdl_elem.attrib.keys(): + return None + try: + return nxdl_elem.attrib['type'] + except KeyError: + return 'NX_CHAR' + + +def get_nx_namefit(hdf_name, name, name_any=False): + """Checks if an HDF5 node name corresponds to a child of the NXDL element +uppercase letters in front can be replaced by arbitraty name, but +uppercase to lowercase match is preferred, +so such match is counted as a measure of the fit""" + if name == hdf_name: + return len(name) * 2 + # count leading capitals + counting = 0 + while counting < len(name) and name[counting].upper() == name[counting]: + counting += 1 + if name_any or counting == len(name) or \ + (counting > 0 and hdf_name.endswith(name[counting:])): # if potential fit + # count the matching chars + fit = 0 + for i in range(min(counting, len(hdf_name))): + if hdf_name[i].upper() == name[i]: + fit += 1 + else: + break + if fit == min(counting, len(hdf_name)): # accept only full fits as better fits + return fit + return 0 + return -1 # no fit + + +def get_nx_classes(): + """Read base classes from the NeXus definition folder. +Check each file in base_classes, applications, contributed_definitions. +If its category attribute is 'base', then it is added to the list. """ + base_classes = sorted(glob(os.path.join(get_nexus_definitions_path(), + 'base_classes', '*.nxdl.xml'))) + applications = sorted(glob(os.path.join(get_nexus_definitions_path(), + 'applications', '*.nxdl.xml'))) + contributed = sorted(glob(os.path.join(get_nexus_definitions_path(), + 'contributed_definitions', '*.nxdl.xml'))) + nx_clss = [] + for nexus_file in base_classes + applications + contributed: + root = get_xml_root(nexus_file) + if root.attrib['category'] == 'base': + nx_clss.append(str(nexus_file[nexus_file.rindex(os.sep) + 1:])[:-9]) + nx_clss = sorted(nx_clss) + return nx_clss + + +def get_nx_units(): + """Read unit kinds from the NeXus definition/nxdlTypes.xsd file""" + filepath = f"{get_nexus_definitions_path()}{os.sep}nxdlTypes.xsd" + root = get_xml_root(filepath) + units_and_type_list = [] + for child in root: + for i in child.attrib.values(): + units_and_type_list.append(i) + flag = False + for line in units_and_type_list: + if line == 'anyUnitsAttr': + flag = True + nx_units = [] + elif 'NX' in line and flag is True: + nx_units.append(line) + elif line == 'primitiveType': + flag = False + else: + pass + return nx_units + + +def get_nx_attribute_type(): + """Read attribute types from the NeXus definition/nxdlTypes.xsd file""" + filepath = get_nexus_definitions_path() + '/nxdlTypes.xsd' + root = get_xml_root(filepath) + units_and_type_list = [] + for child in root: + for i in child.attrib.values(): + units_and_type_list.append(i) + flag = False + for line in units_and_type_list: + if line == 'primitiveType': + flag = True + nx_types = [] + elif 'NX' in line and flag is True: + nx_types.append(line) + elif line == 'anyUnitsAttr': + flag = False + else: + pass + return nx_types + + +def get_node_name(node): + '''Node - xml node. Returns html documentation name. + Either as specified by the 'name' or taken from the type (nx_class). + Note that if only class name is available, the NX prefix is removed and + the string is converted to UPPER case.''' + if 'name' in node.attrib.keys(): + name = node.attrib['name'] + else: + name = node.attrib['type'] + if name.startswith('NX'): + name = name[2:].upper() + return name + + +def belongs_to(nxdl_elem, child, name, class_type=None, hdf_name=None): + """Checks if an HDF5 node name corresponds to a child of the NXDL element +uppercase letters in front can be replaced by arbitraty name, but +uppercase to lowercase match is preferred""" + if class_type and get_nx_class(child) != class_type: + return False + act_htmlname = get_node_name(child) + chk_name = hdf_name or name + if act_htmlname == chk_name: + return True + if not hdf_name: # search for name fits is only allowed for hdf_nodes + return False + try: # check if nameType allows different name + name_any = bool(child.attrib['nameType'] == "any") + except KeyError: + name_any = False + params = [act_htmlname, chk_name, name_any, nxdl_elem, child, name] + return belongs_to_capital(params) + + +def belongs_to_capital(params): + """Checking continues for Upper case""" + (act_htmlname, chk_name, name_any, nxdl_elem, child, name) = params + # or starts with capital and no reserved words used + if (name_any or 'A' <= act_htmlname[0] <= 'Z') and \ + name != 'doc' and name != 'enumeration': + fit = get_nx_namefit(chk_name, act_htmlname, name_any) # check if name fits + if fit < 0: + return False + for child2 in nxdl_elem: + if get_local_name_from_xml(child) != \ + get_local_name_from_xml(child2) or get_node_name(child2) == act_htmlname: + continue + # check if the name of another sibling fits better + name_any2 = "nameType" in child2.attrib.keys() and child2.attrib["nameType"] == "any" + fit2 = get_nx_namefit(chk_name, get_node_name(child2), name_any2) + if fit2 > fit: + return False + # accept this fit + return True + return False + + +def get_local_name_from_xml(element): + """Helper function to extract the element tag without the namespace.""" + return element.tag[element.tag.rindex("}") + 1:] + + +def get_own_nxdl_child_reserved_elements(child, name, nxdl_elem): + """checking reserved elements, like doc, enumeration""" + if get_local_name_from_xml(child) == 'doc' and name == 'doc': + if nxdl_elem.get('nxdlbase'): + child.set('nxdlbase', nxdl_elem.get('nxdlbase')) + child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) + child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/doc') + return child + if get_local_name_from_xml(child) == 'enumeration' and name == 'enumeration': + if nxdl_elem.get('nxdlbase'): + child.set('nxdlbase', nxdl_elem.get('nxdlbase')) + child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) + child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/enumeration') + return child + return False + + +def get_own_nxdl_child_base_types(child, class_type, nxdl_elem, name, hdf_name): + """checking base types of group, field,m attribute""" + if get_local_name_from_xml(child) == 'group': + if (class_type is None or (class_type and get_nx_class(child) == class_type)) and \ + belongs_to(nxdl_elem, child, name, class_type, hdf_name): + if nxdl_elem.get('nxdlbase'): + child.set('nxdlbase', nxdl_elem.get('nxdlbase')) + child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) + child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/' + get_node_name(child)) + return child + if get_local_name_from_xml(child) == 'field' and \ + belongs_to(nxdl_elem, child, name, None, hdf_name): + if nxdl_elem.get('nxdlbase'): + child.set('nxdlbase', nxdl_elem.get('nxdlbase')) + child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) + child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/' + get_node_name(child)) + return child + if get_local_name_from_xml(child) == 'attribute' and \ + belongs_to(nxdl_elem, child, name, None, hdf_name): + if nxdl_elem.get('nxdlbase'): + child.set('nxdlbase', nxdl_elem.get('nxdlbase')) + child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) + child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/' + get_node_name(child)) + return child + return False + + +def get_own_nxdl_child(nxdl_elem, name, class_type=None, hdf_name=None, nexus_type=None): + """Checks if an NXDL child node fits to the specific name (either nxdl or hdf) + name - nxdl name + class_type - nxdl type or hdf classname (for groups, it is obligatory) + hdf_name - hdf name""" + for child in nxdl_elem: + if 'name' in child.attrib and child.attrib['name'] == name: + if nxdl_elem.get('nxdlbase'): + child.set('nxdlbase', nxdl_elem.get('nxdlbase')) + child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) + child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/' + get_node_name(child)) + return child + for child in nxdl_elem: + if "name" in child.attrib and child.attrib["name"] == name: + child.set('nxdlbase', nxdl_elem.get('nxdlbase')) + return child + + for child in nxdl_elem: + result = get_own_nxdl_child_reserved_elements(child, name, nxdl_elem) + if result is not False: + return result + if nexus_type and get_local_name_from_xml(child) != nexus_type: + continue + result = get_own_nxdl_child_base_types(child, class_type, nxdl_elem, name, hdf_name) + if result is not False: + return result + return None + + +def find_definition_file(bc_name): + """find the nxdl file corresponding to the name. + Note that it first checks in contributed and goes beyond only if no contributed found""" + bc_filename = None + for nxdl_folder in ['contributed_definitions', 'base_classes', 'applications']: + if os.path.exists(f"{get_nexus_definitions_path()}{os.sep}" + f"{nxdl_folder}{os.sep}{bc_name}.nxdl.xml"): + bc_filename = f"{get_nexus_definitions_path()}{os.sep}" \ + f"{nxdl_folder}{os.sep}{bc_name}.nxdl.xml" + break + return bc_filename + + +def get_nxdl_child(nxdl_elem, name, class_type=None, hdf_name=None, nexus_type=None, go_base=True): # pylint: disable=too-many-arguments + """Get the NXDL child node corresponding to a specific name +(e.g. of an HDF5 node,or of a documentation) note that if child is not found in application +definition, it also checks for the base classes""" + # search for possible fits for hdf_nodes : skipped + # only exact hits are returned when searching an nxdl child + own_child = get_own_nxdl_child(nxdl_elem, name, class_type, hdf_name, nexus_type) + if own_child is not None: + return own_child + if not go_base: + return None + bc_name = get_nx_class(nxdl_elem) # check in the base class, app def or contributed + if bc_name[2] == '_': # filter primitive types + return None + if bc_name == "group": # Check if it is the root element. Then send to NXroot.nxdl.xml + bc_name = "NXroot" + bc_filename = find_definition_file(bc_name) + if not bc_filename: + raise ValueError('nxdl file not found in definitions folder!') + bc_obj = ET.parse(bc_filename).getroot() + bc_obj.set('nxdlbase', bc_filename) + if 'category' in bc_obj.attrib: + bc_obj.set('nxdlbase_class', bc_obj.attrib['category']) + bc_obj.set('nxdlpath', '') + return get_own_nxdl_child(bc_obj, name, class_type, hdf_name, nexus_type) + + +def get_required_string(nxdl_elem): + """Check for being REQUIRED, RECOMMENDED, OPTIONAL, NOT IN SCHEMA""" + if nxdl_elem is None: + return "<>" + is_optional = 'optional' in nxdl_elem.attrib.keys() \ + and nxdl_elem.attrib['optional'] == "true" + is_minoccurs = 'minOccurs' in nxdl_elem.attrib.keys() \ + and nxdl_elem.attrib['minOccurs'] == "0" + is_recommended = 'recommended' in nxdl_elem.attrib.keys() \ + and nxdl_elem.attrib['recommended'] == "true" + + if is_recommended: + return "<>" + if is_optional or is_minoccurs: + return "<>" + # default optionality: in BASE CLASSES is true; in APPLICATIONS is false + try: + if nxdl_elem.get('nxdlbase_class') == 'base': + return "<>" + except TypeError: + return "<>" + return "<>" + + +def chk_nxdataaxis_v2(hdf_node, name, logger): + """Check if dataset is an axis""" + own_signal = hdf_node.attrs.get('signal') # check for being a Signal + if own_signal is str and own_signal == "1": + logger.debug("Dataset referenced (v2) as NXdata SIGNAL") + own_axes = hdf_node.attrs.get('axes') # check for being an axis + if own_axes is str: + axes = own_axes.split(':') + for i in len(axes): + if axes[i] and name == axes[i]: + logger.debug("Dataset referenced (v2) as NXdata AXIS #%d", i) + return None + ownpaxis = hdf_node.attrs.get('primary') + own_axis = hdf_node.attrs.get('axis') + if own_axis is int: + # also convention v1 + if ownpaxis is int and ownpaxis == 1: + logger.debug("Dataset referenced (v2) as NXdata AXIS #%d", own_axis - 1) + else: + logger.debug( + "Dataset referenced (v2) as NXdata (primary/alternative) AXIS #%d", own_axis - 1) + return None + + +def chk_nxdataaxis(hdf_node, name, logger): + """NEXUS Data Plotting Standard v3: new version from 2014""" + if not isinstance(hdf_node, h5py.Dataset): # check if it is a field in an NXdata node + return None + parent = hdf_node.parent + if not parent or (parent and not parent.attrs.get('NX_class') == "NXdata"): + return None + signal = parent.attrs.get('signal') # chk for Signal + if signal and name == signal: + logger.debug("Dataset referenced as NXdata SIGNAL") + return None + axes = parent.attrs.get('axes') # check for default Axes + if axes is str: + if name == axes: + logger.debug("Dataset referenced as NXdata AXIS") + return None + elif axes is not None: + for i, j in enumerate(axes): + if name == j: + indices = parent.attrs.get(j + '_indices') + if indices is int: + logger.debug(f"Dataset referenced as NXdata AXIS #{indices}") + else: + logger.debug(f"Dataset referenced as NXdata AXIS #{i}") + return None + indices = parent.attrs.get(name + '_indices') # check for alternative Axes + if indices is int: + logger.debug(f"Dataset referenced as NXdata alternative AXIS #{indices}") + return chk_nxdataaxis_v2(hdf_node, name, logger) # check for older conventions + + +# below there are some functions used in get_nxdl_doc function: +def write_doc_string(logger, doc, attr): + """Simple function that prints a line in the logger if doc exists""" + if doc: + logger.debug("@" + attr + ' [NX_CHAR]') + return logger, doc, attr + + +def try_find_units(logger, elem, nxdl_path, doc, attr): + """Try to find if units is defined inside the field in the NXDL element, + otherwise try to find if units is defined as a child of the NXDL element.""" + try: # try to find if units is defined inside the field in the NXDL element + unit = elem.attrib[attr] + if doc: + logger.debug(get_node_concept_path(elem) + "@" + attr + ' [' + unit + ']') + elem = None + nxdl_path.append(attr) + except KeyError: # otherwise try to find if units is defined as a child of the NXDL element + orig_elem = elem + elem = get_nxdl_child(elem, attr, nexus_type='attribute') + if elem is not None: + if doc: + logger.debug(get_node_concept_path(orig_elem) + + "@" + attr + ' - [' + get_nx_class(elem) + ']') + nxdl_path.append(elem) + else: # if no units category were defined in NXDL: + if doc: + logger.debug(get_node_concept_path(orig_elem) + + "@" + attr + " - REQUIRED, but undefined unit category") + nxdl_path.append(attr) + return logger, elem, nxdl_path, doc, attr + + +def check_attr_name_nxdl(param): + """Check for ATTRIBUTENAME_units in NXDL (normal). +If not defined, check for ATTRIBUTENAME to see if the ATTRIBUTE +is in the SCHEMA, but no units category were defined. """ + (logger, elem, nxdl_path, doc, attr, req_str) = param + orig_elem = elem + elem2 = get_nxdl_child(elem, attr, nexus_type='attribute') + if elem2 is not None: # check for ATTRIBUTENAME_units in NXDL (normal) + elem = elem2 + if doc: + logger.debug(get_node_concept_path(orig_elem) + + "@" + attr + ' - [' + get_nx_class(elem) + ']') + nxdl_path.append(elem) + else: + # if not defined, check for ATTRIBUTENAME to see if the ATTRIBUTE + # is in the SCHEMA, but no units category were defined + elem2 = get_nxdl_child(elem, attr[:-6], nexus_type='attribute') + if elem2 is not None: + req_str = '<>' + if doc: + logger.debug(get_node_concept_path(orig_elem) + + "@" + attr + " - RECOMMENDED, but undefined unit category") + nxdl_path.append(attr) + else: # otherwise: NOT IN SCHEMA + elem = elem2 + if doc: + logger.debug(get_node_concept_path(orig_elem) + "@" + attr + " - IS NOT IN SCHEMA") + return logger, elem, nxdl_path, doc, attr, req_str + + +def try_find_default(logger, orig_elem, elem, nxdl_path, doc, attr): # pylint: disable=too-many-arguments + """Try to find if default is defined as a child of the NXDL element """ + if elem is not None: + if doc: + logger.debug(get_node_concept_path(orig_elem) + + "@" + attr + ' - [' + get_nx_class(elem) + ']') + nxdl_path.append(elem) + else: # if no default category were defined in NXDL: + if doc: + logger.debug(get_node_concept_path(orig_elem) + "@" + attr + " - [NX_CHAR]") + nxdl_path.append(attr) + return logger, elem, nxdl_path, doc, attr + + +def other_attrs(logger, orig_elem, elem, nxdl_path, doc, attr): # pylint: disable=too-many-arguments + """Handle remaining attributes """ + if elem is not None: + if doc: + logger.debug(get_node_concept_path(orig_elem) + + "@" + attr + ' - [' + get_nx_class(elem) + ']') + nxdl_path.append(elem) + else: + if doc: + logger.debug(get_node_concept_path(orig_elem) + "@" + attr + " - IS NOT IN SCHEMA") + return logger, elem, nxdl_path, doc, attr + + +def check_deprecation_enum_axis(variables, doc, elist, attr, hdf_node): + """Check for several attributes. - deprecation - enums - nxdataaxis """ + logger, elem, path = variables + dep_str = elem.attrib.get('deprecated') # check for deprecation + if dep_str: + if doc: + logger.debug("DEPRECATED - " + dep_str) + for base_elem in elist if not attr else [elem]: # check for enums + sdoc = get_nxdl_child(base_elem, 'enumeration', go_base=False) + if sdoc is not None: + if doc: + logger.debug("enumeration (" + get_node_concept_path(base_elem) + "):") + for item in sdoc: + if get_local_name_from_xml(item) == 'item': + if doc: + logger.debug("-> " + item.attrib['value']) + chk_nxdataaxis(hdf_node, path.split('/')[-1], logger) # look for NXdata reference (axes/signal) + for base_elem in elist if not attr else [elem]: # check for doc + sdoc = get_nxdl_child(base_elem, 'doc', go_base=False) + if doc: + logger.debug("documentation (" + get_node_concept_path(base_elem) + "):") + logger.debug(sdoc.text if sdoc is not None else "") + return logger, elem, path, doc, elist, attr, hdf_node + + +def get_node_concept_path(elem): + """get the short version of nxdlbase:nxdlpath""" + return str(elem.get('nxdlbase').split('/')[-1] + ":" + elem.get('nxdlpath')) + + +def get_nxdl_attr_doc( # pylint: disable=too-many-arguments,too-many-locals + elem, elist, attr, hdf_node, logger, doc, nxdl_path, req_str, path, hdf_info): + """Get nxdl documentation for an attribute""" + new_elem = [] + old_elem = elem + for elem_index, act_elem1 in enumerate(elist): + act_elem = act_elem1 + # NX_class is a compulsory attribute for groups in a nexus file + # which should match the type of the corresponding NXDL element + if attr == 'NX_class' and not isinstance(hdf_node, h5py.Dataset) and elem_index == 0: + elem = None + logger, doc, attr = write_doc_string(logger, doc, attr) + new_elem = elem + break + # units category is a compulsory attribute for any fields + if attr == 'units' and isinstance(hdf_node, h5py.Dataset): + req_str = "<>" + logger, act_elem, nxdl_path, doc, attr = try_find_units(logger, + act_elem, + nxdl_path, + doc, + attr) + # units for attributes can be given as ATTRIBUTENAME_units + elif attr.endswith('_units'): + logger, act_elem, nxdl_path, doc, attr, req_str = check_attr_name_nxdl((logger, + act_elem, + nxdl_path, + doc, + attr, + req_str)) + # default is allowed for groups + elif attr == 'default' and not isinstance(hdf_node, h5py.Dataset): + req_str = "<>" + # try to find if default is defined as a child of the NXDL element + act_elem = get_nxdl_child(act_elem, attr, nexus_type='attribute', go_base=False) + logger, act_elem, nxdl_path, doc, attr = try_find_default(logger, + act_elem1, + act_elem, + nxdl_path, + doc, + attr) + else: # other attributes + act_elem = get_nxdl_child(act_elem, attr, nexus_type='attribute', go_base=False) + if act_elem is not None: + logger, act_elem, nxdl_path, doc, attr = \ + other_attrs(logger, act_elem1, act_elem, nxdl_path, doc, attr) + if act_elem is not None: + new_elem.append(act_elem) + if req_str is None: + req_str = get_required_string(act_elem) # check for being required + if doc: + logger.debug(req_str) + variables = [logger, act_elem, path] + logger, elem, path, doc, elist, attr, hdf_node = check_deprecation_enum_axis(variables, + doc, + elist, + attr, + hdf_node) + elem = old_elem + if req_str is None and doc: + if attr != 'NX_class': + logger.debug("@" + attr + " - IS NOT IN SCHEMA") + logger.debug("") + return (req_str, get_nxdl_entry(hdf_info), nxdl_path) + + +def get_nxdl_doc(hdf_info, logger, doc, attr=False): + """Get nxdl documentation for an HDF5 node (or its attribute)""" + hdf_node = hdf_info['hdf_node'] + # new way: retrieve multiple inherited base classes + (class_path, nxdl_path, elist) = \ + get_inherited_nodes(None, nx_name=get_nxdl_entry(hdf_info), hdf_node=hdf_node, + hdf_path=hdf_info['hdf_path'] if 'hdf_path' in hdf_info else None, + hdf_root=hdf_info['hdf_root'] if 'hdf_root' in hdf_info else None) + elem = elist[0] if class_path and elist else None + if doc: + logger.debug("classpath: " + str(class_path)) + logger.debug("NOT IN SCHEMA" if elem is None else + "classes:\n" + "\n".join + (get_node_concept_path(e) for e in elist)) + # old solution with a single elem instead of using elist + path = get_nx_class_path(hdf_info) + req_str = None + if elem is None: + if doc: + logger.debug("") + return ('None', None, None) + if attr: + return get_nxdl_attr_doc(elem, elist, attr, hdf_node, logger, doc, nxdl_path, + req_str, path, hdf_info) + req_str = get_required_string(elem) # check for being required + if doc: + logger.debug(req_str) + variables = [logger, elem, path] + logger, elem, path, doc, elist, attr, hdf_node = check_deprecation_enum_axis(variables, + doc, + elist, + attr, + hdf_node) + return (req_str, get_nxdl_entry(hdf_info), nxdl_path) + + +def get_doc(node, ntype, nxhtml, nxpath): + """Get documentation""" + # URL for html documentation + anchor = '' + for n_item in nxpath: + anchor += n_item.lower() + "-" + anchor = ('https://manual.nexusformat.org/classes/', + nxhtml + "#" + anchor.replace('_', '-') + ntype) + if not ntype: + anchor = anchor[:-1] + doc = "" # RST documentation from the field 'doc' + doc_field = node.find("doc") + if doc_field is not None: + doc = doc_field.text + (index, enums) = get_enums(node) # enums + if index: + enum_str = "\n " + ("Possible values:" + if len(enums.split(',')) > 1 + else "Obligatory value:") + "\n " + enums + "\n" + else: + enum_str = "" + return anchor, doc + enum_str + + +def print_doc(node, ntype, level, nxhtml, nxpath): + """Print documentation""" + anchor, doc = get_doc(node, ntype, nxhtml, nxpath) + print(" " * (level + 1) + anchor) + preferred_width = 80 + level * 2 + wrapper = textwrap.TextWrapper(initial_indent=' ' * (level + 1), width=preferred_width, + subsequent_indent=' ' * (level + 1), expand_tabs=False, + tabsize=0) + if doc is not None: + for par in doc.split('\n'): + print(wrapper.fill(par)) + + +def get_namespace(element): + """Extracts the namespace for elements in the NXDL""" + return element.tag[element.tag.index("{"):element.tag.rindex("}") + 1] + + +def get_enums(node): + """Makes list of enumerations, if node contains any. + Returns comma separated STRING of enumeration values, if there are enum tag, + otherwise empty string.""" + # collect item values from enumeration tag, if any + namespace = get_namespace(node) + enums = [] + for enumeration in node.findall(f"{namespace}enumeration"): + for item in enumeration.findall(f"{namespace}item"): + enums.append(item.attrib["value"]) + enums = ','.join(enums) + if enums != "": + return (True, '[' + enums + ']') + return (False, "") # if there is no enumeration tag, returns empty string + + +def add_base_classes(elist, nx_name=None, elem: ET.Element = None): + """Add the base classes corresponding to the last eleme in elist to the list. Note that if +elist is empty, a nxdl file with the name of nx_name or a rather room elem is used if provided""" + if elist and nx_name is None: + nx_name = get_nx_class(elist[-1]) + # to support recursive defintions, like NXsample in NXsample, the following test is removed + # if elist and nx_name and f"{nx_name}.nxdl.xml" in (e.get('nxdlbase') for e in elist): + # return + if elem is None: + if not nx_name: + return + nxdl_file_path = find_definition_file(nx_name) + if nxdl_file_path is None: + nxdl_file_path = f"{nx_name}.nxdl.xml" + elem = ET.parse(nxdl_file_path).getroot() + elem.set('nxdlbase', nxdl_file_path) + else: + elem.set('nxdlbase', '') + if 'category' in elem.attrib: + elem.set('nxdlbase_class', elem.attrib['category']) + elem.set('nxdlpath', '') + elist.append(elem) + # add inherited base class + if 'extends' in elem.attrib and elem.attrib['extends'] != 'NXobject': + add_base_classes(elist, elem.attrib['extends']) + else: + add_base_classes(elist) + + +def set_nxdlpath(child, nxdl_elem): + """ + Setting up child nxdlbase, nxdlpath and nxdlbase_class from nxdl_element. + """ + if nxdl_elem.get('nxdlbase'): + child.set('nxdlbase', nxdl_elem.get('nxdlbase')) + child.set('nxdlbase_class', nxdl_elem.get('nxdlbase_class')) + child.set('nxdlpath', nxdl_elem.get('nxdlpath') + '/' + get_node_name(child)) + return child + + +def get_direct_child(nxdl_elem, html_name): + """ returns the child of nxdl_elem which has a name + corresponding to the the html documentation name html_name""" + for child in nxdl_elem: + if get_local_name_from_xml(child) in ('group', 'field', 'attribute') and \ + html_name == get_node_name(child): + decorated_child = set_nxdlpath(child, nxdl_elem) + return decorated_child + return None + + +def get_field_child(nxdl_elem, html_name): + """ returns the child of nxdl_elem which has a name + corresponding to the html documentation name html_name""" + data_child = None + for child in nxdl_elem: + if get_local_name_from_xml(child) != 'field': + continue + if get_node_name(child) == html_name: + data_child = set_nxdlpath(child, nxdl_elem) + break + return data_child + + +def get_best_nxdata_child(nxdl_elem, hdf_node, hdf_name): + """ returns the child of an NXdata nxdl_elem which has a name + corresponding to the hdf_name""" + nxdata = hdf_node.parent + signals = [] + if 'signal' in nxdata.attrs.keys(): + signals.append(nxdata.attrs.get("signal")) + if "auxiliary_signals" in nxdata.attrs.keys(): + for aux_signal in nxdata.attrs.get("auxiliary_signals"): + signals.append(aux_signal) + data_child = get_field_child(nxdl_elem, 'DATA') + data_error_child = get_field_child(nxdl_elem, 'FIELDNAME_errors') + for signal in signals: + if signal == hdf_name: + return (data_child, 100) + if hdf_name.endswith('_errors') and signal == hdf_name[:-7]: + return (data_error_child, 100) + axes = [] + if "axes" in nxdata.attrs.keys(): + for axis in nxdata.attrs.get("axes"): + axes.append(axis) + axis_child = get_field_child(nxdl_elem, 'AXISNAME') + for axis in axes: + if axis == hdf_name: + return (axis_child, 100) + return (None, 0) + + +def get_best_child(nxdl_elem, hdf_node, hdf_name, hdf_class_name, nexus_type): + """ returns the child of nxdl_elem which has a name + corresponding to the the html documentation name html_name""" + bestfit = -1 + bestchild = None + if 'name' in nxdl_elem.attrib.keys() and nxdl_elem.attrib['name'] == 'NXdata' and \ + hdf_node is not None and hdf_node.parent is not None and \ + hdf_node.parent.attrs.get('NX_class') == 'NXdata': + (fnd_child, fit) = get_best_nxdata_child(nxdl_elem, hdf_node, hdf_name) + if fnd_child is not None: + return (fnd_child, fit) + for child in nxdl_elem: + fit = -2 + if get_local_name_from_xml(child) == nexus_type and \ + (nexus_type != 'group' or get_nx_class(child) == hdf_class_name): + name_any = "nameType" in nxdl_elem.attrib.keys() and \ + nxdl_elem.attrib["nameType"] == "any" + fit = get_nx_namefit(hdf_name, get_node_name(child), name_any) + if fit > bestfit: + bestfit = fit + bestchild = set_nxdlpath(child, nxdl_elem) + return (bestchild, bestfit) + + +def walk_elist(elist, html_name): + """Handle elist from low priority inheritance classes to higher""" + for ind in range(len(elist) - 1, -1, -1): + child = get_direct_child(elist[ind], html_name) + if child is None: + # check for names fitting to a superclas definition + main_child = None + for potential_direct_parent in elist: + main_child = get_direct_child(potential_direct_parent, html_name) + if main_child is not None: + (fitting_child, _) = get_best_child(elist[ind], None, html_name, + get_nx_class(main_child), + get_local_name_from_xml(main_child)) + if fitting_child is not None: + child = fitting_child + break + elist[ind] = child + if elist[ind] is None: + del elist[ind] + continue + # override: remove low priority inheritance classes if class_type is overriden + if len(elist) > ind + 1 and get_nx_class(elist[ind]) != get_nx_class(elist[ind + 1]): + del elist[ind + 1:] + # add new base class(es) if new element brings such (and not a primitive type) + if len(elist) == ind + 1 and get_nx_class(elist[ind])[0:3] != 'NX_': + add_base_classes(elist) + return elist, html_name + + +def helper_get_inherited_nodes(hdf_info2, elist, pind, attr): + """find the best fitting name in all children""" + hdf_path, hdf_node, hdf_class_path = hdf_info2 + hdf_name = hdf_path[pind] + hdf_class_name = hdf_class_path[pind] + if pind < len(hdf_path) - (2 if attr else 1): + act_nexus_type = 'group' + elif pind == len(hdf_path) - 1 and attr: + act_nexus_type = 'attribute' + else: + act_nexus_type = 'field' if isinstance(hdf_node, h5py.Dataset) else 'group' + # find the best fitting name in all children + bestfit = -1 + html_name = None + for ind in range(len(elist) - 1, -1, -1): + newelem, fit = get_best_child(elist[ind], + hdf_node, + hdf_name, + hdf_class_name, + act_nexus_type) + if fit >= bestfit and newelem is not None: + html_name = get_node_name(newelem) + return hdf_path, hdf_node, hdf_class_path, elist, pind, attr, html_name + + +def get_hdf_path(hdf_info): + """Get the hdf_path from an hdf_info""" + if 'hdf_path' in hdf_info: + return hdf_info['hdf_path'].split('/')[1:] + return hdf_info['hdf_node'].name.split('/')[1:] + + +@lru_cache(maxsize=None) +def get_inherited_nodes(nxdl_path: str = None, # pylint: disable=too-many-arguments,too-many-locals + nx_name: str = None, elem: ET.Element = None, + hdf_node=None, hdf_path=None, hdf_root=None, attr=False): + """Returns a list of ET.Element for the given path.""" + # let us start with the given definition file + elist = [] # type: ignore[var-annotated] + add_base_classes(elist, nx_name, elem) + nxdl_elem_path = [elist[0]] + + class_path = [] # type: ignore[var-annotated] + if hdf_node is not None: + hdf_info = {'hdf_node': hdf_node} + if hdf_path: + hdf_info['hdf_path'] = hdf_path + if hdf_root: + hdf_root['hdf_root'] = hdf_root + hdf_node = hdf_info['hdf_node'] + hdf_path = get_hdf_path(hdf_info) + hdf_class_path = get_nx_class_path(hdf_info).split('/')[1:] + if attr: + hdf_path.append(attr) + hdf_class_path.append(attr) + path = hdf_path + else: + html_path = nxdl_path.split('/')[1:] + path = html_path + for pind in range(len(path)): + if hdf_node is not None: + hdf_info2 = [hdf_path, hdf_node, hdf_class_path] + [hdf_path, hdf_node, hdf_class_path, elist, + pind, attr, html_name] = helper_get_inherited_nodes(hdf_info2, elist, + pind, attr) + if html_name is None: # return if NOT IN SCHEMA + return (class_path, nxdl_elem_path, None) + else: + html_name = html_path[pind] + elist, html_name = walk_elist(elist, html_name) + if elist: + class_path.append(get_nx_class(elist[0])) + nxdl_elem_path.append(elist[0]) + return (class_path, nxdl_elem_path, elist) + + +def get_node_at_nxdl_path(nxdl_path: str = None, + nx_name: str = None, elem: ET.Element = None, + exc: bool = True): + """Returns an ET.Element for the given path. + This function either takes the name for the NeXus Application Definition + we are looking for or the root elem from a previously loaded NXDL file + and finds the corresponding XML element with the needed attributes.""" + try: + (class_path, nxdlpath, elist) = get_inherited_nodes(nxdl_path, nx_name, elem) + except ValueError as value_error: + if exc: + raise NxdlAttributeError(f"Attributes were not found for {nxdl_path}. " + "Please check this entry in the template dictionary.") \ + from value_error + return None + if class_path and nxdlpath and elist: + elem = elist[0] + else: + elem = None + if exc: + raise NxdlAttributeError(f"Attributes were not found for {nxdl_path}. " + "Please check this entry in the template dictionary.") + return elem + From f05ca0d22b72cb4eccddfa0afb5e98eec4502bef Mon Sep 17 00:00:00 2001 From: Peter Chang Date: Fri, 16 Jun 2023 12:06:56 +0100 Subject: [PATCH 2/3] Add back some functions --- pynxtools/nexus/nexus.py | 116 ++++++++++++++++++++++++++++++++++ pynxtools/nexus/nxdl_utils.py | 116 ---------------------------------- 2 files changed, 116 insertions(+), 116 deletions(-) diff --git a/pynxtools/nexus/nexus.py b/pynxtools/nexus/nexus.py index cce7b0f14..30e013614 100644 --- a/pynxtools/nexus/nexus.py +++ b/pynxtools/nexus/nexus.py @@ -12,6 +12,122 @@ from .nxdl_utils import * +def get_nxdl_entry(hdf_info): + """Get the nxdl application definition for an HDF5 node""" + entry = hdf_info + while isinstance(entry['hdf_node'], h5py.Dataset) or \ + 'NX_class' not in entry['hdf_node'].attrs.keys() or \ + entry['hdf_node'].attrs['NX_class'] != 'NXentry': + entry = get_hdf_info_parent(entry) + if entry['hdf_node'].name == '/': + return 'NO NXentry found' + try: + nxdef = entry['hdf_node']['definition'][()] + return nxdef.decode() + except KeyError: # 'NO Definition referenced' + return "NXentry" + +def get_nxdl_attr_doc( # pylint: disable=too-many-arguments,too-many-locals + elem, elist, attr, hdf_node, logger, doc, nxdl_path, req_str, path, hdf_info): + """Get nxdl documentation for an attribute""" + new_elem = [] + old_elem = elem + for elem_index, act_elem1 in enumerate(elist): + act_elem = act_elem1 + # NX_class is a compulsory attribute for groups in a nexus file + # which should match the type of the corresponding NXDL element + if attr == 'NX_class' and not isinstance(hdf_node, h5py.Dataset) and elem_index == 0: + elem = None + logger, doc, attr = write_doc_string(logger, doc, attr) + new_elem = elem + break + # units category is a compulsory attribute for any fields + if attr == 'units' and isinstance(hdf_node, h5py.Dataset): + req_str = "<>" + logger, act_elem, nxdl_path, doc, attr = try_find_units(logger, + act_elem, + nxdl_path, + doc, + attr) + # units for attributes can be given as ATTRIBUTENAME_units + elif attr.endswith('_units'): + logger, act_elem, nxdl_path, doc, attr, req_str = check_attr_name_nxdl((logger, + act_elem, + nxdl_path, + doc, + attr, + req_str)) + # default is allowed for groups + elif attr == 'default' and not isinstance(hdf_node, h5py.Dataset): + req_str = "<>" + # try to find if default is defined as a child of the NXDL element + act_elem = get_nxdl_child(act_elem, attr, nexus_type='attribute', go_base=False) + logger, act_elem, nxdl_path, doc, attr = try_find_default(logger, + act_elem1, + act_elem, + nxdl_path, + doc, + attr) + else: # other attributes + act_elem = get_nxdl_child(act_elem, attr, nexus_type='attribute', go_base=False) + if act_elem is not None: + logger, act_elem, nxdl_path, doc, attr = \ + other_attrs(logger, act_elem1, act_elem, nxdl_path, doc, attr) + if act_elem is not None: + new_elem.append(act_elem) + if req_str is None: + req_str = get_required_string(act_elem) # check for being required + if doc: + logger.debug(req_str) + variables = [logger, act_elem, path] + logger, elem, path, doc, elist, attr, hdf_node = check_deprecation_enum_axis(variables, + doc, + elist, + attr, + hdf_node) + elem = old_elem + if req_str is None and doc: + if attr != 'NX_class': + logger.debug("@" + attr + " - IS NOT IN SCHEMA") + logger.debug("") + return (req_str, get_nxdl_entry(hdf_info), nxdl_path) + + +def get_nxdl_doc(hdf_info, logger, doc, attr=False): + """Get nxdl documentation for an HDF5 node (or its attribute)""" + hdf_node = hdf_info['hdf_node'] + # new way: retrieve multiple inherited base classes + (class_path, nxdl_path, elist) = \ + get_inherited_nodes(None, nx_name=get_nxdl_entry(hdf_info), hdf_node=hdf_node, + hdf_path=hdf_info['hdf_path'] if 'hdf_path' in hdf_info else None, + hdf_root=hdf_info['hdf_root'] if 'hdf_root' in hdf_info else None) + elem = elist[0] if class_path and elist else None + if doc: + logger.debug("classpath: " + str(class_path)) + logger.debug("NOT IN SCHEMA" if elem is None else + "classes:\n" + "\n".join + (get_node_concept_path(e) for e in elist)) + # old solution with a single elem instead of using elist + path = get_nx_class_path(hdf_info) + req_str = None + if elem is None: + if doc: + logger.debug("") + return ('None', None, None) + if attr: + return get_nxdl_attr_doc(elem, elist, attr, hdf_node, logger, doc, nxdl_path, + req_str, path, hdf_info) + req_str = get_required_string(elem) # check for being required + if doc: + logger.debug(req_str) + variables = [logger, elem, path] + logger, elem, path, doc, elist, attr, hdf_node = check_deprecation_enum_axis(variables, + doc, + elist, + attr, + hdf_node) + return (req_str, get_nxdl_entry(hdf_info), nxdl_path) + def process_node(hdf_node, hdf_path, parser, logger, doc=True): """Processes an hdf5 node. - it logs the node found and also checks for its attributes diff --git a/pynxtools/nexus/nxdl_utils.py b/pynxtools/nexus/nxdl_utils.py index 6a8182cac..e398cc274 100644 --- a/pynxtools/nexus/nxdl_utils.py +++ b/pynxtools/nexus/nxdl_utils.py @@ -92,20 +92,6 @@ def get_nx_class_path(hdf_info): return '' -def get_nxdl_entry(hdf_info): - """Get the nxdl application definition for an HDF5 node""" - entry = hdf_info - while isinstance(entry['hdf_node'], h5py.Dataset) or \ - 'NX_class' not in entry['hdf_node'].attrs.keys() or \ - entry['hdf_node'].attrs['NX_class'] != 'NXentry': - entry = get_hdf_info_parent(entry) - if entry['hdf_node'].name == '/': - return 'NO NXentry found' - try: - nxdef = entry['hdf_node']['definition'][()] - return nxdef.decode() - except KeyError: # 'NO Definition referenced' - return "NXentry" def get_nx_class(nxdl_elem): @@ -581,108 +567,6 @@ def get_node_concept_path(elem): return str(elem.get('nxdlbase').split('/')[-1] + ":" + elem.get('nxdlpath')) -def get_nxdl_attr_doc( # pylint: disable=too-many-arguments,too-many-locals - elem, elist, attr, hdf_node, logger, doc, nxdl_path, req_str, path, hdf_info): - """Get nxdl documentation for an attribute""" - new_elem = [] - old_elem = elem - for elem_index, act_elem1 in enumerate(elist): - act_elem = act_elem1 - # NX_class is a compulsory attribute for groups in a nexus file - # which should match the type of the corresponding NXDL element - if attr == 'NX_class' and not isinstance(hdf_node, h5py.Dataset) and elem_index == 0: - elem = None - logger, doc, attr = write_doc_string(logger, doc, attr) - new_elem = elem - break - # units category is a compulsory attribute for any fields - if attr == 'units' and isinstance(hdf_node, h5py.Dataset): - req_str = "<>" - logger, act_elem, nxdl_path, doc, attr = try_find_units(logger, - act_elem, - nxdl_path, - doc, - attr) - # units for attributes can be given as ATTRIBUTENAME_units - elif attr.endswith('_units'): - logger, act_elem, nxdl_path, doc, attr, req_str = check_attr_name_nxdl((logger, - act_elem, - nxdl_path, - doc, - attr, - req_str)) - # default is allowed for groups - elif attr == 'default' and not isinstance(hdf_node, h5py.Dataset): - req_str = "<>" - # try to find if default is defined as a child of the NXDL element - act_elem = get_nxdl_child(act_elem, attr, nexus_type='attribute', go_base=False) - logger, act_elem, nxdl_path, doc, attr = try_find_default(logger, - act_elem1, - act_elem, - nxdl_path, - doc, - attr) - else: # other attributes - act_elem = get_nxdl_child(act_elem, attr, nexus_type='attribute', go_base=False) - if act_elem is not None: - logger, act_elem, nxdl_path, doc, attr = \ - other_attrs(logger, act_elem1, act_elem, nxdl_path, doc, attr) - if act_elem is not None: - new_elem.append(act_elem) - if req_str is None: - req_str = get_required_string(act_elem) # check for being required - if doc: - logger.debug(req_str) - variables = [logger, act_elem, path] - logger, elem, path, doc, elist, attr, hdf_node = check_deprecation_enum_axis(variables, - doc, - elist, - attr, - hdf_node) - elem = old_elem - if req_str is None and doc: - if attr != 'NX_class': - logger.debug("@" + attr + " - IS NOT IN SCHEMA") - logger.debug("") - return (req_str, get_nxdl_entry(hdf_info), nxdl_path) - - -def get_nxdl_doc(hdf_info, logger, doc, attr=False): - """Get nxdl documentation for an HDF5 node (or its attribute)""" - hdf_node = hdf_info['hdf_node'] - # new way: retrieve multiple inherited base classes - (class_path, nxdl_path, elist) = \ - get_inherited_nodes(None, nx_name=get_nxdl_entry(hdf_info), hdf_node=hdf_node, - hdf_path=hdf_info['hdf_path'] if 'hdf_path' in hdf_info else None, - hdf_root=hdf_info['hdf_root'] if 'hdf_root' in hdf_info else None) - elem = elist[0] if class_path and elist else None - if doc: - logger.debug("classpath: " + str(class_path)) - logger.debug("NOT IN SCHEMA" if elem is None else - "classes:\n" + "\n".join - (get_node_concept_path(e) for e in elist)) - # old solution with a single elem instead of using elist - path = get_nx_class_path(hdf_info) - req_str = None - if elem is None: - if doc: - logger.debug("") - return ('None', None, None) - if attr: - return get_nxdl_attr_doc(elem, elist, attr, hdf_node, logger, doc, nxdl_path, - req_str, path, hdf_info) - req_str = get_required_string(elem) # check for being required - if doc: - logger.debug(req_str) - variables = [logger, elem, path] - logger, elem, path, doc, elist, attr, hdf_node = check_deprecation_enum_axis(variables, - doc, - elist, - attr, - hdf_node) - return (req_str, get_nxdl_entry(hdf_info), nxdl_path) - - def get_doc(node, ntype, nxhtml, nxpath): """Get documentation""" # URL for html documentation From 5f9117ac298a9d651f944509d8459baf87896f37 Mon Sep 17 00:00:00 2001 From: Peter Chang Date: Mon, 19 Jun 2023 14:20:31 +0100 Subject: [PATCH 3/3] Do a little more refactoring --- pynxtools/nexus/nexus.py | 173 +++++++++++++++++++++++++++++++++- pynxtools/nexus/nxdl_utils.py | 160 +------------------------------ 2 files changed, 175 insertions(+), 158 deletions(-) diff --git a/pynxtools/nexus/nexus.py b/pynxtools/nexus/nexus.py index 30e013614..600d7ef5c 100644 --- a/pynxtools/nexus/nexus.py +++ b/pynxtools/nexus/nexus.py @@ -27,6 +27,103 @@ def get_nxdl_entry(hdf_info): except KeyError: # 'NO Definition referenced' return "NXentry" + +def get_nx_class_path(hdf_info): + """Get the full path of an HDF5 node using nexus classes +in case of a field, end with the field name""" + hdf_node = hdf_info['hdf_node'] + if hdf_node.name == '/': + return '' + if isinstance(hdf_node, h5py.Group): + return get_nx_class_path(get_hdf_info_parent(hdf_info)) + '/' + \ + (hdf_node.attrs['NX_class'] if 'NX_class' in hdf_node.attrs.keys() else + hdf_node.name.split('/')[-1]) + if isinstance(hdf_node, h5py.Dataset): + return get_nx_class_path( + get_hdf_info_parent(hdf_info)) + '/' + hdf_node.name.split('/')[-1] + return '' + + +def chk_nxdataaxis_v2(hdf_node, name, logger): + """Check if dataset is an axis""" + own_signal = hdf_node.attrs.get('signal') # check for being a Signal + if own_signal is str and own_signal == "1": + logger.debug("Dataset referenced (v2) as NXdata SIGNAL") + own_axes = hdf_node.attrs.get('axes') # check for being an axis + if own_axes is str: + axes = own_axes.split(':') + for i in len(axes): + if axes[i] and name == axes[i]: + logger.debug("Dataset referenced (v2) as NXdata AXIS #%d", i) + return None + ownpaxis = hdf_node.attrs.get('primary') + own_axis = hdf_node.attrs.get('axis') + if own_axis is int: + # also convention v1 + if ownpaxis is int and ownpaxis == 1: + logger.debug("Dataset referenced (v2) as NXdata AXIS #%d", own_axis - 1) + else: + logger.debug( + "Dataset referenced (v2) as NXdata (primary/alternative) AXIS #%d", own_axis - 1) + return None + + +def chk_nxdataaxis(hdf_node, name, logger): + """NEXUS Data Plotting Standard v3: new version from 2014""" + if not isinstance(hdf_node, h5py.Dataset): # check if it is a field in an NXdata node + return None + parent = hdf_node.parent + if not parent or (parent and not parent.attrs.get('NX_class') == "NXdata"): + return None + signal = parent.attrs.get('signal') # chk for Signal + if signal and name == signal: + logger.debug("Dataset referenced as NXdata SIGNAL") + return None + axes = parent.attrs.get('axes') # check for default Axes + if axes is str: + if name == axes: + logger.debug("Dataset referenced as NXdata AXIS") + return None + elif axes is not None: + for i, j in enumerate(axes): + if name == j: + indices = parent.attrs.get(j + '_indices') + if indices is int: + logger.debug(f"Dataset referenced as NXdata AXIS #{indices}") + else: + logger.debug(f"Dataset referenced as NXdata AXIS #{i}") + return None + indices = parent.attrs.get(name + '_indices') # check for alternative Axes + if indices is int: + logger.debug(f"Dataset referenced as NXdata alternative AXIS #{indices}") + return chk_nxdataaxis_v2(hdf_node, name, logger) # check for older conventions + + +def check_deprecation_enum_axis(variables, doc, elist, attr, hdf_node): + """Check for several attributes. - deprecation - enums - nxdataaxis """ + logger, elem, path = variables + dep_str = elem.attrib.get('deprecated') # check for deprecation + if dep_str: + if doc: + logger.debug("DEPRECATED - " + dep_str) + for base_elem in elist if not attr else [elem]: # check for enums + sdoc = get_nxdl_child(base_elem, 'enumeration', go_base=False) + if sdoc is not None: + if doc: + logger.debug("enumeration (" + get_node_concept_path(base_elem) + "):") + for item in sdoc: + if get_local_name_from_xml(item) == 'item': + if doc: + logger.debug("-> " + item.attrib['value']) + chk_nxdataaxis(hdf_node, path.split('/')[-1], logger) # look for NXdata reference (axes/signal) + for base_elem in elist if not attr else [elem]: # check for doc + sdoc = get_nxdl_child(base_elem, 'doc', go_base=False) + if doc: + logger.debug("documentation (" + get_node_concept_path(base_elem) + "):") + logger.debug(sdoc.text if sdoc is not None else "") + return logger, elem, path, doc, elist, attr, hdf_node + + def get_nxdl_attr_doc( # pylint: disable=too-many-arguments,too-many-locals elem, elist, attr, hdf_node, logger, doc, nxdl_path, req_str, path, hdf_info): """Get nxdl documentation for an attribute""" @@ -98,7 +195,7 @@ def get_nxdl_doc(hdf_info, logger, doc, attr=False): hdf_node = hdf_info['hdf_node'] # new way: retrieve multiple inherited base classes (class_path, nxdl_path, elist) = \ - get_inherited_nodes(None, nx_name=get_nxdl_entry(hdf_info), hdf_node=hdf_node, + get_inherited_hdf_nodes(nx_name=get_nxdl_entry(hdf_info), hdf_node=hdf_node, hdf_path=hdf_info['hdf_path'] if 'hdf_path' in hdf_info else None, hdf_root=hdf_info['hdf_root'] if 'hdf_root' in hdf_info else None) elem = elist[0] if class_path and elist else None @@ -128,6 +225,78 @@ def get_nxdl_doc(hdf_info, logger, doc, attr=False): hdf_node) return (req_str, get_nxdl_entry(hdf_info), nxdl_path) + +def helper_get_inherited_nodes(hdf_info2, elist, pind, attr): + """find the best fitting name in all children""" + hdf_path, hdf_node, hdf_class_path = hdf_info2 + hdf_name = hdf_path[pind] + hdf_class_name = hdf_class_path[pind] + if pind < len(hdf_path) - (2 if attr else 1): + act_nexus_type = 'group' + elif pind == len(hdf_path) - 1 and attr: + act_nexus_type = 'attribute' + else: + act_nexus_type = 'field' if isinstance(hdf_node, h5py.Dataset) else 'group' + # find the best fitting name in all children + bestfit = -1 + html_name = None + for ind in range(len(elist) - 1, -1, -1): + newelem, fit = get_best_child(elist[ind], + hdf_node, + hdf_name, + hdf_class_name, + act_nexus_type) + if fit >= bestfit and newelem is not None: + html_name = get_node_name(newelem) + return hdf_path, hdf_node, hdf_class_path, elist, pind, attr, html_name + + +def get_hdf_path(hdf_info): + """Get the hdf_path from an hdf_info""" + if 'hdf_path' in hdf_info: + return hdf_info['hdf_path'].split('/')[1:] + return hdf_info['hdf_node'].name.split('/')[1:] + + +@lru_cache(maxsize=None) +def get_inherited_hdf_nodes(nx_name: str = None, elem: ET.Element = None,# pylint: disable=too-many-arguments,too-many-locals + hdf_node=None, hdf_path=None, hdf_root=None, attr=False): + """Returns a list of ET.Element for the given path.""" + # let us start with the given definition file + if hdf_node is None: + raise ValueError('hdf_node must not be None') + elist = [] # type: ignore[var-annotated] + add_base_classes(elist, nx_name, elem) + nxdl_elem_path = [elist[0]] + + class_path = [] # type: ignore[var-annotated] + hdf_info = {'hdf_node': hdf_node} + if hdf_path: + hdf_info['hdf_path'] = hdf_path + if hdf_root: + hdf_root['hdf_root'] = hdf_root + hdf_node = hdf_info['hdf_node'] + hdf_path = get_hdf_path(hdf_info) + hdf_class_path = get_nx_class_path(hdf_info).split('/')[1:] + if attr: + hdf_path.append(attr) + hdf_class_path.append(attr) + path = hdf_path + + for pind in range(len(path)): + hdf_info2 = [hdf_path, hdf_node, hdf_class_path] + [hdf_path, hdf_node, hdf_class_path, elist, + pind, attr, html_name] = helper_get_inherited_nodes(hdf_info2, elist, + pind, attr) + if html_name is None: # return if NOT IN SCHEMA + return (class_path, nxdl_elem_path, None) + elist, html_name = walk_elist(elist, html_name) + if elist: + class_path.append(get_nx_class(elist[0])) + nxdl_elem_path.append(elist[0]) + return (class_path, nxdl_elem_path, elist) + + def process_node(hdf_node, hdf_path, parser, logger, doc=True): """Processes an hdf5 node. - it logs the node found and also checks for its attributes @@ -371,7 +540,7 @@ def get_all_is_a_rel_from_hdf_node(hdf_node, hdf_path): """ hdf_info = {'hdf_path': hdf_path, 'hdf_node': hdf_node} (_, _, elist) = \ - get_inherited_nodes(None, nx_name=get_nxdl_entry(hdf_info), hdf_node=hdf_node, + get_inherited_hdf_nodes(nx_name=get_nxdl_entry(hdf_info), hdf_node=hdf_node, hdf_path=hdf_info['hdf_path'] if 'hdf_path' in hdf_info else None, hdf_root=hdf_info['hdf_root'] if 'hdf_root' in hdf_info else None) return elist diff --git a/pynxtools/nexus/nxdl_utils.py b/pynxtools/nexus/nxdl_utils.py index e398cc274..b3211891a 100644 --- a/pynxtools/nexus/nxdl_utils.py +++ b/pynxtools/nexus/nxdl_utils.py @@ -76,24 +76,6 @@ def get_hdf_info_parent(hdf_info): return {'hdf_node': node, 'hdf_path': get_parent_path(hdf_info['hdf_path'])} -def get_nx_class_path(hdf_info): - """Get the full path of an HDF5 node using nexus classes -in case of a field, end with the field name""" - hdf_node = hdf_info['hdf_node'] - if hdf_node.name == '/': - return '' - if isinstance(hdf_node, h5py.Group): - return get_nx_class_path(get_hdf_info_parent(hdf_info)) + '/' + \ - (hdf_node.attrs['NX_class'] if 'NX_class' in hdf_node.attrs.keys() else - hdf_node.name.split('/')[-1]) - if isinstance(hdf_node, h5py.Dataset): - return get_nx_class_path( - get_hdf_info_parent(hdf_info)) + '/' + hdf_node.name.split('/')[-1] - return '' - - - - def get_nx_class(nxdl_elem): """Get the nexus class for a NXDL node""" if 'category' in nxdl_elem.attrib.keys(): @@ -391,62 +373,6 @@ def get_required_string(nxdl_elem): return "<>" return "<>" - -def chk_nxdataaxis_v2(hdf_node, name, logger): - """Check if dataset is an axis""" - own_signal = hdf_node.attrs.get('signal') # check for being a Signal - if own_signal is str and own_signal == "1": - logger.debug("Dataset referenced (v2) as NXdata SIGNAL") - own_axes = hdf_node.attrs.get('axes') # check for being an axis - if own_axes is str: - axes = own_axes.split(':') - for i in len(axes): - if axes[i] and name == axes[i]: - logger.debug("Dataset referenced (v2) as NXdata AXIS #%d", i) - return None - ownpaxis = hdf_node.attrs.get('primary') - own_axis = hdf_node.attrs.get('axis') - if own_axis is int: - # also convention v1 - if ownpaxis is int and ownpaxis == 1: - logger.debug("Dataset referenced (v2) as NXdata AXIS #%d", own_axis - 1) - else: - logger.debug( - "Dataset referenced (v2) as NXdata (primary/alternative) AXIS #%d", own_axis - 1) - return None - - -def chk_nxdataaxis(hdf_node, name, logger): - """NEXUS Data Plotting Standard v3: new version from 2014""" - if not isinstance(hdf_node, h5py.Dataset): # check if it is a field in an NXdata node - return None - parent = hdf_node.parent - if not parent or (parent and not parent.attrs.get('NX_class') == "NXdata"): - return None - signal = parent.attrs.get('signal') # chk for Signal - if signal and name == signal: - logger.debug("Dataset referenced as NXdata SIGNAL") - return None - axes = parent.attrs.get('axes') # check for default Axes - if axes is str: - if name == axes: - logger.debug("Dataset referenced as NXdata AXIS") - return None - elif axes is not None: - for i, j in enumerate(axes): - if name == j: - indices = parent.attrs.get(j + '_indices') - if indices is int: - logger.debug(f"Dataset referenced as NXdata AXIS #{indices}") - else: - logger.debug(f"Dataset referenced as NXdata AXIS #{i}") - return None - indices = parent.attrs.get(name + '_indices') # check for alternative Axes - if indices is int: - logger.debug(f"Dataset referenced as NXdata alternative AXIS #{indices}") - return chk_nxdataaxis_v2(hdf_node, name, logger) # check for older conventions - - # below there are some functions used in get_nxdl_doc function: def write_doc_string(logger, doc, attr): """Simple function that prints a line in the logger if doc exists""" @@ -537,31 +463,6 @@ def other_attrs(logger, orig_elem, elem, nxdl_path, doc, attr): # pylint: disab return logger, elem, nxdl_path, doc, attr -def check_deprecation_enum_axis(variables, doc, elist, attr, hdf_node): - """Check for several attributes. - deprecation - enums - nxdataaxis """ - logger, elem, path = variables - dep_str = elem.attrib.get('deprecated') # check for deprecation - if dep_str: - if doc: - logger.debug("DEPRECATED - " + dep_str) - for base_elem in elist if not attr else [elem]: # check for enums - sdoc = get_nxdl_child(base_elem, 'enumeration', go_base=False) - if sdoc is not None: - if doc: - logger.debug("enumeration (" + get_node_concept_path(base_elem) + "):") - for item in sdoc: - if get_local_name_from_xml(item) == 'item': - if doc: - logger.debug("-> " + item.attrib['value']) - chk_nxdataaxis(hdf_node, path.split('/')[-1], logger) # look for NXdata reference (axes/signal) - for base_elem in elist if not attr else [elem]: # check for doc - sdoc = get_nxdl_child(base_elem, 'doc', go_base=False) - if doc: - logger.debug("documentation (" + get_node_concept_path(base_elem) + "):") - logger.debug(sdoc.text if sdoc is not None else "") - return logger, elem, path, doc, elist, attr, hdf_node - - def get_node_concept_path(elem): """get the short version of nxdlbase:nxdlpath""" return str(elem.get('nxdlbase').split('/')[-1] + ":" + elem.get('nxdlpath')) @@ -770,42 +671,11 @@ def walk_elist(elist, html_name): return elist, html_name -def helper_get_inherited_nodes(hdf_info2, elist, pind, attr): - """find the best fitting name in all children""" - hdf_path, hdf_node, hdf_class_path = hdf_info2 - hdf_name = hdf_path[pind] - hdf_class_name = hdf_class_path[pind] - if pind < len(hdf_path) - (2 if attr else 1): - act_nexus_type = 'group' - elif pind == len(hdf_path) - 1 and attr: - act_nexus_type = 'attribute' - else: - act_nexus_type = 'field' if isinstance(hdf_node, h5py.Dataset) else 'group' - # find the best fitting name in all children - bestfit = -1 - html_name = None - for ind in range(len(elist) - 1, -1, -1): - newelem, fit = get_best_child(elist[ind], - hdf_node, - hdf_name, - hdf_class_name, - act_nexus_type) - if fit >= bestfit and newelem is not None: - html_name = get_node_name(newelem) - return hdf_path, hdf_node, hdf_class_path, elist, pind, attr, html_name - - -def get_hdf_path(hdf_info): - """Get the hdf_path from an hdf_info""" - if 'hdf_path' in hdf_info: - return hdf_info['hdf_path'].split('/')[1:] - return hdf_info['hdf_node'].name.split('/')[1:] - @lru_cache(maxsize=None) def get_inherited_nodes(nxdl_path: str = None, # pylint: disable=too-many-arguments,too-many-locals nx_name: str = None, elem: ET.Element = None, - hdf_node=None, hdf_path=None, hdf_root=None, attr=False): + attr=False): """Returns a list of ET.Element for the given path.""" # let us start with the given definition file elist = [] # type: ignore[var-annotated] @@ -813,32 +683,10 @@ def get_inherited_nodes(nxdl_path: str = None, # pylint: disable=too-many-argum nxdl_elem_path = [elist[0]] class_path = [] # type: ignore[var-annotated] - if hdf_node is not None: - hdf_info = {'hdf_node': hdf_node} - if hdf_path: - hdf_info['hdf_path'] = hdf_path - if hdf_root: - hdf_root['hdf_root'] = hdf_root - hdf_node = hdf_info['hdf_node'] - hdf_path = get_hdf_path(hdf_info) - hdf_class_path = get_nx_class_path(hdf_info).split('/')[1:] - if attr: - hdf_path.append(attr) - hdf_class_path.append(attr) - path = hdf_path - else: - html_path = nxdl_path.split('/')[1:] - path = html_path + html_path = nxdl_path.split('/')[1:] + path = html_path for pind in range(len(path)): - if hdf_node is not None: - hdf_info2 = [hdf_path, hdf_node, hdf_class_path] - [hdf_path, hdf_node, hdf_class_path, elist, - pind, attr, html_name] = helper_get_inherited_nodes(hdf_info2, elist, - pind, attr) - if html_name is None: # return if NOT IN SCHEMA - return (class_path, nxdl_elem_path, None) - else: - html_name = html_path[pind] + html_name = html_path[pind] elist, html_name = walk_elist(elist, html_name) if elist: class_path.append(get_nx_class(elist[0]))