Source code for Preprocessing

import DataHandling as DH
import utils as utl
import numpy as np
from datetime import timedelta,datetime
import inspect
from scipy import optimize
from scipy import signal as ss
import scipy.integrate as si
import scipy.signal.windows as ssw
import mne.time_frequency as mtf
#-----------------------------------------NeoDBS-------------------------------------------------

#------------------ Easy Access to Scipy.signal---------------------
'''
Matlab-style IIR filter design
'''
iir_filter_design_functions = ['butter','buttord','cheby1','cheb1ord','cheby2','cheb2ord','ellip',
                               'ellipord','bessel','iirnotch','iirpeak', 'iircomb']


[docs]def get_functions(module = 'ss'): ''' Retrieves and saves all functions names and callbacks into a dictionary Modules: - 'ss': scipy.signal - 'mtf': mne.time_frequency - 'si': scipy.integrate - 'ssw': scipy.signal.windows - 'np': numpy :param module: str :return: dict, {func_naame: obj} ''' match module: case 'ss': module = ss case 'mtf': module = mtf case 'si': module = si case 'ssw': module = ssw case 'np': module = np functions = {} for name, obj in inspect.getmembers(module, inspect.isroutine): try: functions[name] = obj except Exception as e: print(f"Skipping function {name} due to error: {e}") return functions
[docs]def extract_params_and_returns(docstring: str) -> str: ''' Extracts the 'Parameters' and 'Returns' sections from a Numpydoc-formatted documentation string. :param docstring: str, documentation string of a function. :return: A string containing only the 'Parameters' and 'Returns' sections. ''' lines = docstring.splitlines() sections_to_extract = {"Parameters", "Returns"} section_content = {section: [] for section in sections_to_extract} current_section = None capture = False for i, line in enumerate(lines): stripped = line.strip() # Start of a new section if stripped in sections_to_extract: current_section = stripped capture = True section_content[current_section].append(line) # add header continue elif stripped in {"See Also", "Notes", "Examples"}: capture = False current_section = None continue # Capture content under current section if capture and current_section: # Append blank lines and indented lines (part of parameter/return description) if line.strip() or (section_content[current_section] and section_content[current_section][-1].strip()): section_content[current_section].append(line) # Combine captured sections into final output output_lines = [] for section in ("Parameters", "Returns"): if section_content[section]: output_lines.extend(section_content[section]) output_lines.append("") # spacing between sections return "\n".join(output_lines).strip()
[docs]def call_function2(func_name, *args, **kwargs): ''' Call a function from scipy.signal by name, print its documentation, and handle exceptions. If given arguments dont work, function retries with default arguments given by module. :param func_name: str, function name :param args: dict, required arguments :param kwargs: dict, optional keyword arguments :return: output of func(*args,**kwargs) ''' module = ss functions = get_functions() if func_name not in functions: print(f"Function '{func_name}' not found in {module.__name__}") return None func = functions[func_name] # Get function signature sig = inspect.signature(func) # Extract required parameters (no defaults) required_params = [ param.name for param in sig.parameters.values() if param.default is inspect.Parameter.empty and param.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.POSITIONAL_ONLY) ] # Extract keyword default parameters default_kwargs = { param.name: param.default for param in sig.parameters.values() if param.default is not inspect.Parameter.empty and param.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY) } try: return func(*args, **kwargs) except Exception as e: print(f"Error calling {func_name}: {e}") if args: print("Error occurred even with provided arguments. Not retrying.") return None # If the user provided arguments, don't retry if required_params: print(f"Cannot retry because required positional arguments are missing: {required_params}") return None # Can't call the function without required positional args if default_kwargs: print("Retrying with default keyword arguments...") try: return func(**default_kwargs) except Exception as e2: print(f"Error with default parameters as well: {e2}") return None else: print("No default parameters available.") return None
[docs]def call_function(func_name, param_dict, module = 'ss'): ''' Call a function from module by name, print its documentation, and handle exceptions. If given arguments dont work, function retries with default arguments given by module. Modules: - 'ss': scipy.signal - 'mtf': mne.time_frequency - 'si': scipy.integrate - 'ssw': scipy.signal.windows - 'np': numpy :param func_name: str, function name :param param_dict: dict, required and optional arguments :param module: str, module name :return: output of func(*args,**kwargs) ''' match module: case 'ss': module = ss case 'mtf': module = mtf case 'si': module = si case 'ssw': module = ssw case 'np': module = np functions = get_functions(module) if func_name not in functions: print(f"Function '{func_name}' not found in {module.__name__}") return None func = functions[func_name] sig = inspect.signature(func) # Extract required positional arguments required_params = [ param.name for param in sig.parameters.values() if param.default is inspect.Parameter.empty and param.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.POSITIONAL_ONLY) ] # Extract keyword default parameters default_kwargs = { param.name: param.default for param in sig.parameters.values() if param.default is not inspect.Parameter.empty and param.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY) } # Convert saved parameters (strings) into appropriate types converted_params = {} for key, value in param_dict.items(): if key in required_params or key in default_kwargs: # Ensure it's a valid argument try: # Ensure empty strings become None, and evaluate only if value is not empty if value.strip() == "": converted_params[key] = None elif value.lower() == "none": converted_params[key] = None elif value.lower() == "inf": converted_params[key] = float("inf") else: converted_params[key] = eval(value) # Convert numbers and lists except Exception: converted_params[key] = value # Keep as string if eval fails # Split into positional and keyword arguments args = [converted_params[param] for param in required_params if param in converted_params] kwargs = {k: v for k, v in converted_params.items() if k not in required_params} try: return func(*args, **kwargs) except Exception as e: error = f"Error calling {func_name}: {e}" return error
[docs]def get_required_params(func_name, module = 'ss'): ''' Retrieves required parameters, and default values, from func_name. Modules: - 'ss': scipy.signal - 'mtf': mne.time_frequency - 'si': scipy.integrate - 'ssw': scipy.signal.windows - 'np': numpy :param func_name: str, function name :param module: str, module name. Defaults to ss :return: dict ''' match module: case 'ss': module = ss case 'mtf': module = mtf case 'si': module = si case 'ssw': module = ssw case 'np': module = np functions = get_functions(module) if func_name not in functions: print(f"Function '{func_name}' not found in {module.__name__}") return None func = functions[func_name] sig = inspect.signature(func) required_params = {} for name, param in sig.parameters.items(): if param.default is inspect.Parameter.empty: required_params[name] = None # No default value else: required_params[name] = param.default # Default value exists return required_params
[docs]def get_default_kwargs(func_name,module = 'ss'): ''' Retrieves optional keywords parameters, and default values, from func_name. Modules: - 'ss': scipy.signal - 'mtf': mne.time_frequency - 'si': scipy.integrate - 'ssw': scipy.signal.windows - 'np': numpy :param func_name: str, function name :param module: str, module name. Defaults to ss :return: dict ''' functions = get_functions(module) match module: case 'ss': module = ss case 'mtf': module = mtf case 'si': module = si case 'ssw': module = ssw if func_name not in functions: print(f"Function '{func_name}' not found in {module.__name__}") return None func = functions[func_name] sig = inspect.signature(func) default_kwargs = { param.name: param.default for param in sig.parameters.values() if param.default is not inspect.Parameter.empty and param.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY) } return default_kwargs
[docs]def get_date_from_ts(ts): ''' Returns datetime object correspondant to the given timestamp ts :param ts: int/float :return: datetime.datetime ''' return datetime.fromtimestamp(ts)
#----------------------------- NeoDBS signal processing functions------------------
[docs]def MeanSignal(signals, key = 'Y'): ''' Calculates and return mean from specific key in signal dictionary, item of signals list :param signals: list list of dictionaries of signal data. :param key: str/int, default to 'Y' key of dictionary :return: np.array computed mean ''' yy=[signal[key] for signal in signals] mean_signal = np.nanmean(yy, axis=0) return mean_signal
[docs]def StdDevSignal(signals, key = None): ''' Calculates and return standard deviation from specific key in signal dictionary, item of signals list :param signals: list, list of dictionaries of signal data. :param key: str/int :return: np.array ''' if key == None: key = 'Y' yy=[signal[key] for signal in signals] mean_signal = np.nanstd(yy, axis=0) return mean_signal
#---Power Normalization
[docs]def convert_to_db(value): ''' Convert a value to dB scale using 10 * log10. :param value: int/float :return: int/float ''' if isinstance(value, (np.ndarray,tuple)): # Handle peak values (power, frequency) return (round(10 * np.log10(value[0]+1e-6), 2), value[1]) elif isinstance(value,list): try: result = [round(float(10*np.log10(s + 1e-6)),2) for s in value] return result except Warning: print(value) elif isinstance(value, (int, float)): try: result = round(10 * np.log10(value+1e-6), 2) return result except Warning: print(value) return value # Return unchanged if it's a string or invalid type
[docs]def convert_from_db(value): ''' Convert a dB value back to linear scale using 10^(dB/10). :param value: int/float :return: int/float ''' if isinstance(value, (np.ndarray,tuple)): # Handle peak values (power, frequency) return (round(10**(value[0] / 10), 2) if value[0] != float('-inf') else 0, value[1]) elif isinstance(value,list): return [round(float(10**(s / 10) - 1e-6),2) for s in value] elif isinstance(value, (int, float)): return round(10**(value / 10), 2) if value != float('-inf') else 0 return value # Return unchanged if it's a string or invalid type
# -------------------------------------- BRAVO -------------------------------------
[docs]def removeCardiacComponent(raw, band, filterMethod="Fixed Peak Find"): ''' Applies 5th order Butterworth bandpass filter with cardiac artifact removal. With Wiener Filter and scipy.find_peaks, an ECG template is created and subtracted to the filtered signal, removing the cardiac component. Adapted from https://github.com/Fixel-Institute/BRAVO_SSR/blob/main/modules/LocalPerceptDatabase.py#L327 :param raw: dict, {'Y': signal values} :param band: Description :param filterMethod: Description :return: filtered signal, signal with only bandpass ''' a,b = band n = 5 if int(a) == 0: a = 1 n = 5 if int(b) == 0: b = 100 band = np.array([a,b]) filtered = raw['Y'] fs = 250 ba = ss.butter(n, band, 'bp', output='ba',fs=fs) PredictedSignal = ss.wiener(filtered, mysize=125) filtered -= PredictedSignal if filterMethod == "Fixed Peak Find": # Cardiac Filter peaks = [] posPeaks,_ = ss.find_peaks(filtered, prominence=[10,200], distance=fs*0.5) PosCardiacVariability = np.std(np.diff(posPeaks)) negPeaks,_ = ss.find_peaks(-filtered, prominence=[10,200], distance=fs*0.5) NegCardiacVariability = np.std(np.diff(negPeaks)) if PosCardiacVariability < NegCardiacVariability: peaks = posPeaks else: peaks = negPeaks if len(peaks)>1: if None in peaks: print('None') CardiacRate = int(np.mean(np.diff(peaks))) PrePeak = int(CardiacRate*0.25) PostPeak = int(CardiacRate*0.65) EKGMatrix = np.zeros((len(peaks)-2,PrePeak+PostPeak)) for j in range(1,len(peaks)-1): if peaks[j]+PostPeak < len(filtered) and peaks[j]-PrePeak > 0: EKGMatrix[j-1,:] = filtered[peaks[j]-PrePeak:peaks[j]+PostPeak] EKGTemplate = np.mean(EKGMatrix,axis=0) EKGTemplate = EKGTemplate / (np.max(EKGTemplate)-np.min(EKGTemplate)) def EKGTemplateFunc(xdata, amplitude, offset): return EKGTemplate * amplitude + offset for j in range(len(peaks)): if peaks[j]-PrePeak < 0: pass elif peaks[j]+PostPeak >= len(filtered) : pass else: sliceSelection = np.arange(peaks[j]-PrePeak,peaks[j]+PostPeak) params, covmat = optimize.curve_fit(EKGTemplateFunc, sliceSelection, filtered[sliceSelection]) filtered[sliceSelection] = filtered[sliceSelection] - EKGTemplateFunc(sliceSelection, *params) ecg = EKGTemplateFunc(sliceSelection, *params) filtered = ss.filtfilt(*ba, filtered) buttered = ss.filtfilt(*ba, raw['Y']) return filtered, buttered