Source code for metadataStore.utilities.utility

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
import numpy as np
import six
from collections import defaultdict
import six
import itertools


[docs]def get_calib_dict(run_header): """ Return the calibration dictionary that is saved in the run_header Parameters ---------- run_header : dict Run header to convert events to lists. Can only be one header. Returns ------- dict Dictionary that contains all information inside the run_header's beamline_config key. If there are multiple 'configs' then the return value is a nested dictionary keyed on config_# for the number of config dicts bool True: Multiple 'config' sections were present, dict is a nested dict False: One 'config' section was present, dict is not a nested dict """ nested = True calib_dict = {key: run_header['configs'][key]['config_params'] for key in run_header['configs']} # if there is only one configs section, no need to return an extra calib if len(calib_dict) == 1: calib_dict = calib_dict[list(calib_dict)[0]] nested = False return calib_dict, nested
[docs]def get_data_keys(run_header): """ Return the names of the data keys. This function assumes that there is only one event descriptor in a run header Parameters ---------- run_header : dict Run header to convert events to lists. Can only be one header. Returns ------- list List of the data keys in the run header keyed by the event_descriptor name """ # print('run_header: {0}'.format(list(run_header))) try: for ev_desc_key, ev_desc_dict in six.iteritems(run_header[u'event_descriptors']): descriptor_name = ev_desc_dict['descriptor_name'] try: for ev_key, ev_dict in six.iteritems(ev_desc_dict['events']): # this assumes all events in an event_descriptor have the same keys return list(ev_dict['data']) except KeyError as e: raise ValueError('Header does not include any events. Make ' 'sure data=True in search() and header ' 'includes events.\nOriginal error: {0}'.format(e)) except KeyError as e: raise KeyError('Header does not include "event_descriptors". ' 'Original Error: {0}'.format(e))
[docs]def get_data_keys_futureproof(run_header): """ Return the names of the data keys. This function assumes that there is only one event descriptor in a run header Note: This function only works for one event_descriptor per run_header. As soon as we start getting multiple event_descriptors per run header this function will need to be modified Parameters ---------- run_header : dict Run header to convert events to lists. Can only be one header. Returns ------- dict List of the data keys in the run header keyed by the event_descriptor descriptor_name. If descriptor_name is not unique for all event_descriptors, see Notes 1 Notes ----- 1. the descriptor_name field in event_descriptors is assumed to be unique. If it is not, then append the last four characters of _id to it. Ideally this would be something more like a PV name and less hostile than the hashed _id field """ # print('run_header: {0}'.format(list(run_header))) ev_keys = {} try: # get the event descriptor keys # ev_desc_names = [ev_desc_dict['descriptor_name'] for ev_desc_dict in # six.itervalues(run_header[u'event_descriptors'])] # if len(ev_desc_names) != len(set(ev_desc_names)): # # there is at least one name collision in ev_desc_keys # raise NotImplementedError("You passed me a run header with " # "multiple event_descriptors of the same " # "name. I cannot handle this yet. Go yell" # " at Arman or Eric.\nDescriptor names: " # "{0}".format(ev_desc_names)) try: for (ev_desc_key, ev_desc_dict) in six.iteritems( run_header[u'event_descriptors']): descriptor_name = ev_desc_dict['descriptor_name'] ev_keys[ev_desc_key] = { 'name': ev_desc_dict['descriptor_name'], 'data_keys': list( six.itervalues(ev_desc_dict['events']).next()['data']), } except KeyError as e: raise ValueError('Header does not include any events. Make ' 'sure data=True in search() and header ' 'includes events.' '\nOriginal error: {0}'.format(e)) # # try: # for ev_key, ev_dict in six.iteritems(ev_desc_dict['events']): # # this assumes all events in an event_descriptor have the # # same keys. Which they should... # return list(ev_dict['data']) # except KeyError as e: # raise ValueError('Header does not include any events. Make ' # 'sure data=True in search() and header ' # 'includes events.' # '\nOriginal error: {0}'.format(e)) except KeyError as e: print(e) raise KeyError('Header does not include "event_descriptors". ' '\nOriginal Error: {0}'.format(e)) return ev_keys
[docs]def listify(run_header, data_keys=None, bash_to_lower=True): """Transpose the events into lists from this: run_header : { "event_0_data" : {"key1": "val1", "key2": "val2", ...}, "event_1_data" : {"key1": "val1", "key2": "val2", ...}, ... } to this: {"key1": [val1, val2, ...], "key2" = [val1, val2, ...], "keyN" = [val1, val2, ...], "time" = [time1, time2, ...], } Parameters ---------- run_header : dict Run header to convert events to lists event_descriptors_key : str Name of the event_descriptor data_keys : hashable or list, optional - If data_key is a valid key for an event_data entry, turn that event_data into a list - If data_key is a list of valid keys for event_data entries, turn those event_data keys into lists - If data_key is None, turn all event data keys into lists bash_to_lower : boolean True: Compare strings after casting to lowercase False: Compare strings without casting to lowercase Returns ------- dict data is keyed on run header data keys with an extra field 'time' """ # get the keys from the run header header_keys = get_data_keys(run_header) if len(header_keys) == 1: # turn it in to a list header_keys = [header_keys] # set defaults if necessary if data_keys is None: data_keys = header_keys try: # check for data_keys being iterable data_keys.__iter__() except AttributeError: # turn data_keys in to a list data_keys = [data_keys] if not 'time' in data_keys: data_keys.append('time') # print('data_keys: {0}'.format(data_keys)) # forcibly cast to lower case if bash_to_lower: data_keys, header_keys = [[k.lower() for k in key_tmp] for key_tmp in (data_keys, header_keys)] # listify the data in the run header data_dict = defaultdict(list) # print('data_dict keys: {0}'.format(list(data_dict))) for ev_desc_key, ev_desc_dict in \ six.iteritems(run_header['event_descriptors']): data_key = list(ev_desc_dict['events']) # data_key.sort(key=lambda x: int(x.split("_")[-1])) # print("sorted data keys: {0}".format(data_key)) for index, (ev_key) in enumerate(data_key): ev_dict = ev_desc_dict['events'][ev_key] for data_key, data in six.iteritems(ev_dict['data']): if data_key in data_keys: data_dict[data_key].append(data) return data_dict
def tablify(run_headers): if isinstance(run_headers, dict): header_keys = run_headers.keys() table = dict() desc_list = list() table['headers'] = header_keys desc_fields = list() for entry in header_keys: descriptor_keys = run_headers[entry]['event_descriptors'].keys() desc_list.append(descriptor_keys) temp = list() for desc in descriptor_keys: temp.append(run_headers[entry]['event_descriptors'][desc]) desc_fields.append(temp) table['descriptor_fields'] = desc_fields table['event_descriptors'] = desc_list else: raise TypeError('Header must be a python dict') return table if __name__ == "__main__": from metadataStore.userapi.commands import search return_dict = search(data=True, scan_id=388, owner='edill') print('search_dict: {0}'.format(return_dict)) keys = list(return_dict) return_dict = return_dict[keys[0]] print('search_dict: {0}'.format(return_dict)) keys = get_data_keys(return_dict) print('keys: {0}'.format(keys)) data = listify(return_dict, u'ub') print("data: {0}".format(data)) print("search_dict keys: {0}".format(list(return_dict))) print('calibration_dict: {0}'.format(get_calib_dict(return_dict)))