Source code for tf_pwa.mass_cons

import numpy as np

# Minkowski metric (1, -1, -1, -1)
METRIC = np.array([1.0, -1.0, -1.0, -1.0])


[docs] def mass_sq(p4): """Compute invariant mass squared: m^2 = E^2 - px^2 - py^2 - pz^2""" return np.sum(p4 * p4 * METRIC, axis=-1)
[docs] class MassCons: """ Kinematic constraint model for particle physics. Always constrains individual particle masses. Optionally adds extra mass constraints (e.g., total mass). """ def __init__(self, n_particles, mass_constraints=None): """ Parameters: ----------- n_particles : int Number of particles in the fit mass_constraints : list of tuples or None, optional Extra mass constraints beyond individual particles. Each tuple contains particle indices to combine for a mass constraint. Example: [(0,1,2)] adds total mass constraint on particles 0, 1, 2. If None, only individual particle masses are constrained. Note: If momentum_target is used in do_constraints(), avoid adding total mass constraint here as it's already implied by fixed 4-momentum. """ self.n_particles = n_particles self.n_mass_constraints = n_particles if mass_constraints: self.n_mass_constraints += len(mass_constraints) self.n_dim = n_particles * 4 # Pre-compute indices: individual particles + extra constraints self._mass_indices_list = [ np.array([i], dtype=np.int32) for i in range(n_particles) ] if mass_constraints: for c in mass_constraints: self._mass_indices_list.append(np.array(c, dtype=np.int32))
[docs] def do_constraints( self, x, mass_targets_sq=None, momentum_target=None, max_iter=50, tol=1e-12, chunk_size=2000, ): """ Apply kinematic constraints using Newton-Raphson iteration. Parameters: ----------- x : ndarray Input 4-momenta, shape (n_events, n_particles, 4) mass_targets_sq : ndarray or None, optional Target mass-squared values, shape (n_mass_constraints,) or (n_events, n_mass_constraints). If None, computed from the mean of input data for each constraint (individual particles and combined masses). Input data should already be close to physical values for meaningful results. momentum_target : ndarray or None, optional Target total 4-momentum [E, px, py, pz], shape (4,) or (n_events, 4). If None, no momentum constraint is applied. max_iter : int Maximum number of iterations tol : float Convergence tolerance chunk_size : int Number of events per chunk. Default: 2000. Returns: -------- ndarray : Adjusted 4-momenta """ n_events = x.shape[0] n_mass = self.n_mass_constraints if mass_targets_sq is None: mass_targets_sq = [] for idx in self._mass_indices_list: total_p = np.sum(x[:, idx], axis=1) mass_targets_sq.append(np.mean(mass_sq(total_p))) mass_targets_sq = np.stack(mass_targets_sq, axis=-1) # Broadcast targets upfront mass_arr = self._broadcast_to_events(mass_targets_sq, n_events, n_mass) mom_arr = None if momentum_target is not None: mom_arr = self._broadcast_to_events(momentum_target, n_events, 4) # No chunking needed for small datasets if chunk_size >= n_events: return self._do_chunk(x, mass_arr, mom_arr, max_iter, tol) # Process in chunks result = np.empty_like(x) n_chunks = (n_events + chunk_size - 1) // chunk_size for i in range(n_chunks): start = i * chunk_size end = min((i + 1) * chunk_size, n_events) chunk_mom = mom_arr[start:end] if mom_arr is not None else None result[start:end] = self._do_chunk( x[start:end], mass_arr[start:end], chunk_mom, max_iter, tol ) return result
def _broadcast_to_events(self, arr, n_events, last_dim): """Broadcast array to (n_events, last_dim) shape.""" arr = np.asarray(arr) if arr.ndim == 1: return np.broadcast_to(arr, (n_events, last_dim)).copy() elif arr.shape[0] == 1: return np.broadcast_to(arr[0], (n_events, last_dim)).copy() return arr def _do_chunk(self, x, mass_targets_sq, momentum_target, max_iter, tol): """Process a single chunk of events. Arrays already broadcast to (n_events, dim).""" n_events = x.shape[0] n_particles = self.n_particles n_mass = self.n_mass_constraints n_dim = self.n_dim if momentum_target is not None: n_constraints = n_mass + 4 else: n_constraints = n_mass z = x.copy().reshape(n_events, n_dim) # Pre-allocate arrays J = np.zeros((n_events, n_constraints, n_dim)) c_vals = np.zeros((n_events, n_constraints)) eye_diag = np.eye(n_constraints) * 1e-12 mass_indices_list = self._mass_indices_list z_p = z.reshape(n_events, n_particles, 4) for iteration in range(max_iter): # === Mass constraints === for c_idx in range(n_mass): idx = mass_indices_list[c_idx] total_p = np.sum(z_p[:, idx], axis=1) c_vals[:, c_idx] = mass_sq(total_p) - mass_targets_sq[:, c_idx] grad = 2.0 * total_p * METRIC for pid in idx: J[:, c_idx, 4 * pid : 4 * pid + 4] = grad # Total momentum total_p_all = np.sum(z_p, axis=1) # === Momentum constraints === if momentum_target is not None: c_vals[:, n_mass:] = total_p_all - momentum_target for comp in range(4): J[:, n_mass + comp, comp::4] = 1.0 # Check convergence if np.max(np.abs(c_vals)) < tol: break # Newton step JJT = np.matmul(J, J.transpose(0, 2, 1)) JJT += eye_diag lambda_vals = np.linalg.solve( JJT, c_vals[:, :, np.newaxis] ).squeeze(-1) z -= np.matmul( J.transpose(0, 2, 1), lambda_vals[:, :, np.newaxis] ).squeeze(-1) z_p = z.reshape(n_events, n_particles, 4) return z.reshape(n_events, n_particles, 4)