"""
Phi for Python (pyPhi) — Version 2.0
By Sal Garcia (sgarciam@ic.ac.uk salvadorgarciamunoz@gmail.com)
Added March 13
* Added routine to fix duplicated obsID
Added Feb 23 2026
* Added _validate_inputs function for input validation and observation reconciliation
* Integrated validation into pca, pls, lpls entry points
* Replaced np.tile with numpy broadcasting throughout
* Optimized _Ab_btbinv with fast path for complete data
* var_t (score covariance matrix) stored in model objects to avoid recalculation
* Added _extract_array and _calc_r2 helper functions to reduce duplication
* Replaced hardcoded F-distribution and chi2 lookup tables with scipy.stats
* Replaced hardcoded t-distribution with scipy.stats
Added Feb 07 2026
* fixed cat_2_matrix for the output to be consistent with MBPLS
Added Jan 30 2025
* Added a pinv alternative protection in spectra_savgol for the case where
inv fails
Added Jan 20 2025
* Added the 'cca' flag to the pls routine to calculate CCA between
the Ts and each of the Ys (one by one), calculating loadings and scores
equivalent to a perfectly orthogonalized OPLS model. The covariant scores (Tcv)
the covariant Loadings (Pcv) and predictive weights (Wcv) are added
as keys to the model object.
[The covariant loadings(Pcv) are equivalent to the predictive loadings in OPLS]
* Added cca and cca-multi routines to perform PLS-CCA (alternative to OPLS)
as of now cca-multi remains unused.
Added Nov 18th, 2024
* replaced interp2d with RectBivariateSpline
* Protected SPE lim calculations for near zero residuals
* Added build_polynomial function to create linear regression
models with variable selection assisted by PLS
by merge from James
* Added spectra preprocessing methods
* bootstrap PLS
by Salvador Garcia (sgarciam@ic.ac.uk salvadorgarciamunoz@gmail.com)
Added Dec 19th 2023
* phi.clean_htmls removes all html files in the working directory
* clean_empty_rows returns also the names of the rows removed
Added May 1st
* YMB is now added in the same structure as the XMB
* Corrected the dimensionality of the lwpls prediction, it was a double-nested array.
Added Apr 30
* Modified Multi-block PLS to include the block name in the variable name
Added Apr 29
* Included the unique routine and adjusted the parse_materials routine so materials
and lots are in the same order as in the raw data
Added Apr 27
* Enhanced adapt_pls_4_pyomo to use variable names as indices if flag is sent
Added Apr 25
* Enhanced the varimax_rotation to adjust the r2 and r2pv to the rotated loadings
Added Apr 21
* Re added varimax_rotation with complete model rotation for PCA and PLS
Added Apr 17
* Added tpls and tpls_pred
Added Apr 15
* Added jrpls model and jrpls_pred
* Added routines to reconcile columns to rows identifier so that X and R matrices
correspond correctly
* Added routines to reconcile rows across a list of dataframes and produces a list
of dataframes containing only those observations present in all dataframes
Added on Apr 9 2023
* Added lpls and lpls_pred routines
* Added parse_materials to read linear table and produce R or Ri
Release as of Nov 23 2022
* Added a function to export PLS model to gPROMS code
Release as of Aug 22 2022
* Fixed access to NEOS server and use of GAMS instead of IPOPT
Release as of Aug 12 2022
* Fixed the SPE calculations in pls_pred and pca_pred
* Changed to a more efficient inversion in pca_pred (=pls_pred)
* Added a pseudo-inverse option in pmp for pca_pred
Release as of Aug 2 2022
* Added replicate_data
"""
import numpy as np
import pandas as pd
import datetime
from scipy.special import factorial
from scipy.stats import norm, f as f_dist, chi2
from scipy.optimize import fsolve
from scipy.interpolate import RectBivariateSpline
from scipy.stats import t as t_dist
from shutil import which
import os
from numpy import eye, asarray, dot, sum, diag
from numpy.linalg import svd
import matplotlib.pyplot as plt
from statsmodels.distributions.empirical_distribution import ECDF
# Email required by the NEOS optimization server (used as a remote fallback
# when no local solver stack is available).
os.environ['NEOS_EMAIL'] = 'pyphisoftware@gmail.com'
# Detect the available optimization stack at import time: pyomo + (ipopt | gams | NEOS).
try:
    from pyomo.environ import *
    pyomo_ok = True
except ImportError:
    pyomo_ok = False
# Check for the 'gams' and 'ipopt' executables on the PATH.
if bool(which('gams')):
    gams_ok = True
else:
    gams_ok = False
ipopt_ok = bool(which('ipopt'))
if pyomo_ok and gams_ok:
    # Confirm pyomo's GAMS interfaces can actually reach a working GAMS install.
    from pyomo.solvers.plugins.solvers.GAMS import GAMSDirect, GAMSShell
    gams_ok = (GAMSDirect().available(exception_flag=False)
               or GAMSShell().available(exception_flag=False))
[docs]
def ma57_dummy_check():
    """Probe whether IPOPT can use the MA57 linear solver.

    Solves a trivial one-variable quadratic problem with
    ``linear_solver = ma57`` while temporarily silencing pyomo logging,
    and reports whether the solve succeeded.

    Returns:
        bool: ``True`` when IPOPT solved the probe problem using MA57.
    """
    probe = ConcreteModel()
    probe.x = Var()
    probe.Obj = Objective(expr=probe.x**2 - 1)
    ipopt = SolverFactory('ipopt')
    ipopt.options['linear_solver'] = 'ma57'
    import logging
    pyomo_logger = logging.getLogger('pyomo.core')
    # Silence pyomo while the probe runs, then restore the previous level.
    previous_level = pyomo_logger.level
    pyomo_logger.setLevel(logging.ERROR)
    result = ipopt.solve(probe)
    pyomo_logger.setLevel(previous_level)
    ma57_ok = result.solver.status == SolverStatus.ok
    if ma57_ok:
        print("MA57 available to IPOPT")
    return ma57_ok
# Probe MA57 availability once at import time; only worth trying when both
# pyomo and a local ipopt executable were found above.
if pyomo_ok and ipopt_ok:
    try:
        ma57_ok = ma57_dummy_check()
    except Exception:
        ma57_ok = False
else:
    ma57_ok = False
# With no usable local solver stack, NLP solves fall back to the NEOS server.
if not(pyomo_ok) or (not(ipopt_ok) and not(gams_ok)):
    print('Will be using the NEOS server in the absence of IPOPT and GAMS')
# =============================================================================
# Helper functions (new in v1.0)
# =============================================================================
[docs]
def add_auto_obs_id(df: pd.DataFrame) -> pd.DataFrame:
    """Add an ``Auto Obs ID`` column that makes duplicate first-column values unique.

    Scans the first column of *df* for duplicate values. Unique values are
    copied as-is; duplicated values receive a zero-padded numeric suffix
    (e.g. ``A.1``, ``A.01``, ``A.001``) whose width is determined by the
    total occurrence count for that specific ID:

    * fewer than 10 occurrences -> no padding (``A.1`` ... ``A.9``)
    * 10-99 occurrences -> one leading zero (``A.01`` ... ``A.99``)
    * 100-999 occurrences -> two leading zeros (``A.001`` ... ``A.999``)

    The new column is inserted at position 0 and the original identifier
    column is dropped from the returned *df*; the mapping between the new
    and the original IDs is returned separately in *df_classid*.

    Args:
        df (pd.DataFrame): Input dataframe whose first column contains
            observation identifiers. Must have at least one column.

    Returns:
        df (pd.DataFrame): A copy of *df* with ``Auto Obs ID`` as the first
            column and the original identifier column removed.
        df_classid (pd.DataFrame): A dataframe with ``Auto Obs ID`` as
            first column and the original column ID as second column.

    Raises:
        ValueError: If *df* is empty (no columns).

    Example:
        >>> data = {"Batch ID": ["A", "A", "B"], "Temp": [10, 11, 22]}
        >>> df = pd.DataFrame(data)
        >>> add_auto_obs_id(df)[0]["Auto Obs ID"].tolist()
        ['A.1', 'A.2', 'B']
    """
    if df.shape[1] == 0:
        # Raise the documented error instead of an opaque IndexError below.
        raise ValueError("df must have at least one column (observation IDs)")
    id_col = df.columns[0]
    counts = df[id_col].value_counts()
    # Set membership is O(1) per lookup vs O(n) on a pandas Index.
    duplicated_ids = set(counts[counts > 1].index)
    auto_ids = []
    occurrence_tracker = {}
    for val in df[id_col]:
        if val in duplicated_ids:
            n = counts[val]  # total occurrences of this ID
            occurrence_tracker[val] = occurrence_tracker.get(val, 0) + 1
            idx = occurrence_tracker[val]
            # Zero-padding width grows with the total duplicate count so the
            # suffixes for one ID sort lexicographically.
            if n < 10:
                suffix = f"{idx}"        # e.g. .1
            elif n < 100:
                suffix = f"{idx:02d}"    # e.g. .01
            else:
                suffix = f"{idx:03d}"    # e.g. .001
            auto_ids.append(f"{val}.{suffix}")
        else:
            auto_ids.append(val)
    df = df.copy()
    df.insert(0, "Auto Obs ID", auto_ids)
    # Keep a two-column map (new ID -> original ID) before dropping the original.
    df_classid = df[df.columns[:2].tolist()]
    df.drop(df.columns[1], axis=1, inplace=True)
    return df, df_classid
def _extract_array(X):
"""Extract numpy array, observation IDs, and variable IDs from DataFrame or ndarray.
Args:
X: numpy array or pandas DataFrame (first column = obs IDs)
Returns:
(array, obsid, varid) where obsid/varid are False if input is ndarray
"""
if isinstance(X, np.ndarray):
return X.copy(), False, False
elif isinstance(X, pd.DataFrame):
arr = np.array(X.values[:, 1:]).astype(float)
obsid = X.values[:, 0].astype(str).tolist()
varid = X.columns.values[1:].tolist()
return arr, obsid, varid
def _calc_r2(residual, TSS, TSSpv, prev_r2=None, prev_r2pv=None):
"""Calculate cumulative R2 and R2 per variable, optionally appending to previous.
Args:
residual: residual matrix after deflation
TSS: total sum of squares (scalar)
TSSpv: total sum of squares per variable (1D array)
prev_r2: previous cumulative R2 array (None for first component)
prev_r2pv: previous cumulative R2pv matrix (None for first component)
Returns:
(r2, r2pv) updated arrays
"""
r2_new = 1 - np.sum(residual**2) / TSS
r2pv_new = (1 - np.sum(residual**2, axis=0) / TSSpv).reshape(-1, 1)
if prev_r2 is None:
return r2_new, r2pv_new
return np.hstack((prev_r2, r2_new)), np.hstack((prev_r2pv, r2pv_new))
def _r2_cumulative_to_per_component(r2, r2pv, A):
"""Convert cumulative R2 to per-component R2."""
for a in range(A-1, 0, -1):
r2[a] = r2[a] - r2[a-1]
r2pv[:, a] = r2pv[:, a] - r2pv[:, a-1]
return r2, r2pv
def _Ab_btbinv(A, b, A_not_nan_map):
"""Project c = Ab / (b'b), accounting for missing data.
Optimized: uses broadcasting instead of np.tile, with fast path
when there is no missing data.
"""
b_flat = b.ravel()
numer = A @ b_flat
# Fast path: no missing data
if A_not_nan_map.all():
return (numer / np.dot(b_flat, b_flat)).reshape(-1, 1)
denom = np.sum((A_not_nan_map * b_flat)**2, axis=1)
return (numer / denom).reshape(-1, 1)
# =============================================================================
# Input validation (new in v2.0)
# =============================================================================
def _validate_inputs(X, Y=None, A=None, mcs=None):
    """Validate and reconcile inputs for PCA and PLS model building.

    Checks performed:
      - X (and Y if provided) are DataFrame or ndarray
      - DataFrames have at least one data column and all data columns
        (everything after the first, obs-ID, column) cast to float
      - No duplicate observation IDs
      - X and Y observation IDs match one-to-one (drops non-common
        observations and reorders Y if needed)
      - A does not exceed rank limits
      - mcs flag is a recognized value

    Args:
        X: pandas DataFrame or numpy ndarray
        Y: pandas DataFrame, numpy ndarray, or None (PCA case)
        A: int, number of components requested, or None to skip check
        mcs: mcs flag value(s) to validate, or None to skip check
            For PLS, pass as tuple: (mcsX, mcsY)

    Returns:
        X, Y — potentially filtered/reordered so observations align.
        If Y is None, returns (X, None).

    Raises:
        ValueError: on validation failures that cannot be auto-corrected
    """
    _valid_mcs = {True, False, 'center', 'autoscale'}
    # --- Validate types ---
    if not isinstance(X, (np.ndarray, pd.DataFrame)):
        raise ValueError(
            f"X must be a pandas DataFrame or numpy ndarray, got {type(X).__name__}")
    if Y is not None and not isinstance(Y, (np.ndarray, pd.DataFrame)):
        raise ValueError(
            f"Y must be a pandas DataFrame or numpy ndarray, got {type(Y).__name__}")
    # --- Validate mcs flags ---
    if mcs is not None:
        # PLS passes (mcsX, mcsY); normalize to a tuple either way.
        mcs_values = mcs if isinstance(mcs, tuple) else (mcs,)
        for m in mcs_values:
            if m not in _valid_mcs:
                raise ValueError(
                    f"mcs value '{m}' not recognized. Must be one of: True, False, 'center', 'autoscale'")
    # --- Validate A ---
    if A is not None:
        if not isinstance(A, (int, np.integer)) or A < 1:
            raise ValueError(f"A must be a positive integer, got {A}")
    # --- Helper: check DataFrame structure ---
    def _check_df_structure(df, name):
        # First column is the observation ID; every later column must cast to float.
        if df.shape[1] < 2:
            raise ValueError(
                f"{name} DataFrame must have at least 2 columns "
                f"(1 obs ID column + 1 data column), got {df.shape[1]}")
        data_cols = df.iloc[:, 1:]
        non_numeric = []
        for col in data_cols.columns:
            try:
                data_cols[col].astype(float)
            except (ValueError, TypeError):
                non_numeric.append(col)
        if non_numeric:
            raise ValueError(
                f"{name} has non-numeric data columns: {non_numeric[:5]}"
                + (f" ... and {len(non_numeric)-5} more" if len(non_numeric) > 5 else ""))
    # --- Helper: check for duplicate obs IDs ---
    def _check_duplicates(df, name):
        obs_col = df.iloc[:, 0].astype(str)
        dupes = obs_col[obs_col.duplicated(keep=False)]
        if len(dupes) > 0:
            unique_dupes = dupes.unique().tolist()
            raise ValueError(
                f"{name} has duplicate observation IDs: {unique_dupes[:10]}"
                + (f" ... and {len(unique_dupes)-10} more" if len(unique_dupes) > 10 else "")
                + ". Use phi.add_auto_obs_id(df) to automatically generate unique IDs.")
    # --- DataFrame-specific validation ---
    if isinstance(X, pd.DataFrame):
        _check_df_structure(X, "X")
        _check_duplicates(X, "X")
    if Y is not None and isinstance(Y, pd.DataFrame):
        _check_df_structure(Y, "Y")
        _check_duplicates(Y, "Y")
    # --- Validate A against dimensions ---
    if A is not None:
        if isinstance(X, pd.DataFrame):
            n_rows = X.shape[0]
            n_cols = X.shape[1] - 1  # exclude the obs-ID column
        else:
            n_rows = X.shape[0]
            n_cols = X.shape[1]
        max_A = min(n_rows, n_cols)
        if A > max_A:
            raise ValueError(
                f"A={A} exceeds max allowable components for X "
                f"({n_rows} observations x {n_cols} variables, max A={max_A})")
    # --- Reconcile X and Y observations ---
    if Y is None:
        return X, None
    # Both numpy: can only check row count
    if isinstance(X, np.ndarray) and isinstance(Y, np.ndarray):
        if X.shape[0] != Y.shape[0]:
            raise ValueError(
                f"X and Y must have the same number of rows. "
                f"X has {X.shape[0]}, Y has {Y.shape[0]}")
        return X, Y
    # One numpy, one DataFrame: check row count only
    if isinstance(X, np.ndarray) != isinstance(Y, np.ndarray):
        if X.shape[0] != Y.shape[0]:
            raise ValueError(
                f"X and Y must have the same number of rows. "
                f"X has {X.shape[0]}, Y has {Y.shape[0]}. "
                f"Cannot reconcile observation order when mixing ndarray and DataFrame.")
        print("Warning: Cannot verify observation alignment when mixing "
              "ndarray and DataFrame. Ensure rows correspond.")
        return X, Y
    # Both DataFrames: full reconciliation
    x_obs = X.iloc[:, 0].astype(str).tolist()
    y_obs = Y.iloc[:, 0].astype(str).tolist()
    x_set = set(x_obs)
    y_set = set(y_obs)
    in_x_only = x_set - y_set
    in_y_only = y_set - x_set
    common = x_set & y_set
    if len(common) == 0:
        raise ValueError(
            "X and Y share no common observation IDs. "
            f"X IDs sample: {x_obs[:5]}, Y IDs sample: {y_obs[:5]}. "
            "Check that the first column contains matching observation identifiers.")
    # Drop (with a warning) observations present in only one of the two matrices.
    if in_x_only:
        dropped = sorted(in_x_only)
        print(f"Warning: {len(in_x_only)} observation(s) in X not found in Y — dropped: "
              + (', '.join(dropped[:10]) + (f' ... and {len(dropped)-10} more' if len(dropped) > 10 else '')))
        X = X[X.iloc[:, 0].astype(str).isin(common)].reset_index(drop=True)
    if in_y_only:
        dropped = sorted(in_y_only)
        print(f"Warning: {len(in_y_only)} observation(s) in Y not found in X — dropped: "
              + (', '.join(dropped[:10]) + (f' ... and {len(dropped)-10} more' if len(dropped) > 10 else '')))
        Y = Y[Y.iloc[:, 0].astype(str).isin(common)].reset_index(drop=True)
    x_obs = X.iloc[:, 0].astype(str).tolist()
    y_obs = Y.iloc[:, 0].astype(str).tolist()
    if x_obs == y_obs:
        return X, Y
    # Same observation sets but different order: permute Y's rows to match X.
    print("Warning: Observation order in Y does not match X. Reordering Y to align with X.")
    y_obs_to_idx = {obs: idx for idx, obs in enumerate(y_obs)}
    new_order = [y_obs_to_idx[obs] for obs in x_obs]
    Y = Y.iloc[new_order].reset_index(drop=True)
    return X, Y
# =============================================================================
# Statistical functions (v1.0: scipy.stats replaces lookup tables)
# =============================================================================
[docs]
def f99(i, j):
    """Return the F-distribution critical value at 99% confidence.

    Args:
        i (int): Numerator degrees of freedom.
        j (int): Denominator degrees of freedom.

    Returns:
        float: F critical value at alpha = 0.01 (1.0 when either df <= 0).
    """
    if min(i, j) <= 0:
        # Degenerate degrees of freedom: fall back to a neutral limit.
        return 1.0
    return float(f_dist.ppf(0.99, i, j))
[docs]
def f95(i, j):
    """Return the F-distribution critical value at 95% confidence.

    Args:
        i (int): Numerator degrees of freedom.
        j (int): Denominator degrees of freedom.

    Returns:
        float: F critical value at alpha = 0.05 (1.0 when either df <= 0).
    """
    if min(i, j) <= 0:
        # Degenerate degrees of freedom: fall back to a neutral limit.
        return 1.0
    return float(f_dist.ppf(0.95, i, j))
[docs]
def spe_ci(spe):
    """Estimate SPE control limits from training data using a chi-squared approximation.

    Matches the first two moments of the SPE distribution to a scaled
    chi-squared variable (g * chi2_h) and reads the limits off its quantiles.

    Args:
        spe (ndarray): SPE values from the training set.

    Returns:
        tuple: ``(lim95, lim99)`` — SPE control limits at 95% and 99%.
        Both are 0 when the mean SPE is numerically zero (near-zero residuals).
    """
    spe_mean = np.mean(spe)
    if spe_mean <= 1E-16:
        # Residuals are essentially zero; limits are meaningless.
        return 0, 0
    spe_var = np.var(spe, ddof=1)
    g = spe_var / (2 * spe_mean)
    h = (2 * spe_mean**2) / spe_var
    return g * chi2.ppf(0.95, h), g * chi2.ppf(0.99, h)
[docs]
def single_score_conf_int(t):
    """Calculate 95% and 99% univariate confidence limits for one score column.

    Args:
        t (np.ndarray): Score values for a single latent variable (n_obs x 1).

    Returns:
        tuple: ``(lim95, lim99)`` — symmetric confidence limits based on the
        t-distribution and the sample standard deviation of ``t``.
    """
    n = t.shape[0]
    st = np.var(t, ddof=1)
    # NOTE(review): df=n is used here; the textbook choice is df=n-1 — confirm intent.
    lim95 = t_dist.ppf(0.975, df=n) * np.sqrt(st)
    lim99 = t_dist.ppf(0.995, df=n) * np.sqrt(st)
    return lim95, lim99
[docs]
def scores_conf_int_calc(st, N):
    """Calculate the 95% and 99% confidence ellipse outline for a score pair.

    Args:
        st (np.ndarray): 2x2 covariance matrix of the two scores being plotted.
        N (int): Number of observations used to fit the model.

    Returns:
        tuple: ``(xd95, xd99, yd95p, yd95n, yd99p, yd99n)`` — x coordinates
        and the positive/negative y branches of the 95% and 99% confidence
        ellipses (each a 100-point ndarray).
    """
    n_points = 100
    # Hotelling's T2 limit constant for 2 components and N observations.
    cte2 = ((N-1)*(N+1)*(2)) / (N*(N-2))
    f95_ = cte2 * f95(2, N-2)
    f99_ = cte2 * f99(2, N-2)
    xd95 = np.sqrt(f95_ * st[0,0])
    xd99 = np.sqrt(f99_ * st[0,0])
    xd95 = np.linspace(-xd95, xd95, num=n_points)
    xd99 = np.linspace(-xd99, xd99, num=n_points)
    st = np.linalg.inv(st)
    s11, s22, s12, s21 = st[0,0], st[1,1], st[0,1], st[1,0]

    def _ellipse_branches(xd, lim):
        # Solve the ellipse quadratic a*y^2 + b*y + c = 0 for each x value.
        # Scalars broadcast against the x grid (np.tile removed per v2.0 policy).
        a = s22
        b_ = xd * (s12 + s21)
        c = (xd**2) * s11 - lim
        disc = b_**2 - 4*a*c
        disc[disc < 0] = 0  # clip tiny negative values from round-off
        root = np.sqrt(disc)
        return (-b_ + root) / (2*a), (-b_ - root) / (2*a)

    yd95p, yd95n = _ellipse_branches(xd95, f95_)
    yd99p, yd99n = _ellipse_branches(xd99, f99_)
    return xd95, xd99, yd95p, yd95n, yd99p, yd99n
# =============================================================================
# Core utility functions
# =============================================================================
[docs]
def clean_htmls():
    """Delete all ``.html`` files in the current working directory.

    Only files whose name ends with the ``.html`` extension are removed;
    files that merely contain 'html' elsewhere in their name are kept
    (the previous substring match could delete unrelated files).
    """
    for fname in os.listdir('.'):
        if fname.endswith('.html'):
            os.remove(fname)
    return
[docs]
def z2n(X, X_nan_map):
    """Re-insert NaNs into X at the positions flagged by the map (zero to NaN).

    Inverse of :func:`n2z`. Modifies ``X`` in place.

    Args:
        X (np.ndarray): Input array.
        X_nan_map (np.ndarray): Integer map with 1 at originally-missing positions.

    Returns:
        np.ndarray: ``X`` with NaN wherever the map equals 1.
    """
    missing_positions = (X_nan_map == 1)
    X[missing_positions] = np.nan
    return X
[docs]
def n2z(X):
    """Replace NaN entries with zero (NaN to zero), in place.

    Args:
        X (np.ndarray): Input array; modified in place.

    Returns:
        tuple: ``(X, nan_map)`` — the array with NaNs set to 0, and an
        integer map holding 1 at positions that were NaN and 0 elsewhere.
    """
    missing = np.isnan(X)
    nan_map = missing * 1
    if missing.any():
        X[missing] = 0
    return X, nan_map
[docs]
def mean(X):
    """Column-wise mean that ignores missing (NaN) entries.

    Args:
        X (np.ndarray): Data matrix (n_obs x n_vars), possibly with NaNs.

    Returns:
        np.ndarray: Row vector (1 x n_vars) of per-column means computed
        over the non-missing entries only. The input is not modified.
    """
    nan_mask = np.isnan(X)
    work = X.copy()
    if nan_mask.any():
        work[nan_mask] = 0
        # Per-column count of observed (non-NaN) entries.
        n_missing = nan_mask.sum(axis=0)
        x_mean = work.sum(axis=0, keepdims=True) / (work.shape[0] - n_missing)
    else:
        x_mean = work.mean(axis=0, keepdims=True)
    return x_mean
[docs]
def std(X):
    """Column-wise standard deviation that ignores missing (NaN) entries.

    Uses an (n - 1 - n_missing) denominator per column so that complete data
    matches the usual sample standard deviation (ddof=1).

    Args:
        X (np.ndarray): Data matrix (n_obs x n_vars), possibly with NaNs.

    Returns:
        np.ndarray: Row vector (1 x n_vars) of per-column standard deviations.
    """
    mu = mean(X)
    nan_mask = np.isnan(X)
    if nan_mask.any():
        work = X.copy()
        work[nan_mask] = 0
        sq_dev = (work - mu) ** 2
        # Missing cells must not contribute to the sum of squared deviations.
        sq_dev[nan_mask] = 0
        n_missing = nan_mask.sum(axis=0)
        denom = X.shape[0] - 1 - n_missing
        x_std = np.sqrt(sq_dev.sum(axis=0, keepdims=True) / denom)
    else:
        x_std = np.sqrt(((X - mu) ** 2).sum(axis=0, keepdims=True) / (X.shape[0] - 1))
    return x_std
[docs]
def meancenterscale(X, *, mcs=True):
    """Mean-center and/or scale a data matrix.

    Args:
        X (np.ndarray): Data matrix to preprocess (n_obs x n_vars).
        mcs (str or bool): Preprocessing mode.
            ``True``: mean-center and scale to unit variance.
            ``'center'``: mean-center only.
            ``'autoscale'``: scale to unit variance only (no centering).
            ``False`` (or any unrecognized value): return unchanged.

    Returns:
        tuple: ``(X_processed, x_mean, x_std)`` — preprocessed matrix,
        column means, and column standard deviations (``np.nan`` for both
        when no preprocessing was applied).
    """
    if mcs is True:
        x_mean = mean(X)
        x_std = std(X)
        X = (X - x_mean) / x_std
    elif mcs is False:
        # No preprocessing requested; report NaN for the statistics.
        x_mean = np.nan
        x_std = np.nan
    elif mcs == 'center':
        x_mean = mean(X)
        X = X - x_mean
        x_std = np.ones((1, X.shape[1]))
    elif mcs == 'autoscale':
        x_std = std(X)
        X = X / x_std
        x_mean = np.zeros((1, X.shape[1]))
    else:
        x_mean = np.nan
        x_std = np.nan
    return X, x_mean, x_std
[docs]
def find(a, func):
    """Return the indices of the elements of *a* for which *func* is truthy.

    Args:
        a (iterable): Sequence to scan.
        func (callable): Predicate applied to each element.

    Returns:
        list: Indices ``i`` where ``func(a[i])`` evaluated truthy, in order.
    """
    matches = []
    for position, element in enumerate(a):
        if func(element):
            matches.append(position)
    return matches
