signals

Module: signals

Signal processing functions from different sources.

Authors:

Copyright 2015, Sage Bionetworks (http://sagebase.org), Apache v2.0 License

Functions

mhealthx.signals.accelerometer_signal_quality(gx, gy, gz)

Compute accelerometer signal quality.

gx : list
x-axis gravity acceleration
gy : list
y-axis gravity acceleration
gz : list
z-axis gravity acceleration
min_mse : float
minimum mean squared error
vertical : string
primary direction of vertical ('x', 'y', or 'z')
>>> from mhealthx.xio import read_accel_json
>>> input_file = '/Users/arno/DriveWork/mhealthx/mpower_sample_data/deviceMotion_walking_outbound.json.items-a2ab9333-6d63-4676-977a-08591a5d837f5221783798792869048.tmp'
>>> device_motion = True
>>> start = 150
>>> t, axyz, gxyz, uxyz, rxyz, sample_rate, duration = read_accel_json(input_file, start, device_motion)
>>> gx, gy, gz = gxyz
>>> from mhealthx.signals import accelerometer_signal_quality
>>> min_mse, vertical = accelerometer_signal_quality(gx, gy, gz)
mhealthx.signals.autocorrelate(data, unbias=2, normalize=2, plot_test=False)

Compute the autocorrelation coefficients for time series data.

Here we use scipy.signal.correlate, but the results are the same as in Yang, et al., 2012 for unbias=1: “The autocorrelation coefficient refers to the correlation of a time series with its own past or future values. iGAIT uses unbiased autocorrelation coefficients of acceleration data to scale the regularity and symmetry of gait. The autocorrelation coefficients are divided by fc(0) in Eq. (6), so that the autocorrelation coefficient is equal to 1 when t=0:

NFC(t) = fc(t) / fc(0)

Here NFC(t) is the normalised autocorrelation coefficient, and fc(t) are autocorrelation coefficients.”

data : numpy array
time series data
unbias : integer or None
unbiased autocorrelation: divide by range (1) or by weighted range (2)
normalize : integer or None
normalize: divide by 1st coefficient (1) or by maximum abs. value (2)
plot_test : Boolean
plot?
coefficients : numpy array
[normalized, unbiased] autocorrelation coefficients
N : integer
number of coefficients
>>> import numpy as np
>>> from mhealthx.signals import autocorrelate
>>> data = np.random.random(100)
>>> unbias = 2
>>> normalize = 2
>>> plot_test = True
>>> coefficients, N = autocorrelate(data, unbias, normalize, plot_test)
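The normalization in the quoted passage can be sketched in a few lines of NumPy. This is an illustration of NFC(t) = fc(t) / fc(0) only, not the mhealthx implementation, which also supports the unbiasing options described above:

```python
import numpy as np

def normalized_autocorrelation(data):
    # fc(t): autocorrelation of the mean-subtracted series at lags t = 0..N-1
    x = np.asarray(data, dtype=float) - np.mean(data)
    fc = np.correlate(x, x, mode='full')[x.size - 1:]
    # NFC(t) = fc(t) / fc(0), so NFC(0) == 1
    return fc / fc[0]

nfc = normalized_autocorrelation(np.sin(np.linspace(0, 8 * np.pi, 200)))
```

By the Cauchy-Schwarz inequality, fc(0) is the largest coefficient in magnitude, so every normalized coefficient lies in [-1, 1].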
mhealthx.signals.butter_lowpass_filter(data, sample_rate, cutoff=10, order=4)

Low-pass filter data with an [order]th-order, zero-lag Butterworth filter whose cutoff frequency is set to [cutoff] Hz.

After http://stackoverflow.com/questions/25191620/creating-lowpass-filter-in-scipy-understanding-methods-and-units

data : numpy array of floats
time-series data
sample_rate : integer
data sample rate
cutoff : float
filter cutoff
order : integer
order
y : numpy array of floats
low-pass-filtered data
>>> import numpy as np
>>> from mhealthx.signals import butter_lowpass_filter
>>> data = np.random.random(100)
>>> sample_rate = 10
>>> cutoff = 5
>>> order = 4
>>> y = butter_lowpass_filter(data, sample_rate, cutoff, order)
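The zero-lag behavior comes from running the filter forward and then backward so the phase delays cancel. A minimal sketch using scipy.signal (the approach in the linked Stack Overflow answer, not necessarily mhealthx's exact code):

```python
import numpy as np
from scipy.signal import butter, filtfilt

def lowpass(data, sample_rate, cutoff=10, order=4):
    # Design the Butterworth filter with the cutoff expressed as a
    # fraction of the Nyquist frequency, then apply it forward and
    # backward (filtfilt) to cancel phase delay.
    nyquist = 0.5 * sample_rate
    b, a = butter(order, cutoff / nyquist, btype='low')
    return filtfilt(b, a, data)

rng = np.random.RandomState(0)
t = np.linspace(0, 1, 500, endpoint=False)
noisy = np.sin(2 * np.pi * 2 * t) + 0.5 * rng.randn(500)
smooth = lowpass(noisy, sample_rate=500, cutoff=5)
```

With the cutoff well below the noise band, the filtered series is visibly smoother while preserving the 2 Hz component.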
mhealthx.signals.compute_cv(x)

Compute the coefficient of variation (as a percentage):

cv = 100 * np.std(x) / np.mean(x)

x : list or array of floats
values

cv : float
coefficient of variation (percent)

>>> from mhealthx.signals import compute_cv
>>> x = [1,  3, 10]
>>> cv = compute_cv(x)
mhealthx.signals.compute_interpeak(data, sample_rate)

Compute number of samples between signal peaks using the real part of FFT.

data : list or numpy array
time series data
sample_rate : float
sample rate of accelerometer reading (Hz)
interpeak : integer
number of samples between peaks
>>> import numpy as np
>>> from mhealthx.signals import compute_interpeak
>>> data = np.random.random(10000)
>>> sample_rate = 100
>>> interpeak = compute_interpeak(data, sample_rate)
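One way to turn the FFT into an inter-peak interval is to take the dominant non-DC frequency and convert it to a sample count. This is a hedged sketch of that idea; the exact mhealthx procedure may differ:

```python
import numpy as np

def interpeak_from_fft(data, sample_rate):
    # Find the dominant frequency from the real-valued FFT of the
    # mean-subtracted signal, skipping the DC bin, then convert
    # frequency to samples per period.
    x = np.asarray(data, dtype=float) - np.mean(data)
    freqs = np.fft.rfftfreq(x.size, d=1.0 / sample_rate)
    power = np.abs(np.fft.rfft(x))
    peak_freq = freqs[1:][np.argmax(power[1:])]
    return int(round(sample_rate / peak_freq))

# A 2 Hz oscillation sampled at 100 Hz peaks every 50 samples.
t = np.linspace(0, 10, 1000, endpoint=False)
interpeak = interpeak_from_fft(np.sin(2 * np.pi * 2 * t), sample_rate=100)
```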
mhealthx.signals.compute_mean_teagerkaiser_energy(x)

Compute the mean Teager-Kaiser energy of a signal.

(from TKEO function in R library(seewave) using f = 1, m = 1, M = 1)

x : numpy array of floats
time series data

tk_energy : float
mean Teager-Kaiser energy

>>> import numpy as np
>>> from mhealthx.signals import compute_mean_teagerkaiser_energy
>>> x = np.array([1,  3, 12, 25, 10])
>>> tk_energy = compute_mean_teagerkaiser_energy(x)
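The discrete Teager-Kaiser energy operator is psi[n] = x[n]^2 - x[n-1] * x[n+1]. A sketch that averages it over the interior samples (whether mhealthx averages over exactly these samples is an assumption):

```python
import numpy as np

def mean_tkeo(x):
    # psi[n] = x[n]^2 - x[n-1] * x[n+1], defined for the interior
    # samples n = 1..N-2; return its mean.
    x = np.asarray(x, dtype=float)
    psi = x[1:-1] ** 2 - x[:-2] * x[2:]
    return np.mean(psi)

tk = mean_tkeo([1, 3, 12, 25, 10])
```

For this input the operator gives psi = [-3, 69, 505], so the mean is 571/3.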
mhealthx.signals.compute_median_abs_dev(X, W=[], precision=1, c=1.0)

Compute the (weighted) median absolute deviation.

mad = median(abs(x - median(x))) / c

X : numpy array of floats or integers
values
W : numpy array of floats or integers
weights
precision : integer
number of decimal places to consider weights
c : float
constant used as divisor for mad computation; c = 0.6745 is used to convert from mad to standard deviation
mad : float
(weighted) median absolute deviation
>>> import numpy as np
>>> from mhealthx.signals import compute_median_abs_dev
>>> X = np.array([1,2,4,7,8])
>>> W = np.array([.1,.1,.3,.2,.3])
>>> precision = 1
>>> # [1, 2, 4, 4, 4, 7, 7, 8, 8, 8]
>>> compute_median_abs_dev(X, W, precision)
    2.0
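In the unweighted case the function reduces to the formula above; a minimal sketch:

```python
import numpy as np

def median_abs_dev(x, c=1.0):
    # mad = median(|x - median(x)|) / c; c = 0.6745 rescales the result
    # into a consistent estimate of the standard deviation.
    x = np.asarray(x, dtype=float)
    return np.median(np.abs(x - np.median(x))) / c

mad = median_abs_dev([1, 2, 4, 7, 8])
```

Here the median is 4, the absolute deviations are [3, 2, 0, 3, 4], and their median is 3.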
mhealthx.signals.compute_sample_rate(t)

Compute sample rate.

t : list
time points
sample_rate : float
sample rate
duration : float
duration of time series
>>> import numpy as np
>>> from mhealthx.signals import compute_sample_rate
>>> t = np.linspace(.1, 1000, 10000)
>>> sample_rate, duration = compute_sample_rate(t)
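With evenly spaced time points, the sample rate follows from the time span and the point count. A sketch that assumes uniform sampling (mhealthx may handle irregular time stamps differently):

```python
import numpy as np

def sample_rate_and_duration(t):
    # duration is the time span; N points span N - 1 sampling intervals.
    t = np.asarray(t, dtype=float)
    duration = t[-1] - t[0]
    sample_rate = (t.size - 1) / duration
    return sample_rate, duration

rate, duration = sample_rate_and_duration(np.linspace(.1, 1000, 10000))
```

For the doctest input above, 9999 intervals span 999.9 seconds, giving a 10 Hz sample rate.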
mhealthx.signals.compute_stats(x)

Compute statistical summaries of data.

x : list or array of floats

num : integer
number of elements
min : integer
minimum
max : integer
maximum
rng : integer
range
avg : float
mean
std : float
standard deviation
med : float
median
mad : float
median absolute deviation
kurt : float
kurtosis
skew : float
skewness
cvar : float
coefficient of variation
lower25 : float
lower quartile
upper25 : float
upper quartile
inter50 : float
interquartile range
>>> from mhealthx.signals import compute_stats
>>> x = [1,  3, 10]
>>> num, min, max, rng, avg, std, med, mad, kurt, skew, cvar, lower25, upper25, inter50 = compute_stats(x)
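These summaries can all be computed with NumPy and scipy.stats; a sketch whose dict keys mirror the return names above (whether compute_stats reports cvar as a percentage, like compute_cv, is an assumption):

```python
import numpy as np
from scipy import stats

def summary_stats(x):
    # Quartiles via np.percentile; kurtosis/skewness from scipy.stats.
    x = np.asarray(x, dtype=float)
    lower25, med, upper25 = np.percentile(x, [25, 50, 75])
    mad = np.median(np.abs(x - med))
    return {
        'num': x.size, 'min': x.min(), 'max': x.max(),
        'rng': x.max() - x.min(), 'avg': x.mean(), 'std': x.std(),
        'med': med, 'mad': mad,
        'kurt': stats.kurtosis(x), 'skew': stats.skew(x),
        'cvar': 100.0 * x.std() / x.mean(),
        'lower25': lower25, 'upper25': upper25,
        'inter50': upper25 - lower25,
    }

s = summary_stats([1, 3, 10])
```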
mhealthx.signals.crossings_nonzero_pos2neg(data)

Find indices of zero crossings from positive to negative values.

From: http://stackoverflow.com/questions/3843017/efficiently-detect-sign-changes-in-python

data : numpy array of floats

crossings : numpy array of integers
crossing indices to data
>>> import numpy as np
>>> from mhealthx.signals import crossings_nonzero_pos2neg
>>> data = np.random.random(100)
>>> crossings = crossings_nonzero_pos2neg(data)
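The linked Stack Overflow trick compares consecutive signs. A sketch returning indices i where data[i] is positive and data[i+1] is not (treating exact zeros as the negative side is an assumption):

```python
import numpy as np

def pos2neg_crossings(data):
    # True where the sample is positive; a crossing is a step from
    # positive to not-positive between consecutive samples.
    pos = np.asarray(data) > 0
    return np.nonzero(pos[:-1] & ~pos[1:])[0]

crossings = pos2neg_crossings([1.0, 0.5, -0.5, -1.0, 0.5, 1.0, -1.0])
```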
mhealthx.signals.gravity_min_mse(gx, gy, gz)

Compute QC score based on gravity acceleration only.

Elias Chaibub-Neto used this to find gross rotations of the phone.

gx : list or numpy array
x-axis gravity accelerometer data
gy : list or numpy array
y-axis gravity accelerometer data
gz : list or numpy array
z-axis gravity accelerometer data
min_mse : float
minimum mean squared error
vertical : string
primary direction of vertical ('x', 'y', or 'z')
>>> from mhealthx.xio import read_accel_json
>>> input_file = '/Users/arno/DriveWork/mhealthx/mpower_sample_data/deviceMotion_walking_outbound.json.items-a2ab9333-6d63-4676-977a-08591a5d837f5221783798792869048.tmp'
>>> device_motion = True
>>> start = 150
>>> t, axyz, gxyz, uxyz, rxyz, sample_rate, duration = read_accel_json(input_file, start, device_motion)
>>> gx, gy, gz = gxyz
>>> from mhealthx.signals import gravity_min_mse
>>> min_mse, vertical = gravity_min_mse(gx, gy, gz)
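One plausible reading of this QC check (hypothetical; not Chaibub-Neto's actual code): for each axis, score how far that axis's gravity magnitude is from 1 g, and call the best-scoring axis vertical. A large minimum MSE would then indicate the phone was grossly rotated during the recording:

```python
import numpy as np

def gravity_min_mse_sketch(gx, gy, gz):
    # MSE between |g_axis| and 1 for each axis; the axis with the
    # minimum MSE is taken as the primary vertical direction.
    mses = {axis: np.mean((np.abs(np.asarray(g, dtype=float)) - 1.0) ** 2)
            for axis, g in (('x', gx), ('y', gy), ('z', gz))}
    vertical = min(mses, key=mses.get)
    return mses[vertical], vertical

# Phone held roughly vertical along y: gy near -1, gx and gz near 0.
min_mse, vertical = gravity_min_mse_sketch(
    [0.02, -0.01, 0.03], [-0.99, -1.01, -0.98], [0.05, 0.04, -0.02])
```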
mhealthx.signals.parabolic(f, x)

Quadratic interpolation for estimating the true position of an inter-sample maximum when nearby samples are known.

From: https://gist.github.com/endolith/255291 and
https://github.com/endolith/waveform-analyzer

f : list or array
vector
x : integer
index for vector f
(vx, vy) : floats
coordinates of the vertex of a parabola that goes through point x and its two neighbors
>>> from numpy import argmax
>>> from mhealthx.signals import parabolic
>>> # f has a local maximum at index 3 (f[3] = 6); estimate the true
>>> # peak as if points 2, 3, and 4 lay on a parabola.
>>> f = [2, 3, 1, 6, 4, 2, 3, 1]
>>> parabolic(f, argmax(f))
    (3.2142857142857144, 6.1607142857142856)
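The values in the example come from the standard three-point quadratic fit: the vertex of the parabola through (x-1, f[x-1]), (x, f[x]), (x+1, f[x+1]). A sketch reproducing them:

```python
import numpy as np

def parabolic_sketch(f, x):
    # Vertex of the parabola through the sample at x and its neighbors.
    xv = x + 0.5 * (f[x - 1] - f[x + 1]) / (f[x - 1] - 2.0 * f[x] + f[x + 1])
    yv = f[x] - 0.25 * (f[x - 1] - f[x + 1]) * (xv - x)
    return xv, yv

f = [2, 3, 1, 6, 4, 2, 3, 1]
vx, vy = parabolic_sketch(f, int(np.argmax(f)))
```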
mhealthx.signals.root_mean_square(data)

Compute root mean square of data.

from Yang, et al., 2012: “The root mean square of acceleration indicates the intensity of motion. The RMS values of the three acceleration directions (VT, AP and ML) are calculated as: RMS_d = sqrt( sum[i=1:N] (x_di - [x_d])^2 / N ), where x_di (i = 1,2,...,N; d = VT,AP,ML) is the acceleration in either the VT, AP or ML axis, N is the length of the acceleration signal, and [x_d] is the mean value of acceleration in that axis.”

data : numpy array of floats

rms : float
root mean square
>>> import numpy as np
>>> from mhealthx.signals import root_mean_square
>>> data = np.random.random(100)
>>> rms = root_mean_square(data)
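Note that the quoted formula subtracts the per-axis mean, so it is the RMS of the deviations about the mean (numerically the population standard deviation). A sketch of that reading; whether mhealthx subtracts the mean is an assumption:

```python
import numpy as np

def rms_about_mean(x):
    # RMS_d = sqrt( sum_i (x_i - mean(x))^2 / N ), per the quoted formula.
    x = np.asarray(x, dtype=float)
    return np.sqrt(np.mean((x - np.mean(x)) ** 2))

rms = rms_about_mean([1.0, 3.0, 5.0])
```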
mhealthx.signals.signal_features(data)

Extract various features from time series data.

data : numpy array of floats
time series data
num : integer
number of elements
min : integer
minimum
max : integer
maximum
rng : integer
range
avg : float
mean
std : float
standard deviation
med : float
median
mad : float
median absolute deviation
kurt : float
kurtosis
skew : float
skewness
cvar : float
coefficient of variation
lower25 : float
lower quartile
upper25 : float
upper quartile
inter50 : float
interquartile range
rms : float
root mean square
entropy : float
entropy measure
tk_energy : float
mean Teager-Kaiser energy
>>> import numpy as np
>>> from mhealthx.signals import signal_features
>>> data = np.random.random(100)
>>> num, min, max, rng, avg, std, med, mad, kurt, skew, cvar, lower25, upper25, inter50, rms, entropy, tk_energy = signal_features(data)
mhealthx.signals.weighted_to_repeated_values(X, W=[], precision=1)

Create a list of repeated values from weighted values.

This is useful for computing weighted statistics (ex: weighted median).

Adapted to allow for fractional weights from
http://stackoverflow.com/questions/966896/
code-golf-shortest-code-to-find-a-weighted-median
X : numpy array of floats or integers
values
W : numpy array of floats or integers
weights
precision : integer
number of decimal places to consider weights
repeat_values : numpy array of floats or integers
repeated values according to their weight
>>> import numpy as np
>>> from mhealthx.signals import weighted_to_repeated_values
>>> X = np.array([1,2,4,7,8])
>>> W = np.array([.1,.1,.3,.2,.3])
>>> precision = 1
>>> weighted_to_repeated_values(X, W, precision)
    [1, 2, 4, 4, 4, 7, 7, 8, 8, 8]
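Scaling each weight by 10**precision and repeating the value that many times yields the list shown above; a minimal sketch:

```python
def repeat_by_weight(X, W, precision=1):
    # Each value is repeated round(w * 10**precision) times, so fractional
    # weights become integer repeat counts at the given precision.
    scale = 10 ** precision
    repeated = []
    for x, w in zip(X, W):
        repeated.extend([x] * int(round(w * scale)))
    return repeated

repeated = repeat_by_weight([1, 2, 4, 7, 8], [.1, .1, .3, .2, .3])
```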