Source code for tf_pwa.fit

import json
import time

import numpy as np
import tensorflow as tf
from scipy.optimize import BFGS, basinhopping, minimize

from .fit_improve import Cached_FG
from .fit_improve import minimize as my_minimize
from .utils import time_print


[docs] class LargeNumberError(ValueError): pass
[docs] def fit_minuit(fcn, bounds_dict={}, hesse=True, minos=False, **kwargs): try: import iminuit except ImportError: raise RuntimeError( "You haven't installed iminuit so you can't use Minuit to fit." ) """ :param fcn: :param bounds_dict: :param hesse: :param minos: :return: """ bounds_dict = bounds_dict.copy() for k, v in fcn.vm.bnd_dic.items(): if v.lower is not None and v.upper is not None: bounds_dict[k] = (v.lower, v.upper) if int(iminuit.__version__[0]) < 2: return fit_minuit_v1( fcn, bounds_dict=bounds_dict, hesse=hesse, minos=minos, **kwargs ) return fit_minuit_v2( fcn, bounds_dict=bounds_dict, hesse=hesse, minos=minos, **kwargs )
[docs] def fit_minuit_v1(fcn, bounds_dict={}, hesse=True, minos=False, **kwargs): """ :param fcn: :param bounds_dict: :param hesse: :param minos: :return: """ from iminuit import Minuit var_args = {} var_names = fcn.vm.trainable_vars for i in var_names: var_args[i] = fcn.vm.get(i) if i in bounds_dict: var_args["limit_{}".format(i)] = bounds_dict[i] var_args["error_" + i] = 0.1 f_g = Cached_FG(fcn.nll_grad) m = Minuit( f_g.fun, name=var_names, errordef=0.5, grad=f_g.grad, print_level=2, use_array_call=True, **var_args, ) print("########## begin MIGRAD") now = time.time() m.migrad() # (ncall=10000))#,precision=5e-7)) print("MIGRAD Time", time.time() - now) if hesse: print("########## begin HESSE") now = time.time() m.hesse() print("HESSE Time", time.time() - now) if minos: print("########## begin MINOS") now = time.time() m.minos() # (var="") print("MINOS Time", time.time() - now) ndf = len(m.list_of_vary_param()) ret = FitResult( dict(m.values), fcn, m.fval, ndf=ndf, success=m.migrad_ok() ) ret.set_error(dict(m.errors)) return ret
[docs] def fit_minuit_v2(fcn, bounds_dict={}, hesse=True, minos=False, **kwargs): """ :param fcn: :param bounds_dict: :param hesse: :param minos: :return: """ from iminuit import Minuit var_args = {} var_names = fcn.vm.trainable_vars x0 = [] for i in var_names: x0.append(fcn.vm.get(i)) var_args[i] = fcn.vm.get(i) if i in bounds_dict: var_args["limit_{}".format(i)] = bounds_dict[i] # var_args["error_" + i] = 0.1 f_g = Cached_FG(fcn.nll_grad) m = Minuit( f_g.fun, np.array(x0), name=var_names, grad=f_g.grad, ) m.strategy = 0 for i in var_names: if i in bounds_dict: m.limits[i] = bounds_dict[i] m.errordef = 0.5 m.print_level = 2 print("########## begin MIGRAD") now = time.time() m.migrad() # (ncall=10000))#,precision=5e-7)) print("MIGRAD Time", time.time() - now) if hesse: print("########## begin HESSE") now = time.time() m.hesse() print("HESSE Time", time.time() - now) if minos: print("########## begin MINOS") now = time.time() m.minos() # (var="") print("MINOS Time", time.time() - now) ndf = len(var_names) ret = FitResult( dict(zip(var_names, m.values)), fcn, m.fval, ndf=ndf, success=m.valid ) # print(m.errors) ret.set_error(dict(zip(var_names, m.errors))) return ret
[docs] def fit_root_fitter(fcn): from array import array import ROOT var_names = fcn.vm.trainable_vars x0 = [] for i in var_names: x0.append(fcn.vm.get(i)) f_g = Cached_FG(fcn.nll_grad) class MyMultiGenFCN(ROOT.Math.IMultiGenFunction): def NDim(self): return len(x0) def DoEval(self, x): return f_g.fun(x) def Clone(self): x = MyMultiGenFCN() ROOT.SetOwnership(x, False) return x fitter = ROOT.Fit.Fitter() myMultiGenFCN = MyMultiGenFCN() params = array("d", x0) fitter.FitFCN(myMultiGenFCN, params) fit_result = fitter.Result() x = dict(zip(var_names, [fit_result.Parameter(i) for i in range(len(x0))])) xerr = dict(zip(var_names, [fit_result.Error(i) for i in range(len(x0))])) ndf = len(var_names) ret = FitResult( x, fcn, fit_result.MinFcnValue(), ndf=ndf, success=fit_result.IsValid() ) ret.set_error(xerr) return ret
[docs] def fit_scipy( fcn, method="BFGS", bounds_dict={}, check_grad=False, improve=False, maxiter=None, jac=True, callback=None, standard_complex=True, grad_scale=1.0, gtol=1e-3, constraints=None, ): """ :param fcn: :param method: :param bounds_dict: :param kwargs: :return: """ gtol *= grad_scale args_name = fcn.vm.trainable_vars x0 = [] bnds = [] for name, i in zip(args_name, fcn.vm.trainable_variables): x0.append(i.numpy()) if name in bounds_dict: bnds.append(bounds_dict[name]) else: bnds.append((None, None)) points = [] nlls = [] hess_inv = None now = time.time() if maxiter is None: maxiter = max(100 * len(x0), 2000) min_nll = 0.0 ndf = fcn.vm.get_all_val(True) # maxiter = 0 def v_g2(x0): f_g = fcn.vm.trans_fcn_grad(fcn.nll_grad) nll, gs0 = f_g(x0) gs = [] for i, name in enumerate(args_name): x0[i] += 1e-5 nll0, _ = f_g(x0) x0[i] -= 2e-5 nll1, _ = f_g(x0) x0[i] += 1e-5 gs.append((nll0 - nll1) / 2e-5) return nll, np.array(gs) if check_grad: print("checking gradients ...") f_g = fcn.vm.trans_fcn_grad(fcn.nll_grad) nll, gs0 = f_g(x0) _, gs = v_g2(x0) for i, name in enumerate(args_name): print(args_name[i], gs[i], gs0[i]) callback_inner = lambda x, y: None if callback is not None: callback_inner = callback if method in ["BFGS", "CG", "Nelder-Mead", "test"]: def callback(x): if np.fabs(x).sum() > 1e7: x_p = dict(zip(args_name, x)) raise LargeNumberError("x too large: {}".format(x_p)) points.append(fcn.vm.get_all_val()) nlls.append(float(fcn.cached_nll)) # if len(nlls) > maxiter: # with open("fit_curve.json", "w") as f: # json.dump({"points": points, "nlls": nlls}, f, indent=2) # pass # raise Exception("Reached the largest iterations: {}".format(maxiter)) callback_inner(x, fcn) print(fcn.cached_nll) # bd = Bounds(bnds) fcn.vm.set_bound(bounds_dict) if fcn.vm.bnd_dic: print("##boundary: ") for k, v in fcn.vm.bnd_dic.items(): print(" ", k, "\t", v) f_g = fcn.vm.trans_fcn_grad(fcn.nll_grad) f_g = Cached_FG(f_g, grad_scale=grad_scale) # print(f_g) x0 = np.array(fcn.vm.get_all_val(True)) if len(x0) == 0: min_nll, _ = f_g(x0) params = fcn.get_params() return FitResult( params, fcn, min_nll, ndf=0, success=True, hess_inv=None ) # s = minimize(f_g, x0, method='trust-constr', jac=True, hess=BFGS(), options={'gtol': 1e-4, 'disp': True}) if constraints is not None: s = minimize( f_g, x0, jac=True, constraints=constraints, callback=callback, options={"disp": True}, ) elif method == "test": try: s = my_minimize( f_g, x0, method=method, jac=True, callback=callback, options={"disp": 1, "gtol": gtol, "maxiter": maxiter}, ) except LargeNumberError: return except_result(fcn, x0.shape[0]) elif jac is not True: try: s = minimize( lambda x: float(fcn(x)), x0, method=method, jac=jac, callback=callback, options={"disp": 1, "gtol": gtol, "maxiter": maxiter}, ) except LargeNumberError: return except_result(fcn, x0.shape[0]) else: try: s = minimize( f_g, x0, method=method, jac=True, callback=callback, options={"disp": 1, "gtol": gtol, "maxiter": maxiter}, ) except LargeNumberError: return except_result(fcn, x0.shape[0]) while improve and not s.success: min_nll = s.fun maxiter -= s.nit s = minimize( f_g, s.x, method=method, jac=True, callback=callback, options={"disp": 1, "gtol": gtol * 10, "maxiter": maxiter}, ) if hasattr(s, "hess_inv"): edm = np.dot(np.dot(s.hess_inv, s.jac), s.jac) else: break if edm < 1e-5: print("edm: ", edm) s.message = "Edm allowed" break if abs(s.fun - min_nll) < 1e-3: break fcn.vm.set_trans_var(s.x) # make sure fit results same as variable print(s) # xn = s.x # fcn.vm.get_all_val() # bd.get_y(s.x) # fcn.vm.set_all(s.x, True) ndf = s.x.shape[0] min_nll = s.fun / grad_scale success = s.success hess_inv = None if hasattr(s, "hess_inv"): hess_inv = fcn.vm.trans_error_matrix(s.hess_inv * grad_scale, s.x) fcn.vm.remove_bound() xn = fcn.vm.get_all_val() elif method in ["L-BFGS-B"]: def callback(x): if np.fabs(x).sum() > 1e7: x_p = dict(zip(args_name, x)) raise LargeNumberError("x too large: {}".format(x_p)) points.append([float(i) for i in x]) nlls.append(float(fcn.cached_nll)) try: s = minimize( fcn.nll_grad, np.array(x0), method=method, jac=True, bounds=bnds, callback=callback, options={ "disp": 1, "maxcor": 50, "ftol": 1e-15, "maxiter": maxiter, }, ) except LargeNumberError: return except_result(fcn, len(x0)) xn = s.x fcn.vm.set_var(xn) print(s) ndf = s.x.shape[0] min_nll = s.fun success = s.success elif method in ["Newton-CG", "trust-krylov", "trust-ncg", "trust-exact"]: fcn.vm.set_bound(bounds_dict) return fit_newton_cg(fcn, method, False) elif method in ["Newton-CG-p", "trust-krylov-p", "trust-ncg-p"]: fcn.vm.set_bound(bounds_dict) return fit_newton_cg(fcn, method[:-2], True) elif method in ["iminuit"]: m = fit_minuit(fcn) return m elif method in ["root"]: m = fit_root_fitter(fcn) return m else: raise Exception("unknown method") if check_grad: print("checking gradients ...") f_g = fcn.vm.trans_fcn_grad(fcn.nll_grad) _, gs0 = f_g(xn) gs = [] for i, name in enumerate(args_name): xn[i] += 1e-5 nll0, _ = f_g(xn) xn[i] -= 2e-5 nll1, _ = f_g(xn) xn[i] += 1e-5 gs.append((nll0 - nll1) / 2e-5) print(args_name[i], gs[i], gs0[i]) if standard_complex: fcn.vm.standard_complex() params = fcn.get_params() # vm.get_all_dic() return FitResult( params, fcn, min_nll, ndf=ndf, success=bool(success), hess_inv=hess_inv )
[docs] def fit_newton_cg( fcn, method="Newton-CG", use_hessp=False, check_hess=False, gtol=1e-4 ): vm = fcn.vm points = [] def callback(x): points.append(fcn.vm.get_all_val()) # if len(nlls) > maxiter: # with open("fit_curve.json", "w") as f: # json.dump({"points": points, "nlls": nlls}, f, indent=2) # pass # raise Exception("Reached the largest iterations: {}".format(maxiter)) print(fcn.cached_nll) # bd = Bounds(bnds) f_g = vm.trans_fcn_grad(fcn.nll_grad) if use_hessp: hessp = vm.trans_grad_hessp(fcn.grad_hessp) else: hess = vm.trans_f_grad_hess(fcn.nll_grad_hessian) hess = time_print(hess) f_g = Cached_FG(f_g) x0 = np.array(vm.get_all_val(True)) if check_hess: # check if hessp works well # hess(x0, x0) gs = [] for i, _ in enumerate(x0): x0[i] += 1e-3 _, g1 = f_g(x0) x0[i] -= 2e-3 _, g2 = f_g(x0) x0[i] += 1e-3 gs.append((g1 - g2) / 2e-3) gs = np.array(gs) # print(gs) if use_hessp: p = np.random.random(x0.shape) print("check", hessp(x0, p)[1], "==", np.dot(gs, p)) p = np.random.random(x0.shape) print("check", hessp(x0, p)[1], "==", np.dot(gs, p)) else: print(hess(x0)[2] - gs) if use_hessp: s = minimize( f_g, x0, jac=True, hessp=lambda x, p: hessp(x, p)[1], method=method ) else: s = minimize( f_g, x0, jac=True, hess=lambda x: hess(x)[2], method=method ) fcn.vm.set_trans_var(s.x) xn = s.x ndf = s.x.shape[0] min_nll = s.fun if not s.success: if np.min(np.abs(s.jac)) < gtol: s.success = True s.message = s.message + "\n But gradients allow" success = s.success print(s) # fcn.vm.set_all(xn) params = fcn.get_params() return FitResult( params, fcn, min_nll, ndf=ndf, success=success, hess_inv=None )
[docs] def except_result(fcn, ndf): params = fcn.vm.get_all_dic() return FitResult( params, fcn, float(fcn.cached_nll), ndf=ndf, success=False )
# import pymultinest
[docs] def fit_multinest(fcn): pass
[docs] class FitResult(object): def __init__( self, params, fcn, min_nll, ndf=0, success=True, hess_inv=None ): self.params = params self.error = {} self.fcn = fcn self.min_nll = float(min_nll) self.ndf = int(ndf) self.success = success self.hess_inv = hess_inv self.extra = {}
[docs] def save_as(self, file_name, save_hess=False): s = { "value": self.params, "error": self.error, "status": { "success": self.success, "NLL": self.min_nll, "Ndf": self.ndf, **self.extra, }, } if save_hess and self.hess_inv is not None: s["free_params"] = [str(i) for i in self.error] s["hess_inv"] = [[float(j) for j in i] for i in self.hess_inv] with open(file_name, "w") as f: json.dump(s, f, indent=2)
[docs] def set_error(self, error): self.error = error.copy()