# =============================================================================
# PCA
# =============================================================================
[docs]
def pca(X, A, *, mcs=True, md_algorithm='nipals', force_nipals=True, shush=False, cross_val=0):
    """Fit a Principal Component Analysis (PCA) model.

    Supports missing data via NIPALS. Can use SVD for complete
    data as well.

    Args:
        X (pd.DataFrame or np.ndarray): Observations × variables matrix.
            If a DataFrame, the first column must contain observation IDs.
        A (int): Number of principal components to extract.
        mcs (str or bool): Mean-centering/scaling flag.
            ``True`` (default): mean-center and scale to unit variance.
            ``'center'``: mean-center only.
            ``'autoscale'``: scale to unit variance only.
            ``False``: no preprocessing.
        md_algorithm (str): Missing-data algorithm. ``'nipals'`` (default) or
            ``'nlp'``.
        force_nipals (bool): If ``True`` (default), forces NIPALS even when
            data is complete.
        shush (bool): If ``True``, suppresses printed output. Default ``False``.
        cross_val (int): Cross-validation percentage of elements to remove
            per round. ``0`` = no CV (default); ``1–99`` = element-wise
            removal. Any other value returns an error string instead of
            a model.

    Returns:
        dict: Fitted PCA model with keys:
            - ``T`` (ndarray): Scores matrix (n_obs × A).
            - ``P`` (ndarray): Loadings matrix (n_vars × A).
            - ``r2x`` (float): Cumulative R² for X.
            - ``r2xpv`` (ndarray): Per-variable R² (n_vars × A).
            - ``mx`` (ndarray): Variable means used for preprocessing.
            - ``sx`` (ndarray): Variable std devs used for preprocessing.
            - ``var_t`` (ndarray): Score covariance matrix (A × A).
            - ``T2`` (ndarray): Hotelling's T² for training observations.
            - ``T2_lim95`` (float): 95% T² control limit.
            - ``T2_lim99`` (float): 99% T² control limit.
            - ``speX`` (ndarray): X-space SPE for training observations.
            - ``speX_lim95`` (float): 95% SPE control limit.
            - ``speX_lim99`` (float): 99% SPE control limit.
            - ``obsidX`` (list): Observation IDs (only if X was a DataFrame).
            - ``varidX`` (list): Variable IDs (only if X was a DataFrame).
            - ``q2`` / ``q2pv``: Cross-validated Q² per component and per
              variable (only if ``0 < cross_val < 100``).
        str: An error message when ``cross_val`` is outside the supported range.

    NLP algorithm for missing data as in:
    de la Fuente, R.L.N., García‐Muñoz, S. and Biegler, L.T., 2010.
    An efficient nonlinear programming strategy for PCA models with
    incomplete data sets. Journal of Chemometrics, 24(6), pp.301-311.
    """
    # Validate/align inputs before any computation (may raise ValueError).
    X, _ = _validate_inputs(X, Y=None, A=A, mcs=mcs)
    if cross_val == 0:
        pcaobj = pca_(X, A, mcs=mcs, md_algorithm=md_algorithm, force_nipals=force_nipals, shush=shush)
        pcaobj['type'] = 'pca'
    elif 0 < cross_val < 100:
        # Element-wise cross-validation: withhold cross_val% of the observed
        # entries per round until every entry has been left out exactly once.
        if isinstance(X, np.ndarray):
            X_ = X.copy()
        elif isinstance(X, pd.DataFrame):
            X_ = np.array(X.values[:, 1:]).astype(float)
        if isinstance(mcs, bool):
            if mcs:
                X_, x_mean, x_std = meancenterscale(X_)
            else:
                x_mean = np.zeros((1, X_.shape[1]))
                x_std = np.ones((1, X_.shape[1]))
        elif mcs == 'center':
            X_, x_mean, x_std = meancenterscale(X_, mcs='center')
        elif mcs == 'autoscale':
            X_, x_mean, x_std = meancenterscale(X_, mcs='autoscale')
        X_nan_map = np.isnan(X_)
        not_Xmiss = (~X_nan_map) * 1
        X_, Xnanmap = n2z(X_)
        TSS = np.sum(X_**2)
        TSSpv = np.sum(X_**2, axis=0)
        cols = X_.shape[1]
        rows = X_.shape[0]
        X_ = z2n(X_, Xnanmap)
        for a in range(A):
            if not shush:
                print('Cross validating PC #' + str(a+1))
            # Randomize the order in which elements are withheld this component;
            # originally-missing entries (not_Xmiss == 0) are never withheld.
            not_removed_map = not_Xmiss.copy().reshape(rows*cols, -1)
            Xrnd = np.random.random(X_.shape) * not_Xmiss
            indx = np.argsort(Xrnd.ravel())
            elements_to_remove = int(np.ceil(rows * cols * (cross_val/100)))
            error = np.zeros((rows*cols, 1))
            rounds = 1
            while np.sum(not_removed_map) > 0:
                rounds += 1
                X_copy = X_.copy()
                if indx.size > elements_to_remove:
                    indx_this = indx[:elements_to_remove]
                    indx = indx[elements_to_remove:]
                else:
                    indx_this = indx
                X_copy = X_copy.reshape(rows*cols, 1)
                elements_out = X_copy[indx_this]
                X_copy[indx_this] = np.nan
                X_copy = X_copy.reshape(rows, cols)
                not_removed_map[indx_this] = 0
                # Drop any observation that became entirely missing this round.
                auxmap = np.sum(np.isnan(X_copy) * 1, axis=1)
                indx2 = np.where(auxmap == X_copy.shape[1])[0].tolist()
                if len(indx2) > 0:
                    X_copy = np.delete(X_copy, indx2, 0)
                # Fit one component without the withheld entries and predict them back.
                pcaobj_ = pca_(X_copy, 1, mcs=False, shush=True)
                xhat = pcaobj_['T'] @ pcaobj_['P'].T
                xhat = np.insert(xhat, indx2, np.nan, axis=0)
                xhat = xhat.reshape(rows*cols, 1)
                error[indx_this] = elements_out - xhat[indx_this]
            error = error.reshape(rows, cols)
            error, dummy = n2z(error)
            PRESSpv = np.sum(error**2, axis=0)
            PRESS = np.sum(error**2)
            if a == 0:
                q2 = 1 - PRESS/TSS
                q2pv = (1 - PRESSpv/TSSpv).reshape(-1, 1)
            else:
                q2 = np.hstack((q2, 1 - PRESS/TSS))
                q2pv = np.hstack((q2pv, (1 - PRESSpv/TSSpv).reshape(-1, 1)))
            # Deflate X_ by one component fitted on the full (non-withheld) data.
            X_copy = X_.copy()
            pcaobj_ = pca_(X_copy, 1, mcs=False, shush=True)
            xhat = pcaobj_['T'] @ pcaobj_['P'].T
            X_, Xnanmap = n2z(X_)
            X_ = (X_ - xhat) * not_Xmiss
            r2, r2pv = _calc_r2(X_, TSS, TSSpv,
                                None if a == 0 else r2,
                                None if a == 0 else r2pv)
            X_ = z2n(X_, Xnanmap)
        # Final model fit on the full data set; CV diagnostics are attached below.
        pcaobj = pca_(X, A, mcs=mcs, force_nipals=True, shush=True)
        r2, r2pv = _r2_cumulative_to_per_component(r2, r2pv, A)
        q2, q2pv = _r2_cumulative_to_per_component(q2, q2pv, A)
        r2xc = np.cumsum(r2)
        q2xc = np.cumsum(q2)
        eigs = np.var(pcaobj['T'], axis=0)
        pcaobj['q2'] = q2
        pcaobj['q2pv'] = q2pv
        if not shush:
            print('phi.pca using NIPALS and cross validation (' + str(cross_val) + '%) executed on: ' + str(datetime.datetime.now()))
            print('--------------------------------------------------------------')
            print('PC # Eig R2X sum(R2X) Q2X sum(Q2X)')
            if A > 1:
                for a in range(A):
                    print("PC #"+str(a+1)+": {:8.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2[a], r2xc[a], q2[a], q2xc[a]))
            else:
                print("PC #1: {:8.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2, r2xc[0], q2, q2xc[0]))
            print('--------------------------------------------------------------')
        pcaobj['type'] = 'pca'
    else:
        # Unsupported cross_val value (e.g. >= 100): return an explanatory string.
        pcaobj = 'Cannot cross validate with those options'
    return pcaobj
[docs]
def pca_(X, A, *, mcs=True, md_algorithm='nipals', force_nipals=True, shush=False):
X_, obsidX, varidX = _extract_array(X)
if isinstance(mcs, bool):
if mcs:
X_, x_mean, x_std = meancenterscale(X_)
else:
x_mean = np.zeros((1, X_.shape[1]))
x_std = np.ones((1, X_.shape[1]))
elif mcs == 'center':
X_, x_mean, x_std = meancenterscale(X_, mcs='center')
elif mcs == 'autoscale':
X_, x_mean, x_std = meancenterscale(X_, mcs='autoscale')
X_nan_map = np.isnan(X_)
not_Xmiss = (~X_nan_map) * 1
if not X_nan_map.any() and not force_nipals and ((X_.shape[1]/X_.shape[0] >= 10) or (X_.shape[0]/X_.shape[1] >= 10)):
if not shush:
print('phi.pca using SVD executed on: ' + str(datetime.datetime.now()))
TSS = np.sum(X_**2)
TSSpv = np.sum(X_**2, axis=0)
if X_.shape[1]/X_.shape[0] >= 10:
[U, S, Th] = np.linalg.svd(X_ @ X_.T)
T = Th.T[:, :A]
P = X_.T @ T
for a in range(A):
P[:, a] = P[:, a] / np.linalg.norm(P[:, a])
T = X_ @ P
elif X_.shape[0]/X_.shape[1] >= 10:
[U, S, Ph] = np.linalg.svd(X_.T @ X_)
P = Ph.T[:, :A]
T = X_ @ P
r2 = None; r2pv = None
for a in range(A):
X_ = X_ - T[:,[a]] @ P[:,[a]].T
r2, r2pv = _calc_r2(X_, TSS, TSSpv, r2, r2pv)
r2, r2pv = _r2_cumulative_to_per_component(r2, r2pv, A)
var_t = (T.T @ T) / T.shape[0]
pca_obj = {'T':T, 'P':P, 'r2x':r2, 'r2xpv':r2pv, 'mx':x_mean, 'sx':x_std, 'var_t':var_t}
if not isinstance(obsidX, bool):
pca_obj['obsidX'] = obsidX
pca_obj['varidX'] = varidX
eigs = np.var(T, axis=0)
r2xc = np.cumsum(r2)
if not shush:
print('--------------------------------------------------------------')
print('PC # Eig R2X sum(R2X) ')
if A > 1:
for a in range(A):
print("PC #"+str(a+1)+": {:8.3f} {:.3f} {:.3f}".format(eigs[a], r2[a], r2xc[a]))
else:
print("PC #1: {:8.3f} {:.3f} {:.3f}".format(eigs[0], r2, r2xc[0]))
print('--------------------------------------------------------------')
T2 = hott2(pca_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = np.sum(X_**2, axis=1, keepdims=1)
speX_lim95, speX_lim99 = spe_ci(speX)
pca_obj['T2'] = T2
pca_obj['T2_lim99'] = T2_lim99
pca_obj['T2_lim95'] = T2_lim95
pca_obj['speX'] = speX
pca_obj['speX_lim99'] = speX_lim99
pca_obj['speX_lim95'] = speX_lim95
return pca_obj
else:
if md_algorithm == 'nipals':
if not shush:
print('phi.pca using NIPALS executed on: ' + str(datetime.datetime.now()))
X_, dummy = n2z(X_)
epsilon = 1E-10
maxit = 5000
TSS = np.sum(X_**2)
TSSpv = np.sum(X_**2, axis=0)
r2 = None; r2pv = None
for a in range(A):
ti = X_[:, [np.argmax(std(X_))]]
Converged = False
num_it = 0
while not Converged:
# p = X't / (t't) with missing data handling
pi = np.sum(X_ * ti, axis=0) / np.sum((ti * not_Xmiss)**2, axis=0)
pi = pi / np.linalg.norm(pi)
# t_new = Xp / (p'p)
tn = X_ @ pi
ptp = np.sum((pi * not_Xmiss)**2, axis=1)
tn = tn / ptp
pi = pi.reshape(-1, 1)
if abs((np.linalg.norm(ti) - np.linalg.norm(tn))) / np.linalg.norm(ti) < epsilon:
Converged = True
if num_it > maxit:
Converged = True
if Converged:
if (len(ti[ti<0]) > 0) and (len(ti[ti>0]) > 0):
if np.var(ti[ti<0]) > np.var(ti[ti>=0]):
tn = -tn; ti = -ti; pi = -pi
if not shush:
print('# Iterations for PC #'+str(a+1)+': ', str(num_it))
if a == 0:
T = tn.reshape(-1, 1)
P = pi
else:
T = np.hstack((T, tn.reshape(-1, 1)))
P = np.hstack((P, pi))
X_ = (X_ - ti @ pi.T) * not_Xmiss
r2, r2pv = _calc_r2(X_, TSS, TSSpv, r2, r2pv)
else:
num_it += 1
ti = tn.reshape(-1, 1)
if a == 0:
numIT = num_it
else:
numIT = np.hstack((numIT, num_it))
r2, r2pv = _r2_cumulative_to_per_component(r2, r2pv, A)
eigs = np.var(T, axis=0)
r2xc = np.cumsum(r2)
if not shush:
print('--------------------------------------------------------------')
print('PC # Eig R2X sum(R2X) ')
if A > 1:
for a in range(A):
print("PC #"+str(a+1)+": {:8.3f} {:.3f} {:.3f}".format(eigs[a], r2[a], r2xc[a]))
else:
print("PC #1: {:8.3f} {:.3f} {:.3f}".format(eigs[0], r2, r2xc[0]))
print('--------------------------------------------------------------')
var_t = (T.T @ T) / T.shape[0]
pca_obj = {'T':T, 'P':P, 'r2x':r2, 'r2xpv':r2pv, 'mx':x_mean, 'sx':x_std, 'var_t':var_t}
if not isinstance(obsidX, bool):
pca_obj['obsidX'] = obsidX
pca_obj['varidX'] = varidX
T2 = hott2(pca_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = np.sum(X_**2, axis=1, keepdims=1)
speX_lim95, speX_lim99 = spe_ci(speX)
pca_obj['T2'] = T2
pca_obj['T2_lim99'] = T2_lim99
pca_obj['T2_lim95'] = T2_lim95
pca_obj['speX'] = speX
pca_obj['speX_lim99'] = speX_lim99
pca_obj['speX_lim95'] = speX_lim95
return pca_obj
elif md_algorithm == 'nlp' and pyomo_ok:
if not shush:
print('phi.pca using NLP with Ipopt executed on: ' + str(datetime.datetime.now()))
pcaobj_ = pca_(X, A, mcs=mcs, md_algorithm='nipals', shush=True)
pcaobj_ = prep_pca_4_MDbyNLP(pcaobj_, X_)
TSS = np.sum(X_**2)
TSSpv = np.sum(X_**2, axis=0)
model = ConcreteModel()
model.A = Set(initialize=pcaobj_['pyo_A'])
model.N = Set(initialize=pcaobj_['pyo_N'])
model.O = Set(initialize=pcaobj_['pyo_O'])
model.P = Var(model.N, model.A, within=Reals, initialize=pcaobj_['pyo_P_init'])
model.T = Var(model.O, model.A, within=Reals, initialize=pcaobj_['pyo_T_init'])
model.psi = Param(model.O, model.N, initialize=pcaobj_['pyo_psi'])
model.X = Param(model.O, model.N, initialize=pcaobj_['pyo_X'])
model.delta = Param(model.A, model.A, initialize=lambda model, a1, a2: 1.0 if a1==a2 else 0)
def _c20b_con(model, a1, a2):
return sum(model.P[j, a1] * model.P[j, a2] for j in model.N) == model.delta[a1, a2]
model.c20b = Constraint(model.A, model.A, rule=_c20b_con)
def _20c_con(model, a1, a2):
if a2 < a1:
return sum(model.T[o, a1] * model.T[o, a2] for o in model.O) == 0
else:
return Constraint.Skip
model.c20c = Constraint(model.A, model.A, rule=_20c_con)
def mean_zero(model, i):
return sum(model.T[o,i] for o in model.O) == 0
model.eq3 = Constraint(model.A, rule=mean_zero)
def _eq_20a_obj(model):
return sum(sum((model.X[o,n] - model.psi[o,n] * sum(model.T[o,a] * model.P[n,a] for a in model.A))**2 for n in model.N) for o in model.O)
model.obj = Objective(rule=_eq_20a_obj)
if ipopt_ok:
print("Solving NLP using local IPOPT executable")
solver = SolverFactory('ipopt')
if ma57_ok:
solver.options['linear_solver'] = 'ma57'
results = solver.solve(model, tee=True)
elif gams_ok:
print("Solving NLP using GAMS/IPOPT interface")
solver = SolverFactory('gams:ipopt')
results = solver.solve(model, tee=True)
else:
print("Solving NLP using IPOPT on remote NEOS server")
solver_manager = SolverManagerFactory('neos')
results = solver_manager.solve(model, opt='ipopt', tee=True)
T = np.array([[value(model.T[o,a]) for a in model.A] for o in model.O])
P = np.array([[value(model.P[n,a]) for a in model.A] for n in model.N])
r2 = None; r2pv = None
for a in range(A):
ti = T[:,[a]]; pi = P[:,[a]]
if np.var(ti[ti<0]) > np.var(ti[ti>=0]):
T[:,[a]] = -T[:,[a]]; P[:,[a]] = -P[:,[a]]
ti = -ti; pi = -pi
X_ = (X_ - ti @ pi.T) * not_Xmiss
r2, r2pv = _calc_r2(X_, TSS, TSSpv, r2, r2pv)
r2, r2pv = _r2_cumulative_to_per_component(r2, r2pv, A)
eigs = np.var(T, axis=0)
r2xc = np.cumsum(r2)
if not shush:
print('--------------------------------------------------------------')
print('PC # Eig R2X sum(R2X) ')
if A > 1:
for a in range(A):
print("PC #"+str(a+1)+": {:8.3f} {:.3f} {:.3f}".format(eigs[a], r2[a], r2xc[a]))
else:
print("PC #1: {:8.3f} {:.3f} {:.3f}".format(eigs[0], r2, r2xc[0]))
print('--------------------------------------------------------------')
var_t = (T.T @ T) / T.shape[0]
pca_obj = {'T':T, 'P':P, 'r2x':r2, 'r2xpv':r2pv, 'mx':x_mean, 'sx':x_std, 'var_t':var_t}
if not isinstance(obsidX, bool):
pca_obj['obsidX'] = obsidX
pca_obj['varidX'] = varidX
T2 = hott2(pca_obj, Tnew=T)
n = T.shape[0]
T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
speX = np.sum(X_**2, axis=1, keepdims=1)
speX_lim95, speX_lim99 = spe_ci(speX)
pca_obj['T2'] = T2
pca_obj['T2_lim99'] = T2_lim99
pca_obj['T2_lim95'] = T2_lim95
pca_obj['speX'] = speX
pca_obj['speX_lim99'] = speX_lim99
pca_obj['speX_lim95'] = speX_lim95
return pca_obj
elif md_algorithm == 'nlp' and not pyomo_ok:
print('Pyomo was not found in your system sorry')
print('visit http://www.pyomo.org/ ')
return 1
# =============================================================================
# PLS
# =============================================================================
[docs]
def pls_cca(pls_obj, Xmcs, Ymcs, not_Xmiss):
    """Compute CCA-based covariant components for a fitted PLS model.

    For each column of Y (one at a time), a CCA between the PLS scores T and
    that column yields a rotation ``wt``; the covariant score, loading and
    weight vectors are built from it (equivalent to the predictive space of
    an OPLS model).

    Args:
        pls_obj (dict): Fitted PLS model containing at least ``T`` and ``Ws``.
        Xmcs (np.ndarray): Preprocessed (centered/scaled) X data.
        Ymcs (np.ndarray): Preprocessed (centered/scaled) Y data.
        not_Xmiss (np.ndarray): 0/1 map of observed entries in X.

    Returns:
        tuple: ``(Tcv, Pcv, Wcv, Betacv)`` — covariant scores, loadings,
        weights (one column per Y variable) and regression coefficients.
    """
    scores = pls_obj['T']
    t_list, p_list, w_list, b_list = [], [], [], []
    for col in range(Ymcs.shape[1]):
        yi = Ymcs[:, col].reshape(-1, 1)
        # Only the T-side rotation is needed; the correlation and the
        # Y-side rotation returned by cca are discarded.
        _, wt, _ = cca(scores, yi)
        tcv = (scores @ wt).reshape(-1, 1)
        # Least-squares slope of yi on the covariant score (scalar).
        bcv = np.linalg.lstsq(tcv, yi, rcond=None)[0][0][0]
        wcv = (pls_obj['Ws'] @ wt).reshape(-1, 1)
        # Missing-data-aware regression of X columns onto the covariant score.
        pcv = (np.sum(Xmcs * tcv, axis=0) /
               np.sum((tcv * not_Xmiss)**2, axis=0)).reshape(-1, 1)
        t_list.append(tcv)
        p_list.append(pcv)
        w_list.append(wcv)
        b_list.append(bcv)
    if not t_list:
        # No Y columns: mirror the original's empty-list return.
        return [], [], [], []
    Tcv = np.hstack(t_list)
    Pcv = np.hstack(p_list)
    Wcv = np.hstack(w_list)
    # Single response keeps the scalar beta; multiple responses stack to a column.
    Betacv = b_list[0] if len(b_list) == 1 else np.vstack(b_list)
    return Tcv, Pcv, Wcv, Betacv
[docs]
def pls(X, Y, A, *, mcsX=True, mcsY=True, md_algorithm='nipals', force_nipals=True, shush=False,
        cross_val=0, cross_val_X=False, cca=False):
    """Fit a Partial Least Squares (PLS) regression model.

    Supports missing data in both X and Y via NIPALS. Optionally computes
    CCA-based covariant components (equivalent to OPLS predictive space).

    Args:
        X (pd.DataFrame or np.ndarray): Predictor matrix (n_obs × n_x).
            If a DataFrame, the first column must contain observation IDs.
        Y (pd.DataFrame or np.ndarray): Response matrix (n_obs × n_y).
            If a DataFrame, the first column must contain observation IDs.
            Observation IDs are reconciled with X automatically.
        A (int): Number of latent variables.
        mcsX, mcsY: Preprocessing flags.
            Each can be ``True`` (autoscale), ``False`` (no preprocessing),
            ``'center'``, or ``'autoscale'``. Default ``True``.
        md_algorithm (str): Missing-data algorithm. ``'nipals'`` (default)
            or ``'nlp'``.
        force_nipals (bool): Force NIPALS even for complete data.
            Default ``True``.
        cross_val (int): Cross-validation level. ``0`` = none,
            ``100`` = LOO, ``1–99`` = element-wise. Default ``0``.
        cross_val_X (bool): Also cross-validate X-space. Default ``False``.
        shush (bool): Suppress printed output. Default ``False``.
        cca (bool): If ``True``, compute CCA-based covariant components and
            add ``Tcv``, ``Pcv``, ``Wcv`` to the model. Default ``False``.

    Returns:
        dict: Fitted PLS model with keys:
            - ``T`` (ndarray): X-scores (n_obs × A).
            - ``P`` (ndarray): X-loadings (n_x × A).
            - ``Q`` (ndarray): Y-loadings (n_y × A).
            - ``W`` (ndarray): X-weights (n_x × A).
            - ``Ws`` (ndarray): Rotated weights W*(P'W)⁻¹ (n_x × A).
            - ``r2x`` (float): Cumulative R² for X.
            - ``r2xpv`` (ndarray): Per-variable R² for X (n_x × A).
            - ``r2y`` (float): Cumulative R² for Y.
            - ``r2ypv`` (ndarray): Per-variable R² for Y (n_y × A).
            - ``mx``, ``sx`` (ndarray): X preprocessing parameters.
            - ``my``, ``sy`` (ndarray): Y preprocessing parameters.
            - ``var_t`` (ndarray): Score covariance matrix (A × A).
            - ``T2``, ``T2_lim95``, ``T2_lim99``: Hotelling's T² and limits.
            - ``speX``, ``speX_lim95``, ``speX_lim99``: X-space SPE and limits.
            - ``speY``, ``speY_lim95``, ``speY_lim99``: Y-space SPE and limits.
            - ``obsidX``, ``varidX``: IDs (only if X was a DataFrame).
            - ``obsidY``, ``varidY``: IDs (only if Y was a DataFrame).
            - ``q2x``, ``q2y`` (float): Cross-validated Q² (if ``cross_val > 0``).
            - ``Tcv``, ``Pcv``, ``Wcv``: CCA covariant components (if ``cca=True``).

    Note:
        NIPALS convergence tolerance (1e-9) and max iterations (2000) are
        fixed internally in ``pls_``.

    NLP approach to missing data as in:
        Puwakkatiya‐Kankanamage, E.H., García‐Muñoz, S. and Biegler, L.T., 2014.
        An optimization‐based undeflated PLS (OUPLS) method to handle missing data in
        the training set. Journal of Chemometrics, 28(7), pp.575-584.
    """
    # Reconcile observation IDs / basic sanity checks before any math.
    X, Y = _validate_inputs(X, Y, A=A, mcs=(mcsX, mcsY))
    if cross_val == 0:
        # No cross-validation: delegate directly to the worker.
        plsobj = pls_(X, Y, A, mcsX=mcsX, mcsY=mcsY, md_algorithm=md_algorithm, force_nipals=force_nipals, shush=shush, cca=cca)
        plsobj['type'] = 'pls'
    elif 0 < cross_val < 100:
        # Element-wise cross-validation: cross_val% of entries are removed
        # per round until every observed entry has been left out once.
        if isinstance(X, np.ndarray):
            X_ = X.copy()
        elif isinstance(X, pd.DataFrame):
            X_ = np.array(X.values[:,1:]).astype(float)
        # Preprocess X according to mcsX (True -> autoscale).
        if isinstance(mcsX, bool):
            if mcsX:
                X_, x_mean, x_std = meancenterscale(X_)
            else:
                x_mean = np.zeros((1, X_.shape[1]))
                x_std = np.ones((1, X_.shape[1]))
        elif mcsX == 'center':
            X_, x_mean, x_std = meancenterscale(X_, mcs='center')
        elif mcsX == 'autoscale':
            X_, x_mean, x_std = meancenterscale(X_, mcs='autoscale')
        X_nan_map = np.isnan(X_)
        not_Xmiss = (~X_nan_map) * 1
        if isinstance(Y, np.ndarray):
            Y_ = Y.copy()
        elif isinstance(Y, pd.DataFrame):
            Y_ = np.array(Y.values[:,1:]).astype(float)
        # Preprocess Y according to mcsY.
        if isinstance(mcsY, bool):
            if mcsY:
                Y_, y_mean, y_std = meancenterscale(Y_)
            else:
                y_mean = np.zeros((1, Y_.shape[1]))
                y_std = np.ones((1, Y_.shape[1]))
        elif mcsY == 'center':
            Y_, y_mean, y_std = meancenterscale(Y_, mcs='center')
        elif mcsY == 'autoscale':
            Y_, y_mean, y_std = meancenterscale(Y_, mcs='autoscale')
        Y_nan_map = np.isnan(Y_)
        not_Ymiss = (~Y_nan_map) * 1
        # Total sums of squares with NaNs zeroed (n2z), restored after (z2n).
        X_, Xnanmap = n2z(X_)
        TSSX = np.sum(X_**2)
        TSSXpv = np.sum(X_**2, axis=0)
        colsX = X_.shape[1]; rowsX = X_.shape[0]
        X_ = z2n(X_, Xnanmap)
        Y_, Ynanmap = n2z(Y_)
        TSSY = np.sum(Y_**2)
        TSSYpv = np.sum(Y_**2, axis=0)
        colsY = Y_.shape[1]; rowsY = Y_.shape[0]
        Y_ = z2n(Y_, Ynanmap)
        r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
        q2Y = None; q2Ypv = None; q2X = None; q2Xpv = None
        for a in range(A):
            if not shush:
                print('Cross validating LV #' + str(a+1))
            # Random order in which observed Y entries will be left out.
            not_removed_mapY = not_Ymiss.copy().reshape(rowsY*colsY, -1)
            Yrnd = np.random.random(Y_.shape) * not_Ymiss
            indxY = np.argsort(Yrnd.ravel())
            elements_to_remove_Y = int(np.ceil(rowsY * colsY * (cross_val/100)))
            errorY = np.zeros((rowsY*colsY, 1))
            if cross_val_X:
                not_removed_mapX = not_Xmiss.copy().reshape(rowsX*colsX, -1)
                Xrnd = np.random.random(X_.shape) * not_Xmiss
                indxX = np.argsort(Xrnd.ravel())
                elements_to_remove_X = int(np.ceil(rowsX * colsX * (cross_val/100)))
                errorX = np.zeros((rowsX*colsX, 1))
            else:
                # Scalar 0 so np.sum(not_removed_mapX) is 0 in the loop guard.
                not_removed_mapX = 0
            number_of_rounds = 1
            # Keep removing batches until every observed entry was left out once.
            while np.sum(not_removed_mapX) > 0 or np.sum(not_removed_mapY) > 0:
                number_of_rounds += 1
                X_copy = X_.copy()
                if cross_val_X:
                    if indxX.size > elements_to_remove_X:
                        indx_this_roundX = indxX[:elements_to_remove_X]
                        indxX = indxX[elements_to_remove_X:]
                    else:
                        indx_this_roundX = indxX
                    # Flatten, mark the held-out entries as missing, restore shape.
                    X_copy = X_copy.reshape(rowsX*colsX, 1)
                    elements_outX = X_copy[indx_this_roundX]
                    X_copy[indx_this_roundX] = np.nan
                    X_copy = X_copy.reshape(rowsX, colsX)
                    not_removed_mapX[indx_this_roundX] = 0
                    # Rows that became fully missing must be dropped from the fit.
                    auxmap = np.sum(np.isnan(X_copy)*1, axis=1)
                    indx2 = np.where(auxmap == X_copy.shape[1])[0].tolist()
                else:
                    indx2 = []
                Y_copy = Y_.copy()
                if indxY.size > elements_to_remove_Y:
                    indx_this_roundY = indxY[:elements_to_remove_Y]
                    indxY = indxY[elements_to_remove_Y:]
                else:
                    indx_this_roundY = indxY
                Y_copy = Y_copy.reshape(rowsY*colsY, 1)
                elements_outY = Y_copy[indx_this_roundY]
                Y_copy[indx_this_roundY] = np.nan
                Y_copy = Y_copy.reshape(rowsY, colsY)
                not_removed_mapY[indx_this_roundY] = 0
                auxmap = np.sum(np.isnan(Y_copy)*1, axis=1)
                indx3 = np.where(auxmap == Y_copy.shape[1])[0].tolist()
                indx4 = np.unique(indx3 + indx2).tolist()
                if len(indx4) > 0:
                    X_copy = np.delete(X_copy, indx4, 0)
                    Y_copy = np.delete(Y_copy, indx4, 0)
                # One-LV model on the punched data (data already preprocessed).
                plsobj_ = pls_(X_copy, Y_copy, 1, mcsX=False, mcsY=False, shush=True)
                plspred = pls_pred(X_, plsobj_)
                if cross_val_X:
                    xhat = plspred['Tnew'] @ plsobj_['P'].T
                    xhat = xhat.reshape(rowsX*colsX, 1)
                    errorX[indx_this_roundX] = elements_outX - xhat[indx_this_roundX]
                yhat = plspred['Tnew'] @ plsobj_['Q'].T
                yhat = yhat.reshape(rowsY*colsY, 1)
                errorY[indx_this_roundY] = elements_outY - yhat[indx_this_roundY]
            # PRESS -> Q2 for this LV (cumulative at this point).
            if cross_val_X:
                errorX = errorX.reshape(rowsX, colsX)
                errorX, dummy = n2z(errorX)
                PRESSXpv = np.sum(errorX**2, axis=0)
                PRESSX = np.sum(errorX**2)
            errorY = errorY.reshape(rowsY, colsY)
            errorY, dummy = n2z(errorY)
            PRESSYpv = np.sum(errorY**2, axis=0)
            PRESSY = np.sum(errorY**2)
            if a == 0:
                q2Y = 1 - PRESSY/TSSY
                q2Ypv = (1 - PRESSYpv/TSSYpv).reshape(-1, 1)
                if cross_val_X:
                    q2X = 1 - PRESSX/TSSX
                    q2Xpv = (1 - PRESSXpv/TSSXpv).reshape(-1, 1)
            else:
                q2Y = np.hstack((q2Y, 1 - PRESSY/TSSY))
                q2Ypv = np.hstack((q2Ypv, (1 - PRESSYpv/TSSYpv).reshape(-1, 1)))
                if cross_val_X:
                    q2X = np.hstack((q2X, 1 - PRESSX/TSSX))
                    q2Xpv = np.hstack((q2Xpv, (1 - PRESSXpv/TSSXpv).reshape(-1, 1)))
            # Deflate X_/Y_ by one LV fitted on the full data, track R2.
            X_copy = X_.copy(); Y_copy = Y_.copy()
            plsobj_ = pls_(X_copy, Y_copy, 1, mcsX=False, mcsY=False, shush=True)
            xhat = plsobj_['T'] @ plsobj_['P'].T
            yhat = plsobj_['T'] @ plsobj_['Q'].T
            X_, Xnanmap = n2z(X_)
            Y_, Ynanmap = n2z(Y_)
            X_ = (X_ - xhat) * not_Xmiss
            Y_ = (Y_ - yhat) * not_Ymiss
            r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv,
                                  None if a == 0 else r2X,
                                  None if a == 0 else r2Xpv)
            r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv,
                                  None if a == 0 else r2Y,
                                  None if a == 0 else r2Ypv)
            X_ = z2n(X_, Xnanmap)
            Y_ = z2n(Y_, Ynanmap)
        # Final model on the full data; CV only produced the Q2 diagnostics.
        plsobj = pls_(X, Y, A, mcsX=mcsX, mcsY=mcsY, shush=True, cca=cca)
        # Convert cumulative R2/Q2 vectors to per-component contributions.
        r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
        r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
        if cross_val_X:
            q2X, q2Xpv = _r2_cumulative_to_per_component(q2X, q2Xpv, A)
        else:
            q2X = False; q2Xpv = False
        q2Y, q2Ypv = _r2_cumulative_to_per_component(q2Y, q2Ypv, A)
        r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y)
        q2xc = np.cumsum(q2X) if cross_val_X else False
        q2yc = np.cumsum(q2Y)
        eigs = np.var(plsobj['T'], axis=0)
        plsobj['q2Y'] = q2Y
        plsobj['q2Ypv'] = q2Ypv
        if cross_val_X:
            plsobj['q2X'] = q2X
            plsobj['q2Xpv'] = q2Xpv
        if not shush:
            print('phi.pls using NIPALS and cross-validation (' + str(cross_val) + '%) executed on: ' + str(datetime.datetime.now()))
            if not cross_val_X:
                print('---------------------------------------------------------------------------------')
                print('PC # Eig R2X sum(R2X) R2Y sum(R2Y) Q2Y sum(Q2Y)')
                if A > 1:
                    for a in range(A):
                        print("PC #"+str(a+1)+":{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2Y[a], r2yc[a], q2Y[a], q2yc[a]))
                else:
                    print("PC #1:{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2Y, r2yc[0], q2Y, q2yc[0]))
                print('---------------------------------------------------------------------------------')
            else:
                print('-------------------------------------------------------------------------------------------------------')
                print('PC # Eig R2X sum(R2X) Q2X sum(Q2X) R2Y sum(R2Y) Q2Y sum(Q2Y)')
                if A > 1:
                    for a in range(A):
                        print("PC #"+str(a+1)+":{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], q2X[a], q2xc[a], r2Y[a], r2yc[a], q2Y[a], q2yc[a]))
                else:
                    print("PC #1:{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], q2X, q2xc[0], r2Y, r2yc[0], q2Y, q2yc[0]))
                print('-------------------------------------------------------------------------------------------------------')
        plsobj['type'] = 'pls'
    elif cross_val == 100:
        # LOO cross-validation
        if isinstance(X, np.ndarray):
            X_ = X.copy()
        elif isinstance(X, pd.DataFrame):
            X_ = np.array(X.values[:,1:]).astype(float)
        # Preprocess X according to mcsX (same logic as the element-wise branch).
        if isinstance(mcsX, bool):
            if mcsX:
                X_, x_mean, x_std = meancenterscale(X_)
            else:
                x_mean = np.zeros((1, X_.shape[1]))
                x_std = np.ones((1, X_.shape[1]))
        elif mcsX == 'center':
            X_, x_mean, x_std = meancenterscale(X_, mcs='center')
        elif mcsX == 'autoscale':
            X_, x_mean, x_std = meancenterscale(X_, mcs='autoscale')
        X_nan_map = np.isnan(X_)
        not_Xmiss = (~X_nan_map) * 1
        if isinstance(Y, np.ndarray):
            Y_ = Y.copy()
        elif isinstance(Y, pd.DataFrame):
            Y_ = np.array(Y.values[:,1:]).astype(float)
        if isinstance(mcsY, bool):
            if mcsY:
                Y_, y_mean, y_std = meancenterscale(Y_)
            else:
                y_mean = np.zeros((1, Y_.shape[1]))
                y_std = np.ones((1, Y_.shape[1]))
        elif mcsY == 'center':
            Y_, y_mean, y_std = meancenterscale(Y_, mcs='center')
        elif mcsY == 'autoscale':
            Y_, y_mean, y_std = meancenterscale(Y_, mcs='autoscale')
        Y_nan_map = np.isnan(Y_)
        not_Ymiss = (~Y_nan_map) * 1
        X_, Xnanmap = n2z(X_)
        TSSX = np.sum(X_**2)
        TSSXpv = np.sum(X_**2, axis=0)
        colsX = X_.shape[1]; rowsX = X_.shape[0]
        X_ = z2n(X_, Xnanmap)
        Y_, Ynanmap = n2z(Y_)
        TSSY = np.sum(Y_**2)
        TSSYpv = np.sum(Y_**2, axis=0)
        colsY = Y_.shape[1]; rowsY = Y_.shape[0]
        Y_ = z2n(Y_, Ynanmap)
        r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
        for a in range(A):
            errorY = np.zeros((rowsY*colsY, 1))
            if cross_val_X:
                errorX = np.zeros((rowsX*colsX, 1))
            # Leave one observation out per round, predict it, collect errors.
            for o in range(X_.shape[0]):
                X_copy = X_.copy(); Y_copy = Y_.copy()
                elements_outX = X_copy[o, :].copy()
                elements_outY = Y_copy[o, :].copy()
                X_copy = np.delete(X_copy, o, 0)
                Y_copy = np.delete(Y_copy, o, 0)
                plsobj_ = pls_(X_copy, Y_copy, 1, mcsX=False, mcsY=False, shush=True)
                plspred = pls_pred(elements_outX, plsobj_)
                if o == 0:
                    if cross_val_X:
                        errorX_mat = elements_outX - plspred['Xhat']
                    errorY_mat = elements_outY - plspred['Yhat']
                else:
                    if cross_val_X:
                        errorX_mat = np.vstack((errorX_mat, elements_outX - plspred['Xhat']))
                    errorY_mat = np.vstack((errorY_mat, elements_outY - plspred['Yhat']))
            # PRESS -> Q2 for this LV (cumulative at this point).
            if cross_val_X:
                errorX_mat, dummy = n2z(errorX_mat)
                PRESSXpv = np.sum(errorX_mat**2, axis=0)
                PRESSX = np.sum(errorX_mat**2)
            errorY_mat, dummy = n2z(errorY_mat)
            PRESSYpv = np.sum(errorY_mat**2, axis=0)
            PRESSY = np.sum(errorY_mat**2)
            if a == 0:
                q2Y = 1 - PRESSY/TSSY
                q2Ypv = (1 - PRESSYpv/TSSYpv).reshape(-1, 1)
                if cross_val_X:
                    q2X = 1 - PRESSX/TSSX
                    q2Xpv = (1 - PRESSXpv/TSSXpv).reshape(-1, 1)
            else:
                q2Y = np.hstack((q2Y, 1 - PRESSY/TSSY))
                q2Ypv = np.hstack((q2Ypv, (1 - PRESSYpv/TSSYpv).reshape(-1, 1)))
                if cross_val_X:
                    q2X = np.hstack((q2X, 1 - PRESSX/TSSX))
                    q2Xpv = np.hstack((q2Xpv, (1 - PRESSXpv/TSSXpv).reshape(-1, 1)))
            # Deflate by one LV fitted on the full data, track R2.
            X_copy = X_.copy(); Y_copy = Y_.copy()
            plsobj_ = pls_(X_copy, Y_copy, 1, mcsX=False, mcsY=False, shush=True)
            xhat = plsobj_['T'] @ plsobj_['P'].T
            yhat = plsobj_['T'] @ plsobj_['Q'].T
            X_, Xnanmap = n2z(X_)
            Y_, Ynanmap = n2z(Y_)
            X_ = (X_ - xhat) * not_Xmiss
            Y_ = (Y_ - yhat) * not_Ymiss
            r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv,
                                  None if a == 0 else r2X,
                                  None if a == 0 else r2Xpv)
            r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv,
                                  None if a == 0 else r2Y,
                                  None if a == 0 else r2Ypv)
            X_ = z2n(X_, Xnanmap)
            Y_ = z2n(Y_, Ynanmap)
        # Final model on the full data; CV only produced the Q2 diagnostics.
        plsobj = pls_(X, Y, A, mcsX=mcsX, mcsY=mcsY, shush=True, cca=cca)
        r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
        r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
        if cross_val_X:
            q2X, q2Xpv = _r2_cumulative_to_per_component(q2X, q2Xpv, A)
        else:
            q2X = False; q2Xpv = False
        q2Y, q2Ypv = _r2_cumulative_to_per_component(q2Y, q2Ypv, A)
        r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y)
        q2xc = np.cumsum(q2X) if cross_val_X else False
        q2yc = np.cumsum(q2Y)
        eigs = np.var(plsobj['T'], axis=0)
        plsobj['q2Y'] = q2Y
        plsobj['q2Ypv'] = q2Ypv
        plsobj['type'] = 'pls'
        if cross_val_X:
            plsobj['q2X'] = q2X
            plsobj['q2Xpv'] = q2Xpv
        if not shush:
            if not cross_val_X:
                print('---------------------------------------------------------------------------------')
                print('PC # Eig R2X sum(R2X) R2Y sum(R2Y) Q2Y sum(Q2Y)')
                if A > 1:
                    for a in range(A):
                        print("PC #"+str(a+1)+":{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2Y[a], r2yc[a], q2Y[a], q2yc[a]))
                else:
                    print("PC #1:{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2Y, r2yc[0], q2Y, q2yc[0]))
                print('---------------------------------------------------------------------------------')
            else:
                print('-------------------------------------------------------------------------------------------------------')
                print('PC # Eig R2X sum(R2X) Q2X sum(Q2X) R2Y sum(R2Y) Q2Y sum(Q2Y)')
                if A > 1:
                    for a in range(A):
                        print("PC #"+str(a+1)+":{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], q2X[a], q2xc[a], r2Y[a], r2yc[a], q2Y[a], q2yc[a]))
                else:
                    print("PC #1:{:8.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], q2X, q2xc[0], r2Y, r2yc[0], q2Y, q2yc[0]))
                print('-------------------------------------------------------------------------------------------------------')
    else:
        # Unsupported cross_val value (e.g. negative or > 100).
        plsobj = 'Cannot cross validate with those options'
    return plsobj
[docs]
def pls_(X, Y, A, *, mcsX=True, mcsY=True, md_algorithm='nipals',
         force_nipals=True, shush=False, cca=False):
    """Internal PLS worker: fit a PLS model without cross-validation.

    Three fitting paths:
      * SVD-based (complete data, ``force_nipals=False``),
      * missing-data NIPALS (default),
      * NLP via Pyomo/IPOPT (``md_algorithm='nlp'``).

    Args:
        X, Y: Predictor / response data (ndarray, or DataFrame whose first
            column holds observation IDs).
        A (int): Number of latent variables.
        mcsX, mcsY: Preprocessing — ``True`` (autoscale), ``False``,
            ``'center'`` or ``'autoscale'``.
        md_algorithm (str): ``'nipals'`` or ``'nlp'``.
        force_nipals (bool): Use NIPALS even for complete data.
        shush (bool): Suppress printed output.
        cca (bool): Also compute CCA covariant components via ``pls_cca``.

    Returns:
        dict: PLS model (T, P, Q, W, Ws, U, R2 stats, preprocessing
        parameters, var_t, T2/SPE limits; plus Tcv/Pcv/Wcv/Betacv if cca).
    """
    # Split IDs from data (obsid/varid are False when input was an ndarray).
    X_, obsidX, varidX = _extract_array(X)
    Y_, obsidY, varidY = _extract_array(Y)
    if isinstance(mcsX, bool):
        if mcsX:
            X_, x_mean, x_std = meancenterscale(X_)
        else:
            x_mean = np.zeros((1, X_.shape[1]))
            x_std = np.ones((1, X_.shape[1]))
    elif mcsX == 'center':
        X_, x_mean, x_std = meancenterscale(X_, mcs='center')
    elif mcsX == 'autoscale':
        X_, x_mean, x_std = meancenterscale(X_, mcs='autoscale')
    if isinstance(mcsY, bool):
        if mcsY:
            Y_, y_mean, y_std = meancenterscale(Y_)
        else:
            y_mean = np.zeros((1, Y_.shape[1]))
            y_std = np.ones((1, Y_.shape[1]))
    elif mcsY == 'center':
        Y_, y_mean, y_std = meancenterscale(Y_, mcs='center')
    elif mcsY == 'autoscale':
        Y_, y_mean, y_std = meancenterscale(Y_, mcs='autoscale')
    # 0/1 maps of observed entries, used by the missing-data math below.
    X_nan_map = np.isnan(X_)
    not_Xmiss = (~X_nan_map) * 1
    Y_nan_map = np.isnan(Y_)
    not_Ymiss = (~Y_nan_map) * 1
    if (not X_nan_map.any() and not Y_nan_map.any()) and not force_nipals:
        # ---- Complete-data path: SVD of (X'Y)(Y'X) per component ----
        if cca:
            # Keep preprocessed copies before deflation for pls_cca.
            Xmcs = X_.copy(); Ymcs = Y_.copy()
        if not shush:
            print('phi.pls using SVD executed on: ' + str(datetime.datetime.now()))
        TSSX = np.sum(X_**2); TSSXpv = np.sum(X_**2, axis=0)
        TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
        r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
        for a in range(A):
            # Dominant right singular vector gives the X-weight for this LV.
            [U_, S, Wh] = np.linalg.svd((X_.T @ Y_) @ (Y_.T @ X_))
            w = Wh.T[:, [0]]
            t = X_ @ w
            q = Y_.T @ t / (t.T @ t)
            u = Y_ @ q / (q.T @ q)
            p = X_.T @ t / (t.T @ t)
            # Deflate both blocks with the X-scores.
            X_ = X_ - t @ p.T
            Y_ = Y_ - t @ q.T
            if a == 0:
                W = w; T = t; Q = q; U = u; P = p
            else:
                W = np.hstack((W, w)); T = np.hstack((T, t))
                Q = np.hstack((Q, q)); U = np.hstack((U, u)); P = np.hstack((P, p))
            r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv, r2X, r2Xpv)
            r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
        r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
        r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
        # Rotated weights: T = X Ws for the undeflated X.
        Ws = W @ np.linalg.pinv(P.T @ W)
        Ws[:, 0] = W[:, 0]
        eigs = np.var(T, axis=0)
        r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y)
        if not shush:
            print('--------------------------------------------------------------')
            print('LV # Eig R2X sum(R2X) R2Y sum(R2Y)')
            if A > 1:
                for a in range(A):
                    print("LV #"+str(a+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2Y[a], r2yc[a]))
            else:
                print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2Y, r2yc[0]))
            print('--------------------------------------------------------------')
        # Score covariance stored so hott2 and predictions can reuse it.
        var_t = (T.T @ T) / T.shape[0]
        pls_obj = {'T':T, 'P':P, 'Q':Q, 'W':W, 'Ws':Ws, 'U':U,
                   'r2x':r2X, 'r2xpv':r2Xpv, 'mx':x_mean, 'sx':x_std,
                   'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std, 'var_t':var_t}
        if not isinstance(obsidX, bool):
            pls_obj['obsidX'] = obsidX; pls_obj['varidX'] = varidX
        if not isinstance(obsidY, bool):
            pls_obj['obsidY'] = obsidY; pls_obj['varidY'] = varidY
        # Hotelling's T2 and F-distribution-based confidence limits.
        T2 = hott2(pls_obj, Tnew=T)
        n = T.shape[0]
        T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
        T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
        # SPE from the deflated residuals plus chi2-based limits.
        speX = np.sum(X_**2, axis=1, keepdims=1)
        speX_lim95, speX_lim99 = spe_ci(speX)
        speY = np.sum(Y_**2, axis=1, keepdims=1)
        speY_lim95, speY_lim99 = spe_ci(speY)
        pls_obj['T2'] = T2; pls_obj['T2_lim99'] = T2_lim99; pls_obj['T2_lim95'] = T2_lim95
        pls_obj['speX'] = speX; pls_obj['speX_lim99'] = speX_lim99; pls_obj['speX_lim95'] = speX_lim95
        pls_obj['speY'] = speY; pls_obj['speY_lim99'] = speY_lim99; pls_obj['speY_lim95'] = speY_lim95
        if cca:
            Tcv, Pcv, Wcv, Betacv = pls_cca(pls_obj, Xmcs, Ymcs, not_Xmiss)
            pls_obj['Tcv'] = Tcv; pls_obj['Pcv'] = Pcv; pls_obj['Wcv'] = Wcv; pls_obj['Betacv'] = Betacv
        return pls_obj
    else:
        if md_algorithm == 'nipals':
            # ---- Missing-data NIPALS path ----
            if not shush:
                print('phi.pls using NIPALS executed on: ' + str(datetime.datetime.now()))
            # NaNs replaced by zeros; not_Xmiss/not_Ymiss keep the pattern.
            X_, dummy = n2z(X_)
            Y_, dummy = n2z(Y_)
            # Fixed convergence tolerance and iteration cap.
            epsilon = 1E-9; maxit = 2000
            if cca:
                Xmcs = X_.copy(); Ymcs = Y_.copy()
            TSSX = np.sum(X_**2); TSSXpv = np.sum(X_**2, axis=0)
            TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
            r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
            for a in range(A):
                # Start u as the Y column with the largest std.
                ui = Y_[:, [np.argmax(std(Y_))]]
                Converged = False; num_it = 0
                while not Converged:
                    # w = X'u / (u'u)   (denominator restricted to observed X)
                    wi = np.sum(X_ * ui, axis=0) / np.sum((ui * not_Xmiss)**2, axis=0)
                    wi = wi / np.linalg.norm(wi)
                    # t = Xw / (w'w)
                    ti = X_ @ wi
                    wtw = np.sum((wi * not_Xmiss)**2, axis=1)
                    ti = ti / wtw
                    ti = ti.reshape(-1, 1); wi = wi.reshape(-1, 1)
                    # q = Y't / (t't)
                    qi = np.sum(Y_ * ti, axis=0) / np.sum((ti * not_Ymiss)**2, axis=0)
                    # u = Yq / (q'q)
                    qi_1d = qi.copy()
                    qi = qi.reshape(-1, 1)
                    un = Y_ @ qi
                    qtq = np.sum((qi_1d * not_Ymiss)**2, axis=1).reshape(-1, 1)
                    un = un / qtq
                    # Converged when the relative change in ||u|| is tiny.
                    if abs((np.linalg.norm(ui) - np.linalg.norm(un))) / np.linalg.norm(ui) < epsilon:
                        Converged = True
                    if num_it > maxit:
                        Converged = True
                    if Converged:
                        # Sign convention: flip so that the larger-variance
                        # side of the score vector is positive.
                        if np.var(ti[ti<0]) > np.var(ti[ti>=0]):
                            ti = -ti; wi = -wi; un = -un; qi = -qi
                        if not shush:
                            print('# Iterations for LV #'+str(a+1)+': ', str(num_it))
                        # p = X't / (t't)
                        pi = np.sum(X_ * ti, axis=0) / np.sum((ti * not_Xmiss)**2, axis=0)
                        pi = pi.reshape(-1, 1)
                        # Deflate observed entries only.
                        X_ = (X_ - ti @ pi.T) * not_Xmiss
                        Y_ = (Y_ - ti @ qi.T) * not_Ymiss
                        if a == 0:
                            T = ti; P = pi; W = wi; U = un; Q = qi
                        else:
                            T = np.hstack((T, ti)); U = np.hstack((U, un))
                            P = np.hstack((P, pi)); W = np.hstack((W, wi)); Q = np.hstack((Q, qi))
                        r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv, r2X, r2Xpv)
                        r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
                    else:
                        num_it += 1
                        ui = un
                if a == 0:
                    numIT = num_it
                else:
                    numIT = np.hstack((numIT, num_it))
            r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
            r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
            # Rotated weights: T = X Ws for the undeflated X.
            Ws = W @ np.linalg.pinv(P.T @ W)
            Ws[:, 0] = W[:, 0]
            eigs = np.var(T, axis=0)
            r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y)
            if not shush:
                print('--------------------------------------------------------------')
                print('LV # Eig R2X sum(R2X) R2Y sum(R2Y)')
                if A > 1:
                    for a in range(A):
                        print("LV #"+str(a+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2Y[a], r2yc[a]))
                else:
                    print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2Y, r2yc[0]))
                print('--------------------------------------------------------------')
            var_t = (T.T @ T) / T.shape[0]
            pls_obj = {'T':T, 'P':P, 'Q':Q, 'W':W, 'Ws':Ws, 'U':U,
                       'r2x':r2X, 'r2xpv':r2Xpv, 'mx':x_mean, 'sx':x_std,
                       'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std, 'var_t':var_t}
            if not isinstance(obsidX, bool):
                pls_obj['obsidX'] = obsidX; pls_obj['varidX'] = varidX
            if not isinstance(obsidY, bool):
                pls_obj['obsidY'] = obsidY; pls_obj['varidY'] = varidY
            T2 = hott2(pls_obj, Tnew=T)
            n = T.shape[0]
            T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
            T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
            speX = np.sum(X_**2, axis=1, keepdims=1)
            speX_lim95, speX_lim99 = spe_ci(speX)
            speY = np.sum(Y_**2, axis=1, keepdims=1)
            speY_lim95, speY_lim99 = spe_ci(speY)
            pls_obj['T2'] = T2; pls_obj['T2_lim99'] = T2_lim99; pls_obj['T2_lim95'] = T2_lim95
            pls_obj['speX'] = speX; pls_obj['speX_lim99'] = speX_lim99; pls_obj['speX_lim95'] = speX_lim95
            pls_obj['speY'] = speY; pls_obj['speY_lim99'] = speY_lim99; pls_obj['speY_lim95'] = speY_lim95
            if cca:
                Tcv, Pcv, Wcv, Betacv = pls_cca(pls_obj, Xmcs, Ymcs, not_Xmiss)
                pls_obj['Tcv'] = Tcv; pls_obj['Pcv'] = Pcv; pls_obj['Wcv'] = Wcv; pls_obj['Betacv'] = Betacv
            return pls_obj
        elif md_algorithm == 'nlp':
            # ---- NLP (Pyomo/IPOPT) path, OUPLS formulation ----
            # NOTE(review): this unconditionally re-enables printing;
            # looks like a leftover debug override — confirm intent.
            shush = False
            if not shush:
                print('phi.pls using NLP with Ipopt executed on: ' + str(datetime.datetime.now()))
            X_, dummy = n2z(X_)
            Y_, dummy = n2z(Y_)
            # NIPALS solution initializes the NLP decision variables.
            plsobj_ = pls_(X, Y, A, mcsX=mcsX, mcsY=mcsY, md_algorithm='nipals', shush=True)
            plsobj_ = prep_pls_4_MDbyNLP(plsobj_, X_, Y_)
            TSSX = np.sum(X_**2); TSSXpv = np.sum(X_**2, axis=0)
            TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
            model = ConcreteModel()
            model.A = Set(initialize=plsobj_['pyo_A'])
            model.N = Set(initialize=plsobj_['pyo_N'])
            model.M = Set(initialize=plsobj_['pyo_M'])
            model.O = Set(initialize=plsobj_['pyo_O'])
            model.P = Var(model.N, model.A, within=Reals, initialize=plsobj_['pyo_P_init'])
            model.T = Var(model.O, model.A, within=Reals, initialize=plsobj_['pyo_T_init'])
            model.psi = Param(model.O, model.N, initialize=plsobj_['pyo_psi'])
            model.X = Param(model.O, model.N, initialize=plsobj_['pyo_X'])
            model.theta = Param(model.O, model.M, initialize=plsobj_['pyo_theta'])
            model.Y = Param(model.O, model.M, initialize=plsobj_['pyo_Y'])
            # Kronecker delta used to impose P'P = I.
            model.delta = Param(model.A, model.A, initialize=lambda model, a1, a2: 1.0 if a1==a2 else 0)
            def _c27bc_con(model, a1, a2):
                # Loadings orthonormality: P'P = I.
                return sum(model.P[j, a1] * model.P[j, a2] for j in model.N) == model.delta[a1, a2]
            model.c27bc = Constraint(model.A, model.A, rule=_c27bc_con)
            def _27d_con(model, a1, a2):
                # Scores orthogonality (lower triangle only).
                if a2 < a1:
                    return sum(model.T[o, a1] * model.T[o, a2] for o in model.O) == 0
                else:
                    return Constraint.Skip
            model.c27d = Constraint(model.A, model.A, rule=_27d_con)
            def _27e_con(model, i):
                # Scores sum to zero (mean-centered).
                return sum(model.T[o,i] for o in model.O) == 0
            model.c27e = Constraint(model.A, rule=_27e_con)
            def _eq_27a_obj(model):
                # OUPLS objective (eq. 27a): Y-weighted X reconstruction error.
                return sum(sum(sum((model.theta[o,m]*model.Y[o,m]) * (model.X[o,n] - model.psi[o,n] * sum(model.T[o,a] * model.P[n,a] for a in model.A)) for o in model.O)**2 for n in model.N) for m in model.M)
            model.obj = Objective(rule=_eq_27a_obj)
            # Prefer local IPOPT, then GAMS interface, then remote NEOS.
            if ipopt_ok:
                solver = SolverFactory('ipopt')
                if ma57_ok: solver.options['linear_solver'] = 'ma57'
                results = solver.solve(model, tee=True)
            elif gams_ok:
                solver = SolverFactory('gams:ipopt')
                results = solver.solve(model, tee=True)
            else:
                solver_manager = SolverManagerFactory('neos')
                results = solver_manager.solve(model, opt='ipopt', tee=True)
            T = np.array([[value(model.T[o,a]) for a in model.A] for o in model.O])
            P = np.array([[value(model.P[n,a]) for a in model.A] for n in model.N])
            # Obtain Ws via NLP
            Taux = np2D2pyomo(T)
            modelb = ConcreteModel()
            modelb.A = Set(initialize=plsobj_['pyo_A'])
            modelb.N = Set(initialize=plsobj_['pyo_N'])
            modelb.O = Set(initialize=plsobj_['pyo_O'])
            modelb.Ws = Var(model.N, model.A, within=Reals, initialize=plsobj_['pyo_Ws_init'])
            modelb.T = Param(model.O, model.A, within=Reals, initialize=Taux)
            modelb.psi = Param(model.O, model.N, initialize=plsobj_['pyo_psi'])
            modelb.X = Param(model.O, model.N, initialize=plsobj_['pyo_X'])
            def _eq_obj(model):
                # Least-squares fit of Ws so that T ≈ (psi∘X) Ws.
                return sum(sum((model.T[o,a] - sum(model.psi[o,n] * model.X[o,n] * model.Ws[n,a] for n in model.N))**2 for a in model.A) for o in model.O)
            modelb.obj = Objective(rule=_eq_obj)
            if ipopt_ok:
                solver = SolverFactory('ipopt')
                if ma57_ok: solver.options['linear_solver'] = 'ma57'
                results = solver.solve(modelb, tee=True)
            elif gams_ok:
                solver = SolverFactory('gams:ipopt')
                results = solver.solve(modelb, tee=True)
            else:
                solver_manager = SolverManagerFactory('neos')
                results = solver_manager.solve(modelb, opt='ipopt', tee=True)
            Ws = np.array([[value(modelb.Ws[n,a]) for a in modelb.A] for n in modelb.N])
            # Obtain Q via NLP
            # Fill X's missing entries with the model reconstruction first.
            Xhat = T @ P.T
            Xaux = X_.copy(); Xaux[X_nan_map] = Xhat[X_nan_map]
            Xaux = np2D2pyomo(Xaux); Taux = np2D2pyomo(T)
            model2 = ConcreteModel()
            model2.A = Set(initialize=plsobj_['pyo_A'])
            model2.N = Set(initialize=plsobj_['pyo_N'])
            model2.M = Set(initialize=plsobj_['pyo_M'])
            model2.O = Set(initialize=plsobj_['pyo_O'])
            model2.T = Param(model.O, model.A, within=Reals, initialize=Taux)
            model2.Q = Var(model.M, model.A, within=Reals, initialize=plsobj_['pyo_Q_init'])
            model2.X = Param(model.O, model.N, initialize=plsobj_['pyo_X'])
            model2.theta = Param(model.O, model.M, initialize=plsobj_['pyo_theta'])
            model2.Y = Param(model.O, model.M, initialize=plsobj_['pyo_Y'])
            model2.delta = Param(model.A, model.A, initialize=lambda model, a1, a2: 1.0 if a1==a2 else 0)
            def _eq_36a_mod_obj(model):
                # X-weighted Y reconstruction error (modified eq. 36a).
                return sum(sum(sum((model.X[o,n]) * (model.Y[o,m] - model.theta[o,m] * sum(model.T[o,a] * model.Q[m,a] for a in model.A)) for o in model.O)**2 for n in model.N) for m in model.M)
            model2.obj = Objective(rule=_eq_36a_mod_obj)
            if ipopt_ok:
                solver = SolverFactory('ipopt')
                if ma57_ok: solver.options['linear_solver'] = 'ma57'
                results = solver.solve(model2, tee=True)
            elif gams_ok:
                solver = SolverFactory('gams:ipopt')
                results = solver.solve(model2, tee=True)
            else:
                solver_manager = SolverManagerFactory('neos')
                results = solver_manager.solve(model2, opt='ipopt', tee=True)
            Q = np.array([[value(model2.Q[m,a]) for a in model2.A] for m in model2.M])
            # Deflate component by component to recover R2 statistics.
            r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
            for a in range(A):
                ti = T[:,[a]]; pi = P[:,[a]]; qi = Q[:,[a]]
                X_ = (X_ - ti @ pi.T) * not_Xmiss
                Y_ = (Y_ - ti @ qi.T) * not_Ymiss
                r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv, r2X, r2Xpv)
                r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
            r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
            r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
            eigs = np.var(T, axis=0)
            r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y)
            if not shush:
                print('--------------------------------------------------------------')
                print('LV # Eig R2X sum(R2X) R2Y sum(R2Y)')
                if A > 1:
                    for a in range(A):
                        print("LV #"+str(a+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2Y[a], r2yc[a]))
                else:
                    print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2Y, r2yc[0]))
                print('--------------------------------------------------------------')
            # The NLP path does not produce W or U; placeholders keep keys present.
            W = 1; U = 1
            var_t = (T.T @ T) / T.shape[0]
            pls_obj = {'T':T, 'P':P, 'Q':Q, 'W':W, 'Ws':Ws, 'U':U,
                       'r2x':r2X, 'r2xpv':r2Xpv, 'mx':x_mean, 'sx':x_std,
                       'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std, 'var_t':var_t}
            if not isinstance(obsidX, bool):
                pls_obj['obsidX'] = obsidX; pls_obj['varidX'] = varidX
            if not isinstance(obsidY, bool):
                pls_obj['obsidY'] = obsidY; pls_obj['varidY'] = varidY
            T2 = hott2(pls_obj, Tnew=T)
            n = T.shape[0]
            T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
            T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
            speX = np.sum(X_**2, axis=1, keepdims=1)
            speX_lim95, speX_lim99 = spe_ci(speX)
            speY = np.sum(Y_**2, axis=1, keepdims=1)
            speY_lim95, speY_lim99 = spe_ci(speY)
            pls_obj['T2'] = T2; pls_obj['T2_lim99'] = T2_lim99; pls_obj['T2_lim95'] = T2_lim95
            pls_obj['speX'] = speX; pls_obj['speX_lim99'] = speX_lim99; pls_obj['speX_lim95'] = speX_lim95
            pls_obj['speY'] = speY; pls_obj['speY_lim99'] = speY_lim99; pls_obj['speY_lim95'] = speY_lim95
            return pls_obj
# =============================================================================
# Prediction functions (v1.0: use stored var_t, broadcasting)
# =============================================================================
[docs]
def hott2(mvmobj, *, Xnew=False, Tnew=False):
    """Compute Hotelling's T² statistic.

    Args:
        mvmobj (dict): Fitted PCA or PLS model.
        Xnew (pd.DataFrame or np.ndarray): New X observations (optional).
            If provided, scores are computed internally before T² calculation.
        Tnew (np.ndarray): Pre-computed scores (optional). Used directly
            if provided; avoids redundant projection.

    Returns:
        ndarray: T² value for each observation (n_obs,).

    Note:
        If neither ``Xnew`` nor ``Tnew`` is provided, returns T² for the
        training set stored in ``mvmobj``. If both are provided, ``Tnew``
        takes precedence (previously this combination raised an
        UnboundLocalError).
    """
    # Reuse the score covariance stored at fit time when available; fall back
    # to recomputing it from the training scores for older model objects.
    var_t = mvmobj.get('var_t', (mvmobj['T'].T @ mvmobj['T']) / mvmobj['T'].shape[0])
    var_t_inv = np.linalg.inv(var_t)
    if isinstance(Tnew, bool) and not isinstance(Xnew, bool):
        # Only X given: project onto the model to obtain scores first.
        # 'Q' is only present in PLS models, so its presence discriminates.
        if 'Q' in mvmobj:
            Tnew = pls_pred(Xnew, mvmobj)['Tnew']
        else:
            Tnew = pca_pred(Xnew, mvmobj)['Tnew']
    elif isinstance(Tnew, bool) and isinstance(Xnew, bool):
        # Neither given: score the training set.
        Tnew = mvmobj['T']
    # Row-wise t' inv(var_t) t without materializing the full quadratic form.
    return np.sum((Tnew @ var_t_inv) * Tnew, axis=1)
[docs]
def pca_pred(Xnew, pcaobj, *, algorithm='p2mp'):
"""Project new observations onto a fitted PCA model.
Args:
Xnew (pd.DataFrame or np.ndarray): New observations to project.
Variables must match those used to train ``pcaobj``.
pcaobj (dict): Fitted PCA model from :func:`pca`.
algorithm (str): Projection algorithm. ``'p2mp'`` (default) handles
missing data; ``'standard'`` uses direct matrix multiplication
and requires complete data.
Returns:
dict: Prediction results with keys:
- ``Tnew`` (ndarray): Projected scores (n_new × A).
- ``Xhat`` (ndarray): Reconstructed X in original scale.
- ``speX`` (ndarray): SPE for each new observation.
- ``T2`` (ndarray): Hotelling's T² for each new observation.
"""
if isinstance(Xnew, np.ndarray):
X_ = Xnew.copy()
if X_.ndim == 1:
X_ = X_.reshape(1, -1)
elif isinstance(Xnew, pd.DataFrame):
X_ = np.array(Xnew.values[:,1:]).astype(float)
var_t = pcaobj.get('var_t', (pcaobj['T'].T @ pcaobj['T']) / pcaobj['T'].shape[0])
X_nan_map = np.isnan(X_)
if not X_nan_map.any():
X_mcs = (X_ - pcaobj['mx']) / pcaobj['sx']
tnew = X_mcs @ pcaobj['P']
xhat = tnew @ pcaobj['P'].T * pcaobj['sx'] + pcaobj['mx']
htt2 = np.sum((tnew @ np.linalg.inv(var_t)) * tnew, axis=1)
spe_ = X_mcs - tnew @ pcaobj['P'].T
spe_ = np.sum(spe_**2, axis=1, keepdims=True)
xpred = {'Xhat':xhat, 'Tnew':tnew, 'speX':spe_, 'T2':htt2}
elif algorithm == 'p2mp':
not_Xmiss = (~X_nan_map) * 1
Xmcs = (X_ - pcaobj['mx']) / pcaobj['sx']
Xmcs, dummy = n2z(Xmcs)
for i in range(Xmcs.shape[0]):
row_map = not_Xmiss[[i], :]
tempP = pcaobj['P'] * row_map.T
PTP = tempP.T @ tempP
try:
tnew_, resid, rank, s = np.linalg.lstsq(PTP, tempP.T @ Xmcs[[i], :].T, rcond=None)
except:
tnew_ = np.linalg.pinv(PTP) @ tempP.T @ Xmcs[[i], :].T
if i == 0:
tnew = tnew_.T
else:
tnew = np.vstack((tnew, tnew_.T))
xhat = tnew @ pcaobj['P'].T * pcaobj['sx'] + pcaobj['mx']
htt2 = np.sum((tnew @ np.linalg.inv(var_t)) * tnew, axis=1)
spe_ = (Xmcs - tnew @ pcaobj['P'].T) * not_Xmiss
spe_ = np.sum(spe_**2, axis=1, keepdims=True)
xpred = {'Xhat':xhat, 'Tnew':tnew, 'speX':spe_, 'T2':htt2}
return xpred
[docs]
def pls_pred(Xnew, plsobj):
    """Predict Y for new observations using a fitted PLS model.

    Args:
        Xnew (pd.DataFrame, np.ndarray or dict): New predictor observations.
            A dict of DataFrames (multi-block data) is horizontally
            concatenated in key order; each DataFrame's first column must be
            observation IDs. Variables must match those used to train
            ``plsobj``.
        plsobj (dict): Fitted PLS model from :func:`pls`.

    Returns:
        dict: Prediction results with keys:
            - ``Tnew`` (ndarray): X-scores for new observations (n_new × A).
            - ``Yhat`` (ndarray): Predicted Y in original scale (n_new × n_y).
            - ``Xhat`` (ndarray): Reconstructed X in original scale.
            - ``speX`` (ndarray): X-space SPE for each new observation.
            - ``T2`` (ndarray): Hotelling's T² for each new observation.
            - ``Tcv`` (ndarray): CCA covariant scores (only if model has ``Wcv``).

    Note:
        Rows with missing data are scored by sequential per-component
        projection to the model plane; ``force_deflation`` is hard-wired
        off, so complete data always takes the direct (single-multiply) path.
    """
    algorithm = 'p2mp'; force_deflation = False
    if isinstance(Xnew, np.ndarray):
        X_ = Xnew.copy()
        if X_.ndim == 1:
            X_ = X_.reshape(1, -1)
    elif isinstance(Xnew, pd.DataFrame):
        # First column holds observation IDs; numeric data starts at col 1.
        X_ = np.array(Xnew.values[:,1:]).astype(float)
    elif isinstance(Xnew, dict):
        # Multi-block input: concatenate the blocks column-wise.
        data_ = []; names_ = []
        for k in Xnew.keys():
            data_.append(Xnew[k]); names_.append(k)
        c = 0
        for i, x in enumerate(data_):
            x_ = x.values[:,1:].astype(float)
            if c == 0:
                X_ = x_.copy()
            else:
                X_ = np.hstack((X_, x_))
            c += 1
    # Score covariance: reuse the one stored at fit time when available.
    var_t = plsobj.get('var_t', (plsobj['T'].T @ plsobj['T']) / plsobj['T'].shape[0])
    X_nan_map = np.isnan(X_)
    if not X_nan_map.any() and not force_deflation:
        # Complete data: direct projection with the corrected weights Ws.
        X_mcs = (X_ - plsobj['mx']) / plsobj['sx']
        tnew = X_mcs @ plsobj['Ws']
        yhat = tnew @ plsobj['Q'].T * plsobj['sy'] + plsobj['my']
        xhat = tnew @ plsobj['P'].T * plsobj['sx'] + plsobj['mx']
        htt2 = np.sum((tnew @ np.linalg.inv(var_t)) * tnew, axis=1)
        speX = X_mcs - tnew @ plsobj['P'].T
        speX = np.sum(speX**2, axis=1, keepdims=True)
        ypred = {'Yhat':yhat, 'Xhat':xhat, 'Tnew':tnew, 'speX':speX, 'T2':htt2}
        if 'Wcv' in plsobj:
            ypred['Tcv'] = X_mcs @ plsobj['Wcv']
    elif algorithm == 'p2mp':
        # Missing data: per row, compute each component's score from the
        # observed variables only, deflating X before the next component.
        not_Xmiss = (~X_nan_map) * 1
        Xmcs = (X_ - plsobj['mx']) / plsobj['sx']
        Xmcs, dummy = n2z(Xmcs)
        for i in range(Xmcs.shape[0]):
            row_map = not_Xmiss[[i], :]
            # Zero the weights of missing variables for this row.
            tempW = plsobj['W'] * row_map.T
            for a in range(plsobj['W'].shape[1]):
                WTW = tempW[:,[a]].T @ tempW[:,[a]]
                tnew_aux, resid, rank, s = np.linalg.lstsq(WTW, tempW[:,[a]].T @ Xmcs[[i],:].T, rcond=None)
                # Deflate the row by this component before scoring the next.
                Xmcs[[i], :] = (Xmcs[[i], :] - tnew_aux @ plsobj['P'][:,[a]].T) * row_map
                if a == 0:
                    tnew_ = tnew_aux
                else:
                    tnew_ = np.vstack((tnew_, tnew_aux))
            if i == 0:
                tnew = tnew_.T
            else:
                tnew = np.vstack((tnew, tnew_.T))
        yhat = tnew @ plsobj['Q'].T * plsobj['sy'] + plsobj['my']
        xhat = tnew @ plsobj['P'].T * plsobj['sx'] + plsobj['mx']
        htt2 = np.sum((tnew @ np.linalg.inv(var_t)) * tnew, axis=1)
        X_, dummy = n2z(X_)
        # SPE over the observed entries of the original (un-deflated) data.
        speX = ((X_ - plsobj['mx']) / plsobj['sx']) - tnew @ plsobj['P'].T
        speX = speX * not_Xmiss
        speX = np.sum(speX**2, axis=1, keepdims=True)
        ypred = {'Yhat':yhat, 'Xhat':xhat, 'Tnew':tnew, 'speX':speX, 'T2':htt2}
        if 'Wcv' in plsobj:
            ypred['Tcv'] = ((X_ - plsobj['mx']) / plsobj['sx']) @ plsobj['Wcv']
    return ypred
[docs]
def spe(mvmobj, Xnew, *, Ynew=False):
    """Compute Squared Prediction Error (SPE / Q statistic).

    Args:
        mvmobj (dict): Fitted PCA or PLS model.
        Xnew (pd.DataFrame or np.ndarray): New X observations.
        Ynew (pd.DataFrame or np.ndarray): New Y observations (optional).
            Only used for PLS models to also return Y-space SPE.

    Returns:
        ndarray or tuple:
            - If ``Ynew`` is not provided (or model is PCA): ``speX``
              (ndarray, n_obs × 1).
            - If ``Ynew`` is provided and model is PLS: ``(speX, speY)``.
    """
    # 'Q' only exists in PLS models; use it to pick the projection routine.
    is_pls = 'Q' in mvmobj
    pred = pls_pred(Xnew, mvmobj) if is_pls else pca_pred(Xnew, mvmobj)
    Tnew = pred['Tnew']
    if isinstance(Xnew, pd.DataFrame):
        X_ = np.array(Xnew.values[:,1:]).astype(float)
    elif isinstance(Xnew, np.ndarray):
        X_ = Xnew.copy()
    if isinstance(Ynew, pd.DataFrame):
        Y_ = np.array(Ynew.values[:,1:]).astype(float)
    elif isinstance(Ynew, np.ndarray):
        Y_ = Ynew.copy()
    # X residuals in pre-processed (centered/scaled) units.
    Xres = (X_ - mvmobj['mx']) / mvmobj['sx'] - Tnew @ mvmobj['P'].T
    spex_ = np.sum(Xres**2, axis=1, keepdims=True)
    if is_pls and not isinstance(Ynew, bool):
        Yres = (Y_ - mvmobj['my']) / mvmobj['sy'] - Tnew @ mvmobj['Q'].T
        spey_ = np.sum(Yres**2, axis=1, keepdims=True)
        return spex_, spey_
    return spex_
# =============================================================================
# LWPLS
# =============================================================================
[docs]
def lwpls(xnew, loc_par, mvmobj, X, Y, *, shush=False):
    """Locally Weighted PLS (LWPLS) prediction for a single new observation.

    Per Kim et al. Int. J. Pharmaceutics 421 (2011) 269–274.

    Args:
        xnew (np.ndarray): Single new observation (n_x values).
        loc_par (float): Locality parameter controlling the width of the
            Gaussian kernel. Larger values include more training observations.
        mvmobj (dict): Global PLS model from :func:`pls`, whose weights and
            R²Y values define the variable weighting for the distance metric.
        X (pd.DataFrame or np.ndarray): Training X data.
        Y (pd.DataFrame or np.ndarray): Training Y data.
        shush (bool): Suppress printed output. Default ``False``.

    Returns:
        ndarray: Locally predicted response for ``xnew`` (``yhat[0].T``).
            NOTE(review): for multivariate Y this returns only the first row
            of the (n_y × 1) prediction — confirm intent for n_y > 1.
    """
    if not shush:
        print('phi.lwpls executed on: ' + str(datetime.datetime.now()))
    xnew = np.reshape(xnew, (-1, 1))
    if isinstance(X, pd.DataFrame):
        X = np.array(X.values[:,1:]).astype(float)
    if isinstance(Y, pd.DataFrame):
        Y = np.array(Y.values[:,1:]).astype(float)
    # VIP-like variable relevance from the global model: |Ws| weighted by r2y.
    vip = np.sum(np.abs(mvmobj['Ws'] * mvmobj['r2y']), axis=1).reshape(-1, 1)
    theta = vip
    # Weighted distance of every training row to xnew, then Gaussian kernel.
    D = X - xnew.T
    d2 = D * theta.T * D
    d2 = np.sqrt(np.sum(d2, axis=1))
    omega = np.exp(-d2 / (np.var(d2, ddof=1) * loc_par))
    OMEGA = np.diag(omega)
    omega = omega.reshape(-1, 1)
    # Locally weighted means used to center the local model.
    X_weighted_mean = (np.sum(omega * X, axis=0) / np.sum(omega)).reshape(-1, 1)
    Y_weighted_mean = (np.sum(omega * Y, axis=0) / np.sum(omega)).reshape(-1, 1)
    Xi = X - X_weighted_mean.T
    Yi = Y - Y_weighted_mean.T
    xnewi = xnew - X_weighted_mean
    yhat = Y_weighted_mean
    # One weighted-PLS component per global LV, deflating after each.
    for a in range(mvmobj['T'].shape[1]):
        [U_, S, Wh] = np.linalg.svd(Xi.T @ OMEGA @ Yi @ Yi.T @ OMEGA @ Xi)
        w = Wh.T[:, [0]]
        t = Xi @ w
        p = Xi.T @ OMEGA @ t / (t.T @ OMEGA @ t)
        q = Yi.T @ OMEGA @ t / (t.T @ OMEGA @ t)
        tnew = xnewi.T @ w
        yhat = yhat + q @ tnew
        Xi = Xi - t @ p.T
        Yi = Yi - t @ q.T
        xnewi = xnewi - p @ tnew
    return yhat[0].T
# =============================================================================
# Contributions
# =============================================================================
[docs]
def contributions(mvmobj, X, cont_type, *, Y=False, from_obs=False, to_obs=False, lv_space=False):
    """Compute variable contributions to monitoring statistics.

    Args:
        mvmobj (dict): Fitted PCA or PLS model.
        X (pd.DataFrame or np.ndarray): Observations to diagnose.
        cont_type (str): Type of contribution to compute:
            ``'scores'``: contribution to the scores.
            ``'ht2'``: contribution to Hotelling's T².
            ``'spe'``: contribution to SPE (X, and Y if ``Y`` given for PLS).
        Y (pd.DataFrame or np.ndarray): Y observations (optional; enables the
            Y-space SPE contributions for PLS models with ``cont_type='spe'``).
        from_obs (bool, int or list): Reference observation indices to compute
            contributions *from* (scores/ht2 only). ``False`` = absolute.
        to_obs (int or list): Observation indices to compute contributions *to*.
        lv_space (bool, int or list): Latent variables to include (1-indexed);
            ``False`` = all.

    Returns:
        ndarray or tuple: Contribution values (1 × n_vars); ``(contsX, contsY)``
        when ``cont_type='spe'`` and ``Y`` is given.

    Ref: Miller, P., Swanson, R.E. and Heckler, C.E., 1998. Contribution plots: a missing link
    in multivariate quality control.
    Applied mathematics and computer science, 8(4), pp.775-792.
    """
    # Normalize lv_space to a 0-indexed list of LV indices.
    if isinstance(lv_space, bool):
        lv_space = list(range(mvmobj['T'].shape[1]))
    elif isinstance(lv_space, int):
        lv_space = (np.array([lv_space]) - 1).tolist()
    elif isinstance(lv_space, list):
        lv_space = (np.array(lv_space) - 1).tolist()
    # NOTE(review): bool is a subclass of int, so the default to_obs=False
    # becomes [False] here and indexes row 0 — confirm this is intended.
    if isinstance(to_obs, int):
        to_obs = [to_obs]
    if not isinstance(from_obs, bool):
        if isinstance(from_obs, int):
            from_obs = [from_obs]
    if isinstance(X, np.ndarray):
        X_ = X.copy()
    elif isinstance(X, pd.DataFrame):
        X_ = np.array(X.values[:,1:]).astype(float)
    if not isinstance(Y, bool):
        if isinstance(Y, np.ndarray):
            Y_ = Y.copy()
        elif isinstance(Y, pd.DataFrame):
            Y_ = np.array(Y.values[:,1:]).astype(float)
    if cont_type == 'ht2' or cont_type == 'scores':
        X_ = (X_ - mvmobj['mx']) / mvmobj['sx']
        X_, dummy = n2z(X_)
        t_stdevs = np.std(mvmobj['T'], axis=0, ddof=1)
        # PLS models contribute through Ws; PCA models through P.
        loadings = mvmobj['Ws'] if 'Q' in mvmobj else mvmobj['P']
        to_obs_mean = np.mean(X_[to_obs, :], axis=0, keepdims=True)
        to_cont = np.zeros((1, X_.shape[1]))
        for a in lv_space:
            # Per-LV contribution, standardized by the score's std dev;
            # squared for the ht2 (quadratic) statistic.
            aux_ = (to_obs_mean * np.abs(loadings[:, a].T)) / t_stdevs[a]
            to_cont = to_cont + (aux_ if cont_type == 'scores' else aux_**2)
        if not isinstance(from_obs, bool):
            # Relative contributions: difference between two observation groups.
            from_obs_mean = np.mean(X_[from_obs, :], axis=0, keepdims=1)
            from_cont = np.zeros((1, X_.shape[1]))
            for a in lv_space:
                aux_ = (from_obs_mean * np.abs(loadings[:, a].T)) / t_stdevs[a]
                from_cont = from_cont + (aux_ if cont_type == 'scores' else aux_**2)
            return to_cont - from_cont
        else:
            return to_cont
    elif cont_type == 'spe':
        X_ = X_[to_obs, :]
        if 'Q' in mvmobj:
            pred = pls_pred(X_, mvmobj)
        else:
            pred = pca_pred(X_, mvmobj)
        Xhat = pred['Xhat']
        Xhatmcs = (Xhat - mvmobj['mx']) / mvmobj['sx']
        X_ = (X_ - mvmobj['mx']) / mvmobj['sx']
        Xerror = X_ - Xhatmcs
        Xerror, dummy = n2z(Xerror)
        # Signed squared residual keeps the direction of the deviation.
        contsX = (Xerror**2) * np.sign(Xerror)
        contsX = np.mean(contsX, axis=0, keepdims=True)
        if not isinstance(Y, bool):
            Y_ = Y_[to_obs, :]
            Yhat = pred['Yhat']
            Yhatmcs = (Yhat - mvmobj['my']) / mvmobj['sy']
            Y_ = (Y_ - mvmobj['my']) / mvmobj['sy']
            Yerror = Y_ - Yhatmcs
            Yerror, dummy = n2z(Yerror)
            contsY = (Yerror**2) * np.sign(Yerror)
            contsY = np.mean(contsY, axis=0, keepdims=True)
            return contsX, contsY
        else:
            return contsX
# =============================================================================
# Data cleaning utilities
# =============================================================================
[docs]
def clean_empty_rows(X, *, shush=False):
    """Remove rows that are entirely NaN.

    Args:
        X (pd.DataFrame or np.ndarray): Input data matrix. If a DataFrame,
            the first column must contain observation IDs.
        shush (bool): Suppress printed output. Default ``False``.

    Returns:
        tuple: ``(cleaned, rows_rem)`` where ``cleaned`` has the fully empty
        rows removed (same type as input) and ``rows_rem`` lists the IDs of
        the removed rows.
    """
    if isinstance(X, pd.DataFrame):
        vals = np.array(X.values[:, 1:]).astype(float)
        obs_ids = X.values[:, 0].astype(str).tolist()
    else:
        vals = X.copy()
        obs_ids = ['Obs #'+str(n) for n in np.arange(X.shape[0])+1]
    # A row is "empty" when every one of its entries is NaN.
    missing_per_row = np.sum(np.isnan(vals) * 1, axis=1)
    indx = find(missing_per_row, lambda x: x == vals.shape[1])
    rows_rem = []
    if len(indx) == 0:
        return X, rows_rem
    for i in indx:
        if not shush:
            print('Removing row ', obs_ids[i], ' due to 100% missing data')
        rows_rem.append(obs_ids[i])
    if isinstance(X, pd.DataFrame):
        cleaned = X.drop(X.index.values[indx].tolist())
    else:
        cleaned = np.delete(vals, indx, 0)
    return cleaned, rows_rem
[docs]
def clean_low_variances(X, *, shush=False, min_var=1E-10):
    """Remove almost-entirely-missing columns and columns with low variance.

    Columns with fewer than 4 observed values are dropped first (reported as
    "100% missing data"); remaining columns with variance below ``min_var``
    are then dropped.

    Args:
        X (pd.DataFrame or np.ndarray): Input data matrix. If a DataFrame,
            the first column must contain observation IDs.
        shush (bool): Suppress printed output. Default ``False``.
        min_var (float): Minimum acceptable variance. Default ``1e-10``.

    Returns:
        tuple: ``(cleaned, cols_removed)`` — data with the offending columns
        removed (same type as input) and the list of removed column names.

    Note:
        Previously, ndarray input crashed in this routine (``X_pd.columns``
        on an ndarray / ``X_pd`` unbound) and the ndarray branch recorded
        stale column names after deletion; both are fixed here.
    """
    cols_removed = []
    if isinstance(X, pd.DataFrame):
        X_ = np.array(X.values[:, 1:]).astype(float)
        varidX = X.columns.values[1:].tolist()
    else:
        X_ = X.copy()
        varidX = ['Var #'+str(n) for n in np.arange(X.shape[1])+1]
    # Pass 1: drop columns that are (almost) entirely missing.
    Xmiss = np.sum(np.isnan(X_) * 1, axis=0)
    indx = find(Xmiss, lambda x: x >= (X_.shape[0] - 3))
    if len(indx) > 0:
        for i in indx:
            if not shush:
                print('Removing variable ', varidX[i], ' due to 100% missing data')
            cols_removed.append(varidX[i])
        if isinstance(X, pd.DataFrame):
            X_pd = X.drop(X.columns[np.array(indx) + 1], axis=1)
            X_ = np.array(X_pd.values[:, 1:]).astype(float)
        else:
            X_ = np.delete(X_, indx, 1)
            X_pd = X_  # keep ndarray type through the second pass
        # Keep the name list aligned with the surviving columns.
        dropped = set(indx)
        varidX = [v for j, v in enumerate(varidX) if j not in dropped]
    else:
        X_pd = X.copy() if isinstance(X, pd.DataFrame) else X_
    # Pass 2: drop columns whose variance is below the threshold.
    std_x = std(X_).flatten()
    indx = find(std_x, lambda x: x < min_var)
    if len(indx) > 0:
        for i in indx:
            if not shush:
                print('Removing variable ', varidX[i], ' due to low variance')
            cols_removed.append(varidX[i])
        if isinstance(X_pd, pd.DataFrame):
            cleaned = X_pd.drop(X_pd.columns[np.array(indx) + 1], axis=1)
        else:
            cleaned = np.delete(X_, indx, 1)
        return cleaned, cols_removed
    else:
        return X_pd, cols_removed
# =============================================================================
# Spectra preprocessing
# =============================================================================
[docs]
def spectra_snv(x):
    """Apply Standard Normal Variate (SNV) correction to spectra.

    Each spectrum (row) is mean-centered and scaled by its own standard
    deviation. Removes multiplicative scatter effects.

    Args:
        x (pd.DataFrame or np.ndarray): Spectra (n_samples × n_wavelengths)
            or a single 1-D spectrum. If a DataFrame, the first column must
            contain sample IDs.

    Returns:
        pd.DataFrame or np.ndarray: SNV-corrected spectra (same type as input).
    """
    if isinstance(x, pd.DataFrame):
        # Recurse on the numeric part; first column (IDs) is left untouched.
        cols = x.columns
        vals = x.values
        vals[:, 1:] = spectra_snv(vals[:, 1:].astype(float))
        return pd.DataFrame(vals, columns=cols)
    if x.ndim == 2:
        centered = x - np.mean(x, axis=1, keepdims=1)
        row_std = np.sqrt(np.sum(centered**2, axis=1) / (x.shape[1] - 1)).reshape(-1, 1)
        return centered / row_std
    centered = x - np.mean(x)
    return centered / np.sqrt(np.sum(centered**2) / (len(x) - 1))
[docs]
def spectra_savgol(ws, od, op, Dm):
    """Apply Savitzky-Golay smoothing and/or differentiation to spectra.

    Args:
        ws (int): Half-window size; the filter window spans ``2*ws+1`` points.
        od (int): Derivative order. ``0`` = smoothing only, ``1`` = first
            derivative, ``2`` = second derivative.
        op (int): Polynomial order fitted within each window.
        Dm (pd.DataFrame or np.ndarray): Spectra (n_samples × n_wavelengths)
            or a single 1-D spectrum. If a DataFrame, the first column must
            contain sample IDs.

    Returns:
        tuple: ``(Dm_sg, M)`` — the filtered spectra (``2*ws`` edge points
        are lost; same type as input) and the linear operator matrix ``M``
        such that the filtering is ``Dm @ M.T`` (2-D) or ``M @ Dm`` (1-D).
    """
    if isinstance(Dm, pd.DataFrame):
        # Trim the wavelength labels to match the 2*ws points lost at the edges.
        x_columns = Dm.columns.tolist()
        FirstElement = [x_columns[0]]
        x_columns = x_columns[1:]
        FirstElement.extend(x_columns[ws:-ws])
        x_values = Dm.values
        Col1 = Dm.values[:,0].tolist()
        Col1 = np.reshape(Col1, (-1, 1))
        aux, M = spectra_savgol(ws, od, op, x_values[:,1:].astype(float))
        data_ = np.hstack((Col1, aux))
        return pd.DataFrame(data=data_, columns=FirstElement), M
    else:
        l = Dm.shape[0] if Dm.ndim == 1 else Dm.shape[1]
        # Vandermonde-style design matrix over the window offsets -ws..ws.
        x_vec = np.arange(-ws, ws+1).reshape(-1, 1)
        X_sg = np.ones((2*ws+1, 1))
        for oo in np.arange(1, op+1):
            X_sg = np.hstack((X_sg, x_vec**oo))
        try:
            XtXiXt = np.linalg.inv(X_sg.T @ X_sg) @ X_sg.T
        except:
            # Fall back to pinv when the normal equations are singular.
            XtXiXt = np.linalg.pinv(X_sg.T @ X_sg) @ X_sg.T
        # The od-th row of the pseudo-inverse gives the convolution
        # coefficients for the od-th derivative (scaled by od!).
        coeffs = (XtXiXt[od, :] * factorial(od)).reshape(1, -1)
        # Build the banded operator M: one shifted copy of coeffs per
        # output wavelength.
        for i in np.arange(1, l-2*ws+1):
            if i == 1:
                M = np.hstack((coeffs, np.zeros((1, l-2*ws-1))))
            elif i < l-2*ws:
                m_ = np.hstack((np.zeros((1, i-1)), coeffs, np.zeros((1, l-2*ws-1-i+1))))
                M = np.vstack((M, m_))
            else:
                m_ = np.hstack((np.zeros((1, l-2*ws-1)), coeffs))
                M = np.vstack((M, m_))
        if Dm.ndim == 1:
            Dm_sg = M @ Dm
        else:
            Dm_sg = Dm @ M.T
        return Dm_sg, M
[docs]
def spectra_mean_center(Dm):
    """Subtract each spectrum's own mean (row-wise centering).

    Args:
        Dm (pd.DataFrame or np.ndarray): Spectra (n_samples × n_wavelengths)
            or a single 1-D spectrum. If a DataFrame, the first column must
            contain sample IDs.

    Returns:
        pd.DataFrame or np.ndarray: Centered spectra (same type as input).
    """
    if isinstance(Dm, pd.DataFrame):
        # Recurse on the numeric part; first column (IDs) is left untouched.
        cols = Dm.columns
        vals = Dm.values
        vals[:, 1:] = spectra_mean_center(vals[:, 1:].astype(float))
        return pd.DataFrame(vals, columns=cols)
    if Dm.ndim == 2:
        return Dm - Dm.mean(axis=1, keepdims=True)
    return Dm - Dm.mean()
[docs]
def spectra_autoscale(Dm):
    """Scale each spectrum by its own standard deviation (row-wise).

    Note: despite the name, this routine only scales — it does not
    mean-center (see :func:`spectra_mean_center` for centering).

    Args:
        Dm (pd.DataFrame or np.ndarray): Spectra (n_samples × n_wavelengths)
            or a single 1-D spectrum. If a DataFrame, the first column must
            contain sample IDs.

    Returns:
        pd.DataFrame or np.ndarray: Scaled spectra (same type as input).
    """
    if isinstance(Dm, pd.DataFrame):
        # Recurse on the numeric part; first column (IDs) is left untouched.
        cols = Dm.columns
        vals = Dm.values
        vals[:, 1:] = spectra_autoscale(vals[:, 1:].astype(float))
        return pd.DataFrame(vals, columns=cols)
    if Dm.ndim == 2:
        return Dm / Dm.std(axis=1, ddof=1)[:, None]
    return Dm / Dm.std(ddof=1)
[docs]
def spectra_baseline_correction(Dm):
    """Apply a constant-offset baseline correction to spectra.

    Subtracts each spectrum's own minimum so every corrected spectrum has a
    baseline of zero.

    Args:
        Dm (pd.DataFrame or np.ndarray): Spectra (n_samples × n_wavelengths)
            or a single 1-D spectrum. If a DataFrame, the first column must
            contain sample IDs.

    Returns:
        pd.DataFrame or np.ndarray: Baseline-corrected spectra (same type as
        input).
    """
    if isinstance(Dm, pd.DataFrame):
        # Recurse on the numeric part; first column (IDs) is left untouched.
        cols = Dm.columns
        vals = Dm.values
        vals[:, 1:] = spectra_baseline_correction(vals[:, 1:].astype(float))
        return pd.DataFrame(vals, columns=cols)
    if Dm.ndim == 2:
        return Dm - Dm.min(axis=1)[:, None]
    return Dm - Dm.min()
[docs]
def spectra_msc(Dm, reference_spectra=None):
    """Apply Multiplicative Scatter Correction (MSC) to spectra.

    Each spectrum is regressed against the reference (ordinary least squares
    on offset + slope) and corrected as ``(spectrum - offset) / slope``.

    Args:
        Dm (pd.DataFrame or np.ndarray): Spectra (n_samples × n_wavelengths)
            or a single 1-D spectrum. If a DataFrame, the first column must
            contain sample IDs.
        reference_spectra (np.ndarray): Reference spectrum to correct against.
            Defaults to the mean spectrum of ``Dm`` (2-D input only).

    Returns:
        pd.DataFrame or np.ndarray: MSC-corrected spectra (same type as input).

    Raises:
        ValueError: For 1-D input without a reference spectrum.
    """
    if isinstance(Dm, pd.DataFrame):
        Dm_columns = Dm.columns
        Dm_values = Dm.values
        # BUG FIX: forward reference_spectra to the recursive call
        # (it was previously dropped, silently ignoring the argument).
        Dm_values[:,1:] = spectra_msc(Dm_values[:,1:].astype(float), reference_spectra)
        return pd.DataFrame(Dm_values, columns=Dm_columns)
    if Dm.ndim == 2:
        if reference_spectra is None:
            reference_spectra = Dm.mean(axis=0)
        # Least-squares fit of each row to [1; reference]: U[:,0]=offset, U[:,1]=slope.
        V = np.vstack([np.ones(reference_spectra.shape), reference_spectra])
        U = Dm @ V.T @ np.linalg.inv(V @ V.T)
        return (Dm - U[:,0, None]) / U[:,1, None]
    else:
        if reference_spectra is None:
            raise ValueError("msc needs a reference spectra or to be able to use the mean of many spectra")
        V = np.vstack([np.ones(reference_spectra.shape), reference_spectra])
        U = Dm @ V.T @ np.linalg.inv(V @ V.T)
        return (Dm - U[0]) / U[1]
# =============================================================================
# Bootstrap PLS
# =============================================================================
[docs]
def bootstrap_pls(X, Y, num_latents, num_samples, **kwargs):
    """Fit an ensemble of PLS models on bootstrap resamples.

    Args:
        X (pd.DataFrame or np.ndarray): Training X data. If a DataFrame, the
            first column must contain observation IDs.
        Y (pd.DataFrame or np.ndarray): Training Y data (same convention).
        num_latents (int): Number of latent variables per model.
        num_samples (int): Number of bootstrap resamples / models to fit.
        **kwargs: Forwarded to :func:`pls` (e.g. ``mcs``).

    Returns:
        list: One fitted PLS model dict (from :func:`pls`) per resample.
    """
    # BUG FIX: ndarray inputs previously left the working arrays unbound
    # (NameError); accept both DataFrames and ndarrays.
    if isinstance(X, pd.DataFrame):
        X_ = X.values[:, 1:].astype(float)
    else:
        X_ = X
    if isinstance(Y, pd.DataFrame):
        Y_ = Y.values[:, 1:].astype(float)
    else:
        Y_ = Y
    boot_pls_obj = []
    n_obs = X_.shape[0]
    for _ in range(num_samples):
        # Resample observations with replacement (paired X/Y rows).
        sample_indexes = np.random.randint(n_obs, None, n_obs)
        boot_pls_obj.append(pls(X_[sample_indexes], Y_[sample_indexes], num_latents, shush=True, **kwargs))
    return boot_pls_obj
[docs]
def bootstrap_pls_pred(X_new, bootstrap_pls_obj, quantiles=[0.025, 0.975]):
    """Predict Y quantiles from a bootstrap PLS ensemble.

    Each bootstrap model contributes a normal predictive component (mean =
    its prediction, sd = RMS of its training Y-SPE); the requested quantiles
    of the resulting mixture distribution are found numerically.

    Args:
        X_new (pd.DataFrame or np.ndarray): New X observations to predict.
        bootstrap_pls_obj (list): Models from :func:`bootstrap_pls`.
        quantiles (list of float): Quantile levels in (0, 1).
            Default ``[0.025, 0.975]``.

    Returns:
        list: One array per requested quantile with the corresponding
        predicted-Y quantile for each observation.

    Raises:
        ValueError: If any quantile is outside (0, 1).
    """
    for q in quantiles:
        if q >= 1 or q <= 0:
            raise ValueError("Quantiles must be between zero and one")
    means = []; sds = []
    for pls_obj in bootstrap_pls_obj:
        means.append(pls_pred(X_new, pls_obj)["Yhat"])
        # Per-model predictive sd from the mean training Y-SPE.
        sds.append(np.sqrt(pls_obj["speY"].mean()))
    means = np.array(means).squeeze()
    sds = np.array(sds)
    # One normal component per bootstrap model (rows broadcast over obs).
    dist = norm(means, sds[:, None])
    ppf = []
    for q in quantiles:
        # Mixture CDF minus the target level; root gives the quantile.
        # cdf closes over q but is consumed by fsolve within this iteration.
        def cdf(x):
            return dist.cdf(x).mean(axis=0) - np.ones_like(x)*q
        ppf.append(fsolve(cdf, means.mean(axis=0)))
    return ppf
# =============================================================================
# Pyomo utilities
# =============================================================================
[docs]
def np2D2pyomo(arr, *, varids=False):
    """Convert a 2D NumPy array to a Pyomo-compatible dictionary.

    Args:
        arr (np.ndarray): 2D array to convert.
        varids (list or bool): Optional row labels. When falsy, rows are
            keyed ``1..n_rows``; columns are always keyed ``1..n_cols``.

    Returns:
        dict: Dictionary keyed by ``(row_key, col_index)`` tuples.
    """
    n_rows, n_cols = arr.shape
    out = {}
    for i in range(n_rows):
        row_key = varids[i] if varids else i + 1
        for j in range(n_cols):
            out[(row_key, j + 1)] = arr[i][j]
    return out
[docs]
def np1D2pyomo(arr, *, indexes=False):
    """Convert a 1D NumPy array to a Pyomo-compatible dictionary.

    Args:
        arr (np.ndarray): 1D array (a 2D array uses its first row).
        indexes (list or bool): Optional keys; when a bool (default),
            1-based integer keys are used.

    Returns:
        dict: Dictionary mapping key -> value.
    """
    if arr.ndim == 2:
        # A (1, n) row vector is flattened to its single row.
        arr = arr[0]
    if isinstance(indexes, bool):
        return {j + 1: arr[j] for j in range(len(arr))}
    if isinstance(indexes, list):
        return dict(zip(indexes, arr))
[docs]
def adapt_pls_4_pyomo(plsobj, *, use_var_ids=False):
    """Convert PLS model arrays to Pyomo-compatible dictionaries.

    Transforms ``P``, ``Q``, ``Ws``, ``mx``, ``sx``, ``my``, ``sy`` and the
    score variances into the indexed dict format required by Pyomo ``Param``
    objects.

    Args:
        plsobj (dict): Fitted PLS model from :func:`pls`.
        use_var_ids (bool): When ``True``, use the model's ``varidX`` /
            ``varidY`` names as keys instead of 1-based integers.

    Returns:
        dict: Copy of ``plsobj`` extended with ``pyo_*`` parameter dicts.
    """
    out = plsobj.copy()
    A = plsobj['T'].shape[1]
    N = plsobj['P'].shape[0]
    M = plsobj['Q'].shape[0]
    # Population variance of the scores, matching the original behavior.
    var_t = np.var(plsobj['T'], axis=0)
    if use_var_ids:
        pyo_N = plsobj['varidX']
        pyo_M = plsobj['varidY']
        pyo_Ws = np2D2pyomo(plsobj['Ws'], varids=plsobj['varidX'])
        pyo_Q = np2D2pyomo(plsobj['Q'], varids=plsobj['varidY'])
        pyo_P = np2D2pyomo(plsobj['P'], varids=plsobj['varidX'])
        pyo_mx = np1D2pyomo(plsobj['mx'], indexes=plsobj['varidX'])
        pyo_sx = np1D2pyomo(plsobj['sx'], indexes=plsobj['varidX'])
        pyo_my = np1D2pyomo(plsobj['my'], indexes=plsobj['varidY'])
        pyo_sy = np1D2pyomo(plsobj['sy'], indexes=plsobj['varidY'])
    else:
        pyo_N = list(np.arange(1, N + 1))
        pyo_M = list(np.arange(1, M + 1))
        pyo_Ws = np2D2pyomo(plsobj['Ws'])
        pyo_Q = np2D2pyomo(plsobj['Q'])
        pyo_P = np2D2pyomo(plsobj['P'])
        pyo_mx = np1D2pyomo(plsobj['mx'])
        pyo_sx = np1D2pyomo(plsobj['sx'])
        pyo_my = np1D2pyomo(plsobj['my'])
        pyo_sy = np1D2pyomo(plsobj['sy'])
    out.update({'pyo_A': list(np.arange(1, A + 1)), 'pyo_N': pyo_N, 'pyo_M': pyo_M,
                'pyo_Ws': pyo_Ws, 'pyo_Q': pyo_Q, 'pyo_P': pyo_P,
                'pyo_var_t': np1D2pyomo(var_t), 'pyo_mx': pyo_mx, 'pyo_sx': pyo_sx,
                'pyo_my': pyo_my, 'pyo_sy': pyo_sy, 'speX_lim95': plsobj['speX_lim95']})
    return out
[docs]
def prep_pca_4_MDbyNLP(pcaobj, X):
    """Prepare a PCA model for missing-data imputation by NLP.

    Extracts and formats the loadings, training scores, data, and the
    observed-entry indicator needed to set up a Pyomo optimization problem
    for MD imputation.

    Args:
        pcaobj (dict): Fitted PCA model from :func:`pca`.
        X (np.ndarray): Training X data (may contain NaN).

    Returns:
        dict: Copy of ``pcaobj`` extended with Pyomo-indexed parameters
        (``pyo_X``, ``pyo_psi``, initial ``P``/``T`` values, index sets).
    """
    out = pcaobj.copy()
    psi = (~np.isnan(X)) * 1      # 1 where the entry is observed, 0 if missing
    X, dummy = n2z(X)             # zero-fill NaNs so Pyomo only sees numbers
    A = pcaobj['T'].shape[1]
    O = pcaobj['T'].shape[0]
    N = pcaobj['P'].shape[0]
    out.update({'pyo_A': list(np.arange(1, A + 1)),
                'pyo_N': list(np.arange(1, N + 1)),
                'pyo_O': list(np.arange(1, O + 1)),
                'pyo_P_init': np2D2pyomo(pcaobj['P']),
                'pyo_T_init': np2D2pyomo(pcaobj['T']),
                'pyo_X': np2D2pyomo(X),
                'pyo_psi': np2D2pyomo(psi)})
    return out
[docs]
def prep_pls_4_MDbyNLP(plsobj, X, Y):
    """Prepare a PLS model for missing-data imputation by NLP.

    Args:
        plsobj (dict): Fitted PLS model from :func:`pls`.
        X (np.ndarray): Training X data (may contain NaN).
        Y (np.ndarray): Training Y data (may contain NaN).

    Returns:
        dict: Copy of ``plsobj`` extended with Pyomo-indexed parameters,
        including the observed-entry indicators ``pyo_psi`` (for X) and
        ``pyo_theta`` (for Y).
    """
    out = plsobj.copy()
    psi = (~np.isnan(X)) * 1      # 1 where the X entry is observed
    X, dummy = n2z(X)             # zero-fill NaNs so Pyomo only sees numbers
    theta = (~np.isnan(Y)) * 1    # 1 where the Y entry is observed
    Y, dummy = n2z(Y)
    A = plsobj['T'].shape[1]
    O = plsobj['T'].shape[0]
    N = plsobj['P'].shape[0]
    M = plsobj['Q'].shape[0]
    out.update({'pyo_A': list(np.arange(1, A + 1)),
                'pyo_N': list(np.arange(1, N + 1)),
                'pyo_O': list(np.arange(1, O + 1)),
                'pyo_M': list(np.arange(1, M + 1)),
                'pyo_P_init': np2D2pyomo(plsobj['P']),
                'pyo_Ws_init': np2D2pyomo(plsobj['Ws']),
                'pyo_T_init': np2D2pyomo(plsobj['T']),
                'pyo_Q_init': np2D2pyomo(plsobj['Q']),
                'pyo_X': np2D2pyomo(X), 'pyo_psi': np2D2pyomo(psi),
                'pyo_Y': np2D2pyomo(Y), 'pyo_theta': np2D2pyomo(theta)})
    return out
[docs]
def conv_pls_2_eiot(plsobj, *, r_length=False):
    """Convert a PLS model for use in EIOT (Extended Iterative Optimization Technology).

    Args:
        plsobj (dict): Fitted PLS model from :func:`pls`.
        r_length (int or bool): Number of leading X variables treated as
            known (``indx_r``); the remainder become equality-constrained
            (``indx_rk_eq``). ``False`` (default) marks all variables known.

    Returns:
        dict: Copy of ``plsobj`` extended with EIOT-compatible, Pyomo-indexed
        model parameters.
    """
    out = plsobj.copy()
    A = plsobj['T'].shape[1]
    N = plsobj['P'].shape[0]
    M = plsobj['Q'].shape[0]
    pyo_N = list(np.arange(1, N + 1))
    # Population variance of the scores, matching the original behavior.
    var_t = np.var(plsobj['T'], axis=0)
    if isinstance(r_length, bool):
        # No r_length given: every X variable is treated as known.
        indx_r = pyo_N
        indx_rk_eq = 0
    elif r_length < N:
        indx_r = list(np.arange(1, r_length + 1))
        indx_rk_eq = list(np.arange(r_length + 1, N + 1))
    else:
        if r_length > N:
            print('r_length >> N !!'); print('Forcing r_length=N')
        indx_r = pyo_N
        indx_rk_eq = 0
    out.update({'pyo_A': list(np.arange(1, A + 1)), 'pyo_N': pyo_N,
                'pyo_M': list(np.arange(1, M + 1)),
                'pyo_Ws': np2D2pyomo(plsobj['Ws']),
                'pyo_Q': np2D2pyomo(plsobj['Q']),
                'pyo_P': np2D2pyomo(plsobj['P']),
                'pyo_var_t': np1D2pyomo(var_t),
                'indx_r': indx_r, 'indx_rk_eq': indx_rk_eq,
                'pyo_mx': np1D2pyomo(plsobj['mx']), 'pyo_sx': np1D2pyomo(plsobj['sx']),
                'pyo_my': np1D2pyomo(plsobj['my']), 'pyo_sy': np1D2pyomo(plsobj['sy']),
                'S_I': np.nan, 'pyo_S_I': np.nan, 'var_t': var_t})
    return out
# =============================================================================
# Categorical, Multi-block PLS, Replicate data, gPROMS export
# =============================================================================
[docs]
def cat_2_matrix(X):
    """Convert categorical variable columns to binary indicator matrices.

    Args:
        X (pd.DataFrame): Data frame whose first column holds observation
            IDs and whose remaining columns hold categorical data.

    Returns:
        tuple:
            - Xmat (pd.DataFrame): Binary indicator matrix with one column
              per unique category, all categories concatenated.
            - dict: Per-block binary indicator DataFrames keyed by the
              original column (block) name, for multi-block models.
    """
    id_col = X.columns[0]
    all_indicators = []   # one 0/1 Series per category, across all blocks
    all_names = []
    per_block = {}
    for col in X.columns[1:]:
        categories = np.unique(X[col])
        block_indicators = []
        for c in categories:
            indicator = (X[col] == c) * 1
            all_indicators.append(indicator)
            all_names.append(c)
            block_indicators.append(indicator)
        block_df = pd.DataFrame(np.array(block_indicators).T, columns=categories)
        block_df.insert(0, id_col, X[id_col])
        per_block[col] = block_df
    Xmat = pd.DataFrame(np.array(all_indicators).T, columns=all_names)
    Xmat.insert(0, id_col, X[id_col].values)
    return Xmat, per_block
[docs]
def mbpls(XMB, YMB, A, *, mcsX=True, mcsY=True, md_algorithm_='nipals',
          force_nipals_=False, shush_=False, cross_val_=0, cross_val_X_=False, cca=False):
    """Fit a Multi-Block PLS (MBPLS) model.

    Every X (and Y) block is preprocessed and then divided by the square
    root of its total variance so each block contributes comparable
    variance; a regular PLS model is then fitted to the concatenated
    blocks, and per-block weights, scores and R2 values are recovered.

    Args:
        XMB (dict or pd.DataFrame): X blocks ``{'block_name': pd.DataFrame}``
            (or a single DataFrame). Each DataFrame's first column must
            contain observation IDs.
        YMB (dict or pd.DataFrame): Y block(s), same structure as ``XMB``.
        A (int): Number of latent variables.
        mcsX (bool or list): ``True`` autoscales every X block, ``False``
            applies no preprocessing, or pass a per-block list of
            ``'center'`` / ``'autoscale'``. Default ``True``.
        mcsY (bool or list): Same as ``mcsX``, for the Y block(s).
        md_algorithm_ (str): Missing-data algorithm forwarded to :func:`pls`.
        force_nipals_ (bool): Force NIPALS even for complete data.
        shush_ (bool): Suppress printed output. Default ``False``.
        cross_val_ (int): Cross-validation level (same as :func:`pls`).
        cross_val_X_ (bool): Cross-validate the X-space. Default ``False``.
        cca (bool): Also compute PLS-CCA quantities (see :func:`pls`).

    Returns:
        dict: Fitted model with all standard PLS keys (``Q``, ``r2y``,
        ``speX``, etc., with ``type`` set to ``'mbpls'``) plus:

            - ``Wsb`` (list): Per-block weights, normalized per LV.
            - ``Wt`` (ndarray): Super-weights relating block scores to U.
            - ``r2pbX`` / ``r2pbXc``: Per-block X R2, per LV / cumulative.
            - ``x_means``, ``x_stds``, ``y_means``, ``y_stds``: Per-block
              preprocessing parameters.
            - ``Xblk_scales``, ``Yblk_scales``: Block variance scales.
            - ``mx``, ``sx``, ``my``, ``sy``: Flattened preprocessing
              vectors (stds folded with the block scales).
            - ``Xblocknames`` / ``Yblocknames``: Block names (dict inputs only).
    """
    x_means=[]; x_stds=[]; y_means=[]; y_stds=[]
    Xblk_scales=[]; Yblk_scales=[]; Xcols_per_block=[]; Ycols_per_block=[]
    X_var_names=[]; Y_var_names=[]; obsids=[]
    if isinstance(XMB, dict):
        # Normalize the dict input into parallel lists of data and names.
        data_ = []; names_ = []
        for k in XMB.keys():
            data_.append(XMB[k]); names_.append(k)
        XMB = {'data':data_, 'blknames':names_}
        x = XMB['data'][0]
        columns = x.columns.tolist()
        obsid_column_name = columns[0]
        obsids = x[obsid_column_name].tolist()
        c = 0
        for x in XMB['data']:
            x_ = x.values[:,1:].astype(float)
            columns = x.columns.tolist()
            for i, h in enumerate(columns):
                if i != 0:
                    # Prefix variable names with the block name to keep them unique.
                    X_var_names.append(XMB['blknames'][c]+' '+h)
            if isinstance(mcsX, bool):
                if mcsX:
                    x_, x_mean_, x_std_ = meancenterscale(x_)
                else:
                    x_mean_ = np.zeros((1, x_.shape[1])); x_std_ = np.ones((1, x_.shape[1]))
            elif mcsX[c] == 'center':
                x_, x_mean_, x_std_ = meancenterscale(x_, mcs='center')
            elif mcsX[c] == 'autoscale':
                x_, x_mean_, x_std_ = meancenterscale(x_, mcs='autoscale')
            # Block scale = sqrt of the block's total variance (equal block weighting).
            blck_scale = np.sqrt(np.sum(std(x_)**2))
            x_means.append(x_mean_); x_stds.append(x_std_)
            Xblk_scales.append(blck_scale); Xcols_per_block.append(x_.shape[1])
            x_ = x_ / blck_scale
            if c == 0: X_ = x_.copy()
            else: X_ = np.hstack((X_, x_))
            c += 1
    elif isinstance(XMB, pd.DataFrame):
        # Single-block X: same preprocessing, no block-name prefixing.
        columns = XMB.columns.tolist()
        obsid_column_name = columns[0]; obsids = XMB[obsid_column_name].tolist()
        for i, h in enumerate(columns):
            if i != 0: X_var_names.append(h)
        x_ = XMB.values[:,1:].astype(float)
        if isinstance(mcsX, bool):
            if mcsX: x_, x_mean_, x_std_ = meancenterscale(x_)
            else: x_mean_ = np.zeros((1, x_.shape[1])); x_std_ = np.ones((1, x_.shape[1]))
        elif mcsX[0] == 'center':
            x_, x_mean_, x_std_ = meancenterscale(x_, mcs='center')
        elif mcsX[0] == 'autoscale':
            x_, x_mean_, x_std_ = meancenterscale(x_, mcs='autoscale')
        blck_scale = np.sqrt(np.sum(std(x_)**2))
        x_means.append(x_mean_); x_stds.append(x_std_)
        Xblk_scales.append(blck_scale); Xcols_per_block.append(x_.shape[1])
        X_ = x_ / blck_scale
    if isinstance(YMB, dict):
        data_ = []; names_ = []
        for k in YMB.keys():
            data_.append(YMB[k]); names_.append(k)
        YMB = {'data':data_, 'blknames':names_}
        c = 0
        for y in YMB['data']:
            y_ = y.values[:,1:].astype(float)
            columns = y.columns.tolist()
            for i, h in enumerate(columns):
                if i != 0: Y_var_names.append(h)
            if isinstance(mcsY, bool):
                if mcsY: y_, y_mean_, y_std_ = meancenterscale(y_)
                else: y_mean_ = np.zeros((1, y_.shape[1])); y_std_ = np.ones((1, y_.shape[1]))
            elif mcsY[c] == 'center':
                y_, y_mean_, y_std_ = meancenterscale(y_, mcs='center')
            elif mcsY[c] == 'autoscale':
                y_, y_mean_, y_std_ = meancenterscale(y_, mcs='autoscale')
            blck_scale = np.sqrt(np.sum(std(y_)**2))
            y_means.append(y_mean_); y_stds.append(y_std_)
            Yblk_scales.append(blck_scale); Ycols_per_block.append(y_.shape[1])
            y_ = y_ / blck_scale
            if c == 0: Y_ = y_.copy()
            else: Y_ = np.hstack((Y_, y_))
            c += 1
    elif isinstance(YMB, pd.DataFrame):
        y_ = YMB.values[:,1:].astype(float)
        columns = YMB.columns.tolist()
        for i, h in enumerate(columns):
            if i != 0: Y_var_names.append(h)
        if isinstance(mcsY, bool):
            if mcsY: y_, y_mean_, y_std_ = meancenterscale(y_)
            else: y_mean_ = np.zeros((1, y_.shape[1])); y_std_ = np.ones((1, y_.shape[1]))
        elif mcsY[0] == 'center':
            y_, y_mean_, y_std_ = meancenterscale(y_, mcs='center')
        elif mcsY[0] == 'autoscale':
            y_, y_mean_, y_std_ = meancenterscale(y_, mcs='autoscale')
        blck_scale = np.sqrt(np.sum(std(y_)**2))
        y_means.append(y_mean_); y_stds.append(y_std_)
        Yblk_scales.append(blck_scale); Ycols_per_block.append(y_.shape[1])
        Y_ = y_ / blck_scale
    # Fit a plain PLS on the pre-scaled, concatenated blocks (no further mcs).
    X_pd = pd.DataFrame(X_, columns=X_var_names)
    X_pd.insert(0, obsid_column_name, obsids)
    Y_pd = pd.DataFrame(Y_, columns=Y_var_names)
    Y_pd.insert(0, obsid_column_name, obsids)
    pls_obj_ = pls(X_pd, Y_pd, A, mcsX=False, mcsY=False, md_algorithm=md_algorithm_,
                   force_nipals=force_nipals_, shush=shush_, cross_val=cross_val_, cross_val_X=cross_val_X_, cca=cca)
    pls_obj_['type'] = 'mbpls'
    Wsb=[]; Wb=[]; Tb=[]
    for i, c in enumerate(Xcols_per_block):
        start_index = 0 if i == 0 else np.sum(Xcols_per_block[0:i])
        end_index = start_index + c
        # Per-block weights, normalized per LV.
        wsb_ = pls_obj_['Ws'][start_index:end_index, :].copy()
        for j in range(wsb_.shape[1]):
            wsb_[:, j] = wsb_[:, j] / np.linalg.norm(wsb_[:, j])
        Wsb.append(wsb_)
        wb_ = pls_obj_['W'][start_index:end_index, :].copy()
        for j in range(wb_.shape[1]):
            wb_[:, j] = wb_[:, j] / np.linalg.norm(wb_[:, j])
        Wb.append(wb_)
        # Re-derive block scores/loadings by deflating the block data,
        # honoring the missing-data map.
        Xb = X_[:, start_index:end_index]
        X_nan_map = np.isnan(Xb)
        not_Xmiss = (~X_nan_map) * 1
        Xb, dummy = n2z(Xb)
        TSS = np.sum(Xb**2)
        for a in range(A):
            w_ = wb_[:, [a]]
            w_t = w_.T * not_Xmiss
            w_t = np.sum(w_t**2, axis=1, keepdims=True)
            tb_ = (Xb @ w_) / w_t
            if a == 0: tb = tb_
            else: tb = np.hstack((tb, tb_))
            tb_t = tb_.T * not_Xmiss.T
            tb_t = np.sum(tb_t**2, axis=1, keepdims=True)
            try: pb_ = (Xb.T @ tb_) / tb_t
            except: pb_ = 0
            Xb = (Xb - tb_ @ pb_.T) * not_Xmiss
            r2pb_aux = 1 - (np.sum(Xb**2)/TSS)
            if a == 0: r2pb_ = r2pb_aux
            else: r2pb_ = np.hstack((r2pb_, r2pb_aux))
        if i == 0: r2pbX = r2pb_
        else: r2pbX = np.vstack((r2pbX, r2pb_))
        Tb.append(tb)
    # Super-weights: regress the block scores onto U, one LV at a time.
    for a in range(A):
        u = pls_obj_['U'][:, [a]].copy()
        for i, c in enumerate(Xcols_per_block):
            if i == 0: T_a = Tb[i][:, [a]]
            else: T_a = np.hstack((T_a, Tb[i][:, [a]]))
        wt_ = (T_a.T @ u) / (u.T @ u)
        if a == 0: Wt = wt_
        else: Wt = np.hstack((Wt, wt_))
    pls_obj_['x_means'] = x_means; pls_obj_['x_stds'] = x_stds
    pls_obj_['y_means'] = y_means; pls_obj_['y_stds'] = y_stds
    pls_obj_['Xblk_scales'] = Xblk_scales; pls_obj_['Yblk_scales'] = Yblk_scales
    pls_obj_['Wsb'] = Wsb; pls_obj_['Wt'] = Wt
    # Fold the block scales into the std vectors so predictions can use a
    # single flat mx/sx/my/sy, exactly like a plain PLS model.
    mx_ = [j for l in x_means for j in l[0]]
    pls_obj_['mx'] = np.array(mx_)
    sx_ = [j * Xblk_scales[i] for i, l in enumerate(x_stds) for j in l[0]]
    pls_obj_['sx'] = np.array(sx_)
    my_ = [j for l in y_means for j in l[0]]
    pls_obj_['my'] = np.array(my_)
    sy_ = [j * Yblk_scales[i] for i, l in enumerate(y_stds) for j in l[0]]
    pls_obj_['sy'] = np.array(sy_)
    # Convert cumulative per-block R2 into per-LV contributions
    # (2D for multi-block dict input, 1D for single DataFrame input).
    if isinstance(XMB, dict):
        for a in range(A-1, 0, -1):
            r2pbX[:, [a]] = r2pbX[:, [a]] - r2pbX[:, [a-1]]
        r2pbXc = np.cumsum(r2pbX, axis=1)
    else:
        for a in range(A-1, 0, -1):
            r2pbX[a] = r2pbX[a] - r2pbX[a-1]
        r2pbXc = np.cumsum(r2pbX)
    pls_obj_['r2pbX'] = r2pbX; pls_obj_['r2pbXc'] = r2pbXc
    if isinstance(XMB, dict): pls_obj_['Xblocknames'] = XMB['blknames']
    if isinstance(YMB, dict): pls_obj_['Yblocknames'] = YMB['blknames']
    return pls_obj_
[docs]
def replicate_data(mvm_obj, X, num_replicates, *, as_set=False, rep_Y=False, Y=False):
    """Generate synthetic replicates of a dataset from a fitted PCA/PLS model.

    New observations are built from the model reconstruction (scores times
    loadings) plus residual noise sampled from the empirical CDF of the
    training residuals, so the replicates mimic both the correlation
    structure and the residual distribution of the original data.

    Args:
        mvm_obj (dict): Fitted PCA or PLS model; a PLS model (one with a
            ``'Q'`` key) is required when ``rep_Y`` is ``True``.
        X (pd.DataFrame): Original data; first column holds observation IDs.
        num_replicates (int): Number of replicate rows to generate
            (``as_set=False``) or number of copies of the complete set
            (``as_set=True``).
        as_set (bool): If ``True``, replicate the complete dataset
            ``num_replicates`` times and tag IDs as ``'<id>_cloneN'``;
            otherwise produce ``num_replicates`` rows with generated IDs
            ``'cloneN'``. Default ``False``.
        rep_Y (bool): Replicate the Y-space instead of X. Default ``False``.
        Y (pd.DataFrame or bool): Original Y data, required when
            ``rep_Y=True``.

    Returns:
        pd.DataFrame: Replicated data in original units with generated
        observation IDs (implicitly ``None`` if ``rep_Y=True`` and the
        model is not a PLS model — a message is printed instead).
    """
    if not rep_Y:
        if 'Q' in mvm_obj:
            preds = pls_pred(X, mvm_obj); xhat = preds['Xhat']; tnew = preds['Tnew']
        else:
            preds = pca_pred(X, mvm_obj); xhat = preds['Xhat']; tnew = preds['Tnew']
        # Reconstruction in mean-centered/scaled units (overrides the
        # prediction's Xhat, which is in original units).
        xhat = tnew @ mvm_obj['P'].T
        xmcs = (X.values[:,1:] - mvm_obj['mx']) / mvm_obj['sx']
        data_residuals = xmcs - xhat
        if not as_set:
            if np.mod(num_replicates, X.shape[0]) == 0:
                new_set = np.tile(xhat, (int(num_replicates/X.shape[0]), 1))
            else:
                # Whole copies first, then the remainder of rows.
                reps = int(np.floor(num_replicates/X.shape[0]))
                new_set = np.tile(xhat, (reps, 1))
                new_set = np.vstack((new_set, xhat[:np.mod(num_replicates, X.shape[0]), :]))
            obsids = ['clone'+str(i) for i in np.arange(new_set.shape[0])+1]
        else:
            new_set = np.tile(xhat, (num_replicates, 1))
            obsids = []
            obsid_o = X[X.columns[0]].values.astype(str).tolist()
            for i in np.arange(num_replicates)+1:
                obsids.extend([m+'_clone'+str(i) for m in obsid_o])
        # Sample residual noise per variable by inverting the empirical CDF
        # of the training residuals with uniform draws.
        uncertainty_matrix = np.empty((new_set.shape[0], 0))
        for i in range(data_residuals.shape[1]):
            ecdf = ECDF(data_residuals[:, i])
            new_residual = np.random.uniform(0, 1, new_set.shape[0])
            y = np.array(ecdf.y.tolist()); x = np.array(ecdf.x.tolist())
            new_residual = np.interp(new_residual, y[1:], x[1:])
            uncertainty_matrix = np.hstack((uncertainty_matrix, new_residual.reshape(-1, 1)))
        # Back to original units.
        new_set = (new_set + uncertainty_matrix) * mvm_obj['sx'] + mvm_obj['mx']
        new_set_pd = pd.DataFrame(new_set, columns=X.columns[1:].tolist())
        new_set_pd.insert(0, X.columns[0], obsids)
        return new_set_pd
    elif 'Q' in mvm_obj:
        # Same procedure, replicating the Y-space of a PLS model.
        preds = pls_pred(X, mvm_obj); yhat = preds['Yhat']; tnew = preds['Tnew']
        yhat = tnew @ mvm_obj['Q'].T
        ymcs = (Y.values[:,1:] - mvm_obj['my']) / mvm_obj['sy']
        data_residuals = ymcs - yhat
        if not as_set:
            if np.mod(num_replicates, Y.shape[0]) == 0:
                new_set = np.tile(yhat, (int(num_replicates/Y.shape[0]), 1))
            else:
                reps = int(np.floor(num_replicates/Y.shape[0]))
                new_set = np.tile(yhat, (reps, 1))
                new_set = np.vstack((new_set, yhat[:np.mod(num_replicates, Y.shape[0]), :]))
            obsids = ['clone'+str(i) for i in np.arange(new_set.shape[0])+1]
        else:
            new_set = np.tile(yhat, (num_replicates, 1))
            obsids = []
            obsid_o = Y[Y.columns[0]].values.astype(str).tolist()
            for i in np.arange(num_replicates)+1:
                obsids.extend([m+'_clone'+str(i) for m in obsid_o])
        uncertainty_matrix = np.empty((new_set.shape[0], 0))
        for i in range(data_residuals.shape[1]):
            ecdf = ECDF(data_residuals[:, i])
            new_residual = np.random.uniform(0, 1, new_set.shape[0])
            y = np.array(ecdf.y.tolist()); x = np.array(ecdf.x.tolist())
            new_residual = np.interp(new_residual, y[1:], x[1:])
            uncertainty_matrix = np.hstack((uncertainty_matrix, new_residual.reshape(-1, 1)))
        new_set = (new_set + uncertainty_matrix) * mvm_obj['sy'] + mvm_obj['my']
        new_set_pd = pd.DataFrame(new_set, columns=Y.columns[1:].tolist())
        new_set_pd.insert(0, Y.columns[0], obsids)
        return new_set_pd
    else:
        print('You need a PLS model to replicate Y')
[docs]
def export_2_gproms(mvmobj, *, fname='phi_export.txt'):
    '''Write a fitted PLS model as gPROMS model code.

    Emits the PARAMETER/VARIABLE declarations, the prediction EQUATION
    section, and an ASSIGN section carrying every model coefficient,
    then saves the result to ``fname``.

    Args:
        mvmobj (dict): Fitted PLS model; must contain ``varidX``,
            ``varidY``, ``mx``, ``sx``, ``my``, ``sy``, ``Ws``, ``P``,
            ``Q`` and ``T``.
        fname (str): Output file name. Default ``'phi_export.txt'``.
    '''
    declarations = [
        'PARAMETER', 'X_VARS AS ORDERED_SET', 'Y_VARS AS ORDERED_SET', 'A AS INTEGER',
        'VARIABLE',
        'X_MEANS as ARRAY(X_VARS) OF no_type', 'X_STD AS ARRAY(X_VARS) OF no_type',
        'Y_MEANS as ARRAY(Y_VARS) OF no_type', 'Y_STD AS ARRAY(Y_VARS) OF no_type',
        'Ws AS ARRAY(X_VARS,A) OF no_type', 'Q AS ARRAY(Y_VARS,A) OF no_type',
        'P AS ARRAY(X_VARS,A) OF no_type', 'T AS ARRAY(A) OF no_type',
        'Tvar AS ARRAY(A) OF no_type',
        'X_HAT AS ARRAY(X_VARS) OF no_type # Mean-centered and scaled',
        'Y_HAT AS ARRAY(Y_VARS) OF no_type # Mean-centered and scaled',
        'X_PRED AS ARRAY(X_VARS) OF no_type # In original units',
        'Y_PRED AS ARRAY(Y_VARS) OF no_type # In original units',
        'X_NEW AS ARRAY(X_VARS) OF no_type # In original units',
        'X_MCS AS ARRAY(X_VARS) OF no_type # Mean-centered and scaled',
        'HT2 AS no_type', 'SPEX AS no_type', 'SET']
    declarations.append("X_VARS:=['" + "','".join(mvmobj['varidX']) + "'];")
    declarations.append("Y_VARS:=['" + "','".join(mvmobj['varidY']) + "'];")
    declarations.append('A:=' + str(mvmobj['T'].shape[1]) + ';')
    equations = [
        'EQUATION', 'X_MCS * X_STD = (X_NEW-X_MEANS);',
        'FOR j:=1 TO A DO', 'T(j) = SIGMA(X_MCS*Ws(,j));', 'END',
        'FOR i IN Y_VARS DO', 'Y_HAT(i) = SIGMA(T*Q(i,));', 'END',
        'FOR i IN X_VARS DO', 'X_HAT(i) = SIGMA(T*P(i,));', 'END',
        '(X_HAT * X_STD) + X_MEANS = X_PRED;', '(Y_HAT * Y_STD) + Y_MEANS = Y_PRED;',
        'HT2 = SIGMA ((T^2)/Tvar);', 'SPEX = SIGMA ((X_MCS - X_HAT)^2);']
    assigns = ['ASSIGN']
    # Scalar preprocessing parameters, one ASSIGN line per variable.
    assigns += [f"X_MEANS('{v}') := {mvmobj['mx'][0, i]};" for i, v in enumerate(mvmobj['varidX'])]
    assigns += [f"X_STD('{v}') := {mvmobj['sx'][0, i]};" for i, v in enumerate(mvmobj['varidX'])]
    assigns += [f"Y_MEANS('{v}') := {mvmobj['my'][0, i]};" for i, v in enumerate(mvmobj['varidY'])]
    assigns += [f"Y_STD('{v}') := {mvmobj['sy'][0, i]};" for i, v in enumerate(mvmobj['varidY'])]
    # Matrix coefficients, emitted row by row (Ws and P over X vars, Q over Y vars).
    for key, varid in (('Ws', 'varidX'), ('P', 'varidX'), ('Q', 'varidY')):
        mat = mvmobj[key]
        for i, v in enumerate(mvmobj[varid]):
            for j in range(mat.shape[1]):
                assigns.append(f"{key}('{v}',{j + 1}) := {mat[i, j]};")
    # Score variances (ddof=1) used in the Hotelling's T2 equation.
    tvar = np.std(mvmobj['T'], axis=0, ddof=1)
    assigns += [f"Tvar({j + 1}) := {tvar[j]};" for j in range(mvmobj['T'].shape[1])]
    with open(fname, "w") as outfile:
        outfile.write("\n".join(declarations + equations + assigns))
    return
# =============================================================================
# Parsing utilities
# =============================================================================
[docs]
def unique(df, colid):
    """Return the unique values of column ``colid`` in first-appearance order.

    Args:
        df (pd.DataFrame): Input frame.
        colid (str): Name of the column to de-duplicate.

    Returns:
        list: Column values with duplicates removed, original order kept.
    """
    # dict preserves insertion order, so this keeps first appearances.
    return list(dict.fromkeys(df[colid].values.tolist()))
[docs]
def parse_materials(filename, sheetname):
    """Build JR matrices for JRPLS from a linear materials table in Excel.

    Reads a structured Excel sheet describing the composition of finished
    product lots in terms of their constituent material lots and quantities
    (or ratios), validates that every finished product lot has a material
    lot assigned for each material, then constructs one JR matrix per
    material: a DataFrame with one row per finished product lot and one
    column per material lot, holding the ratio/quantity used (0 if unused).

    The sheet must contain the columns ``Finished Product Lot``,
    ``Material Lot``, ``Ratio or Quantity`` and ``Material`` (any order).

    Note:
        A summary of the ratio/quantity sum is printed for each finished
        product lot as a simple consistency check. Diagnostic messages go
        to stdout and ``(False, False)`` is returned if any material lot
        assignment is missing.

    Args:
        filename (str): Path to the Excel workbook.
        sheetname (str): Worksheet to read.

    Returns:
        tuple: ``(JR, materials_used)`` — one DataFrame per material
        (first column ``FPLot`` holds the finished-product-lot IDs,
        remaining columns are the material lots) and the ordered list of
        unique material names. Both elements are ``False`` on failure.
    """
    table = pd.read_excel(filename, sheet_name=sheetname)
    data_ok = True
    for fp_lot in unique(table, 'Finished Product Lot'):
        lot_rows = table[table["Finished Product Lot"] == fp_lot]
        for material, mat_lot in zip(lot_rows['Material'].values, lot_rows['Material Lot'].values):
            # A NaN in 'Material Lot' marks a missing assignment; non-numeric
            # entries make np.isnan raise, which means the cell is populated.
            try:
                if np.isnan(mat_lot):
                    print('Lot ' + fp_lot + ' has no Material Lot for ' + material)
                    data_ok = False
                    break
            except:
                pass
        if not data_ok:
            break
        print('Lot :' + fp_lot + ' ratio/qty adds to ' + str(np.sum(lot_rows['Ratio or Quantity'].values)))
    if not data_ok:
        print('Data needs revision')
        return False, False
    JR = []
    materials_used = unique(table, 'Material')
    fp_lots = unique(table, 'Finished Product Lot')
    for material in materials_used:
        mat_lots = np.unique(table['Material Lot'][table['Material'] == material]).tolist()
        ratio_rows = []
        for fp_lot in fp_lots:
            rvec = np.zeros(len(mat_lots))
            used = table[(table["Finished Product Lot"] == fp_lot) & (table['Material'] == material)]
            for lot_id, ratio in zip(used['Material Lot'].values, used['Ratio or Quantity'].values):
                rvec[mat_lots.index(lot_id)] = ratio
            ratio_rows.append(rvec)
        jr_df = pd.DataFrame(np.array(ratio_rows), columns=mat_lots)
        jr_df.insert(0, 'FPLot', fp_lots)
        JR.append(jr_df)
    return JR, materials_used
[docs]
def isin_ordered_col0(df, alist):
    """Select rows whose first-column ID is in ``alist``, in list order.

    Args:
        df (pd.DataFrame): Frame whose first column holds observation IDs.
        alist (list): IDs to keep; output rows follow this order.

    Returns:
        pd.DataFrame: Matching rows reordered to ``alist`` (IDs missing
        from ``df`` yield NaN rows, as with ``reindex``).
    """
    key = df.columns[0]
    subset = df[df[key].isin(alist)]
    return subset.set_index(key).reindex(alist).reset_index()
[docs]
def reconcile_rows(df_list):
    """Keep only the observations common to every DataFrame in ``df_list``.

    Observation IDs are taken from the first column of each DataFrame.
    Every returned frame contains exactly the IDs present in ALL inputs,
    in the same (sorted) order, so the frames come back row-aligned.

    Args:
        df_list (list): pd.DataFrames whose first column holds
            observation IDs.

    Returns:
        list: Row-aligned copies of the input DataFrames, restricted to
        the shared observation IDs.
    """
    all_rows = []
    for df in df_list:
        all_rows.extend(df[df.columns[0]].values.tolist())
    # np.unique also sorts, which fixes the final row order.
    all_rows = np.unique(all_rows)
    for df in df_list:
        # Set lookup keeps the intersection O(n) per frame instead of O(n^2).
        rows = set(df[df.columns[0]].values.tolist())
        all_rows = [r for r in all_rows if r in rows]
    return [isin_ordered_col0(df, all_rows) for df in df_list]
[docs]
def reconcile_rows_to_columns(df_list_r, df_list_c):
    """Align each row-indexed DataFrame with a column-indexed counterpart.

    Used for L-shaped data structures where material lot IDs appear as
    row IDs (first column) in the frames of ``df_list_r`` and as column
    headers in the frames of ``df_list_c``.

    Args:
        df_list_r (list): pd.DataFrames whose first column holds lot IDs.
        df_list_c (list): pd.DataFrames whose column headers (after the
            first column) are lot IDs.

    Returns:
        tuple: ``(df_list_r_o, df_list_c_o)`` — lists of aligned frames
        where the rows of ``df_list_r_o[i]`` match the columns of
        ``df_list_c_o[i]``, preserving the row order of ``df_list_r[i]``.
    """
    df_list_r_o = []; df_list_c_o = []
    for dfr, dfc in zip(df_list_r, df_list_c):
        # Keep only IDs present in both, preserving the order from dfr.
        rows = dfr[dfr.columns[0]].values.tolist()
        cols = set(dfc.columns[1:].tolist())
        all_ids = [i for i in rows if i in cols]
        dfr_ = isin_ordered_col0(dfr, all_ids)
        dfc_ = dfc[all_ids].copy()
        dfc_.insert(0, dfc.columns[0], dfc[dfc.columns[0]].values.tolist())
        df_list_r_o.append(dfr_); df_list_c_o.append(dfc_)
    return df_list_r_o, df_list_c_o
# =============================================================================
# LPLS, JRPLS, TPLS (use optimized _Ab_btbinv)
# =============================================================================
[docs]
def lpls(X, R, Y, A, *, shush=False):
    """Fit an L-shaped PLS (LPLS) model.

    Models the relationship between lot physical properties (R),
    process observations (X), and product quality (Y), where X rows
    correspond to lots described by R columns.
    Per Muteki et al., Chemom. Intell. Lab. Syst. 85 (2007) 186–194.

    Args:
        X (pd.DataFrame or np.ndarray): Process data matrix (n_obs × n_x).
            First column is observation IDs if a DataFrame.
        R (pd.DataFrame or np.ndarray): Raw material property matrix
            (n_lots × n_r). Columns of X map to rows of R.
        Y (pd.DataFrame or np.ndarray): Quality/response matrix
            (n_lots × n_y). Rows match rows of R.
        A (int): Number of latent variables.
        shush (bool): Suppress printed output. Default ``False``.

    Returns:
        dict: Fitted LPLS model with keys:

            - ``T`` (ndarray): X-space scores (n_obs × A).
            - ``P`` (ndarray): X-loadings (n_x × A).
            - ``Q`` (ndarray): Y-loadings (n_y × A).
            - ``H`` (ndarray): R-space scores (n_lots × A).
            - ``V`` (ndarray): R-space loadings (n_r × A).
            - ``Rscores`` (ndarray): R projected scores.
            - ``Ss`` (ndarray): Rotated R weights S*(V'S)⁻¹.
            - ``r2x``, ``r2xpv``: R² for X space.
            - ``r2y``, ``r2ypv``: R² for Y space.
            - ``r2r``, ``r2rpv``: R² for R space.
            - ``mx``, ``sx``, ``my``, ``sy``, ``mr``, ``sr``: Preprocessing params.
            - ``var_t``: Score covariance matrix.
            - ``T2``, ``T2_lim95``, ``T2_lim99``: Hotelling's T² and limits.
            - ``speX``, ``speX_lim95``, ``speX_lim99``: X SPE and limits.
            - ``speY``, ``speY_lim95``, ``speY_lim99``: Y SPE and limits.
            - ``speR``, ``speR_lim95``, ``speR_lim99``: R SPE and limits.
    """
    # Validate R-Y (blend observations must match row-to-row)
    R, Y = _validate_inputs(R, Y, A=A)
    X_, obsidX, varidX = _extract_array(X)
    Y_, obsidY, varidY = _extract_array(Y)
    R_, obsidR, varidR = _extract_array(R)
    X_, x_mean, x_std = meancenterscale(X_)
    Y_, y_mean, y_std = meancenterscale(Y_)
    R_, r_mean, r_std = meancenterscale(R_)
    # 0/1 maps of present (non-missing) entries, used to honor missing data.
    not_Xmiss = (~np.isnan(X_)) * 1
    not_Ymiss = (~np.isnan(Y_)) * 1
    not_Rmiss = (~np.isnan(R_)) * 1
    if not shush:
        print('phi.lpls using NIPALS executed on: ' + str(datetime.datetime.now()))
    X_, dummy = n2z(X_); Xhat = np.zeros(X_.shape)
    Y_, dummy = n2z(Y_); R_, dummy = n2z(R_)
    epsilon = 1E-9; maxit = 2000
    TSSX = np.sum(X_**2); TSSXpv = np.sum(X_**2, axis=0)
    TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
    TSSR = np.sum(R_**2); TSSRpv = np.sum(R_**2, axis=0)
    r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None; r2R = None; r2Rpv = None
    for a in range(A):
        # Start each LV from the Y column with the largest variance.
        ui = Y_[:, [np.argmax(std(Y_))]]
        Converged = False; num_it = 0
        while not Converged:
            # One NIPALS pass through the L-structure: R -> X -> R -> Y.
            hi = _Ab_btbinv(R_.T, ui, not_Rmiss.T)
            si = _Ab_btbinv(X_.T, hi, not_Xmiss.T)
            si = si / np.linalg.norm(si)
            ri = _Ab_btbinv(X_, si, not_Xmiss)
            ti = _Ab_btbinv(R_, ri, not_Rmiss)
            qi = _Ab_btbinv(Y_.T, ti, not_Ymiss.T)
            un = _Ab_btbinv(Y_, qi, not_Ymiss)
            # Converged when the norm of u stops changing (relative tolerance).
            if abs((np.linalg.norm(ui) - np.linalg.norm(un))) / np.linalg.norm(ui) < epsilon:
                Converged = True
            if num_it > maxit:
                Converged = True
            if Converged:
                if not shush: print('# Iterations for LV #'+str(a+1)+': ', str(num_it))
                pi = _Ab_btbinv(R_.T, ti, not_Rmiss.T)
                vi = _Ab_btbinv(X_.T, ri, not_Xmiss.T)
                # Deflate each space, keeping missing entries at zero.
                R_ = (R_ - ti @ pi.T) * not_Rmiss
                X_ = (X_ - ri @ vi.T) * not_Xmiss
                Y_ = (Y_ - ti @ qi.T) * not_Ymiss
                Xhat = Xhat + ri @ vi.T
                if a == 0:
                    T=ti; P=pi; S=si; U=un; Q=qi; H=hi; V=vi; Rscores=ri
                else:
                    T=np.hstack((T,ti)); U=np.hstack((U,un)); P=np.hstack((P,pi))
                    Q=np.hstack((Q,qi)); S=np.hstack((S,si)); H=np.hstack((H,hi))
                    V=np.hstack((V,vi)); Rscores=np.hstack((Rscores,ri))
                r2X, r2Xpv = _calc_r2(X_, TSSX, TSSXpv, r2X, r2Xpv)
                r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
                r2R, r2Rpv = _calc_r2(R_, TSSR, TSSRpv, r2R, r2Rpv)
            else:
                num_it += 1; ui = un
        if a == 0: numIT = num_it
        else: numIT = np.hstack((numIT, num_it))
    # Bring the X reconstruction back to original units.
    Xhat = Xhat * x_std + x_mean
    r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
    r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
    r2R, r2Rpv = _r2_cumulative_to_per_component(r2R, r2Rpv, A)
    eigs = np.var(T, axis=0)
    r2xc = np.cumsum(r2X); r2yc = np.cumsum(r2Y); r2rc = np.cumsum(r2R)
    if not shush:
        print('--------------------------------------------------------------')
        print('LV # Eig R2X sum(R2X) R2R sum(R2R) R2Y sum(R2Y)')
        if A > 1:
            for a in range(A):
                print("LV #"+str(a+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2X[a], r2xc[a], r2R[a], r2rc[a], r2Y[a], r2yc[a]))
        else:
            print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2X, r2xc[0], r2R, r2rc[0], r2Y, r2yc[0]))
        print('--------------------------------------------------------------')
    var_t = (T.T @ T) / T.shape[0]
    lpls_obj = {'T':T, 'P':P, 'Q':Q, 'U':U, 'S':S, 'H':H, 'V':V, 'Rscores':Rscores,
                'r2x':r2X, 'r2xpv':r2Xpv, 'mx':x_mean, 'sx':x_std,
                'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std,
                'r2r':r2R, 'r2rpv':r2Rpv, 'mr':r_mean, 'sr':r_std,
                'Xhat':Xhat, 'var_t':var_t}
    if not isinstance(obsidX, bool): lpls_obj['obsidX'] = obsidX; lpls_obj['varidX'] = varidX
    if not isinstance(obsidY, bool): lpls_obj['obsidY'] = obsidY; lpls_obj['varidY'] = varidY
    if not isinstance(obsidR, bool): lpls_obj['obsidR'] = obsidR; lpls_obj['varidR'] = varidR
    # Hotelling's T2 limits from the F distribution.
    T2 = hott2(lpls_obj, Tnew=T)
    n = T.shape[0]
    T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
    T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
    speX = np.sum(X_**2, axis=1, keepdims=1); speX_lim95, speX_lim99 = spe_ci(speX)
    speY = np.sum(Y_**2, axis=1, keepdims=1); speY_lim95, speY_lim99 = spe_ci(speY)
    speR = np.sum(R_**2, axis=1, keepdims=1); speR_lim95, speR_lim99 = spe_ci(speR)
    lpls_obj.update({'T2':T2, 'T2_lim99':T2_lim99, 'T2_lim95':T2_lim95,
                     'speX':speX, 'speX_lim99':speX_lim99, 'speX_lim95':speX_lim95,
                     'speY':speY, 'speY_lim99':speY_lim99, 'speY_lim95':speY_lim95,
                     'speR':speR, 'speR_lim99':speR_lim99, 'speR_lim95':speR_lim95})
    lpls_obj['Ss'] = S @ np.linalg.pinv(V.T @ S)
    lpls_obj['type'] = 'lpls'
    return lpls_obj
