"""
adaptive split data into bins.
"""
import functools
import numpy as np
from .data import data_index, data_mask
try:
import matplotlib.patches as mpathes
except ImportError:
pass
[docs]
class AdaptiveBound(object):
"""adaptive bound cut for data value"""
def __init__(self, base_data, bins, base_bound=None):
self._base_bound = base_bound
if isinstance(bins, int):
self._base_data = np.array([base_data])
self.bins = [[bins]]
elif isinstance(bins, list):
self._base_data = np.array(base_data)
self.bins = bins
else:
raise TypeError(
"bins should be int or list of int, "
+ "insteading of {}".format(type(bins))
)
[docs]
@functools.lru_cache()
def get_bounds_data(self):
"""get split data bounds, and the data after splitting"""
if self._base_bound is None:
self._base_bound = AdaptiveBound.base_bound(self._base_data)
bounds, datas = AdaptiveBound.loop_split_bound(
self._base_data, self.bins, self._base_bound
)
return bounds, datas
[docs]
def get_bounds(self):
"""get split data bounds"""
bounds, _ = self.get_bounds_data()
return bounds
[docs]
def get_bool_mask(self, data):
"""bool mask for splitting data"""
bounds = self.get_bounds()
idx_data = data[: bounds[0][0].shape[0]]
ret = []
for lb, rb in bounds:
mask = np.logical_and(
idx_data >= lb[..., np.newaxis], idx_data < rb[..., np.newaxis]
)
mask = np.all(mask, axis=0)
ret.append(mask)
return ret
[docs]
def split_full_data(self, data, base_index=None):
"""split structure data, (TODO because large IO, the method is slow.)"""
base_data = [[data_index(data, i) for i in base_index]]
mask = self.get_bool_mask(base_data)
ret = []
for i in mask:
ret.append(data_mask(data, i))
return ret
[docs]
def split_data(self, data):
"""split data, the shape is same as base_data"""
mask = self.get_bool_mask(data)
ret = []
for i in mask:
ret.append(data[..., i])
return ret
[docs]
@staticmethod
def single_split_bound(data, n=2, base_bound=None):
"""split data in the order of data value
>>> data = np.array([1.0, 2.0, 1.4, 3.1])
>>> bound = AdaptiveBound.single_split_bound(data)
>>> [(float(i[0]+1e-6), float(i[1]+1e-6)) for i in bound]
[(1.0..., 1.7...), (1.7..., 3.1...)]
"""
if base_bound is None:
base_bound = np.min(data), np.max(data) + 1e-6
num_lb = base_bound[0]
bounds = []
for j in range(1, n):
num_rb = np.percentile(data, j / n * 100, axis=0) + 1e-6
bounds.append((num_lb, num_rb))
num_lb = num_rb
bounds.append((num_lb, base_bound[1]))
return bounds
[docs]
@staticmethod
def multi_split_bound(datas, n, base_bound=None):
"""multi data for single_split_bound, so `n` is list of int
>>> data = np.array([[1.0, 2.0, 1.4, 3.1], [2.0, 1.0, 3.0, 1.0]])
>>> bound, _ = AdaptiveBound.multi_split_bound(data, [2, 1])
>>> [(float(i[0][0]+1e-6), float(i[1][0]+1e-6)) for i in bound]
[(1.0..., 1.7...), (1.7..., 3.1...)]
"""
datas = np.array(datas)
if base_bound is None:
base_bound = AdaptiveBound.base_bound(datas)
bound_chain = [base_bound]
data_chain = [datas]
for idx, size in enumerate(n):
new_bound_chain = []
new_data_chain = []
for bnd, data in zip(bound_chain, data_chain):
idx_data = data[idx]
bounds = AdaptiveBound.single_split_bound(
idx_data, size, base_bound=(bnd[0][idx], bnd[1][idx])
)
for i in bounds:
lb, rb = i
l_bnd, r_bnd = bnd
l_bnd = np.copy(l_bnd)
r_bnd = np.copy(r_bnd)
l_bnd[idx] = lb
r_bnd[idx] = rb
new_bound_chain.append((l_bnd, r_bnd))
mask = np.logical_and(idx_data >= lb, idx_data < rb)
new_data_chain.append(data[:, mask])
bound_chain = new_bound_chain
data_chain = new_data_chain
return bound_chain, data_chain
[docs]
@staticmethod
def loop_split_bound(datas, n, base_bound=None):
"""loop for multi_split_bound, so `n` is list of list of int"""
datas = np.array(datas)
if base_bound is None:
base_bound = AdaptiveBound.base_bound(datas)
bound_chain = [base_bound]
data_chain = [datas]
for idx, size in enumerate(n):
new_bound_chain = []
new_data_chain = []
for bnd, data in zip(bound_chain, data_chain):
bound, data_i = AdaptiveBound.multi_split_bound(
data, size, base_bound=bnd
)
new_bound_chain += bound
new_data_chain += data_i
bound_chain = new_bound_chain
data_chain = new_data_chain
return bound_chain, data_chain
[docs]
@staticmethod
def base_bound(data):
"""base bound for the data"""
lb = np.min(data, axis=-1) - 1e-6
rb = np.max(data, axis=-1) + 1e-6
return (lb, rb)
[docs]
def get_bound_patch(self, **kwargs):
ret = []
for i, bnd in enumerate(self.get_bounds()):
min_x, min_y = bnd[0]
max_x, max_y = bnd[1]
rect = mpathes.Rectangle(
(min_x, min_y), max_x - min_x, max_y - min_y, **kwargs
) # cmap(weights[i]/max_weight))
ret.append(rect)
return ret
[docs]
def plot_bound(self, ax, **kwargs):
for i in self.get_bound_patch(**kwargs):
ax.add_patch(i)
[docs]
def binning_shape_function(m, bins):
adp = AdaptiveBound(m, bins)
bnds, n = adp.get_bounds_data()
x1 = [i[0][0] for i in bnds]
x = np.array(x1 + [bnds[-1][1][0]])
y1 = [ni.shape[0] / (x[i + 1] - x[i]) for i, ni in enumerate(n)]
y2 = [(y1[i] + y1[i + 1]) / 2 for i in range(len(y1) - 1)]
y = np.array([y1[0]] + y2 + [y1[-1]])
return x, y
[docs]
def adaptive_shape(m, bins, xmin, xmax):
x, y = binning_shape_function(m, bins)
cut = (x < xmax) & (x >= xmin)
x = x[cut]
y = y[cut]
y[0] = y[0] * (x[1] - xmin) / (x[1] - x[0])
y[-1] = y[-1] * (x[-2] - xmax) / (x[-2] - x[-1])
x[0] = xmin
x[-1] = xmax
from tf_pwa.generator.linear_interpolation import LinearInterp
return LinearInterp(x, y)
[docs]
def cal_chi2(numbers, n_fp):
weights = []
# print(numbers)
# chi21 = []
for ndata, nmc in numbers:
weight = (ndata - nmc) / np.sqrt(np.abs(ndata))
weights.append(weight**2)
# chi21.append(ndata * np.log(nmc))
max_weight = np.max(weights)
chi2 = np.sum(weights)
print("bins: ", len(weights))
print("number of free parameters: ", n_fp)
ndf = len(weights) - 1 - n_fp
print(
"chi2/ndf: ", np.sum(weights), "/", ndf
) # ,"another", np.sum(chi21))
return chi2, ndf