Source code for pyValEIA.utils.filters

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# DISTRIBUTION STATEMENT A: Approved for public release. Distribution is
# unlimited.
# ----------------------------------------------------------------------------
"""Functions to filter data."""

import numpy as np


[docs] def simple_barrel_roll(xvar, yvar, barrel_radius, envelope=True, envelope_lower=0.6, envelope_upper=0.2): """Roll barrel over data to detrended over large decreases in value. Parameters ---------- xvar: array-like Independent data variable along which `yvar` will be smoothed. yvar: array-like Dependent data variable of the same size as `xvar`; needs to be scaled such that its magnitude is similar to `yvar` barrel_radius : double Radius of the 'barrel' rolling over the data in the same units as `xvar` envelope : bool If True, the barrel roll results will be used as constraints for the original `yvar` values (default=True) envelope_lower : float Fraction starting from zero to multiply the minimum by, used to calculate the lower limit of the barrel roll envelope; a larger value creates a larger lower envelope (default=0.6) envelope_upper : float Fraction starting from zero to multiply the maximum by, used to calculate the upper limit of the barrel roll envelope; a larger value creates a larger upper envelope (default=0.2) Returns ------- yvar_det : array-like Detrended values for the dependent variable, `yvar` """ # Initialize the x and y values for forward rolling strt_con_y = yvar[0] strt_con_x = xvar[0] # Initialize the list of contact points for forward rolling f_con_xs = [] f_con_ys = [] # Cycle through the data looking for contact points j = 0 while j < len(yvar) - 1: # Forward Rolling only f_con_xs.append(strt_con_x) f_con_ys.append(strt_con_y) # Get the regions of interest (within barrel view). For forward rolling, # this is greater than the start time and less than the barrel diameter # (in the same units as `xvar`) x_roi = xvar[(xvar > strt_con_x) & (xvar <= strt_con_x + 2 * barrel_radius)] y_roi = yvar[(xvar > strt_con_x) & (xvar <= strt_con_x + 2 * barrel_radius)] # Calcualte angular distance delta for each delta = beta - theta for # each x-value region of interest deltas = [] for i, xr_val in enumerate(x_roi): # Calculate the difference between each data value and the start del_x = xr_val - strt_con_x del_y = y_roi[i] - strt_con_y # Calculate the rolling angles theta = np.arctan(del_y / del_x) if 2 * barrel_radius >= np.sqrt((del_x) ** 2 + (del_y) ** 2): beta = np.arcsin(np.sqrt((del_x) ** 2 + (del_y) ** 2) / (2 * barrel_radius)) else: beta = np.pi / 2.0 delta = beta - theta deltas.append(delta * 180 / np.pi) # Move the barrel forward along the x-axis if len(x_roi) != 0: # Identify the new forward contacts strt_con_y = y_roi[deltas.index(min(deltas))] # minimum delta strt_con_x = x_roi[deltas.index(min(deltas))] j = np.where(strt_con_x == xvar)[0] # update j for while loop else: # Append last value if there is no region of interest strt_con_y = yvar[len(yvar) - 1] strt_con_x = xvar[len(yvar) - 1] j = len(yvar) # update j for while loop # The contact points identify gaps or depletions in the data. Filter these # out using linear interpolation between the contact points int_y = np.interp(np.setdiff1d(xvar, f_con_xs), f_con_xs, f_con_ys) # Combine the contact points and the interpolated data x_combined = np.concatenate((f_con_xs, np.setdiff1d(xvar, f_con_xs))) y_combined = np.concatenate((f_con_ys, int_y)) # Sort combined data by x values sorted_indices = np.argsort(x_combined) x_combined = x_combined[sorted_indices] y_combined = y_combined[sorted_indices] # If desired, replace data from inside the barrel envelope with original # y-values if envelope: # Set the upper and lower limits of the envelope brc_upper = y_combined + envelope_upper * max(y_combined) brc_lower = y_combined - envelope_lower * min(y_combined) # Initialize the output array yvar_det = np.full(shape=y_combined.shape, fill_value=np.nan) # Retrieve all the original values from between the upper and lower # barrel roll limits yvar_det[(yvar < brc_upper) & (yvar > brc_lower)] = yvar[ ((yvar < brc_upper) & (yvar > brc_lower))] # Check to see if there are NaN values left in the output nan_mask = np.isnan(yvar_det) if sum(nan_mask): yvar_det[nan_mask] = np.interp(xvar[nan_mask], xvar[~nan_mask], yvar_det[~nan_mask]) else: # If no envelope is desired, the output is the combined results yvar_det = np.array(y_combined) return yvar_det
[docs] def rolling_nanmeasure(arr, window, measure='mean'): """Calculate the rolling mean or median of an array with or without nans. Parameters ---------- arr: array-like array of values to roll over window : int window size measure : str Method to apply to data; 'mean', 'median', 'average' (default='mean') Returns ------- out : array-like rolling measured array of same length as original """ # Initialize array of same length as input out = np.full_like(arr, np.nan, dtype=float) half_w = window // 2 # Iterate through array for i in range(len(arr)): left = max(0, i - half_w) right = min(len(arr), i + half_w + 1) window_vals = arr[left:right] # Choose between mean/average and median if measure.lower() in ['mean', 'average']: if np.all(np.isnan(window_vals)): out[i] = np.nan else: out[i] = np.nanmean(window_vals) elif measure.lower() == 'median': if np.all(np.isnan(window_vals)): out[i] = np.nan else: out[i] = np.nanmedian(window_vals) else: raise ValueError('unknown method for smoothing: {:}'.format( measure)) return out
[docs] def find_nan_ranges(arr): """Identify continuous ranges of NaN values in an array. Parameters ---------- arr : array-like array with nans Returns ------- nan_list : list-like List of (start_idx, end_idx) for each contiguous NaN section """ # Get continuous ranges of nan values isnan = np.isnan(arr) edges = np.diff(isnan.astype(int)) starts = np.where(edges == 1)[0] + 1 ends = np.where(edges == -1)[0] + 1 if isnan[0]: starts = np.insert(starts, 0, 0) if isnan[-1]: ends = np.append(ends, len(arr)) nan_list = list(zip(starts, ends)) return nan_list
[docs] def find_all_gaps(inds): """Identify gaps in a list of indices. Parameters ---------- inds : list-like Array of indices Returns ------- gap_indices : list-like Indices of gap start and end Notes ----- For example, in an array of `arr=[2,3,5,6,7,8]`, this function will return `gap_inds=[1]` to indicate where the gap starts. """ gap_indices = [] # Iterate through the array and find where the gaps start for i in range(len(inds) - 1): if inds[i + 1] != inds[i] + 1: gap_indices.append(i + 1) # Append the index where the gap starts return gap_indices
[docs] def detect_outliers(arr): """Detect outliers in an array. Parameters ---------- arr : array-like Array of numbers Returns ------- outlier_indices : array-like array of indices where `arr` has outliers Notes ----- Uses InterQuartile Range (IQR) IQR = q3 - q1 outlier > q3 + 1.5 * IQR outlier < q1 - 1.5 * IQR """ # Ensure input is array-like arr = np.asarray(arr) if arr.shape == (): arr = np.array([arr]) # Get the quartiles and IQR q1 = np.percentile(arr[np.isfinite(arr)], 25) q3 = np.percentile(arr[np.isfinite(arr)], 75) iqr = q3 - q1 # Get the upper and lower limits upper_lim = q3 + 1.5 * iqr lower_lim = q1 - 1.5 * iqr # Identify the desired indices outlier_indices = np.where((arr > upper_lim) | (arr < lower_lim))[0] return outlier_indices