Source code for toto.filters.lanczos_filter
""" Apply a low pass 1st or 2nd order lanczos filter
Parameters
~~~~~~~~~~
input_array : panda obj
The input data.
window : int
window in hour, a good window is 40 h window of hourly data
type : str
Can be `lanczos lowpas 1st order`, `lanczos lowpas 2nd order` depending on the order.
Examples:
~~~~~~~~~
>>> df['filtered']=lanczos_filter.lanczos_filter(df['signal'].copy(),args={'window':100,'Type':'lanczos lowpas 1st order'})
>>>
"""
from scipy.signal import butter, filtfilt, detrend
from toto.core.toolbox import lanc
import numpy as np
[docs]def lanczos_filter(input_array,args={'window':int(),\
'type':{'lanczos lowpas 1st order':True,
'lanczos lowpas 2nd order':False}
}):
window=args['window']
filter_type=args.get('type','lanczos lowpas 1st order')
mean = input_array.mean()
delt=(input_array.index[1]-input_array.index[0]).total_seconds()/3600 # in hours
if filter_type == 'lanczos lowpas 1st order':
input_array= lanczos_lowpass_first_order(input_array - mean, window, dt=delt) + mean
elif filter_type == 'lanczos lowpas 2nd order':
input_array= lanczos_lowpass_second_order(input_array - mean, window, dt=delt) + mean
return input_array
[docs]def lanczos_lowpass_second_order(data, window, dt=1):
"""
Inpulse response filter
"""
fs = (2*np.pi) / (dt*3600)
nyq = 0.5 * fs
C = 0.802
window = int( window / dt )
highcut = (2*np.pi) / (window*3600)
high = (highcut / nyq) #/ C # to prevent phase lag
m = 100 # Rule of thumb is 100 point for a 40 h window of hourly data
coefs=lanczos_lowpass_filter_coeffs(high, m)#window)
d2 = filtfilt(coefs,[1.0],data,axis=0)#,padtype=padtype,padlen=padlen)
#d2 = np.convolve(data, coefs, 'same')
#if(len(idx)>0):
# d2[result_nan_idx]=nan
## replace edge points with nan if pading is not used
#if (padtype is None) and (fill_edge_nan==True):
d2[:m] = np.nan
d2[-m:] = np.nan
return d2
[docs]def lanczos_lowpass_first_order(data, window, dt=1):
freq = 1./window # Hours
window = int( window / dt )
pad = np.zeros(window) * np.NaN
wt = lanc(window, freq)
wt = lanc(5*window, freq)
return np.convolve(wt, data, mode='same')
[docs]def lanczos_lowpass_filter_coeffs(cf,m,normalize=True):
"""return the convolution coefficients for low pass lanczos filter.
Parameters
~~~~~~~~~~
Cf: float
Cutoff frequency expressed as a ratio of a Nyquist frequency.
M: int
Size of filtering window size.
Returns
~~~~~~~
Results: list
Coefficients of filtering window.
"""
coscoef=[cf*np.sin(np.pi*k*cf)/(np.pi*k*cf) for k in range(1,m+1,1)]
sigma=[np.sin(np.pi*k/m)/(np.pi*k/m) for k in range(1,m+1,1)]
prod= [c*s for c,s in zip(coscoef,sigma)]
temp = prod[-1::-1]+[cf]+prod
res=np.array(temp)
if normalize:
res = res/res.sum()
return res