[docs]
def lpls_pred(rnew, lpls_obj):
"""Predict Y for new lot(s) using a fitted LPLS model.
Args:
rnew (np.ndarray or pd.DataFrame): R-space observation(s) for new
lot(s). Variables must match those in ``lpls_obj``.
lpls_obj (dict): Fitted LPLS model from :func:`lpls`.
Returns:
dict: Prediction results with keys:
- ``Tnew`` (ndarray): Projected scores (n_new × A).
- ``Yhat`` (ndarray): Predicted Y in original scale.
- ``speR`` (ndarray): R-space SPE for each new lot.
"""
if isinstance(rnew, np.ndarray): rnew__ = [rnew.copy()]
elif isinstance(rnew, list): rnew__ = np.array(rnew)
elif isinstance(rnew, pd.DataFrame): rnew__ = rnew.values[:,1:].astype(float)
tnew = []; sper = []
for rnew_ in rnew__:
rnew_ = ((rnew_ - lpls_obj['mr']) / lpls_obj['sr']).reshape(-1, 1)
ti = []
for a in np.arange(lpls_obj['T'].shape[1]):
ti_ = (rnew_.T @ lpls_obj['Rscores'][:,a] / (lpls_obj['Rscores'][:,a].T @ lpls_obj['Rscores'][:,a]))
ti.append(ti_[0])
rnew_ = rnew_ - (ti_ * lpls_obj['P'][:,a]).reshape(-1, 1)
tnew.append(np.array(ti))
sper.append(np.sum(rnew_**2))
tnew = np.array(tnew); sper = np.array(sper)
yhat = (tnew @ lpls_obj['Q'].T) * lpls_obj['sy'] + lpls_obj['my']
return {'Tnew':tnew, 'Yhat':yhat, 'speR':sper}
[docs]
def jrpls(Xi, Ri, Y, A, *, shush=False):
    """Fit a Joint-Y R-LPLS (JRPLS) model across multiple material blocks.

    Extends LPLS to handle multiple raw-material blocks, each with its
    own X (material properties) and R (blending ratios) block, sharing a
    common Y. Per Garcia-Munoz, Chemom. Intell. Lab. Syst. 133 (2014) 49–62.

    Args:
        Xi (dict): Raw material property blocks ``{'Material Type': pd.DataFrame}``.
            Each DataFrame's first column is observation IDs.
        Ri (dict): Blending matrices ``{'Material Type': pd.DataFrame}``.
            Keys must match ``Xi``. First column is lot IDs.
        Y (pd.DataFrame or np.ndarray): Shared response matrix. Rows match
            lots across all blocks.
        A (int): Number of latent variables.
        shush (bool): Suppress printed output. Default ``False``.

    Returns:
        dict: Fitted JRPLS model with per-block lists (``P``, ``S``, ``H``,
        ``V``, ``Rscores``, ``r2xi``, ``r2ri``, ``speX``, ``speR``, ...)
        and shared keys (``T``, ``Q``, ``r2y``, ``T2``, limits, ...).
        Structure mirrors :func:`lpls` output but indexed by material block.
    """
    X = []; varidX = []; obsidX = []
    materials = list(Xi.keys())
    for k in Xi.keys():
        X_, obsidX_, varidX_ = _extract_array(Xi[k])
        X.append(X_); varidX.append(varidX_); obsidX.append(obsidX_)
    Y_, obsidY, varidY = _extract_array(Y)
    R = []; varidR = []; obsidR = []
    for k in materials:
        R_, obsidR_, varidR_ = _extract_array(Ri[k])
        varidR.append(varidR_); obsidR.append(obsidR_); R.append(R_)
    x_mean = []; x_std = []; jr_scale = []; r_mean = []; r_std = []
    not_Xmiss = []; not_Rmiss = []; Xhat = []
    TSSX = []; TSSXpv = []; TSSR = []; TSSRpv = []
    X__ = []; R__ = []
    for X_i, R_i in zip(X, R):
        X_i, x_mean_, x_std_ = meancenterscale(X_i)
        R_i, r_mean_, r_std_ = meancenterscale(R_i)
        # Scale each X block by sqrt(#columns) so blocks weigh comparably.
        jr_scale_ = np.sqrt(X_i.shape[1])
        X_i = X_i / jr_scale_
        x_mean.append(x_mean_); x_std.append(x_std_); jr_scale.append(jr_scale_)
        r_mean.append(r_mean_); r_std.append(r_std_)
        not_Xmiss.append((~np.isnan(X_i)) * 1)
        not_Rmiss.append((~np.isnan(R_i)) * 1)
        X_i, dummy = n2z(X_i); R_i, dummy = n2z(R_i)
        X__.append(X_i); R__.append(R_i)
        Xhat.append(np.zeros(X_i.shape))
        TSSX.append(np.sum(X_i**2)); TSSXpv.append(np.sum(X_i**2, axis=0))
        TSSR.append(np.sum(R_i**2)); TSSRpv.append(np.sum(R_i**2, axis=0))
    X = X__; R = R__
    Y_, y_mean, y_std = meancenterscale(Y_)
    not_Ymiss = (~np.isnan(Y_)) * 1
    Y_, dummy = n2z(Y_)
    TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
    if not shush:
        print('phi.jrpls using NIPALS executed on: ' + str(datetime.datetime.now()))
    epsilon = 1E-9; maxit = 2000
    for a in range(A):
        # Start each LV from the Y column with the largest variance.
        ui = Y_[:, [np.argmax(std(Y_))]]
        Converged = False; num_it = 0
        while not Converged:
            # One NIPALS pass through every block: R -> X -> joint R -> Y.
            hi = [_Ab_btbinv(R[i].T, ui, not_Rmiss[i].T) for i in range(len(R))]
            si = [_Ab_btbinv(X[i].T, hi[i], not_Xmiss[i].T) for i in range(len(X))]
            js = np.array([y for x in si for y in x])
            for i in range(len(si)):
                # Normalize by the joint norm across all blocks.
                si[i] = si[i] / np.linalg.norm(js)
            ri = [_Ab_btbinv(X[i], si[i], not_Xmiss[i]) for i in range(len(X))]
            jr = np.array([y for x in ri for y in x]).astype(float)
            R_ = np.hstack(R); not_Rmiss_ = np.hstack(not_Rmiss)
            ti = _Ab_btbinv(R_, jr, not_Rmiss_)
            qi = _Ab_btbinv(Y_.T, ti, not_Ymiss.T)
            un = _Ab_btbinv(Y_, qi, not_Ymiss)
            # Converged when the norm of u stops changing (relative tolerance).
            if abs((np.linalg.norm(ui) - np.linalg.norm(un))) / np.linalg.norm(ui) < epsilon:
                Converged = True
            if num_it > maxit:
                Converged = True
            if Converged:
                if not shush:
                    print('# Iterations for LV #'+str(a+1)+': ', str(num_it))
                pi = [_Ab_btbinv(R[i].T, ti, not_Rmiss[i].T) for i in range(len(R))]
                vi = [_Ab_btbinv(X[i].T, ri[i], not_Xmiss[i].T) for i in range(len(X))]
                for i in range(len(R)):
                    # Deflate each block, keeping missing entries at zero.
                    R[i] = (R[i] - ti @ pi[i].T) * not_Rmiss[i]
                    X[i] = (X[i] - ri[i] @ vi[i].T) * not_Xmiss[i]
                    Xhat[i] = Xhat[i] + ri[i] @ vi[i].T
                Y_ = (Y_ - ti @ qi.T) * not_Ymiss
                if a == 0:
                    T=ti; P=pi; S=si; U=un; Q=qi; H=hi; V=vi; Rscores=ri
                    r2X = []; r2Xpv = []; r2R = []; r2Rpv = []
                    for i in range(len(X)):
                        r2X.append(1 - np.sum(X[i]**2)/TSSX[i])
                        r2Xpv.append((1 - np.sum(X[i]**2, axis=0)/TSSXpv[i]).reshape(-1, 1))
                        r2R.append(1 - np.sum(R[i]**2)/TSSR[i])
                        r2Rpv.append((1 - np.sum(R[i]**2, axis=0)/TSSRpv[i]).reshape(-1, 1))
                    r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv)
                else:
                    T = np.hstack((T, ti)); U = np.hstack((U, un)); Q = np.hstack((Q, qi))
                    for i in range(len(P)): P[i] = np.hstack((P[i], pi[i]))
                    for i in range(len(S)): S[i] = np.hstack((S[i], si[i]))
                    for i in range(len(H)): H[i] = np.hstack((H[i], hi[i]))
                    for i in range(len(V)): V[i] = np.hstack((V[i], vi[i]))
                    for i in range(len(Rscores)): Rscores[i] = np.hstack((Rscores[i], ri[i]))
                    for i in range(len(X)):
                        r2X[i] = np.hstack((r2X[i], 1 - np.sum(X[i]**2)/TSSX[i]))
                        r2Xpv[i] = np.hstack((r2Xpv[i], (1 - np.sum(X[i]**2, axis=0)/TSSXpv[i]).reshape(-1, 1)))
                        r2R[i] = np.hstack((r2R[i], 1 - np.sum(R[i]**2)/TSSR[i]))
                        r2Rpv[i] = np.hstack((r2Rpv[i], (1 - np.sum(R[i]**2, axis=0)/TSSRpv[i]).reshape(-1, 1)))
                    r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
            else:
                num_it += 1; ui = un
        if a == 0: numIT = num_it
        else: numIT = np.hstack((numIT, num_it))
    # Bring the per-block X reconstructions back to original units.
    for i in range(len(Xhat)):
        Xhat[i] = Xhat[i] * x_std[i] + x_mean[i]
    r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
    r2xc = []; r2rc = []
    for i in range(len(X)):
        # Convert cumulative R2 into per-LV contributions, per block.
        for a in range(A-1, 0, -1):
            r2X[i][a] = r2X[i][a] - r2X[i][a-1]
            r2Xpv[i][:, a] = r2Xpv[i][:, a] - r2Xpv[i][:, a-1]
            r2R[i][a] = r2R[i][a] - r2R[i][a-1]
            r2Rpv[i][:, a] = r2Rpv[i][:, a] - r2Rpv[i][:, a-1]
    for i, r in enumerate(r2Xpv):
        if i == 0: r2xpv_all = r
        else: r2xpv_all = np.vstack((r2xpv_all, r))
        r2xc.append(np.cumsum(r2X[i]))
        r2rc.append(np.cumsum(r2R[i]))
    eigs = np.var(T, axis=0)
    r2yc = np.cumsum(r2Y)
    # Averages across blocks, for the summary table.
    r2rc = np.mean(np.array(r2rc), axis=0)
    r2xc = np.mean(np.array(r2xc), axis=0)
    r2x = np.mean(np.array(r2X), axis=0)
    r2r = np.mean(np.array(r2R), axis=0)
    if not shush:
        print('--------------------------------------------------------------')
        print('LV # Eig R2X sum(R2X) R2R sum(R2R) R2Y sum(R2Y)')
        if A > 1:
            for a in range(A):
                print("LV #"+str(a+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[a], r2x[a], r2xc[a], r2r[a], r2rc[a], r2Y[a], r2yc[a]))
        else:
            print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(eigs[0], r2x, r2xc[0], r2r, r2rc[0], r2Y, r2yc[0]))
        print('--------------------------------------------------------------')
    var_t = (T.T @ T) / T.shape[0]
    jrpls_obj = {'T':T, 'P':P, 'Q':Q, 'U':U, 'S':S, 'H':H, 'V':V, 'Rscores':Rscores,
                 'r2xi':r2X, 'r2xpvi':r2Xpv, 'r2xpv':r2xpv_all,
                 'mx':x_mean, 'sx':x_std,
                 'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std,
                 'r2ri':r2R, 'r2rpvi':r2Rpv, 'mr':r_mean, 'sr':r_std,
                 'Xhat':Xhat, 'materials':materials, 'var_t':var_t}
    if not isinstance(obsidX[0], bool):
        jrpls_obj['obsidXi'] = obsidX; jrpls_obj['varidXi'] = varidX
        # Flat variable list prefixed by material name, for plotting/export.
        varidXall = [materials[i]+':'+varidX[i][j] for i in range(len(materials)) for j in range(len(varidX[i]))]
        jrpls_obj['varidX'] = varidXall
    if not isinstance(obsidR[0], bool):
        jrpls_obj['obsidRi'] = obsidR; jrpls_obj['varidRi'] = varidR
    if not isinstance(obsidY, bool):
        jrpls_obj['obsidY'] = obsidY; jrpls_obj['varidY'] = varidY
    # Hotelling's T2 limits from the F distribution.
    T2 = hott2(jrpls_obj, Tnew=T)
    n = T.shape[0]
    T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
    T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
    speX = []; speR = []; speX_lim95 = []; speX_lim99 = []; speR_lim95 = []; speR_lim99 = []
    for i in range(len(X)):
        speX.append(np.sum(X[i]**2, axis=1, keepdims=1))
        s95, s99 = spe_ci(np.sum(X[i]**2, axis=1, keepdims=1))
        speX_lim95.append(s95); speX_lim99.append(s99)
        speR.append(np.sum(R[i]**2, axis=1, keepdims=1))
        s95, s99 = spe_ci(np.sum(R[i]**2, axis=1, keepdims=1))
        speR_lim95.append(s95); speR_lim99.append(s99)
    speY = np.sum(Y_**2, axis=1, keepdims=1)
    speY_lim95, speY_lim99 = spe_ci(speY)
    jrpls_obj.update({'T2':T2, 'T2_lim99':T2_lim99, 'T2_lim95':T2_lim95,
                      'speX':speX, 'speX_lim99':speX_lim99, 'speX_lim95':speX_lim95,
                      'speY':speY, 'speY_lim99':speY_lim99, 'speY_lim95':speY_lim95,
                      'speR':speR, 'speR_lim99':speR_lim99, 'speR_lim95':speR_lim95})
    # Rotated weights S*(V'S)^-1, per block and stacked.
    Wsi = []; Ws = []
    for i in range(len(S)):
        Wsi.append(S[i] @ np.linalg.pinv(V[i].T @ S[i]))
        if i == 0: Ws = S[i] @ np.linalg.pinv(V[i].T @ S[i])
        else: Ws = np.vstack((Ws, S[i] @ np.linalg.pinv(V[i].T @ S[i])))
    jrpls_obj['Ssi'] = Wsi; jrpls_obj['Ss'] = Ws
    jrpls_obj['type'] = 'jrpls'
    return jrpls_obj
