Source code for emgdecompy.preprocessing

# Copyright (C) 2022 Daniel King, Jasmine Ortega, Rada Rudyak, Rowan Sivanandam
# This script contains functions that preprocess raw EMG data for input into
# the blind source separation algorithm.

from scipy import linalg
from scipy.signal import butter, lfilter
import numpy as np

[docs]def flatten_signal(raw): """ Takes the raw EMG signal array, flattens it, and removes empty channels with no data. Parameters ---------- raw: numpy.ndarray Raw EMG signal array. Returns ------- numpy.ndarray Flattened EMG signal array, with empty channels removed. """ # Flatten input array raw_flattened = raw.flatten() # Remove empty channels and then removes dimension of size 1 raw_flattened = np.array( [channel for channel in raw_flattened if 0 not in channel.shape] ).squeeze() return raw_flattened
[docs]def butter_bandpass_filter(data, lowcut=10, highcut=900, fs=2048, order=6): """ Filters input data using a Butterworth band-pass filter. Parameters ---------- data: numpy.ndarray 1D array containing data to be filtered. lowcut: float Lower range of band-pass filter. highcut: float Upper range of band-pass filter. fs: float Sampling frequency in Hz. order: int Order of filter. Returns ------- numpy.ndarray Filtered data. Examples -------- >>> butter_bandpass_filter(data, 10, 900, 2048, order=6) """ b, a = butter(order, [lowcut, highcut], fs=fs, btype="band") filtered_data = lfilter(b, a, data) return filtered_data
[docs]def extend_input_by_R(x, R): """ Takes a one-dimensional array and extends it using past observations. Parameters ---------- x: numpy.ndarray 1D array to be extended. R: int How far to extend x. Returns ------- numpy.ndarray len(x) by R+1 extended array. Examples -------- >>> R = 5 >>> x = np.array([1, 2, 3]) >>> extend_input_by_R(x, R) array([[1., 2., 3.], [0., 1., 2.], [0., 0., 1.], [0., 0., 0.], [0., 0., 0.], [0., 0., 0.]]) """ # Create array with R+1 rows and length of x + R columns extended_x = np.zeros((R + 1, len(x) + R)) # Create array where each row is a delayed version of the previous row for i in range(R + 1): extended_x[i][i : i + len(x)] = x # Optional: Cut off extra R rows extended_x = extended_x.T[0 : len(x)].T return extended_x
[docs]def extend_all_channels(x_mat, R): """ Takes an array with dimensions M by K, where M represents number of channels and K represents observations, and "extends" it to return an array of shape M * (R+1) by K. Parameters ---------- x_mat: numpy.ndarray 2D array to be extended. R: int How far to extend x. Returns ------- numpy.ndarray M(R+1) x K extended array. Examples -------- >>> R = 3 >>> x_mat = np.array([[1, 2, 3, 4,], [5, 6, 7, 8,]]) >>> extend_all_channels(x_mat, R) array([[1., 2., 3., 4.], [0., 1., 2., 3.], [0., 0., 1., 2.], [0., 0., 0., 1.], [5., 6., 7., 8.], [0., 5., 6., 7.], [0., 0., 5., 6.], [0., 0., 0., 5.]]) """ extended_x_mat = np.zeros([x_mat.shape[0], (R + 1), x_mat.shape[1]]) for i, channel in enumerate(x_mat): # Extend channel extended_channel = extend_input_by_R(channel, R) # Add extended channel to the overall matrix of extended channels extended_x_mat[i] = extended_channel # Reshape to get rid of channels extended_x_mat = extended_x_mat.reshape(x_mat.shape[0] * (R + 1), x_mat.shape[1]) return extended_x_mat
[docs]def center_matrix(x): """ Subtract mean of each row. Results in the data being centered around x=0. Parameters ---------- x: numpy.ndarray Matrix of arrays to be centered. Returns ------- numpy.ndarray Centered matrix array. Examples -------- >>> x = np.array([[1, 2, 3], [4, 6, 8]]) >>> center_matrix(x) array([[-1., 0., 1.], [-2., 0., 2.]]) """ x_cent = x.T - np.mean(x.T, axis=0) x_cent = x_cent.T return x_cent
[docs]def whiten(x): """ Whiten the input matrix through zero component analysis. Parameters ---------- x: numpy.ndarray Centred 2D array to be whitened. Returns ------- numpy.ndarray Whitened array. Examples -------- >>> x = np.array([[1, 2, 3, 4], # Feature-1 [5, 6, 7, 8]]) # Feature-2 >>> whiten(x) array([[-1.34217726e+08, -1.34217725e+08, -1.34217725e+08, -1.34217724e+08], [ 1.34217730e+08, 1.34217731e+08, 1.34217731e+08, 1.34217732e+08]]) """ # Calculate covariance matrix cov_mat = np.cov(x, rowvar=True, bias=True) # Eigenvalues and eigenvectors w, v = linalg.eig(cov_mat) # Apply regularization factor, replacing eigenvalues smaller than it with the factor reg_factor = w[round(len(w) / 2):].mean() w = np.where(w < reg_factor, reg_factor, w) # Diagonal matrix inverse square root of eigenvalues diagw = np.diag(1 / (w ** 0.5)) diagw = diagw.real # Whitening using zero component analysis: v diagw v.T x wzca = np.dot(v, np.dot(diagw, v.T)) z = np.dot(wzca, x) return z