Source code for tf_pwa.model.cfit

import numpy as np

from ..config import create_config
from ..data import EvalLazy, data_shape, split_generator
from ..tensorflow_wrapper import tf
from .model import (
    Model,
    clip_log,
    register_nll_model,
    sum_gradient,
    sum_hessian,
)
from .opt_int import build_amp, sum_gradient_data2

set_function, get_function, register_function = create_config()


[docs] @register_function("default_bg") def f_bg(data): return data.get("bg_value", tf.ones((data_shape(data),), dtype="float64"))
[docs] @register_function("default_eff") def f_eff(data): return data.get("eff_value", tf.ones((data_shape(data),), dtype="float64"))
[docs] @register_nll_model("cfit") class Model_cfit(Model): def __init__( self, amp, w_bkg=0.001, bg_f=None, eff_f=None, resolution_size=1 ): super().__init__(amp, w_bkg, resolution_size) if bg_f is None: bg_f = get_function("default_bg") elif isinstance(bg_f, str): bg_f = get_function(bg_f) if eff_f is None: eff_f = get_function("default_eff") elif isinstance(eff_f, str): eff_f = get_function(eff_f) self.bg = EvalLazy(bg_f) self.vm = amp.vm self.w_bkg = w_bkg self.eff = eff_f self.sig = EvalLazy(lambda x: self.eff(x) * self.Amp(x))
[docs] def nll( self, data, mcdata, weight: tf.Tensor = 1.0, batch=None, bg=None, mc_weight=None, ): """ Calculate NLL. .. math:: -\\ln L = -\\sum_{x_i \\in data } w_i \\ln P(x_i;\\theta_k) \\\\ P(x_i;\\theta_k) = (1-f_{bg})Amp(x_i; \\theta_k) + f_{bg} Bg(x_{i};\\theta_k) :param data: Data array :param mcdata: MCdata array :param weight: Weight of data??? :param batch: The length of array to calculate as a vector at a time. How to fold the data array may depend on the GPU computability. :param bg: Background data array. It can be set to ``None`` if there is no such thing. :return: Real number. The value of NLL. """ data, weight = self.get_weight_data(data, weight) sw = tf.reduce_sum(weight) weight_norm = self.sum_resolution(weight) sig_data = self.sum_resolution(weight * self.sig(data)) / weight_norm bg_data = self.sum_resolution(weight * self.bg(data)) / weight_norm if mc_weight is None: int_mc = tf.reduce_mean(self.sig(mcdata)) int_bg = tf.reduce_mean(self.bg(mcdata)) else: int_mc = tf.reduce_sum(mc_weight * self.sig(mcdata)) int_bg = tf.reduce_sum(mc_weight * self.bg(mcdata)) ln_data = tf.math.log( (1 - self.w_bkg) * sig_data / int_mc + self.w_bkg * bg_data / int_bg ) nll_0 = -tf.reduce_sum(tf.cast(weight_norm, ln_data.dtype) * ln_data) return nll_0
[docs] def nll_grad_batch(self, data, mcdata, weight, mc_weight): r""" .. math:: P = (1-frac) \frac{amp(data)}{\sum amp(phsp)} + frac \frac{bg(data)}{\sum bg(phsp)} .. math:: nll = - \sum log(p) .. math:: \frac{\partial nll}{\partial \theta} = -\sum \frac{1}{p} \frac{\partial p}{\partial \theta} = - \sum \frac{\partial \ln \bar{p}}{\partial \theta } +\frac{\partial nll}{\partial I_{sig} } \frac{\partial I_{sig} }{\partial \theta} +\frac{\partial nll}{\partial I_{bg}}\frac{\partial I_{sig}}{\partial \theta} """ var = self.vm.trainable_variables mcdata = list(mcdata) mc_weight = list(mc_weight) int_sig, g_int_sig = sum_gradient(self.sig, mcdata, var, mc_weight) int_bg, g_int_bg = sum_gradient(self.bg, mcdata, var, mc_weight) v_int_sig, v_int_bg = ( tf.Variable(int_sig, dtype="float64"), tf.Variable(int_bg, dtype="float64"), ) def prob(x): return (1 - self.w_bkg) * self.sig( x ) / v_int_sig + self.w_bkg * self.bg(x) / v_int_bg ll, g_ll = sum_gradient( prob, data, var + [v_int_sig, v_int_bg], weight, trans=clip_log, resolution_size=self.resolution_size, ) g_ll_sig, g_ll_bg = g_ll[-2], g_ll[-1] g = [ -g_ll[i] - g_int_sig[i] * g_ll_sig - g_int_bg[i] * g_ll_bg for i in range(len(var)) ] return -ll, g
[docs] def nll_grad_hessian( self, data, mcdata, weight=1.0, batch=24000, bg=None, mc_weight=1.0 ): r""" The parameters are the same with ``self.nll()``, but it will return Hessian as well. .. math:: \frac{\partial^2 f}{\partial x_i \partial x_j} = \frac{\partial y_k}{\partial x_i} \frac{\partial^2 f}{\partial y_k \partial y_l} \frac{\partial y_l}{\partial x_j} + \frac{\partial f}{\partial y_k} \frac{\partial^2 y_k}{\partial x_i \partial x_j} .. math:: y = \{x_i; I_{sig}, I_{bg}\} .. math:: \frac{\partial y_k }{\partial x_i} = (\delta_{ik};\frac{\partial I_{sig}}{\partial x_i}, \frac{\partial I_{bg}}{\partial x_i}) :return NLL: Real number. The value of NLL. :return gradients: List of real numbers. The gradients for each variable. :return Hessian: 2-D Array of real numbers. The Hessian matrix of the variables. """ data, weight = self.get_weight_data(data, weight, bg=bg) if isinstance(mc_weight, float): mc_weight = tf.convert_to_tensor( [mc_weight] * data_shape(mcdata), dtype="float64" ) n_mc = tf.reduce_sum(mc_weight) sw = tf.reduce_sum(weight) var = self.vm.trainable_variables int_sig, g_int_sig, h_int_sig = sum_hessian( self.sig, split_generator(mcdata, batch), var, weight=split_generator(mc_weight, batch), ) int_bg, g_int_bg, h_int_bg = sum_hessian( self.bg, split_generator(mcdata, batch), var, weight=split_generator(mc_weight, batch), ) v_int_sig, v_int_bg = ( tf.Variable(int_sig, dtype="float64"), tf.Variable(int_bg, dtype="float64"), ) def prob(x): return (1 - self.w_bkg) * self.sig( x ) / v_int_sig + self.w_bkg * self.bg(x) / v_int_bg ll, g_ll, h_ll = sum_hessian( prob, split_generator(data, batch), var + [v_int_sig, v_int_bg], weight=split_generator(weight, batch), trans=clip_log, resolution_size=self.resolution_size, ) n_var = len(var) g_ll_sig, g_ll_bg = g_ll[-2], g_ll[-1] g = [ -g_ll[i] - g_int_sig[i] * g_ll_sig - g_int_bg[i] * g_ll_bg for i in range(len(var)) ] jac = np.concatenate([np.eye(n_var), [g_int_sig, g_int_bg]], axis=0) h = ( np.dot(jac.T, np.dot(h_ll, jac)) + g_ll_sig * h_int_sig + g_ll_bg * h_int_bg ) return -ll, g, -h
[docs] @register_nll_model("cfit_cached") class Model_cfit_cached(Model_cfit): def __init__(self, amp, w_bkg=0.001, bg_f=None, eff_f=None): super().__init__(amp, w_bkg, bg_f, eff_f) self.cached_amp = build_amp.build_amp2s(amp.decay_group) self.cached_data = {}
[docs] def nll_grad_batch(self, data, mcdata, weight, mc_weight): var = self.vm.trainable_variables data_id = id(data) mc_id = id(mcdata) mcdata = list(mcdata) mc_weight = list(mc_weight) if data_id not in self.cached_data: self.cached_data[data_id] = [ build_amp.build_angle_amp_matrix(self.Amp.decay_group, i)[1] for i in data ] if mc_id not in self.cached_data: self.cached_data[mc_id] = [ build_amp.build_angle_amp_matrix(self.Amp.decay_group, i)[1] for i in mcdata ] int_sig, g_int_sig = sum_gradient_data2( self.cached_amp, self.Amp.trainable_variables, mcdata, self.cached_data[mc_id], weight=mc_weight, ) int_bg, g_int_bg = sum_gradient(self.bg, mcdata, var, mc_weight) v_int_sig, v_int_bg = ( tf.Variable(int_sig, dtype="float64"), tf.Variable(int_bg, dtype="float64"), ) def prob(x, c_data): return (1 - self.w_bkg) * self.eff(x) * self.cached_amp( x, c_data ) / v_int_sig + self.w_bkg * self.bg(x) / v_int_bg ll, g_ll = sum_gradient_data2( prob, var + [v_int_sig, v_int_bg], data, self.cached_data[data_id], weight=weight, trans=clip_log, ) g_ll_sig, g_ll_bg = g_ll[-2], g_ll[-1] g = [ -g_ll[i] - g_int_sig[i] * g_ll_sig - g_int_bg[i] * g_ll_bg for i in range(len(var)) ] return -ll, g
[docs] @register_nll_model("cfit_extended") class ModelCfitExtended(Model): def __init__(self, amp, w_bkg=0.001, bg_f=None, eff_f=None): super().__init__(amp, w_bkg) if bg_f is None: bg_f = get_function("default_bg") elif isinstance(bg_f, str): bg_f = get_function(bg_f) if eff_f is None: eff_f = get_function("default_eff") elif isinstance(eff_f, str): eff_f = get_function(eff_f) self.bg = bg_f self.vm = amp.vm self.w_bkg = w_bkg self.eff = eff_f self.sig = lambda x: self.eff(x) * self.Amp(x)
[docs] def nll( self, data, mcdata, weight: tf.Tensor = 1.0, batch=None, bg=None, mc_weight=None, ): """ Calculate NLL. .. math:: -\\ln L = -\\sum_{x_i \\in data } w_i \\ln P(x_i;\\theta_k) \\\\ P(x_i;\\theta_k) = (1-f_{bg})Amp(x_i; \\theta_k) + f_{bg} Bg(x_{i};\\theta_k) .. math:: -\\ln L_{2} = -\\ln ( L \\lambda^{N_{data}} / {N_{data}}! e^{-\\lambda}) = -\\ L - N_{data} \\ln \\lambda + \\lambda + C .. math:: \\lambda = 1/(1-f_{bg}) \int Amp(x_i; \\theta_k) d \\Phi :param data: Data array :param mcdata: MCdata array :param weight: Weight of data??? :param batch: The length of array to calculate as a vector at a time. How to fold the data array may depend on the GPU computability. :param bg: Background data array. It can be set to ``None`` if there is no such thing. :return: Real number. The value of NLL. """ data, weight = self.get_weight_data(data, weight) sw = tf.reduce_sum(weight) sig_data = self.sig(data) bg_data = self.bg(data) if mc_weight is None: int_mc = tf.reduce_mean(self.sig(mcdata)) int_bg = tf.reduce_mean(self.bg(mcdata)) else: int_mc = tf.reduce_sum(mc_weight * self.sig(mcdata)) int_bg = tf.reduce_sum(mc_weight * self.bg(mcdata)) ln_data = tf.math.log( (1 - self.w_bkg) * sig_data / int_mc + self.w_bkg * bg_data / int_bg ) n_exp = int_mc / (1 - self.w_bkg) nll_0 = ( -tf.reduce_sum(tf.cast(weight, ln_data.dtype) * ln_data) - sw * tf.math.log(n_exp) + n_exp ) return nll_0
[docs] def nll_grad_batch(self, data, mcdata, weight, mc_weight): r""" .. math:: P = (1-frac) \frac{amp(data)}{\sum amp(phsp)} + frac \frac{bg(data)}{\sum bg(phsp)} .. math:: nll = - \sum log(p) .. math:: \frac{\partial nll}{\partial \theta} = -\sum \frac{1}{p} \frac{\partial p}{\partial \theta} = - \sum \frac{\partial \ln \bar{p}}{\partial \theta } +\frac{\partial nll}{\partial I_{sig} } \frac{\partial I_{sig} }{\partial \theta} +\frac{\partial nll}{\partial I_{bg}}\frac{\partial I_{sig}}{\partial \theta} """ var = self.vm.trainable_variables sw = tf.reduce_sum(weight) mcdata = list(mcdata) mc_weight = list(mc_weight) int_sig, g_int_sig = sum_gradient(self.sig, mcdata, var, mc_weight) int_bg, g_int_bg = sum_gradient(self.bg, mcdata, var, mc_weight) v_int_sig, v_int_bg = ( tf.Variable(int_sig, dtype="float64"), tf.Variable(int_bg, dtype="float64"), ) def prob(x): return (1 - self.w_bkg) * self.sig( x ) / v_int_sig + self.w_bkg * self.bg(x) / v_int_bg ll, g_ll = sum_gradient( prob, data, var + [v_int_sig, v_int_bg], weight, trans=clip_log ) ret = ( -ll - sw * tf.math.log(int_sig / (1 - self.w_bkg)) + int_sig / (1 - self.w_bkg) ) g_ll_sig, g_ll_bg = g_ll[-2], g_ll[-1] g = [ -g_ll[i] - g_int_sig[i] * g_ll_sig - g_int_bg[i] * g_ll_bg - sw / int_sig * g_int_sig[i] + g_int_sig[i] / (1 - self.w_bkg) for i in range(len(var)) ] return ret, g
[docs] def nll_grad_hessian( self, data, mcdata, weight=1.0, batch=24000, bg=None, mc_weight=1.0 ): r""" The parameters are the same with ``self.nll()``, but it will return Hessian as well. .. math:: \frac{\partial^2 f}{\partial x_i \partial x_j} = \frac{\partial y_k}{\partial x_i} \frac{\partial^2 f}{\partial y_k \partial y_l} \frac{\partial y_l}{\partial x_j} + \frac{\partial f}{\partial y_k} \frac{\partial^2 y_k}{\partial x_i \partial x_j} .. math:: y = \{x_i; I_{sig}, I_{bg}\} .. math:: \frac{\partial y_k }{\partial x_i} = (\delta_{ik};\frac{\partial I_{sig}}{\partial x_i}, \frac{\partial I_{bg}}{\partial x_i}) :return NLL: Real number. The value of NLL. :return gradients: List of real numbers. The gradients for each variable. :return Hessian: 2-D Array of real numbers. The Hessian matrix of the variables. """ data, weight = self.get_weight_data(data, weight, bg=bg) if isinstance(mc_weight, float): mc_weight = tf.convert_to_tensor( [mc_weight] * data_shape(mcdata), dtype="float64" ) n_mc = tf.reduce_sum(mc_weight) sw = tf.reduce_sum(weight) var = self.vm.trainable_variables int_sig, g_int_sig, h_int_sig = sum_hessian( self.sig, split_generator(mcdata, batch), var, weight=split_generator(mc_weight, batch), ) int_bg, g_int_bg, h_int_bg = sum_hessian( self.bg, split_generator(mcdata, batch), var, weight=split_generator(mc_weight, batch), ) v_int_sig, v_int_bg = ( tf.Variable(int_sig, dtype="float64"), tf.Variable(int_bg, dtype="float64"), ) def prob(x): return (1 - self.w_bkg) * self.sig( x ) / v_int_sig + self.w_bkg * self.bg(x) / v_int_bg ll, g_ll, h_ll = sum_hessian( prob, split_generator(data, batch), var + [v_int_sig, v_int_bg], weight=split_generator(weight, batch), trans=clip_log, ) ret = ( -ll - sw * tf.math.log(int_sig / (1 - self.w_bkg)) + int_sig / (1 - self.w_bkg) ) n_var = len(var) g_ll_sig, g_ll_bg = g_ll[-2], g_ll[-1] g = [ -g_ll[i] - g_int_sig[i] * g_ll_sig - g_int_bg[i] * g_ll_bg - sw / int_sig * g_int_sig[i] + g_int_sig[i] / (1 - self.w_bkg) for i in range(len(var)) ] jac = np.concatenate([np.eye(n_var), [g_int_sig, g_int_bg]], axis=0) h = ( np.dot(jac.T, np.dot(h_ll, jac)) + g_ll_sig * h_int_sig + g_ll_bg * h_int_bg + sw * ( h_int_sig / int_sig - np.outer(g_int_sig, g_int_sig) / int_sig / int_sig ) - h_int_sig / (1 - self.w_bkg) ) return ret, g, -h