[docs]
def jrpls_pred(rnew, jrplsobj):
    """Predict Y for a new blend of raw-material lots using a fitted JRPLS model.

    Args:
        rnew (list or dict): New raw-material blending information.

            * list: one ratio vector per material, in the same order as
              ``jrplsobj['materials']``; each vector's length must match the
              corresponding block of ``jrplsobj['mr']``.
            * dict: ``{'Material': [(lot_id, ratio), ...]}``. Lot IDs are
              looked up in ``jrplsobj['varidRi']``, unlisted lots get a ratio
              of zero, and the call recurses with the assembled list form.
        jrplsobj (dict): Fitted JRPLS model from :func:`jrpls`.

    Returns:
        dict: Prediction results with keys:

            - ``Tnew`` (ndarray): Projected scores.
            - ``Yhat`` (ndarray): Predicted Y in original scale.
            - ``speR`` (list): R-space SPE, one value per material block.

        If the dimensions of ``rnew`` do not match the model, the string
        ``'dimensions of rnew did not match model'`` is returned instead.

    Example:
        rnew={
        'MAT1': [('A0129',0.557949425 ),('A0130',0.442050575 )],
        'MAT2': [('Lac0003',1)],
        'MAT3': [('TLC018', 1) ],
        'MAT4': [('M0012', 1) ],
        'MAT5':[('CS0017', 1) ]
        }
    """
    ok = True
    if isinstance(rnew, list):
        # Concatenate the per-material vectors/params into single stacked
        # arrays so the projection below works on one joint R-space.
        i = 0
        for r, mr, sr in zip(rnew, jrplsobj['mr'], jrplsobj['sr']):
            if not(len(r) == len(mr[0])):
                ok = False  # dimension mismatch for this material block
            if i == 0:
                rnew_ = r; mr_ = mr; sr_ = sr
                Rscores = jrplsobj['Rscores'][i]
                P = jrplsobj['P'][i]
            else:
                rnew_ = np.hstack((rnew_, r))
                mr_ = np.hstack((mr_, mr)); sr_ = np.hstack((sr_, sr))
                Rscores = np.vstack((Rscores, jrplsobj['Rscores'][i]))
                P = np.vstack((P, jrplsobj['P'][i]))
            i += 1
    elif isinstance(rnew, dict):
        # Build the list form from (lot_id, ratio) pairs, then recurse.
        rnew_ = [['*']] * len(jrplsobj['materials'])
        for k in list(rnew.keys()):
            i = jrplsobj['materials'].index(k)
            ri = np.zeros((jrplsobj['mr'][i].shape[1]))
            for m, r in rnew[k]:
                e = jrplsobj['varidRi'][i].index(m)
                ri[e] = r
            rnew_[i] = ri
        return jrpls_pred(rnew_, jrplsobj)
    if ok:
        # selmat: one 0/1 column mask per material, selecting that material's
        # rows within the stacked R-space (used to split the SPE per block).
        bkzeros = 0; selmat = []
        for i, r in enumerate(jrplsobj['Rscores']):
            frontzeros = Rscores.shape[0] - bkzeros - r.shape[0]
            row = np.vstack((np.zeros((bkzeros, 1)), np.ones((r.shape[0], 1)), np.zeros((frontzeros, 1))))
            bkzeros += r.shape[0]; selmat.append(row)
        rnew_ = ((rnew_ - mr_) / sr_).reshape(-1, 1)
        ti = []
        # Project onto each LV using the training R-scores as the basis,
        # deflating rnew_ after each component.
        for a in np.arange(jrplsobj['T'].shape[1]):
            ti_ = (rnew_.T @ Rscores[:, a] / (Rscores[:, a].T @ Rscores[:, a]))
            ti.append(ti_[0])
            rnew_ = rnew_ - (ti_ * P[:, a]).reshape(-1, 1)
        tnew = np.array(ti)
        # Residual after all components -> per-material SPE.
        sper = [np.sum(rnew_[row == 1]**2) for row in selmat]
        yhat = (tnew @ jrplsobj['Q'].T) * jrplsobj['sy'] + jrplsobj['my']
        return {'Tnew':tnew, 'Yhat':yhat, 'speR':sper}
    else:
        return 'dimensions of rnew did not match model'
