# -*- coding: utf-8 -*-
"""
Created on Mon Apr 11 14:58:35 2022
Batch data is assumed to come in an excel file
with first column being batch identifier and following columns
being process variables.
Optionally the second column labeled 'PHASE','Phase' or 'phase' indicating
the phase of execution
Change log:
* added Dec 28 2023 Titles can be sent to contribution plots via plot_title flag
Monitoring diagnostics are also plotted against sample starting with 1
* added Dec 27 2023 Corrected plots to use correct xaxis starting with sample =1
Amended indicator variable alignment not to replace the IV with a linear sequence but to keep original data
* added Dec 4 2023 Added a BatchVIP calculation
* added Apr 23 2023 Corrected a very dumb mistake I made coding when tired
* added Apr 18 2023 Added descriptors routine to obtain landmarks of the batch
such as min,max,ave of a variable [during a phase if indicated so]
Modified plot_var_all_batches to plot against the values in a Time column
and also add the legend for the BatchID
* added Apr 10 2023 Added batch contribution plots
Added build_rel_time to create a tag of relative
run time from a timestamp
* added Apr 7 2023 Added alignment using indicator variable per phase
* added Apr 5 2023 Added the capability to monitor a variable in "Soft Sensor" mode
which implies there are no measurements for it (pure prediction)
as opposed to a forecast where there are new measurements coming in time.
* added Jul 20 2022 Distribution of number of samples per phase plot
* added Aug 10 2022 refold_horizontal | clean_empty_rows | predict
* added Aug 12 2022 replicate_batch
@author: S. Garcia-Munoz sgarciam@ic.ac.uk salg@andrew.cmu.edu
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import pyphi as phi
# Sequence of color blind friendly colors.
cb_color_seq=['b','r','m','navy','bisque','silver','aqua','pink','gray']
# Override matplotlib's default color cycle module-wide so that every figure
# produced by this module uses the colorblind-friendly sequence above.
mpl.rcParams['axes.prop_cycle'] = mpl.cycler(color=cb_color_seq)
[docs]
def unique(df,colid):
    ''' Replacement of the np.unique routine, specifically for dataframes

    unique(df,colid)

    Args:
        df: A pandas dataframe
        colid: Column identifier
    Returns:
        A list with the unique values of the column, kept in the order in
        which they first appear in the dataframe (np.unique would sort them)

    by Salvador Garcia (sgarciam@ic.ac.uk)
    '''
    deduped = df.drop_duplicates(subset=colid, keep='first')
    return deduped[colid].values.tolist()
[docs]
def mean(X,axis):
    '''Mean along an axis, ignoring NaN entries.

    x_mean = mean(X,axis)

    Args:
        X: 2D numpy array, possibly containing NaN for missing values
        axis: 0 for column means, 1 for row means
    Returns:
        1D numpy array of means; NaN entries are excluded from both the
        sum and the sample count (a fully-NaN slice yields NaN, with a
        RuntimeWarning, exactly as the previous hand-rolled version did)
    '''
    # np.nanmean implements precisely the original logic: sum of non-NaN
    # entries divided by the count of non-NaN entries along the axis.
    # keepdims + reshape(-1) kept for parity with the original return shape.
    return np.nanmean(X, axis=axis, keepdims=True).reshape(-1)
[docs]
def simple_align(bdata,nsamples):
    ''' Simple alignment for batch data using row number to linearly
    interpolate to the same number of samples

    bdata_aligned = simple_align(bdata,nsamples)

    Args:
        bdata: Pandas DataFrame where the 1st column is the batch identifier
               and the following columns are variables; each row is a new
               time sample. Batches are concatenated vertically. An optional
               second column named 'Phase'/'phase'/'PHASE' is carried along
               by nearest-neighbor resampling.
        nsamples: new number of samples to generate per batch,
                  irrespective of phase
    Returns:
        A pandas dataframe with batch data resampled to nsamples
        for all batches

    by Salvador Garcia Munoz (sgarciam@imperial.ac.uk, salvadorgarciamunoz@gmail.com)
    '''
    has_phase = bdata.columns[1] in ('PHASE', 'phase', 'Phase')
    id_col = bdata.columns[0]
    # Batch identifiers in order of first appearance
    batch_ids = bdata.drop_duplicates(subset=id_col, keep='first')[id_col].values.tolist()
    aligned = []
    for batch_id in batch_ids:
        one_batch = bdata[bdata[id_col] == batch_id]
        old_grid = np.arange(one_batch.shape[0])
        new_grid = np.linspace(0, one_batch.shape[0] - 1, nsamples)
        id_rs = [one_batch[id_col].values[0]] * nsamples
        if has_phase:
            # The phase label is categorical: take the nearest original row
            raw_phases = one_batch[bdata.columns[1]].values.tolist()
            phase_rs = [raw_phases[j] for j in np.round(new_grid).astype('int').tolist()]
            vmat = one_batch.values[:, 2:]
            vcols = one_batch.columns[2:]
        else:
            vmat = one_batch.values[:, 1:]
            vcols = one_batch.columns[1:]
        resampled = np.zeros((nsamples, vmat.shape[1]))
        for j in np.arange(vmat.shape[1]):
            resampled[:, j] = np.interp(new_grid, old_grid.astype('float'), vmat[:, j].astype('float'))
        frame = pd.DataFrame(resampled, columns=vcols)
        if has_phase:
            frame.insert(0, bdata.columns[1], phase_rs)
        frame.insert(0, id_col, id_rs)
        aligned.append(frame)
    return pd.concat(aligned)
[docs]
def phase_simple_align(bdata,nsamples):
    ''' Simple batch alignment (0 to 100%) per phase

    bdata_aligned = phase_simple_align(bdata,nsamples)

    Args:
        bdata: Pandas DataFrame where the 1st column is the batch identifier,
               the second column is a phase indicator (named 'Phase','phase'
               or 'PHASE') and the following columns are variables; each row
               is a new time sample. Batches are concatenated vertically.
        nsamples: if integer: number of samples to generate for every phase
                  if dictionary: samples to generate per phase e.g.
                      nsamples = {'Heating':100,'Reaction':200,'Cooling':10}
                  resampling is linear with respect to row number
    Returns:
        A pandas dataframe with batch data resampled (aligned).
        If the second column is not a phase indicator, an empty list is
        returned (this routine requires phase information).

    by Salvador Garcia Munoz
    (sgarciam@imperial.ac.uk , salvadorgarciamunoz@gmail.com)
    '''
    bdata_a=[]
    phase = bdata.columns[1] in ('PHASE','phase','Phase')
    if phase:
        # Fix: the docstring promises an integer nsamples ("samples to
        # collect per phase") but the code only handled dictionaries
        # (nsamples.keys() raised AttributeError for an int). Expand an
        # integer into a per-phase dictionary, preserving the order in
        # which phases first appear in the data.
        if isinstance(nsamples,int):
            aux_p = bdata.drop_duplicates(subset=bdata.columns[1],keep='first')
            nsamples = {p:nsamples for p in aux_p[bdata.columns[1]].values.tolist()}
        # Batch identifiers in order of first appearance
        aux=bdata.drop_duplicates(subset=bdata.columns[0],keep='first')
        unique_batches=aux[aux.columns[0]].values.tolist()
        for b in unique_batches:
            data_=bdata[bdata[bdata.columns[0]]==b]
            vals_rs=[]
            bname_rs=[]
            phase_rs=[]
            firstone=True
            for p in nsamples.keys():
                p_data= data_[data_[data_.columns[1]]==p]
                samps=nsamples[p]
                # Linear resampling grid over the rows of this phase
                indx=np.arange(p_data.shape[0])
                new_indx=np.linspace(0,p_data.shape[0]-1,samps)
                bname_rs_=[p_data[p_data.columns[0]].values[0]]*samps
                phase_rs_=[p_data[p_data.columns[1]].values[0]]*samps
                vals=p_data.values[:,2:]
                cols=p_data.columns[2:]
                vals_rs_=np.zeros((samps,vals.shape[1]))
                for i in np.arange(vals.shape[1]):
                    vals_rs_[:,i]=np.interp(new_indx,indx.astype('float'),vals[:,i].astype('float'))
                if firstone:
                    vals_rs=vals_rs_
                    firstone=False
                else:
                    vals_rs=np.vstack((vals_rs,vals_rs_))
                bname_rs.extend(bname_rs_)
                phase_rs.extend(phase_rs_)
            df_=pd.DataFrame(vals_rs,columns=cols)
            df_.insert(0,bdata.columns[1],phase_rs)
            df_.insert(0,bdata.columns[0],bname_rs)
            bdata_a.append(df_)
        bdata_a=pd.concat(bdata_a)
    return bdata_a
[docs]
def phase_iv_align(bdata,nsamples):
    ''' Batch alignment using an indicator variable

    batch_aligned_data = phase_iv_align(bdata,nsamples)

    Args:
        bdata is a Pandas DataFrame where 1st column is Batch Identifier
            the second column is a phase indicator
            and following columns are variables, each row is a new
            time sample. Batches are concatenated vertically.
        nsamples:
            if nsamples is a dictionary: samples to generate per phase e.g.
                nsamples = {'Heating':100,'Reaction':200,'Cooling':10}

            * If an indicator variable is used, with known start and end values
              indicate it with a list like this:
                  [IVarID,num_samples,start_value,end_value]
              example:
                  nsamples = {'Heating':['TIC101',100,30,50],'Reaction':200,'Cooling':10}
                  During the 'Heating' phase use TIC101 as an indicator variable
                  take 100 samples equidistant from TIC101=30 to TIC101=50
                  and align against that variable as a measure of batch evolution (instead of time)

            * If an indicator variable is used, with unknown start but known end values
              indicate it with a list like this:
                  [IVarID,num_samples,end_value]
              example:
                  nsamples = {'Heating':['TIC101',100,50],'Reaction':200,'Cooling':10}
                  During the 'Heating' phase use TIC101 as an indicator variable
                  take 100 samples equidistant from the value of TIC101 at the start of the phase
                  to the point when TIC101=50 and align against that variable as a measure
                  of batch evolution (instead of time)

            If no IV is sent, the resampling is linear with respect to row number per phase
    Returns:
        A pandas dataframe with batch data resampled (aligned)

    by Salvador Garcia Munoz
    sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    bdata_a=[]
    # Phase information is mandatory for this routine; detect the label used
    # for the second column.
    if (bdata.columns[1]=='PHASE') or \
       (bdata.columns[1]=='phase') or \
       (bdata.columns[1]=='Phase'):
        phase=True
    else:
        phase = False
    if phase:
        # Batch identifiers in order of first appearance
        aux=bdata.drop_duplicates(subset=bdata.columns[0],keep='first')
        unique_batches=aux[aux.columns[0]].values.tolist()
        for b in unique_batches:
            data_=bdata[bdata[bdata.columns[0]]==b]
            vals_rs=[]    # resampled values accumulated phase after phase
            bname_rs=[]   # batch id repeated once per resampled row
            phase_rs=[]   # phase label repeated once per resampled row
            firstone=True
            for p in nsamples.keys():
                p_data= data_[data_[data_.columns[1]]==p]
                samps=nsamples[p]
                if not(isinstance(samps,list)): #Linear alignment
                    # Resample this phase linearly with respect to row number
                    indx=np.arange(p_data.shape[0])
                    new_indx=np.linspace(0,p_data.shape[0]-1,samps)
                    bname_rs_=[p_data[p_data.columns[0]].values[0]]*samps
                    phase_rs_=[p_data[p_data.columns[1]].values[0]]*samps
                    vals=p_data.values[:,2:]
                    cols=p_data.columns[2:]
                    vals_rs_=np.zeros((samps,vals.shape[1]))
                    for i in np.arange(vals.shape[1]):
                        # Interpolate each variable over the non-missing
                        # samples only; values outside the observed range
                        # become NaN.
                        x_=indx.astype('float')
                        y_=vals[:,i].astype('float')
                        x_=x_[~np.isnan(y_)]
                        y_=y_[~np.isnan(y_)]
                        #vals_rs_[:,i]=np.interp(new_indx,indx.astype('float'),vals[:,i].astype('float'))
                        if len(y_)==0:
                            # Variable completely missing in this phase
                            vals_rs_[:,i]=np.nan
                        else:
                            vals_rs_[:,i]=np.interp(new_indx,x_,y_,left=np.nan,right=np.nan)
                elif isinstance(samps,list): #align this phase with IV
                    # NOTE(review): list lengths other than 3 or 4 leave iv /
                    # new_indx undefined and would raise a NameError below.
                    if len(samps)==4:
                        # [IVarID,num_samples,start_value,end_value]
                        iv_id=samps[0]
                        nsamp=samps[1]
                        start=samps[2]
                        end =samps[3]
                        new_indx=np.linspace(start,end,nsamp)
                        iv = p_data[iv_id].values.astype(float)
                    if len(samps)==3 :
                        # [IVarID,num_samples,end_value]; start is taken from
                        # the first observation of the IV in this phase.
                        iv_id = samps[0]
                        nsamp = samps[1]
                        end = samps[2]
                        if (not( isinstance(end,int) or isinstance(end,float)) and (isinstance(end,dict))
                            and not(firstone)):
                            #end is model to estimate the end value based on previous trajectory
                            # Build a pseudo-batch from the phases aligned so
                            # far and use the model to predict the IV delta.
                            # 'predict' is presumably a sibling routine in
                            # this module -- confirm against the full file.
                            cols=p_data.columns[2:].tolist()
                            fakebatch=pd.DataFrame(vals_rs,columns=cols)
                            fakebatch.insert(0,'phase',[p]*vals_rs.shape[0])
                            fakebatch.insert(0,'BatchID',[b]*vals_rs.shape[0])
                            preds=predict(fakebatch, end)
                            end_hat=preds['Yhat'][preds['Yhat'].columns[1]].values[0]
                            iv = p_data[iv_id].values.astype(float)
                            # Predicted value is a delta relative to the IV
                            # at the start of the phase.
                            end_hat = end_hat + iv[0]
                            new_indx=np.linspace(iv[0],end_hat,nsamp)
                        else:
                            iv = p_data[iv_id].values.astype(float)
                            new_indx=np.linspace(iv[0],end,nsamp)
                    cols=p_data.columns[2:].tolist()
                    #indx_2_iv=cols.index(iv_id)
                    bname_rs_=[p_data[p_data.columns[0]].values[0]]*nsamp
                    phase_rs_=[p_data[p_data.columns[1]].values[0]]*nsamp
                    #Check monotonicity
                    if (np.all(np.diff(iv)<0) or np.all(np.diff(iv)>0)):
                        # IV is strictly monotonic: use it directly as the
                        # interpolation abscissa.
                        indx=iv
                        vals=p_data.values[:,2:]
                        #replace the IV with a linear variable (time surrogate)
                        #linear_indx=np.arange(vals.shape[0])
                        #vals[:,indx_2_iv]=linear_indx
                        vals_rs_=np.zeros((nsamp,vals.shape[1]))
                        for i in np.arange(vals.shape[1]):
                            x_=indx.astype('float')
                            y_=vals[:,i].astype('float')
                            x_=x_[~np.isnan(y_)]
                            y_=y_[~np.isnan(y_)]
                            #vals_rs_[:,i]=np.interp(new_indx,indx.astype('float'),vals[:,i].astype('float'))
                            #vals_rs_[:,i]=np.interp(new_indx,indx ,vals[:,i].astype('float'),left=np.nan,right=np.nan)
                            if len(y_)==0:
                                vals_rs_[:,i]=np.nan
                            else:
                                vals_rs_[:,i]=np.interp(new_indx,x_,y_,left=np.nan,right=np.nan)
                    else: #if monotonicity fails try to remove non-monotonic vals
                        print('Indicator variable '+iv_id+' for batch '+ b +' is not monotinic')
                        print('this is not ideal, maybe rethink your IV')
                        print('trying to remove non-monotonic samples')
                        # Least-squares slope of the IV (anchored at iv[0])
                        # decides the expected direction of travel.
                        x=np.arange(0,len(iv))
                        m=(1/sum(x**2))*sum(x*(iv-iv[0]))
                        #if iv[0]-iv[-1] > 0 :
                        if m<0:
                            expected_direction = -1 #decreasing IV
                        else:
                            expected_direction = 1 #increasing IV
                        # Keep only samples that continue in the expected
                        # direction; the rest are discarded before
                        # interpolating.
                        vals=p_data.values[:,2:]
                        new_vals=vals[0,:]
                        prev_iv=iv[0]
                        mon_iv=[iv[0]]
                        for i in np.arange(1,vals.shape[0]):
                            if (np.sign(iv[i] - prev_iv) * expected_direction)==1: #monotonic
                                new_vals=np.vstack((new_vals,vals[i,:]))
                                prev_iv=iv[i]
                                mon_iv.append(iv[i])
                        indx=np.array(mon_iv)
                        vals=new_vals.copy()
                        #replace the IV with a linear variable (time surrogate)
                        #linear_indx=np.arange(vals.shape[0])
                        #vals[:,indx_2_iv]=linear_indx
                        vals_rs_=np.zeros((nsamp,vals.shape[1]))
                        for i in np.arange(vals.shape[1]):
                            x_=indx.astype('float')
                            y_=vals[:,i].astype('float')
                            x_=x_[~np.isnan(y_)]
                            y_=y_[~np.isnan(y_)]
                            #vals_rs_[:,i]=np.interp(new_indx,indx,vals[:,i].astype('float'),left=np.nan,right=np.nan)
                            if len(y_)==0:
                                vals_rs_[:,i]=np.nan
                            else:
                                vals_rs_[:,i]=np.interp(new_indx,x_,y_,left=np.nan,right=np.nan)
                # Stack this phase's resampled block under the previous ones
                if firstone:
                    vals_rs=vals_rs_
                    firstone=False
                else:
                    vals_rs=np.vstack((vals_rs,vals_rs_))
                bname_rs.extend(bname_rs_)
                phase_rs.extend(phase_rs_)
            df_=pd.DataFrame(vals_rs,columns=cols)
            df_.insert(0,bdata.columns[1],phase_rs)
            df_.insert(0,bdata.columns[0],bname_rs)
            bdata_a.append(df_)
        bdata_a=pd.concat(bdata_a)
    return bdata_a
[docs]
def plot_var_all_batches(bdata,*,which_var=False,plot_title='',mkr_style='.-',
                         phase_samples=False,alpha_=0.2,timecolumn=False,lot_legend=False):
    '''Plotting routine for batch data plot data for all batches in a dataset

    plot_var_all_batches(bdata,*,which_var=False,plot_title='',mkr_style='.-',
                         phase_samples=False,alpha_=0.2,timecolumn=False,lot_legend=False):

    Args:
        bdata: Batch data organized as: column[0] = Batch Identifier column name is unrestricted
               column[1] = Phase information per sample must be called 'Phase','phase', or 'PHASE'
                           this information is optional
               column[2:]= Variables measured throughout the batch
               The data for each batch is one on top of the other in a vertical matrix
        which_var: Which variables are to be plotted, if not sent, all are.
        plot_title: Optional text to be used as the title of all figures
        phase_samples: information used to align the batch, so that phases are marked in the plot
        alpha_: Transparency for the phase dividing line
        timecolumn: Name of the column that indicates time, if given all data is plotted against time
        lot_legend: Flag to add a legend for the batch identifiers

    by Salvador Garcia Munoz
    sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    var_list=which_var
    # Default the variable list to every process-variable column; variables
    # start at column 2 when a phase column is present, at column 1 otherwise.
    if (bdata.columns[1]=='PHASE') or \
       (bdata.columns[1]=='phase') or \
       (bdata.columns[1]=='Phase'):
        if isinstance(var_list,bool):
            var_list=bdata.columns[2:].tolist()
    else:
        if isinstance(var_list,bool):
            var_list=bdata.columns[1:].tolist()
    # Accept a single variable name by wrapping it into a list
    if isinstance(var_list,str) and not(isinstance(var_list,list)):
        var_list=[var_list]
    for v in var_list:
        # One figure per variable, one curve per batch
        plt.figure()
        if not(isinstance(timecolumn,bool)):
            dat=bdata[[bdata.columns[0],timecolumn,v]]
        else:
            dat=bdata[[bdata.columns[0],v]]
        for b in np.unique(dat[bdata.columns[0]]):
            data_=dat[v][dat[dat.columns[0]]==b]
            if not(isinstance(timecolumn,bool)):
                # Plot against the values of the given time column
                tim=dat[timecolumn][dat[dat.columns[0]]==b]
                if lot_legend:
                    plt.plot(tim.values,data_.values,mkr_style,label=b)
                else:
                    plt.plot(tim.values,data_.values,mkr_style)
                plt.xlabel(timecolumn)
            else:
                # Plot against sample number, starting at 1
                if lot_legend:
                    plt.plot(1+np.arange(len(data_.values)),data_.values,mkr_style,label=b)
                else:
                    plt.plot(1+np.arange(len(data_.values)),data_.values,mkr_style)
                plt.xlabel('Sample')
        plt.ylabel(v)
        if lot_legend:
            plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
        # Draw vertical phase-boundary lines and label each phase; sample
        # counts come from the alignment spec (a list entry means IV
        # alignment, with the count in position 1).
        if not(isinstance(phase_samples,bool)):
            s_txt=1
            s_lin=1
            plt.axvline(x=1,color='magenta',alpha=alpha_)
            for p in phase_samples.keys():
                if isinstance(phase_samples[p],list):
                    s_lin+=phase_samples[p][1]
                else:
                    s_lin+=phase_samples[p]
                plt.axvline(x=s_lin,color='magenta',alpha=alpha_)
                ylim_=plt.ylim()
                plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='magenta')
                if isinstance(phase_samples[p],list):
                    s_txt+=phase_samples[p][1]
                else:
                    s_txt+=phase_samples[p]
        plt.title(plot_title)
        plt.tight_layout()
[docs]
def plot_batch(bdata,which_batch,which_var,*,include_mean_exc=False,include_set=False,
               phase_samples=False,single_plot=False,plot_title=''):
    '''Plotting routine for batch data

    plot_batch(bdata,which_batch,which_var,*,include_mean_exc=False,include_set=False,
               phase_samples=False,single_plot=False,plot_title='')

    Args:
        bdata: Batch data organized as: column[0] = Batch Identifier column name is unrestricted
               column[1] = Phase information per sample must be called 'Phase','phase', or 'PHASE'
                           this information is optional
               column[2:]= Variables measured throughout the batch
               The data for each batch is one on top of the other in a vertical matrix
        which_batch: Which batches to plot
        which_var: Which variables are to be plotted, if not sent, all are.
        include_mean_exc: Include the mean trajectory of the set EXCLUDING the one batch being plotted
        include_set: Include all other trajectories (will be colored in light gray)
        phase_samples: Information used to align the batch, so that phases are marked in the plot
        single_plot: If True => Plot everything in a single axis
        plot_title: Optional text to be added to the title of all figures

    by Salvador Garcia Munoz
    sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    # Accept single batch / variable names by wrapping them into lists
    if isinstance(which_batch,str) and not(isinstance(which_batch,list)):
        which_batch=[which_batch]
    if isinstance(which_var,str) and not(isinstance(which_var,list)):
        which_var=[which_var]
    if single_plot:
        plt.figure()
    first_pass=True
    for b in which_batch:
        this_batch=bdata[bdata[bdata.columns[0]]==b]
        all_others=bdata[bdata[bdata.columns[0]]!=b]
        for v in which_var:
            this_var_this_batch=this_batch[v].values
            if not(single_plot):
                # One figure per batch/variable combination
                plt.figure()
            x_axis=np.arange(len(this_var_this_batch ))+1
            plt.plot(x_axis,this_var_this_batch,'k',label=b)
            if include_mean_exc or include_set:
                # Collect the same variable from every OTHER batch; batches
                # may have different lengths, so track the longest one.
                this_var_all_others=[]
                max_obs=0
                for bb in np.unique(all_others[bdata.columns[0]]):
                    this_var_all_others.append(all_others[v][all_others[bdata.columns[0]]==bb ].values)
                    if len(all_others[v][all_others[bdata.columns[0]]==bb ].values ) > max_obs:
                        max_obs = len(all_others[v][all_others[bdata.columns[0]]==bb ].values )
                # Pad shorter batches with NaN so they stack into one matrix
                # (NaNs are then ignored by the NaN-aware mean routine).
                this_var_all_others_=[]
                for bb in np.unique(all_others[bdata.columns[0]]):
                    _to_append=all_others[v][all_others[bdata.columns[0]]==bb ].values
                    this_len=len(_to_append)
                    _to_append=np.append(_to_append,np.tile(np.nan,max_obs-this_len))
                    this_var_all_others_.append(_to_append)
                this_var_all_others_=np.array(this_var_all_others_)
                x_axis=np.arange(max_obs)+1
                if not(single_plot):
                    if include_set:
                        for t in this_var_all_others:
                            x_axis=np.arange(len(t))+1
                            plt.plot(x_axis,t,'m',alpha=0.1)
                        # Re-plot the last trajectory with a label so the
                        # legend carries a single 'rest of set' entry.
                        plt.plot(x_axis,this_var_all_others[-1],'m',alpha=0.1,label='rest of set')
                    if include_mean_exc:
                        plt.plot(x_axis,mean(this_var_all_others_,axis=0),'r',label='Mean without '+b )
                else:
                    # In single-plot mode draw the background set only once
                    if first_pass:
                        if include_set:
                            plt.plot(x_axis,this_var_all_others_.T,'m',alpha=0.1)
                            plt.plot(x_axis,this_var_all_others_[0,:],'m',alpha=0.1,label='rest of set')
                        if include_mean_exc:
                            plt.plot(x_axis,mean(this_var_all_others_,axis=0),'r',label='Mean without '+b )
                        first_pass=False
            # Mark phase boundaries, as in plot_var_all_batches
            if not(isinstance(phase_samples,bool)):
                s_txt=1
                s_lin=1
                plt.axvline(x=1,color='magenta',alpha=0.2)
                for p in phase_samples.keys():
                    if isinstance(phase_samples[p],list):
                        s_lin+=phase_samples[p][1]
                    else:
                        s_lin+=phase_samples[p]
                    plt.axvline(x=s_lin,color='magenta',alpha=0.2)
                    ylim_=plt.ylim()
                    plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='magenta')
                    if isinstance(phase_samples[p],list):
                        s_txt+=phase_samples[p][1]
                    else:
                        s_txt+=phase_samples[p]
            plt.title(b+' ' + plot_title)
            plt.xlabel('sample')
            plt.ylabel(v)
            plt.legend(bbox_to_anchor=(1.04,1), borderaxespad=0)
            plt.tight_layout()
[docs]
def unfold_horizontal(bdata):
    '''Unfold batch data batch-wise into one row per batch.

    bdata_hor,clbl,bid = unfold_horizontal(bdata)

    Args:
        bdata: Batch data with column[0] = batch identifier, an optional
               column[1] phase indicator ('Phase'/'phase'/'PHASE'), and the
               remaining columns being process variables.
    Returns:
        bdata_hor: DataFrame with one row per batch; the columns are
                   variable_sample labels (e.g. 'T_1','T_2',...)
        clbl: list with the generated column labels
        bid: list with the originating variable name for each column
    '''
    has_phase = bdata.columns[1] in ('PHASE', 'phase', 'Phase')
    first_var_col = 2 if has_phase else 1
    id_col = bdata.columns[0]
    unique_batches = bdata.drop_duplicates(subset=id_col, keep='first')[id_col].values.tolist()
    clbl = []
    bid = []
    rows = []
    for k, b in enumerate(unique_batches):
        data_ = bdata[bdata[id_col] == b]
        vals = data_.values[:, first_var_col:]
        cols = data_.columns[first_var_col:]
        segments = []
        for c in np.arange(vals.shape[1]):
            seg = vals[:, [c]].reshape(1, -1)
            if k == 0:
                # Labels are generated from the first batch only
                for i in np.arange(1, seg.shape[1] + 1):
                    clbl.append(cols[c] + '_' + str(i))
                    bid.append(cols[c])
            segments.append(seg)
        rows.append(np.hstack(segments))
    bdata_hor = pd.DataFrame(np.vstack(rows), columns=clbl)
    bdata_hor.insert(0, id_col, unique_batches)
    return bdata_hor, clbl, bid
[docs]
def refold_horizontal(xuf,nvars,nsamples):
    '''Refold horizontally-unfolded batch data back to the vertical layout.

    Xb = refold_horizontal(xuf,nvars,nsamples)

    Args:
        xuf: strictly numerical 2D numpy array, one row per batch, with the
             variables laid side by side in groups of nsamples columns
        nvars: number of variables per batch
        nsamples: number of samples per variable
    Returns:
        2D numpy array with the batches stacked vertically and one column
        per variable
    '''
    Xb = []
    for i in np.arange(xuf.shape[0]):
        remaining = xuf[i, :]
        columns = []
        for v in np.arange(nvars):
            # Peel off the next nsamples entries as one variable column
            columns.append(remaining[:nsamples].reshape(-1, 1))
            if v < (nvars - 1):
                remaining = remaining[nsamples:]
        batch = np.hstack(columns)
        Xb = batch if i == 0 else np.vstack((Xb, batch))
    return Xb
def _uf_l(L,spb,vpb):
first_=True
for i in np.arange(L.shape[1]):
col_=L[:,[i]]
s_=np.arange( spb )
first_flag=True
for v in np.arange(vpb):
s_2use=(s_ + v*spb)
if first_flag:
col_rearranged=col_[s_2use]
first_flag=False
else:
col_rearranged = np.hstack((col_rearranged,col_[s_2use]))
col_rearranged = col_rearranged.reshape(1,-1)
if first_:
L_uf_hor_mon = col_rearranged.T
first_ = False
else:
L_uf_hor_mon = np.hstack((L_uf_hor_mon,col_rearranged.T ))
return L_uf_hor_mon
def _uf_hor_mon_loadings(mvmobj):
    '''Add sample-major (horizontal-monitoring) copies of the loadings to a
    multiway model dictionary.

    Rearranges 'Ws','W','P' (PLS) or 'P' (PCA) plus the centering/scaling
    vectors 'mx','sx' with _uf_l and stores them under the same keys with an
    '_ufm' suffix. Rows belonging to initial conditions (the first 'ninit'
    rows) are kept in their original position; only trajectory rows are
    rearranged. Returns the modified dictionary (modified in place).
    '''
    spb = mvmobj['nsamples']   # samples per batch
    vpb = mvmobj['nvars']      # variables per batch
    # The presence of 'Q' loadings distinguishes a PLS model from a PCA model
    is_pls=False
    if 'Q' in mvmobj:
        is_pls=True
    ninit=mvmobj['ninit']
    if is_pls:
        if ninit > 0:
            # Split initial-conditions rows (z_*) from trajectory rows (x_*)
            # so only the trajectory part is rearranged.
            z_ws = mvmobj['Ws'][:ninit,:]
            z_w  = mvmobj['W'][:ninit,:]
            z_p  = mvmobj['P'][:ninit,:]
            z_mx = mvmobj['mx'][:ninit]
            z_sx = mvmobj['sx'][:ninit]
            x_ws = mvmobj['Ws'][ninit:,:]
            x_w  = mvmobj['W'][ninit:,:]
            x_p  = mvmobj['P'][ninit:,:]
            x_mx = mvmobj['mx'][ninit:]
            x_sx = mvmobj['sx'][ninit:]
            Ws_ufm = np.vstack(( z_ws,_uf_l(x_ws,spb,vpb)))
            W_ufm  = np.vstack(( z_w ,_uf_l(x_w ,spb,vpb)))
            P_ufm  = np.vstack(( z_p ,_uf_l(x_p,spb,vpb)))
            # mx/sx are 1D; reshape to a column for _uf_l, then flatten back
            mx_ufm = np.vstack(( z_mx.reshape(-1,1),_uf_l( x_mx.reshape(-1,1),spb,vpb ))).reshape(-1)
            sx_ufm = np.vstack(( z_sx.reshape(-1,1),_uf_l( x_sx.reshape(-1,1),spb,vpb ))).reshape(-1)
        else:
            Ws_ufm = _uf_l(mvmobj['Ws'],spb,vpb)
            W_ufm  = _uf_l(mvmobj['W'],spb,vpb)
            P_ufm  = _uf_l(mvmobj['P'],spb,vpb)
            mx_ufm = _uf_l( mvmobj['mx'].reshape(-1,1),spb,vpb ).reshape(-1)
            sx_ufm = _uf_l( mvmobj['sx'].reshape(-1,1),spb,vpb ).reshape(-1)
    else:
        # PCA model: only P and the scaling vectors exist
        P_ufm  = _uf_l(mvmobj['P'],spb,vpb)
        mx_ufm = _uf_l( mvmobj['mx'].reshape(-1,1),spb,vpb ).reshape(-1)
        sx_ufm = _uf_l( mvmobj['sx'].reshape(-1,1),spb,vpb ).reshape(-1)
    if is_pls:
        mvmobj['Ws_ufm'] = Ws_ufm
        mvmobj['W_ufm']  = W_ufm
        mvmobj['P_ufm']  = P_ufm
    else:
        mvmobj['P_ufm']  = P_ufm
    mvmobj['mx_ufm']=mx_ufm
    mvmobj['sx_ufm']=sx_ufm
    return mvmobj
[docs]
def loadings(mmvm_obj,dim,*,r2_weighted=False,which_var=False):
    ''' Plot batch loadings for variables as a function of time/sample

    loadings(mmvm_obj,dim,*,r2_weighted=False,which_var=False)

    Args:
        mmvm_obj: Multiway PCA or PLS object
        dim: What component or latent variable to plot (1-based)
        r2_weighted: If True => weight the loading by the R2pv
        which_var: Variable for which the plot is done, if not sent all are plotted

    by Salvador Garcia Munoz
    sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    # Convert the 1-based component number to a 0-based column index
    dim=dim-1
    if 'Q' in mmvm_obj:
        # PLS model: plot W* loadings
        if mmvm_obj['ninit']==0:
            # No initial conditions: every row of Ws belongs to a trajectory
            if r2_weighted:
                aux_df=pd.DataFrame(mmvm_obj['Ws']*mmvm_obj['r2xpv'])
            else:
                aux_df=pd.DataFrame(mmvm_obj['Ws'])
            # 'bid' maps each row to its originating variable name
            aux_df.insert(0,'bid',mmvm_obj['bid'])
            if isinstance(which_var,bool):
                vars_to_plot=unique(aux_df,'bid')
            else:
                if isinstance(which_var,str):
                    vars_to_plot=[which_var]
                if isinstance(which_var,list):
                    vars_to_plot=which_var
            for i,v in enumerate(vars_to_plot):
                plt.figure()
                # aux_df columns are the integer component indices, so
                # aux_df[dim] selects the requested component
                dat=aux_df[dim][aux_df['bid']==v].values
                plt.fill_between(np.arange(mmvm_obj['nsamples'])+1, dat )
                plt.xlabel('sample')
                if r2_weighted:
                    plt.ylabel('$W^* * R^2$ ['+str(dim+1)+']')
                else:
                    plt.ylabel('$W^*$ ['+str(dim+1)+']')
                plt.title(v)
                # Common y-limits across figures so loadings are comparable
                plt.ylim(mmvm_obj['Ws'][:,dim].min()*1.2,mmvm_obj['Ws'][:,dim].max()*1.2 )
                ylim_=plt.ylim()
                phase_samples=mmvm_obj['phase_samples']
                if not(isinstance(phase_samples,bool)):
                    # Mark phase boundaries
                    s_txt=1
                    s_lin=1
                    plt.axvline(x=1,color='magenta',alpha=0.2)
                    for p in phase_samples.keys():
                        if isinstance(phase_samples[p],list):
                            s_lin+=phase_samples[p][1]
                        else:
                            s_lin+=phase_samples[p]
                        plt.axvline(x=s_lin,color='magenta',alpha=0.2)
                        # ylim_=[mmvm_obj['Ws'][dim,:].min()*1.2,mmvm_obj['Ws'][dim,:].max()*1.2 ]
                        plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='magenta')
                        if isinstance(phase_samples[p],list):
                            s_txt+=phase_samples[p][1]
                        else:
                            s_txt+=phase_samples[p]
                plt.tight_layout()
        else:
            # Model includes initial conditions: the first 'ninit' rows of Ws
            # are initial-condition loadings, shown as a separate bar chart.
            z_loadings= mmvm_obj['Ws'] [np.arange(mmvm_obj['ninit'])]
            r2pvz = mmvm_obj['r2xpv'] [np.arange(mmvm_obj['ninit']),:]
            if r2_weighted:
                z_loadings = z_loadings *r2pvz
            zvars = mmvm_obj['varidX'][0:mmvm_obj['ninit']]
            plt.figure()
            plt.bar(zvars,z_loadings[:,dim] )
            plt.xticks(rotation=90)
            if r2_weighted:
                plt.ylabel('$W^* * R^2$ ['+str(dim+1)+']')
            else:
                plt.ylabel('$W^*$ ['+str(dim+1)+']')
            plt.title('Loadings for Initial Conditions')
            plt.tight_layout()
            # Trajectory rows come after the initial-condition rows
            rows_=np.arange( mmvm_obj['nsamples']*mmvm_obj['nvars'])+mmvm_obj['ninit']
            if r2_weighted:
                aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:]*mmvm_obj['r2xpv'][rows_,:] )
            else:
                aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:] )
            aux_df.insert(0,'bid',mmvm_obj['bid'])
            if isinstance(which_var,bool):
                vars_to_plot=unique(aux_df,'bid')
            else:
                if isinstance(which_var,str):
                    vars_to_plot=[which_var]
                if isinstance(which_var,list):
                    vars_to_plot=which_var
            for i,v in enumerate(vars_to_plot):
                plt.figure()
                dat=aux_df[dim][aux_df['bid']==v].values
                plt.fill_between(np.arange(mmvm_obj['nsamples'])+1, dat )
                plt.xlabel('sample')
                if r2_weighted:
                    plt.ylabel('$W^* * R^2$ ['+str(dim+1)+']')
                else:
                    plt.ylabel('$W^*$ ['+str(dim+1)+']')
                plt.title(v)
                plt.ylim(mmvm_obj['Ws'][:,dim].min()*1.2,mmvm_obj['Ws'][:,dim].max()*1.2 )
                ylim_=plt.ylim()
                phase_samples=mmvm_obj['phase_samples']
                if not(isinstance(phase_samples,bool)):
                    s_txt=1
                    s_lin=1
                    plt.axvline(x=1,color='magenta',alpha=0.2)
                    for p in phase_samples.keys():
                        if isinstance(phase_samples[p],list):
                            s_lin+=phase_samples[p][1]
                        else:
                            s_lin+=phase_samples[p]
                        plt.axvline(x=s_lin,color='magenta',alpha=0.2)
                        # ylim_=[mmvm_obj['Ws'][dim,:].min()*1.2,mmvm_obj['Ws'][dim,:].max()*1.2 ]
                        plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='magenta')
                        if isinstance(phase_samples[p],list):
                            s_txt+=phase_samples[p][1]
                        else:
                            s_txt+=phase_samples[p]
                plt.tight_layout()
    else:
        # PCA model: plot P loadings
        if r2_weighted:
            aux_df=pd.DataFrame(mmvm_obj['P']*mmvm_obj['r2xpv'])
        else:
            aux_df=pd.DataFrame(mmvm_obj['P'])
        aux_df.insert(0,'bid',mmvm_obj['bid'])
        if isinstance(which_var,bool):
            vars_to_plot=unique(aux_df,'bid')
        else:
            if isinstance(which_var,str):
                vars_to_plot=[which_var]
            if isinstance(which_var,list):
                vars_to_plot=which_var
        for i,v in enumerate(vars_to_plot):
            plt.figure()
            dat=aux_df[dim][aux_df['bid']==v].values
            plt.fill_between(np.arange(mmvm_obj['nsamples'])+1, dat )
            plt.xlabel('sample')
            if r2_weighted:
                plt.ylabel('P * $R^2$ ['+str(dim+1)+']')
            else:
                plt.ylabel('P ['+str(dim+1)+']')
            plt.title(v)
            plt.ylim(mmvm_obj['P'][:,dim].min()*1.2,mmvm_obj['P'][:,dim].max()*1.2 )
            ylim_=plt.ylim()
            phase_samples=mmvm_obj['phase_samples']
            if not(isinstance(phase_samples,bool)):
                s_txt=1
                s_lin=1
                plt.axvline(x=1,color='magenta',alpha=0.2)
                for p in phase_samples.keys():
                    if isinstance(phase_samples[p],list):
                        s_lin+=phase_samples[p][1]
                    else:
                        s_lin+=phase_samples[p]
                    plt.axvline(x=s_lin,color='magenta',alpha=0.2)
                    #ylim_=[mmvm_obj['P'][dim,:].min()*1.2,mmvm_obj['P'][dim,:].max()*1.2 ]
                    plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='magenta')
                    if isinstance(phase_samples[p],list):
                        s_txt+=phase_samples[p][1]
                    else:
                        s_txt+=phase_samples[p]
            plt.tight_layout()
[docs]
def loadings_abs_integral(mmvm_obj,*,r2_weighted=False,addtitle=False):
    '''Plot the integral of the absolute value of loadings for a batch

    loadings_abs_integral(mmvm_obj,*,r2_weighted=False,addtitle=False)

    Args:
        mmvm_obj: A multiway PCA or PLS model
        r2_weighted: Boolean flag, if True then in weights the loading by the R2pv
        addtitle: Text to place in the title of the figure

    by Salvador Garcia Munoz
    sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    if 'Q' in mmvm_obj:
        # PLS model: integrate the W* loadings
        if mmvm_obj['ninit']==0:
            if r2_weighted:
                aux_df=pd.DataFrame(mmvm_obj['Ws']*mmvm_obj['r2xpv'])
            else:
                aux_df=pd.DataFrame(mmvm_obj['Ws'])
            aux_df.insert(0,'bid',mmvm_obj['bid'])
        else:
            # Skip the initial-condition rows: only trajectory loadings
            # are integrated over time
            rows_=np.arange( mmvm_obj['nsamples']*mmvm_obj['nvars'])+mmvm_obj['ninit']
            if r2_weighted:
                aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:]*mmvm_obj['r2xpv'][rows_,:] )
            else:
                aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:] )
            aux_df.insert(0,'bid',mmvm_obj['bid'])
        # Sum |loading| across all samples of each variable, per component
        integral_of_loadings=[]
        aux_vname=[]
        for i,v in enumerate(unique(aux_df,'bid')):
            dat=aux_df[aux_df['bid']==v].values[:,1:]
            integral_of_loadings.append(np.sum(np.abs(dat),axis=0,keepdims=True)[0])
            aux_vname.append(v)
        integral_of_loadings=np.array(integral_of_loadings)
        # One bar chart per component
        for a in np.arange(mmvm_obj['T'].shape[1]):
            plt.figure()
            plt.bar(aux_vname,integral_of_loadings[:,a])
            if r2_weighted:
                plt.ylabel(r'$\sum (|W^*| * R^2$ ['+str(a+1)+'])')
            else:
                plt.ylabel(r'$\sum (|W^*|$ ['+str(a+1)+'])')
            plt.xticks(rotation=75)
            if not(isinstance(addtitle,bool)) and isinstance(addtitle,str):
                plt.title(addtitle)
            plt.tight_layout()
    else:
        # PCA model: integrate the P loadings
        if r2_weighted:
            aux_df=pd.DataFrame(mmvm_obj['P']*mmvm_obj['r2xpv'])
        else:
            aux_df=pd.DataFrame(mmvm_obj['P'])
        aux_df.insert(0,'bid',mmvm_obj['bid'])
        integral_of_loadings=[]
        aux_vname=[]
        for i,v in enumerate(unique(aux_df,'bid')):
            dat=aux_df[aux_df['bid']==v].values[:,1:]
            integral_of_loadings.append(np.sum(np.abs(dat),axis=0,keepdims=True)[0])
            aux_vname.append(v)
        integral_of_loadings=np.array(integral_of_loadings)
        for a in np.arange(mmvm_obj['T'].shape[1]):
            plt.figure()
            plt.bar(aux_vname,integral_of_loadings[:,a])
            if r2_weighted:
                plt.ylabel(r'$\sum (|P| * R^2$ ['+str(a+1)+'])')
            else:
                plt.ylabel(r'$\sum$ (|P| ['+str(a+1)+'])')
            plt.xticks(rotation=75)
            if not(isinstance(addtitle,bool)) and isinstance(addtitle,str):
                plt.title(addtitle)
            plt.tight_layout()
[docs]
def batch_vip(mmvm_obj,*,addtitle=False):
    ''' plot the summation across componets of the integral of the absolute value of loadings for a batch
    multiplied by the R2 [which kinda mimicks the VIP]

    batch_vip(mmvm_obj,*,addtitle=False)

    Args:
        mmvm_obj: A multiway PCA or PLS model
        addtitle: Optional text to place in the title of the figure

    by Salvador Garcia Munoz
    sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    # Weighting is always on for this plot; the flag mirrors the structure of
    # loadings_abs_integral.
    r2_weighted=True
    if 'Q' in mmvm_obj:
        if mmvm_obj['ninit']==0:
            if r2_weighted:
                # Weight W* by the per-component R2 for Y, replicated to
                # every row.
                # NOTE(review): this branch weights by r2y while the
                # ninit>0 branch below weights by r2xpv -- looks
                # intentional for the VIP analogy, but confirm with the
                # author.
                aux_df=pd.DataFrame(mmvm_obj['Ws']*np.tile(mmvm_obj['r2y'],(mmvm_obj['Ws'].shape[0],1)))
            else:
                aux_df=pd.DataFrame(mmvm_obj['Ws'])
            aux_df.insert(0,'bid',mmvm_obj['bid'])
        else:
            # Skip initial-condition rows: only trajectory loadings are used
            rows_=np.arange( mmvm_obj['nsamples']*mmvm_obj['nvars'])+mmvm_obj['ninit']
            if r2_weighted:
                aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:]*mmvm_obj['r2xpv'][rows_,:] )
            else:
                aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:] )
            aux_df.insert(0,'bid',mmvm_obj['bid'])
        # Integrate |weighted loading| over all samples of each variable,
        # then sum across components to obtain a single bar per variable.
        integral_of_loadings=[]
        aux_vname=[]
        for i,v in enumerate(unique(aux_df,'bid')):
            dat=aux_df[aux_df['bid']==v].values[:,1:]
            integral_of_loadings.append(np.sum(np.abs(dat),axis=0,keepdims=True)[0])
            aux_vname.append(v)
        integral_of_loadings=np.array(integral_of_loadings)
        plt.figure()
        plt.bar(aux_vname,np.sum(integral_of_loadings,axis=1))
        plt.ylabel('Batch VIP')
        plt.xticks(rotation=75)
        if not(isinstance(addtitle,bool)) and isinstance(addtitle,str):
            plt.title(addtitle)
        plt.tight_layout()
    else:
        # PCA model: use P weighted by r2xpv
        if r2_weighted:
            aux_df=pd.DataFrame(mmvm_obj['P']*mmvm_obj['r2xpv'])
        else:
            aux_df=pd.DataFrame(mmvm_obj['P'])
        aux_df.insert(0,'bid',mmvm_obj['bid'])
        integral_of_loadings=[]
        aux_vname=[]
        for i,v in enumerate(unique(aux_df,'bid')):
            dat=aux_df[aux_df['bid']==v].values[:,1:]
            integral_of_loadings.append(np.sum(np.abs(dat),axis=0,keepdims=True)[0])
            aux_vname.append(v)
        integral_of_loadings=np.array(integral_of_loadings)
        plt.figure()
        plt.bar(aux_vname,np.sum(integral_of_loadings,axis=1))
        plt.ylabel(r'$\sum_{a} \sum (|P| * R^2$')
        plt.xticks(rotation=75)
        if not(isinstance(addtitle,bool)) and isinstance(addtitle,str):
            plt.title(addtitle)
        plt.tight_layout()
[docs]
def _r2pv_resolve_vars(aux_df, which_var):
    ''' Helper for r2pv: resolve which trajectory variables to plot.
    which_var may be False (all), a str (one) or a list of names.
    '''
    if isinstance(which_var, bool):
        return unique(aux_df, 'bid')
    if isinstance(which_var, str):
        return [which_var]
    if isinstance(which_var, list):
        return which_var

def _r2pv_plot_x_trajectories(mmvm_obj, aux_df, vars_to_plot):
    ''' Helper for r2pv: stacked cumulative R2-per-LV vs sample number for each
    trajectory variable, with phase boundaries overlaid when available.
    '''
    for v in vars_to_plot:
        dat = aux_df[aux_df['bid'] == v].values[:, 1:].astype(float) * 100
        dat = np.cumsum(dat, axis=1)
        # prepend zeros so LV #1 is filled starting from the x axis
        dat = np.hstack((np.zeros((dat.shape[0], 1)), dat))
        plt.figure()
        for a in np.arange(mmvm_obj['A']) + 1:
            plt.fill_between(np.arange(mmvm_obj['nsamples']) + 1, dat[:, a], dat[:, a - 1], label='LV #' + str(a))
        plt.xlabel('sample')
        plt.ylabel('$R^2$pvX (%)')
        plt.legend()
        plt.title(v)
        plt.ylim(0, 100)
        ylim_ = plt.ylim()
        phase_samples = mmvm_obj['phase_samples']
        if not isinstance(phase_samples, bool):
            # mark phase boundaries and annotate phase names
            s_txt = 1
            s_lin = 1
            plt.axvline(x=1, color='black', alpha=0.2)
            for p in phase_samples.keys():
                if isinstance(phase_samples[p], list):
                    s_lin += phase_samples[p][1]
                else:
                    s_lin += phase_samples[p]
                plt.axvline(x=s_lin, color='black', alpha=0.2)
                plt.annotate(p, (s_txt, ylim_[0]), rotation=90, alpha=0.5, color='black')
                if isinstance(phase_samples[p], list):
                    s_txt += phase_samples[p][1]
                else:
                    s_txt += phase_samples[p]
        plt.tight_layout()

def _r2pv_plot_y(mmvm_obj):
    ''' Helper for r2pv: stacked-bar R2 per LV for the Y space (PLS models only).'''
    r2pvy = mmvm_obj['r2ypv'] * 100
    lbls = ['LV #' + str(a) for a in np.arange(1, mmvm_obj['A'] + 1)]
    r2pvy_pd = pd.DataFrame(r2pvy, index=mmvm_obj['varidY'], columns=lbls)
    fig1, ax1 = plt.subplots()
    r2pvy_pd.plot(kind='bar', stacked=True, ax=ax1)
    ax1.set_ylabel('$R^2$pvY')
    ax1.set_title('$R^2$ per LV for Y-Space')
    fig1.tight_layout()

def r2pv(mmvm_obj,*,which_var=False):
    ''' Plot batch r2 for variables as a function of time/sample
    r2pv(mmvm_obj,*,which_var=False)
    Args:
        mmvm_obj: Multiway PCA or PLS object
        which_var: Variable (str) or list of variables for which the plot is
                   done, if not sent all are plotted
    by Salvador Garcia Munoz
        sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    if mmvm_obj['ninit'] == 0:
        # no initial conditions: every row of r2xpv belongs to the trajectories
        aux_df = pd.DataFrame(mmvm_obj['r2xpv'])
        aux_df.insert(0, 'bid', mmvm_obj['bid'])
        _r2pv_plot_x_trajectories(mmvm_obj, aux_df, _r2pv_resolve_vars(aux_df, which_var))
        if 'Q' in mmvm_obj:
            _r2pv_plot_y(mmvm_obj)
    else:
        # model with initial conditions (Z block): bar chart for Z first
        r2pvz = mmvm_obj['r2xpv'][np.arange(mmvm_obj['ninit']), :] * 100
        zvars = mmvm_obj['varidX'][0:mmvm_obj['ninit']]
        lbls = ['LV #' + str(a) for a in np.arange(1, mmvm_obj['A'] + 1)]
        r2pvz_pd = pd.DataFrame(r2pvz, index=zvars, columns=lbls)
        fig2, ax2 = plt.subplots()
        r2pvz_pd.plot(kind='bar', stacked=True, ax=ax2)
        ax2.set_ylabel('$R^2$pvZ')
        ax2.set_title('$R^2$ Initial Conditions')
        fig2.tight_layout()
        # the remaining rows of r2xpv correspond to the unfolded trajectories
        rows_ = np.arange(mmvm_obj['nsamples'] * mmvm_obj['nvars']) + mmvm_obj['ninit']
        aux_df = pd.DataFrame(mmvm_obj['r2xpv'][rows_, :])
        aux_df.insert(0, 'bid', mmvm_obj['bid'])
        _r2pv_plot_x_trajectories(mmvm_obj, aux_df, _r2pv_resolve_vars(aux_df, which_var))
        if 'Q' in mmvm_obj:
            _r2pv_plot_y(mmvm_obj)
[docs]
def mpca(xbatch,a,*,unfolding='batch wise',phase_samples=False,cross_val=0):
    ''' Multi-way PCA for batch analysis
    mpca_obj= mpca(xbatch,a,*,unfolding='batch wise',phase_samples=False,cross_val=0)
    Args:
        xbatch: Pandas dataframe with aligned batch data it is assumed
                that all batches have the same number of samples
        a: Number of PC's to fit
        unfolding: 'batch wise' or 'variable wise'
        phase_samples: information about samples per phase [optional]
        cross_val: percent of elements for cross validation (default is 0 = no cross val)
    Returns:
        mpca_obj: A Dictionary with all the parameters for the MPCA model
                  (an empty list when an unknown unfolding is requested)
    by Salvador Garcia Munoz
        sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    # an optional phase column (always column[1]) is not a process variable
    if (xbatch.columns[1]=='PHASE') or \
       (xbatch.columns[1]=='phase') or \
       (xbatch.columns[1]=='Phase'):
        nvars = xbatch.shape[1]-2
    else:
        nvars = xbatch.shape[1]-1
    nbatches = len(np.unique(xbatch[xbatch.columns[0]]))
    nsamples = xbatch.shape[0]/nbatches   # float; cast to int when stored
    if unfolding=='batch wise':
        # remove low variance columns keeping record of the original order
        x_uf_,colnames,bid_o = unfold_horizontal(xbatch)          # colnames is original set of columns
        x_uf,colsrem = phi.clean_low_variances(x_uf_,shush=True)  # colsrem are columns removed
        mx_rem = np.array(x_uf_[colsrem].mean().tolist())
        mpca_obj = phi.pca(x_uf,a,cross_val=cross_val)
        if len(colsrem)>0:
            # re-insert the removed columns with their mean, stdev=1 and zero
            # loadings/r2 so the model keeps the original column layout
            xtra_col = np.zeros((2+2*a,1 ))
            xtra_col[0] = 1
            xtra_cols = np.tile(xtra_col,(1,len(colsrem)))
            xtra_cols[1,:] = mx_rem
            aux = np.vstack((mpca_obj['sx'],mpca_obj['mx'],mpca_obj['P'].T,mpca_obj['r2xpv'].T))
            aux = np.hstack((aux,xtra_cols))
            all_cols = x_uf.columns[1:].tolist()
            all_cols.extend(colsrem)
            # reorder everything back to the original column order
            aux_pd = pd.DataFrame(aux,columns=all_cols)
            aux_pd = aux_pd[colnames]
            aux_new = aux_pd.values
            mpca_obj['sx'] = aux_new[0,:].reshape(1,-1)
            mpca_obj['mx'] = aux_new[1,:].reshape(1,-1)
            aux_new = aux_new[2:,:]
            mpca_obj['P'] = aux_new[0:a,:].T
            mpca_obj['r2xpv'] = aux_new[a:,:].T
        mpca_obj['varidX']= colnames
        mpca_obj['bid'] = bid_o
        mpca_obj['uf'] ='batch wise'
        mpca_obj['phase_samples'] = phase_samples
        mpca_obj['nvars'] = int(nvars)
        mpca_obj['nbatches'] = int(nbatches)
        mpca_obj['nsamples'] = int(nsamples)
        mpca_obj['ninit'] = 0
        mpca_obj['A'] = a
    elif unfolding=='variable wise':
        if (xbatch.columns[1]=='PHASE') or \
           (xbatch.columns[1]=='phase') or \
           (xbatch.columns[1]=='Phase'):
            xbatch_=xbatch.copy()
            xbatch_.drop(xbatch.columns[1],axis=1,inplace=True)
        else:
            xbatch_=xbatch.copy()
        xbatch_,colsrem = phi.clean_low_variances(xbatch_) # colsrem are columns removed
        xbatch_ = phi.clean_empty_rows(xbatch_)
        mpca_obj=phi.pca(xbatch_,a)
        mpca_obj['uf'] ='variable wise'
        mpca_obj['phase_samples'] = phase_samples
        # cast to int for consistency with the batch-wise branch
        # (nsamples in particular is the result of a float division)
        mpca_obj['nvars'] = int(nvars)
        mpca_obj['nbatches'] = int(nbatches)
        mpca_obj['nsamples'] = int(nsamples)
        mpca_obj['ninit'] = 0
        mpca_obj['A'] = a
    else:
        mpca_obj=[]
    return mpca_obj
def _mimic_monitoring(mmvm_obj_f,bdata,which_batch,*,zinit=False,shush=False,soft_sensor=False):
    ''' Internal helper for monitor: replay one batch sample-by-sample
    against a multiway model arranged for monitoring.
    Args:
        mmvm_obj_f: model dict already re-arranged by _uf_hor_mon_loadings
                    (mx/sx/loadings in monitoring layout)
        bdata: batch data, column[0]=batch id, optional phase column[1]
        which_batch: identifier of the batch to replay
        zinit: initial-conditions dataframe, or False when the model has none
        shush: if True, suppress the progress print
        soft_sensor: variable name (or list of names) to blank out so it is
                     purely predicted (soft-sensor mode)
    Returns:
        diags: dict with score/HT2/SPE trajectories, their contributions and
               the X (and Y, for PLS) forecasts.
               NOTE(review): implicitly returns None when model and data do
               not correspond (the first branch only prints).
    '''
    # model must have initial conditions if and only if zinit is provided
    if (not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']==0) | \
        ((isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0):
        print ('Model and data do not correspond')
    else:
        # accumulators for the per-sample monitoring trajectories
        T_mon = []
        ht2_mon = []
        spe_mon = []
        spei_mon = []
        cont_spei = []
        cont_spe = []
        cont_ht2 = []
        forecast = []
        if 'Q' in mmvm_obj_f:
            forecast_y=[]
        this_batch=bdata[bdata[bdata.columns[0]]==which_batch]
        # variable names exclude the batch-id (and phase) columns
        if (bdata.columns[1]=='PHASE') or \
           (bdata.columns[1]=='phase') or \
           (bdata.columns[1]=='Phase'):
            colnames=this_batch.columns[2:].tolist()
        else:
            colnames=this_batch.columns[1:].tolist()
        ss_indx=[]
        #Make soft_sensor var all missing data
        if not(isinstance(soft_sensor,bool)):
            if isinstance(soft_sensor,list):
                for v in soft_sensor:
                    if not(v in colnames):
                        print('Soft sensor '+v+' is not a variable in this dataset')
                    else:
                        ss_indx.append(colnames.index(v))
            elif isinstance(soft_sensor,str):
                if not(soft_sensor in colnames):
                    print('Soft sensor '+soft_sensor+' is not a variable in this dataset')
                else:
                    ss_indx=[colnames.index(soft_sensor)]
        # pull the initial-conditions row for this batch (if the model has one)
        if not(isinstance(zinit,bool)):
            this_z = zinit[zinit[zinit.columns[0]]==which_batch]
            vals_z = this_z.values[:,1:]
            cols_z = this_z.columns[1:]
            vals_z = np.array(vals_z,dtype=np.float64)
            vals_z = vals_z.reshape(-1)
        if (bdata.columns[1]=='PHASE') or \
           (bdata.columns[1]=='phase') or \
           (bdata.columns[1]=='Phase'):
            vals=this_batch.values[:,2:]
        else:
            vals=this_batch.values[:,1:]
        vals=np.array(vals,dtype=np.float64)
        # soft-sensor variables become missing so they are purely predicted
        if len(ss_indx)>0:
            vals[:,ss_indx]=np.nan
        if not(shush):
            print('Running batch: '+which_batch)
        if 'Q' in mmvm_obj_f:
            forecast_y=[]
            forecast_y_=[]
        # replay: at sample k only the first k+1 rows are "measured";
        # the rest of the unfolded vector is filled with NaN (future)
        for k in np.arange(mmvm_obj_f['nsamples']):
            x_uf_k = vals[:k+1,:].reshape(1,-1)
            x_uf_k = x_uf_k[0].tolist()
            num_nans = mmvm_obj_f['nvars']*mmvm_obj_f['nsamples']-len(x_uf_k)
            x_uf_k.extend([np.nan]*num_nans )
            x_uf_k = np.array(x_uf_k)
            if 'Q' in mmvm_obj_f:
                if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0:
                    # prepend initial conditions to the unfolded trajectory
                    x_2_pred = np.hstack((vals_z,x_uf_k))
                    preds = phi.pls_pred(x_2_pred, mmvm_obj_f)
                elif (isinstance(zinit,bool)) and mmvm_obj_f['ninit']==0:
                    preds = phi.pls_pred(x_uf_k, mmvm_obj_f)
                else:
                    print('Model and data do not correspond')
                    preds = False
            else:
                preds = phi.pca_pred(x_uf_k, mmvm_obj_f)
            T_mon.append(preds['Tnew'][0])
            ht2_mon.append(preds['T2'][0])
            #spe_mon.append(preds['speX'][0])
            if 'Q' in mmvm_obj_f:
                forecast_y_.append(preds['Yhat'][0].reshape(-1))
                if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0:
                    # split the reconstruction into its Z and trajectory parts
                    ninit=mmvm_obj_f['ninit']
                    preds_x = preds['Xhat'][0,ninit:]
                    preds_z = preds['Xhat'][0,:ninit]
                    aux_df=pd.DataFrame(preds_x.reshape(mmvm_obj_f['nsamples'],-1),columns=colnames)
                    forecast.append(aux_df)
                    inst_preds = preds_x.reshape(-1)
                    inst_preds = (inst_preds - mmvm_obj_f['mx'][ninit:] )/mmvm_obj_f['sx'][ninit:]
                    #mean center current sample and current prediction for instantaneous SPE
                    x_uf_k = (x_uf_k - mmvm_obj_f['mx'][ninit:] )/mmvm_obj_f['sx'][ninit:]
                else:
                    aux_df=pd.DataFrame(preds['Xhat'].reshape(mmvm_obj_f['nsamples'],-1),columns=colnames)
                    forecast.append(aux_df)
                    inst_preds = preds['Xhat'].reshape(-1)
                    inst_preds = (inst_preds - mmvm_obj_f['mx'])/mmvm_obj_f['sx']
                    #mean center current sample and current prediction for instantaneous SPE
                    x_uf_k = (x_uf_k - mmvm_obj_f['mx'])/mmvm_obj_f['sx']
            else:
                aux_df=pd.DataFrame(preds['Xhat'].reshape(mmvm_obj_f['nsamples'],-1),columns=colnames)
                forecast.append(aux_df)
                inst_preds = preds['Xhat'].reshape(-1)
                inst_preds = (inst_preds - mmvm_obj_f['mx'])/mmvm_obj_f['sx']
                #mean center current sample and current prediction for instantaneous SPE
                x_uf_k = (x_uf_k - mmvm_obj_f['mx'])/mmvm_obj_f['sx']
            # treat missing (future/soft-sensor) entries as zero deviation
            inst_preds[np.isnan(x_uf_k)] = 0
            x_uf_k[np.isnan(x_uf_k)] = 0
            if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0:
                cont_ht2_=np.zeros(len(x_2_pred))
            else:
                cont_ht2_=np.zeros(len(x_uf_k))
            # HT2 contributions: squared scaled projection per component
            # normalized by the variance of each score
            var_t=np.var(mmvm_obj_f['T'],ddof=1,axis=0)
            for a in np.arange(mmvm_obj_f['A']):
                if 'Q' in mmvm_obj_f:
                    if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0:
                        x_2_pred_= (x_2_pred- mmvm_obj_f['mx'])/mmvm_obj_f['sx']
                        cont_ht2_+= (x_2_pred_ * mmvm_obj_f['Ws'][:,a])**2 / var_t[a]
                    else:
                        cont_ht2_+= (x_uf_k * mmvm_obj_f['Ws'][:,a])**2 / var_t[a]
                else:
                    cont_ht2_+= (x_uf_k * mmvm_obj_f['P'][:,a])**2 / var_t[a]
            if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0:
                conts_ht2_z = cont_ht2_[:ninit]
                aux_df=pd.DataFrame(cont_ht2_[ninit:].reshape(mmvm_obj_f['nsamples'],-1),columns=colnames)
                cont_ht2.append(aux_df)
            else:
                aux_df=pd.DataFrame(cont_ht2_.reshape(mmvm_obj_f['nsamples'],-1),columns=colnames)
                cont_ht2.append(aux_df)
            # global SPE over the whole (scaled) unfolded vector
            spe_ = (inst_preds - x_uf_k)**2
            aux_df = pd.DataFrame(spe_.reshape(mmvm_obj_f['nsamples'],-1),columns=colnames)
            cont_spe.append(aux_df)
            spe_mon.append(np.sum(spe_))
            # instantaneous SPE uses only the slice belonging to sample k
            inst_samp = x_uf_k[k*mmvm_obj_f['nvars']:(k+1)*mmvm_obj_f['nvars']]
            inst_preds = inst_preds[k*mmvm_obj_f['nvars']:(k+1)*mmvm_obj_f['nvars']]
            spei_ = (inst_preds - inst_samp)**2
            cont_spei.append(spei_)
            spei_mon.append(np.sum(spei_) )
        diags={'Batch':which_batch,'t_mon':np.array(T_mon),'HT2_mon':np.array(ht2_mon),
               'spe_mon':np.array(spe_mon).reshape(-1),'cont_spe':cont_spe,
               'spei_mon':np.array(spei_mon),'cont_spei':pd.DataFrame(np.array(cont_spei),columns=colnames),
               'cont_ht2':cont_ht2,'forecast':forecast}
        if 'Q' in mmvm_obj_f:
            forecast_y=pd.DataFrame(np.array(forecast_y_),columns=mmvm_obj_f['varidY'])
            diags['forecast y']=forecast_y
        # extra diagnostics for the initial-conditions (Z) block
        if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0:
            vals_z = (vals_z - mmvm_obj_f['mx'][:ninit] )/mmvm_obj_f['sx'][:ninit]
            preds_z_ = (preds_z - mmvm_obj_f['mx'][:ninit] )/mmvm_obj_f['sx'][:ninit]
            spe_z = (vals_z - preds_z_)
            cont_spe_z = spe_z**2
            spe_z = np.sum(cont_spe_z)
            cont_spe_z = pd.DataFrame(cont_spe_z,index=cols_z,columns=['Vars'])
            conts_ht2_z =pd.DataFrame(conts_ht2_z,index=cols_z,columns=['Vars'])
            diags['reconstructed z'] = preds_z
            diags['spe z'] = spe_z
            diags['cont_spe_z'] = cont_spe_z
            diags['cont_ht2_z'] = conts_ht2_z
        return diags
[docs]
def monitor(mmvm_obj,bdata,*,which_batch=False,zinit=False,build_ci=True,shush=False,soft_sensor=False):
    ''' Routine to mimic the real-time monitoring of a batch given a model
    monitor(mmvm_obj,bdata,*,which_batch=False,zinit=False,build_ci=True,shush=False,soft_sensor=False):
    usage: 1st you need to run: monitor(mmvm_obj,bdata)
           to mimic monitoring for all bdata batches and build CI
           these new parameters are written back to mmvm_obj
           Then you can run:
           diagnostics = monitor(mmvm_obj,bdata,which_batch=your_batchid)
           to mimic monitoring for your_batchid and will produce all dynamic metrics
           and forecasts
           diagnostics = monitor(mmvm_obj,bdata,which_batch=your_batchid,zinit=your_z_data)
           to mimic monitoring for your_batchid using initial conditions
           will produce all dynamic metrics and forecasts
           diagnostics = monitor(mmvm_obj,bdata,which_batch=your_batchid,soft_sensor=your_variable)
           to mimic monitoring for your_batchid will produce all dynamic metrics
           and forecasts and produce soft-sensor predictions for your_variable
    Returns:
        diagnostics:A dictionary with all the monitoring diagnostics and contributions
    by Salvador Garcia Munoz
        sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    # re-arrange the model loadings/scaling into the monitoring (unfolded) layout
    mmvm_obj_f = mmvm_obj.copy()
    mmvm_obj_f = _uf_hor_mon_loadings(mmvm_obj_f)
    mmvm_obj_f['mx'] = mmvm_obj_f['mx_ufm']
    mmvm_obj_f['sx'] = mmvm_obj_f['sx_ufm']
    mmvm_obj_f['P'] = mmvm_obj_f['P_ufm']
    # unique batch ids in order of first appearance
    aux=bdata.drop_duplicates(subset=bdata.columns[0],keep='first')
    unique_batches=aux[aux.columns[0]].values.tolist()
    if 'Q' in mmvm_obj:
        mmvm_obj_f['W'] = mmvm_obj_f['W_ufm']
        mmvm_obj_f['Ws'] = mmvm_obj_f['Ws_ufm']
    # first mode: no batch requested -> build confidence intervals from the
    # training data and write them back into mmvm_obj (mutated in place)
    if isinstance(which_batch,bool) and build_ci:
        if mmvm_obj['nbatches']==len(np.unique(bdata[bdata.columns[0]]) ):
            SPE = []
            SPEi = []
            T = np.zeros((mmvm_obj['nsamples'],mmvm_obj['A'],mmvm_obj['nbatches']))
            #build confidence intervals and update model
            if not(shush):
                print('Building real_time confidence intervals')
            for i,b in enumerate(unique_batches):
                diags = _mimic_monitoring(mmvm_obj_f,bdata,b,zinit=zinit,soft_sensor=soft_sensor)
                SPE.append(diags['spe_mon'])
                SPEi.append(diags['spei_mon'])
                T[:,:,i]=diags['t_mon']
            #calculate conf int for scores
            t_mon_ci_95 = []
            t_mon_ci_99 = []
            for a in np.arange(mmvm_obj['A']):
                t_rt_ci_95 = []
                t_rt_ci_99 = []
                t = T[:,a,:].T
                # one confidence limit per sample instant
                for j in np.arange(mmvm_obj_f['nsamples']):
                    l95,l99 = phi.single_score_conf_int(t[:,[j]])
                    t_rt_ci_95.append(l95)
                    t_rt_ci_99.append(l99)
                t_mon_ci_95.append(np.array(t_rt_ci_95))
                t_mon_ci_99.append(np.array(t_rt_ci_99))
            #calculate conf. int for spe and HT2
            #HT2 limits
            n = mmvm_obj['nbatches']
            A= mmvm_obj['A']
            # F-distribution based Hotelling's T2 limits (fixed across samples)
            ht2_mon_ci_99 = (((n-1)*(n+1)*A)/(n*(n-A)))*phi.f99(A,(n-A))
            ht2_mon_ci_95 = (((n-1)*(n+1)*A)/(n*(n-A)))*phi.f95(A,(n-A))
            spe_mon_ci_95 = []
            spe_mon_ci_99 = []
            spei_mon_ci_95 = []
            spei_mon_ci_99 = []
            SPE = np.array(SPE)
            SPEi = np.array(SPEi)
            # SPE limits are computed per sample instant
            for j in np.arange(mmvm_obj_f['nsamples']):
                l95,l99=phi.spe_ci(SPE[:,[j]])
                spe_mon_ci_95.append(l95)
                spe_mon_ci_99.append(l99)
                l95,l99=phi.spe_ci(SPEi[:,[j]])
                spei_mon_ci_95.append(l95)
                spei_mon_ci_99.append(l99)
            mmvm_obj['t_mon_ci_95'] = t_mon_ci_95
            mmvm_obj['t_mon_ci_99'] = t_mon_ci_99
            mmvm_obj['ht2_mon_ci_99'] = ht2_mon_ci_99
            mmvm_obj['ht2_mon_ci_95'] = ht2_mon_ci_95
            mmvm_obj['spe_mon_ci_95'] = spe_mon_ci_95
            mmvm_obj['spe_mon_ci_99'] = spe_mon_ci_99
            mmvm_obj['spei_mon_ci_95'] = spei_mon_ci_95
            mmvm_obj['spei_mon_ci_99'] = spei_mon_ci_99
            if not(shush):
                print('Done')
            #return mmvm_obj
        else:
            print("This ain't the data this model was trained on")
    # second mode: replay specific batch(es) and plot the diagnostics
    else:
        if not('t_mon_ci_95' in mmvm_obj):
            print('No monitoring conf. int. have been calculated')
            print('for this model object please run: "monitor(model_obj,training_data)" ')
            has_ci=False
        else:
            has_ci=True
        if isinstance(which_batch,str):
            which_batch=[which_batch]
        diags=[]
        allok=True
        for b in which_batch:
            if b in bdata[bdata.columns[0]].values.tolist():
                #run monitoring on a batch
                diags_ =_mimic_monitoring(mmvm_obj_f,bdata,b,zinit=zinit,soft_sensor=soft_sensor)
                diags.append(diags_)
            else:
                print('Batch not found in data set')
                allok=False
        if allok:
            #plot scores
            for a in np.arange(mmvm_obj['A']):
                plt.figure()
                for i,b in enumerate(which_batch):
                    # x axis starts at sample 1
                    x_axis=np.arange(len(diags[i]['t_mon'][:,[a]] ) )+1
                    plt.plot(x_axis,diags[i]['t_mon'][:,[a]],'o',label=b)
                if has_ci:
                    plt.plot(x_axis,mmvm_obj['t_mon_ci_95'][a],'y',alpha=0.3)
                    plt.plot(x_axis,-mmvm_obj['t_mon_ci_95'][a],'y',alpha=0.3)
                    plt.plot(x_axis,mmvm_obj['t_mon_ci_99'][a],'r',alpha=0.3)
                    plt.plot(x_axis,-mmvm_obj['t_mon_ci_99'][a],'r',alpha=0.3)
                plt.xlabel('sample')
                plt.ylabel('$t_'+str(a+1)+'$')
                plt.title('Real time monitoring: Score plot $t_'+str(a+1)+'$')
                box = plt.gca().get_position()
                plt.gca().set_position([box.x0, box.y0, box.width * 0.8, box.height])
                plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
            # NOTE(review): the plots below reuse x_axis from the last score
            # loop iteration; all batches are assumed to have nsamples points
            #plot ht2
            plt.figure()
            for i,b in enumerate(which_batch):
                plt.plot(x_axis,diags[i]['HT2_mon'],'o',label=b)
            xlim_=plt.xlim()
            if has_ci:
                plt.plot([0,xlim_[1]],[mmvm_obj['ht2_mon_ci_95'],mmvm_obj['ht2_mon_ci_95']],'y',alpha=0.3)
                plt.plot([0,xlim_[1]],[mmvm_obj['ht2_mon_ci_99'],mmvm_obj['ht2_mon_ci_99']],'r',alpha=0.3)
            plt.xlabel('sample')
            plt.ylabel("Hotelling's $T^2$")
            plt.title("Real time monitoring: Hotelling's $T^2$")
            box = plt.gca().get_position()
            plt.gca().set_position([box.x0, box.y0, box.width * 0.8, box.height])
            plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
            #plot spe
            plt.figure()
            for i,b in enumerate(which_batch):
                plt.plot(x_axis,diags[i]['spe_mon'],'o',label=b)
            xlim_=plt.xlim()
            if has_ci:
                plt.plot(x_axis,mmvm_obj['spe_mon_ci_95'],'y',alpha=0.3)
                plt.plot(x_axis,mmvm_obj['spe_mon_ci_99'],'r',alpha=0.3)
            plt.xlabel('sample')
            plt.ylabel("Global SPE")
            plt.title("Real time monitoring: Global SPE")
            box = plt.gca().get_position()
            plt.gca().set_position([box.x0, box.y0, box.width * 0.8, box.height])
            plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
            #plot spei
            plt.figure()
            for i,b in enumerate(which_batch):
                plt.plot(x_axis,diags[i]['spei_mon'],'o',label=b)
            xlim_=plt.xlim()
            if has_ci:
                plt.plot(x_axis,mmvm_obj['spei_mon_ci_95'],'y',alpha=0.3)
                plt.plot(x_axis,mmvm_obj['spei_mon_ci_99'],'r',alpha=0.3)
            plt.xlabel('sample')
            plt.ylabel("Instantaneous SPE")
            plt.title("Real time monitoring: Instantaneous SPE")
            box = plt.gca().get_position()
            plt.gca().set_position([box.x0, box.y0, box.width * 0.8, box.height])
            plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
            # PLS models also get a dynamic forecast of each Y variable
            if 'Q' in mmvm_obj:
                for v in diags[0]['forecast y'].columns:
                    plt.figure()
                    for i,b in enumerate(which_batch):
                        plt.plot(x_axis,diags[i]['forecast y'][v],'o',label=b)
                    plt.xlabel('sample')
                    plt.ylabel(v)
                    plt.title('Dynamic forecast of Y')
            # a single-batch request returns the dict itself, not a list
            if len(diags)==1:
                diags=diags[0]
            return diags
        else:
            return 'error batch not found'
[docs]
def mpls(xbatch,y,a,*,zinit=False,phase_samples=False,mb_each_var=False,cross_val=0,cross_val_X=False):
    ''' Multi-way PLS for batch analysis
    mpls_obj = mpls(xbatch,y,a,*,zinit=False,phase_samples=False,mb_each_var=False,cross_val=0,cross_val_X=False):
    Args:
        xbatch: Pandas dataframe with aligned batch data it is assumed
                that all batches have the same number of samples
        y : Response to predict, one row per batch
        a: Number of PC's to fit
        zinit: Initial conditions <optional>
        phase_samples: alignment information
        mb_each_var: if "True" will make each variable measured a block
                     otherwise zinit is one block and xbatch another
        cross_val: percent of elements for cross validation (0 = no cross val)
        cross_val_X: passed through to the underlying PLS/MBPLS routine
    Returns:
        mpls_obj: A dictionary with all the parameters of the MPLS model
    by Salvador Garcia Munoz
        sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    # an optional phase column (column[1]) is not a process variable
    if (xbatch.columns[1]=='PHASE') or \
       (xbatch.columns[1]=='phase') or \
       (xbatch.columns[1]=='Phase'):
        nvars = xbatch.shape[1]-2
    else:
        nvars = xbatch.shape[1]-1
    nbatches = len(np.unique(xbatch[xbatch.columns[0]]))
    nsamples = xbatch.shape[0]/nbatches
    # remove low variance columns keeping record of the original order
    x_uf_,colnames,bid_o = unfold_horizontal(xbatch) # colnames is original set of columns
    x_uf,colsrem = phi.clean_low_variances(x_uf_,shush=True) # colsrem are columns removed
    mx_rem=x_uf_[colsrem].mean().tolist()
    mx_rem=np.array(mx_rem)
    # map each unfolded column to the original variable (bid) it came from
    aux = np.array([colnames,bid_o])
    col_names_bid_pd = pd.DataFrame(aux.T,columns=['col name','bid'])
    col_names_bid_pd_ = col_names_bid_pd[col_names_bid_pd['col name'].isin(x_uf.columns[1:].tolist())]
    #bid = col_names_bid_pd_['bid'].values # bid is vector indicating to what variable each col belongs
    #                                      # useful in figuring out the blocks
    aux=col_names_bid_pd_.drop_duplicates(subset='bid',keep='first')
    unique_bid=aux['bid'].values.tolist()
    # assemble the (possibly multi-block) X side
    if not(isinstance(zinit,bool)):
        zinit,rc=phi.clean_low_variances(zinit)
        zcols=zinit.columns[1:].tolist()
        XMB={'Initial Conditions':zinit}
    else:
        XMB=dict()
    if mb_each_var:
        # one block per measured variable
        for v in unique_bid:
            these_cols=[x_uf.columns[0]]
            these_cols.extend(col_names_bid_pd_['col name'][col_names_bid_pd_['bid']==v].values.tolist())
            varblock=x_uf[these_cols]
            XMB[v]=varblock
    else:
        if not(isinstance(zinit,bool)):
            XMB['Trajectories']=x_uf
        else:
            # no zinit and no per-variable blocks: plain single-block PLS
            XMB = x_uf
    if not(isinstance(XMB,dict)):
        mpls_obj=phi.pls(XMB,y,a,cross_val=cross_val,cross_val_X=cross_val_X,force_nipals=True)
        yhat=phi.pls_pred(XMB,mpls_obj)
        yhat=yhat['Yhat']
    else:
        mpls_obj=phi.mbpls(XMB,y,a,cross_val_=cross_val,cross_val_X_=cross_val_X,force_nipals_=True)
        yhat=phi.pls_pred(XMB,mpls_obj)
        yhat=yhat['Yhat']
    # re-insert columns removed for low variance (mean kept, stdev=1,
    # zero loadings/r2) so the model keeps the original column layout
    if len(colsrem)>0:
        if not(isinstance(zinit,bool)):
            # split the model parameters into the Z part and the trajectory part
            ninit_vars=zinit.shape[1]-1
            z_sx = mpls_obj['sx'][np.arange(ninit_vars)].reshape(-1)
            z_mx = mpls_obj['mx'][np.arange(ninit_vars)].reshape(-1)
            z_ws = mpls_obj['Ws'][np.arange(ninit_vars),:]
            z_w = mpls_obj['W'][np.arange(ninit_vars),:]
            z_p = mpls_obj['P'][np.arange(ninit_vars),:]
            z_r2pv = mpls_obj['r2xpv'][np.arange(ninit_vars),:]
            xc_ = np.arange(ninit_vars,x_uf.shape[1]+ninit_vars-1 )
            xuf_sx = mpls_obj['sx'][xc_]
            xuf_mx = mpls_obj['mx'][xc_]
            xuf_ws = mpls_obj['Ws'][xc_,:]
            xuf_w = mpls_obj['W'][xc_,:]
            xuf_p = mpls_obj['P'][xc_,:]
            xuf_r2pv = mpls_obj['r2xpv'][xc_,:]
        else:
            xuf_sx = mpls_obj['sx']
            xuf_mx = mpls_obj['mx']
            xuf_ws = mpls_obj['Ws']
            xuf_w = mpls_obj['W']
            xuf_p = mpls_obj['P']
            xuf_r2pv = mpls_obj['r2xpv']
        #add removed columns with mean of zero and stdev =1 and loadings = 0
        xtra_col = np.zeros((2+4*a,1 ))
        xtra_col[0] = 1
        xtra_cols = np.tile(xtra_col,(1,len(colsrem)))
        xtra_cols[1,:] = mx_rem
        # stack sx, mx and all loadings so they can be reordered in one shot
        aux = np.vstack((xuf_sx,xuf_mx,xuf_ws.T,xuf_w.T,xuf_p.T,xuf_r2pv.T))
        aux = np.hstack((aux,xtra_cols))
        all_cols = x_uf.columns[1:].tolist()
        all_cols.extend(colsrem)
        aux_pd = pd.DataFrame(aux,columns=all_cols)
        aux_pd = aux_pd[colnames]
        aux_new = aux_pd.values
        if not(isinstance(zinit,bool)):
            # prepend the Z-block parameters back on top of the reordered ones
            sx_ = np.hstack((z_sx,aux_new[0,:].reshape(-1)))
            mpls_obj['sx'] = sx_
            mx_ = np.hstack((z_mx,aux_new[1,:].reshape(-1)))
            mpls_obj['mx'] = mx_
            aux_new = aux_new[2:,:]
            ws_ = aux_new[0:a,:]
            mpls_obj['Ws'] = np.vstack((z_ws,ws_.T))
            aux_new = aux_new[a:,:]
            w_ = aux_new[0:a,:]
            mpls_obj['W'] = np.vstack((z_w,w_.T))
            aux_new = aux_new[a:,:]
            p_ = aux_new[0:a,:]
            mpls_obj['P'] = np.vstack((z_p,p_.T))
            aux_new = aux_new[a:,:]
            r2xpv_ = aux_new[0:a,:]
            mpls_obj['r2xpv'] = np.vstack((z_r2pv,r2xpv_.T))
            # NOTE(review): colnames is only prepended with the z columns
            # inside this len(colsrem)>0 branch — confirm varidX is intended
            # to exclude z columns when no columns were removed
            zcols.extend(colnames)
            colnames=zcols
        else:
            sx_ = aux_new[0,:].reshape(1,-1)
            mpls_obj['sx'] = sx_
            mx_ = aux_new[1,:].reshape(1,-1)
            mpls_obj['mx'] = mx_
            aux_new = aux_new[2:,:]
            ws_ = aux_new[0:a,:]
            mpls_obj['Ws'] = ws_.T
            aux_new = aux_new[a:,:]
            w_ = aux_new[0:a,:]
            mpls_obj['W'] = w_.T
            aux_new = aux_new[a:,:]
            p_ = aux_new[0:a,:]
            mpls_obj['P'] = p_.T
            aux_new = aux_new[a:,:]
            r2xpv_ = aux_new[0:a,:]
            mpls_obj['r2xpv'] = r2xpv_.T
    # bookkeeping used by the plotting/monitoring routines
    mpls_obj['Yhat'] = yhat
    mpls_obj['varidX'] = colnames
    mpls_obj['bid'] = bid_o
    mpls_obj['uf'] ='batch wise'
    mpls_obj['nvars'] = int(nvars)
    mpls_obj['nbatches'] = int(nbatches)
    mpls_obj['nsamples'] = int(nsamples)
    mpls_obj['A'] = a
    mpls_obj['phase_samples'] = phase_samples
    mpls_obj['mb_each_var'] = mb_each_var
    if not(isinstance(zinit,bool)):
        mpls_obj['ninit']=int(zinit.shape[1]-1)
    else:
        mpls_obj['ninit']=0
    return mpls_obj
[docs]
def find(a, func):
    '''Return the indices i of *a* for which func(a[i]) is truthy.'''
    hits = []
    for idx, item in enumerate(a):
        if func(item):
            hits.append(idx)
    return hits
[docs]
def clean_empty_rows(X,*,shush=False):
    ''' Cleans empty rows in batch data
    Args:
        X: Batch data to be cleaned of empty rows (all np.nan) DATAFRAME
           column[0] is the batch identifier; an optional phase column[1]
           ('PHASE'/'Phase'/'phase') is ignored when checking for missing data
        shush: if True, suppress the per-row removal messages
    Returns:
        X: Batch data with 100%-missing observations removed
    '''
    # slice off the batch-id (and phase) columns before checking for NaNs
    if X.columns[1] in ('PHASE', 'phase', 'Phase'):
        vals = np.array(X.values[:, 2:]).astype(float)
    else:
        vals = np.array(X.values[:, 1:]).astype(float)
    obs_ids = X.values[:, 0].astype(str).tolist()
    # rows where every process variable is missing
    missing_per_row = np.sum(np.isnan(vals), axis=1)
    indx = np.where(missing_per_row == vals.shape[1])[0]
    if len(indx) > 0:
        if not shush:
            for i in indx:
                print('Removing row from ', obs_ids[i], ' due to 100% missing data')
        # drop all flagged rows in a single call (by index label)
        return X.drop(X.index.values[indx].tolist())
    return X
[docs]
def phase_sampling_dist(bdata,time_column=False,addtitle=False,use_phases=False):
    ''' Count and plot a histogram of the distribution of samples (or time if time_column is indicated)
    consumed per phase on a batch dataset
    phase_sampling_dist(bdata,time_column=False,addtitle=False,use_phases=False)
    Args:
        bdata: Batch data organized as: column[0] = Batch Identifier column name is unrestricted
               column[1] = Phase information per sample must be called 'Phase','phase', or 'PHASE'
               this information is optional
               column[2:]= Variables measured throughout the batch
        time_column: Indicates the name of the column with time, if not sent, counting is done in terms samples
        addtitle: Optional text to be placed as the figure title
        use_phases: In case the user wants to only do counting for a subset of phases
    Returns:
        data: dict of {phase: {batch_id: samples_or_time}} (None if there is
              no usable phase column)
    by Salvador Garcia Munoz
        sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    data={}
    if bdata.columns[1]=='phase' or bdata.columns[1]=='Phase' or bdata.columns[1]=='PHASE':
        bids = unique(bdata,bdata.columns[0])
        if isinstance(use_phases,bool):
            phases = unique(bdata,bdata.columns[1])
        else:
            phases = use_phases
        # one subplot per phase plus a final one for the batch totals
        fig,ax=plt.subplots(1,len(phases)+1)
        for i,p in enumerate(phases):
            totsamps=[]
            samps_=[]
            samps_ind={}
            for b in bids:
                bdat= bdata[ (bdata[bdata.columns[1]]==p) & (bdata[bdata.columns[0]]==b)]
                if len(bdat)==0:
                    # batch never entered this phase
                    samps_.append(0)
                    samps_ind[b]=0
                elif not(isinstance(time_column,bool)):
                    # measure elapsed time within the phase
                    samps_.append(bdat[time_column].values[-1]-bdat[time_column].values[0] )
                    totsamps.append(bdata[time_column][(bdata[bdata.columns[0]]==b)].values[-1])
                    samps_ind[b]=bdat[time_column].values[-1]-bdat[time_column].values[0]
                else:
                    # measure number of samples within the phase
                    samps_.append(len(bdat))
                    totsamps.append(len(bdata[(bdata[bdata.columns[0]]==b)]))
                    samps_ind[b]=len(bdat)
            ax[i].hist(samps_)
            data[p]=samps_ind
            if not(isinstance(time_column,bool)):
                ax[i].set_xlabel(time_column)
            else:
                ax[i].set_xlabel('# Samples')
            ax[i].set_ylabel('Count')
            ax[i].set_title(p)
        # totals histogram uses the totals gathered during the last phase pass
        ax[-1].hist(totsamps)
        if not(isinstance(time_column,bool)):
            ax[-1].set_xlabel(time_column)
        else:
            ax[-1].set_xlabel('# Samples')
        ax[-1].set_ylabel('Count')
        ax[-1].set_title('Total')
        if not(isinstance(addtitle,bool)):
            fig.suptitle(addtitle)
        fig.tight_layout()
        return data
    else:
        print('Data is missing phase information or phase column is not properly labeled')
[docs]
def predict(xbatch,mmvm_obj,*,zinit=False):
    ''' Generate predictions for a Multi-way PCA/PLS model
    predictions = predict(xbatch,mmvm_obj,*,zinit=False)
    Args:
        xbatch: Batch data with same variables and alignment as model will generate predictions for all batches
        mmvm_obj: Multi-way PLS or PCA
        zinit: Initial conditions [if any]
    Returns:
        preds: A dictionary with keys ['Yhat', 'Xhat', 'Tnew', 'speX', 'T2']
               (plus 'Zhat' when zinit is given); Xhat/Yhat/Zhat are returned
               as labeled DataFrames re-folded to the original batch layout
    by Salvador Garcia Munoz
        sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    # PLS branch: models carry a 'Q' matrix
    if 'Q' in mmvm_obj:
        x_uf,colnames,bid_o = unfold_horizontal(xbatch) # colnames is original set of columns
        aux = np.array([colnames,bid_o])
        col_names_bid_pd = pd.DataFrame(aux.T,columns=['col name','bid'])
        #bid = col_names_bid_pd['bid'].values # bid is vector indicating to what variable each col belongs
        #                                     # useful in figuring out the blocks
        aux=col_names_bid_pd.drop_duplicates(subset='bid',keep='first')
        unique_bid=aux['bid'].values.tolist()
        # rebuild the same (multi-)block structure the model was trained with
        if not(isinstance(zinit,bool)):
            XMB={'Initial Conditions':zinit}
        else:
            XMB=dict()
        if mmvm_obj['mb_each_var']:
            for v in unique_bid:
                these_cols=[x_uf.columns[0]]
                these_cols.extend(col_names_bid_pd['col name'][col_names_bid_pd['bid']==v].values.tolist())
                varblock=x_uf[these_cols]
                XMB[v]=varblock
        else:
            if not(isinstance(zinit,bool)):
                XMB['Trajectories']=x_uf
            else:
                XMB = x_uf
        pred=phi.pls_pred(XMB,mmvm_obj)
        if not(isinstance(zinit,bool)):
            # first ninit columns of Xhat are the reconstructed initial conditions
            Zhat=pred['Xhat'][:,:mmvm_obj['ninit']]
            Xhat=pred['Xhat'][:,mmvm_obj['ninit']:]
            # refold the unfolded reconstruction back to batch layout
            Xb=refold_horizontal(Xhat,mmvm_obj['nvars'],mmvm_obj['nsamples'] )
            if (xbatch.columns[1]=='PHASE') or \
               (xbatch.columns[1]=='phase') or \
               (xbatch.columns[1]=='Phase'):
                Xb=pd.DataFrame(Xb,columns=xbatch.columns[2:])
                Xb.insert(0,xbatch.columns[1],xbatch[xbatch.columns[1]].values)
                Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values)
            else:
                Xb=pd.DataFrame(Xb,columns=xbatch.columns[1:])
                Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values)
            pred['Xhat']=Xb
            Zhat_df=pd.DataFrame(Zhat,columns=zinit.columns[1:].tolist())
            Zhat_df.insert(0,zinit.columns[0],zinit[zinit.columns[0]].values.astype(str).tolist())
            pred['Zhat']=Zhat_df
            # one Yhat row per batch, labeled with the batch id
            test=xbatch.drop_duplicates(subset=xbatch.columns[0],keep='first')
            Y_df=pd.DataFrame(pred['Yhat'],columns=mmvm_obj['varidY'])
            Y_df.insert(0,test.columns[0],test[test.columns[0]].values.astype(str).tolist())
            pred['Yhat']=Y_df
        else:
            Xb=refold_horizontal(pred['Xhat'],mmvm_obj['nvars'],mmvm_obj['nsamples'] )
            if (xbatch.columns[1]=='PHASE') or \
               (xbatch.columns[1]=='phase') or \
               (xbatch.columns[1]=='Phase'):
                Xb=pd.DataFrame(Xb,columns=xbatch.columns[2:])
                Xb.insert(0,xbatch.columns[1],xbatch[xbatch.columns[1]].values)
                Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values)
            else:
                Xb=pd.DataFrame(Xb,columns=xbatch.columns[1:])
                Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values)
            pred['Xhat']=Xb
            test=xbatch.drop_duplicates(subset=xbatch.columns[0],keep='first')
            Y_df=pd.DataFrame(pred['Yhat'],columns=mmvm_obj['varidY'])
            Y_df.insert(0,test.columns[0],test[test.columns[0]].values.astype(str).tolist())
            pred['Yhat']=Y_df
    # PCA branch
    else:
        if mmvm_obj['uf'] =='batch wise':
            x_uf,colnames,bid_o = unfold_horizontal(xbatch)
            pred=phi.pca_pred(x_uf,mmvm_obj)
            Xb=refold_horizontal(pred['Xhat'],mmvm_obj['nvars'],mmvm_obj['nsamples'] )
            if (xbatch.columns[1]=='PHASE') or \
               (xbatch.columns[1]=='phase') or \
               (xbatch.columns[1]=='Phase'):
                Xb=pd.DataFrame(Xb,columns=xbatch.columns[2:])
                Xb.insert(0,xbatch.columns[1],xbatch[xbatch.columns[1]].values)
                Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values)
            else:
                Xb=pd.DataFrame(Xb,columns=xbatch.columns[1:])
                Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values)
            pred['Xhat']=Xb
        if mmvm_obj['uf'] =='variable wise':
            # variable-wise PCA predicts on the data as-is (phase column dropped)
            if (xbatch.columns[1]=='PHASE') or \
               (xbatch.columns[1]=='phase') or \
               (xbatch.columns[1]=='Phase'):
                xbatch_=xbatch.copy()
                xbatch_.drop(xbatch.columns[1],axis=1,inplace=True)
            else:
                xbatch_=xbatch.copy()
            pred=phi.pca_pred(xbatch_,mmvm_obj)
            Xb=pred['Xhat']
            if (xbatch.columns[1]=='PHASE') or \
               (xbatch.columns[1]=='phase') or \
               (xbatch.columns[1]=='Phase'):
                Xb=pd.DataFrame(Xb,columns=xbatch.columns[2:])
                Xb.insert(0,xbatch.columns[1],xbatch[xbatch.columns[1]].values)
                Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values)
            else:
                Xb=pd.DataFrame(Xb,columns=xbatch.columns[1:])
                Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values)
            pred['Xhat']=Xb
    return pred
def _plot_contribs(bdata,ylims,*,var_list=False,phase_samples=False,alpha_=0.2,plot_title_=''):
    '''
    Internal routine please use : contributions
    '''
    # When no explicit variable list is given, plot every data column,
    # skipping the optional phase column in position 2.
    has_phase = bdata.columns[1] in ('PHASE','phase','Phase')
    if isinstance(var_list,bool):
        var_list = bdata.columns[2:].tolist() if has_phase else bdata.columns[1:].tolist()
    if isinstance(var_list,str) and not isinstance(var_list,list):
        var_list = [var_list]
    bid_col = bdata.columns[0]
    for varname in var_list:
        plt.figure()
        sub = bdata[[bid_col,varname]]
        # One filled trajectory per batch, x-axis starting at sample 1
        for batch in np.unique(sub[bid_col]):
            trace = sub[varname][sub[sub.columns[0]]==batch]
            plt.fill_between(np.arange(len(trace.values))+1,trace.values)
        plt.xlabel('Sample')
        plt.ylabel(varname)
        if not isinstance(phase_samples,bool):
            # Draw a vertical marker at each phase boundary and label the
            # phase name at the start of its segment.
            txt_pos  = 1
            line_pos = 1
            plt.axvline(x=1,color='magenta',alpha=alpha_)
            for ph in phase_samples.keys():
                if isinstance(phase_samples[ph],list):
                    step = phase_samples[ph][1]
                else:
                    step = phase_samples[ph]
                line_pos += step
                plt.axvline(x=line_pos,color='magenta',alpha=alpha_)
                cur_ylim = plt.ylim()
                plt.annotate(ph,(txt_pos,cur_ylim[0]),rotation=90,alpha=0.5,color='magenta')
                txt_pos += step
        plt.ylim(ylims)
        plt.title(plot_title_)
        plt.tight_layout()
[docs]
def contributions (mmvmobj,X,cont_type,*,to_obs=False,from_obs=False,
                    lv_space=False,phase_samples=False,dyn_conts=False,which_var=False,
                    plot_title=''):
    ''' Plot batch contribution plots to Scores, HT2 or SPE

    contributions (mmvmobj,X,cont_type,*,to_obs=False,from_obs=False,
                    lv_space=False,phase_samples=False,dyn_conts=False,which_var=False,
                    plot_title='')

    Args:
        mmvmobj   = Multiway Model
        X         = batch data
        cont_type = 'scores' | 'ht2' | 'spe'
        to_obs    = list of batch identifier(s) to calculate contributions to
        from_obs  = relative basis to calculate contributions from [for 'scores' and 'ht2' only]
                    if not sent the origin of the model is used as the base.
        lv_space  = dimensions to use for 'scores' contributions (all dimensions when omitted)
        phase_samples = phase landmarks forwarded to the trajectory plots
        dyn_conts = also plot the per-sample (dynamic) contribution trajectories
        which_var = restrict the dynamic plots to these variables
        plot_title = title placed on the generated figures

    Returns:
        contribution_vector (None when the argument check fails)

    Raises:
        ValueError: if cont_type is not one of 'scores' | 'ht2' | 'spe'

    by Salvador Garcia Munoz
    sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    # Variable names per batch, skipping the optional phase column
    if X.columns[1] in ('PHASE','phase','Phase'):
        bvars = X.columns[2:]
    else:
        bvars = X.columns[1:]
    # Check logic and warn user
    allok=True
    if isinstance(to_obs,bool):
        print('Contribution calculations need a "to_obs" argument at minimum')
        allok=False
    if cont_type=='spe' and not(isinstance(from_obs,bool)):
        print('For SPE contributions the "from_obs" argument is ignored')
    if (isinstance(to_obs,list) and len(to_obs)>1) or (isinstance(from_obs,list) and len(from_obs)>1):
        print('Contributions for groups of observations will be averaged')
    if (cont_type=='scores') and isinstance(lv_space,bool):
        print('No dimensions specified, doing contributions across all dimensions')
    if not allok:
        return None
    # Translate batch names into row indexes of the horizontally unfolded data
    Xuf,var1,var2=unfold_horizontal(X)
    bnames=Xuf[Xuf.columns[0]].values.tolist()
    to_o=[bnames.index(o) for o in to_obs]
    if (cont_type=='scores') or (cont_type=='ht2'):
        if not(isinstance(from_obs,bool)):
            from_o=[bnames.index(o) for o in from_obs]
        else:
            from_o=False
        cont_vec=phi.contributions(mmvmobj,Xuf,cont_type,Y=False,from_obs=from_o,to_obs=to_o,lv_space=lv_space)
    elif cont_type=='spe':
        cont_vec=phi.contributions(mmvmobj,Xuf,cont_type,Y=False,from_obs=False,to_obs=to_o)
    else:
        # Previously an unknown cont_type left cont_vec unbound and crashed
        # below with a confusing NameError; fail explicitly instead.
        raise ValueError("cont_type must be 'scores', 'ht2' or 'spe'")
    ymin=cont_vec.min()
    ymax=cont_vec.max()
    # Plot the per-sample contribution trajectories when requested
    if dyn_conts:
        batch_contribs=refold_horizontal(cont_vec, mmvmobj['nvars'],mmvmobj['nsamples'])
        batch_contribs = pd.DataFrame(batch_contribs,columns=bvars)
        oids=['Batch Contributions']*batch_contribs.shape[0]
        batch_contribs.insert(0,'BatchID',oids)
        _plot_contribs(batch_contribs,(ymin, ymax),phase_samples=phase_samples,var_list=which_var,plot_title_=plot_title)
    # Summary bar chart: sum of |contribution| per variable across all samples
    batch_contribs=refold_horizontal(abs(cont_vec), mmvmobj['nvars'],mmvmobj['nsamples'])
    batch_contribs=np.sum(batch_contribs,axis=0)
    plt.figure()
    plt.bar(bvars, batch_contribs)
    plt.xticks(rotation=75)
    plt.ylabel(r'$\Sigma$ (|Contribution to '+cont_type+'| )' )
    plt.title(plot_title)
    plt.tight_layout()
    # The docstring always promised the contribution vector; actually return it.
    return cont_vec
[docs]
def build_rel_time(bdata,*,time_unit='min'):
    ''' Converts the column 'Timestamp' into 'Time' in time_units relative to
    the start of each batch

    bdata_new = build_rel_time(bdata,*,time_unit='min')

    Args:
        bdata: batch DataFrame; first column is the batch identifier and a
               'Timestamp' column holds datetime64 values
        time_unit: 'min' (default) or 'hr'; any other value leaves the
                   values in seconds (original behavior)

    Returns:
        New DataFrame where 'Timestamp' is replaced by a relative
        'Time (<time_unit>)' column, per batch, starting at 0

    by Salvador Garcia Munoz
    sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    bid_col=bdata.columns[0]
    aux=bdata.drop_duplicates(subset=bid_col,keep='first')
    unique_batches=aux[aux.columns[0]].values.tolist()
    bdata_n=[]
    for b in unique_batches:
        # .copy() so the insert/drop below operate on an independent frame
        # instead of a view into the caller's data (avoids pandas
        # chained-assignment warnings and accidental mutation).
        this_batch=bdata[bdata[bid_col]==b].copy()
        ts=this_batch['Timestamp'].values
        deltat=ts-ts[0]
        # Vectorized equivalent of np.timedelta64(x,'s') per element:
        # truncate to whole seconds, then to float.
        tim=deltat.astype('timedelta64[s]').astype(float)
        if time_unit == 'min':
            tim=tim/60
        elif time_unit == 'hr':
            tim=tim/3600
        cols=this_batch.columns.tolist()
        this_batch.insert(cols.index('Timestamp'),'Time ('+time_unit+')' ,tim)
        this_batch=this_batch.drop('Timestamp',axis=1)
        bdata_n.append(this_batch)
    bdata_n = pd.concat(bdata_n,axis=0)
    return bdata_n
[docs]
def _descriptor_value(values,d):
    '''Return the scalar descriptor named d for the 1-D float array values.

    Returns None for an unrecognized descriptor name (matching the original
    code, which silently skipped unknown names).
    '''
    if d=='min':
        return values.min()
    if d=='max':
        return values.max()
    if d=='mean':
        return values.mean()
    if d=='median':
        return np.median(values)
    if d=='std':
        return np.std(values,ddof=1)
    if d=='var':
        return np.var(values,ddof=1)
    if d=='range':
        return values.max()-values.min()
    if d=='ave_slope':
        # Least-squares slope through the first point:
        # m = sum(x*(y-y0)) / sum(x^2), with x = 0..n-1
        x=np.arange(0,len(values))
        return (1/np.sum(x**2))*np.sum(x*(values-values[0]))
    return None
def descriptors(bdata,which_var,desc,*,phase=False):
    '''Get descriptor values for a batch trajectory
    descriptors_df = descriptors(bdata,which_var,desc,*,phase=False)
    Args:
        bdata: Dataframe of batch data, first column is batch ID, second column can be phase id
        which_var: List of variables to get descriptors for
        desc: List of descriptors to calculate, options are:
            'min'
            'max'
            'mean'
            'median'
            'std'
            'var'
            'range'
            'ave_slope'
        phase: to specify what phases to do this for
    Returns:
        descriptors: A dataframe with the descriptors per batch
    by Salvador Garcia Munoz
    sgarciam@imperial.ac.uk salvadorgarciamunoz@gmail.com
    '''
    # Bug fixes vs. previous revision: the non-phase branch tested the key
    # 'ave-slope' (never matching the documented 'ave_slope') and, had it
    # matched, appended via the misspelled desc_val.apend to the wrong list.
    # Both branches now share _descriptor_value.
    has_phase = bdata.columns[1] in ('Phase','phase','PHASE')
    if has_phase:
        phase_col=bdata.columns[1]
    if not(has_phase) and not(isinstance(phase,bool)):
        print('Cannot process phase flag without phase id in data')
        phase=False
    # Batch names in order of first appearance (same pattern used elsewhere
    # in this module via drop_duplicates keep='first')
    bnames=bdata.drop_duplicates(subset=bdata.columns[0],keep='first')[bdata.columns[0]].values.tolist()
    desc_val=[]
    desc_lbl=[]
    for i,b in enumerate(bnames):
        this_batch=bdata[bdata[bdata.columns[0]]==b]
        desc_val_=[]
        for v in which_var:
            if not(isinstance(phase,bool)): # by phase
                for p in phase:
                    values=this_batch[v][this_batch[phase_col]==p].values.astype(float)
                    values=values[~(np.isnan(values))]
                    for d in desc:
                        val=_descriptor_value(values,d)
                        if val is not None:
                            desc_val_.append(val)
                        if i==0:
                            desc_lbl.append(v +'_'+ p +'_'+ d)
            else:
                values=this_batch[v].values.astype(float)
                values=values[~(np.isnan(values))]
                for d in desc:
                    val=_descriptor_value(values,d)
                    if val is not None:
                        desc_val_.append(val)
                    if i==0:
                        desc_lbl.append(v +'_' + d)
        desc_val.append(desc_val_)
    desc_val=np.array(desc_val)
    descriptors_df=pd.DataFrame(desc_val,columns=desc_lbl )
    descriptors_df.insert(0,bdata.columns[0],bnames )
    return descriptors_df