Source code for tf_pwa.utils

"""
This module provides some functions that may be useful in other modules.
"""
import functools
import json
import math
import time
import warnings

import numpy as np


[docs] class AttrDict(dict): __setattr__ = dict.__setitem__ __getattr__ = dict.__getitem__
has_yaml = True try: import yaml except ImportError: has_yaml = False def _load_json_file(name): with open(name) as f: return json.load(f) def _load_yaml_file(name): with open(name) as f: return yaml.load(f, Loader=yaml.FullLoader)
[docs] def create_dir(name): import os dirname = os.path.dirname(name) if dirname == "": return True if not os.path.exists(dirname): os.makedirs(dirname, exist_ok=True) return False return True
[docs] def load_config_file(name): """ Load config file such as **Resonances.yml**. :param name: File name. Either yml file or json file. :return: Dictionary read from the file. """ if name.endswith("json"): return _load_json_file(name) if has_yaml: if name.endswith("yml"): return _load_yaml_file(name) return _load_yaml_file(name + ".yml") else: print("No yaml support, using json file") return _load_json_file(name + ".json")
[docs] def flatten_dict_data(data, fun="{}/{}".format): """ Flatten nested dictionary data into one layer dictionary. :return: Dictionary """ if isinstance(data, dict): ret = {} for i in data: tmp = flatten_dict_data(data[i]) if isinstance(tmp, dict): for j in tmp: ret[fun(i, j)] = tmp[j] else: ret[i] = tmp return ret else: return data
flatten_np_data = lambda data: flatten_dict_data( data, fun=lambda x, y: "{}{}".format(y, x[3:]) )
[docs] def error_print(x, err=None, dig=None): """ It returns a format string "value +/- error". The precision is modified according to ``err`` :param x: Value :param err: Error :return: String """ if err is None: return ("{}").format(x) if err <= 0 or math.isnan(err): return ("{} ? {}").format(x, err) d = math.ceil(math.log10(err)) b = 10**d b_err = err / b b_val = x / b if dig is None: if b_err < 0.355: # 0.100 ~ 0.354 dig = 2 elif b_err < 0.950: # 0.355 ~ 0.949 dig = 1 else: # 0.950 ~ 0.999 dig = 0 err = round(b_err, dig) * b x = round(b_val, dig) * b d_p = dig - d if d_p > 0: return ("{0:.%df} +/- {1:.%df}" % (d_p, d_p)).format(x, err) return ("{0:.0f} +/- {1:.0f}").format(x, err)
[docs] def pprint(dicts): """ Print dictionary using json format. """ try: s = json.dumps(dicts, indent=2) print(s, flush=True) except: print(dicts, flush=True)
[docs] def std_polar(rho, phi): """ To standardize a polar variable. By standard form, it means :math:`\\rho>0, -\\pi<\\phi<\\pi`. :param rho: Real number :param phi: Real number :return: ``rho``, ``phi`` """ if rho < 0: rho = -rho phi += math.pi while phi < -math.pi: phi += 2 * math.pi while phi > math.pi: phi -= 2 * math.pi return rho, phi
[docs] def deep_iter(base, deep=1): for i in base: if deep == 1: yield [i] else: for j in deep_iter(base, deep - 1): yield [i] + j
[docs] def deep_ordered_iter(base, deep=1): ids = list(base) size = len(ids) for i in deep_ordered_range(size, deep): yield [ids[j] for j in i]
[docs] def deep_ordered_range(size, deep=1, start=0): for i in range(start, size): if deep <= 1: yield [i] else: for j in deep_ordered_range(size, deep - 1, i + 1): yield [i] + j
# from amplitude.py
[docs] def is_complex(x): """ If **x** is of type ``complex``, it returns ``True``. """ try: y = complex(x) except: return False return True
# from model.py
[docs] def array_split(data, batch=None): """Split a data array. **batch** is the number of data in a row.""" if batch is None: return [data] ret = [] n_data = data[0].shape[0] n_split = (n_data + batch - 1) // batch for i in range(n_split): tmp = [] for data_i in data: tmp.append(data_i[i * batch : min(i * batch + batch, n_data)]) ret.append(tmp) return ret
[docs] def time_print(f): """It provides a wrapper to print the time cost on a process.""" @functools.wraps(f) def g(*args, **kwargs): now = time.time() ret = f(*args, **kwargs) print(f.__name__, " cost time:", time.time() - now) return ret return g
[docs] def std_periodic_var(p, mid=0.0, pi=math.pi): """ Transform a periodic variable into its range. >>> std_periodic_var(math.pi) -3.1415... >>> std_periodic_var(2*math.pi + 0.01) 0.0... :param p: Value :param mid: The middle value :param pi: Half-range :return: The transformed value """ twopi = 2 * pi while p < mid - pi: p += twopi while p >= mid + pi: p -= twopi return p
[docs] def check_positive_definite(m): """check if matrix m is postive definite >>> check_positive_definite([[1.0,0.0],[0.0, 0.1]]) True >>> check_positive_definite([[1.0,0.0],[1.0,-0.1]]) eigvalues: [-0.1 1. ] False """ e, v = np.linalg.eig(m) if np.all(e > 0.0): return True warnings.warn("matrix is not positive definited") print("eigvalues: ", e) return False
[docs] def tuple_table(fit_frac, ignore_items=["sum_diag"]): names = [] for i in fit_frac: if isinstance(i, str): names.append(i) for i in ignore_items: if i in names: names.remove(i) n_items = len(names) table = [[None] * (n_items + 1) for i in range(n_items + 1)] for k, v in fit_frac.items(): if isinstance(k, tuple): a, b = k table[names.index(a) + 1][names.index(b) + 1] = v elif k in ignore_items: continue else: a, b = k, k table[names.index(a) + 1][names.index(b) + 1] = v for i, name in enumerate(names): table[i + 1][0] = name table[0][i + 1] = name return table
[docs] def save_frac_csv(file_name, fit_frac): import csv table = tuple_table(fit_frac) with open(file_name, "w") as f: f_csv = csv.writer(f) f_csv.writerows(table)
[docs] def fit_normal(data, weights=None): """ Fit data distribution with Gaussian distribution. Though minimize the negative log likelihood function .. math:: - \\ln L = \\frac{1}{2}\\sum w_i \\frac{(\\mu - x_i )^2}{\\sigma^2} + (\\sum w_i) \\ln (\\sqrt{2\pi} \\sigma ) the fit result can be solved as .. math:: \\frac{\\partial (-\\ln L)}{\\partial \\mu} = 0 \\Rightarrow \\bar{\\mu} = \\frac{\\sum w_i x_i}{ \\sigma^2 \\sum w_i} .. math:: \\frac{\\partial (-\\ln L)}{\\partial \\sigma} = 0 \\Rightarrow \\bar{\\sigma} = \\sqrt{\\frac{\\sum w_i (\\bar{\\mu} - x_i)^2}{\\sum w_i}} From hessian .. math:: \\frac{\\partial^2 (-\\ln L)}{\\partial \\mu^2} = \\frac{\\sum w_i}{\\sigma^2} .. math:: \\frac{\\partial^2 (-\\ln L)}{\\partial \\sigma^2} = 3\\sum \\frac{\\sum w_i (\\mu - x)^2}{\\sigma^4} - \\frac{\\sum w_i}{\\sigma^2} the error matrix can wrotten as [[ :math:`\\bar{\\sigma}^2/N` , 0], [0, :math:`\\bar{\\sigma}^2/(2N)` ]] . """ if weights is None: weights = np.ones_like(data) else: weights = np.sum(weights) / np.sum(weights**2) * weights N = np.sum(weights) mu = np.sum(weights * data) / N sigma = np.sqrt(np.sum(weights * (data - mu) ** 2) / N) mu_error = sigma / np.sqrt(N) sigma_error = mu_error / np.sqrt(2) return np.array([mu, sigma]), np.array([mu_error, sigma_error])
[docs] def create_test_config(model_name, params={}, plot_params={}): from tf_pwa.config_loader import ConfigLoader config_dic = { "data": {"dat_order": ["B", "C", "D"]}, "decay": {"A": [["R_BC", "D"]], "R_BC": ["B", "C"]}, "particle": { "$top": {"A": {"J": 0, "P": -1, "mass": 1.0}}, "$finals": { "B": {"J": 0, "P": -1, "mass": 0.1}, "C": {"J": 0, "P": -1, "mass": 0.1}, "D": {"J": 0, "P": -1, "mass": 0.1}, }, "R_BC": { "J": 0, "P": +1, "mass": 0.5, "width": 0.05, "model": model_name, }, }, } config_dic["particle"]["R_BC"].update(params) config = ConfigLoader(config_dic) config.set_params({"A->R_BC.DR_BC->B.C_total_0r": 1.0}) config.set_params({"A->R_BC.DR_BC->B.C_total_0i": 0.0}) config.set_params(plot_params) return config
[docs] def plot_particle_model( model_name, params={}, plot_params={}, axis=None, special_points=None ): import matplotlib.pyplot as plt import numpy as np config = create_test_config(model_name, params, plot_params) f = config.get_particle_function("R_BC") m = np.linspace(0.2, 0.9 - 1e-12, 2000) a = f(m).numpy() if special_points is not None: special_points = np.array(special_points) at = f(special_points).numpy() if axis is None: ax3 = plt.subplot(2, 2, 3, label="argon") ax2 = plt.subplot(2, 2, 2, label="prob") ax1 = plt.subplot(2, 2, 1, sharex=ax3, label="real") ax0 = plt.subplot(2, 2, 4, sharex=ax2, sharey=ax3, label="imag") else: ax0, ax1, ax2, ax3 = axis ax3.plot(np.real(a), np.imag(a)) if special_points is not None: ax3.scatter(np.real(at), np.imag(at)) ax3.set_xlabel("Re$A$") ax3.set_ylabel("Im$A$") ax2.plot(m, np.abs(a) ** 2, label=model_name) if special_points is not None: ax2.scatter(special_points, np.abs(at) ** 2) ax2.set_ylabel("$|A|^2$") ax2.set_ylim((0, None)) ax2.axvline(x=0.2, linestyle="--") ax2.yaxis.set_label_position("right") ax2.yaxis.tick_right() ax1.plot(np.real(a), m) if special_points is not None: ax1.scatter(np.real(at), special_points) ax1.set_ylabel("mass") ax0.plot(m, np.imag(a)) if special_points is not None: ax0.scatter(special_points, np.imag(at)) ax0.set_xlabel("mass") return [ax0, ax1, ax2, ax3]
[docs] def plot_pole_function( model_name, params={}, plot_params={}, axis=None, **kwargs ): import matplotlib.pyplot as plt import numpy as np config = create_test_config(model_name, params, plot_params) p = config.get_decay().get_particle("R_BC") f = p.pole_function() mx = np.linspace(0.2, 0.9 - 1e-12, 2000) my = np.linspace(-0.1, 0.1, 2000) a = np.abs(1 / f(mx + 1.0j * my[:, None])) ** 2 if axis is None: axis = plt.gca() axis.contour(mx, my, a, linewidths=3) axis.set_xlabel("Re$\\sqrt{s}$") axis.set_ylabel("Im$\\sqrt{s}$") return axis
[docs] def search_interval(px, cl=0.6826894921370859, xrange=(0, 1)): """ Search interval (a, b) that satisfly :math:`p(a)=p(b)` and .. math:: \\frac{\\int_{a}^{b} p(x) dx}{\\int_{x_{min]}^{x_{max}} p(x) dx} = cl >>> x = np.linspace(-10, 10, 10000) >>> a, b = search_interval(np.exp(-x**2/2), xrange=(-10, 10)) >>> assert abs(a+1) < 0.01 >>> assert abs(b-1) < 0.01 """ n = px.shape[0] px = px / np.sum(px) idx = np.argsort(px)[::-1] y = px[idx] f = np.cumsum(y) cut = f < cl inner = idx[cut] left = (np.min(inner) - 1) / px.shape[0] right = (np.max(inner) + 1) / px.shape[0] a, b = xrange return a + (b - a) * left, a + (b - a) * right
[docs] def combine_asym_error(errs, N=10000): """ combine asymmetry uncertanties using convolution >>> a, b = combine_asym_error([[-0.4, 0.4], 0.3]) >>> assert abs(a+0.5) < 0.01 >>> assert abs(b-0.5) < 0.01 """ from scipy.signal import convolve errs = [[i, i] if isinstance(i, float) else i for i in errs] errs = np.abs(np.stack(errs)) max_range = np.max(np.sqrt(np.sum(errs**2, axis=0))) xrange = -max_range * 10, max_range * 10 x = np.linspace(*xrange, N) y = np.exp(-(x**2) / 2 / np.where(x > 0, errs[-1, 1], errs[-1, 0]) ** 2) y = y / np.sum(y) for i in range(errs.shape[0] - 1): tmp = np.exp( -(x**2) / 2 / np.where(x > 0, errs[i, 1], errs[i, 0]) ** 2 ) tmp = tmp / np.sum(tmp) y = convolve(y, tmp, mode="same") return search_interval(y, xrange=xrange)