"""
Phi for Python (pyPhi) — Version 2.0
By Sal Garcia (sgarciam@ic.ac.uk salvadorgarciamunoz@gmail.com)
Added Feb 23 2026
* Added _validate_inputs function for input validation and observation reconciliation
* Integrated validation into pca, pls, lpls entry points
* Replaced np.tile with numpy broadcasting throughout
* Optimized _Ab_btbinv with fast path for complete data
* var_t (score covariance matrix) stored in model objects to avoid recalculation
* Added _extract_array and _calc_r2 helper functions to reduce duplication
* Replaced hardcoded F-distribution and chi2 lookup tables with scipy.stats
* Replaced hardcoded t-distribution with scipy.stats
Added Feb 07 2026
* Fixed cat_2_matrix so its output is consistent with MBPLS
Added Jan 30 2025
* Added a pinv fallback in spectra_savgol for the case where
  inv fails
Added Jan 20 2025
* Added the 'cca' flag to the pls routine to calculate CCA between
the Ts and each of the Ys (one by one), calculating loadings and scores
  equivalent to a perfectly orthogonalized OPLS model. The covariant scores (Tcv),
  the covariant loadings (Pcv) and the predictive weights (Wcv) are added
as keys to the model object.
  [The covariant loadings (Pcv) are equivalent to the predictive loadings in OPLS]
* Added cca and cca-multi routines to perform PLS-CCA (alternative to OPLS)
as of now cca-multi remains unused.
Added Nov 18th, 2024
* replaced interp2d with RectBivariateSpline
* Protected SPE lim calculations for near zero residuals
* Added build_polynomial function to create linear regression
  models with variable selection assisted by PLS
  (merged from James)
* Added spectra preprocessing methods
* bootstrap PLS
by Salvador Garcia (sgarciam@ic.ac.uk salvadorgarciamunoz@gmail.com)
Added Dec 19th 2023
* phi.clean_htmls removes all html files in the working directory
* clean_empty_rows also returns the names of the rows removed
Added May 1st
* YMB is now added in the same structure as the XMB
* Corrected the dimensionality of the lwpls prediction, it was a double-nested array.
Added Apr 30
* Modified Multi-block PLS to include the block name in the variable name
Added Apr 29
* Included the unique routine and adjusted the parse_materials routine so materials
and lots are in the same order as in the raw data
Added Apr 27
* Enhanced adapt_pls_4_pyomo to use variable names as indices when the flag is set
Added Apr 25
* Enhanced the varimax_rotation to adjust the r2 and r2pv to the rotated loadings
Added Apr 21
* Re-added varimax_rotation with complete model rotation for PCA and PLS
Added Apr 17
* Added tpls and tpls_pred
Added Apr 15
* Added jrpls model and jrpls_pred
* Added routines to reconcile columns to row identifiers so that the X and R
  matrices correspond correctly
* Added routines to reconcile rows across a list of dataframes, producing a list
  of dataframes containing only those observations present in all dataframes
Added on Apr 9 2023
* Added lpls and lpls_pred routines
* Added parse_materials to read a linear table and produce R or Ri
Release as of Nov 23 2022
* Added a function to export PLS model to gPROMS code
Release as of Aug 22 2022
* Fixed access to NEOS server and use of GAMS instead of IPOPT
Release as of Aug 12 2022
* Fixed the SPE calculations in pls_pred and pca_pred
* Changed to a more efficient inversion in pca_pred (=pls_pred)
* Added a pseudo-inverse option in pmp for pca_pred
Release as of Aug 2 2022
* Added replicate_data
"""
import numpy as np
import pandas as pd
import datetime
from scipy.special import factorial
from scipy.stats import norm, f as f_dist, chi2
from scipy.optimize import fsolve
from scipy.interpolate import RectBivariateSpline
from scipy.stats import t as t_dist
from shutil import which
import os
from numpy import eye, asarray, dot, sum, diag
from numpy.linalg import svd
import matplotlib.pyplot as plt
from statsmodels.distributions.empirical_distribution import ECDF
os.environ['NEOS_EMAIL'] = 'pyphisoftware@gmail.com'
try:
from pyomo.environ import *
pyomo_ok = True
except ImportError:
pyomo_ok = False
if bool(which('gams')):
gams_ok = True
else:
gams_ok = False
ipopt_ok = bool(which('ipopt'))
if pyomo_ok and gams_ok:
from pyomo.solvers.plugins.solvers.GAMS import GAMSDirect, GAMSShell
gams_ok = (GAMSDirect().available(exception_flag=False)
or GAMSShell().available(exception_flag=False))
def ma57_dummy_check():
    """Solve a trivial QP with IPOPT to check whether the MA57 linear solver is available."""
    m = ConcreteModel()
m.x = Var()
m.Obj = Objective(expr = m.x**2 -1)
s = SolverFactory('ipopt')
s.options['linear_solver'] = 'ma57'
import logging
pyomo_logger = logging.getLogger('pyomo.core')
LOG_DEFAULT = pyomo_logger.level
pyomo_logger.setLevel(logging.ERROR)
r = s.solve(m)
pyomo_logger.setLevel(LOG_DEFAULT)
ma57_ok = r.solver.status == SolverStatus.ok
if ma57_ok:
print("MA57 available to IPOPT")
return ma57_ok
if pyomo_ok and ipopt_ok:
ma57_ok = ma57_dummy_check()
else:
ma57_ok = False
if not(pyomo_ok) or (not(ipopt_ok) and not(gams_ok)):
print('Will be using the NEOS server in the absence of IPOPT and GAMS')
# =============================================================================
# Helper functions (new in v2.0)
# =============================================================================
def _extract_array(X):
"""Extract numpy array, observation IDs, and variable IDs from DataFrame or ndarray.
Args:
X: numpy array or pandas DataFrame (first column = obs IDs)
Returns:
(array, obsid, varid) where obsid/varid are False if input is ndarray
"""
if isinstance(X, np.ndarray):
return X.copy(), False, False
    elif isinstance(X, pd.DataFrame):
        arr = np.array(X.values[:, 1:]).astype(float)
        obsid = X.values[:, 0].astype(str).tolist()
        varid = X.columns.values[1:].tolist()
        return arr, obsid, varid
    else:
        raise ValueError('X must be a pandas DataFrame or numpy ndarray')
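# Usage sketch for _extract_array (hypothetical data; comments only, not executed):
#
#   df = pd.DataFrame({'obsID': ['a', 'b'], 'x1': [1.0, 2.0], 'x2': [3.0, 4.0]})
#   arr, obsid, varid = _extract_array(df)
#   # arr   -> array([[1., 3.], [2., 4.]])
#   # obsid -> ['a', 'b'],  varid -> ['x1', 'x2']
#   arr, obsid, varid = _extract_array(np.ones((2, 2)))
#   # obsid and varid are False when the input is a plain ndarray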
def _calc_r2(residual, TSS, TSSpv, prev_r2=None, prev_r2pv=None):
"""Calculate cumulative R2 and R2 per variable, optionally appending to previous.
Args:
residual: residual matrix after deflation
TSS: total sum of squares (scalar)
TSSpv: total sum of squares per variable (1D array)
prev_r2: previous cumulative R2 array (None for first component)
prev_r2pv: previous cumulative R2pv matrix (None for first component)
Returns:
(r2, r2pv) updated arrays
"""
r2_new = 1 - np.sum(residual**2) / TSS
r2pv_new = (1 - np.sum(residual**2, axis=0) / TSSpv).reshape(-1, 1)
if prev_r2 is None:
return r2_new, r2pv_new
return np.hstack((prev_r2, r2_new)), np.hstack((prev_r2pv, r2pv_new))
def _r2_cumulative_to_per_component(r2, r2pv, A):
"""Convert cumulative R2 to per-component R2."""
for a in range(A-1, 0, -1):
r2[a] = r2[a] - r2[a-1]
r2pv[:, a] = r2pv[:, a] - r2pv[:, a-1]
return r2, r2pv
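# Worked sketch of the R2 helpers (hypothetical numbers): _calc_r2 accumulates
# *cumulative* R2 after each deflation, and _r2_cumulative_to_per_component then
# converts to per-component values by first differences.
#
#   X0 = np.array([[1., 2.], [3., 4.], [5., 0.]])
#   TSS, TSSpv = np.sum(X0**2), np.sum(X0**2, axis=0)
#   E1 = 0.5 * X0                                       # pretend residual after LV 1
#   r2, r2pv = _calc_r2(E1, TSS, TSSpv)                 # r2 = 0.75 (cumulative)
#   E2 = 0.1 * X0                                       # pretend residual after LV 2
#   r2, r2pv = _calc_r2(E2, TSS, TSSpv, r2, r2pv)       # r2 = [0.75, 0.99]
#   r2, r2pv = _r2_cumulative_to_per_component(r2, r2pv, 2)
#   # r2 -> [0.75, 0.24]: each entry is now the variance captured by that component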
def _Ab_btbinv(A, b, A_not_nan_map):
"""Project c = Ab / (b'b), accounting for missing data.
Optimized: uses broadcasting instead of np.tile, with fast path
when there is no missing data.
"""
b_flat = b.ravel()
numer = A @ b_flat
# Fast path: no missing data
if A_not_nan_map.all():
return (numer / np.dot(b_flat, b_flat)).reshape(-1, 1)
denom = np.sum((A_not_nan_map * b_flat)**2, axis=1)
return (numer / denom).reshape(-1, 1)
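# _Ab_btbinv computes the regression coefficient c = A b / (b'b) row by row. With
# missing data, each row's denominator only sums b_j**2 over the entries present
# in that row of A, which is the standard NIPALS missing-data projection.
# Minimal sketch (hypothetical numbers):
#
#   A = np.array([[1., 2.], [0., 4.]])           # second row: first entry missing
#   present = np.array([[True, True], [False, True]])
#   b = np.array([[1.], [1.]])
#   _Ab_btbinv(A, b, np.ones((2, 2), dtype=bool))  # complete-data fast path: [[1.5], [2.]]
#   _Ab_btbinv(A, b, present)                      # row 2 divides by 1.0, not 2.0: [[1.5], [4.]]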
# =============================================================================
# Input validation (new in v2.0)
# =============================================================================
def _validate_inputs(X, Y=None, A=None, mcs=None):
"""Validate and reconcile inputs for PCA and PLS model building.
Checks performed:
- X (and Y if provided) are DataFrame or ndarray
- DataFrames have a string/object first column (observation IDs)
- No duplicate observation IDs
- X and Y observation IDs match one-to-one (reorders Y if needed)
- A does not exceed rank limits
- mcs flag is a recognized value
Args:
X: pandas DataFrame or numpy ndarray
Y: pandas DataFrame, numpy ndarray, or None (PCA case)
A: int, number of components requested, or None to skip check
mcs: mcs flag value(s) to validate, or None to skip check
For PLS, pass as tuple: (mcsX, mcsY)
Returns:
X, Y — potentially reordered so observations align.
If Y is None, returns (X, None).
Raises:
ValueError: on validation failures that cannot be auto-corrected
"""
_valid_mcs = {True, False, 'center', 'autoscale'}
# --- Validate types ---
if not isinstance(X, (np.ndarray, pd.DataFrame)):
raise ValueError(
f"X must be a pandas DataFrame or numpy ndarray, got {type(X).__name__}")
if Y is not None and not isinstance(Y, (np.ndarray, pd.DataFrame)):
raise ValueError(
f"Y must be a pandas DataFrame or numpy ndarray, got {type(Y).__name__}")
# --- Validate mcs flags ---
if mcs is not None:
mcs_values = mcs if isinstance(mcs, tuple) else (mcs,)
for m in mcs_values:
if m not in _valid_mcs:
raise ValueError(
f"mcs value '{m}' not recognized. Must be one of: True, False, 'center', 'autoscale'")
# --- Validate A ---
if A is not None:
if not isinstance(A, (int, np.integer)) or A < 1:
raise ValueError(f"A must be a positive integer, got {A}")
# --- Helper: check DataFrame structure ---
def _check_df_structure(df, name):
if df.shape[1] < 2:
raise ValueError(
f"{name} DataFrame must have at least 2 columns "
f"(1 obs ID column + 1 data column), got {df.shape[1]}")
data_cols = df.iloc[:, 1:]
non_numeric = []
for col in data_cols.columns:
try:
data_cols[col].astype(float)
except (ValueError, TypeError):
non_numeric.append(col)
if non_numeric:
raise ValueError(
f"{name} has non-numeric data columns: {non_numeric[:5]}"
+ (f" ... and {len(non_numeric)-5} more" if len(non_numeric) > 5 else ""))
# --- Helper: check for duplicate obs IDs ---
def _check_duplicates(df, name):
obs_col = df.iloc[:, 0].astype(str)
dupes = obs_col[obs_col.duplicated(keep=False)]
if len(dupes) > 0:
unique_dupes = dupes.unique().tolist()
raise ValueError(
f"{name} has duplicate observation IDs: {unique_dupes[:10]}"
+ (f" ... and {len(unique_dupes)-10} more" if len(unique_dupes) > 10 else ""))
# --- DataFrame-specific validation ---
if isinstance(X, pd.DataFrame):
_check_df_structure(X, "X")
_check_duplicates(X, "X")
if Y is not None and isinstance(Y, pd.DataFrame):
_check_df_structure(Y, "Y")
_check_duplicates(Y, "Y")
# --- Validate A against dimensions ---
if A is not None:
if isinstance(X, pd.DataFrame):
n_rows = X.shape[0]
n_cols = X.shape[1] - 1
else:
n_rows = X.shape[0]
n_cols = X.shape[1]
max_A = min(n_rows, n_cols)
if A > max_A:
raise ValueError(
f"A={A} exceeds max allowable components for X "
f"({n_rows} observations x {n_cols} variables, max A={max_A})")
# --- Reconcile X and Y observations ---
if Y is None:
return X, None
# Both numpy: can only check row count
if isinstance(X, np.ndarray) and isinstance(Y, np.ndarray):
if X.shape[0] != Y.shape[0]:
raise ValueError(
f"X and Y must have the same number of rows. "
f"X has {X.shape[0]}, Y has {Y.shape[0]}")
return X, Y
# One numpy, one DataFrame: check row count only
if isinstance(X, np.ndarray) != isinstance(Y, np.ndarray):
if X.shape[0] != Y.shape[0]:
raise ValueError(
f"X and Y must have the same number of rows. "
f"X has {X.shape[0]}, Y has {Y.shape[0]}. "
f"Cannot reconcile observation order when mixing ndarray and DataFrame.")
print("Warning: Cannot verify observation alignment when mixing "
"ndarray and DataFrame. Ensure rows correspond.")
return X, Y
# Both DataFrames: full reconciliation
x_obs = X.iloc[:, 0].astype(str).tolist()
y_obs = Y.iloc[:, 0].astype(str).tolist()
x_set = set(x_obs)
y_set = set(y_obs)
in_x_only = x_set - y_set
in_y_only = y_set - x_set
common = x_set & y_set
if len(common) == 0:
raise ValueError(
"X and Y share no common observation IDs. "
f"X IDs sample: {x_obs[:5]}, Y IDs sample: {y_obs[:5]}. "
"Check that the first column contains matching observation identifiers.")
if in_x_only:
dropped = sorted(in_x_only)
print(f"Warning: {len(in_x_only)} observation(s) in X not found in Y — dropped: "
+ (', '.join(dropped[:10]) + (f' ... and {len(dropped)-10} more' if len(dropped) > 10 else '')))
X = X[X.iloc[:, 0].astype(str).isin(common)].reset_index(drop=True)
if in_y_only:
dropped = sorted(in_y_only)
print(f"Warning: {len(in_y_only)} observation(s) in Y not found in X — dropped: "
+ (', '.join(dropped[:10]) + (f' ... and {len(dropped)-10} more' if len(dropped) > 10 else '')))
Y = Y[Y.iloc[:, 0].astype(str).isin(common)].reset_index(drop=True)
x_obs = X.iloc[:, 0].astype(str).tolist()
y_obs = Y.iloc[:, 0].astype(str).tolist()
if x_obs == y_obs:
return X, Y
print("Warning: Observation order in Y does not match X. Reordering Y to align with X.")
y_obs_to_idx = {obs: idx for idx, obs in enumerate(y_obs)}
new_order = [y_obs_to_idx[obs] for obs in x_obs]
Y = Y.iloc[new_order].reset_index(drop=True)
return X, Y
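# Reconciliation sketch for _validate_inputs (hypothetical frames): observations
# present in only one of X/Y are dropped with a warning, and Y is reordered to
# match X, so downstream NIPALS sees row-aligned matrices.
#
#   X = pd.DataFrame({'obsID': ['a', 'b', 'c'], 'x1': [1., 2., 3.]})
#   Y = pd.DataFrame({'obsID': ['c', 'a'],      'y1': [30., 10.]})
#   Xr, Yr = _validate_inputs(X, Y, A=1)
#   # Xr keeps rows 'a' and 'c' ('b' is dropped); Yr is reordered to ['a', 'c']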
# =============================================================================
# Statistical functions (v2.0: scipy.stats replaces lookup tables)
# =============================================================================
def f99(i, j):
"""Return the F-distribution critical value at 99% confidence.
Args:
df1 (int): Numerator degrees of freedom.
df2 (int): Denominator degrees of freedom.
Returns:
float: F critical value at alpha = 0.01.
"""
if i <= 0 or j <= 0:
return 1.0
return float(f_dist.ppf(0.99, i, j))
def f95(i, j):
"""Return the F-distribution critical value at 95% confidence.
Args:
df1 (int): Numerator degrees of freedom.
df2 (int): Denominator degrees of freedom.
Returns:
float: F critical value at alpha = 0.05.
"""
if i <= 0 or j <= 0:
return 1.0
return float(f_dist.ppf(0.95, i, j))
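# Quick sanity check for the scipy-backed F limits (values match standard F tables):
#
#   f95(2, 10)   # ~= 4.103, same as float(f_dist.ppf(0.95, 2, 10))
#   f99(2, 10)   # ~= 7.559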
def spe_ci(spe):
"""Estimate SPE control limits from training data using a chi-squared approximation.
Args:
spe_values (ndarray): SPE values from the training set (n_obs × 1).
alpha (float): Confidence level. Default ``0.95`` (also returns 99%).
Returns:
tuple: ``(lim95, lim99)`` — SPE control limits at 95% and 99%.
"""
spem = np.mean(spe)
if spem > 1E-16:
spev = np.var(spe, ddof=1)
g = spev / (2 * spem)
h = (2 * spem**2) / spev
lim95 = g * chi2.ppf(0.95, h)
lim99 = g * chi2.ppf(0.99, h)
else:
lim95 = 0
lim99 = 0
return lim95, lim99
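# The SPE limit uses the classic moment-matching approximation (Box, 1954):
# SPE ~ g * chi2(h), with g and h chosen so the first two moments match the
# training SPE:
#   mean: g*h     = mean(SPE)   ->  h = 2*mean(SPE)**2 / var(SPE)
#   var:  2*g^2*h = var(SPE)    ->  g = var(SPE) / (2*mean(SPE))
# so lim = g * chi2.ppf(alpha, h), as computed above. Near-zero residuals
# (mean SPE <= 1e-16) return zero limits to avoid dividing by ~0.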
def single_score_conf_int(t):
"""Calculate confidence ellipse radii for score scatter plots.
Args:
mvmobj (dict): Fitted PCA or PLS model.
alpha (float): Confidence level. Default ``0.95``.
Returns:
ndarray: Ellipse radii for each pair of scores.
"""
n = t.shape[0]
st = np.var(t, ddof=1)
lim95 = t_dist.ppf(0.975, df=n) * np.sqrt(st)
lim99 = t_dist.ppf(0.995, df=n) * np.sqrt(st)
return lim95, lim99
def scores_conf_int_calc(st, N):
"""Calculate per-score univariate confidence intervals.
Args:
mvmobj (dict): Fitted PCA or PLS model.
alpha (float): Confidence level. Default ``0.95``.
Returns:
ndarray: Confidence interval half-widths for each latent variable (A,).
"""
n_points = 100
cte2 = ((N-1)*(N+1)*(2)) / (N*(N-2))
f95_ = cte2 * f95(2, N-2)
f99_ = cte2 * f99(2, N-2)
xd95 = np.sqrt(f95_ * st[0,0])
xd99 = np.sqrt(f99_ * st[0,0])
xd95 = np.linspace(-xd95, xd95, num=n_points)
xd99 = np.linspace(-xd99, xd99, num=n_points)
st = np.linalg.inv(st)
s11, s22, s12, s21 = st[0,0], st[1,1], st[0,1], st[1,0]
    # s11, s22, s12, s21 are scalars, so plain broadcasting replaces np.tile
    a = s22
    b_ = xd95 * (s12 + s21)
    c = (xd95**2) * s11 - f95_
    safe_chk = b_**2 - 4 * a * c
    safe_chk[safe_chk < 0] = 0
    yd95p = (-b_ + np.sqrt(safe_chk)) / (2 * a)
    yd95n = (-b_ - np.sqrt(safe_chk)) / (2 * a)
    b_ = xd99 * (s12 + s21)
    c = (xd99**2) * s11 - f99_
    safe_chk = b_**2 - 4 * a * c
    safe_chk[safe_chk < 0] = 0
    yd99p = (-b_ + np.sqrt(safe_chk)) / (2 * a)
    yd99n = (-b_ - np.sqrt(safe_chk)) / (2 * a)
return xd95, xd99, yd95p, yd95n, yd99p, yd99n
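# Geometry note: with S the 2x2 score covariance and s_ij the entries of S^-1,
# the confidence ellipse is the set of points where
#   s11*x^2 + (s12 + s21)*x*y + s22*y^2 = f_lim,
# with f_lim = (2*(N-1)*(N+1) / (N*(N-2))) * F(2, N-2) at 95% or 99%. For each x
# on the grid this is a quadratic in y, hence the two branches yd+/yd- returned
# above; safe_chk clips tiny negative discriminants to zero at the ellipse tips.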
# =============================================================================
# Core utility functions
# =============================================================================
def clean_htmls():
    '''Delete all .html files in the current working directory.'''
    for f in os.listdir('.'):
        if f.endswith('.html'):
            os.remove(f)
    return
def z2n(X, X_nan_map):
"""Replace zeros with NaN (zero to NaN).
Args:
X (np.ndarray): Input array.
Returns:
np.ndarray: Array with zeros replaced by ``np.nan``.
"""
X[X_nan_map == 1] = np.nan
return X
def n2z(X):
"""Replace NaN with zero (NaN to zero).
Args:
X (np.ndarray): Input array.
Returns:
tuple: ``(X_filled, nan_map)`` — array with NaNs replaced by 0,
and a boolean mask where ``True`` indicates original non-NaN positions.
"""
X_nan_map = np.isnan(X)
if X_nan_map.any():
X_nan_map = X_nan_map * 1
X[X_nan_map==1] = 0
else:
X_nan_map = X_nan_map * 1
return X, X_nan_map
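# n2z / z2n round-trip sketch (hypothetical array): n2z zeroes out NaNs and
# returns a 0/1 map of where they were; z2n restores them.
#
#   X = np.array([[1., np.nan], [3., 4.]])
#   Xf, nan_map = n2z(X)        # Xf = [[1., 0.], [3., 4.]], nan_map = [[0, 1], [0, 0]]
#   X_back = z2n(Xf, nan_map)   # NaN restored at position (0, 1)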
def mean(X):
    """Column means of X, ignoring missing (NaN) entries."""
X_nan_map = np.isnan(X)
X_ = X.copy()
if X_nan_map.any():
X_nan_map = X_nan_map * 1
X_[X_nan_map==1] = 0
aux = np.sum(X_nan_map, axis=0)
x_mean = np.sum(X_, axis=0, keepdims=1) / (np.ones((1, X_.shape[1])) * X_.shape[0] - aux)
else:
x_mean = np.mean(X_, axis=0, keepdims=1)
return x_mean
def std(X):
    """Column standard deviations of X (ddof=1), ignoring missing (NaN) entries."""
x_mean = mean(X)
X_nan_map = np.isnan(X)
if X_nan_map.any():
X_nan_map = X_nan_map * 1
X_ = X.copy()
X_[X_nan_map==1] = 0
aux_mat = (X_ - x_mean)**2
aux_mat[X_nan_map==1] = 0
aux = np.sum(X_nan_map, axis=0)
x_std = np.sqrt((np.sum(aux_mat, axis=0, keepdims=1)) / (np.ones((1, X_.shape[1])) * (X_.shape[0]-1-aux)))
else:
x_std = np.sqrt(np.sum((X - x_mean)**2, axis=0, keepdims=1) / (np.ones((1, X.shape[1])) * (X.shape[0]-1)))
return x_std
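# mean/std ignore NaNs, matching np.nanmean and np.nanstd(ddof=1) column-wise.
# Minimal sketch (hypothetical array):
#
#   X = np.array([[1., 2.], [3., np.nan], [5., 8.]])
#   mean(X)   # -> [[3., 5.]]          (column means over present entries)
#   std(X)    # -> [[2., 4.2426...]]   (ddof=1, over present entries)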
def meancenterscale(X, *, mcs=True):
"""Mean-center and/or scale a data matrix.
Args:
X (np.ndarray): Data matrix to preprocess (n_obs × n_vars).
mcs (str or bool): Preprocessing mode.
``'autoscale'``: mean-center and scale to unit variance.
``'center'``: mean-center only.
``False``: return unchanged.
Returns:
tuple: ``(X_processed, x_mean, x_std)`` — preprocessed matrix,
column means, and column standard deviations.
"""
if isinstance(mcs, bool):
if mcs:
x_mean = mean(X)
x_std = std(X)
X = X - x_mean
X = X / x_std
else:
x_mean = np.nan
x_std = np.nan
elif mcs == 'center':
x_mean = mean(X)
X = X - x_mean
x_std = np.ones((1, X.shape[1]))
elif mcs == 'autoscale':
x_std = std(X)
X = X / x_std
x_mean = np.zeros((1, X.shape[1]))
else:
x_mean = np.nan
x_std = np.nan
return X, x_mean, x_std
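# Mode sketch for meancenterscale (hypothetical array). Note the library
# convention: True = center AND scale, 'center' = center only, and
# 'autoscale' = scale only (no centering).
#
#   X = np.array([[1., 10.], [3., 30.]])
#   Xc, m, s = meancenterscale(X.copy(), mcs='center')     # m = [[2., 20.]], s = ones
#   Xa, m, s = meancenterscale(X.copy(), mcs='autoscale')  # divides by column std only
#   Xb, m, s = meancenterscale(X.copy(), mcs=True)         # centers, then scales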
def find(a, func):
"""Find row indices where the first column equals a given value.
Args:
X (pd.DataFrame or np.ndarray): Data matrix to search.
value: Value to search for in the first column.
Returns:
list: Row indices where the match was found.
"""
return [i for (i, val) in enumerate(a) if func(val)]
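# Example: find([3, -1, 4], lambda v: v > 0) -> [0, 2]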
# =============================================================================
# PCA
# =============================================================================
def pca(X, A, *, mcs=True, md_algorithm='nipals', force_nipals=True, shush=False, cross_val=0):
"""Fit a Principal Component Analysis (PCA) model.
Supports missing data via NIPALS. Can use SVD for complete
data as well.
Args:
X (pd.DataFrame or np.ndarray): Observations × variables matrix.
If a DataFrame, the first column must contain observation IDs.
A (int): Number of principal components to extract.
mcs (str or bool): Mean-centering/scaling flag.
``'autoscale'`` (default): mean-center and scale to unit variance.
``'center'``: mean-center only.
``False``: no preprocessing.
md_algorithm (str): Missing-data algorithm. ``'nipals'`` (default) or
``'nlp'``.
force_nipals (bool): If ``True``, forces NIPALS even when data is
complete. Default ``False``.
cross_val (int): Cross-validation percentage of elements to remove
per round. ``0`` = no CV, ``100`` = leave-one-out,
``1–99`` = element-wise removal. Default ``0``.
shush (bool): If ``True``, suppresses printed output. Default ``False``.
tolerance (float): NIPALS convergence tolerance. Default ``1e-10``.
maxit (int): Maximum NIPALS iterations per component. Default ``5000``.
Returns:
dict: Fitted PCA model with keys:
- ``T`` (ndarray): Scores matrix (n_obs × A).
- ``P`` (ndarray): Loadings matrix (n_vars × A).
- ``r2x`` (float): Cumulative R² for X.
- ``r2xpv`` (ndarray): Per-variable R² (n_vars × A).
- ``mx`` (ndarray): Variable means used for preprocessing.
- ``sx`` (ndarray): Variable std devs used for preprocessing.
- ``var_t`` (ndarray): Score covariance matrix (A × A).
- ``T2`` (ndarray): Hotelling's T² for training observations.
- ``T2_lim95`` (float): 95% T² control limit.
- ``T2_lim99`` (float): 99% T² control limit.
- ``speX`` (ndarray): X-space SPE for training observations.
- ``speX_lim95`` (float): 95% SPE control limit.
- ``speX_lim99`` (float): 99% SPE control limit.
- ``obsidX`` (list): Observation IDs (only if X was a DataFrame).
- ``varidX`` (list): Variable IDs (only if X was a DataFrame).
- ``q2x`` (float): Cross-validated Q² (only if ``cross_val > 0``).
NLP algorithn for missing data as in:
de la Fuente, R.L.N., García‐Muñoz, S. and Biegler, L.T., 2010.
An efficient nonlinear programming strategy for PCA models with
incomplete data sets. Journal of Chemometrics, 24(6), pp.301-311.
"""
X, _ = _validate_inputs(X, Y=None, A=A, mcs=mcs)
if cross_val == 0:
pcaobj = pca_(X, A, mcs=mcs, md_algorithm=md_algorithm, force_nipals=force_nipals, shush=shush)
pcaobj['type'] = 'pca'
elif 0 < cross_val < 100:
if isinstance(X, np.ndarray):
X_ = X.copy()
elif isinstance(X, pd.DataFrame):
X_ = np.array(X.values[:,1:]).astype(float)
if isinstance(mcs, bool):
if mcs:
X_, x_mean, x_std = meancenterscale(X_)
else:
x_mean = np.zeros((1, X_.shape[1]))
x_std = np.ones((1, X_.shape[1]))
elif mcs == 'center':
X_, x_mean, x_std = meancenterscale(X_, mcs='center')
elif mcs == 'autoscale':
X_, x_mean, x_std = meancenterscale(X_, mcs='autoscale')
X_nan_map = np.isnan(X_)
not_Xmiss = (~X_nan_map) * 1
X_, Xnanmap = n2z(X_)
TSS = np.sum(X_**2)
TSSpv = np.sum(X_**2, axis=0)
cols = X_.shape[1]
rows = X_.shape[0]
X_ = z2n(X_, Xnanmap)
for a in range(A):
if not shush:
print('Cross validating PC #' + str(a+1))
not_removed_map = not_Xmiss.copy().reshape(rows*cols, -1)
Xrnd = np.random.random(X_.shape) * not_Xmiss
indx = np.argsort(Xrnd.ravel())
elements_to_remove = int(np.ceil(rows * cols * (cross_val/100)))
error = np.zeros((rows*cols, 1))
rounds = 1
while np.sum(not_removed_map) > 0:
rounds += 1
X_copy = X_.copy()
if indx.size > elements_to_remove:
indx_this = indx[:elements_to_remove]
indx = indx[elements_to_remove:]
else:
indx_this = indx
X_copy = X_copy.reshape(rows*cols, 1)
elements_out = X_copy[indx_this]
X_copy[indx_this] = np.nan
X_copy = X_copy.reshape(rows, cols)
not_removed_map[indx_this] = 0
auxmap = np.sum(np.isnan(X_copy) * 1, axis=1)
indx2 = np.where(auxmap == X_copy.shape[1])[0].tolist()
if len(indx2) > 0:
X_copy = np.delete(X_copy, indx2, 0)
pcaobj_ = pca_(X_copy, 1, mcs=False, shush=True)
xhat = pcaobj_['T'] @ pcaobj_['P'].T
xhat = np.insert(xhat, indx2, np.nan, axis=0)
xhat = xhat.reshape(rows*cols, 1)
error[indx_this] = elements_out - xhat[indx_this]
error = error.reshape(rows, cols)
error, dummy = n2z(error)
PRESSpv = np.sum(error**2, axis=0)
PRESS = np.sum(error**2)
if a == 0:
q2 = 1 - PRESS/TSS
q2pv = (1 - PRESSpv/TSSpv).reshape(-1, 1)
else:
q2 = np.hstack((q2, 1 - PRESS/TSS))
q2pv = np.hstack((q2pv, (1 - PRESSpv/TSSpv).reshape(-1, 1)))
X_copy = X_.copy()
pcaobj_ = pca_(X_copy, 1, mcs=False, shush=True)
xhat = pcaobj_['T'] @ pcaobj_['P'].T
X_, Xnanmap = n2z(X_)
X_ = (X_ - xhat) * not_Xmiss
r2, r2pv = _calc_r2(X_, TSS, TSSpv,
None if a == 0 else r2,
None if a == 0 else r2pv)
X_ = z2n(X_, Xnanmap)
pcaobj = pca_(X, A, mcs=mcs, force_nipals=True, shush=True)
r2, r2pv = _r2_cumulative_to_per_component(r2, r2pv, A)
q2, q2pv = _r2_cumulative_to_per_component(q2, q2pv, A)
r2xc = np.cumsum(r2)
q2xc = np.cumsum(q2)
eigs = np.var(pcaobj['T'], axis=0)
pcaobj['q2'] = q2
pcaobj['q2pv'] = q2pv
if not shush:
print('phi.pca using NIPALS and cross validation (' + str(cross_val) + '%) executed on: ' + str(datetime.datetime.now()))
print('--------------------------------------------------------------')
print('PC # Eig R2X sum(R2X) Q2X sum(Q2X)')
if A > 1:
for a in range(A):
print("PC #"+str(a+1)+": {:8.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2[a], r2xc[a], q2[a], q2xc[a]))
else:
print("PC #1: {:8.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2, r2xc[0], q2, q2xc[0]))
print('--------------------------------------------------------------')
pcaobj['type'] = 'pca'
else:
pcaobj = 'Cannot cross validate with those options'
return pcaobj
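# Typical pca() call pattern (hypothetical data; not executed at import):
#
#   Xnp = np.random.randn(50, 6)
#   m = pca(Xnp, 3, mcs=True, shush=True)          # fit with centering + scaling
#   m['T'].shape, m['P'].shape                     # (50, 3), (6, 3)
#   mcv = pca(Xnp, 3, cross_val=20, shush=True)    # 20% element-wise cross-validation
#   mcv['q2']                                      # per-component Q2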
def pca_(X, A, *, mcs=True, md_algorithm='nipals', force_nipals=True, shush=False):
X_, obsidX, varidX = _extract_array(X)
if isinstance(mcs, bool):
if mcs:
X_, x_mean, x_std = meancenterscale(X_)
else:
x_mean = np.zeros((1, X_.shape[1]))
x_std = np.ones((1, X_.shape[1]))
elif mcs == 'center':
X_, x_mean, x_std = meancenterscale(X_, mcs='center')
elif mcs == 'autoscale':
X_, x_mean, x_std = meancenterscale(X_, mcs='autoscale')
X_nan_map = np.isnan(X_)
not_Xmiss = (~X_nan_map) * 1
if not X_nan_map.any() and not force_nipals and ((X_.shape[1]/X_.shape[0] >= 10) or (X_.shape[0]/X_.shape[1] >= 10)):
if not shush:
print('phi.pca using SVD executed on: ' + str(datetime.datetime.now()))
TSS = np.sum(X_**2)
TSSpv = np.sum(X_**2, axis=0)
if X_.shape[1]/X_.shape[0] >= 10:
[U, S, Th] = np.linalg.svd(X_ @ X_.T)
T = Th.T[:, :A]
P = X_.T @ T
for a in range(A):
P[:, a] = P[:, a] / np.linalg.norm(P[:, a])
T = X_ @ P
elif X_.shape[0]/X_.shape[1] >= 10:
[U, S, Ph] = np.linalg.svd(X_.T @ X_)
P = Ph.T[:, :A]
T = X_ @ P
r2 = None; r2pv = None
for a in range(A):
X_ = X_ - T[:,[a]] @ P[:,[a]].T
r2, r2pv = _calc_r2(X_, TSS, TSSpv, r2, r2pv)
r2, r2pv = _r2_cumulative_to_per_component(r2, r2pv, A)
var_t = (T.T @ T) / T.shape[0]
pca_obj = {'T':T, 'P':P, 'r2x':r2, 'r2xpv':r2pv, 'mx':x_mean, 'sx':x_std, 'var_t':var_t}
if not isinstance(obsidX, bool):
pca_obj['obsidX'] = obsidX
pca_obj['varidX'] = varidX
eigs = np.var(T, axis=0)
r2xc = np.cumsum(r2)
if not shush:
print('--------------------------------------------------------------')
print('PC # Eig R2X sum(R2X) ')
if A > 1:
for a in range(A):
print("PC #"+str(a+1)+": {:8.3f} {:.3f} {:.3f}".format(eigs[a], r2[a], r2xc[a]))
else:
print("PC #1: {:8.3f} {:.3f} {:.3f}".format(eigs[0], r2, r2xc[0]))
print('--------------------------------------------------------------')
T2 = hott2(pca_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = np.sum(X_**2, axis=1, keepdims=1)
speX_lim95, speX_lim99 = spe_ci(speX)
pca_obj['T2'] = T2
pca_obj['T2_lim99'] = T2_lim99
pca_obj['T2_lim95'] = T2_lim95
pca_obj['speX'] = speX
pca_obj['speX_lim99'] = speX_lim99
pca_obj['speX_lim95'] = speX_lim95
return pca_obj
else:
if md_algorithm == 'nipals':
if not shush:
print('phi.pca using NIPALS executed on: ' + str(datetime.datetime.now()))
X_, dummy = n2z(X_)
epsilon = 1E-10
maxit = 5000
TSS = np.sum(X_**2)
TSSpv = np.sum(X_**2, axis=0)
r2 = None; r2pv = None
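            # NIPALS with missing data: start t from the highest-variance column of
            # the deflated X, then alternate p = X't / (t't) and t = Xp / (p'p), with
            # every sum restricted to the present entries (not_Xmiss), until the norm
            # of t changes by less than epsilon. Signs are flipped so that most of
            # each score's variance lies on the positive side.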
for a in range(A):
ti = X_[:, [np.argmax(std(X_))]]
Converged = False
num_it = 0
while not Converged:
# p = X't / (t't) with missing data handling
pi = np.sum(X_ * ti, axis=0) / np.sum((ti * not_Xmiss)**2, axis=0)
pi = pi / np.linalg.norm(pi)
# t_new = Xp / (p'p)
tn = X_ @ pi
ptp = np.sum((pi * not_Xmiss)**2, axis=1)
tn = tn / ptp
pi = pi.reshape(-1, 1)
if abs((np.linalg.norm(ti) - np.linalg.norm(tn))) / np.linalg.norm(ti) < epsilon:
Converged = True
if num_it > maxit:
Converged = True
if Converged:
if (len(ti[ti<0]) > 0) and (len(ti[ti>0]) > 0):
if np.var(ti[ti<0]) > np.var(ti[ti>=0]):
tn = -tn; ti = -ti; pi = -pi
if not shush:
print('# Iterations for PC #'+str(a+1)+': ', str(num_it))
if a == 0:
T = tn.reshape(-1, 1)
P = pi
else:
T = np.hstack((T, tn.reshape(-1, 1)))
P = np.hstack((P, pi))
X_ = (X_ - ti @ pi.T) * not_Xmiss
r2, r2pv = _calc_r2(X_, TSS, TSSpv, r2, r2pv)
else:
num_it += 1
ti = tn.reshape(-1, 1)
if a == 0:
numIT = num_it
else:
numIT = np.hstack((numIT, num_it))
r2, r2pv = _r2_cumulative_to_per_component(r2, r2pv, A)
eigs = np.var(T, axis=0)
r2xc = np.cumsum(r2)
if not shush:
print('--------------------------------------------------------------')
print('PC # Eig R2X sum(R2X) ')
if A > 1:
for a in range(A):
print("PC #"+str(a+1)+": {:8.3f} {:.3f} {:.3f}".format(eigs[a], r2[a], r2xc[a]))
else:
print("PC #1: {:8.3f} {:.3f} {:.3f}".format(eigs[0], r2, r2xc[0]))
print('--------------------------------------------------------------')
var_t = (T.T @ T) / T.shape[0]
pca_obj = {'T':T, 'P':P, 'r2x':r2, 'r2xpv':r2pv, 'mx':x_mean, 'sx':x_std, 'var_t':var_t}
if not isinstance(obsidX, bool):
pca_obj['obsidX'] = obsidX
pca_obj['varidX'] = varidX
T2 = hott2(pca_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = np.sum(X_**2, axis=1, keepdims=1)
speX_lim95, speX_lim99 = spe_ci(speX)
pca_obj['T2'] = T2
pca_obj['T2_lim99'] = T2_lim99
pca_obj['T2_lim95'] = T2_lim95
pca_obj['speX'] = speX
pca_obj['speX_lim99'] = speX_lim99
pca_obj['speX_lim95'] = speX_lim95
return pca_obj
elif md_algorithm == 'nlp' and pyomo_ok:
if not shush:
print('phi.pca using NLP with Ipopt executed on: ' + str(datetime.datetime.now()))
pcaobj_ = pca_(X, A, mcs=mcs, md_algorithm='nipals', shush=True)
pcaobj_ = prep_pca_4_MDbyNLP(pcaobj_, X_)
TSS = np.sum(X_**2)
TSSpv = np.sum(X_**2, axis=0)
model = ConcreteModel()
model.A = Set(initialize=pcaobj_['pyo_A'])
model.N = Set(initialize=pcaobj_['pyo_N'])
model.O = Set(initialize=pcaobj_['pyo_O'])
model.P = Var(model.N, model.A, within=Reals, initialize=pcaobj_['pyo_P_init'])
model.T = Var(model.O, model.A, within=Reals, initialize=pcaobj_['pyo_T_init'])
model.psi = Param(model.O, model.N, initialize=pcaobj_['pyo_psi'])
model.X = Param(model.O, model.N, initialize=pcaobj_['pyo_X'])
model.delta = Param(model.A, model.A, initialize=lambda model, a1, a2: 1.0 if a1==a2 else 0)
def _c20b_con(model, a1, a2):
return sum(model.P[j, a1] * model.P[j, a2] for j in model.N) == model.delta[a1, a2]
model.c20b = Constraint(model.A, model.A, rule=_c20b_con)
def _20c_con(model, a1, a2):
if a2 < a1:
return sum(model.T[o, a1] * model.T[o, a2] for o in model.O) == 0
else:
return Constraint.Skip
model.c20c = Constraint(model.A, model.A, rule=_20c_con)
def mean_zero(model, i):
return sum(model.T[o,i] for o in model.O) == 0
model.eq3 = Constraint(model.A, rule=mean_zero)
def _eq_20a_obj(model):
return sum(sum((model.X[o,n] - model.psi[o,n] * sum(model.T[o,a] * model.P[n,a] for a in model.A))**2 for n in model.N) for o in model.O)
model.obj = Objective(rule=_eq_20a_obj)
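            # The NLP (eq. 20a) minimizes the squared reconstruction error over the
            # observed cells only (psi[o,n] masks the model term where X[o,n] is
            # missing), subject to orthonormal loadings (20b), mutually orthogonal
            # scores (20c), and zero-mean scores, so missing entries never
            # contribute to the objective.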
if ipopt_ok:
print("Solving NLP using local IPOPT executable")
solver = SolverFactory('ipopt')
if ma57_ok:
solver.options['linear_solver'] = 'ma57'
results = solver.solve(model, tee=True)
elif gams_ok:
print("Solving NLP using GAMS/IPOPT interface")
solver = SolverFactory('gams:ipopt')
results = solver.solve(model, tee=True)
else:
print("Solving NLP using IPOPT on remote NEOS server")
solver_manager = SolverManagerFactory('neos')
results = solver_manager.solve(model, opt='ipopt', tee=True)
T = np.array([[value(model.T[o,a]) for a in model.A] for o in model.O])
P = np.array([[value(model.P[n,a]) for a in model.A] for n in model.N])
r2 = None; r2pv = None
for a in range(A):
ti = T[:,[a]]; pi = P[:,[a]]
if np.var(ti[ti<0]) > np.var(ti[ti>=0]):
T[:,[a]] = -T[:,[a]]; P[:,[a]] = -P[:,[a]]
ti = -ti; pi = -pi
X_ = (X_ - ti @ pi.T) * not_Xmiss
r2, r2pv = _calc_r2(X_, TSS, TSSpv, r2, r2pv)
r2, r2pv = _r2_cumulative_to_per_component(r2, r2pv, A)
eigs = np.var(T, axis=0)
r2xc = np.cumsum(r2)
if not shush:
print('--------------------------------------------------------------')
print('PC # Eig R2X sum(R2X) ')
if A > 1:
for a in range(A):
print("PC #"+str(a+1)+": {:8.3f} {:.3f} {:.3f}".format(eigs[a], r2[a], r2xc[a]))
else:
print("PC #1: {:8.3f} {:.3f} {:.3f}".format(eigs[0], r2, r2xc[0]))
print('--------------------------------------------------------------')
var_t = (T.T @ T) / T.shape[0]
pca_obj = {'T':T, 'P':P, 'r2x':r2, 'r2xpv':r2pv, 'mx':x_mean, 'sx':x_std, 'var_t':var_t}
if not isinstance(obsidX, bool):
pca_obj['obsidX'] = obsidX
pca_obj['varidX'] = varidX
T2 = hott2(pca_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = np.sum(X_**2, axis=1, keepdims=1)
speX_lim95, speX_lim99 = spe_ci(speX)
pca_obj['T2'] = T2
pca_obj['T2_lim99'] = T2_lim99
pca_obj['T2_lim95'] = T2_lim95
pca_obj['speX'] = speX
pca_obj['speX_lim99'] = speX_lim99
pca_obj['speX_lim95'] = speX_lim95
return pca_obj
elif md_algorithm == 'nlp' and not pyomo_ok:
            print('Pyomo was not found in your system; the NLP missing-data option is unavailable.')
            print('Visit http://www.pyomo.org/')
return 1
# =============================================================================
# PLS
# =============================================================================
def pls_cca(pls_obj, Xmcs, Ymcs, not_Xmiss):
    """Compute CCA-based covariant scores (Tcv), loadings (Pcv), weights (Wcv) and
    regression coefficients (Betacv) between the PLS scores and each column of Y,
    one column at a time (equivalent to the predictive space of an OPLS model)."""
Tcv=[]; Pcv=[]; Wcv=[]; Betacv=[]
firstone = True
for i in np.arange(Ymcs.shape[1]):
y_ = Ymcs[:, i].reshape(-1, 1)
corr, wt, wy = cca(pls_obj['T'], y_)
t_cv = (pls_obj['T'] @ wt).reshape(-1, 1)
beta = np.linalg.lstsq(t_cv, y_, rcond=None)[0]
w_cv = (pls_obj['Ws'] @ wt).reshape(-1, 1)
p_cv = np.sum(Xmcs * t_cv, axis=0) / np.sum((t_cv * not_Xmiss)**2, axis=0)
p_cv = p_cv.reshape(-1, 1)
if firstone:
Tcv = t_cv; Pcv = p_cv; Wcv = w_cv; Betacv = beta[0][0]
firstone = False
else:
Tcv = np.hstack((Tcv, t_cv))
Pcv = np.hstack((Pcv, p_cv))
Wcv = np.hstack((Wcv, w_cv))
Betacv = np.vstack((Betacv, beta[0][0]))
return Tcv, Pcv, Wcv, Betacv
def pls(X, Y, A, *, mcsX=True, mcsY=True, md_algorithm='nipals', force_nipals=True, shush=False,
cross_val=0, cross_val_X=False, cca=False):
"""Fit a Partial Least Squares (PLS) regression model.
Supports missing data in both X and Y via NIPALS. Optionally computes
CCA-based covariant components (equivalent to OPLS predictive space).
Args:
X (pd.DataFrame or np.ndarray): Predictor matrix (n_obs × n_x).
If a DataFrame, the first column must contain observation IDs.
Y (pd.DataFrame or np.ndarray): Response matrix (n_obs × n_y).
If a DataFrame, the first column must contain observation IDs.
Observation IDs are reconciled with X automatically.
A (int): Number of latent variables.
mcsX, mcsY: Preprocessing flags
Each can be ``'autoscale'``, ``'center'``, or ``False``.
Default ``'autoscale'``.
md_algorithm (str): Missing-data algorithm. ``'nipals'`` or ``'nlp'``
``'nipals'`` is (default).
force_nipals (bool): Force NIPALS even for complete data. Default ``False``.
cross_val (int): Cross-validation level. ``0`` = none,
``100`` = LOO, ``1–99`` = element-wise. Default ``0``.
cross_val_X (bool): Also cross-validate X-space. Default ``False``.
shush (bool): Suppress printed output. Default ``False``.
tolerance (float): NIPALS convergence tolerance. Default ``1e-10``.
maxit (int): Max NIPALS iterations per component. Default ``5000``.
cca (bool): If ``True``, compute CCA-based covariant components and
add ``Tcv``, ``Pcv``, ``Wcv`` to the model. Default ``False``.
Returns:
dict: Fitted PLS model with keys:
- ``T`` (ndarray): X-scores (n_obs × A).
- ``P`` (ndarray): X-loadings (n_x × A).
- ``Q`` (ndarray): Y-loadings (n_y × A).
- ``W`` (ndarray): X-weights (n_x × A).
- ``Ws`` (ndarray): Rotated weights W*(P'W)⁻¹ (n_x × A).
- ``r2x`` (float): Cumulative R² for X.
- ``r2xpv`` (ndarray): Per-variable R² for X (n_x × A).
- ``r2y`` (float): Cumulative R² for Y.
- ``r2ypv`` (ndarray): Per-variable R² for Y (n_y × A).
- ``mx``, ``sx`` (ndarray): X preprocessing parameters.
- ``my``, ``sy`` (ndarray): Y preprocessing parameters.
- ``var_t`` (ndarray): Score covariance matrix (A × A).
- ``T2``, ``T2_lim95``, ``T2_lim99``: Hotelling's T² and limits.
- ``speX``, ``speX_lim95``, ``speX_lim99``: X-space SPE and limits.
- ``speY``, ``speY_lim95``, ``speY_lim99``: Y-space SPE and limits.
- ``obsidX``, ``varidX``: IDs (only if X was a DataFrame).
- ``obsidY``, ``varidY``: IDs (only if Y was a DataFrame).
- ``q2x``, ``q2y`` (float): Cross-validated Q² (if ``cross_val > 0``).
- ``Tcv``, ``Pcv``, ``Wcv``: CCA covariant components (if ``cca=True``).
NLP approach to missing data as in:
Puwakkatiya‐Kankanamage, E.H., García‐Muñoz, S. and Biegler, L.T., 2014.
An optimization‐based undeflated PLS (OUPLS) method to handle missing data in
the training set. Journal of Chemometrics, 28(7), pp.575-584.
"""
X, Y = _validate_inputs(X, Y, A=A, mcs=(mcsX, mcsY))
if cross_val == 0:
plsobj = pls_(X, Y, A, mcsX=mcsX, mcsY=mcsY, md_algorithm=md_algorithm, force_nipals=force_nipals, shush=shush, cca=cca)
plsobj['type'] = 'pls'
elif 0 < cross_val < 100:
if isinstance(X, np.ndarray):
X_ = X.copy()
elif isinstance(X, pd.DataFrame):
X_ = np.array(X.values[:,1:]).astype(float)
if isinstance(mcsX, bool):
if mcsX:
X_, x_mean, x_std = meancenterscale(X_)
else:
x_mean = np.zeros((1, X_.shape[1]))
x_std = np.ones((1, X_.shape[1]))
elif mcsX == 'center':
X_, x_mean, x_std = meancenterscale(X_, mcs='center')
elif mcsX == 'autoscale':
X_, x_mean, x_std = meancenterscale(X_, mcs='autoscale')
X_nan_map = np.isnan(X_)
not_Xmiss = (~X_nan_map) * 1
if isinstance(Y, np.ndarray):
Y_ = Y.copy()
elif isinstance(Y, pd.DataFrame):
Y_ = np.array(Y.values[:,1:]).astype(float)
if isinstance(mcsY, bool):
if mcsY:
Y_, y_mean, y_std = meancenterscale(Y_)
else:
y_mean = np.zeros((1, Y_.shape[1]))
y_std = np.ones((1, Y_.shape[1]))
elif mcsY == 'center':
Y_, y_mean, y_std = meancenterscale(Y_, mcs='center')
elif mcsY == 'autoscale':
Y_, y_mean, y_std = meancenterscale(Y_, mcs='autoscale')
Y_nan_map = np.isnan(Y_)
not_Ymiss = (~Y_nan_map) * 1
X_, Xnanmap = n2z(X_)
TSSX = np.sum(X_**2)
TSSXpv = np.sum(X_**2, axis=0)
colsX = X_.shape[1]; rowsX = X_.shape[0]
X_ = z2n(X_, Xnanmap)
Y_, Ynanmap = n2z(Y_)
TSSY = np.sum(Y_**2)
TSSYpv = np.sum(Y_**2, axis=0)
colsY = Y_.shape[1]; rowsY = Y_.shape[0]
Y_ = z2n(Y_, Ynanmap)
r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
q2Y = None; q2Ypv = None; q2X = None; q2Xpv = None
for a in range(A):
if not shush:
print('Cross validating LV #' + str(a+1))
not_removed_mapY = not_Ymiss.copy().reshape(rowsY*colsY, -1)
Yrnd = np.random.random(Y_.shape) * not_Ymiss
indxY = np.argsort(Yrnd.ravel())
elements_to_remove_Y = int(np.ceil(rowsY * colsY * (cross_val/100)))
errorY = np.zeros((rowsY*colsY, 1))
if cross_val_X:
not_removed_mapX = not_Xmiss.copy().reshape(rowsX*colsX, -1)
Xrnd = np.random.random(X_.shape) * not_Xmiss
indxX = np.argsort(Xrnd.ravel())
elements_to_remove_X = int(np.ceil(rowsX * colsX * (cross_val/100)))
errorX = np.zeros((rowsX*colsX, 1))
else:
not_removed_mapX = 0
number_of_rounds = 1
while np.sum(not_removed_mapX) > 0 or np.sum(not_removed_mapY) > 0:
number_of_rounds += 1
X_copy = X_.copy()
if cross_val_X:
if indxX.size > elements_to_remove_X:
indx_this_roundX = indxX[:elements_to_remove_X]
indxX = indxX[elements_to_remove_X:]
else:
indx_this_roundX = indxX
X_copy = X_copy.reshape(rowsX*colsX, 1)
elements_outX = X_copy[indx_this_roundX]
X_copy[indx_this_roundX] = np.nan
X_copy = X_copy.reshape(rowsX, colsX)
not_removed_mapX[indx_this_roundX] = 0
auxmap = np.sum(np.isnan(X_copy)*1, axis=1)
indx2 = np.where(auxmap == X_copy.shape[1])[0].tolist()
else:
indx2 = []
Y_copy = Y_.copy()
if indxY.size > elements_to_remove_Y:
indx_this_roundY = indxY[:elements_to_remove_Y]
indxY = indxY[elements_to_remove_Y:]
else:
indx_this_roundY = indxY
Y_copy = Y_copy.reshape(rowsY*colsY, 1)
elements_outY = Y_copy[indx_this_roundY]
Y_copy[indx_this_roundY] = np.nan
Y_copy = Y_copy.reshape(rowsY, colsY)
not_removed_mapY[indx_this_roundY] = 0
auxmap = np.sum(np.isnan(Y_copy)*1, axis=1)
indx3 = np.where(auxmap == Y_copy.shape[1])[0].tolist()
indx4 = np.unique(indx3 + indx2).tolist()
if len(indx4) > 0:
X_copy = np.delete(X_copy, indx4, 0)
Y_copy = np.delete(Y_copy, indx4, 0)
plsobj_ = pls_(X_copy, Y_copy, 1, mcsX=False, mcsY=False, shush=True)
plspred = pls_pred(X_, plsobj_)
if cross_val_X:
xhat = plspred['Tnew'] @ plsobj_['P'].T
xhat = xhat.reshape(rowsX*colsX, 1)
errorX[indx_this_roundX] = elements_outX - xhat[indx_this_roundX]
yhat = plspred['Tnew'] @ plsobj_['Q'].T
yhat = yhat.reshape(rowsY*colsY, 1)
errorY[indx_this_roundY] = elements_outY - yhat[indx_this_roundY]
if cross_val_X:
errorX = errorX.reshape(rowsX, colsX)
errorX, dummy = n2z(errorX)
PRESSXpv = np.sum(errorX**2, axis=0)
PRESSX = np.sum(errorX**2)
errorY = errorY.reshape(rowsY, colsY)
errorY, dummy = n2z(errorY)
PRESSYpv = np.sum(errorY**2, axis=0)
PRESSY = np.sum(errorY**2)
if a == 0:
q2Y = 1 - PRESSY/TSSY
q2Ypv = (1 - PRESSYpv/TSSYpv).reshape(-1, 1)
if cross_val_X:
q2X = 1 - PRESSX/TSSX
q2Xpv = (1 - PRESSXpv/TSSXpv).reshape(-1, 1)
else:
q2Y = np.hstack((q2Y, 1 - PRESSY/TSSY))
q2Ypv = np.hstack((q2Ypv, (1 - PRESSYpv/TSSYpv).reshape(-1, 1)))
if cross_val_X:
q2X = np.hstack((q2X, 1 - PRESSX/TSSX))
q2Xpv = np.hstack((q2Xpv, (1 - PRESSXpv/TSSXpv).reshape(-1, 1)))
X_copy = X_.copy(); Y_copy = Y_.copy()
plsobj_ = pls_(X_copy, Y_copy, 1, mcsX=False, mcsY=False, shush=True)
xhat = plsobj_['T'] @ plsobj_['P'].T
yhat = plsobj_['T'] @ plsobj_['Q'].T
X_, Xnanmap = n2z(X_)
Y_, Ynanmap = n2z(Y_)
X_ = (X_ - xhat) * not_Xmiss
Y_ = (Y_ - yhat) * not_Ymiss
r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv,
None if a == 0 else r2X,
None if a == 0 else r2Xpv)
r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv,
None if a == 0 else r2Y,
None if a == 0 else r2Ypv)
X_ = z2n(X_, Xnanmap)
Y_ = z2n(Y_, Ynanmap)
plsobj = pls_(X, Y, A, mcsX=mcsX, mcsY=mcsY, shush=True, cca=cca)
r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
if cross_val_X:
q2X, q2Xpv = _r2_cumulative_to_per_component(q2X, q2Xpv, A)
else:
q2X = False; q2Xpv = False
q2Y, q2Ypv = _r2_cumulative_to_per_component(q2Y, q2Ypv, A)
r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y)
q2xc = np.cumsum(q2X) if cross_val_X else False
q2yc = np.cumsum(q2Y)
eigs = np.var(plsobj['T'], axis=0)
plsobj['q2Y'] = q2Y
plsobj['q2Ypv'] = q2Ypv
if cross_val_X:
plsobj['q2X'] = q2X
plsobj['q2Xpv'] = q2Xpv
if not shush:
print('phi.pls using NIPALS and cross-validation (' + str(cross_val) + '%) executed on: ' + str(datetime.datetime.now()))
if not cross_val_X:
print('---------------------------------------------------------------------------------')
print('PC # Eig R2X sum(R2X) R2Y sum(R2Y) Q2Y sum(Q2Y)')
if A > 1:
for a in range(A):
print("PC #"+str(a+1)+":{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2Y[a], r2yc[a], q2Y[a], q2yc[a]))
else:
print("PC #1:{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2Y, r2yc[0], q2Y, q2yc[0]))
print('---------------------------------------------------------------------------------')
else:
print('-------------------------------------------------------------------------------------------------------')
print('PC # Eig R2X sum(R2X) Q2X sum(Q2X) R2Y sum(R2Y) Q2Y sum(Q2Y)')
if A > 1:
for a in range(A):
print("PC #"+str(a+1)+":{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], q2X[a], q2xc[a], r2Y[a], r2yc[a], q2Y[a], q2yc[a]))
else:
print("PC #1:{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], q2X, q2xc[0], r2Y, r2yc[0], q2Y, q2yc[0]))
print('-------------------------------------------------------------------------------------------------------')
plsobj['type'] = 'pls'
elif cross_val == 100:
# LOO cross-validation
if isinstance(X, np.ndarray):
X_ = X.copy()
elif isinstance(X, pd.DataFrame):
X_ = np.array(X.values[:,1:]).astype(float)
if isinstance(mcsX, bool):
if mcsX:
X_, x_mean, x_std = meancenterscale(X_)
else:
x_mean = np.zeros((1, X_.shape[1]))
x_std = np.ones((1, X_.shape[1]))
elif mcsX == 'center':
X_, x_mean, x_std = meancenterscale(X_, mcs='center')
elif mcsX == 'autoscale':
X_, x_mean, x_std = meancenterscale(X_, mcs='autoscale')
X_nan_map = np.isnan(X_)
not_Xmiss = (~X_nan_map) * 1
if isinstance(Y, np.ndarray):
Y_ = Y.copy()
elif isinstance(Y, pd.DataFrame):
Y_ = np.array(Y.values[:,1:]).astype(float)
if isinstance(mcsY, bool):
if mcsY:
Y_, y_mean, y_std = meancenterscale(Y_)
else:
y_mean = np.zeros((1, Y_.shape[1]))
y_std = np.ones((1, Y_.shape[1]))
elif mcsY == 'center':
Y_, y_mean, y_std = meancenterscale(Y_, mcs='center')
elif mcsY == 'autoscale':
Y_, y_mean, y_std = meancenterscale(Y_, mcs='autoscale')
Y_nan_map = np.isnan(Y_)
not_Ymiss = (~Y_nan_map) * 1
X_, Xnanmap = n2z(X_)
TSSX = np.sum(X_**2)
TSSXpv = np.sum(X_**2, axis=0)
colsX = X_.shape[1]; rowsX = X_.shape[0]
X_ = z2n(X_, Xnanmap)
Y_, Ynanmap = n2z(Y_)
TSSY = np.sum(Y_**2)
TSSYpv = np.sum(Y_**2, axis=0)
colsY = Y_.shape[1]; rowsY = Y_.shape[0]
Y_ = z2n(Y_, Ynanmap)
r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
for a in range(A):
errorY = np.zeros((rowsY*colsY, 1))
if cross_val_X:
errorX = np.zeros((rowsX*colsX, 1))
for o in range(X_.shape[0]):
X_copy = X_.copy(); Y_copy = Y_.copy()
elements_outX = X_copy[o, :].copy()
elements_outY = Y_copy[o, :].copy()
X_copy = np.delete(X_copy, o, 0)
Y_copy = np.delete(Y_copy, o, 0)
plsobj_ = pls_(X_copy, Y_copy, 1, mcsX=False, mcsY=False, shush=True)
plspred = pls_pred(elements_outX, plsobj_)
if o == 0:
if cross_val_X:
errorX_mat = elements_outX - plspred['Xhat']
errorY_mat = elements_outY - plspred['Yhat']
else:
if cross_val_X:
errorX_mat = np.vstack((errorX_mat, elements_outX - plspred['Xhat']))
errorY_mat = np.vstack((errorY_mat, elements_outY - plspred['Yhat']))
if cross_val_X:
errorX_mat, dummy = n2z(errorX_mat)
PRESSXpv = np.sum(errorX_mat**2, axis=0)
PRESSX = np.sum(errorX_mat**2)
errorY_mat, dummy = n2z(errorY_mat)
PRESSYpv = np.sum(errorY_mat**2, axis=0)
PRESSY = np.sum(errorY_mat**2)
if a == 0:
q2Y = 1 - PRESSY/TSSY
q2Ypv = (1 - PRESSYpv/TSSYpv).reshape(-1, 1)
if cross_val_X:
q2X = 1 - PRESSX/TSSX
q2Xpv = (1 - PRESSXpv/TSSXpv).reshape(-1, 1)
else:
q2Y = np.hstack((q2Y, 1 - PRESSY/TSSY))
q2Ypv = np.hstack((q2Ypv, (1 - PRESSYpv/TSSYpv).reshape(-1, 1)))
if cross_val_X:
q2X = np.hstack((q2X, 1 - PRESSX/TSSX))
q2Xpv = np.hstack((q2Xpv, (1 - PRESSXpv/TSSXpv).reshape(-1, 1)))
X_copy = X_.copy(); Y_copy = Y_.copy()
plsobj_ = pls_(X_copy, Y_copy, 1, mcsX=False, mcsY=False, shush=True)
xhat = plsobj_['T'] @ plsobj_['P'].T
yhat = plsobj_['T'] @ plsobj_['Q'].T
X_, Xnanmap = n2z(X_)
Y_, Ynanmap = n2z(Y_)
X_ = (X_ - xhat) * not_Xmiss
Y_ = (Y_ - yhat) * not_Ymiss
r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv,
None if a == 0 else r2X,
None if a == 0 else r2Xpv)
r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv,
None if a == 0 else r2Y,
None if a == 0 else r2Ypv)
X_ = z2n(X_, Xnanmap)
Y_ = z2n(Y_, Ynanmap)
plsobj = pls_(X, Y, A, mcsX=mcsX, mcsY=mcsY, shush=True, cca=cca)
r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
if cross_val_X:
q2X, q2Xpv = _r2_cumulative_to_per_component(q2X, q2Xpv, A)
else:
q2X = False; q2Xpv = False
q2Y, q2Ypv = _r2_cumulative_to_per_component(q2Y, q2Ypv, A)
r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y)
q2xc = np.cumsum(q2X) if cross_val_X else False
q2yc = np.cumsum(q2Y)
eigs = np.var(plsobj['T'], axis=0)
plsobj['q2Y'] = q2Y
plsobj['q2Ypv'] = q2Ypv
plsobj['type'] = 'pls'
if cross_val_X:
plsobj['q2X'] = q2X
plsobj['q2Xpv'] = q2Xpv
if not shush:
if not cross_val_X:
print('---------------------------------------------------------------------------------')
print('PC # Eig R2X sum(R2X) R2Y sum(R2Y) Q2Y sum(Q2Y)')
if A > 1:
for a in range(A):
print("PC #"+str(a+1)+":{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2Y[a], r2yc[a], q2Y[a], q2yc[a]))
else:
print("PC #1:{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2Y, r2yc[0], q2Y, q2yc[0]))
print('---------------------------------------------------------------------------------')
else:
print('-------------------------------------------------------------------------------------------------------')
print('PC # Eig R2X sum(R2X) Q2X sum(Q2X) R2Y sum(R2Y) Q2Y sum(Q2Y)')
if A > 1:
for a in range(A):
print("PC #"+str(a+1)+":{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], q2X[a], q2xc[a], r2Y[a], r2yc[a], q2Y[a], q2yc[a]))
else:
print("PC #1:{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], q2X, q2xc[0], r2Y, r2yc[0], q2Y, q2yc[0]))
print('-------------------------------------------------------------------------------------------------------')
else:
plsobj = 'Cannot cross validate with those options'
return plsobj
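# Typical pls() call pattern (hypothetical data; not executed at import):
#
#   X = np.random.randn(40, 8)
#   Y = X @ np.random.randn(8, 2) + 0.1 * np.random.randn(40, 2)
#   m = pls(X, Y, 2, shush=True)                     # NIPALS fit, autoscaled X and Y
#   m['T'], m['P'], m['Q'], m['W'], m['Ws']          # main factors in the model dict
#   mo = pls(X, Y, 2, cca=True, shush=True)          # adds Tcv/Pcv/Wcv/Betacv (OPLS-like)
#   mloo = pls(X, Y, 2, cross_val=100, shush=True)   # leave-one-out; adds q2Y/q2Ypv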
def pls_(X, Y, A, *, mcsX=True, mcsY=True, md_algorithm='nipals',
force_nipals=True, shush=False, cca=False):
X_, obsidX, varidX = _extract_array(X)
Y_, obsidY, varidY = _extract_array(Y)
if isinstance(mcsX, bool):
if mcsX:
X_, x_mean, x_std = meancenterscale(X_)
else:
x_mean = np.zeros((1, X_.shape[1]))
x_std = np.ones((1, X_.shape[1]))
elif mcsX == 'center':
X_, x_mean, x_std = meancenterscale(X_, mcs='center')
elif mcsX == 'autoscale':
X_, x_mean, x_std = meancenterscale(X_, mcs='autoscale')
if isinstance(mcsY, bool):
if mcsY:
Y_, y_mean, y_std = meancenterscale(Y_)
else:
y_mean = np.zeros((1, Y_.shape[1]))
y_std = np.ones((1, Y_.shape[1]))
elif mcsY == 'center':
Y_, y_mean, y_std = meancenterscale(Y_, mcs='center')
elif mcsY == 'autoscale':
Y_, y_mean, y_std = meancenterscale(Y_, mcs='autoscale')
X_nan_map = np.isnan(X_)
not_Xmiss = (~X_nan_map) * 1
Y_nan_map = np.isnan(Y_)
not_Ymiss = (~Y_nan_map) * 1
if (not X_nan_map.any() and not Y_nan_map.any()) and not force_nipals:
if cca:
Xmcs = X_.copy(); Ymcs = Y_.copy()
if not shush:
print('phi.pls using SVD executed on: ' + str(datetime.datetime.now()))
TSSX = np.sum(X_**2); TSSXpv = np.sum(X_**2, axis=0)
TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
for a in range(A):
[U_, S, Wh] = np.linalg.svd((X_.T @ Y_) @ (Y_.T @ X_))
w = Wh.T[:, [0]]
t = X_ @ w
q = Y_.T @ t / (t.T @ t)
u = Y_ @ q / (q.T @ q)
p = X_.T @ t / (t.T @ t)
X_ = X_ - t @ p.T
Y_ = Y_ - t @ q.T
if a == 0:
W = w; T = t; Q = q; U = u; P = p
else:
W = np.hstack((W, w)); T = np.hstack((T, t))
Q = np.hstack((Q, q)); U = np.hstack((U, u)); P = np.hstack((P, p))
r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv, r2X, r2Xpv)
r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
Ws = W @ np.linalg.pinv(P.T @ W)
Ws[:, 0] = W[:, 0]
eigs = np.var(T, axis=0)
r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y)
if not shush:
print('--------------------------------------------------------------')
print('LV # Eig R2X sum(R2X) R2Y sum(R2Y)')
if A > 1:
for a in range(A):
print("LV #"+str(a+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2Y[a], r2yc[a]))
else:
print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2Y, r2yc[0]))
print('--------------------------------------------------------------')
var_t = (T.T @ T) / T.shape[0]
pls_obj = {'T':T, 'P':P, 'Q':Q, 'W':W, 'Ws':Ws, 'U':U,
'r2x':r2X, 'r2xpv':r2Xpv, 'mx':x_mean, 'sx':x_std,
'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std, 'var_t':var_t}
if not isinstance(obsidX, bool):
pls_obj['obsidX'] = obsidX; pls_obj['varidX'] = varidX
if not isinstance(obsidY, bool):
pls_obj['obsidY'] = obsidY; pls_obj['varidY'] = varidY
T2 = hott2(pls_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = np.sum(X_**2, axis=1, keepdims=1)
speX_lim95, speX_lim99 = spe_ci(speX)
speY = np.sum(Y_**2, axis=1, keepdims=1)
speY_lim95, speY_lim99 = spe_ci(speY)
pls_obj['T2'] = T2; pls_obj['T2_lim99'] = T2_lim99; pls_obj['T2_lim95'] = T2_lim95
pls_obj['speX'] = speX; pls_obj['speX_lim99'] = speX_lim99; pls_obj['speX_lim95'] = speX_lim95
pls_obj['speY'] = speY; pls_obj['speY_lim99'] = speY_lim99; pls_obj['speY_lim95'] = speY_lim95
if cca:
Tcv, Pcv, Wcv, Betacv = pls_cca(pls_obj, Xmcs, Ymcs, not_Xmiss)
pls_obj['Tcv'] = Tcv; pls_obj['Pcv'] = Pcv; pls_obj['Wcv'] = Wcv; pls_obj['Betacv'] = Betacv
return pls_obj
else:
if md_algorithm == 'nipals':
if not shush:
print('phi.pls using NIPALS executed on: ' + str(datetime.datetime.now()))
X_, dummy = n2z(X_)
Y_, dummy = n2z(Y_)
epsilon = 1E-9; maxit = 2000
if cca:
Xmcs = X_.copy(); Ymcs = Y_.copy()
TSSX = np.sum(X_**2); TSSXpv = np.sum(X_**2, axis=0)
TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
for a in range(A):
ui = Y_[:, [np.argmax(std(Y_))]]
Converged = False; num_it = 0
while not Converged:
# w = X'u / (u'u)
wi = np.sum(X_ * ui, axis=0) / np.sum((ui * not_Xmiss)**2, axis=0)
wi = wi / np.linalg.norm(wi)
# t = Xw / (w'w)
ti = X_ @ wi
wtw = np.sum((wi * not_Xmiss)**2, axis=1)
ti = ti / wtw
ti = ti.reshape(-1, 1); wi = wi.reshape(-1, 1)
# q = Y't / (t't)
qi = np.sum(Y_ * ti, axis=0) / np.sum((ti * not_Ymiss)**2, axis=0)
# u = Yq / (q'q)
qi_1d = qi.copy()
qi = qi.reshape(-1, 1)
un = Y_ @ qi
qtq = np.sum((qi_1d * not_Ymiss)**2, axis=1).reshape(-1, 1)
un = un / qtq
if abs((np.linalg.norm(ui) - np.linalg.norm(un))) / np.linalg.norm(ui) < epsilon:
Converged = True
if num_it > maxit:
Converged = True
if Converged:
if np.var(ti[ti<0]) > np.var(ti[ti>=0]):
ti = -ti; wi = -wi; un = -un; qi = -qi
if not shush:
print('# Iterations for LV #'+str(a+1)+': ', str(num_it))
# p = X't / (t't)
pi = np.sum(X_ * ti, axis=0) / np.sum((ti * not_Xmiss)**2, axis=0)
pi = pi.reshape(-1, 1)
X_ = (X_ - ti @ pi.T) * not_Xmiss
Y_ = (Y_ - ti @ qi.T) * not_Ymiss
if a == 0:
T = ti; P = pi; W = wi; U = un; Q = qi
else:
T = np.hstack((T, ti)); U = np.hstack((U, un))
P = np.hstack((P, pi)); W = np.hstack((W, wi)); Q = np.hstack((Q, qi))
r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv, r2X, r2Xpv)
r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
else:
num_it += 1
ui = un
if a == 0:
numIT = num_it
else:
numIT = np.hstack((numIT, num_it))
r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
Ws = W @ np.linalg.pinv(P.T @ W)
Ws[:, 0] = W[:, 0]
eigs = np.var(T, axis=0)
r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y)
if not shush:
print('--------------------------------------------------------------')
print('LV # Eig R2X sum(R2X) R2Y sum(R2Y)')
if A > 1:
for a in range(A):
print("LV #"+str(a+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2Y[a], r2yc[a]))
else:
print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2Y, r2yc[0]))
print('--------------------------------------------------------------')
var_t = (T.T @ T) / T.shape[0]
pls_obj = {'T':T, 'P':P, 'Q':Q, 'W':W, 'Ws':Ws, 'U':U,
'r2x':r2X, 'r2xpv':r2Xpv, 'mx':x_mean, 'sx':x_std,
'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std, 'var_t':var_t}
if not isinstance(obsidX, bool):
pls_obj['obsidX'] = obsidX; pls_obj['varidX'] = varidX
if not isinstance(obsidY, bool):
pls_obj['obsidY'] = obsidY; pls_obj['varidY'] = varidY
T2 = hott2(pls_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = np.sum(X_**2, axis=1, keepdims=1)
speX_lim95, speX_lim99 = spe_ci(speX)
speY = np.sum(Y_**2, axis=1, keepdims=1)
speY_lim95, speY_lim99 = spe_ci(speY)
pls_obj['T2'] = T2; pls_obj['T2_lim99'] = T2_lim99; pls_obj['T2_lim95'] = T2_lim95
pls_obj['speX'] = speX; pls_obj['speX_lim99'] = speX_lim99; pls_obj['speX_lim95'] = speX_lim95
pls_obj['speY'] = speY; pls_obj['speY_lim99'] = speY_lim99; pls_obj['speY_lim95'] = speY_lim95
if cca:
Tcv, Pcv, Wcv, Betacv = pls_cca(pls_obj, Xmcs, Ymcs, not_Xmiss)
pls_obj['Tcv'] = Tcv; pls_obj['Pcv'] = Pcv; pls_obj['Wcv'] = Wcv; pls_obj['Betacv'] = Betacv
return pls_obj
elif md_algorithm == 'nlp':
            shush = False  # the NLP path always reports solver progress
if not shush:
print('phi.pls using NLP with Ipopt executed on: ' + str(datetime.datetime.now()))
X_, dummy = n2z(X_)
Y_, dummy = n2z(Y_)
plsobj_ = pls_(X, Y, A, mcsX=mcsX, mcsY=mcsY, md_algorithm='nipals', shush=True)
plsobj_ = prep_pls_4_MDbyNLP(plsobj_, X_, Y_)
TSSX = np.sum(X_**2); TSSXpv = np.sum(X_**2, axis=0)
TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
model = ConcreteModel()
model.A = Set(initialize=plsobj_['pyo_A'])
model.N = Set(initialize=plsobj_['pyo_N'])
model.M = Set(initialize=plsobj_['pyo_M'])
model.O = Set(initialize=plsobj_['pyo_O'])
model.P = Var(model.N, model.A, within=Reals, initialize=plsobj_['pyo_P_init'])
model.T = Var(model.O, model.A, within=Reals, initialize=plsobj_['pyo_T_init'])
model.psi = Param(model.O, model.N, initialize=plsobj_['pyo_psi'])
model.X = Param(model.O, model.N, initialize=plsobj_['pyo_X'])
model.theta = Param(model.O, model.M, initialize=plsobj_['pyo_theta'])
model.Y = Param(model.O, model.M, initialize=plsobj_['pyo_Y'])
model.delta = Param(model.A, model.A, initialize=lambda model, a1, a2: 1.0 if a1==a2 else 0)
def _c27bc_con(model, a1, a2):
return sum(model.P[j, a1] * model.P[j, a2] for j in model.N) == model.delta[a1, a2]
model.c27bc = Constraint(model.A, model.A, rule=_c27bc_con)
def _27d_con(model, a1, a2):
if a2 < a1:
return sum(model.T[o, a1] * model.T[o, a2] for o in model.O) == 0
else:
return Constraint.Skip
model.c27d = Constraint(model.A, model.A, rule=_27d_con)
def _27e_con(model, i):
return sum(model.T[o,i] for o in model.O) == 0
model.c27e = Constraint(model.A, rule=_27e_con)
def _eq_27a_obj(model):
return sum(sum(sum((model.theta[o,m]*model.Y[o,m]) * (model.X[o,n] - model.psi[o,n] * sum(model.T[o,a] * model.P[n,a] for a in model.A)) for o in model.O)**2 for n in model.N) for m in model.M)
model.obj = Objective(rule=_eq_27a_obj)
if ipopt_ok:
solver = SolverFactory('ipopt')
if ma57_ok: solver.options['linear_solver'] = 'ma57'
results = solver.solve(model, tee=True)
elif gams_ok:
solver = SolverFactory('gams:ipopt')
results = solver.solve(model, tee=True)
else:
solver_manager = SolverManagerFactory('neos')
results = solver_manager.solve(model, opt='ipopt', tee=True)
T = np.array([[value(model.T[o,a]) for a in model.A] for o in model.O])
P = np.array([[value(model.P[n,a]) for a in model.A] for n in model.N])
# Obtain Ws via NLP
Taux = np2D2pyomo(T)
modelb = ConcreteModel()
modelb.A = Set(initialize=plsobj_['pyo_A'])
modelb.N = Set(initialize=plsobj_['pyo_N'])
modelb.O = Set(initialize=plsobj_['pyo_O'])
        modelb.Ws = Var(modelb.N, modelb.A, within=Reals, initialize=plsobj_['pyo_Ws_init'])
        modelb.T = Param(modelb.O, modelb.A, within=Reals, initialize=Taux)
        modelb.psi = Param(modelb.O, modelb.N, initialize=plsobj_['pyo_psi'])
        modelb.X = Param(modelb.O, modelb.N, initialize=plsobj_['pyo_X'])
def _eq_obj(model):
return sum(sum((model.T[o,a] - sum(model.psi[o,n] * model.X[o,n] * model.Ws[n,a] for n in model.N))**2 for a in model.A) for o in model.O)
modelb.obj = Objective(rule=_eq_obj)
if ipopt_ok:
solver = SolverFactory('ipopt')
if ma57_ok: solver.options['linear_solver'] = 'ma57'
results = solver.solve(modelb, tee=True)
elif gams_ok:
solver = SolverFactory('gams:ipopt')
results = solver.solve(modelb, tee=True)
else:
solver_manager = SolverManagerFactory('neos')
results = solver_manager.solve(modelb, opt='ipopt', tee=True)
Ws = np.array([[value(modelb.Ws[n,a]) for a in modelb.A] for n in modelb.N])
# Obtain Q via NLP
Xhat = T @ P.T
Xaux = X_.copy(); Xaux[X_nan_map] = Xhat[X_nan_map]
Xaux = np2D2pyomo(Xaux); Taux = np2D2pyomo(T)
model2 = ConcreteModel()
model2.A = Set(initialize=plsobj_['pyo_A'])
model2.N = Set(initialize=plsobj_['pyo_N'])
model2.M = Set(initialize=plsobj_['pyo_M'])
model2.O = Set(initialize=plsobj_['pyo_O'])
        model2.T = Param(model2.O, model2.A, within=Reals, initialize=Taux)
        model2.Q = Var(model2.M, model2.A, within=Reals, initialize=plsobj_['pyo_Q_init'])
        model2.X = Param(model2.O, model2.N, initialize=plsobj_['pyo_X'])
        model2.theta = Param(model2.O, model2.M, initialize=plsobj_['pyo_theta'])
        model2.Y = Param(model2.O, model2.M, initialize=plsobj_['pyo_Y'])
        model2.delta = Param(model2.A, model2.A, initialize=lambda model, a1, a2: 1.0 if a1==a2 else 0)
def _eq_36a_mod_obj(model):
return sum(sum(sum((model.X[o,n]) * (model.Y[o,m] - model.theta[o,m] * sum(model.T[o,a] * model.Q[m,a] for a in model.A)) for o in model.O)**2 for n in model.N) for m in model.M)
model2.obj = Objective(rule=_eq_36a_mod_obj)
if ipopt_ok:
solver = SolverFactory('ipopt')
if ma57_ok: solver.options['linear_solver'] = 'ma57'
results = solver.solve(model2, tee=True)
elif gams_ok:
solver = SolverFactory('gams:ipopt')
results = solver.solve(model2, tee=True)
else:
solver_manager = SolverManagerFactory('neos')
results = solver_manager.solve(model2, opt='ipopt', tee=True)
Q = np.array([[value(model2.Q[m,a]) for a in model2.A] for m in model2.M])
r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
for a in range(A):
ti = T[:,[a]]; pi = P[:,[a]]; qi = Q[:,[a]]
X_ = (X_ - ti @ pi.T) * not_Xmiss
Y_ = (Y_ - ti @ qi.T) * not_Ymiss
r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv, r2X, r2Xpv)
r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
eigs = np.var(T, axis=0)
r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y)
if not shush:
print('--------------------------------------------------------------')
print('LV # Eig R2X sum(R2X) R2Y sum(R2Y)')
if A > 1:
for a in range(A):
print("LV #"+str(a+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2Y[a], r2yc[a]))
else:
print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2Y, r2yc[0]))
print('--------------------------------------------------------------')
        W = 1; U = 1  # weights W and Y-scores U are not recovered by the NLP formulation
var_t = (T.T @ T) / T.shape[0]
pls_obj = {'T':T, 'P':P, 'Q':Q, 'W':W, 'Ws':Ws, 'U':U,
'r2x':r2X, 'r2xpv':r2Xpv, 'mx':x_mean, 'sx':x_std,
'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std, 'var_t':var_t}
if not isinstance(obsidX, bool):
pls_obj['obsidX'] = obsidX; pls_obj['varidX'] = varidX
if not isinstance(obsidY, bool):
pls_obj['obsidY'] = obsidY; pls_obj['varidY'] = varidY
T2 = hott2(pls_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = np.sum(X_**2, axis=1, keepdims=1)
speX_lim95, speX_lim99 = spe_ci(speX)
speY = np.sum(Y_**2, axis=1, keepdims=1)
speY_lim95, speY_lim99 = spe_ci(speY)
pls_obj['T2'] = T2; pls_obj['T2_lim99'] = T2_lim99; pls_obj['T2_lim95'] = T2_lim95
pls_obj['speX'] = speX; pls_obj['speX_lim99'] = speX_lim99; pls_obj['speX_lim95'] = speX_lim95
pls_obj['speY'] = speY; pls_obj['speY_lim99'] = speY_lim99; pls_obj['speY_lim95'] = speY_lim95
return pls_obj
# =============================================================================
# Prediction functions (v2.0: use stored var_t, broadcasting)
# =============================================================================
def hott2(mvmobj, *, Xnew=False, Tnew=False):
"""Compute Hotelling's T² statistic.
Args:
mvmobj (dict): Fitted PCA or PLS model.
Xnew (pd.DataFrame or np.ndarray): New X observations (optional).
If provided, scores are computed internally before T² calculation.
Tnew (np.ndarray): Pre-computed scores (optional). Used directly
if provided; avoids redundant projection.
Returns:
ndarray: T² value for each observation (n_obs,).
Note:
If neither ``Xnew`` nor ``Tnew`` is provided, returns T² for the
training set stored in ``mvmobj``.
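    Example:
        Illustrative sketch (``model`` is a fitted model from :func:`pca`
        or :func:`pls`; ``Xnew_df`` is a hypothetical DataFrame with
        matching variables)::
            >>> t2_train = hott2(model)              # T2 for the training set
            >>> t2_new = hott2(model, Xnew=Xnew_df)  # T2 for new observations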
"""
var_t = mvmobj.get('var_t', (mvmobj['T'].T @ mvmobj['T']) / mvmobj['T'].shape[0])
var_t_inv = np.linalg.inv(var_t)
if isinstance(Xnew, bool) and not isinstance(Tnew, bool):
hott2_ = np.sum((Tnew @ var_t_inv) * Tnew, axis=1)
elif isinstance(Tnew, bool) and not isinstance(Xnew, bool):
if 'Q' in mvmobj:
xpred = pls_pred(Xnew, mvmobj)
else:
xpred = pca_pred(Xnew, mvmobj)
Tnew = xpred['Tnew']
hott2_ = np.sum((Tnew @ var_t_inv) * Tnew, axis=1)
elif isinstance(Xnew, bool) and isinstance(Tnew, bool):
Tnew = mvmobj['T']
hott2_ = np.sum((Tnew @ var_t_inv) * Tnew, axis=1)
return hott2_
def pca_pred(Xnew, pcaobj, *, algorithm='p2mp'):
"""Project new observations onto a fitted PCA model.
Args:
Xnew (pd.DataFrame or np.ndarray): New observations to project.
Variables must match those used to train ``pcaobj``.
pcaobj (dict): Fitted PCA model from :func:`pca`.
        algorithm (str): Missing-data projection algorithm. ``'p2mp'``
            (default) projects each observation using only its observed
            variables; complete data always takes a direct fast path.
Returns:
dict: Prediction results with keys:
- ``Tnew`` (ndarray): Projected scores (n_new × A).
- ``Xhat`` (ndarray): Reconstructed X in original scale.
- ``speX`` (ndarray): SPE for each new observation.
- ``T2`` (ndarray): Hotelling's T² for each new observation.
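    Example:
        Illustrative sketch (``pca_model`` is a fitted model from
        :func:`pca`; ``Xnew_df`` is a hypothetical DataFrame)::
            >>> pred = pca_pred(Xnew_df, pca_model)
            >>> pred['Tnew'].shape   # (n_new, A)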
"""
if isinstance(Xnew, np.ndarray):
X_ = Xnew.copy()
if X_.ndim == 1:
X_ = X_.reshape(1, -1)
elif isinstance(Xnew, pd.DataFrame):
X_ = np.array(Xnew.values[:,1:]).astype(float)
var_t = pcaobj.get('var_t', (pcaobj['T'].T @ pcaobj['T']) / pcaobj['T'].shape[0])
X_nan_map = np.isnan(X_)
if not X_nan_map.any():
X_mcs = (X_ - pcaobj['mx']) / pcaobj['sx']
tnew = X_mcs @ pcaobj['P']
xhat = tnew @ pcaobj['P'].T * pcaobj['sx'] + pcaobj['mx']
htt2 = np.sum((tnew @ np.linalg.inv(var_t)) * tnew, axis=1)
spe_ = X_mcs - tnew @ pcaobj['P'].T
spe_ = np.sum(spe_**2, axis=1, keepdims=True)
xpred = {'Xhat':xhat, 'Tnew':tnew, 'speX':spe_, 'T2':htt2}
elif algorithm == 'p2mp':
not_Xmiss = (~X_nan_map) * 1
Xmcs = (X_ - pcaobj['mx']) / pcaobj['sx']
Xmcs, dummy = n2z(Xmcs)
for i in range(Xmcs.shape[0]):
row_map = not_Xmiss[[i], :]
tempP = pcaobj['P'] * row_map.T
PTP = tempP.T @ tempP
try:
tnew_, resid, rank, s = np.linalg.lstsq(PTP, tempP.T @ Xmcs[[i], :].T, rcond=None)
            except np.linalg.LinAlgError:
tnew_ = np.linalg.pinv(PTP) @ tempP.T @ Xmcs[[i], :].T
if i == 0:
tnew = tnew_.T
else:
tnew = np.vstack((tnew, tnew_.T))
xhat = tnew @ pcaobj['P'].T * pcaobj['sx'] + pcaobj['mx']
htt2 = np.sum((tnew @ np.linalg.inv(var_t)) * tnew, axis=1)
spe_ = (Xmcs - tnew @ pcaobj['P'].T) * not_Xmiss
spe_ = np.sum(spe_**2, axis=1, keepdims=True)
xpred = {'Xhat':xhat, 'Tnew':tnew, 'speX':spe_, 'T2':htt2}
return xpred
def pls_pred(Xnew, plsobj):
"""Predict Y for new observations using a fitted PLS model.
Args:
Xnew (pd.DataFrame or np.ndarray): New predictor observations.
Variables must match those used to train ``plsobj``.
plsobj (dict): Fitted PLS model from :func:`pls`.
Returns:
dict: Prediction results with keys:
- ``Tnew`` (ndarray): X-scores for new observations (n_new × A).
- ``Yhat`` (ndarray): Predicted Y in original scale (n_new × n_y).
- ``Xhat`` (ndarray): Reconstructed X in original scale.
- ``speX`` (ndarray): X-space SPE for each new observation.
- ``T2`` (ndarray): Hotelling's T² for each new observation.
- ``Tcv`` (ndarray): CCA covariant scores (only if model has ``Wcv``).
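    Example:
        Illustrative sketch (``pls_model`` is a fitted model from
        :func:`pls`; ``Xnew_df`` is a hypothetical DataFrame; NaNs in
        ``Xnew_df`` are handled automatically)::
            >>> pred = pls_pred(Xnew_df, pls_model)
            >>> yhat = pred['Yhat']   # predictions in original units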
"""
    algorithm = 'p2mp'; force_deflation = False  # missing data is always handled by projection to the model plane
if isinstance(Xnew, np.ndarray):
X_ = Xnew.copy()
if X_.ndim == 1:
X_ = X_.reshape(1, -1)
elif isinstance(Xnew, pd.DataFrame):
X_ = np.array(Xnew.values[:,1:]).astype(float)
elif isinstance(Xnew, dict):
data_ = []; names_ = []
for k in Xnew.keys():
data_.append(Xnew[k]); names_.append(k)
c = 0
for i, x in enumerate(data_):
x_ = x.values[:,1:].astype(float)
if c == 0:
X_ = x_.copy()
else:
X_ = np.hstack((X_, x_))
c += 1
var_t = plsobj.get('var_t', (plsobj['T'].T @ plsobj['T']) / plsobj['T'].shape[0])
X_nan_map = np.isnan(X_)
if not X_nan_map.any() and not force_deflation:
X_mcs = (X_ - plsobj['mx']) / plsobj['sx']
tnew = X_mcs @ plsobj['Ws']
yhat = tnew @ plsobj['Q'].T * plsobj['sy'] + plsobj['my']
xhat = tnew @ plsobj['P'].T * plsobj['sx'] + plsobj['mx']
htt2 = np.sum((tnew @ np.linalg.inv(var_t)) * tnew, axis=1)
speX = X_mcs - tnew @ plsobj['P'].T
speX = np.sum(speX**2, axis=1, keepdims=True)
ypred = {'Yhat':yhat, 'Xhat':xhat, 'Tnew':tnew, 'speX':speX, 'T2':htt2}
if 'Wcv' in plsobj:
ypred['Tcv'] = X_mcs @ plsobj['Wcv']
elif algorithm == 'p2mp':
not_Xmiss = (~X_nan_map) * 1
Xmcs = (X_ - plsobj['mx']) / plsobj['sx']
Xmcs, dummy = n2z(Xmcs)
for i in range(Xmcs.shape[0]):
row_map = not_Xmiss[[i], :]
tempW = plsobj['W'] * row_map.T
for a in range(plsobj['W'].shape[1]):
WTW = tempW[:,[a]].T @ tempW[:,[a]]
tnew_aux, resid, rank, s = np.linalg.lstsq(WTW, tempW[:,[a]].T @ Xmcs[[i],:].T, rcond=None)
Xmcs[[i], :] = (Xmcs[[i], :] - tnew_aux @ plsobj['P'][:,[a]].T) * row_map
if a == 0:
tnew_ = tnew_aux
else:
tnew_ = np.vstack((tnew_, tnew_aux))
if i == 0:
tnew = tnew_.T
else:
tnew = np.vstack((tnew, tnew_.T))
yhat = tnew @ plsobj['Q'].T * plsobj['sy'] + plsobj['my']
xhat = tnew @ plsobj['P'].T * plsobj['sx'] + plsobj['mx']
htt2 = np.sum((tnew @ np.linalg.inv(var_t)) * tnew, axis=1)
X_, dummy = n2z(X_)
speX = ((X_ - plsobj['mx']) / plsobj['sx']) - tnew @ plsobj['P'].T
speX = speX * not_Xmiss
speX = np.sum(speX**2, axis=1, keepdims=True)
ypred = {'Yhat':yhat, 'Xhat':xhat, 'Tnew':tnew, 'speX':speX, 'T2':htt2}
if 'Wcv' in plsobj:
ypred['Tcv'] = ((X_ - plsobj['mx']) / plsobj['sx']) @ plsobj['Wcv']
return ypred
def spe(mvmobj, Xnew, *, Ynew=False):
"""Compute Squared Prediction Error (SPE / Q statistic).
Args:
mvmobj (dict): Fitted PCA or PLS model.
Xnew (pd.DataFrame or np.ndarray): New X observations.
Ynew (pd.DataFrame or np.ndarray): New Y observations (optional).
Only used for PLS models to also return Y-space SPE.
Returns:
ndarray or tuple:
- If ``Ynew`` is not provided (or model is PCA): returns
``speX`` (ndarray, shape n_obs × 1).
- If ``Ynew`` is provided and model is PLS: returns
``(speX, speY)`` tuple of arrays.
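    Example:
        Illustrative sketch with hypothetical DataFrames::
            >>> spex = spe(model, Xnew_df)
            >>> spex, spey = spe(pls_model, Xnew_df, Ynew=Ynew_df)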
"""
if 'Q' in mvmobj:
xpred = pls_pred(Xnew, mvmobj)
else:
xpred = pca_pred(Xnew, mvmobj)
Tnew = xpred['Tnew']
if isinstance(Xnew, np.ndarray):
X_ = Xnew.copy()
elif isinstance(Xnew, pd.DataFrame):
X_ = np.array(Xnew.values[:,1:]).astype(float)
if isinstance(Ynew, np.ndarray):
Y_ = Ynew.copy()
elif isinstance(Ynew, pd.DataFrame):
Y_ = np.array(Ynew.values[:,1:]).astype(float)
Xnewhat = Tnew @ mvmobj['P'].T
Xres = (X_ - mvmobj['mx']) / mvmobj['sx'] - Xnewhat
spex_ = np.sum(Xres**2, axis=1, keepdims=True)
if not isinstance(Ynew, bool) and ('Q' in mvmobj):
Ynewhat = Tnew @ mvmobj['Q'].T
Yres = (Y_ - mvmobj['my']) / mvmobj['sy'] - Ynewhat
spey_ = np.sum(Yres**2, axis=1, keepdims=True)
return spex_, spey_
else:
return spex_
# =============================================================================
# LWPLS
# =============================================================================
def lwpls(xnew, loc_par, mvmobj, X, Y, *, shush=False):
"""Locally Weighted PLS (LWPLS) prediction for a single new observation.
Per Kim et al. Int. J. Pharmaceutics 421 (2011) 269–274.
Args:
xnew (np.ndarray or pd.DataFrame): Single new observation (1 × n_x).
loc_par (float): Locality parameter controlling the width of the
Gaussian kernel. Larger values include more training observations.
mvmobj (dict): Global PLS model from :func:`pls`, used to define
the score space for distance calculation.
X (pd.DataFrame or np.ndarray): Training X data.
Y (pd.DataFrame or np.ndarray): Training Y data.
shush (bool): Suppress printed output. Default ``False``.
    Returns:
        ndarray: Locally predicted Y for the new observation, in
        original units (one element per Y variable).
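    Example:
        Illustrative sketch (``x_row`` is one new observation as a 1D
        array; the locality parameter value is arbitrary here)::
            >>> yhat = lwpls(x_row, 2.0, global_pls_model, X_train, Y_train)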
"""
if not shush:
print('phi.lwpls executed on: ' + str(datetime.datetime.now()))
xnew = np.reshape(xnew, (-1, 1))
if isinstance(X, pd.DataFrame):
X = np.array(X.values[:,1:]).astype(float)
if isinstance(Y, pd.DataFrame):
Y = np.array(Y.values[:,1:]).astype(float)
vip = np.sum(np.abs(mvmobj['Ws'] * mvmobj['r2y']), axis=1).reshape(-1, 1)
theta = vip
D = X - xnew.T
d2 = D * theta.T * D
d2 = np.sqrt(np.sum(d2, axis=1))
omega = np.exp(-d2 / (np.var(d2, ddof=1) * loc_par))
OMEGA = np.diag(omega)
omega = omega.reshape(-1, 1)
X_weighted_mean = (np.sum(omega * X, axis=0) / np.sum(omega)).reshape(-1, 1)
Y_weighted_mean = (np.sum(omega * Y, axis=0) / np.sum(omega)).reshape(-1, 1)
Xi = X - X_weighted_mean.T
Yi = Y - Y_weighted_mean.T
xnewi = xnew - X_weighted_mean
yhat = Y_weighted_mean
for a in range(mvmobj['T'].shape[1]):
[U_, S, Wh] = np.linalg.svd(Xi.T @ OMEGA @ Yi @ Yi.T @ OMEGA @ Xi)
w = Wh.T[:, [0]]
t = Xi @ w
p = Xi.T @ OMEGA @ t / (t.T @ OMEGA @ t)
q = Yi.T @ OMEGA @ t / (t.T @ OMEGA @ t)
tnew = xnewi.T @ w
yhat = yhat + q @ tnew
Xi = Xi - t @ p.T
Yi = Yi - t @ q.T
xnewi = xnewi - p @ tnew
    return yhat.T[0]
# =============================================================================
# Contributions
# =============================================================================
def contributions(mvmobj, X, cont_type, *, Y=False, from_obs=False, to_obs=False, lv_space=False):
"""Compute variable contributions to monitoring statistics.
Args:
        mvmobj (dict): Fitted PCA or PLS model.
        X (pd.DataFrame or np.ndarray): Observations to diagnose.
        cont_type (str): Type of contribution to compute.
            ``'scores'``: contribution to each score.
            ``'ht2'``: contribution to Hotelling's T².
            ``'spe'``: contribution to X-space SPE (and Y-space SPE
            when ``Y`` is given for a PLS model).
        Y (pd.DataFrame or np.ndarray): Y observations (optional; only
            used with ``cont_type='spe'`` for PLS models).
        from_obs (int or list): Reference observation(s) for relative
            contributions (``'scores'`` and ``'ht2'`` only). Default
            ``False`` (contributions relative to the model origin).
        to_obs (int or list): Observation(s) being diagnosed.
        lv_space (int or list): 1-based latent variable(s) to include.
            Defaults to all.
    Returns:
        ndarray: Mean contribution per variable (1 × n_vars); for
        ``cont_type='spe'`` with ``Y`` given, a tuple ``(contsX, contsY)``.
Ref: Miller, P., Swanson, R.E. and Heckler, C.E., 1998. Contribution plots: a missing link
in multivariate quality control.
Applied mathematics and computer science, 8(4), pp.775-792.
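    Example:
        Illustrative sketch (observation indices are hypothetical)::
            >>> c_t2 = contributions(model, X_df, 'ht2', to_obs=5)
            >>> c_spe = contributions(model, X_df, 'spe', to_obs=[5])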
"""
if isinstance(lv_space, bool):
lv_space = list(range(mvmobj['T'].shape[1]))
elif isinstance(lv_space, int):
lv_space = (np.array([lv_space]) - 1).tolist()
elif isinstance(lv_space, list):
lv_space = (np.array(lv_space) - 1).tolist()
if isinstance(to_obs, int):
to_obs = [to_obs]
if not isinstance(from_obs, bool):
if isinstance(from_obs, int):
from_obs = [from_obs]
if isinstance(X, np.ndarray):
X_ = X.copy()
elif isinstance(X, pd.DataFrame):
X_ = np.array(X.values[:,1:]).astype(float)
if not isinstance(Y, bool):
if isinstance(Y, np.ndarray):
Y_ = Y.copy()
elif isinstance(Y, pd.DataFrame):
Y_ = np.array(Y.values[:,1:]).astype(float)
if cont_type == 'ht2' or cont_type == 'scores':
X_ = (X_ - mvmobj['mx']) / mvmobj['sx']
X_, dummy = n2z(X_)
t_stdevs = np.std(mvmobj['T'], axis=0, ddof=1)
loadings = mvmobj['Ws'] if 'Q' in mvmobj else mvmobj['P']
to_obs_mean = np.mean(X_[to_obs, :], axis=0, keepdims=True)
to_cont = np.zeros((1, X_.shape[1]))
for a in lv_space:
aux_ = (to_obs_mean * np.abs(loadings[:, a].T)) / t_stdevs[a]
to_cont = to_cont + (aux_ if cont_type == 'scores' else aux_**2)
if not isinstance(from_obs, bool):
from_obs_mean = np.mean(X_[from_obs, :], axis=0, keepdims=1)
from_cont = np.zeros((1, X_.shape[1]))
for a in lv_space:
aux_ = (from_obs_mean * np.abs(loadings[:, a].T)) / t_stdevs[a]
from_cont = from_cont + (aux_ if cont_type == 'scores' else aux_**2)
return to_cont - from_cont
else:
return to_cont
elif cont_type == 'spe':
X_ = X_[to_obs, :]
if 'Q' in mvmobj:
pred = pls_pred(X_, mvmobj)
else:
pred = pca_pred(X_, mvmobj)
Xhat = pred['Xhat']
Xhatmcs = (Xhat - mvmobj['mx']) / mvmobj['sx']
X_ = (X_ - mvmobj['mx']) / mvmobj['sx']
Xerror = X_ - Xhatmcs
Xerror, dummy = n2z(Xerror)
contsX = (Xerror**2) * np.sign(Xerror)
contsX = np.mean(contsX, axis=0, keepdims=True)
if not isinstance(Y, bool):
Y_ = Y_[to_obs, :]
Yhat = pred['Yhat']
Yhatmcs = (Yhat - mvmobj['my']) / mvmobj['sy']
Y_ = (Y_ - mvmobj['my']) / mvmobj['sy']
Yerror = Y_ - Yhatmcs
Yerror, dummy = n2z(Yerror)
contsY = (Yerror**2) * np.sign(Yerror)
contsY = np.mean(contsY, axis=0, keepdims=True)
return contsX, contsY
else:
return contsX
# =============================================================================
# Data cleaning utilities
# =============================================================================
def clean_empty_rows(X, *, shush=False):
"""Remove rows that are entirely NaN.
Args:
X (pd.DataFrame or np.ndarray): Input data matrix.
shush (bool): Suppress printed output. Default ``False``.
    Returns:
        tuple: ``(X_clean, rows_removed)``; the data with fully empty
        rows removed (same type as input) and a list of removed row IDs.
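    Example:
        Illustrative sketch with a hypothetical DataFrame ``X_df``::
            >>> X_clean, removed_ids = clean_empty_rows(X_df)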
"""
if isinstance(X, np.ndarray):
X_ = X.copy()
ObsID_ = ['Obs #'+str(n) for n in np.arange(X.shape[0])+1]
elif isinstance(X, pd.DataFrame):
X_ = np.array(X.values[:,1:]).astype(float)
ObsID_ = X.values[:,0].astype(str).tolist()
X_nan_map = np.isnan(X_)
Xmiss = np.sum(X_nan_map * 1, axis=1)
indx = find(Xmiss, lambda x: x == X_.shape[1])
rows_rem = []
if len(indx) > 0:
for i in indx:
if not shush:
print('Removing row ', ObsID_[i], ' due to 100% missing data')
rows_rem.append(ObsID_[i])
if isinstance(X, pd.DataFrame):
X_ = X.drop(X.index.values[indx].tolist())
else:
X_ = np.delete(X_, indx, 0)
return X_, rows_rem
else:
return X, rows_rem
def clean_low_variances(X, *, shush=False, min_var=1E-10):
"""Remove columns with variance below a threshold.
Args:
X (pd.DataFrame or np.ndarray): Input data matrix.
min_var (float): Minimum acceptable variance. Default ``1e-10``.
shush (bool): Suppress printed output. Default ``False``.
    Returns:
        tuple: ``(X_clean, cols_removed)``; the data with low-variance
        (and mostly-missing) columns removed, and a list of the removed
        column names.
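    Example:
        Illustrative sketch with a hypothetical DataFrame ``X_df``::
            >>> X_clean, cols_removed = clean_low_variances(X_df, min_var=1e-8)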
"""
cols_removed = []
if isinstance(X, pd.DataFrame):
X_ = np.array(X.values[:,1:]).astype(float)
varidX = X.columns.values[1:].tolist()
else:
X_ = X.copy()
varidX = ['Var #'+str(n) for n in np.arange(X.shape[1])+1]
X_nan_map = np.isnan(X_)
Xmiss = np.sum(X_nan_map * 1, axis=0)
indx = find(Xmiss, lambda x: x >= (X_.shape[0]-3))
    if len(indx) > 0:
        for i in indx:
            if not shush:
                print('Removing variable ', varidX[i], ' due to excessive missing data')
            cols_removed.append(varidX[i])
        if isinstance(X, pd.DataFrame):
            X_pd = X.drop(X.columns[np.array(indx)+1], axis=1)
            X_ = np.array(X_pd.values[:,1:]).astype(float)
            new_cols = X_pd.columns[1:].tolist()
        else:
            X_ = np.delete(X_, indx, 1)
            new_cols = [v for j, v in enumerate(varidX) if j not in indx]
    else:
        if isinstance(X, pd.DataFrame):
            X_pd = X.copy()
            new_cols = X_pd.columns[1:].tolist()
        else:
            new_cols = varidX
    std_x = std(X_).flatten()
    indx = find(std_x, lambda x: x < min_var)
    if len(indx) > 0:
        for i in indx:
            if not shush:
                print('Removing variable ', new_cols[i], ' due to low variance')
            cols_removed.append(new_cols[i])
        if isinstance(X, pd.DataFrame):
            X_ = X_pd.drop(X_pd.columns[np.array(indx)+1], axis=1)
        else:
            X_ = np.delete(X_, indx, 1)
        return X_, cols_removed
    else:
        if isinstance(X, pd.DataFrame):
            return X_pd, cols_removed
        else:
            return X_, cols_removed
# =============================================================================
# Spectra preprocessing
# =============================================================================
def spectra_snv(x):
"""Apply Standard Normal Variate (SNV) correction to spectra.
Each spectrum (row) is mean-centered and scaled by its own standard
deviation. Removes multiplicative scatter effects.
Args:
        x (pd.DataFrame or np.ndarray): Spectra matrix (n_samples × n_wavelengths).
If a DataFrame, the first column must contain sample IDs.
Returns:
pd.DataFrame or np.ndarray: SNV-corrected spectra (same type as input).
"""
if isinstance(x, pd.DataFrame):
x_columns = x.columns
x_values = x.values
x_values[:,1:] = spectra_snv(x_values[:,1:].astype(float))
return pd.DataFrame(x_values, columns=x_columns)
else:
if x.ndim == 2:
mean_x = np.mean(x, axis=1, keepdims=1)
x = x - mean_x
std_x = np.sqrt(np.sum(x**2, axis=1) / (x.shape[1]-1)).reshape(-1, 1)
x = x / std_x
return x
else:
x = x - np.mean(x)
stdx = np.sqrt(np.sum(x**2) / (len(x)-1))
return x / stdx
def spectra_savgol(ws, od, op, Dm):
"""Apply Savitzky-Golay smoothing and/or differentiation to spectra.
Args:
        ws (int): Half-window size; the filter window spans ``2*ws+1`` points.
        od (int): Derivative order. ``0`` = smoothing only, ``1`` = first
            derivative, ``2`` = second derivative.
        op (int): Polynomial order for the filter (must be >= ``od``).
        Dm (pd.DataFrame or np.ndarray): Spectra matrix (n_samples × n_wavelengths).
            If a DataFrame, the first column must contain sample IDs.
    Returns:
        tuple: ``(Dm_sg, M)``; the filtered spectra (same type as input)
        and the convolution matrix ``M`` applied to each spectrum.
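    Example:
        Illustrative sketch: first derivative with a half-window of 5
        points and a second-order polynomial (``spectra`` is a
        hypothetical array)::
            >>> filtered, M = spectra_savgol(5, 1, 2, spectra)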
"""
if isinstance(Dm, pd.DataFrame):
x_columns = Dm.columns.tolist()
FirstElement = [x_columns[0]]
x_columns = x_columns[1:]
FirstElement.extend(x_columns[ws:-ws])
x_values = Dm.values
Col1 = Dm.values[:,0].tolist()
Col1 = np.reshape(Col1, (-1, 1))
aux, M = spectra_savgol(ws, od, op, x_values[:,1:].astype(float))
data_ = np.hstack((Col1, aux))
return pd.DataFrame(data=data_, columns=FirstElement), M
else:
l = Dm.shape[0] if Dm.ndim == 1 else Dm.shape[1]
x_vec = np.arange(-ws, ws+1).reshape(-1, 1)
X_sg = np.ones((2*ws+1, 1))
for oo in np.arange(1, op+1):
X_sg = np.hstack((X_sg, x_vec**oo))
try:
XtXiXt = np.linalg.inv(X_sg.T @ X_sg) @ X_sg.T
        except np.linalg.LinAlgError:
XtXiXt = np.linalg.pinv(X_sg.T @ X_sg) @ X_sg.T
coeffs = (XtXiXt[od, :] * factorial(od)).reshape(1, -1)
for i in np.arange(1, l-2*ws+1):
if i == 1:
M = np.hstack((coeffs, np.zeros((1, l-2*ws-1))))
elif i < l-2*ws:
m_ = np.hstack((np.zeros((1, i-1)), coeffs, np.zeros((1, l-2*ws-1-i+1))))
M = np.vstack((M, m_))
else:
m_ = np.hstack((np.zeros((1, l-2*ws-1)), coeffs))
M = np.vstack((M, m_))
if Dm.ndim == 1:
Dm_sg = M @ Dm
else:
Dm_sg = Dm @ M.T
return Dm_sg, M
def spectra_mean_center(Dm):
"""Mean-center each wavelength across the sample set.
Args:
X (pd.DataFrame or np.ndarray): Spectra matrix (n_samples × n_wavelengths).
Returns:
pd.DataFrame or np.ndarray: Mean-centered spectra.
"""
if isinstance(Dm, pd.DataFrame):
Dm_columns = Dm.columns
Dm_values = Dm.values
Dm_values[:,1:] = spectra_mean_center(Dm_values[:,1:].astype(float))
return pd.DataFrame(Dm_values, columns=Dm_columns)
else:
if Dm.ndim == 2:
return Dm - Dm.mean(axis=1)[:, None]
else:
return Dm - Dm.mean()
def spectra_autoscale(Dm):
"""Autoscale spectra (mean-center and scale each wavelength to unit variance).
Args:
X (pd.DataFrame or np.ndarray): Spectra matrix (n_samples × n_wavelengths).
Returns:
pd.DataFrame or np.ndarray: Autoscaled spectra.
"""
if isinstance(Dm, pd.DataFrame):
Dm_columns = Dm.columns
Dm_values = Dm.values
Dm_values[:,1:] = spectra_autoscale(Dm_values[:,1:].astype(float))
return pd.DataFrame(Dm_values, columns=Dm_columns)
else:
if Dm.ndim == 2:
return Dm / Dm.std(axis=1, ddof=1)[:, None]
else:
return Dm / Dm.std(ddof=1)
def spectra_baseline_correction(Dm):
"""Apply piecewise linear baseline correction to spectra.
Args:
X (pd.DataFrame or np.ndarray): Spectra matrix (n_samples × n_wavelengths).
If a DataFrame, the first column must contain sample IDs.
anchor_points (list of int): Column indices to use as baseline anchor
points for the piecewise linear interpolation.
Returns:
pd.DataFrame or np.ndarray: Baseline-corrected spectra.
"""
if isinstance(Dm, pd.DataFrame):
Dm_columns = Dm.columns
Dm_values = Dm.values
Dm_values[:,1:] = spectra_baseline_correction(Dm_values[:,1:].astype(float))
return pd.DataFrame(Dm_values, columns=Dm_columns)
else:
if Dm.ndim == 2:
return Dm - Dm.min(axis=1)[:, None]
else:
return Dm - Dm.min()
def spectra_msc(Dm, reference_spectra=None):
"""Apply Multiplicative Scatter Correction (MSC) to spectra.
Args:
        Dm (pd.DataFrame or np.ndarray): Spectra matrix (n_samples × n_wavelengths).
            If a DataFrame, the first column must contain sample IDs.
        reference_spectra (np.ndarray): Reference spectrum to correct
            against. Defaults to the mean spectrum of ``Dm``.
Returns:
pd.DataFrame or np.ndarray: MSC-corrected spectra (same type as input).
"""
if isinstance(Dm, pd.DataFrame):
Dm_columns = Dm.columns
Dm_values = Dm.values
        Dm_values[:,1:] = spectra_msc(Dm_values[:,1:].astype(float), reference_spectra)
return pd.DataFrame(Dm_values, columns=Dm_columns)
else:
if Dm.ndim == 2:
if reference_spectra is None:
reference_spectra = Dm.mean(axis=0)
V = np.vstack([np.ones(reference_spectra.shape), reference_spectra])
U = Dm @ V.T @ np.linalg.inv(V @ V.T)
return (Dm - U[:,0, None]) / U[:,1, None]
else:
if reference_spectra is None:
raise ValueError("msc needs a reference spectra or to be able to use the mean of many spectra")
V = np.vstack([np.ones(reference_spectra.shape), reference_spectra])
U = Dm @ V.T @ np.linalg.inv(V @ V.T)
return (Dm - U[0]) / U[1]
# =============================================================================
# Bootstrap PLS
# =============================================================================
def bootstrap_pls(X, Y, num_latents, num_samples, **kwargs):
"""Estimate PLS loading uncertainty via bootstrap resampling.
Args:
        X (pd.DataFrame or np.ndarray): Training X data.
        Y (pd.DataFrame or np.ndarray): Training Y data.
        num_latents (int): Number of latent variables per bootstrap model.
        num_samples (int): Number of bootstrap resamples.
        **kwargs: Additional keyword arguments passed through to :func:`pls`.
    Returns:
        list: One fitted PLS model dict (as returned by :func:`pls`)
        per bootstrap resample.
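    Example:
        Illustrative sketch: 100 bootstrap resamples of a 2-LV model
        (``X_df`` and ``Y_df`` are hypothetical DataFrames)::
            >>> boot_models = bootstrap_pls(X_df, Y_df, 2, 100)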
"""
    if isinstance(X, pd.DataFrame):
        Dm_values = X.values[:,1:].astype(float)
    else:
        Dm_values = X.copy()
    if isinstance(Y, pd.DataFrame):
        Y = Y.values[:,1:].astype(float)
boot_pls_obj = []
for _ in range(num_samples):
sample_indexes = np.random.randint(Dm_values.shape[0], None, Dm_values.shape[0])
boot_pls_obj.append(pls(Dm_values[sample_indexes], Y[sample_indexes], num_latents, shush=True, **kwargs))
return boot_pls_obj
def bootstrap_pls_pred(X_new, bootstrap_pls_obj, quantiles=[0.025, 0.975]):
"""Predict Y with uncertainty estimates using a bootstrap PLS ensemble.
Args:
Xnew (pd.DataFrame or np.ndarray): New X observations to predict.
boot_obj (dict): Bootstrap model from :func:`bootstrap_pls`.
alpha (float): Confidence level for prediction intervals. Default ``0.95``.
Returns:
dict: Prediction results with keys:
- ``Yhat`` (ndarray): Mean predicted Y (n_new × n_y).
- ``Yhat_lb`` (ndarray): Lower bound of prediction interval.
- ``Yhat_ub`` (ndarray): Upper bound of prediction interval.
- ``Yhat_std`` (ndarray): Std dev of bootstrap predictions.
"""
for q in quantiles:
if q >= 1 or q <= 0:
raise ValueError("Quantiles must be between zero and one")
means = []; sds = []
for pls_obj in bootstrap_pls_obj:
means.append(pls_pred(X_new, pls_obj)["Yhat"])
sds.append(np.sqrt(pls_obj["speY"].mean()))
means = np.array(means).squeeze()
sds = np.array(sds)
dist = norm(means, sds[:, None])
ppf = []
for q in quantiles:
def cdf(x):
return dist.cdf(x).mean(axis=0) - np.ones_like(x)*q
ppf.append(fsolve(cdf, means.mean(axis=0)))
return ppf
# =============================================================================
# Pyomo utilities
# =============================================================================
def np2D2pyomo(arr, *, varids=False):
"""Convert a 2D NumPy array to a Pyomo-compatible dictionary.
Args:
        arr (np.ndarray): 2D array to convert.
        varids (list): Optional row identifiers; if given, keys are
            ``(varid, j)`` tuples instead of 1-based ``(i, j)`` integers.
Returns:
dict: Dictionary keyed by ``(i, j)`` integer index tuples.
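    Example:
        Illustrative sketch::
            >>> d = np2D2pyomo(np.array([[1.0, 2.0], [3.0, 4.0]]))
            >>> sorted(d.keys())[0]   # keys are 1-based (row, col) tuples
            (1, 1)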
"""
if not varids:
return dict(((i+1, j+1), arr[i][j]) for i in range(arr.shape[0]) for j in range(arr.shape[1]))
else:
return dict(((varids[i], j+1), arr[i][j]) for i in range(arr.shape[0]) for j in range(arr.shape[1]))
def np1D2pyomo(arr, *, indexes=False):
"""Convert a 1D NumPy array to a Pyomo-compatible dictionary.
Args:
        arr (np.ndarray): 1D array to convert.
        indexes (list): Optional keys; 1-based integers are used otherwise.
Returns:
dict: Dictionary keyed by integer index.
"""
if arr.ndim == 2:
arr = arr[0]
if isinstance(indexes, bool):
return dict(((j+1), arr[j]) for j in range(len(arr)))
elif isinstance(indexes, list):
return dict((indexes[j], arr[j]) for j in range(len(arr)))
def adapt_pls_4_pyomo(plsobj, *, use_var_ids=False):
"""Convert PLS model arrays to Pyomo-compatible dictionaries.
Transforms ``P``, ``Q``, ``W``, ``Ws``, ``mx``, ``sx``, ``my``, ``sy``
into the indexed dict format required by Pyomo ``Param`` objects.
Args:
        plsobj (dict): Fitted PLS model from :func:`pls`.
        use_var_ids (bool): If ``True``, index the Pyomo dictionaries by
            the stored variable names (``varidX``/``varidY``) instead of
            1-based integers.
Returns:
dict: Model parameters as Pyomo-indexed dictionaries.
"""
plsobj_ = plsobj.copy()
A = plsobj['T'].shape[1]; N = plsobj['P'].shape[0]; M = plsobj['Q'].shape[0]
pyo_A = list(np.arange(1, A+1))
if not use_var_ids:
pyo_N = list(np.arange(1, N+1)); pyo_M = list(np.arange(1, M+1))
pyo_Ws = np2D2pyomo(plsobj['Ws']); pyo_Q = np2D2pyomo(plsobj['Q']); pyo_P = np2D2pyomo(plsobj['P'])
var_t = np.var(plsobj['T'], axis=0)
pyo_var_t = np1D2pyomo(var_t)
pyo_mx = np1D2pyomo(plsobj['mx']); pyo_sx = np1D2pyomo(plsobj['sx'])
pyo_my = np1D2pyomo(plsobj['my']); pyo_sy = np1D2pyomo(plsobj['sy'])
else:
pyo_N = plsobj['varidX']; pyo_M = plsobj['varidY']
pyo_Ws = np2D2pyomo(plsobj['Ws'], varids=plsobj['varidX'])
pyo_Q = np2D2pyomo(plsobj['Q'], varids=plsobj['varidY'])
pyo_P = np2D2pyomo(plsobj['P'], varids=plsobj['varidX'])
var_t = np.var(plsobj['T'], axis=0)
pyo_var_t = np1D2pyomo(var_t)
pyo_mx = np1D2pyomo(plsobj['mx'], indexes=plsobj['varidX'])
pyo_sx = np1D2pyomo(plsobj['sx'], indexes=plsobj['varidX'])
pyo_my = np1D2pyomo(plsobj['my'], indexes=plsobj['varidY'])
pyo_sy = np1D2pyomo(plsobj['sy'], indexes=plsobj['varidY'])
plsobj_.update({'pyo_A':pyo_A, 'pyo_N':pyo_N, 'pyo_M':pyo_M, 'pyo_Ws':pyo_Ws, 'pyo_Q':pyo_Q,
'pyo_P':pyo_P, 'pyo_var_t':pyo_var_t, 'pyo_mx':pyo_mx, 'pyo_sx':pyo_sx,
'pyo_my':pyo_my, 'pyo_sy':pyo_sy, 'speX_lim95':plsobj['speX_lim95']})
return plsobj_
def prep_pca_4_MDbyNLP(pcaobj, X):
"""Prepare a PCA model for missing-data imputation by NLP.
Extracts and formats the loadings and preprocessing parameters needed
to set up a Pyomo optimization problem for MD imputation.
Args:
pcaobj (dict): Fitted PCA model from :func:`pca`.
Returns:
dict: Parameters formatted for use in a Pyomo MD-by-NLP formulation.
"""
pcaobj_ = pcaobj.copy()
X_nan_map = np.isnan(X)
psi = (~X_nan_map) * 1
X, dummy = n2z(X)
A = pcaobj['T'].shape[1]; O = pcaobj['T'].shape[0]; N = pcaobj['P'].shape[0]
pyo_A = list(np.arange(1, A+1)); pyo_N = list(np.arange(1, N+1)); pyo_O = list(np.arange(1, O+1))
pcaobj_.update({'pyo_A':pyo_A, 'pyo_N':pyo_N, 'pyo_O':pyo_O,
'pyo_P_init':np2D2pyomo(pcaobj['P']), 'pyo_T_init':np2D2pyomo(pcaobj['T']),
'pyo_X':np2D2pyomo(X), 'pyo_psi':np2D2pyomo(psi)})
return pcaobj_
def prep_pls_4_MDbyNLP(plsobj, X, Y):
"""Prepare a PLS model for missing-data imputation by NLP.
Args:
plsobj (dict): Fitted PLS model from :func:`pls`.
Returns:
dict: Parameters formatted for use in a Pyomo MD-by-NLP formulation.
"""
plsobj_ = plsobj.copy()
X_nan_map = np.isnan(X); psi = (~X_nan_map)*1; X, dummy = n2z(X)
Y_nan_map = np.isnan(Y); theta = (~Y_nan_map)*1; Y, dummy = n2z(Y)
A = plsobj['T'].shape[1]; O = plsobj['T'].shape[0]
N = plsobj['P'].shape[0]; M = plsobj['Q'].shape[0]
pyo_A = list(np.arange(1,A+1)); pyo_N = list(np.arange(1,N+1))
pyo_O = list(np.arange(1,O+1)); pyo_M = list(np.arange(1,M+1))
plsobj_.update({'pyo_A':pyo_A, 'pyo_N':pyo_N, 'pyo_O':pyo_O, 'pyo_M':pyo_M,
'pyo_P_init':np2D2pyomo(plsobj['P']), 'pyo_Ws_init':np2D2pyomo(plsobj['Ws']),
'pyo_T_init':np2D2pyomo(plsobj['T']), 'pyo_Q_init':np2D2pyomo(plsobj['Q']),
'pyo_X':np2D2pyomo(X), 'pyo_psi':np2D2pyomo(psi),
'pyo_Y':np2D2pyomo(Y), 'pyo_theta':np2D2pyomo(theta)})
return plsobj_
def conv_pls_2_eiot(plsobj, *, r_length=False):
"""Convert a PLS model for use in EIOT (Extended Iterative Optimization Technology).
Args:
plsobj (dict): Fitted PLS model from :func:`pls`.
        r_length (int): Optional number of leading X variables to index
            in ``indx_r``; the remainder are indexed in ``indx_rk_eq``.
            Default: all variables go in ``indx_r``.
Returns:
dict: EIOT-compatible model parameters.
"""
plsobj_ = plsobj.copy()
A = plsobj['T'].shape[1]; N = plsobj['P'].shape[0]; M = plsobj['Q'].shape[0]
pyo_A = list(np.arange(1,A+1)); pyo_N = list(np.arange(1,N+1)); pyo_M = list(np.arange(1,M+1))
pyo_Ws = np2D2pyomo(plsobj['Ws']); pyo_Q = np2D2pyomo(plsobj['Q']); pyo_P = np2D2pyomo(plsobj['P'])
var_t = np.var(plsobj['T'], axis=0)
pyo_var_t = np1D2pyomo(var_t)
pyo_mx = np1D2pyomo(plsobj['mx']); pyo_sx = np1D2pyomo(plsobj['sx'])
pyo_my = np1D2pyomo(plsobj['my']); pyo_sy = np1D2pyomo(plsobj['sy'])
if not isinstance(r_length, bool):
if r_length < N:
indx_r = list(np.arange(1, r_length+1)); indx_rk_eq = list(np.arange(r_length+1, N+1))
else:
if r_length > N:
print('r_length >> N !!'); print('Forcing r_length=N')
indx_r = pyo_N; indx_rk_eq = 0
else:
indx_r = pyo_N; indx_rk_eq = 0
plsobj_.update({'pyo_A':pyo_A, 'pyo_N':pyo_N, 'pyo_M':pyo_M, 'pyo_Ws':pyo_Ws, 'pyo_Q':pyo_Q,
'pyo_P':pyo_P, 'pyo_var_t':pyo_var_t, 'indx_r':indx_r, 'indx_rk_eq':indx_rk_eq,
'pyo_mx':pyo_mx, 'pyo_sx':pyo_sx, 'pyo_my':pyo_my, 'pyo_sy':pyo_sy,
'S_I':np.nan, 'pyo_S_I':np.nan, 'var_t':var_t})
return plsobj_
# =============================================================================
# Categorical, Multi-block PLS, Replicate data, gPROMS export
# =============================================================================
def cat_2_matrix(X):
"""Convert a categorical variable column to a binary indicator matrix.
Args:
x (pd.DataFrame): Data frame with columns of categorical data
First column is the variable ID.
shush (bool): Suppress printed output. Default ``False``.
Returns:
x_binary (pd.DataFrame): Binary indicator matrix with one column
per unique category (same type as input), all categories concatenated
xmb (pd.DataFrame): Binary indicator matrix with one column
per unique category (same type as input) categories organized by block
for multi-block models (if DataFrame has multiple columns)
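    Example:
        Illustrative sketch (``df`` has an ID column plus categorical
        columns such as 'Supplier' and 'Site')::
            >>> Xmat, Xmb = cat_2_matrix(df)
            >>> model = mbpls(Xmb, Y_df, 2)   # per-block indicators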
"""
FirstOne = True; Xmat = []; Xcat = []; XmatMB = []; blknames = []
for x in X:
if not FirstOne:
blknames.append(x)
categories = np.unique(X[x])
Xmat_ = []
for c in categories:
Xcat.append(c)
xmat_ = (X[x] == c) * 1
Xmat.append(xmat_)
Xmat_.append(xmat_)
Xmat_ = pd.DataFrame(np.array(Xmat_).T, columns=categories)
Xmat_.insert(0, firstcol, X[firstcol])
XmatMB.append(Xmat_)
else:
firstcol = x; FirstOne = False
Xmat = pd.DataFrame(np.array(Xmat).T, columns=Xcat)
Xmat.insert(0, firstcol, X[firstcol].values)
aux_dict = dict()
for bname, bdata in zip(blknames, XmatMB):
aux_dict[bname] = bdata
return Xmat, aux_dict
def mbpls(XMB, YMB, A, *, mcsX=True, mcsY=True, md_algorithm_='nipals',
force_nipals_=False, shush_=False, cross_val_=0, cross_val_X_=False, cca=False):
"""Fit a Multi-Block PLS (MBPLS) model.
Args:
        XMB (dict): Dictionary of X blocks ``{'block_name': pd.DataFrame}``.
            Each DataFrame's first column must contain observation IDs.
        YMB (dict or pd.DataFrame): Response block(s); same structure as
            ``XMB``, or a single DataFrame.
        A (int): Number of latent variables.
        mcsX, mcsY: Preprocessing flags. ``True`` (default) autoscales;
            a list of ``'center'``/``'autoscale'`` entries sets each
            block individually.
        md_algorithm_ (str): Missing-data algorithm passed to :func:`pls`.
        force_nipals_ (bool): Force NIPALS even for complete data.
        shush_ (bool): Suppress printed output. Default ``False``.
        cross_val_ (int): Cross-validation level (same as :func:`pls`).
        cross_val_X_ (bool): Cross-validate X-space. Default ``False``.
        cca (bool): Also compute PLS-CCA scores and loadings (see :func:`pls`).
    Returns:
        dict: Fitted MBPLS model, extending the standard PLS model dict
        with multi-block keys:
            - ``T`` (ndarray): Super-scores.
            - ``Wsb`` (list): Per-block normalized weights (one entry per block).
            - ``Wt`` (ndarray): Super-weights relating the block scores to U.
            - ``r2pbX`` (ndarray): Per-block, per-LV R² for X.
            - ``r2pbXc`` (ndarray): Cumulative per-block R² for X.
            - ``Xblocknames``, ``Yblocknames`` (list): Block names (dict inputs only).
        Plus all standard PLS keys (``Q``, ``r2y``, ``speX``, etc.).
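    Example:
        Illustrative sketch with two hypothetical X blocks::
            >>> Xmb = {'process': X1_df, 'materials': X2_df}
            >>> model = mbpls(Xmb, Y_df, 2)
            >>> model['r2pbX']   # per-block, per-LV R2 for X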
"""
x_means=[]; x_stds=[]; y_means=[]; y_stds=[]
Xblk_scales=[]; Yblk_scales=[]; Xcols_per_block=[]; Ycols_per_block=[]
X_var_names=[]; Y_var_names=[]; obsids=[]
if isinstance(XMB, dict):
data_ = []; names_ = []
for k in XMB.keys():
data_.append(XMB[k]); names_.append(k)
XMB = {'data':data_, 'blknames':names_}
x = XMB['data'][0]
columns = x.columns.tolist()
obsid_column_name = columns[0]
obsids = x[obsid_column_name].tolist()
c = 0
for x in XMB['data']:
x_ = x.values[:,1:].astype(float)
columns = x.columns.tolist()
for i, h in enumerate(columns):
if i != 0:
X_var_names.append(XMB['blknames'][c]+' '+h)
if isinstance(mcsX, bool):
if mcsX:
x_, x_mean_, x_std_ = meancenterscale(x_)
else:
x_mean_ = np.zeros((1, x_.shape[1])); x_std_ = np.ones((1, x_.shape[1]))
elif mcsX[c] == 'center':
x_, x_mean_, x_std_ = meancenterscale(x_, mcs='center')
elif mcsX[c] == 'autoscale':
x_, x_mean_, x_std_ = meancenterscale(x_, mcs='autoscale')
blck_scale = np.sqrt(np.sum(std(x_)**2))
x_means.append(x_mean_); x_stds.append(x_std_)
Xblk_scales.append(blck_scale); Xcols_per_block.append(x_.shape[1])
x_ = x_ / blck_scale
if c == 0: X_ = x_.copy()
else: X_ = np.hstack((X_, x_))
c += 1
elif isinstance(XMB, pd.DataFrame):
columns = XMB.columns.tolist()
obsid_column_name = columns[0]; obsids = XMB[obsid_column_name].tolist()
for i, h in enumerate(columns):
if i != 0: X_var_names.append(h)
x_ = XMB.values[:,1:].astype(float)
if isinstance(mcsX, bool):
if mcsX: x_, x_mean_, x_std_ = meancenterscale(x_)
else: x_mean_ = np.zeros((1, x_.shape[1])); x_std_ = np.ones((1, x_.shape[1]))
elif mcsX[0] == 'center':
x_, x_mean_, x_std_ = meancenterscale(x_, mcs='center')
elif mcsX[0] == 'autoscale':
x_, x_mean_, x_std_ = meancenterscale(x_, mcs='autoscale')
blck_scale = np.sqrt(np.sum(std(x_)**2))
x_means.append(x_mean_); x_stds.append(x_std_)
Xblk_scales.append(blck_scale); Xcols_per_block.append(x_.shape[1])
X_ = x_ / blck_scale
if isinstance(YMB, dict):
data_ = []; names_ = []
for k in YMB.keys():
data_.append(YMB[k]); names_.append(k)
YMB = {'data':data_, 'blknames':names_}
c = 0
for y in YMB['data']:
y_ = y.values[:,1:].astype(float)
columns = y.columns.tolist()
for i, h in enumerate(columns):
if i != 0: Y_var_names.append(h)
if isinstance(mcsY, bool):
if mcsY: y_, y_mean_, y_std_ = meancenterscale(y_)
else: y_mean_ = np.zeros((1, y_.shape[1])); y_std_ = np.ones((1, y_.shape[1]))
elif mcsY[c] == 'center':
y_, y_mean_, y_std_ = meancenterscale(y_, mcs='center')
elif mcsY[c] == 'autoscale':
y_, y_mean_, y_std_ = meancenterscale(y_, mcs='autoscale')
blck_scale = np.sqrt(np.sum(std(y_)**2))
y_means.append(y_mean_); y_stds.append(y_std_)
Yblk_scales.append(blck_scale); Ycols_per_block.append(y_.shape[1])
y_ = y_ / blck_scale
if c == 0: Y_ = y_.copy()
else: Y_ = np.hstack((Y_, y_))
c += 1
elif isinstance(YMB, pd.DataFrame):
y_ = YMB.values[:,1:].astype(float)
columns = YMB.columns.tolist()
for i, h in enumerate(columns):
if i != 0: Y_var_names.append(h)
if isinstance(mcsY, bool):
if mcsY: y_, y_mean_, y_std_ = meancenterscale(y_)
else: y_mean_ = np.zeros((1, y_.shape[1])); y_std_ = np.ones((1, y_.shape[1]))
elif mcsY[0] == 'center':
y_, y_mean_, y_std_ = meancenterscale(y_, mcs='center')
elif mcsY[0] == 'autoscale':
y_, y_mean_, y_std_ = meancenterscale(y_, mcs='autoscale')
blck_scale = np.sqrt(np.sum(std(y_)**2))
y_means.append(y_mean_); y_stds.append(y_std_)
Yblk_scales.append(blck_scale); Ycols_per_block.append(y_.shape[1])
Y_ = y_ / blck_scale
X_pd = pd.DataFrame(X_, columns=X_var_names)
X_pd.insert(0, obsid_column_name, obsids)
Y_pd = pd.DataFrame(Y_, columns=Y_var_names)
Y_pd.insert(0, obsid_column_name, obsids)
pls_obj_ = pls(X_pd, Y_pd, A, mcsX=False, mcsY=False, md_algorithm=md_algorithm_,
force_nipals=force_nipals_, shush=shush_, cross_val=cross_val_, cross_val_X=cross_val_X_, cca=cca)
pls_obj_['type'] = 'mbpls'
Wsb=[]; Wb=[]; Tb=[]
for i, c in enumerate(Xcols_per_block):
start_index = 0 if i == 0 else np.sum(Xcols_per_block[0:i])
end_index = start_index + c
wsb_ = pls_obj_['Ws'][start_index:end_index, :].copy()
for j in range(wsb_.shape[1]):
wsb_[:, j] = wsb_[:, j] / np.linalg.norm(wsb_[:, j])
Wsb.append(wsb_)
wb_ = pls_obj_['W'][start_index:end_index, :].copy()
for j in range(wb_.shape[1]):
wb_[:, j] = wb_[:, j] / np.linalg.norm(wb_[:, j])
Wb.append(wb_)
Xb = X_[:, start_index:end_index]
X_nan_map = np.isnan(Xb)
not_Xmiss = (~X_nan_map) * 1
Xb, dummy = n2z(Xb)
TSS = np.sum(Xb**2)
for a in range(A):
w_ = wb_[:, [a]]
w_t = w_.T * not_Xmiss
w_t = np.sum(w_t**2, axis=1, keepdims=True)
tb_ = (Xb @ w_) / w_t
if a == 0: tb = tb_
else: tb = np.hstack((tb, tb_))
tb_t = tb_.T * not_Xmiss.T
tb_t = np.sum(tb_t**2, axis=1, keepdims=True)
try: pb_ = (Xb.T @ tb_) / tb_t
except: pb_ = 0
Xb = (Xb - tb_ @ pb_.T) * not_Xmiss
r2pb_aux = 1 - (np.sum(Xb**2)/TSS)
if a == 0: r2pb_ = r2pb_aux
else: r2pb_ = np.hstack((r2pb_, r2pb_aux))
if i == 0: r2pbX = r2pb_
else: r2pbX = np.vstack((r2pbX, r2pb_))
Tb.append(tb)
for a in range(A):
u = pls_obj_['U'][:, [a]].copy()
for i, c in enumerate(Xcols_per_block):
if i == 0: T_a = Tb[i][:, [a]]
else: T_a = np.hstack((T_a, Tb[i][:, [a]]))
wt_ = (T_a.T @ u) / (u.T @ u)
if a == 0: Wt = wt_
else: Wt = np.hstack((Wt, wt_))
pls_obj_['x_means'] = x_means; pls_obj_['x_stds'] = x_stds
pls_obj_['y_means'] = y_means; pls_obj_['y_stds'] = y_stds
pls_obj_['Xblk_scales'] = Xblk_scales; pls_obj_['Yblk_scales'] = Yblk_scales
pls_obj_['Wsb'] = Wsb; pls_obj_['Wt'] = Wt
mx_ = [j for l in x_means for j in l[0]]
pls_obj_['mx'] = np.array(mx_)
sx_ = [j * Xblk_scales[i] for i, l in enumerate(x_stds) for j in l[0]]
pls_obj_['sx'] = np.array(sx_)
my_ = [j for l in y_means for j in l[0]]
pls_obj_['my'] = np.array(my_)
sy_ = [j * Yblk_scales[i] for i, l in enumerate(y_stds) for j in l[0]]
pls_obj_['sy'] = np.array(sy_)
if isinstance(XMB, dict):
for a in range(A-1, 0, -1):
r2pbX[:, [a]] = r2pbX[:, [a]] - r2pbX[:, [a-1]]
r2pbXc = np.cumsum(r2pbX, axis=1)
else:
for a in range(A-1, 0, -1):
r2pbX[a] = r2pbX[a] - r2pbX[a-1]
r2pbXc = np.cumsum(r2pbX)
pls_obj_['r2pbX'] = r2pbX; pls_obj_['r2pbXc'] = r2pbXc
if isinstance(XMB, dict): pls_obj_['Xblocknames'] = XMB['blknames']
if isinstance(YMB, dict): pls_obj_['Yblocknames'] = YMB['blknames']
return pls_obj_
def replicate_data(mvm_obj, X, num_replicates, *, as_set=False, rep_Y=False, Y=False):
"""Augment a dataset by adding small noise replicates.
Useful for regularizing models when training data is limited.
    Args:
        mvm_obj (dict): Fitted PCA or PLS model used to generate the clones.
        X (pd.DataFrame): Original data; first column is observation IDs.
        num_replicates (int): Number of replicated rows to generate.
        as_set (bool): If ``True``, replicate the complete data set
            ``num_replicates`` times; otherwise generate exactly
            ``num_replicates`` rows. Default ``False``.
        rep_Y (bool): Replicate the Y space instead of X (PLS models only).
        Y (pd.DataFrame): Original Y data, required when ``rep_Y=True``.
    Returns:
        pd.DataFrame: Replicated observations built from the model
        reconstruction plus residuals resampled from the empirical
        residual distribution (ECDF).
"""
if not rep_Y:
if 'Q' in mvm_obj:
preds = pls_pred(X, mvm_obj); xhat = preds['Xhat']; tnew = preds['Tnew']
else:
preds = pca_pred(X, mvm_obj); xhat = preds['Xhat']; tnew = preds['Tnew']
xhat = tnew @ mvm_obj['P'].T
xmcs = (X.values[:,1:] - mvm_obj['mx']) / mvm_obj['sx']
data_residuals = xmcs - xhat
if not as_set:
if np.mod(num_replicates, X.shape[0]) == 0:
new_set = np.tile(xhat, (int(num_replicates/X.shape[0]), 1))
else:
reps = int(np.floor(num_replicates/X.shape[0]))
new_set = np.tile(xhat, (reps, 1))
new_set = np.vstack((new_set, xhat[:np.mod(num_replicates, X.shape[0]), :]))
obsids = ['clone'+str(i) for i in np.arange(new_set.shape[0])+1]
else:
new_set = np.tile(xhat, (num_replicates, 1))
obsids = []
obsid_o = X[X.columns[0]].values.astype(str).tolist()
for i in np.arange(num_replicates)+1:
obsids.extend([m+'_clone'+str(i) for m in obsid_o])
uncertainty_matrix = np.empty((new_set.shape[0], 0))
for i in range(data_residuals.shape[1]):
ecdf = ECDF(data_residuals[:, i])
new_residual = np.random.uniform(0, 1, new_set.shape[0])
y = np.array(ecdf.y.tolist()); x = np.array(ecdf.x.tolist())
new_residual = np.interp(new_residual, y[1:], x[1:])
uncertainty_matrix = np.hstack((uncertainty_matrix, new_residual.reshape(-1, 1)))
new_set = (new_set + uncertainty_matrix) * mvm_obj['sx'] + mvm_obj['mx']
new_set_pd = pd.DataFrame(new_set, columns=X.columns[1:].tolist())
new_set_pd.insert(0, X.columns[0], obsids)
return new_set_pd
elif 'Q' in mvm_obj:
preds = pls_pred(X, mvm_obj); yhat = preds['Yhat']; tnew = preds['Tnew']
yhat = tnew @ mvm_obj['Q'].T
ymcs = (Y.values[:,1:] - mvm_obj['my']) / mvm_obj['sy']
data_residuals = ymcs - yhat
if not as_set:
if np.mod(num_replicates, Y.shape[0]) == 0:
new_set = np.tile(yhat, (int(num_replicates/Y.shape[0]), 1))
else:
reps = int(np.floor(num_replicates/Y.shape[0]))
new_set = np.tile(yhat, (reps, 1))
new_set = np.vstack((new_set, yhat[:np.mod(num_replicates, Y.shape[0]), :]))
obsids = ['clone'+str(i) for i in np.arange(new_set.shape[0])+1]
else:
new_set = np.tile(yhat, (num_replicates, 1))
obsids = []
obsid_o = Y[Y.columns[0]].values.astype(str).tolist()
for i in np.arange(num_replicates)+1:
obsids.extend([m+'_clone'+str(i) for m in obsid_o])
uncertainty_matrix = np.empty((new_set.shape[0], 0))
for i in range(data_residuals.shape[1]):
ecdf = ECDF(data_residuals[:, i])
new_residual = np.random.uniform(0, 1, new_set.shape[0])
y = np.array(ecdf.y.tolist()); x = np.array(ecdf.x.tolist())
new_residual = np.interp(new_residual, y[1:], x[1:])
uncertainty_matrix = np.hstack((uncertainty_matrix, new_residual.reshape(-1, 1)))
new_set = (new_set + uncertainty_matrix) * mvm_obj['sy'] + mvm_obj['my']
new_set_pd = pd.DataFrame(new_set, columns=Y.columns[1:].tolist())
new_set_pd.insert(0, Y.columns[0], obsids)
return new_set_pd
else:
print('You need a PLS model to replicate Y')
def export_2_gproms(mvmobj, *, fname='phi_export.txt'):
'''Export PLS model to gPROMS syntax.'''
top_lines = [
'PARAMETER', 'X_VARS AS ORDERED_SET', 'Y_VARS AS ORDERED_SET', 'A AS INTEGER',
'VARIABLE',
'X_MEANS as ARRAY(X_VARS) OF no_type', 'X_STD AS ARRAY(X_VARS) OF no_type',
'Y_MEANS as ARRAY(Y_VARS) OF no_type', 'Y_STD AS ARRAY(Y_VARS) OF no_type',
'Ws AS ARRAY(X_VARS,A) OF no_type', 'Q AS ARRAY(Y_VARS,A) OF no_type',
'P AS ARRAY(X_VARS,A) OF no_type', 'T AS ARRAY(A) OF no_type',
'Tvar AS ARRAY(A) OF no_type',
'X_HAT AS ARRAY(X_VARS) OF no_type # Mean-centered and scaled',
'Y_HAT AS ARRAY(Y_VARS) OF no_type # Mean-centered and scaled',
'X_PRED AS ARRAY(X_VARS) OF no_type # In original units',
'Y_PRED AS ARRAY(Y_VARS) OF no_type # In original units',
'X_NEW AS ARRAY(X_VARS) OF no_type # In original units',
'X_MCS AS ARRAY(X_VARS) OF no_type # Mean-centered and scaled',
'HT2 AS no_type', 'SPEX AS no_type', 'SET']
x_var_line = "X_VARS:=['" + "','".join(mvmobj['varidX']) + "'];"
y_var_line = "Y_VARS:=['" + "','".join(mvmobj['varidY']) + "'];"
top_lines.extend([x_var_line, y_var_line, 'A:='+str(mvmobj['T'].shape[1])+';'])
mid_lines = [
'EQUATION', 'X_MCS * X_STD = (X_NEW-X_MEANS);',
'FOR j:=1 TO A DO', 'T(j) = SIGMA(X_MCS*Ws(,j));', 'END',
'FOR i IN Y_VARS DO', 'Y_HAT(i) = SIGMA(T*Q(i,));', 'END',
'FOR i IN X_VARS DO', 'X_HAT(i) = SIGMA(T*P(i,));', 'END',
'(X_HAT * X_STD) + X_MEANS = X_PRED;', '(Y_HAT * Y_STD) + Y_MEANS = Y_PRED;',
'HT2 = SIGMA ((T^2)/Tvar);', 'SPEX = SIGMA ((X_MCS - X_HAT)^2);']
assign_lines = ['ASSIGN']
for i, xvar in enumerate(mvmobj['varidX']):
assign_lines.append("X_MEANS('"+xvar+"') := "+str(mvmobj['mx'][0,i])+";" )
for i, xvar in enumerate(mvmobj['varidX']):
assign_lines.append("X_STD('"+xvar+"') := "+str(mvmobj['sx'][0,i])+";" )
for i, yvar in enumerate(mvmobj['varidY']):
assign_lines.append("Y_MEANS('"+yvar+"') := "+str(mvmobj['my'][0,i])+";" )
for i, yvar in enumerate(mvmobj['varidY']):
assign_lines.append("Y_STD('"+yvar+"') := "+str(mvmobj['sy'][0,i])+";" )
for i, xvar in enumerate(mvmobj['varidX']):
for j in np.arange(mvmobj['Ws'].shape[1]):
assign_lines.append("Ws('"+xvar+"',"+str(j+1)+") := "+str(mvmobj['Ws'][i,j])+";")
for i, xvar in enumerate(mvmobj['varidX']):
for j in np.arange(mvmobj['P'].shape[1]):
assign_lines.append("P('"+xvar+"',"+str(j+1)+") := "+str(mvmobj['P'][i,j])+";")
for i, yvar in enumerate(mvmobj['varidY']):
for j in np.arange(mvmobj['Q'].shape[1]):
assign_lines.append("Q('"+yvar+"',"+str(j+1)+") := "+str(mvmobj['Q'][i,j])+";")
tvar = np.std(mvmobj['T'], axis=0, ddof=1)
for j in np.arange(mvmobj['T'].shape[1]):
assign_lines.append("Tvar("+str(j+1)+") := "+str(tvar[j])+";" )
lines = top_lines + mid_lines + assign_lines
with open(fname, "w") as outfile:
outfile.write("\n".join(lines))
return
# =============================================================================
# Parsing utilities
# =============================================================================
def unique(df, colid):
"""Return unique values preserving original order.
Args:
        df (pd.DataFrame): Input DataFrame.
        colid (str): Name of the column whose unique values are returned.
    Returns:
        list: Unique values of ``df[colid]`` in first-appearance order.
"""
return df.drop_duplicates(subset=colid, keep='first')[colid].values.tolist()
def parse_materials(filename, sheetname):
'''Build R matrices for JRPLS from linear table in Excel.'''
materials = pd.read_excel(filename, sheet_name=sheetname)
ok = True
for lot in unique(materials, 'Finished Product Lot'):
this_lot = materials[materials["Finished Product Lot"]==lot]
for mt, m in zip(this_lot['Material'].values, this_lot['Material Lot'].values):
try:
if np.isnan(m):
print('Lot '+lot+' has no Material Lot for '+mt); ok = False; break
except:
pass
if not ok: break
        print('Lot '+lot+': ratio/qty adds to '+str(np.sum(this_lot['Ratio or Quantity'].values)))
if ok:
JR = []
materials_used = unique(materials, 'Material')
fp_lots = unique(materials, 'Finished Product Lot')
for m in materials_used:
mat_lots = np.unique(materials['Material Lot'][materials['Material']==m]).tolist()
r_mat = []
for lot in fp_lots:
rvec = np.zeros(len(mat_lots))
this_lot_this_mat = materials[(materials["Finished Product Lot"]==lot) & (materials['Material']==m)]
for l, r in zip(this_lot_this_mat['Material Lot'].values, this_lot_this_mat['Ratio or Quantity'].values):
rvec[mat_lots.index(l)] = r
r_mat.append(rvec)
r_mat_pd = pd.DataFrame(np.array(r_mat), columns=mat_lots)
r_mat_pd.insert(0, 'FPLot', fp_lots)
JR.append(r_mat_pd)
return JR, materials_used
else:
print('Data needs revision'); return False, False
def isin_ordered_col0(df, alist):
    '''Filter df to the rows whose first-column ID is in alist, reordered to match alist.'''
    df_ = df[df[df.columns[0]].isin(alist)].set_index(df.columns[0]).reindex(alist).reset_index()
return df_
def reconcile_rows(df_list):
"""Align two DataFrames by their observation IDs (first column).
Reorders Y to match the row order of X. Observations present in one
but not the other are dropped, with a warning printed.
Args:
X (pd.DataFrame): Reference DataFrame. First column is observation IDs.
Y (pd.DataFrame): DataFrame to align. First column is observation IDs.
Returns:
tuple: ``(X_aligned, Y_aligned)`` — DataFrames sharing the same
ordered set of observation IDs.
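    Example:
        Illustrative sketch with hypothetical DataFrames sharing a
        subset of observation IDs::
            >>> X_r, Y_r, Z_r = reconcile_rows([X_df, Y_df, Z_df])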
"""
all_rows = []
for df in df_list:
all_rows.extend(df[df.columns[0]].values.tolist())
all_rows = np.unique(all_rows)
for df in df_list:
rows = df[df.columns[0]].values.tolist()
all_rows = [r for r in all_rows if r in rows]
return [isin_ordered_col0(df, all_rows) for df in df_list]
def reconcile_rows_to_columns(df_list_r, df_list_c):
"""Map DataFrame rows to the columns of another DataFrame.
Used in L-shaped data structures where material lot IDs appear as
column headers in X and as row IDs in R.
Args:
X (pd.DataFrame): Process data where columns (after the first)
correspond to lot IDs.
R (pd.DataFrame): Material property data where the first column
contains lot IDs.
Returns:
tuple: ``(X_matched, R_matched)`` — aligned matrices ready for LPLS.
"""
    df_list_r_o = []; df_list_c_o = []
    for dfr, dfc in zip(df_list_r, df_list_c):
        # Intersect the two ID sets, preserving the order in which IDs appear in dfr
        rows = dfr[dfr.columns[0]].values.tolist()
        cols = dfc.columns[1:].tolist()
        all_ids = [i for i in rows if i in cols]
dfr_ = isin_ordered_col0(dfr, all_ids)
dfc_ = dfc[all_ids].copy()
dfc_.insert(0, dfc.columns[0], dfc[dfc.columns[0]].values.tolist())
df_list_r_o.append(dfr_); df_list_c_o.append(dfc_)
return df_list_r_o, df_list_c_o
# =============================================================================
# LPLS, JRPLS, TPLS (use optimized _Ab_btbinv)
# =============================================================================
def lpls(X, R, Y, A, *, shush=False):
"""Fit an L-shaped PLS (LPLS) model.
    Models the relationship between raw-material lot properties (X),
    blending ratios (R), and product quality (Y), where the rows of X
    correspond to the material lots referenced by the columns of R.
Per Muteki et al., Chemom. Intell. Lab. Syst. 85 (2007) 186–194.
Args:
        X (pd.DataFrame or np.ndarray): Material property matrix
            (n_lots × n_x). Rows of X correspond to columns of R.
            First column is lot IDs if a DataFrame.
        R (pd.DataFrame or np.ndarray): Blending ratio matrix
            (n_blends × n_lots). Rows match rows of Y.
        Y (pd.DataFrame or np.ndarray): Quality/response matrix
            (n_blends × n_y).
A (int): Number of latent variables.
shush (bool): Suppress printed output. Default ``False``.
Returns:
dict: Fitted LPLS model with keys:
            - ``T`` (ndarray): Blend scores (n_blends × A).
- ``P`` (ndarray): X-loadings (n_x × A).
- ``Q`` (ndarray): Y-loadings (n_y × A).
- ``H`` (ndarray): R-space scores (n_lots × A).
- ``V`` (ndarray): R-space loadings (n_r × A).
- ``Rscores`` (ndarray): R projected scores.
- ``Ss`` (ndarray): Rotated R weights S*(V'S)⁻¹.
- ``r2x``, ``r2xpv``: R² for X space.
- ``r2y``, ``r2ypv``: R² for Y space.
- ``r2r``, ``r2rpv``: R² for R space.
- ``mx``, ``sx``, ``my``, ``sy``, ``mr``, ``sr``: Preprocessing params.
- ``var_t``: Score covariance matrix.
- ``T2``, ``T2_lim95``, ``T2_lim99``: Hotelling's T² and limits.
- ``speX``, ``speX_lim95``, ``speX_lim99``: X SPE and limits.
- ``speY``, ``speY_lim95``, ``speY_lim99``: Y SPE and limits.
- ``speR``, ``speR_lim95``, ``speR_lim99``: R SPE and limits.
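    Example:
        Illustrative sketch (``X_df``, ``R_df``, ``Y_df`` are
        hypothetical DataFrames with matching lot/blend IDs)::
            >>> model = lpls(X_df, R_df, Y_df, 2)
            >>> model['r2y']   # per-LV R2 for Y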
"""
# Validate R-Y (blend observations must match row-to-row)
R, Y = _validate_inputs(R, Y, A=A)
X_, obsidX, varidX = _extract_array(X)
Y_, obsidY, varidY = _extract_array(Y)
R_, obsidR, varidR = _extract_array(R)
X_, x_mean, x_std = meancenterscale(X_)
Y_, y_mean, y_std = meancenterscale(Y_)
R_, r_mean, r_std = meancenterscale(R_)
not_Xmiss = (~np.isnan(X_)) * 1
not_Ymiss = (~np.isnan(Y_)) * 1
not_Rmiss = (~np.isnan(R_)) * 1
if not shush:
print('phi.lpls using NIPALS executed on: ' + str(datetime.datetime.now()))
X_, dummy = n2z(X_); Xhat = np.zeros(X_.shape)
Y_, dummy = n2z(Y_); R_, dummy = n2z(R_)
epsilon = 1E-9; maxit = 2000
TSSX = np.sum(X_**2); TSSXpv = np.sum(X_**2, axis=0)
TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
TSSR = np.sum(R_**2); TSSRpv = np.sum(R_**2, axis=0)
r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None; r2R = None; r2Rpv = None
for a in range(A):
ui = Y_[:, [np.argmax(std(Y_))]]
Converged = False; num_it = 0
while not Converged:
hi = _Ab_btbinv(R_.T, ui, not_Rmiss.T)
si = _Ab_btbinv(X_.T, hi, not_Xmiss.T)
si = si / np.linalg.norm(si)
ri = _Ab_btbinv(X_, si, not_Xmiss)
ti = _Ab_btbinv(R_, ri, not_Rmiss)
qi = _Ab_btbinv(Y_.T, ti, not_Ymiss.T)
un = _Ab_btbinv(Y_, qi, not_Ymiss)
if abs((np.linalg.norm(ui) - np.linalg.norm(un))) / np.linalg.norm(ui) < epsilon:
Converged = True
if num_it > maxit:
Converged = True
if Converged:
if not shush: print('# Iterations for LV #'+str(a+1)+': ', str(num_it))
pi = _Ab_btbinv(R_.T, ti, not_Rmiss.T)
vi = _Ab_btbinv(X_.T, ri, not_Xmiss.T)
R_ = (R_ - ti @ pi.T) * not_Rmiss
X_ = (X_ - ri @ vi.T) * not_Xmiss
Y_ = (Y_ - ti @ qi.T) * not_Ymiss
Xhat = Xhat + ri @ vi.T
if a == 0:
T=ti; P=pi; S=si; U=un; Q=qi; H=hi; V=vi; Rscores=ri
else:
T=np.hstack((T,ti)); U=np.hstack((U,un)); P=np.hstack((P,pi))
Q=np.hstack((Q,qi)); S=np.hstack((S,si)); H=np.hstack((H,hi))
V=np.hstack((V,vi)); Rscores=np.hstack((Rscores,ri))
r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv, r2X, r2Xpv)
r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
r2R, r2Rpv = _calc_r2(R_, TSSR, TSSRpv, r2R, r2Rpv)
else:
num_it += 1; ui = un
if a == 0: numIT = num_it
else: numIT = np.hstack((numIT, num_it))
Xhat = Xhat * x_std + x_mean
r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
r2R, r2Rpv = _r2_cumulative_to_per_component(r2R, r2Rpv, A)
eigs = np.var(T, axis=0)
r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y); r2rc = np.cumsum(r2R)
if not shush:
print('--------------------------------------------------------------')
print('LV # Eig R2X sum(R2X) R2R sum(R2R) R2Y sum(R2Y)')
if A > 1:
for a in range(A):
print("LV #"+str(a+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2R[a], r2rc[a], r2Y[a], r2yc[a]))
else:
print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2R, r2rc[0], r2Y, r2yc[0]))
print('--------------------------------------------------------------')
var_t = (T.T @ T) / T.shape[0]
lpls_obj = {'T':T, 'P':P, 'Q':Q, 'U':U, 'S':S, 'H':H, 'V':V, 'Rscores':Rscores,
'r2x':r2X, 'r2xpv':r2Xpv, 'mx':x_mean, 'sx':x_std,
'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std,
'r2r':r2R, 'r2rpv':r2Rpv, 'mr':r_mean, 'sr':r_std,
'Xhat':Xhat, 'var_t':var_t}
if not isinstance(obsidX, bool): lpls_obj['obsidX'] = obsidX; lpls_obj['varidX'] = varidX
if not isinstance(obsidY, bool): lpls_obj['obsidY'] = obsidY; lpls_obj['varidY'] = varidY
if not isinstance(obsidR, bool): lpls_obj['obsidR'] = obsidR; lpls_obj['varidR'] = varidR
T2 = hott2(lpls_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = np.sum(X_**2, axis=1, keepdims=1); speX_lim95, speX_lim99 = spe_ci(speX)
speY = np.sum(Y_**2, axis=1, keepdims=1); speY_lim95, speY_lim99 = spe_ci(speY)
speR = np.sum(R_**2, axis=1, keepdims=1); speR_lim95, speR_lim99 = spe_ci(speR)
lpls_obj.update({'T2':T2, 'T2_lim99':T2_lim99, 'T2_lim95':T2_lim95,
'speX':speX, 'speX_lim99':speX_lim99, 'speX_lim95':speX_lim95,
'speY':speY, 'speY_lim99':speY_lim99, 'speY_lim95':speY_lim95,
'speR':speR, 'speR_lim99':speR_lim99, 'speR_lim95':speR_lim95})
lpls_obj['Ss'] = S @ np.linalg.pinv(V.T @ S)
lpls_obj['type'] = 'lpls'
return lpls_obj
def lpls_pred(rnew, lpls_obj):
"""Predict Y for new blend(s) using a fitted LPLS model.
Args:
rnew (np.ndarray, list or pd.DataFrame): Composition vector(s) for the
new blend(s), over the same lots (columns of R) used to fit the model.
lpls_obj (dict): Fitted LPLS model from :func:`lpls`.
Returns:
dict: Prediction results with keys:
- ``Tnew`` (ndarray): Projected scores (n_new × A).
- ``Yhat`` (ndarray): Predicted Y in original scale.
- ``speR`` (ndarray): R-space SPE for each new blend.
"""
if isinstance(rnew, np.ndarray): rnew__ = [rnew.copy()]
elif isinstance(rnew, list): rnew__ = np.array(rnew)
elif isinstance(rnew, pd.DataFrame): rnew__ = rnew.values[:,1:].astype(float)
else: raise TypeError('rnew must be a numpy array, list, or pandas DataFrame')
tnew = []; sper = []
for rnew_ in rnew__:
rnew_ = ((rnew_ - lpls_obj['mr']) / lpls_obj['sr']).reshape(-1, 1)
ti = []
for a in np.arange(lpls_obj['T'].shape[1]):
ti_ = (rnew_.T @ lpls_obj['Rscores'][:,a] / (lpls_obj['Rscores'][:,a].T @ lpls_obj['Rscores'][:,a]))
ti.append(ti_[0])
rnew_ = rnew_ - (ti_ * lpls_obj['P'][:,a]).reshape(-1, 1)
tnew.append(np.array(ti))
sper.append(np.sum(rnew_**2))
tnew = np.array(tnew); sper = np.array(sper)
yhat = (tnew @ lpls_obj['Q'].T) * lpls_obj['sy'] + lpls_obj['my']
return {'Tnew':tnew, 'Yhat':yhat, 'speR':sper}
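# --------------------------------------------------------------------------
# Hedged usage sketch for lpls/lpls_pred. The _example_lpls helper below is
# illustrative only (not part of the public API); the synthetic frames simply
# follow the documented shapes: lots in the rows of X and the columns of R,
# blends in the rows of R and Y.
def _example_lpls():
    """Illustrative sketch: fit LPLS on synthetic data and predict a blend."""
    rng = np.random.default_rng(1)
    lots = ['Lot' + str(i) for i in range(6)]
    blends = ['Blend' + str(i) for i in range(10)]
    # X: lot property matrix (first column = lot IDs)
    X = pd.DataFrame(rng.normal(size=(6, 4)), columns=['p1', 'p2', 'p3', 'p4'])
    X.insert(0, 'LotID', lots)
    # R: blend composition over the six lots; each row sums to one
    ratios = rng.random((10, 6))
    ratios = ratios / ratios.sum(axis=1, keepdims=True)
    R = pd.DataFrame(ratios, columns=lots)
    R.insert(0, 'BlendID', blends)
    # Y: blend quality
    Y = pd.DataFrame(rng.normal(size=(10, 2)), columns=['q1', 'q2'])
    Y.insert(0, 'BlendID', blends)
    model = lpls(X, R, Y, 2, shush=True)
    pred = lpls_pred(ratios[0], model)   # one new recipe over the same lots
    return model, pred['Yhat']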
def jrpls(Xi, Ri, Y, A, *, shush=False):
"""Fit a Joint R-LPLS (JRPLS) model across multiple campaigns.
Extends LPLS to handle multiple manufacturing campaigns, each with
their own X (process) and R (raw material) blocks sharing a common Y.
Per Garcia-Munoz, Chemom. Intell. Lab. Syst. 133 (2014) 49–62.
Args:
Xi (dict): Process data blocks ``{'campaign': pd.DataFrame}``.
Each DataFrame's first column is observation IDs.
Ri (dict): Raw material property blocks ``{'campaign': pd.DataFrame}``.
Keys must match ``Xi``. First column is lot IDs.
Y (pd.DataFrame or np.ndarray): Shared response matrix. Rows match
lots across all campaigns.
A (int): Number of latent variables.
shush (bool): Suppress printed output. Default ``False``.
Returns:
dict: Fitted JRPLS model with per-campaign sub-dicts and shared keys.
Structure mirrors :func:`lpls` output but indexed by campaign.
"""
X = []; varidX = []; obsidX = []
materials = list(Xi.keys())
for k in Xi.keys():
X_, obsidX_, varidX_ = _extract_array(Xi[k])
X.append(X_); varidX.append(varidX_); obsidX.append(obsidX_)
Y_, obsidY, varidY = _extract_array(Y)
R = []; varidR = []; obsidR = []
for k in materials:
R_, obsidR_, varidR_ = _extract_array(Ri[k])
varidR.append(varidR_); obsidR.append(obsidR_); R.append(R_)
x_mean = []; x_std = []; jr_scale = []; r_mean = []; r_std = []
not_Xmiss = []; not_Rmiss = []; Xhat = []
TSSX = []; TSSXpv = []; TSSR = []; TSSRpv = []
X__ = []; R__ = []
for X_i, R_i in zip(X, R):
X_i, x_mean_, x_std_ = meancenterscale(X_i)
R_i, r_mean_, r_std_ = meancenterscale(R_i)
jr_scale_ = np.sqrt(X_i.shape[1])
X_i = X_i / jr_scale_
x_mean.append(x_mean_); x_std.append(x_std_); jr_scale.append(jr_scale_)
r_mean.append(r_mean_); r_std.append(r_std_)
not_Xmiss.append((~np.isnan(X_i)) * 1)
not_Rmiss.append((~np.isnan(R_i)) * 1)
X_i, dummy = n2z(X_i); R_i, dummy = n2z(R_i)
X__.append(X_i); R__.append(R_i)
Xhat.append(np.zeros(X_i.shape))
TSSX.append(np.sum(X_i**2)); TSSXpv.append(np.sum(X_i**2, axis=0))
TSSR.append(np.sum(R_i**2)); TSSRpv.append(np.sum(R_i**2, axis=0))
X = X__; R = R__
Y_, y_mean, y_std = meancenterscale(Y_)
not_Ymiss = (~np.isnan(Y_)) * 1
Y_, dummy = n2z(Y_)
TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
if not shush:
print('phi.jrpls using NIPALS executed on: ' + str(datetime.datetime.now()))
epsilon = 1E-9; maxit = 2000
for a in range(A):
ui = Y_[:, [np.argmax(std(Y_))]]
Converged = False; num_it = 0
while not Converged:
hi = [_Ab_btbinv(R[i].T, ui, not_Rmiss[i].T) for i in range(len(R))]
si = [_Ab_btbinv(X[i].T, hi[i], not_Xmiss[i].T) for i in range(len(X))]
js = np.array([y for x in si for y in x])
for i in range(len(si)):
si[i] = si[i] / np.linalg.norm(js)
ri = [_Ab_btbinv(X[i], si[i], not_Xmiss[i]) for i in range(len(X))]
jr = np.array([y for x in ri for y in x]).astype(float)
R_ = np.hstack(R); not_Rmiss_ = np.hstack(not_Rmiss)
ti = _Ab_btbinv(R_, jr, not_Rmiss_)
qi = _Ab_btbinv(Y_.T, ti, not_Ymiss.T)
un = _Ab_btbinv(Y_, qi, not_Ymiss)
if abs((np.linalg.norm(ui) - np.linalg.norm(un))) / np.linalg.norm(ui) < epsilon:
Converged = True
if num_it > maxit:
Converged = True
if Converged:
if not shush:
print('# Iterations for LV #'+str(a+1)+': ', str(num_it))
pi = [_Ab_btbinv(R[i].T, ti, not_Rmiss[i].T) for i in range(len(R))]
vi = [_Ab_btbinv(X[i].T, ri[i], not_Xmiss[i].T) for i in range(len(X))]
for i in range(len(R)):
R[i] = (R[i] - ti @ pi[i].T) * not_Rmiss[i]
X[i] = (X[i] - ri[i] @ vi[i].T) * not_Xmiss[i]
Xhat[i] = Xhat[i] + ri[i] @ vi[i].T
Y_ = (Y_ - ti @ qi.T) * not_Ymiss
if a == 0:
T=ti; P=pi; S=si; U=un; Q=qi; H=hi; V=vi; Rscores=ri
r2X = []; r2Xpv = []; r2R = []; r2Rpv = []
for i in range(len(X)):
r2X.append(1 - np.sum(X[i]**2)/TSSX[i])
r2Xpv.append((1 - np.sum(X[i]**2, axis=0)/TSSXpv[i]).reshape(-1, 1))
r2R.append(1 - np.sum(R[i]**2)/TSSR[i])
r2Rpv.append((1 - np.sum(R[i]**2, axis=0)/TSSRpv[i]).reshape(-1, 1))
r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv)
else:
T = np.hstack((T, ti)); U = np.hstack((U, un)); Q = np.hstack((Q, qi))
for i in range(len(P)): P[i] = np.hstack((P[i], pi[i]))
for i in range(len(S)): S[i] = np.hstack((S[i], si[i]))
for i in range(len(H)): H[i] = np.hstack((H[i], hi[i]))
for i in range(len(V)): V[i] = np.hstack((V[i], vi[i]))
for i in range(len(Rscores)): Rscores[i] = np.hstack((Rscores[i], ri[i]))
for i in range(len(X)):
r2X[i] = np.hstack((r2X[i], 1 - np.sum(X[i]**2)/TSSX[i]))
r2Xpv[i] = np.hstack((r2Xpv[i], (1 - np.sum(X[i]**2, axis=0)/TSSXpv[i]).reshape(-1, 1)))
r2R[i] = np.hstack((r2R[i], 1 - np.sum(R[i]**2)/TSSR[i]))
r2Rpv[i] = np.hstack((r2Rpv[i], (1 - np.sum(R[i]**2, axis=0)/TSSRpv[i]).reshape(-1, 1)))
r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
else:
num_it += 1; ui = un
if a == 0: numIT = num_it
else: numIT = np.hstack((numIT, num_it))
for i in range(len(Xhat)):
Xhat[i] = Xhat[i] * x_std[i] + x_mean[i]
r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
r2xc = []; r2rc = []
for i in range(len(X)):
for a in range(A-1, 0, -1):
r2X[i][a] = r2X[i][a] - r2X[i][a-1]
r2Xpv[i][:, a] = r2Xpv[i][:, a] - r2Xpv[i][:, a-1]
r2R[i][a] = r2R[i][a] - r2R[i][a-1]
r2Rpv[i][:, a] = r2Rpv[i][:, a] - r2Rpv[i][:, a-1]
for i, r in enumerate(r2Xpv):
if i == 0: r2xpv_all = r
else: r2xpv_all = np.vstack((r2xpv_all, r))
r2xc.append(np.cumsum(r2X[i]))
r2rc.append(np.cumsum(r2R[i]))
eigs = np.var(T, axis=0)
r2yc = np.cumsum(r2Y)
r2rc = np.mean(np.array(r2rc), axis=0)
r2xc = np.mean(np.array(r2xc), axis=0)
r2x = np.mean(np.array(r2X), axis=0)
r2r = np.mean(np.array(r2R), axis=0)
if not shush:
print('--------------------------------------------------------------')
print('LV # Eig R2X sum(R2X) R2R sum(R2R) R2Y sum(R2Y)')
if A > 1:
for a in range(A):
print("LV #"+str(a+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2x[a], r2xc[a], r2r[a], r2rc[a], r2Y[a], r2yc[a]))
else:
print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2x, r2xc[0], r2r, r2rc[0], r2Y, r2yc[0]))
print('--------------------------------------------------------------')
var_t = (T.T @ T) / T.shape[0]
jrpls_obj = {'T':T, 'P':P, 'Q':Q, 'U':U, 'S':S, 'H':H, 'V':V, 'Rscores':Rscores,
'r2xi':r2X, 'r2xpvi':r2Xpv, 'r2xpv':r2xpv_all,
'mx':x_mean, 'sx':x_std,
'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std,
'r2ri':r2R, 'r2rpvi':r2Rpv, 'mr':r_mean, 'sr':r_std,
'Xhat':Xhat, 'materials':materials, 'var_t':var_t}
if not isinstance(obsidX[0], bool):
jrpls_obj['obsidXi'] = obsidX; jrpls_obj['varidXi'] = varidX
varidXall = [materials[i]+':'+varidX[i][j] for i in range(len(materials)) for j in range(len(varidX[i]))]
jrpls_obj['varidX'] = varidXall
if not isinstance(obsidR[0], bool):
jrpls_obj['obsidRi'] = obsidR; jrpls_obj['varidRi'] = varidR
if not isinstance(obsidY, bool):
jrpls_obj['obsidY'] = obsidY; jrpls_obj['varidY'] = varidY
T2 = hott2(jrpls_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = []; speR = []; speX_lim95 = []; speX_lim99 = []; speR_lim95 = []; speR_lim99 = []
for i in range(len(X)):
speX.append(np.sum(X[i]**2, axis=1, keepdims=1))
s95, s99 = spe_ci(np.sum(X[i]**2, axis=1, keepdims=1))
speX_lim95.append(s95); speX_lim99.append(s99)
speR.append(np.sum(R[i]**2, axis=1, keepdims=1))
s95, s99 = spe_ci(np.sum(R[i]**2, axis=1, keepdims=1))
speR_lim95.append(s95); speR_lim99.append(s99)
speY = np.sum(Y_**2, axis=1, keepdims=1)
speY_lim95, speY_lim99 = spe_ci(speY)
jrpls_obj.update({'T2':T2, 'T2_lim99':T2_lim99, 'T2_lim95':T2_lim95,
'speX':speX, 'speX_lim99':speX_lim99, 'speX_lim95':speX_lim95,
'speY':speY, 'speY_lim99':speY_lim99, 'speY_lim95':speY_lim95,
'speR':speR, 'speR_lim99':speR_lim99, 'speR_lim95':speR_lim95})
Wsi = []; Ws = []
for i in range(len(S)):
Wsi.append(S[i] @ np.linalg.pinv(V[i].T @ S[i]))
if i == 0: Ws = S[i] @ np.linalg.pinv(V[i].T @ S[i])
else: Ws = np.vstack((Ws, S[i] @ np.linalg.pinv(V[i].T @ S[i])))
jrpls_obj['Ssi'] = Wsi; jrpls_obj['Ss'] = Ws
jrpls_obj['type'] = 'jrpls'
return jrpls_obj
def jrpls_pred(rnew, jrplsobj):
"""Predict Y for a new observation using a fitted JRPLS model.
Args:
xnew (pd.DataFrame or np.ndarray): New process observation(s).
Variables must match the specified campaign's X block.
rnew (pd.DataFrame or np.ndarray): New raw material lot properties.
Variables must match the specified campaign's R block.
campaign (str): Name of the campaign this observation belongs to.
jrpls_obj (dict): Fitted JRPLS model from :func:`jrpls`.
Returns:
dict: Prediction results with keys:
- ``Tnew`` (ndarray): Projected X-scores.
- ``Yhat`` (ndarray): Predicted Y in original scale.
- ``speX`` (ndarray): X-space SPE.
- ``speR`` (ndarray): R-space SPE.
- ``T2`` (ndarray): Hotelling's T².
Example:
rnew={
'MAT1': [('A0129',0.557949425 ),('A0130',0.442050575 )],
'MAT2': [('Lac0003',1)],
'MAT3': [('TLC018', 1) ],
'MAT4': [('M0012', 1) ],
'MAT5':[('CS0017', 1) ]
}
"""
ok = True
if isinstance(rnew, list):
i = 0
for r, mr, sr in zip(rnew, jrplsobj['mr'], jrplsobj['sr']):
if not(len(r) == len(mr[0])):
ok = False
if i == 0:
rnew_ = r; mr_ = mr; sr_ = sr
Rscores = jrplsobj['Rscores'][i]
P = jrplsobj['P'][i]
else:
rnew_ = np.hstack((rnew_, r))
mr_ = np.hstack((mr_, mr)); sr_ = np.hstack((sr_, sr))
Rscores = np.vstack((Rscores, jrplsobj['Rscores'][i]))
P = np.vstack((P, jrplsobj['P'][i]))
i += 1
elif isinstance(rnew, dict):
rnew_ = [['*']] * len(jrplsobj['materials'])
for k in list(rnew.keys()):
i = jrplsobj['materials'].index(k)
ri = np.zeros((jrplsobj['mr'][i].shape[1]))
for m, r in rnew[k]:
e = jrplsobj['varidRi'][i].index(m)
ri[e] = r
rnew_[i] = ri
return jrpls_pred(rnew_, jrplsobj)
if ok:
bkzeros = 0; selmat = []
for i, r in enumerate(jrplsobj['Rscores']):
frontzeros = Rscores.shape[0] - bkzeros - r.shape[0]
row = np.vstack((np.zeros((bkzeros, 1)), np.ones((r.shape[0], 1)), np.zeros((frontzeros, 1))))
bkzeros += r.shape[0]; selmat.append(row)
rnew_ = ((rnew_ - mr_) / sr_).reshape(-1, 1)
ti = []
for a in np.arange(jrplsobj['T'].shape[1]):
ti_ = (rnew_.T @ Rscores[:, a] / (Rscores[:, a].T @ Rscores[:, a]))
ti.append(ti_[0])
rnew_ = rnew_ - (ti_ * P[:, a]).reshape(-1, 1)
tnew = np.array(ti)
sper = [np.sum(rnew_[row == 1]**2) for row in selmat]
yhat = (tnew @ jrplsobj['Q'].T) * jrplsobj['sy'] + jrplsobj['my']
return {'Tnew':tnew, 'Yhat':yhat, 'speR':sper}
else:
return 'dimensions of rnew did not match model'
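# --------------------------------------------------------------------------
# Hedged usage sketch for jrpls/jrpls_pred with two material types. The
# _example_jrpls helper is illustrative only; block shapes follow the
# docstrings above (Xi: lots x properties, Ri: blends x lots, shared Y).
def _example_jrpls():
    """Illustrative sketch: fit JRPLS on synthetic blocks and predict."""
    rng = np.random.default_rng(2)
    blends = ['B' + str(i) for i in range(12)]
    Xi = {}; Ri = {}
    for mat, n_lots, n_props in [('MAT1', 5, 3), ('MAT2', 4, 2)]:
        lots = [mat + '_L' + str(i) for i in range(n_lots)]
        x = pd.DataFrame(rng.normal(size=(n_lots, n_props)),
                         columns=[mat + '_p' + str(j) for j in range(n_props)])
        x.insert(0, 'LotID', lots)
        r = rng.random((12, n_lots))
        r = r / r.sum(axis=1, keepdims=True)
        rdf = pd.DataFrame(r, columns=lots)
        rdf.insert(0, 'BlendID', blends)
        Xi[mat] = x; Ri[mat] = rdf
    Y = pd.DataFrame(rng.normal(size=(12, 2)), columns=['q1', 'q2'])
    Y.insert(0, 'BlendID', blends)
    model = jrpls(Xi, Ri, Y, 2, shush=True)
    pred = jrpls_pred({'MAT1': [('MAT1_L0', 0.6), ('MAT1_L1', 0.4)],
                       'MAT2': [('MAT2_L2', 1.0)]}, model)
    return model, pred['Yhat']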
def tpls(Xi, Ri, Z, Y, A, *, shush=False):
"""Fit a TPLS model.
Models relationships between time-varying process trajectories (Z),
raw material properties (R), and product quality (Y).
Args:
Z (pd.DataFrame or np.ndarray): Process trajectory matrix.
First column is observation IDs if a DataFrame.
Xi (dict): Process data blocks ``{'campaign': pd.DataFrame}``.
Each DataFrame's first column is observation IDs.
Ri (dict): Raw material property blocks ``{'campaign': pd.DataFrame}``.
Keys must match ``Xi``. First column is lot IDs.
Y (pd.DataFrame or np.ndarray): Shared response matrix. Rows match
lots across all campaigns.
A (int): Number of latent variables.
shush (bool): Suppress printed output. Default ``False``.
Returns:
dict: Fitted TPLS model. Keys mirror :func:`jrpls` with an additional
``Ws`` (ndarray) rotated weight matrix for Z-space.
"""
X = []; varidX = []; obsidX = []
materials = list(Xi.keys())
for k in Xi.keys():
X_, obsidX_, varidX_ = _extract_array(Xi[k])
X.append(X_); varidX.append(varidX_); obsidX.append(obsidX_)
Y_, obsidY, varidY = _extract_array(Y)
Z_, obsidZ, varidZ = _extract_array(Z)
R_list = []; varidR = []; obsidR = []
for k in materials:
R_, obsidR_, varidR_ = _extract_array(Ri[k])
varidR.append(varidR_); obsidR.append(obsidR_); R_list.append(R_)
x_mean = []; x_std = []; jr_scale = []; r_mean = []; r_std = []
not_Xmiss = []; not_Rmiss = []; Xhat = []
TSSX = []; TSSXpv = []; TSSR = []; TSSRpv = []
X__ = []; R__ = []
for X_i, R_i in zip(X, R_list):
X_i, x_mean_, x_std_ = meancenterscale(X_i)
R_i, r_mean_, r_std_ = meancenterscale(R_i)
jr_scale_ = np.sqrt(X_i.shape[1])
X_i = X_i / jr_scale_
x_mean.append(x_mean_); x_std.append(x_std_); jr_scale.append(jr_scale_)
r_mean.append(r_mean_); r_std.append(r_std_)
not_Xmiss.append((~np.isnan(X_i)) * 1)
not_Rmiss.append((~np.isnan(R_i)) * 1)
X_i, dummy = n2z(X_i); R_i, dummy = n2z(R_i)
X__.append(X_i); R__.append(R_i)
Xhat.append(np.zeros(X_i.shape))
TSSX.append(np.sum(X_i**2)); TSSXpv.append(np.sum(X_i**2, axis=0))
TSSR.append(np.sum(R_i**2)); TSSRpv.append(np.sum(R_i**2, axis=0))
X = X__; R = R__
Y_, y_mean, y_std = meancenterscale(Y_)
not_Ymiss = (~np.isnan(Y_)) * 1
Y_, dummy = n2z(Y_)
TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
Z_, z_mean, z_std = meancenterscale(Z_)
not_Zmiss = (~np.isnan(Z_)) * 1
Z_, dummy = n2z(Z_)
TSSZ = np.sum(Z_**2); TSSZpv = np.sum(Z_**2, axis=0)
if not shush:
print('phi.tpls using NIPALS executed on: ' + str(datetime.datetime.now()))
epsilon = 1E-9; maxit = 2000
for a in range(A):
ui = Y_[:, [np.argmax(std(Y_))]]
Converged = False; num_it = 0
while not Converged:
hi = [_Ab_btbinv(R[i].T, ui, not_Rmiss[i].T) for i in range(len(R))]
si = [_Ab_btbinv(X[i].T, hi[i], not_Xmiss[i].T) for i in range(len(X))]
js = np.array([y for x in si for y in x])
for i in range(len(si)):
si[i] = si[i] / np.linalg.norm(js)
ri = [_Ab_btbinv(X[i], si[i], not_Xmiss[i]) for i in range(len(X))]
jr = np.array([y for x in ri for y in x]).astype(float)
R_ = np.hstack(R); not_Rmiss_ = np.hstack(not_Rmiss)
t_rx = _Ab_btbinv(R_, jr, not_Rmiss_)
wi = _Ab_btbinv(Z_.T, ui, not_Zmiss.T)
wi = wi / np.linalg.norm(wi)
t_z = _Ab_btbinv(Z_, wi, not_Zmiss)
Taux = np.hstack((t_rx, t_z))
plsobj_ = pls(Taux, Y_, 1, mcsX=False, mcsY=False, shush=True, force_nipals=True)
wt_i = plsobj_['W']; qi = plsobj_['Q']; un = plsobj_['U']; ti = plsobj_['T']
if abs((np.linalg.norm(ui) - np.linalg.norm(un))) / np.linalg.norm(ui) < epsilon:
Converged = True
if num_it > maxit:
Converged = True
if Converged:
if not shush:
print('# Iterations for LV #'+str(a+1)+': ', str(num_it))
pi = [_Ab_btbinv(R[i].T, ti, not_Rmiss[i].T) for i in range(len(R))]
vi = [_Ab_btbinv(X[i].T, ri[i], not_Xmiss[i].T) for i in range(len(X))]
pzi = _Ab_btbinv(Z_.T, ti, not_Zmiss.T)
for i in range(len(R)):
R[i] = (R[i] - ti @ pi[i].T) * not_Rmiss[i]
X[i] = (X[i] - ri[i] @ vi[i].T) * not_Xmiss[i]
Xhat[i] = Xhat[i] + ri[i] @ vi[i].T
Y_ = (Y_ - ti @ qi.T) * not_Ymiss
Z_ = (Z_ - ti @ pzi.T) * not_Zmiss
if a == 0:
T=ti; P=pi; Pz=pzi; S=si; U=un; Q=qi; H=hi; V=vi
Rscores=ri; W=wi; Wt=wt_i
r2X = []; r2Xpv = []; r2R = []; r2Rpv = []
for i in range(len(X)):
r2X.append(1 - np.sum(X[i]**2)/TSSX[i])
r2Xpv.append((1 - np.sum(X[i]**2, axis=0)/TSSXpv[i]).reshape(-1, 1))
r2R.append(1 - np.sum(R[i]**2)/TSSR[i])
r2Rpv.append((1 - np.sum(R[i]**2, axis=0)/TSSRpv[i]).reshape(-1, 1))
r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv)
r2Z, r2Zpv = _calc_r2(Z_, TSSZ, TSSZpv)
else:
T = np.hstack((T, ti)); U = np.hstack((U, un)); Q = np.hstack((Q, qi))
W = np.hstack((W, wi)); Wt = np.hstack((Wt, wt_i)); Pz = np.hstack((Pz, pzi))
for i in range(len(P)): P[i] = np.hstack((P[i], pi[i]))
for i in range(len(S)): S[i] = np.hstack((S[i], si[i]))
for i in range(len(H)): H[i] = np.hstack((H[i], hi[i]))
for i in range(len(V)): V[i] = np.hstack((V[i], vi[i]))
for i in range(len(Rscores)): Rscores[i] = np.hstack((Rscores[i], ri[i]))
for i in range(len(X)):
r2X[i] = np.hstack((r2X[i], 1 - np.sum(X[i]**2)/TSSX[i]))
r2Xpv[i] = np.hstack((r2Xpv[i], (1 - np.sum(X[i]**2, axis=0)/TSSXpv[i]).reshape(-1, 1)))
r2R[i] = np.hstack((r2R[i], 1 - np.sum(R[i]**2)/TSSR[i]))
r2Rpv[i] = np.hstack((r2Rpv[i], (1 - np.sum(R[i]**2, axis=0)/TSSRpv[i]).reshape(-1, 1)))
r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
r2Z, r2Zpv = _calc_r2(Z_, TSSZ, TSSZpv, r2Z, r2Zpv)
else:
num_it += 1; ui = un
if a == 0: numIT = num_it
else: numIT = np.hstack((numIT, num_it))
for i in range(len(Xhat)):
Xhat[i] = Xhat[i] * x_std[i] + x_mean[i]
r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
r2Z, r2Zpv = _r2_cumulative_to_per_component(r2Z, r2Zpv, A)
r2xc = []; r2rc = []
for i in range(len(X)):
for a in range(A-1, 0, -1):
r2X[i][a] = r2X[i][a] - r2X[i][a-1]
r2Xpv[i][:, a] = r2Xpv[i][:, a] - r2Xpv[i][:, a-1]
r2R[i][a] = r2R[i][a] - r2R[i][a-1]
r2Rpv[i][:, a] = r2Rpv[i][:, a] - r2Rpv[i][:, a-1]
for i, r in enumerate(r2Xpv):
if i == 0: r2xpv_all = r
else: r2xpv_all = np.vstack((r2xpv_all, r))
r2xc.append(np.cumsum(r2X[i]))
r2rc.append(np.cumsum(r2R[i]))
r2yc = np.cumsum(r2Y); r2zc = np.cumsum(r2Z)
r2rc = np.mean(np.array(r2rc), axis=0)
r2xc = np.mean(np.array(r2xc), axis=0)
r2x = np.mean(np.array(r2X), axis=0)
r2r = np.mean(np.array(r2R), axis=0)
if not shush:
print('--------------------------------------------------------------')
print('LV # R2X sum(R2X) R2R sum(R2R) R2Z sum(R2Z) R2Y sum(R2Y)')
if A > 1:
for a in range(A):
print("LV #"+str(a+1)+": {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(r2x[a], r2xc[a], r2r[a], r2rc[a], r2Z[a], r2zc[a], r2Y[a], r2yc[a]))
else:
print("LV #1: {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(r2x, r2xc[0], r2r, r2rc[0], r2Z, r2zc[0], r2Y, r2yc[0]))
print('--------------------------------------------------------------')
var_t = (T.T @ T) / T.shape[0]
tpls_obj = {'T':T, 'P':P, 'Q':Q, 'U':U, 'S':S, 'H':H, 'V':V, 'Rscores':Rscores,
'r2xi':r2X, 'r2xpvi':r2Xpv, 'r2xpv':r2xpv_all,
'mx':x_mean, 'sx':x_std,
'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std,
'r2ri':r2R, 'r2rpvi':r2Rpv, 'mr':r_mean, 'sr':r_std,
'r2z':r2Z, 'r2zpv':r2Zpv, 'mz':z_mean, 'sz':z_std,
'Xhat':Xhat, 'materials':materials, 'Wt':Wt, 'W':W, 'Pz':Pz, 'var_t':var_t}
if not isinstance(obsidX[0], bool):
tpls_obj['obsidXi'] = obsidX; tpls_obj['varidXi'] = varidX
varidXall = [materials[i]+':'+varidX[i][j] for i in range(len(materials)) for j in range(len(varidX[i]))]
tpls_obj['varidX'] = varidXall
if not isinstance(obsidR[0], bool):
tpls_obj['obsidRi'] = obsidR; tpls_obj['varidRi'] = varidR
if not isinstance(obsidY, bool):
tpls_obj['obsidY'] = obsidY; tpls_obj['varidY'] = varidY
if not isinstance(obsidZ, bool):
tpls_obj['obsidZ'] = obsidZ; tpls_obj['varidZ'] = varidZ
T2 = hott2(tpls_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = []; speR = []; speX_lim95 = []; speX_lim99 = []; speR_lim95 = []; speR_lim99 = []
for i in range(len(X)):
speX.append(np.sum(X[i]**2, axis=1, keepdims=1))
s95, s99 = spe_ci(np.sum(X[i]**2, axis=1, keepdims=1))
speX_lim95.append(s95); speX_lim99.append(s99)
speR.append(np.sum(R[i]**2, axis=1, keepdims=1))
s95, s99 = spe_ci(np.sum(R[i]**2, axis=1, keepdims=1))
speR_lim95.append(s95); speR_lim99.append(s99)
speY = np.sum(Y_**2, axis=1, keepdims=1); speY_lim95, speY_lim99 = spe_ci(speY)
speZ = np.sum(Z_**2, axis=1, keepdims=1); speZ_lim95, speZ_lim99 = spe_ci(speZ)
tpls_obj.update({'T2':T2, 'T2_lim99':T2_lim99, 'T2_lim95':T2_lim95,
'speX':speX, 'speX_lim99':speX_lim99, 'speX_lim95':speX_lim95,
'speY':speY, 'speY_lim99':speY_lim99, 'speY_lim95':speY_lim95,
'speR':speR, 'speR_lim99':speR_lim99, 'speR_lim95':speR_lim95,
'speZ':speZ, 'speZ_lim99':speZ_lim99, 'speZ_lim95':speZ_lim95})
Wsi = []; Ws = []
for i in range(len(S)):
Wsi.append(S[i] @ np.linalg.pinv(V[i].T @ S[i]))
if i == 0: Ws = S[i] @ np.linalg.pinv(V[i].T @ S[i])
else: Ws = np.vstack((Ws, S[i] @ np.linalg.pinv(V[i].T @ S[i])))
tpls_obj['Ssi'] = Wsi; tpls_obj['Ss'] = Ws
tpls_obj['type'] = 'tpls'
tpls_obj['Ws'] = W @ np.linalg.pinv(Pz.T @ W)
return tpls_obj
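# --------------------------------------------------------------------------
# Hedged usage sketch for tpls: the same style of synthetic blocks as the
# jrpls sketch plus a process-conditions matrix Z whose rows match the
# blends. Illustrative only; _example_tpls is not part of the public API.
def _example_tpls():
    """Illustrative sketch: fit TPLS on synthetic Xi/Ri/Z/Y blocks."""
    rng = np.random.default_rng(3)
    blends = ['B' + str(i) for i in range(12)]
    Xi = {}; Ri = {}
    for mat, n_lots, n_props in [('MAT1', 5, 3), ('MAT2', 4, 2)]:
        lots = [mat + '_L' + str(i) for i in range(n_lots)]
        x = pd.DataFrame(rng.normal(size=(n_lots, n_props)),
                         columns=[mat + '_p' + str(j) for j in range(n_props)])
        x.insert(0, 'LotID', lots)
        r = rng.random((12, n_lots))
        r = r / r.sum(axis=1, keepdims=True)
        rdf = pd.DataFrame(r, columns=lots)
        rdf.insert(0, 'BlendID', blends)
        Xi[mat] = x; Ri[mat] = rdf
    Z = pd.DataFrame(rng.normal(size=(12, 3)), columns=['z1', 'z2', 'z3'])
    Z.insert(0, 'BlendID', blends)
    Y = pd.DataFrame(rng.normal(size=(12, 2)), columns=['q1', 'q2'])
    Y.insert(0, 'BlendID', blends)
    return tpls(Xi, Ri, Z, Y, 2, shush=True)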
def jypls(Xi, Yi, A, *, shush=False):
"""Fit a Joint-Y PLS (JYPLS) model across multiple campaigns.
Each campaign has its own X block (different variables allowed), but all
campaigns share a common Y column space and a jointly estimated Q matrix.
Per Garcia-Munoz, MacGregor, Kourti, Chemom. Intell. Lab. Syst. 79
(2005) 101–114.
Args:
Xi (dict): Predictor blocks ``{'campaign_name': pd.DataFrame}``.
Each X can have a different number of columns.
First column of each DataFrame is observation IDs.
Yi (dict): Response blocks ``{'campaign_name': pd.DataFrame}``.
Keys must match ``Xi``. All Y blocks must have identical columns
(same Y variable space across campaigns).
First column of each DataFrame is observation IDs.
A (int): Number of latent variables.
shush (bool): Suppress printed output. Default ``False``.
Returns:
dict: Fitted JYPLS model with keys:
- ``Q`` (ndarray): Shared Y-loadings (n_y × A).
- ``T`` (list): Per-campaign X-scores.
- ``P`` (list): Per-campaign X-loadings.
- ``W`` (list): Per-campaign X-weights.
- ``Ws`` (list): Per-campaign rotated weights W(P'W)⁻¹.
- ``r2xi`` (list): Per-campaign R² for X, per component.
- ``r2yi`` (list): Per-campaign R² for Y, per component.
- ``r2y`` (ndarray): Joint R² for Y, per component.
- ``mx``, ``sx`` (list): Per-campaign X preprocessing params.
- ``my``, ``sy`` (ndarray): Shared Y preprocessing params.
- ``blk_scale`` (list): Per-campaign block scaling factors.
- ``var_t`` (ndarray): Pooled score covariance matrix.
- ``campaigns`` (list): Ordered list of campaign names.
"""
campaigns = list(Xi.keys())
# --- Validate keys match ---
if set(Xi.keys()) != set(Yi.keys()):
raise ValueError(
f"Xi and Yi must have the same campaign keys. "
f"Xi keys: {list(Xi.keys())}, Yi keys: {list(Yi.keys())}")
# --- Validate and reconcile per campaign, check Y columns ---
y_col_ref = None
y_col_ref_name = None
for k in campaigns:
Xi[k], Yi[k] = _validate_inputs(Xi[k], Yi[k])
if isinstance(Yi[k], pd.DataFrame):
ycols = Yi[k].columns[1:].tolist()
if y_col_ref is None:
y_col_ref = ycols
y_col_ref_name = k
elif ycols != y_col_ref:
raise ValueError(
f"All Y blocks must have the same columns (Joint-Y space). "
f"Campaign '{k}' has columns {ycols[:5]}... "
f"but campaign '{y_col_ref_name}' has {y_col_ref[:5]}...")
# --- Extract arrays ---
X = []; Y = []; varidX = []; obsidX = []; varidY = False; obsidY = []
for k in campaigns:
x_, ox, vx = _extract_array(Xi[k])
y_, oy, vy = _extract_array(Yi[k])
X.append(x_); Y.append(y_)
varidX.append(vx); obsidX.append(ox)
obsidY.append(oy)
if vy is not False:
varidY = vy # same for all campaigns
# --- Compute joint Y mean and std from stacked Y ---
Y_stacked = np.vstack(Y)
y_mean = mean(Y_stacked)
y_std = std(Y_stacked)
# --- Scale each block ---
x_mean = []; x_std = []; blk_scale = []
not_Xmiss = []; not_Ymiss = []
TSSX = []; TSSXpv = []
TSSY_i = []; TSSYpv_i = []
for i in range(len(campaigns)):
# X_i: mean center, autoscale, block scale
X[i], xm, xs = meancenterscale(X[i])
bs = np.sqrt(X[i].shape[1])
X[i] = X[i] / bs
x_mean.append(xm); x_std.append(xs); blk_scale.append(bs)
not_Xmiss.append((~np.isnan(X[i])) * 1)
X[i], _ = n2z(X[i])
TSSX.append(np.sum(X[i]**2))
TSSXpv.append(np.sum(X[i]**2, axis=0))
# Y_i: scale with joint mean/std
Y[i] = (Y[i] - y_mean) / y_std
not_Ymiss.append((~np.isnan(Y[i])) * 1)
Y[i], _ = n2z(Y[i])
TSSY_i.append(np.sum(Y[i]**2))
TSSYpv_i.append(np.sum(Y[i]**2, axis=0))
# Joint Y TSS from scaled stacked Y
Y_stacked_mcs = np.vstack(Y)
TSSY = np.sum(Y_stacked_mcs**2)
TSSYpv = np.sum(Y_stacked_mcs**2, axis=0)
if not shush:
print('phi.jypls using NIPALS executed on: ' + str(datetime.datetime.now()))
# --- NIPALS ---
epsilon = 1E-9; maxit = 2000
for a in range(A):
# Initialize u_i from column with max variance in stacked Y
Y_stack_curr = np.vstack(Y)
max_col = np.argmax(std(Y_stack_curr))
ui = [Y[i][:, [max_col]] for i in range(len(campaigns))]
Converged = False; num_it = 0
while not Converged:
# Step 1: w_i = X_i' u_i / (u_i' u_i)
wi = []
for i in range(len(campaigns)):
w_ = _Ab_btbinv(X[i].T, ui[i], not_Xmiss[i].T)
w_ = w_ / np.linalg.norm(w_)
wi.append(w_)
# Step 2: t_i = X_i w_i / (w_i' w_i)
ti = []
for i in range(len(campaigns)):
t_ = _Ab_btbinv(X[i], wi[i], not_Xmiss[i])
ti.append(t_)
# Step 3: Joint q from stacked t and Y
t_stacked = np.vstack(ti)
Y_stack_curr = np.vstack(Y)
not_Ymiss_stacked = np.vstack(not_Ymiss)
qi = _Ab_btbinv(Y_stack_curr.T, t_stacked, not_Ymiss_stacked.T)
# Step 4: u_i = Y_i q / (q' q)
un = []
for i in range(len(campaigns)):
u_ = _Ab_btbinv(Y[i], qi, not_Ymiss[i])
un.append(u_)
# Check convergence across all campaigns
conv = all(
abs(np.linalg.norm(ui[i]) - np.linalg.norm(un[i])) / max(np.linalg.norm(ui[i]), 1e-16) < epsilon
for i in range(len(campaigns)))
if conv or num_it > maxit:
Converged = True
if Converged:
if not shush:
print('# Iterations for LV #'+str(a+1)+': ', str(num_it))
# Sign convention: ensure var(t>0) > var(t<0) on stacked scores
t_stacked = np.vstack(ti)
if (len(t_stacked[t_stacked < 0]) > 0) and (len(t_stacked[t_stacked > 0]) > 0):
if np.var(t_stacked[t_stacked < 0]) > np.var(t_stacked[t_stacked >= 0]):
ti = [-t for t in ti]
wi = [-w for w in wi]
un = [-u for u in un]
qi = -qi
# Compute p_i for deflation: p_i = X_i' t_i / (t_i' t_i)
pi = []
for i in range(len(campaigns)):
p_ = _Ab_btbinv(X[i].T, ti[i], not_Xmiss[i].T)
pi.append(p_)
# Deflate all blocks
for i in range(len(campaigns)):
X[i] = (X[i] - ti[i] @ pi[i].T) * not_Xmiss[i]
Y[i] = (Y[i] - ti[i] @ qi.T) * not_Ymiss[i]
# Store
if a == 0:
T_blk = [[t] for t in ti]
P_blk = [[p] for p in pi]
W_blk = [[w] for w in wi]
U_blk = [[u] for u in un]
Q = qi
r2X = []; r2Xpv = []; r2Yi = []
for i in range(len(campaigns)):
r2X.append(1 - np.sum(X[i]**2) / TSSX[i])
r2Xpv.append((1 - np.sum(X[i]**2, axis=0) / TSSXpv[i]).reshape(-1, 1))
r2Yi.append(1 - np.sum(Y[i]**2) / TSSY_i[i])
Y_stack_res = np.vstack(Y)
r2Y, r2Ypv = _calc_r2(Y_stack_res, TSSY, TSSYpv)
else:
for i in range(len(campaigns)):
T_blk[i].append(ti[i])
P_blk[i].append(pi[i])
W_blk[i].append(wi[i])
U_blk[i].append(un[i])
Q = np.hstack((Q, qi))
for i in range(len(campaigns)):
r2X[i] = np.hstack((r2X[i], 1 - np.sum(X[i]**2) / TSSX[i]))
r2Xpv[i] = np.hstack((r2Xpv[i], (1 - np.sum(X[i]**2, axis=0) / TSSXpv[i]).reshape(-1, 1)))
r2Yi[i] = np.hstack((r2Yi[i], 1 - np.sum(Y[i]**2) / TSSY_i[i]))
Y_stack_res = np.vstack(Y)
r2Y, r2Ypv = _calc_r2(Y_stack_res, TSSY, TSSYpv, r2Y, r2Ypv)
else:
num_it += 1
ui = un
# --- Assemble per-campaign matrices ---
T = [np.hstack(t_list) for t_list in T_blk]
P = [np.hstack(p_list) for p_list in P_blk]
W = [np.hstack(w_list) for w_list in W_blk]
U = [np.hstack(u_list) for u_list in U_blk]
# Ws per campaign
Ws = []
for i in range(len(campaigns)):
Ws_i = W[i] @ np.linalg.pinv(P[i].T @ W[i])
Ws_i[:, 0] = W[i][:, 0]
Ws.append(Ws_i)
# --- R2 per component ---
r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
for i in range(len(campaigns)):
for aa in range(A-1, 0, -1):
r2X[i][aa] = r2X[i][aa] - r2X[i][aa-1]
r2Xpv[i][:, aa] = r2Xpv[i][:, aa] - r2Xpv[i][:, aa-1]
r2Yi[i][aa] = r2Yi[i][aa] - r2Yi[i][aa-1]
# Stacked T for diagnostics
T_stacked = np.vstack(T)
var_t = (T_stacked.T @ T_stacked) / T_stacked.shape[0]
eigs = np.var(T_stacked, axis=0)
# Aggregate R2X across campaigns
r2x_avg = np.mean(np.array(r2X), axis=0)
r2xc = np.cumsum(r2x_avg)
r2yc = np.cumsum(r2Y)
if not shush:
print('--------------------------------------------------------------')
print('LV # Eig R2X(avg) sum(R2X) R2Y sum(R2Y)')
if A > 1:
for aa in range(A):
print("LV #"+str(aa+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(
eigs[aa], r2x_avg[aa], r2xc[aa], r2Y[aa], r2yc[aa]))
else:
print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(
eigs[0], r2x_avg, r2xc[0], r2Y, r2yc[0]))
print('')
print('R2Y and R2X per campaign:')
for i, k in enumerate(campaigns):
r2xi_c = np.cumsum(r2X[i]) if A > 1 else r2X[i]
r2yi_c = np.cumsum(r2Yi[i]) if A > 1 else r2Yi[i]
r2xi_t = r2xi_c[-1] if A > 1 else r2xi_c
r2yi_t = r2yi_c[-1] if A > 1 else r2yi_c
print(" {}: R2X={:.3f} R2Y={:.3f}".format(k, r2xi_t, r2yi_t))
print('--------------------------------------------------------------')
# --- Diagnostics ---
T2 = hott2({'T': T_stacked, 'var_t': var_t}, Tnew=T_stacked)
n = T_stacked.shape[0]
T2_lim99 = (((n-1)*(n+1)*A) / (n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A) / (n*(n-A))) * f95(A, (n-A))
speX = []; speX_lim95 = []; speX_lim99 = []
speY = []; speY_lim95 = []; speY_lim99 = []
for i in range(len(campaigns)):
sx_ = np.sum(X[i]**2, axis=1, keepdims=1)
s95, s99 = spe_ci(sx_)
speX.append(sx_); speX_lim95.append(s95); speX_lim99.append(s99)
sy_ = np.sum(Y[i]**2, axis=1, keepdims=1)
s95, s99 = spe_ci(sy_)
speY.append(sy_); speY_lim95.append(s95); speY_lim99.append(s99)
# Build varidX aggregated list
varidXall = []
for i, k in enumerate(campaigns):
if varidX[i] is not False:
varidXall.append([k + ':' + v for v in varidX[i]])
else:
varidXall.append([k + ':Var' + str(j) for j in range(X[i].shape[1] if isinstance(X[i], np.ndarray) else 0)])
jypls_obj = {
'T': T, 'P': P, 'Q': Q, 'W': W, 'Ws': Ws, 'U': U,
'r2xi': r2X, 'r2xpvi': r2Xpv,
'r2y': r2Y, 'r2ypv': r2Ypv, 'r2yi': r2Yi,
'mx': x_mean, 'sx': x_std, 'blk_scale': blk_scale,
'my': y_mean, 'sy': y_std,
'campaigns': campaigns, 'var_t': var_t,
'varidXi': varidX, 'varidXall': varidXall,
'obsidXi': obsidX, 'obsidYi': obsidY,
'T2': T2, 'T2_lim99': T2_lim99, 'T2_lim95': T2_lim95,
'speX': speX, 'speX_lim99': speX_lim99, 'speX_lim95': speX_lim95,
'speY': speY, 'speY_lim99': speY_lim99, 'speY_lim95': speY_lim95,
'type': 'jypls'
}
if varidY is not False:
jypls_obj['varidY'] = varidY
return jypls_obj
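# --------------------------------------------------------------------------
# Hedged usage sketch for jypls: two campaigns with different X spaces but a
# common Y column space. Illustrative only; the campaign and variable names
# are made-up placeholders.
def _example_jypls():
    """Illustrative sketch: fit JYPLS on two synthetic campaigns."""
    rng = np.random.default_rng(4)
    Xi = {}; Yi = {}
    for camp, n_obs, n_x in [('SiteA', 15, 6), ('SiteB', 10, 4)]:
        obs = [camp + '_o' + str(i) for i in range(n_obs)]
        x = pd.DataFrame(rng.normal(size=(n_obs, n_x)),
                         columns=[camp + '_x' + str(j) for j in range(n_x)])
        x.insert(0, 'ObsID', obs)
        y = pd.DataFrame(rng.normal(size=(n_obs, 2)), columns=['q1', 'q2'])
        y.insert(0, 'ObsID', obs)
        Xi[camp] = x; Yi[camp] = y
    return jypls(Xi, Yi, 2, shush=True)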
def jypls_pred(xnew, campaign, jypls_obj):
"""Predict Y for a new observation using a fitted JYPLS model.
Args:
xnew (pd.DataFrame or np.ndarray): New X observation(s). Variables
must match those of the specified campaign.
campaign (str): Campaign name this observation belongs to.
Must match a key used when building the model with :func:`jypls`.
jypls_obj (dict): Fitted JYPLS model from :func:`jypls`.
Returns:
dict: Prediction results with keys:
- ``Tnew`` (ndarray): Projected X-scores (n_new × A).
- ``Yhat`` (ndarray): Predicted Y in original scale (n_new × n_y).
- ``speX`` (ndarray): X-space SPE for each new observation.
- ``T2`` (ndarray): Hotelling's T² using pooled score covariance.
"""
if campaign not in jypls_obj['campaigns']:
raise ValueError(
f"Campaign '{campaign}' not found in model. "
f"Available campaigns: {jypls_obj['campaigns']}")
idx = jypls_obj['campaigns'].index(campaign)
# Extract array
if isinstance(xnew, pd.DataFrame):
X_ = np.array(xnew.values[:, 1:]).astype(float)
elif isinstance(xnew, np.ndarray):
X_ = xnew.copy()
if X_.ndim == 1:
X_ = X_.reshape(1, -1)
else:
raise TypeError('xnew must be a pandas DataFrame or a numpy array')
# Validate dimensions
expected_cols = jypls_obj['mx'][idx].shape[1] if jypls_obj['mx'][idx].ndim == 2 else len(jypls_obj['mx'][idx])
if X_.shape[1] != expected_cols:
raise ValueError(
f"xnew has {X_.shape[1]} variables but campaign '{campaign}' "
f"expects {expected_cols} variables.")
# Scale: mean center, autoscale, block scale (same as training)
X_mcs = (X_ - jypls_obj['mx'][idx]) / jypls_obj['sx'][idx]
X_mcs = X_mcs / jypls_obj['blk_scale'][idx]
# Handle missing data
X_nan_map = np.isnan(X_mcs)
if not X_nan_map.any():
# Complete data: use Ws for direct projection
tnew = X_mcs @ jypls_obj['Ws'][idx]
xhat = tnew @ jypls_obj['P'][idx].T
speX = np.sum((X_mcs - xhat)**2, axis=1, keepdims=True)
else:
# Missing data: deflation-based projection using W
not_Xmiss = (~X_nan_map) * 1
X_mcs, _ = n2z(X_mcs)
for i in range(X_mcs.shape[0]):
row_map = not_Xmiss[[i], :]
tempW = jypls_obj['W'][idx] * row_map.T
for aa in range(jypls_obj['Q'].shape[1]):
WTW = tempW[:, [aa]].T @ tempW[:, [aa]]
tnew_aux, _, _, _ = np.linalg.lstsq(WTW, tempW[:, [aa]].T @ X_mcs[[i], :].T, rcond=None)
X_mcs[[i], :] = (X_mcs[[i], :] - tnew_aux @ jypls_obj['P'][idx][:, [aa]].T) * row_map
if aa == 0: tnew_ = tnew_aux
else: tnew_ = np.vstack((tnew_, tnew_aux))
if i == 0: tnew = tnew_.T
else: tnew = np.vstack((tnew, tnew_.T))
xhat = tnew @ jypls_obj['P'][idx].T
speX = np.sum((X_mcs - xhat)**2 * not_Xmiss, axis=1, keepdims=True)
# Predict Y using shared Q, unscale
yhat = tnew @ jypls_obj['Q'].T * jypls_obj['sy'] + jypls_obj['my']
# T2 using pooled var_t
var_t_inv = np.linalg.inv(jypls_obj['var_t'])
T2 = np.sum((tnew @ var_t_inv) * tnew, axis=1)
return {'Tnew': tnew, 'Yhat': yhat, 'speX': speX, 'T2': T2}
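# --------------------------------------------------------------------------
# Hedged usage sketch for jypls_pred, reusing the synthetic model from
# _example_jypls above. Illustrative only.
def _example_jypls_pred():
    """Illustrative sketch: project new 'SiteA' observations."""
    model = _example_jypls()
    rng = np.random.default_rng(5)
    xnew = rng.normal(size=(3, 6))   # 'SiteA' was built with six X variables
    pred = jypls_pred(xnew, 'SiteA', model)
    return pred['Yhat'], pred['T2']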
def tpls_pred(rnew, znew, tplsobj):
"""Predict Y for new observations using a fitted TPLS model.
Args:
rnew (np.ndarray or pd.DataFrame): New R-space (raw material) data.
znew (np.ndarray or pd.DataFrame): New Z-space (trajectory) data.
tpls_obj (dict): Fitted TPLS model from :func:`tpls`.
Returns:
dict: Prediction results with keys:
- ``Tnew`` (ndarray): Projected scores.
- ``Yhat`` (ndarray): Predicted Y in original scale.
- ``speR`` (ndarray): R-space SPE.
- ``speZ`` (ndarray): Z-space SPE.
- ``T2`` (ndarray): Hotelling's T².
Example for rnew:
rnew={
'MAT1': [('A0129',0.557949425 ),('A0130',0.442050575 )],
'MAT2': [('Lac0003',1)],
'MAT3': [('TLC018', 1) ],
'MAT4': [('M0012', 1) ],
'MAT5':[('CS0017', 1) ]
}
"""
ok = True
if isinstance(rnew, list):
i = 0
for r, mr, sr in zip(rnew, tplsobj['mr'], tplsobj['sr']):
if not(len(r) == len(mr[0])):
ok = False
if i == 0:
rnew_ = r; mr_ = mr; sr_ = sr
Rscores = tplsobj['Rscores'][i]
P = tplsobj['P'][i]
else:
rnew_ = np.hstack((rnew_, r))
mr_ = np.hstack((mr_, mr)); sr_ = np.hstack((sr_, sr))
Rscores = np.vstack((Rscores, tplsobj['Rscores'][i]))
P = np.vstack((P, tplsobj['P'][i]))
i += 1
elif isinstance(rnew, dict):
rnew_ = [['*']] * len(tplsobj['materials'])
for k in list(rnew.keys()):
i = tplsobj['materials'].index(k)
ri = np.zeros((tplsobj['mr'][i].shape[1]))
for m, r in rnew[k]:
e = tplsobj['varidRi'][i].index(m)
ri[e] = r
rnew_[i] = ri
return tpls_pred(rnew_, znew, tplsobj)
if isinstance(znew, pd.DataFrame):
znew_ = znew.values.reshape(-1)[1:].astype(float)
elif isinstance(znew, list):
znew_ = np.array(znew)
elif isinstance(znew, np.ndarray):
znew_ = znew.copy()
else:
raise TypeError('znew must be a DataFrame, list, or numpy array')
if not(len(znew_) == tplsobj['mz'].shape[1]):
ok = False
if ok:
bkzeros = 0; selmat = []
for i, r in enumerate(tplsobj['Rscores']):
frontzeros = Rscores.shape[0] - bkzeros - r.shape[0]
row = np.vstack((np.zeros((bkzeros, 1)), np.ones((r.shape[0], 1)), np.zeros((frontzeros, 1))))
bkzeros += r.shape[0]; selmat.append(row)
rnew_ = ((rnew_ - mr_) / sr_).reshape(-1, 1)
znew_ = ((znew_ - tplsobj['mz']) / tplsobj['sz']).reshape(-1, 1)
tnew = []
for a in np.arange(tplsobj['T'].shape[1]):
ti_rx_ = (rnew_.T @ Rscores[:, a] / (Rscores[:, a].T @ Rscores[:, a]))
ti_z_ = znew_.T @ tplsobj['W'][:, a]
ti_ = np.array([ti_rx_, ti_z_]).reshape(1, -1) @ tplsobj['Wt'][:, a]
tnew.append(ti_[0])
rnew_ = rnew_ - (ti_ * P[:, a]).reshape(-1, 1)
znew_ = znew_ - (ti_ * tplsobj['Pz'][:, a]).reshape(-1, 1)
sper = [np.sum(rnew_[row == 1]**2) for row in selmat]
spez = np.sum(znew_**2)
tnew = np.array(tnew)
yhat = (tnew @ tplsobj['Q'].T) * tplsobj['sy'] + tplsobj['my']
return {'Tnew':tnew, 'Yhat':yhat, 'speR':sper, 'speZ':spez}
else:
return 'dimensions of rnew or znew did not match model'
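# --------------------------------------------------------------------------
# Hedged usage sketch for tpls_pred, reusing the synthetic model from
# _example_tpls above. Illustrative only; lot IDs match that sketch.
def _example_tpls_pred():
    """Illustrative sketch: predict quality for a new recipe and conditions."""
    model = _example_tpls()
    rnew = {'MAT1': [('MAT1_L0', 0.5), ('MAT1_L1', 0.5)],
            'MAT2': [('MAT2_L0', 1.0)]}
    znew = [0.1, -0.2, 0.3]          # must match the three Z variables
    pred = tpls_pred(rnew, znew, model)
    return pred['Yhat'], pred['speZ']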
# =============================================================================
# Varimax rotation
# =============================================================================
def varimax_(X, gamma=1.0, q=20, tol=1e-6):
"""Varimax rotation of a (p × k) loading matrix; returns X @ R, where R
is the orthonormal rotation that maximizes the varimax criterion."""
p, k = X.shape
R = np.eye(k); d = 0
for i in range(q):
d_ = d
Lambda = X @ R
u, s, vh = np.linalg.svd(X.T @ (Lambda**3 - (gamma/p) * Lambda @ np.diag(np.diag(Lambda.T @ Lambda))))
R = u @ vh
d = np.sum(s)
if d_ != 0 and d/d_ < 1 + tol: break
return X @ R
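# Hedged sanity sketch for varimax_: the rotation is orthonormal, so the
# column space of the loadings is preserved; only the basis changes.
# Illustrative only.
def _example_varimax():
    """Illustrative sketch: rotated loadings span the same subspace."""
    rng = np.random.default_rng(6)
    L = rng.normal(size=(8, 3))
    Lrot = varimax_(L)
    # Orthogonal projectors onto the two column spaces should coincide
    P1 = L @ np.linalg.pinv(L)
    P2 = Lrot @ np.linalg.pinv(Lrot)
    return np.allclose(P1, P2)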
def varimax_rotation(mvm_obj, X, *, Y=False):
"""Apply Varimax rotation to PCA or PLS loadings.
Rotates loadings toward a simple structure (sparse, interpretable) and
returns a copy of the model with rotated loadings, reprojected scores,
and recomputed R² values; the input model is not modified.
Args:
mvm_obj (dict): Fitted PCA or PLS model.
X (pd.DataFrame or np.ndarray): Training X data used to reproject
scores after rotation.
Y (pd.DataFrame or np.ndarray): Training Y data (optional, for PLS).
Returns:
dict: Model with rotated loadings and reprojected scores.
"""
mvmobj = mvm_obj.copy()
if isinstance(X, pd.DataFrame): X_ = X.values[:,1:].astype(float)
else: X_ = X.copy()
if isinstance(Y, pd.DataFrame): Y_ = Y.values[:,1:].astype(float)
elif isinstance(Y, np.ndarray): Y_ = Y.copy()
X_ = (X_ - mvmobj['mx']) / mvmobj['sx']
not_Xmiss = (~np.isnan(X_)) * 1
X_, Xmap = n2z(X_)
TSSX = np.sum(X_**2); TSSXpv = np.sum(X_**2, axis=0)
if not isinstance(Y, bool):
Y_ = (Y_ - mvmobj['my']) / mvmobj['sy']
not_Ymiss = (~np.isnan(Y_)) * 1
Y_, Ymap = n2z(Y_)
TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
A = mvmobj['T'].shape[1]
if 'Q' in mvmobj:
Wrot = varimax_(mvmobj['W'])
Trot=[]; Prot=[]; Qrot=[]; Urot=[]
r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
for a in np.arange(A):
ti = _Ab_btbinv(X_, Wrot[:,a], not_Xmiss)
pi = _Ab_btbinv(X_.T, ti, not_Xmiss.T)
qi = _Ab_btbinv(Y_.T, ti, not_Ymiss.T)
ui = _Ab_btbinv(Y_, qi, not_Ymiss)
X_ = (X_ - ti @ pi.T) * not_Xmiss
Y_ = (Y_ - ti @ qi.T) * not_Ymiss
r2Xpv_new = np.zeros(len(TSSXpv))
r2Xpv_new[TSSXpv>0] = 1 - (np.sum(X_**2, axis=0)[TSSXpv>0] / TSSXpv[TSSXpv>0])
r2X_new = 1 - np.sum(X_**2) / TSSX
r2Ypv_new = np.zeros(len(TSSYpv))
r2Ypv_new[TSSYpv>0] = 1 - (np.sum(Y_**2, axis=0)[TSSYpv>0] / TSSYpv[TSSYpv>0])
r2Y_new = 1 - np.sum(Y_**2) / TSSY
if r2X is None:
r2X = r2X_new; r2Xpv = r2Xpv_new.reshape(-1,1)
r2Y = r2Y_new; r2Ypv = r2Ypv_new.reshape(-1,1)
else:
r2X = np.hstack((r2X, r2X_new)); r2Xpv = np.hstack((r2Xpv, r2Xpv_new.reshape(-1,1)))
r2Y = np.hstack((r2Y, r2Y_new)); r2Ypv = np.hstack((r2Ypv, r2Ypv_new.reshape(-1,1)))
Trot.append(ti); Prot.append(pi); Qrot.append(qi); Urot.append(ui)
r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
Trot = np.array(Trot).T[0]; Prot = np.array(Prot).T[0]
Qrot = np.array(Qrot).T[0]; Urot = np.array(Urot).T[0]
Wsrot = Wrot @ np.linalg.pinv(Prot.T @ Wrot)
mvmobj.update({'W':Wrot, 'T':Trot, 'P':Prot, 'Q':Qrot, 'U':Urot, 'Ws':Wsrot,
'r2x':r2X, 'r2xpv':r2Xpv, 'r2y':r2Y, 'r2ypv':r2Ypv})
else:
Prot = varimax_(mvmobj['P'])
Trot = []; r2X = None; r2Xpv = None
for a in np.arange(A):
ti = _Ab_btbinv(X_, Prot[:,a], not_Xmiss)
Trot.append(ti)
X_ = (X_ - ti @ Prot[:,[a]].T) * not_Xmiss
r2Xpv_new = np.zeros(len(TSSXpv))
r2Xpv_new[TSSXpv>0] = 1 - (np.sum(X_**2, axis=0)[TSSXpv>0] / TSSXpv[TSSXpv>0])
r2X_new = 1 - np.sum(X_**2) / TSSX
if r2X is None:
r2X = r2X_new; r2Xpv = r2Xpv_new.reshape(-1,1)
else:
r2X = np.hstack((r2X, r2X_new)); r2Xpv = np.hstack((r2Xpv, r2Xpv_new.reshape(-1,1)))
r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
mvmobj.update({'P':Prot, 'T':np.array(Trot).T[0], 'r2x':r2X, 'r2xpv':r2Xpv})
return mvmobj
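# --------------------------------------------------------------------------
# Hedged usage sketch for varimax_rotation on a small synthetic PLS model.
# Illustrative only; the data frames are random placeholders.
def _example_varimax_rotation():
    """Illustrative sketch: rotate a 2-LV PLS model."""
    rng = np.random.default_rng(7)
    obs = ['o' + str(i) for i in range(20)]
    X = pd.DataFrame(rng.normal(size=(20, 5)),
                     columns=['x' + str(j) for j in range(5)])
    X.insert(0, 'Obs', obs)
    Y = pd.DataFrame(rng.normal(size=(20, 2)), columns=['y1', 'y2'])
    Y.insert(0, 'Obs', obs)
    model = pls(X, Y, 2, shush=True)
    return varimax_rotation(model, X, Y=Y)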
# =============================================================================
# Build polynomial
# =============================================================================
def findstr(string):
"""Find indices of strings containing a given pattern.
Args:
str_list (list of str): List of strings to search.
pattern (str): Substring to search for.
Returns:
list: Indices of elements in ``str_list`` that contain ``pattern``.
"""
return [i for i, s in enumerate(string) if s == '*' or s == '/']
def evalvar(data, vname):
"""Return column ``vname`` of ``data`` as an (n × 1) array, applying an
exponent when the name contains ``'^'`` (e.g. ``'T^2'``); returns
``False`` if the variable is not found."""
if vname.find('^') > 0:
actual_vname = vname[:vname.find('^')].strip()
if actual_vname in data.columns[1:]:
power = float(vname[vname.find('^')+1:])
return data[actual_vname].values.reshape(-1, 1)**power
return False
else:
actual_vname = vname.strip()
if actual_vname in data.columns[1:]:
return data[actual_vname].values.reshape(-1, 1)
return False
def writeeq(beta_, features_):
"""Assemble a human-readable equation string from regression
coefficients and their feature names ('Bias' is the intercept)."""
eq_str = []
for b, f, i in zip(beta_, features_, np.arange(len(features_))):
if f == 'Bias':
eq_str.append(str(b) if b < 0 else ' + '+str(b))
else:
eq_str.append((str(b) if b < 0 or i == 0 else ' + '+str(b)) + ' * '+f)
return ''.join(eq_str)
def build_polynomial(data, factors, response, *, bias_term=True):
'''Linear regression with variable selection assisted by PLS.
Builds the design matrix from ``factors`` (plain names plus ``*``, ``/``
and ``^`` terms), plots VIP and RMSE diagnostics from intermediate PLS
models, then fits OLS coefficients for the full polynomial by least squares.
Args:
data (pd.DataFrame): Data table; first column is observation IDs.
factors (list of str): Factor expressions, e.g. ``['T', 'P', 'T * P', 'T^2']``.
response (str): Name of the response column in ``data``.
bias_term (bool): Accepted for API compatibility; an intercept is
currently always included.
Returns:
tuple: ``(betas, factor_names, Xaug, Y, equation_str)``.
'''
for j, f in enumerate(factors):
if f.find('*') > 0 or f.find('/') > 0:
indx = findstr(f)
for ii, i in enumerate(indx):
if ii == 0:
vname1 = f[0:i]
vname2 = f[i+1:indx[1]] if len(indx) > 1 else f[i+1:]
vals1 = evalvar(data, vname1); vals2 = evalvar(data, vname2)
xcol = vals1 * vals2 if f[i] == '*' else vals1 / vals2
else:
vname = f[i+1:] if len(indx) == ii+1 else f[i+1:indx[ii+1]]
vals = evalvar(data, vname)
xcol = xcol * vals if f[i] == '*' else xcol / vals
X = xcol if j == 0 else np.hstack((X, xcol))
else:
temp = evalvar(data, f)
X = temp if j == 0 else np.hstack((X, temp))
print('Built X from factors')
X_df = pd.DataFrame(X, columns=factors)
X_df.insert(0, data.columns[0], data[data.columns[0]].values)
Y_df = data[[data.columns[0], response]]
Y_arr = data[response].values
pls_obj = pls(X_df, Y_df, len(factors))
Ypred = pls_pred(X_df, pls_obj)['Yhat']
RMSE = [np.sqrt(np.mean((Y_df.values[:,1:].astype(float) - Ypred)**2))]
vip = np.sum(np.abs(pls_obj['Ws'] * pls_obj['r2y']), axis=1).ravel()
sort_indx = np.argsort(-vip)
sort_asc_indx = np.argsort(vip)
vip_sorted = vip[sort_indx]
sorted_factors = [factors[i] for i in sort_indx]
plt.figure()
plt.bar(np.arange(len(sorted_factors)), vip_sorted)
plt.xticks(np.arange(len(sorted_factors)), labels=sorted_factors, rotation=60)
plt.ylabel('VIP'); plt.xlabel('Factors'); plt.tight_layout()
sorted_asc_factors = [factors[i] for i in sort_asc_indx]
X_df_m = X_df.copy()
for f in sorted_asc_factors[:-1]:
X_df_m.drop(f, axis=1, inplace=True)
pls_obj_m = pls(X_df_m, Y_df, X_df_m.shape[1]-1, shush=True)
Ypred = pls_pred(X_df_m, pls_obj_m)['Yhat']
RMSE.append(np.sqrt(np.mean((Y_df.values[:,1:].astype(float) - Ypred)**2)))
sorted_asc_labels = ['Full'] + [factors[i] for i in sort_asc_indx[:-1]]
plt.figure()
plt.bar(np.arange(len(factors)), RMSE)
plt.xticks(np.arange(len(factors)), labels=sorted_asc_labels, rotation=60)
plt.ylabel('RMSE ('+response+')'); plt.xlabel('Factors removed from model'); plt.tight_layout()
Xaug = np.hstack((X, np.ones((X.shape[0], 1))))
factors_out = factors.copy(); factors_out.append('Bias')
betasOLSlssq, r1, r2, r3 = np.linalg.lstsq(Xaug, Y_arr, rcond=None)
eqstr = writeeq(betasOLSlssq, factors_out)
return betasOLSlssq, factors_out, Xaug, Y_arr, eqstr
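# --------------------------------------------------------------------------
# Hedged usage sketch for build_polynomial with interaction and power terms.
# Illustrative only; the column names and the generating equation below are
# synthetic placeholders.
def _example_build_polynomial():
    """Illustrative sketch: recover a simple polynomial from noisy data."""
    rng = np.random.default_rng(8)
    t = rng.normal(size=50); p = rng.normal(size=50)
    y = 1.5 * t - 0.8 * p + 0.5 * t * p + rng.normal(scale=0.1, size=50)
    data = pd.DataFrame({'Run': ['r' + str(i) for i in range(50)],
                         'T': t, 'P': p, 'Y': y})
    betas, names, Xaug, Yv, eq = build_polynomial(
        data, ['T', 'P', 'T * P', 'T^2'], 'Y')
    return eq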
# =============================================================================
# CCA
# =============================================================================
def cca(X, Y, tol=1e-6, max_iter=1000):
"""Canonical Correlation Analysis (CCA) between PLS scores and Y.
Computes the maximum covariance directions between the score matrix T
and response Y. Equivalent to computing the predictive component in OPLS.
Args:
T (np.ndarray): Score matrix from a fitted PLS model (n_obs × A).
Y (pd.DataFrame or np.ndarray): Response matrix (n_obs × n_y).
mcs (tuple): Preprocessing flags for T and Y. Default
``('autoscale', 'autoscale')``.
Returns:
dict: CCA results with keys:
- ``Tcv`` (ndarray): Covariant scores.
- ``Pcv`` (ndarray): Covariant loadings (predictive loadings in OPLS sense).
- ``Wcv`` (ndarray): Covariant weights.
"""
X = X - np.mean(X, axis=0); Y = Y - np.mean(Y, axis=0)
Sigma_XX = X.T @ X; Sigma_YY = Y.T @ Y; Sigma_XY = X.T @ Y
w_x = np.random.rand(X.shape[1]); w_y = np.random.rand(Y.shape[1])
w_x /= np.linalg.norm(w_x); w_y /= np.linalg.norm(w_y)
for iteration in range(max_iter):
w_x_old = w_x.copy(); w_y_old = w_y.copy()
w_x = np.linalg.solve(Sigma_XX, Sigma_XY @ w_y); w_x /= np.linalg.norm(w_x)
w_y = np.linalg.solve(Sigma_YY, Sigma_XY.T @ w_x); w_y /= np.linalg.norm(w_y)
correlation = w_x.T @ Sigma_XY @ w_y
if np.linalg.norm(w_x - w_x_old) < tol and np.linalg.norm(w_y - w_y_old) < tol:
break
return correlation, w_x, w_y
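# --------------------------------------------------------------------------
# Hedged usage sketch for cca on two random blocks. Illustrative only.
def _example_cca():
    """Illustrative sketch: leading canonical pair of two blocks."""
    rng = np.random.default_rng(9)
    X = rng.normal(size=(50, 4))
    Y = rng.normal(size=(50, 3))
    corr, w_x, w_y = cca(X, Y)
    # w_x and w_y are unit-norm; corr is the objective w_x' X'Y w_y
    return corr, w_x, w_y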
def cca_multi(X, Y, num_components=1, tol=1e-6, max_iter=1000):
"""CCA with multiple canonical variates."""
X = X - np.mean(X, axis=0); Y = Y - np.mean(Y, axis=0)
correlations = []; W_X = []; W_Y = []
for component in range(num_components):
Sigma_XX = X.T @ X; Sigma_YY = Y.T @ Y; Sigma_XY = X.T @ Y
w_x = np.random.rand(X.shape[1]); w_y = np.random.rand(Y.shape[1])
w_x /= np.linalg.norm(w_x); w_y /= np.linalg.norm(w_y)
for iteration in range(max_iter):
w_x_old = w_x.copy(); w_y_old = w_y.copy()
w_x = np.linalg.solve(Sigma_XX, Sigma_XY @ w_y); w_x /= np.linalg.norm(w_x)
w_y = np.linalg.solve(Sigma_YY, Sigma_XY.T @ w_x); w_y /= np.linalg.norm(w_y)
correlation = w_x.T @ Sigma_XY @ w_y
if np.linalg.norm(w_x - w_x_old) < tol and np.linalg.norm(w_y - w_y_old) < tol:
break
correlations.append(correlation); W_X.append(w_x); W_Y.append(w_y)
X -= (X @ w_x[:, np.newaxis]) @ w_x[np.newaxis, :]
Y -= (Y @ w_y[:, np.newaxis]) @ w_y[np.newaxis, :]
return {'correlations': np.array(correlations), 'W_X': np.array(W_X).T, 'W_Y': np.array(W_Y).T}