[docs]
def tpls(Xi, Ri, Z, Y, A, *, shush=False):
    """Fit a TPLS model.

    Models relationships between raw material properties (Xi), blending
    information (Ri), time-varying process trajectories (Z), and product
    quality (Y).

    Args:
        Xi (dict): Material Property data blocks ``{'Material Type': pd.DataFrame}``.
            Each DataFrame's first column is observation IDs.
        Ri (dict): Blending information property blocks ``{'Material Type': pd.DataFrame}``.
            Keys must match ``Xi``. First column is lot IDs.
        Z (pd.DataFrame or np.ndarray): Data from Process matrix.
            First column is observation IDs if a DataFrame.
        Y (pd.DataFrame or np.ndarray): Shared response matrix. Rows match
            lots across all campaigns.
        A (int): Number of latent variables.
        shush (bool): Suppress printed output. Default ``False``.

    Returns:
        dict: Fitted TPLS model. Keys mirror :func:`jrpls` with additional
            ``W``, ``Wt``, ``Pz`` for the Z-space and an extra
            ``Ws`` (ndarray) rotated weight matrix for Z-space.
    """
    # --- Extract arrays and identifiers from the per-material X blocks ---
    X = []; varidX = []; obsidX = []
    materials = list(Xi.keys())
    for k in Xi.keys():
        X_, obsidX_, varidX_ = _extract_array(Xi[k])
        X.append(X_); varidX.append(varidX_); obsidX.append(obsidX_)
    Y_, obsidY, varidY = _extract_array(Y)
    Z_, obsidZ, varidZ = _extract_array(Z)
    R_list = []; varidR = []; obsidR = []
    for k in materials:
        R_, obsidR_, varidR_ = _extract_array(Ri[k])
        varidR.append(varidR_); obsidR.append(obsidR_); R_list.append(R_)
    # --- Preprocess each X/R block: autoscale, block-scale X, map missing ---
    x_mean = []; x_std = []; jr_scale = []; r_mean = []; r_std = []
    not_Xmiss = []; not_Rmiss = []; Xhat = []
    TSSX = []; TSSXpv = []; TSSR = []; TSSRpv = []
    X__ = []; R__ = []
    for X_i, R_i in zip(X, R_list):
        X_i, x_mean_, x_std_ = meancenterscale(X_i)
        R_i, r_mean_, r_std_ = meancenterscale(R_i)
        # Block scaling by sqrt(#columns) so each material block carries
        # comparable weight in the joint model.
        jr_scale_ = np.sqrt(X_i.shape[1])
        X_i = X_i / jr_scale_
        x_mean.append(x_mean_); x_std.append(x_std_); jr_scale.append(jr_scale_)
        r_mean.append(r_mean_); r_std.append(r_std_)
        not_Xmiss.append((~np.isnan(X_i)) * 1)
        not_Rmiss.append((~np.isnan(R_i)) * 1)
        X_i, dummy = n2z(X_i); R_i, dummy = n2z(R_i)
        X__.append(X_i); R__.append(R_i)
        Xhat.append(np.zeros(X_i.shape))
        TSSX.append(np.sum(X_i**2)); TSSXpv.append(np.sum(X_i**2, axis=0))
        TSSR.append(np.sum(R_i**2)); TSSRpv.append(np.sum(R_i**2, axis=0))
    X = X__; R = R__
    Y_, y_mean, y_std = meancenterscale(Y_)
    not_Ymiss = (~np.isnan(Y_)) * 1
    Y_, dummy = n2z(Y_)
    TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
    Z_, z_mean, z_std = meancenterscale(Z_)
    not_Zmiss = (~np.isnan(Z_)) * 1
    Z_, dummy = n2z(Z_)
    TSSZ = np.sum(Z_**2); TSSZpv = np.sum(Z_**2, axis=0)
    if not shush:
        print('phi.tpls using NIPALS executed on: ' + str(datetime.datetime.now()))
    # --- NIPALS with missing-data handling ---
    epsilon = 1E-9; maxit = 2000
    for a in range(A):
        # Start u from the Y column with largest variance.
        ui = Y_[:, [np.argmax(std(Y_))]]
        Converged = False; num_it = 0
        while not Converged:
            hi = [_Ab_btbinv(R[i].T, ui, not_Rmiss[i].T) for i in range(len(R))]
            si = [_Ab_btbinv(X[i].T, hi[i], not_Xmiss[i].T) for i in range(len(X))]
            # Normalize the joint (stacked) s vector across all blocks.
            js = np.array([y for x in si for y in x])
            for i in range(len(si)):
                si[i] = si[i] / np.linalg.norm(js)
            ri = [_Ab_btbinv(X[i], si[i], not_Xmiss[i]) for i in range(len(X))]
            jr = np.array([y for x in ri for y in x]).astype(float)
            R_ = np.hstack(R); not_Rmiss_ = np.hstack(not_Rmiss)
            t_rx = _Ab_btbinv(R_, jr, not_Rmiss_)
            wi = _Ab_btbinv(Z_.T, ui, not_Zmiss.T)
            wi = wi / np.linalg.norm(wi)
            t_z = _Ab_btbinv(Z_, wi, not_Zmiss)
            # Inner single-LV PLS combines the R/X score and the Z score.
            Taux = np.hstack((t_rx, t_z))
            plsobj_ = pls(Taux, Y_, 1, mcsX=False, mcsY=False, shush=True, force_nipals=True)
            wt_i = plsobj_['W']; qi = plsobj_['Q']; un = plsobj_['U']; ti = plsobj_['T']
            # Converge on the relative change of ||u||.
            if abs((np.linalg.norm(ui) - np.linalg.norm(un))) / np.linalg.norm(ui) < epsilon:
                Converged = True
            if num_it > maxit:
                Converged = True
            if Converged:
                if not shush:
                    print('# Iterations for LV #'+str(a+1)+': ', str(num_it))
                # Loadings for deflation of each space.
                pi = [_Ab_btbinv(R[i].T, ti, not_Rmiss[i].T) for i in range(len(R))]
                vi = [_Ab_btbinv(X[i].T, ri[i], not_Xmiss[i].T) for i in range(len(X))]
                pzi = _Ab_btbinv(Z_.T, ti, not_Zmiss.T)
                for i in range(len(R)):
                    R[i] = (R[i] - ti @ pi[i].T) * not_Rmiss[i]
                    X[i] = (X[i] - ri[i] @ vi[i].T) * not_Xmiss[i]
                    Xhat[i] = Xhat[i] + ri[i] @ vi[i].T
                Y_ = (Y_ - ti @ qi.T) * not_Ymiss
                Z_ = (Z_ - ti @ pzi.T) * not_Zmiss
                if a == 0:
                    T=ti; P=pi; Pz=pzi; S=si; U=un; Q=qi; H=hi; V=vi
                    Rscores=ri; W=wi; Wt=wt_i
                    # Cumulative R2 values; converted to per-component later.
                    r2X = []; r2Xpv = []; r2R = []; r2Rpv = []
                    for i in range(len(X)):
                        r2X.append(1 - np.sum(X[i]**2)/TSSX[i])
                        r2Xpv.append((1 - np.sum(X[i]**2, axis=0)/TSSXpv[i]).reshape(-1, 1))
                        r2R.append(1 - np.sum(R[i]**2)/TSSR[i])
                        r2Rpv.append((1 - np.sum(R[i]**2, axis=0)/TSSRpv[i]).reshape(-1, 1))
                    r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv)
                    r2Z, r2Zpv = _calc_r2(Z_, TSSZ, TSSZpv)
                else:
                    T = np.hstack((T, ti)); U = np.hstack((U, un)); Q = np.hstack((Q, qi))
                    W = np.hstack((W, wi)); Wt = np.hstack((Wt, wt_i)); Pz = np.hstack((Pz, pzi))
                    for i in range(len(P)): P[i] = np.hstack((P[i], pi[i]))
                    for i in range(len(S)): S[i] = np.hstack((S[i], si[i]))
                    for i in range(len(H)): H[i] = np.hstack((H[i], hi[i]))
                    for i in range(len(V)): V[i] = np.hstack((V[i], vi[i]))
                    for i in range(len(Rscores)): Rscores[i] = np.hstack((Rscores[i], ri[i]))
                    for i in range(len(X)):
                        r2X[i] = np.hstack((r2X[i], 1 - np.sum(X[i]**2)/TSSX[i]))
                        r2Xpv[i] = np.hstack((r2Xpv[i], (1 - np.sum(X[i]**2, axis=0)/TSSXpv[i]).reshape(-1, 1)))
                        r2R[i] = np.hstack((r2R[i], 1 - np.sum(R[i]**2)/TSSR[i]))
                        r2Rpv[i] = np.hstack((r2Rpv[i], (1 - np.sum(R[i]**2, axis=0)/TSSRpv[i]).reshape(-1, 1)))
                    r2Y, r2Ypv = _calc_r2(Y_, TSSY, TSSYpv, r2Y, r2Ypv)
                    r2Z, r2Zpv = _calc_r2(Z_, TSSZ, TSSZpv, r2Z, r2Zpv)
            else:
                num_it += 1; ui = un
        if a == 0: numIT = num_it
        else: numIT = np.hstack((numIT, num_it))
    # Rebuild Xhat in original units.
    for i in range(len(Xhat)):
        Xhat[i] = Xhat[i] * x_std[i] + x_mean[i]
    # Convert cumulative R2 to per-component R2.
    r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
    r2Z, r2Zpv = _r2_cumulative_to_per_component(r2Z, r2Zpv, A)
    r2xc = []; r2rc = []
    for i in range(len(X)):
        for a in range(A-1, 0, -1):
            r2X[i][a] = r2X[i][a] - r2X[i][a-1]
            r2Xpv[i][:, a] = r2Xpv[i][:, a] - r2Xpv[i][:, a-1]
            r2R[i][a] = r2R[i][a] - r2R[i][a-1]
            r2Rpv[i][:, a] = r2Rpv[i][:, a] - r2Rpv[i][:, a-1]
    for i, r in enumerate(r2Xpv):
        if i == 0: r2xpv_all = r
        else: r2xpv_all = np.vstack((r2xpv_all, r))
        r2xc.append(np.cumsum(r2X[i]))
        r2rc.append(np.cumsum(r2R[i]))
    r2yc = np.cumsum(r2Y); r2zc = np.cumsum(r2Z)
    # Average R2 across material blocks for the summary table.
    r2rc = np.mean(np.array(r2rc), axis=0)
    r2xc = np.mean(np.array(r2xc), axis=0)
    r2x = np.mean(np.array(r2X), axis=0)
    r2r = np.mean(np.array(r2R), axis=0)
    if not shush:
        print('--------------------------------------------------------------')
        print('LV # R2X sum(R2X) R2R sum(R2R) R2Z sum(R2Z) R2Y sum(R2Y)')
        if A > 1:
            for a in range(A):
                print("LV #"+str(a+1)+": {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(r2x[a], r2xc[a], r2r[a], r2rc[a], r2Z[a], r2zc[a], r2Y[a], r2yc[a]))
        else:
            print("LV #1: {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(r2x, r2xc[0], r2r, r2rc[0], r2Z, r2zc[0], r2Y, r2yc[0]))
        print('--------------------------------------------------------------')
    var_t = (T.T @ T) / T.shape[0]
    tpls_obj = {'T':T, 'P':P, 'Q':Q, 'U':U, 'S':S, 'H':H, 'V':V, 'Rscores':Rscores,
    'r2xi':r2X, 'r2xpvi':r2Xpv, 'r2xpv':r2xpv_all,
    'mx':x_mean, 'sx':x_std,
    'r2y':r2Y, 'r2ypv':r2Ypv, 'my':y_mean, 'sy':y_std,
    'r2ri':r2R, 'r2rpvi':r2Rpv, 'mr':r_mean, 'sr':r_std,
    'r2z':r2Z, 'r2zpv':r2Zpv, 'mz':z_mean, 'sz':z_std,
    'Xhat':Xhat, 'materials':materials, 'Wt':Wt, 'W':W, 'Pz':Pz, 'var_t':var_t}
    # Attach identifiers only when the inputs were DataFrames.
    if not isinstance(obsidX[0], bool):
        tpls_obj['obsidXi'] = obsidX; tpls_obj['varidXi'] = varidX
        varidXall = [materials[i]+':'+varidX[i][j] for i in range(len(materials)) for j in range(len(varidX[i]))]
        tpls_obj['varidX'] = varidXall
    if not isinstance(obsidR[0], bool):
        tpls_obj['obsidRi'] = obsidR; tpls_obj['varidRi'] = varidR
    if not isinstance(obsidY, bool):
        tpls_obj['obsidY'] = obsidY; tpls_obj['varidY'] = varidY
    if not isinstance(obsidZ, bool):
        tpls_obj['obsidZ'] = obsidZ; tpls_obj['varidZ'] = varidZ
    # --- Diagnostics: Hotelling's T2 and SPE per space with limits ---
    T2 = hott2(tpls_obj, Tnew=T)
    n = T.shape[0]
    T2_lim99 = (((n-1)*(n+1)*A)/(n*(n-A))) * f99(A, (n-A))
    T2_lim95 = (((n-1)*(n+1)*A)/(n*(n-A))) * f95(A, (n-A))
    speX = []; speR = []; speX_lim95 = []; speX_lim99 = []; speR_lim95 = []; speR_lim99 = []
    for i in range(len(X)):
        speX.append(np.sum(X[i]**2, axis=1, keepdims=1))
        s95, s99 = spe_ci(np.sum(X[i]**2, axis=1, keepdims=1))
        speX_lim95.append(s95); speX_lim99.append(s99)
        speR.append(np.sum(R[i]**2, axis=1, keepdims=1))
        s95, s99 = spe_ci(np.sum(R[i]**2, axis=1, keepdims=1))
        speR_lim95.append(s95); speR_lim99.append(s99)
    speY = np.sum(Y_**2, axis=1, keepdims=1); speY_lim95, speY_lim99 = spe_ci(speY)
    speZ = np.sum(Z_**2, axis=1, keepdims=1); speZ_lim95, speZ_lim99 = spe_ci(speZ)
    tpls_obj.update({'T2':T2, 'T2_lim99':T2_lim99, 'T2_lim95':T2_lim95,
    'speX':speX, 'speX_lim99':speX_lim99, 'speX_lim95':speX_lim95,
    'speY':speY, 'speY_lim99':speY_lim99, 'speY_lim95':speY_lim95,
    'speR':speR, 'speR_lim99':speR_lim99, 'speR_lim95':speR_lim95,
    'speZ':speZ, 'speZ_lim99':speZ_lim99, 'speZ_lim95':speZ_lim95})
    # Rotated weights per material block and stacked across blocks.
    Wsi = []; Ws = []
    for i in range(len(S)):
        Wsi.append(S[i] @ np.linalg.pinv(V[i].T @ S[i]))
        if i == 0: Ws = S[i] @ np.linalg.pinv(V[i].T @ S[i])
        else: Ws = np.vstack((Ws, S[i] @ np.linalg.pinv(V[i].T @ S[i])))
    tpls_obj['Ssi'] = Wsi; tpls_obj['Ss'] = Ws
    tpls_obj['type'] = 'tpls'
    tpls_obj['Ws'] = W @ np.linalg.pinv(Pz.T @ W)
    return tpls_obj
[docs]
def jypls(Xi, Yi, A, *, shush=False):
    """Fit a Joint-Y PLS (JYPLS) model across multiple campaigns.

    Each campaign has its own X block (different variables allowed), but all
    campaigns share a common Y column space and a jointly estimated Q matrix.
    Per Garcia-Munoz, MacGregor, Kourti, Chemom. Intell. Lab. Syst. 79
    (2005) 101–114.

    Args:
        Xi (dict): Predictor blocks ``{'campaign_name': pd.DataFrame}``.
            Each X can have a different number of columns.
            First column of each DataFrame is observation IDs.
        Yi (dict): Response blocks ``{'campaign_name': pd.DataFrame}``.
            Keys must match ``Xi``. All Y blocks must have identical columns
            (same Y variable space across campaigns).
            First column of each DataFrame is observation IDs.
        A (int): Number of latent variables.
        shush (bool): Suppress printed output. Default ``False``.

    Returns:
        dict: Fitted JYPLS model with keys:

            - ``Q`` (ndarray): Shared Y-loadings (n_y × A).
            - ``T`` (dict): Per-campaign X-scores.
            - ``P`` (dict): Per-campaign X-loadings.
            - ``W`` (dict): Per-campaign X-weights.
            - ``Ws`` (dict): Per-campaign rotated weights W*(P'W)⁻¹.
            - ``r2xi`` (dict): Per-campaign R² for X.
            - ``r2yi`` (dict): Per-campaign R² for Y.
            - ``r2y`` (float): Overall R² for Y.
            - ``mx``, ``sx`` (dict): Per-campaign X preprocessing params.
            - ``my``, ``sy`` (ndarray): Shared Y preprocessing params.
            - ``blk_scale`` (dict): Per-campaign block scaling factors.
            - ``var_t`` (ndarray): Pooled score covariance matrix.
            - ``campaigns`` (list): Ordered list of campaign names.
    """
    campaigns = list(Xi.keys())
    # --- Validate keys match ---
    if set(Xi.keys()) != set(Yi.keys()):
        raise ValueError(
            f"Xi and Yi must have the same campaign keys. "
            f"Xi keys: {list(Xi.keys())}, Yi keys: {list(Yi.keys())}")
    # --- Validate and reconcile per campaign, check Y columns ---
    y_col_ref = None
    y_col_ref_name = None
    for k in campaigns:
        Xi[k], Yi[k] = _validate_inputs(Xi[k], Yi[k])
        if isinstance(Yi[k], pd.DataFrame):
            ycols = Yi[k].columns[1:].tolist()
            if y_col_ref is None:
                y_col_ref = ycols
                y_col_ref_name = k
            elif ycols != y_col_ref:
                raise ValueError(
                    f"All Y blocks must have the same columns (Joint-Y space). "
                    f"Campaign '{k}' has columns {ycols[:5]}... "
                    f"but campaign '{y_col_ref_name}' has {y_col_ref[:5]}...")
    # --- Extract arrays ---
    X = []; Y = []; varidX = []; obsidX = []; varidY = False; obsidY = []
    for k in campaigns:
        x_, ox, vx = _extract_array(Xi[k])
        y_, oy, vy = _extract_array(Yi[k])
        X.append(x_); Y.append(y_)
        varidX.append(vx); obsidX.append(ox)
        obsidY.append(oy)
        if vy is not False:
            varidY = vy  # same for all campaigns
    # --- Compute joint Y mean and std from stacked Y ---
    Y_stacked = np.vstack(Y)
    y_mean = mean(Y_stacked)
    y_std = std(Y_stacked)
    # --- Scale each block ---
    x_mean = []; x_std = []; blk_scale = []
    not_Xmiss = []; not_Ymiss = []
    TSSX = []; TSSXpv = []
    TSSY_i = []; TSSYpv_i = []
    for i in range(len(campaigns)):
        # X_i: mean center, autoscale, block scale
        X[i], xm, xs = meancenterscale(X[i])
        bs = np.sqrt(X[i].shape[1])
        X[i] = X[i] / bs
        x_mean.append(xm); x_std.append(xs); blk_scale.append(bs)
        not_Xmiss.append((~np.isnan(X[i])) * 1)
        X[i], _ = n2z(X[i])
        TSSX.append(np.sum(X[i]**2))
        TSSXpv.append(np.sum(X[i]**2, axis=0))
        # Y_i: scale with joint mean/std
        Y[i] = (Y[i] - y_mean) / y_std
        not_Ymiss.append((~np.isnan(Y[i])) * 1)
        Y[i], _ = n2z(Y[i])
        TSSY_i.append(np.sum(Y[i]**2))
        TSSYpv_i.append(np.sum(Y[i]**2, axis=0))
    # Joint Y TSS from scaled stacked Y
    Y_stacked_mcs = np.vstack(Y)
    TSSY = np.sum(Y_stacked_mcs**2)
    TSSYpv = np.sum(Y_stacked_mcs**2, axis=0)
    if not shush:
        print('phi.jypls using NIPALS executed on: ' + str(datetime.datetime.now()))
    # --- NIPALS ---
    epsilon = 1E-9; maxit = 2000
    for a in range(A):
        # Initialize u_i from column with max variance in stacked Y
        Y_stack_curr = np.vstack(Y)
        max_col = np.argmax(std(Y_stack_curr))
        ui = [Y[i][:, [max_col]] for i in range(len(campaigns))]
        Converged = False; num_it = 0
        while not Converged:
            # Step 1: w_i = X_i' u_i / (u_i' u_i)
            wi = []
            for i in range(len(campaigns)):
                w_ = _Ab_btbinv(X[i].T, ui[i], not_Xmiss[i].T)
                w_ = w_ / np.linalg.norm(w_)
                wi.append(w_)
            # Step 2: t_i = X_i w_i / (w_i' w_i)
            ti = []
            for i in range(len(campaigns)):
                t_ = _Ab_btbinv(X[i], wi[i], not_Xmiss[i])
                ti.append(t_)
            # Step 3: Joint q from stacked t and Y
            t_stacked = np.vstack(ti)
            Y_stack_curr = np.vstack(Y)
            not_Ymiss_stacked = np.vstack(not_Ymiss)
            qi = _Ab_btbinv(Y_stack_curr.T, t_stacked, not_Ymiss_stacked.T)
            # Step 4: u_i = Y_i q / (q' q)
            un = []
            for i in range(len(campaigns)):
                u_ = _Ab_btbinv(Y[i], qi, not_Ymiss[i])
                un.append(u_)
            # Check convergence across all campaigns
            # (1e-16 floor guards against division by a zero-norm u_i).
            conv = all(
                abs(np.linalg.norm(ui[i]) - np.linalg.norm(un[i])) / max(np.linalg.norm(ui[i]), 1e-16) < epsilon
                for i in range(len(campaigns)))
            if conv or num_it > maxit:
                Converged = True
            if Converged:
                if not shush:
                    print('# Iterations for LV #'+str(a+1)+': ', str(num_it))
                # Sign convention: ensure var(t>0) > var(t<0) on stacked scores
                t_stacked = np.vstack(ti)
                if (len(t_stacked[t_stacked < 0]) > 0) and (len(t_stacked[t_stacked > 0]) > 0):
                    if np.var(t_stacked[t_stacked < 0]) > np.var(t_stacked[t_stacked >= 0]):
                        ti = [-t for t in ti]
                        wi = [-w for w in wi]
                        un = [-u for u in un]
                        qi = -qi
                # Compute p_i for deflation: p_i = X_i' t_i / (t_i' t_i)
                pi = []
                for i in range(len(campaigns)):
                    p_ = _Ab_btbinv(X[i].T, ti[i], not_Xmiss[i].T)
                    pi.append(p_)
                # Deflate all blocks
                for i in range(len(campaigns)):
                    X[i] = (X[i] - ti[i] @ pi[i].T) * not_Xmiss[i]
                    Y[i] = (Y[i] - ti[i] @ qi.T) * not_Ymiss[i]
                # Store (R2 values are cumulative here; made per-component later)
                if a == 0:
                    T_blk = [[t] for t in ti]
                    P_blk = [[p] for p in pi]
                    W_blk = [[w] for w in wi]
                    U_blk = [[u] for u in un]
                    Q = qi
                    r2X = []; r2Xpv = []; r2Yi = []
                    for i in range(len(campaigns)):
                        r2X.append(1 - np.sum(X[i]**2) / TSSX[i])
                        r2Xpv.append((1 - np.sum(X[i]**2, axis=0) / TSSXpv[i]).reshape(-1, 1))
                        r2Yi.append(1 - np.sum(Y[i]**2) / TSSY_i[i])
                    Y_stack_res = np.vstack(Y)
                    r2Y, r2Ypv = _calc_r2(Y_stack_res, TSSY, TSSYpv)
                else:
                    for i in range(len(campaigns)):
                        T_blk[i].append(ti[i])
                        P_blk[i].append(pi[i])
                        W_blk[i].append(wi[i])
                        U_blk[i].append(un[i])
                    Q = np.hstack((Q, qi))
                    for i in range(len(campaigns)):
                        r2X[i] = np.hstack((r2X[i], 1 - np.sum(X[i]**2) / TSSX[i]))
                        r2Xpv[i] = np.hstack((r2Xpv[i], (1 - np.sum(X[i]**2, axis=0) / TSSXpv[i]).reshape(-1, 1)))
                        r2Yi[i] = np.hstack((r2Yi[i], 1 - np.sum(Y[i]**2) / TSSY_i[i]))
                    Y_stack_res = np.vstack(Y)
                    r2Y, r2Ypv = _calc_r2(Y_stack_res, TSSY, TSSYpv, r2Y, r2Ypv)
            else:
                num_it += 1
                ui = un
    # --- Assemble per-campaign matrices ---
    T = [np.hstack(t_list) for t_list in T_blk]
    P = [np.hstack(p_list) for p_list in P_blk]
    W = [np.hstack(w_list) for w_list in W_blk]
    U = [np.hstack(u_list) for u_list in U_blk]
    # Ws per campaign
    Ws = []
    for i in range(len(campaigns)):
        Ws_i = W[i] @ np.linalg.pinv(P[i].T @ W[i])
        # First rotated weight is reset to the first weight vector
        # (presumably to avoid numerical noise from the pinv; in exact
        # arithmetic the first column of W(P'W)^-1 equals W's first column).
        Ws_i[:, 0] = W[i][:, 0]
        Ws.append(Ws_i)
    # --- R2 per component ---
    r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
    for i in range(len(campaigns)):
        for aa in range(A-1, 0, -1):
            r2X[i][aa] = r2X[i][aa] - r2X[i][aa-1]
            r2Xpv[i][:, aa] = r2Xpv[i][:, aa] - r2Xpv[i][:, aa-1]
            r2Yi[i][aa] = r2Yi[i][aa] - r2Yi[i][aa-1]
    # Stacked T for diagnostics
    T_stacked = np.vstack(T)
    var_t = (T_stacked.T @ T_stacked) / T_stacked.shape[0]
    eigs = np.var(T_stacked, axis=0)
    # Aggregate R2X across campaigns
    r2x_avg = np.mean(np.array(r2X), axis=0)
    r2xc = np.cumsum(r2x_avg)
    r2yc = np.cumsum(r2Y)
    if not shush:
        print('--------------------------------------------------------------')
        print('LV # Eig R2X(avg) sum(R2X) R2Y sum(R2Y)')
        if A > 1:
            for aa in range(A):
                print("LV #"+str(aa+1)+": {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(
                    eigs[aa], r2x_avg[aa], r2xc[aa], r2Y[aa], r2yc[aa]))
        else:
            print("LV #1: {:6.3f} {:.3f} {:.3f} {:.3f} {:.3f}".format(
                eigs[0], r2x_avg, r2xc[0], r2Y, r2yc[0]))
        print('')
        print('R2Y and R2X per campaign:')
        for i, k in enumerate(campaigns):
            r2xi_c = np.cumsum(r2X[i]) if A > 1 else r2X[i]
            r2yi_c = np.cumsum(r2Yi[i]) if A > 1 else r2Yi[i]
            r2xi_t = r2xi_c[-1] if A > 1 else r2xi_c
            r2yi_t = r2yi_c[-1] if A > 1 else r2yi_c
            print(" {}: R2X={:.3f} R2Y={:.3f}".format(k, r2xi_t, r2yi_t))
        print('--------------------------------------------------------------')
    # --- Diagnostics ---
    T2 = hott2({'T': T_stacked, 'var_t': var_t}, Tnew=T_stacked)
    n = T_stacked.shape[0]
    T2_lim99 = (((n-1)*(n+1)*A) / (n*(n-A))) * f99(A, (n-A))
    T2_lim95 = (((n-1)*(n+1)*A) / (n*(n-A))) * f95(A, (n-A))
    speX = []; speX_lim95 = []; speX_lim99 = []
    speY = []; speY_lim95 = []; speY_lim99 = []
    for i in range(len(campaigns)):
        sx_ = np.sum(X[i]**2, axis=1, keepdims=1)
        s95, s99 = spe_ci(sx_)
        speX.append(sx_); speX_lim95.append(s95); speX_lim99.append(s99)
        sy_ = np.sum(Y[i]**2, axis=1, keepdims=1)
        s95, s99 = spe_ci(sy_)
        speY.append(sy_); speY_lim95.append(s95); speY_lim99.append(s99)
    # Build varidX aggregated list
    varidXall = []
    for i, k in enumerate(campaigns):
        if varidX[i] is not False:
            varidXall.append([k + ':' + v for v in varidX[i]])
        else:
            varidXall.append([k + ':Var' + str(j) for j in range(X[i].shape[1] if isinstance(X[i], np.ndarray) else 0)])
    jypls_obj = {
        'T': T, 'P': P, 'Q': Q, 'W': W, 'Ws': Ws, 'U': U,
        'r2xi': r2X, 'r2xpvi': r2Xpv,
        'r2y': r2Y, 'r2ypv': r2Ypv, 'r2yi': r2Yi,
        'mx': x_mean, 'sx': x_std, 'blk_scale': blk_scale,
        'my': y_mean, 'sy': y_std,
        'campaigns': campaigns, 'var_t': var_t,
        'varidXi': varidX, 'varidXall': varidXall,
        'obsidXi': obsidX, 'obsidYi': obsidY,
        'T2': T2, 'T2_lim99': T2_lim99, 'T2_lim95': T2_lim95,
        'speX': speX, 'speX_lim99': speX_lim99, 'speX_lim95': speX_lim95,
        'speY': speY, 'speY_lim99': speY_lim99, 'speY_lim95': speY_lim95,
        'type': 'jypls'
    }
    if varidY is not False:
        jypls_obj['varidY'] = varidY
    return jypls_obj
[docs]
def jypls_pred(xnew, campaign, jypls_obj):
    """Predict Y for a new observation using a fitted JYPLS model.

    Args:
        xnew (pd.DataFrame, np.ndarray or list): New X observation(s).
            Variables must match those of the specified campaign. A
            DataFrame's first column is taken as observation IDs.
        campaign (str): Campaign name this observation belongs to.
            Must match a key used when building the model with :func:`jypls`.
        jypls_obj (dict): Fitted JYPLS model from :func:`jypls`.

    Returns:
        dict: Prediction results with keys:

            - ``Tnew`` (ndarray): Projected X-scores (n_new × A).
            - ``Yhat`` (ndarray): Predicted Y in original scale (n_new × n_y).
            - ``speX`` (ndarray): X-space SPE for each new observation.
            - ``T2`` (ndarray): Hotelling's T² using pooled score covariance.

    Raises:
        ValueError: If ``campaign`` is not in the model or ``xnew`` has the
            wrong number of variables.
        TypeError: If ``xnew`` is not a DataFrame, ndarray, or list.
    """
    if campaign not in jypls_obj['campaigns']:
        raise ValueError(
            f"Campaign '{campaign}' not found in model. "
            f"Available campaigns: {jypls_obj['campaigns']}")
    idx = jypls_obj['campaigns'].index(campaign)
    # --- Extract a float array from any supported input type ---
    if isinstance(xnew, pd.DataFrame):
        X_ = np.array(xnew.values[:, 1:]).astype(float)  # drop obs-ID column
    elif isinstance(xnew, np.ndarray):
        X_ = xnew.copy()
    elif isinstance(xnew, list):
        X_ = np.array(xnew, dtype=float)
    else:
        # Previously an unsupported type fell through and surfaced later as
        # a NameError on X_; fail fast with a clear message instead.
        raise TypeError(
            "xnew must be a pandas DataFrame, numpy ndarray or list, "
            f"got {type(xnew).__name__}")
    if X_.ndim == 1:
        X_ = X_.reshape(1, -1)  # single observation -> row vector
    # Validate dimensions against the campaign's preprocessing params
    expected_cols = jypls_obj['mx'][idx].shape[1] if jypls_obj['mx'][idx].ndim == 2 else len(jypls_obj['mx'][idx])
    if X_.shape[1] != expected_cols:
        raise ValueError(
            f"xnew has {X_.shape[1]} variables but campaign '{campaign}' "
            f"expects {expected_cols} variables.")
    # Scale: mean center, autoscale, block scale (same as training)
    X_mcs = (X_ - jypls_obj['mx'][idx]) / jypls_obj['sx'][idx]
    X_mcs = X_mcs / jypls_obj['blk_scale'][idx]
    # Handle missing data
    X_nan_map = np.isnan(X_mcs)
    if not X_nan_map.any():
        # Complete data: use Ws for direct projection
        tnew = X_mcs @ jypls_obj['Ws'][idx]
        xhat = tnew @ jypls_obj['P'][idx].T
        speX = np.sum((X_mcs - xhat)**2, axis=1, keepdims=True)
    else:
        # Missing data: deflation-based projection using W, one row at a time
        not_Xmiss = (~X_nan_map) * 1
        X_mcs, _ = n2z(X_mcs)
        for i in range(X_mcs.shape[0]):
            row_map = not_Xmiss[[i], :]
            # Zero the weights of missing variables before regressing.
            tempW = jypls_obj['W'][idx] * row_map.T
            for aa in range(jypls_obj['Q'].shape[1]):
                WTW = tempW[:, [aa]].T @ tempW[:, [aa]]
                tnew_aux, _, _, _ = np.linalg.lstsq(WTW, tempW[:, [aa]].T @ X_mcs[[i], :].T, rcond=None)
                X_mcs[[i], :] = (X_mcs[[i], :] - tnew_aux @ jypls_obj['P'][idx][:, [aa]].T) * row_map
                if aa == 0: tnew_ = tnew_aux
                else: tnew_ = np.vstack((tnew_, tnew_aux))
            if i == 0: tnew = tnew_.T
            else: tnew = np.vstack((tnew, tnew_.T))
        xhat = tnew @ jypls_obj['P'][idx].T
        speX = np.sum((X_mcs - xhat)**2 * not_Xmiss, axis=1, keepdims=True)
    # Predict Y using shared Q, unscale
    yhat = tnew @ jypls_obj['Q'].T * jypls_obj['sy'] + jypls_obj['my']
    # T2 using pooled var_t
    var_t_inv = np.linalg.inv(jypls_obj['var_t'])
    T2 = np.sum((tnew @ var_t_inv) * tnew, axis=1)
    return {'Tnew': tnew, 'Yhat': yhat, 'speX': speX, 'T2': T2}
[docs]
def tpls_pred(rnew, znew, tplsobj):
    """Predict Y for new observations using a fitted TPLS model.

    Args:
        rnew (list or dict): New R-space (raw material blending) data.
            A list gives one ratio vector per material (ordered as
            ``tplsobj['materials']``); a dict of ``(lot_id, ratio)`` pairs
            per material is converted to the list form and the call recurses.
        znew (pd.DataFrame, list or np.ndarray): New Z-space (trajectory)
            data. A DataFrame row's first entry (obs ID) is discarded.
        tplsobj (dict): Fitted TPLS model from :func:`tpls`.

    Returns:
        dict: Prediction results with keys:

            - ``Tnew`` (ndarray): Projected scores.
            - ``Yhat`` (ndarray): Predicted Y in original scale.
            - ``speR`` (list): R-space SPE, one value per material block.
            - ``speZ`` (float): Z-space SPE.

        If the dimensions of ``rnew`` or ``znew`` do not match the model,
        the string ``'dimensions of rnew or znew did not match model'`` is
        returned instead.

    Example for rnew:
        rnew={
        'MAT1': [('A0129',0.557949425 ),('A0130',0.442050575 )],
        'MAT2': [('Lac0003',1)],
        'MAT3': [('TLC018', 1) ],
        'MAT4': [('M0012', 1) ],
        'MAT5':[('CS0017', 1) ]
        }
    """
    ok = True
    if isinstance(rnew, list):
        # Stack the per-material vectors/params into one joint R-space.
        i = 0
        for r, mr, sr in zip(rnew, tplsobj['mr'], tplsobj['sr']):
            if not(len(r) == len(mr[0])):
                ok = False  # dimension mismatch for this material block
            if i == 0:
                rnew_ = r; mr_ = mr; sr_ = sr
                Rscores = tplsobj['Rscores'][i]
                P = tplsobj['P'][i]
            else:
                rnew_ = np.hstack((rnew_, r))
                mr_ = np.hstack((mr_, mr)); sr_ = np.hstack((sr_, sr))
                Rscores = np.vstack((Rscores, tplsobj['Rscores'][i]))
                P = np.vstack((P, tplsobj['P'][i]))
            i += 1
    elif isinstance(rnew, dict):
        # Build the list form from (lot_id, ratio) pairs, then recurse.
        rnew_ = [['*']] * len(tplsobj['materials'])
        for k in list(rnew.keys()):
            i = tplsobj['materials'].index(k)
            ri = np.zeros((tplsobj['mr'][i].shape[1]))
            for m, r in rnew[k]:
                e = tplsobj['varidRi'][i].index(m)
                ri[e] = r
            rnew_[i] = ri
        return tpls_pred(rnew_, znew, tplsobj)
    if isinstance(znew, pd.DataFrame):
        znew_ = znew.values.reshape(-1)[1:].astype(float)  # drop obs ID
    elif isinstance(znew, list):
        znew_ = np.array(znew)
    elif isinstance(znew, np.ndarray):
        znew_ = znew.copy()
    if not(len(znew_) == tplsobj['mz'].shape[1]):
        ok = False
    if ok:
        # selmat: 0/1 column masks selecting each material's rows within the
        # stacked R-space (used to split the SPE per material).
        bkzeros = 0; selmat = []
        for i, r in enumerate(tplsobj['Rscores']):
            frontzeros = Rscores.shape[0] - bkzeros - r.shape[0]
            row = np.vstack((np.zeros((bkzeros, 1)), np.ones((r.shape[0], 1)), np.zeros((frontzeros, 1))))
            bkzeros += r.shape[0]; selmat.append(row)
        rnew_ = ((rnew_ - mr_) / sr_).reshape(-1, 1)
        znew_ = ((znew_ - tplsobj['mz']) / tplsobj['sz']).reshape(-1, 1)
        tnew = []
        # Per LV: project R and Z separately, combine with the inner-model
        # weights Wt, then deflate both spaces.
        for a in np.arange(tplsobj['T'].shape[1]):
            ti_rx_ = (rnew_.T @ Rscores[:, a] / (Rscores[:, a].T @ Rscores[:, a]))
            ti_z_ = znew_.T @ tplsobj['W'][:, a]
            ti_ = np.array([ti_rx_, ti_z_]).reshape(1, -1) @ tplsobj['Wt'][:, a]
            tnew.append(ti_[0])
            rnew_ = rnew_ - (ti_ * P[:, a]).reshape(-1, 1)
            znew_ = znew_ - (ti_ * tplsobj['Pz'][:, a]).reshape(-1, 1)
        sper = [np.sum(rnew_[row == 1]**2) for row in selmat]
        spez = np.sum(znew_**2)
        tnew = np.array(tnew)
        yhat = (tnew @ tplsobj['Q'].T) * tplsobj['sy'] + tplsobj['my']
        return {'Tnew':tnew, 'Yhat':yhat, 'speR':sper, 'speZ':spez}
    else:
        return 'dimensions of rnew or znew did not match model'
# =============================================================================
# Varimax rotation
# =============================================================================
[docs]
def varimax_(X, gamma=1.0, q=20, tol=1e-6):
    """Orthomax rotation of a loading matrix.

    Iteratively searches for an orthogonal rotation maximizing the orthomax
    criterion (``gamma=1.0`` gives the varimax criterion) and returns the
    rotated matrix ``X @ R``.

    Args:
        X (ndarray): Loadings to rotate (p variables x k components).
        gamma (float): Orthomax weight; 1.0 corresponds to varimax.
        q (int): Maximum number of iterations.
        tol (float): Relative-improvement tolerance for early stopping.

    Returns:
        ndarray: The rotated loading matrix.
    """
    p, k = X.shape
    rotation = eye(k)
    previous = 0
    for _ in range(q):
        loadings = X @ rotation
        gram = loadings.T @ loadings
        target = asarray(loadings)**3 - (gamma / p) * (loadings @ diag(diag(gram)))
        u, s, vh = svd(X.T @ target)
        rotation = u @ vh
        current = sum(s)
        # Stop once the criterion's relative improvement drops below tol.
        if previous != 0 and current / previous < 1 + tol:
            break
        previous = current
    return X @ rotation
[docs]
def varimax_rotation(mvm_obj, X, *, Y=False):
    """Apply Varimax rotation to PCA or PLS loadings.

    Rotates loadings toward a simple structure (sparse, interpretable) via
    :func:`varimax_`, then reprojects the training data to obtain rotated
    scores and recomputed R² values. Works on a copy of the model.

    Args:
        mvm_obj (dict): Fitted PCA or PLS model (PLS is detected by the
            presence of a ``'Q'`` key).
        X (pd.DataFrame or np.ndarray): Training X data used to reproject
            scores after rotation. A DataFrame's first column is obs IDs.
        Y (pd.DataFrame or np.ndarray): Training Y data (required for the
            PLS branch; leave as ``False`` for PCA).

    Returns:
        dict: Model with rotated loadings and reprojected scores.
    """
    mvmobj = mvm_obj.copy()
    if isinstance(X, pd.DataFrame): X_ = X.values[:,1:].astype(float)
    else: X_ = X.copy()
    if isinstance(Y, pd.DataFrame): Y_ = Y.values[:,1:].astype(float)
    elif isinstance(Y, np.ndarray): Y_ = Y.copy()
    # Reproduce the training preprocessing from the stored model params.
    X_ = (X_ - mvmobj['mx']) / mvmobj['sx']
    not_Xmiss = (~np.isnan(X_)) * 1
    X_, Xmap = n2z(X_)
    TSSX = np.sum(X_**2); TSSXpv = np.sum(X_**2, axis=0)
    if not isinstance(Y, bool):
        Y_ = (Y_ - mvmobj['my']) / mvmobj['sy']
        not_Ymiss = (~np.isnan(Y_)) * 1
        Y_, Ymap = n2z(Y_)
        TSSY = np.sum(Y_**2); TSSYpv = np.sum(Y_**2, axis=0)
    A = mvmobj['T'].shape[1]
    if 'Q' in mvmobj:
        # --- PLS branch: rotate W, then rebuild T/P/Q/U by deflation ---
        Wrot = varimax_(mvmobj['W'])
        Trot=[]; Prot=[]; Qrot=[]; Urot=[]
        r2X = None; r2Xpv = None; r2Y = None; r2Ypv = None
        for a in np.arange(A):
            ti = _Ab_btbinv(X_, Wrot[:,a], not_Xmiss)
            pi = _Ab_btbinv(X_.T, ti, not_Xmiss.T)
            qi = _Ab_btbinv(Y_.T, ti, not_Ymiss.T)
            ui = _Ab_btbinv(Y_, qi, not_Ymiss)
            X_ = (X_ - ti @ pi.T) * not_Xmiss
            Y_ = (Y_ - ti @ qi.T) * not_Ymiss
            # Guard against zero-variance columns (TSS == 0) in the R2.
            r2Xpv_new = np.zeros(len(TSSXpv))
            r2Xpv_new[TSSXpv>0] = 1 - (np.sum(X_**2, axis=0)[TSSXpv>0] / TSSXpv[TSSXpv>0])
            r2X_new = 1 - np.sum(X_**2) / TSSX
            r2Ypv_new = np.zeros(len(TSSYpv))
            r2Ypv_new[TSSYpv>0] = 1 - (np.sum(Y_**2, axis=0)[TSSYpv>0] / TSSYpv[TSSYpv>0])
            r2Y_new = 1 - np.sum(Y_**2) / TSSY
            if r2X is None:
                r2X = r2X_new; r2Xpv = r2Xpv_new.reshape(-1,1)
                r2Y = r2Y_new; r2Ypv = r2Ypv_new.reshape(-1,1)
            else:
                r2X = np.hstack((r2X, r2X_new)); r2Xpv = np.hstack((r2Xpv, r2Xpv_new.reshape(-1,1)))
                r2Y = np.hstack((r2Y, r2Y_new)); r2Ypv = np.hstack((r2Ypv, r2Ypv_new.reshape(-1,1)))
            Trot.append(ti); Prot.append(pi); Qrot.append(qi); Urot.append(ui)
        r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
        r2Y, r2Ypv = _r2_cumulative_to_per_component(r2Y, r2Ypv, A)
        # Stack the per-LV column vectors into (n x A) matrices.
        Trot = np.array(Trot).T[0]; Prot = np.array(Prot).T[0]
        Qrot = np.array(Qrot).T[0]; Urot = np.array(Urot).T[0]
        Wsrot = Wrot @ np.linalg.pinv(Prot.T @ Wrot)
        mvmobj.update({'W':Wrot, 'T':Trot, 'P':Prot, 'Q':Qrot, 'U':Urot, 'Ws':Wsrot,
        'r2x':r2X, 'r2xpv':r2Xpv, 'r2y':r2Y, 'r2ypv':r2Ypv})
    else:
        # --- PCA branch: rotate P, then rebuild T by deflation ---
        Prot = varimax_(mvmobj['P'])
        Trot = []; r2X = None; r2Xpv = None
        for a in np.arange(A):
            ti = _Ab_btbinv(X_, Prot[:,a], not_Xmiss)
            Trot.append(ti)
            X_ = (X_ - ti @ Prot[:,[a]].T) * not_Xmiss
            r2Xpv_new = np.zeros(len(TSSXpv))
            r2Xpv_new[TSSXpv>0] = 1 - (np.sum(X_**2, axis=0)[TSSXpv>0] / TSSXpv[TSSXpv>0])
            r2X_new = 1 - np.sum(X_**2) / TSSX
            if r2X is None:
                r2X = r2X_new; r2Xpv = r2Xpv_new.reshape(-1,1)
            else:
                r2X = np.hstack((r2X, r2X_new)); r2Xpv = np.hstack((r2Xpv, r2Xpv_new.reshape(-1,1)))
        r2X, r2Xpv = _r2_cumulative_to_per_component(r2X, r2Xpv, A)
        mvmobj.update({'P':Prot, 'T':np.array(Trot).T[0], 'r2x':r2X, 'r2xpv':r2Xpv})
    return mvmobj
# =============================================================================
# Build polynomial
# =============================================================================
[docs]
def findstr(string):
    """Return the positions of every ``*`` or ``/`` character in *string*.

    Helper for :func:`build_polynomial`: locates the multiplication and
    division operators inside a factor expression such as ``'x1*x2/x3'``.
    (The previous docstring described a different, pattern-search signature
    that never matched this implementation.)

    Args:
        string (str): Factor expression to scan.

    Returns:
        list: 0-based indices of each ``*`` or ``/`` in ``string``.
    """
    return [i for i, s in enumerate(string) if s in '*/']
[docs]
def evalvar(data, vname):
    """Evaluate a (possibly powered) variable name against a DataFrame.

    Helper for :func:`build_polynomial`. ``vname`` may be a plain column
    name (e.g. ``'x1'``) or a power expression (e.g. ``'x1^2'``). The first
    column of ``data`` is treated as observation IDs and never matched.

    Args:
        data (pd.DataFrame): Data whose first column is observation IDs.
        vname (str): Column name, optionally followed by ``^power``.

    Returns:
        np.ndarray or bool: Column values as an (n, 1) array (raised to the
        requested power when present), or ``False`` if the name is not a
        column of ``data``.
    """
    caret = vname.find('^')
    usable_cols = data.columns[1:]
    if caret > 0:
        base_name = vname[:caret].strip()
        if base_name not in usable_cols:
            return False
        exponent = float(vname[caret + 1:])
        return data[base_name].values.reshape(-1, 1) ** exponent
    base_name = vname.strip()
    if base_name not in usable_cols:
        return False
    return data[base_name].values.reshape(-1, 1)
[docs]
def writeeq(beta_, features_):
    """Assemble a readable equation string from coefficients and features.

    Each non-negative coefficient after the first term is prefixed with
    ``' + '``; negative coefficients carry their own minus sign. A feature
    named ``'Bias'`` contributes its coefficient alone, with no
    ``' * feature'`` suffix (and always gets the ``' + '`` prefix when
    non-negative, even as the first term).

    Args:
        beta_ (sequence): Regression coefficients.
        features_ (sequence of str): Feature names aligned with ``beta_``.

    Returns:
        str: The concatenated equation string.
    """
    pieces = []
    for idx, (coef, feat) in enumerate(zip(beta_, features_)):
        if feat == 'Bias':
            term = str(coef) if coef < 0 else ' + ' + str(coef)
        else:
            needs_plus = coef >= 0 and idx != 0
            term = (' + ' if needs_plus else '') + str(coef) + ' * ' + feat
        pieces.append(term)
    return ''.join(pieces)
[docs]
def build_polynomial(data, factors, response, *, bias_term=True):
    """Linear regression with variable selection assisted by PLS.

    Builds a design matrix from ``factors`` (column names of ``data``,
    optionally combined with ``*`` / ``/`` and powered with ``^``), fits a
    PLS model to rank the factors by a VIP-style importance, plots two
    diagnostics (VIP per factor, and RMSE as the least important factors
    are removed one by one), and finally fits an ordinary least-squares
    model on all factors.

    Args:
        data (pd.DataFrame): First column is the observation identifier,
            remaining columns are candidate variables.
        factors (list of str): Factor expressions, e.g. ``'T'``, ``'T^2'``,
            ``'T * P'``, ``'T/P'``.
        response (str): Name of the response column in ``data``.
        bias_term (bool): If True (default) append an intercept ("Bias")
            column to the OLS design matrix. (Previously this flag was
            accepted but ignored; default behavior is unchanged.)

    Returns:
        tuple: ``(beta, factor_names, X_design, y, equation_string)`` where
        ``beta`` are the OLS coefficients from ``np.linalg.lstsq``.

    Raises:
        ValueError: If a factor references a variable that is not a column
            of ``data`` (previously ``evalvar``'s ``False`` return silently
            corrupted the design matrix).
    """
    # --- assemble the design matrix column by column ---------------------
    for j, f in enumerate(factors):
        if f.find('*') > 0 or f.find('/') > 0:
            # Compound factor: split at every '*' or '/' and combine the
            # operand columns left to right.
            indx = findstr(f)
            for ii, i in enumerate(indx):
                if ii == 0:
                    vname1 = f[0:i]
                    vname2 = f[i+1:indx[1]] if len(indx) > 1 else f[i+1:]
                    vals1 = evalvar(data, vname1)
                    vals2 = evalvar(data, vname2)
                    if vals1 is False or vals2 is False:
                        raise ValueError("Unknown variable in factor '" + f + "'")
                    xcol = vals1 * vals2 if f[i] == '*' else vals1 / vals2
                else:
                    vname = f[i+1:] if len(indx) == ii+1 else f[i+1:indx[ii+1]]
                    vals = evalvar(data, vname)
                    if vals is False:
                        raise ValueError("Unknown variable in factor '" + f + "'")
                    xcol = xcol * vals if f[i] == '*' else xcol / vals
            X = xcol if j == 0 else np.hstack((X, xcol))
        else:
            temp = evalvar(data, f)
            if temp is False:
                raise ValueError("Unknown variable in factor '" + f + "'")
            X = temp if j == 0 else np.hstack((X, temp))
    print('Built X from factors')
    # --- rank factors with a full PLS model ------------------------------
    X_df = pd.DataFrame(X, columns=factors)
    X_df.insert(0, data.columns[0], data[data.columns[0]].values)
    Y_df = data[[data.columns[0], response]]
    Y_arr = data[response].values
    pls_obj = pls(X_df, Y_df, len(factors))
    Ypred = pls_pred(X_df, pls_obj)['Yhat']
    RMSE = [np.sqrt(np.mean((Y_df.values[:, 1:].astype(float) - Ypred)**2))]
    # VIP-style importance: |Ws| weighted by the per-component r2y
    vip = np.sum(np.abs(pls_obj['Ws'] * pls_obj['r2y']), axis=1).ravel()
    sort_indx = np.argsort(-vip)        # descending importance (for the VIP bar plot)
    sort_asc_indx = np.argsort(vip)     # ascending importance (removal order)
    vip_sorted = vip[sort_indx]
    sorted_factors = [factors[i] for i in sort_indx]
    plt.figure()
    plt.bar(np.arange(len(sorted_factors)), vip_sorted)
    plt.xticks(np.arange(len(sorted_factors)), labels=sorted_factors, rotation=60)
    plt.ylabel('VIP'); plt.xlabel('Factors'); plt.tight_layout()
    # --- RMSE as the least-important factors are removed cumulatively ----
    sorted_asc_factors = [factors[i] for i in sort_asc_indx]
    X_df_m = X_df.copy()
    for f in sorted_asc_factors[:-1]:
        X_df_m.drop(f, axis=1, inplace=True)
        pls_obj_m = pls(X_df_m, Y_df, X_df_m.shape[1]-1, shush=True)
        Ypred = pls_pred(X_df_m, pls_obj_m)['Yhat']
        RMSE.append(np.sqrt(np.mean((Y_df.values[:, 1:].astype(float) - Ypred)**2)))
    sorted_asc_labels = ['Full'] + [factors[i] for i in sort_asc_indx[:-1]]
    plt.figure()
    plt.bar(np.arange(len(factors)), RMSE)
    plt.xticks(np.arange(len(factors)), labels=sorted_asc_labels, rotation=60)
    plt.ylabel('RMSE ('+response+')'); plt.xlabel('Factors removed from model'); plt.tight_layout()
    # --- OLS fit on all factors (optionally with intercept) --------------
    if bias_term:
        Xaug = np.hstack((X, np.ones((X.shape[0], 1))))
        factors_out = factors.copy()
        factors_out.append('Bias')
    else:
        Xaug = X
        factors_out = factors.copy()
    beta, *_ = np.linalg.lstsq(Xaug, Y_arr, rcond=None)
    eqstr = writeeq(beta, factors_out)
    return beta, factors_out, Xaug, Y_arr, eqstr
# =============================================================================
# CCA
# =============================================================================
# [docs]  (Sphinx "view source" link artifact from HTML extraction — not executable code)
def cca(X, Y, tol=1e-6, max_iter=1000):
    """First canonical direction pair between two data blocks.

    Alternating least-squares (power-iteration style) on the
    cross-covariance of the mean-centered blocks, starting from a random
    direction, until both weight vectors change by less than ``tol``.

    Args:
        X (np.ndarray): Left data matrix (n_obs x n_x). Columns are
            mean-centered internally; the caller's array is not modified.
        Y (np.ndarray): Right data matrix (n_obs x n_y). Columns are
            mean-centered internally.
        tol (float): Convergence tolerance on the change in each weight
            vector. Default 1e-6.
        max_iter (int): Maximum number of iterations. Default 1000.

    Returns:
        tuple: ``(correlation, w_x, w_y)`` where ``correlation`` is the
        converged value of ``w_x.T @ (X.T @ Y) @ w_y`` (a covariance-scale
        quantity, not normalized to [-1, 1]) and ``w_x`` / ``w_y`` are
        unit-norm weight vectors.

    Note:
        Initialization is random (``np.random.rand``), so the signs of
        ``w_x``/``w_y`` may differ between runs; seed NumPy's RNG for
        reproducibility.
    """
    # Center both blocks (creates new arrays; inputs are untouched)
    X = X - np.mean(X, axis=0)
    Y = Y - np.mean(Y, axis=0)
    Sigma_XX = X.T @ X
    Sigma_YY = Y.T @ Y
    Sigma_XY = X.T @ Y
    # Random unit-norm starting directions
    w_x = np.random.rand(X.shape[1])
    w_y = np.random.rand(Y.shape[1])
    w_x /= np.linalg.norm(w_x)
    w_y /= np.linalg.norm(w_y)
    # Define `correlation` up front so it exists even when max_iter <= 0
    # (previously this raised NameError at the return).
    correlation = w_x.T @ Sigma_XY @ w_y
    for iteration in range(max_iter):
        w_x_old = w_x.copy()
        w_y_old = w_y.copy()
        # Alternate: solve for one weight vector holding the other fixed
        w_x = np.linalg.solve(Sigma_XX, Sigma_XY @ w_y)
        w_x /= np.linalg.norm(w_x)
        w_y = np.linalg.solve(Sigma_YY, Sigma_XY.T @ w_x)
        w_y /= np.linalg.norm(w_y)
        correlation = w_x.T @ Sigma_XY @ w_y
        if np.linalg.norm(w_x - w_x_old) < tol and np.linalg.norm(w_y - w_y_old) < tol:
            break
    return correlation, w_x, w_y
# [docs]  (Sphinx "view source" link artifact from HTML extraction — not executable code)
def cca_multi(X, Y, num_components=1, tol=1e-6, max_iter=1000):
    """CCA extracting several canonical variates by deflation.

    Repeats the alternating least-squares iteration of :func:`cca` for
    ``num_components`` components, deflating each block by the extracted
    direction before computing the next one.

    Args:
        X (np.ndarray): Left data matrix (n_obs x n_x); mean-centered
            internally, the caller's array is not modified.
        Y (np.ndarray): Right data matrix (n_obs x n_y).
        num_components (int): Number of canonical variate pairs. Default 1.
        tol (float): Convergence tolerance per component. Default 1e-6.
        max_iter (int): Maximum iterations per component. Default 1000.

    Returns:
        dict: ``'correlations'`` (ndarray, one per component, on a
        covariance scale), ``'W_X'`` (n_x x num_components unit-norm
        weights) and ``'W_Y'`` (n_y x num_components unit-norm weights).

    Note:
        NOTE(review): deflation removes a full direction from each block,
        so later components appear to solve with a near-singular
        covariance — confirm conditioning before requesting more than one
        component. Initialization is random; seed NumPy's RNG for
        reproducibility.
    """
    # Center both blocks once (new arrays; inputs untouched)
    X = X - np.mean(X, axis=0)
    Y = Y - np.mean(Y, axis=0)
    corr_list, x_weights, y_weights = [], [], []
    for _ in range(num_components):
        # Covariances of the (possibly deflated) blocks
        Cxx = X.T @ X
        Cyy = Y.T @ Y
        Cxy = X.T @ Y
        # Random unit-norm starting directions
        wx = np.random.rand(X.shape[1])
        wy = np.random.rand(Y.shape[1])
        wx /= np.linalg.norm(wx)
        wy /= np.linalg.norm(wy)
        for _ in range(max_iter):
            prev_wx = wx.copy()
            prev_wy = wy.copy()
            # Alternate: solve for one weight vector holding the other fixed
            wx = np.linalg.solve(Cxx, Cxy @ wy)
            wx /= np.linalg.norm(wx)
            wy = np.linalg.solve(Cyy, Cxy.T @ wx)
            wy /= np.linalg.norm(wy)
            rho = wx.T @ Cxy @ wy
            if np.linalg.norm(wx - prev_wx) < tol and np.linalg.norm(wy - prev_wy) < tol:
                break
        corr_list.append(rho)
        x_weights.append(wx)
        y_weights.append(wy)
        # Deflate: project out the extracted direction from each block
        X = X - (X @ wx[:, np.newaxis]) @ wx[np.newaxis, :]
        Y = Y - (Y @ wy[:, np.newaxis]) @ wy[np.newaxis, :]
    return {'correlations': np.array(corr_list),
            'W_X': np.array(x_weights).T,
            'W_Y': np.array(y_weights).T}