Skip to content
Snippets Groups Projects
Commit 27a4dd4b authored by Péter Szabolcs Mátyás's avatar Péter Szabolcs Mátyás
Browse files

The structure of the package, no documentation yet.

parent de0667de
No related branches found
No related tags found
No related merge requests found
from scipy.io import savemat
def save_data(data, output_path):
if not output_path.endswith('.mat'):
output_path += '.mat'
savemat(output_path, data)
import numpy as np
import pyedflib
from pyPPG.datahandling import save_data
def cut_edf(edf_path, output_path, start_time, duration): #Time is specified in sec
......@@ -58,6 +60,8 @@ def cut_edf(edf_path, output_path, start_time, duration): #Time is specified i
edf_new.close()
if __name__ == "__main__":
cut_edf("old_stuff/mesa-sleep-0006.edf", "20p.edf", 12000, 1200)
\ No newline at end of file
import pyedflib
def read_edf_signals(edf_path, channel_names):
"""Load raw data from EDF file.
:param edf_path: .
:type edf_path: str
:param channel_names: .
:type channel_names: list
:returns:
"""
with pyedflib.EdfReader(edf_path) as edf:
labels = edf.getSignalLabels()
signals = {}
for name in channel_names:
if name in labels:
idx = labels.index(name)
fs = edf.getSampleFrequency(idx)
sig = edf.readSignal(idx)
signals[name] = {'signal': sig, 'fs': fs}
return signals
\ No newline at end of file
from pyPSG.IO.edf_read import read_edf_signals
from pyPSG.IO.data_handling import save_data
def biomarker_extractor(edf_path, matlab_path, channels = {"ppg": "", "ecg": "", "spo2": ""}): #TODO channel nevek dict-ben
for ch, name in channels.items():
if name == "":
del channels[ch]
signals = read_edf_signals(edf_path, channels.values())
extracted_bms = {}
for ch, name in channels.items():
if ch == "ecg":
exec(
ch + "_bm = get_" + ch + "_biomarkers(signals['" + name + "']['signal'], signals['" + name + "']['fs'], matlab_path)")
else:
exec(
ch + "_bm = get_" + ch + "_biomarkers(signals['" + name + "']['signal'], signals['" + name + "']['fs'])")
extracted_bms[ch] = eval(ch + "_bm")
return extracted_bms
if __name__ == "__main__":
matlab_path = r'C://Program Files//MATLAB//MATLAB Runtime//v910//runtime//win64'
channels = {"ppg": "Pleth", "ecg": "EKG", "spo2": "SpO2"}
extracted_bms = biomarker_extractor("../sample.edf", matlab_path, channels)
save_data(extracted_bms, "biomarkers")
print(extracted_bms)
\ No newline at end of file
from pyPSG.utils import HiddenPrints
from pecg import Preprocessing as Pre
from pecg.ecg import FiducialPoints as Fp
from pecg.ecg import Biomarkers as Bm
from pyPSG.biomarkers.get_hrv_bm import get_hrv_biomarkers
def get_ecg_biomarkers(signal, fs, matlab_path, get_hrv = True):
pre = Pre.Preprocessing(signal, fs)
# Notch filter the powerline:
filtered_signal = pre.notch(n_freq=50) # 50 Hz for european powerline, 60 Hz for USA
# Bandpass for baseline wander and high-frequency noise:
filtered_signal = pre.bpfilt()
fp = Fp.FiducialPoints(filtered_signal, fs)
# Two different peak detector algorithms:
with HiddenPrints(): # to avoid long verbose of the peak detector functions
jqrs_peaks = fp.jqrs()
xqrs_peaks = fp.xqrs()
fiducials = fp.wavedet(matlab_path, peaks=jqrs_peaks)
bm = Bm.Biomarkers(filtered_signal, fs, fiducials)
ints, stat_i = bm.intervals()
waves, stat_w = bm.waves()
ecg_biomarker = {
"ints": ints,
"stat_i": stat_i,
"waves": waves,
"stat_w": stat_w,
}
if get_hrv:
hrv_biomarker = get_hrv_biomarkers(jqrs_peaks, fs)
combined_biomarkers = {
"ecg": ecg_biomarker,
"hrv": hrv_biomarker
}
return combined_biomarkers
else:
return ecg_biomarker
\ No newline at end of file
import numpy as np
from pyPSG.biomarkers import hrv_bms as hrv
def get_hrv_biomarkers(peaks, fs):
rr_intervals = np.diff(peaks) / fs
all_metrics = hrv.get_all_metrics(rr_intervals)
return all_metrics
\ No newline at end of file
import pandas as pd
from dotmap import DotMap
from pyPPG import PPG, Fiducials, Biomarkers
import pyPPG.preproc as PP
import pyPPG.fiducials as FP
import pyPPG.biomarkers as BM
from pyPSG.biomarkers.get_hrv_bm import get_hrv_biomarkers
def get_ppg_biomarkers(signal, fs, filtering=True, fL=0.5000001, fH=12, order=4,
sm_wins={'ppg': 50, 'vpg': 10, 'apg': 10, 'jpg': 10}, correction=pd.DataFrame(), get_brv = True):
ppg_signal = DotMap()
ppg_signal.v = signal
ppg_signal.fs = fs
ppg_signal.start_sig = 0
ppg_signal.end_sig = len(signal)
ppg_signal.name = "custom_ppg"
# Initialise the filters
prep = PP.Preprocess(fL=fL, fH=fH, order=order, sm_wins=sm_wins)
# Filter and calculate the PPG, PPG', PPG", and PPG'" signals
ppg_signal.filtering = filtering
ppg_signal.fL = fL
ppg_signal.fH = fH
ppg_signal.order = order
ppg_signal.sm_wins = sm_wins
ppg_signal.ppg, ppg_signal.vpg, ppg_signal.apg, ppg_signal.jpg = prep.get_signals(s=ppg_signal)
# Initialise the correction for fiducial points
corr_on = ['on', 'dn', 'dp', 'v', 'w', 'f']
correction.loc[0, corr_on] = True
ppg_signal.correction = correction
## Create a PPG class
s = PPG(s=ppg_signal, check_ppg_len=True)
## Get Fiducial points
# Initialise the fiducials package
fpex = FP.FpCollection(s=s)
# Extract fiducial points
fiducials = fpex.get_fiducials(s=s)
# print("Fiducial points:\n", fiducials + s.start_sig) #TODO: szepen megcsinalani mint Marci
# Create a fiducials class
fp = Fiducials(fp=fiducials)
# Calculate SQI
# ppgSQI = round(np.mean(SQI.get_ppgSQI(ppg=s.ppg, fs=s.fs, annotation=fp.sp)) * 100, 2)
# print('Mean PPG SQI: ', ppgSQI, '%')
# Initialise the biomarkers package
fp = Fiducials(fp=fiducials)
bmex = BM.BmCollection(s=s, fp=fp)
# Extract biomarkers
bm_defs, bm_vals, bm_stats = bmex.get_biomarkers()
# Create a biomarkers class
pyppg_bm = Biomarkers(bm_defs=bm_defs, bm_vals=bm_vals, bm_stats=bm_stats)
if get_brv:
peaks = fiducials.sp
brv_bm = get_hrv_biomarkers(peaks, fs)
ppg_bm = {
"ppg": pyppg_bm,
"brv": brv_bm
}
return ppg_bm
else:
return pyppg_bm
\ No newline at end of file
import pyedflib
import numpy as np
import pandas as pd
from dotmap import DotMap
# POBM moduls
from pobm.obm.desat import DesaturationsMeasures, desat_embedding
from pobm.obm.complex import ComplexityMeasures
from pobm.obm.burden import HypoxicBurdenMeasures
......@@ -11,38 +7,6 @@ from pobm.obm.general import OverallGeneralMeasures
from pobm.obm.periodicity import PRSAMeasures, PSDMeasures
from pobm.prep import set_range, median_spo2
# PyPPG moduls
from pyPPG import PPG, Fiducials, Biomarkers
import pyPPG.preproc as PP
import pyPPG.fiducials as FP
import pyPPG.biomarkers as BM
import pyPPG.ppg_sqi as SQI
#PECG moduls
from pecg import Preprocessing as ecgPre
from pecg.ecg import FiducialPoints as ecgFp
from pecg.ecg import Biomarkers as ecgBm
#HRV moduls
import mhrv_module as hrv
from utils import HiddenPrints
def read_edf_signals(edf_path, channel_names):
with pyedflib.EdfReader(edf_path) as edf:
labels = edf.getSignalLabels()
signals = {}
for name in channel_names:
if name in labels:
idx = labels.index(name)
fs = edf.getSampleFrequency(idx)
sig = edf.readSignal(idx)
signals[name] = {'signal': sig, 'fs': fs}
return signals
def features_all_desat(signal, time_signal, ODI_Threshold=6, hard_threshold=88, relative=True, desat_max_length=14400):
time_signal = np.array(time_signal)
......@@ -185,9 +149,6 @@ def get_spo2_biomarkers(signal, fs, patient_name="Unknown"):
spo2_signal = median_spo2(spo2_signal, FilterLength=301)
time_signal = np.arange(0, len(spo2_signal)) / fs
test_desat = features_all_desat(spo2_signal, time_signal, ODI_Threshold=4, hard_threshold=93, relative=False,
desat_max_length=14400)
biomarker = pd.DataFrame()
time_begin = time_signal[0]
......@@ -196,190 +157,3 @@ def get_spo2_biomarkers(signal, fs, patient_name="Unknown"):
biomarker = extract_biomarkers_per_signal(spo2_signal, patient_name, time_begin, time_end)
return biomarker
\ No newline at end of file
def get_ppg_biomarkers(signal, fs, filtering=True, fL=0.5000001, fH=12, order=4, sm_wins={'ppg':50,'vpg':10,'apg':10,'jpg':10}, correction=pd.DataFrame()):
ppg_signal = DotMap()
ppg_signal.v = signal
ppg_signal.fs = fs
ppg_signal.start_sig = 0
ppg_signal.end_sig = len(signal)
ppg_signal.name = "custom_ppg"
# Initialise the filters
prep = PP.Preprocess(fL=fL, fH=fH, order=order, sm_wins=sm_wins)
# Filter and calculate the PPG, PPG', PPG", and PPG'" signals
ppg_signal.filtering = filtering
ppg_signal.fL = fL
ppg_signal.fH = fH
ppg_signal.order = order
ppg_signal.sm_wins = sm_wins
ppg_signal.ppg, ppg_signal.vpg, ppg_signal.apg, ppg_signal.jpg = prep.get_signals(s=ppg_signal)
# Initialise the correction for fiducial points
corr_on = ['on', 'dn', 'dp', 'v', 'w', 'f']
correction.loc[0, corr_on] = True
ppg_signal.correction = correction
## Create a PPG class
s = PPG(s=ppg_signal, check_ppg_len=True)
## Get Fiducial points
# Initialise the fiducials package
fpex = FP.FpCollection(s=s)
# Extract fiducial points
fiducials = fpex.get_fiducials(s=s)
# print("Fiducial points:\n", fiducials + s.start_sig) #TODO: szepen megcsinalani mint Marci
# Create a fiducials class
fp = Fiducials(fp=fiducials)
# Calculate SQI
# ppgSQI = round(np.mean(SQI.get_ppgSQI(ppg=s.ppg, fs=s.fs, annotation=fp.sp)) * 100, 2)
# print('Mean PPG SQI: ', ppgSQI, '%')
# Initialise the biomarkers package
fp = Fiducials(fp=fiducials)
bmex = BM.BmCollection(s=s, fp=fp)
# Extract biomarkers
bm_defs, bm_vals, bm_stats = bmex.get_biomarkers()
# Create a biomarkers class
bm = Biomarkers(bm_defs=bm_defs, bm_vals=bm_vals, bm_stats=bm_stats)
return bm
def get_ecg_biomarkers(signal, fs, matlab_path):
pre = ecgPre.Preprocessing(signal, fs)
# Notch filter the powerline:
filtered_signal = pre.notch(n_freq=50) # 50 Hz for european powerline, 60 Hz for USA
# Bandpass for baseline wander and high-frequency noise:
filtered_signal = pre.bpfilt()
fp = ecgFp.FiducialPoints(filtered_signal, fs)
# Two different peak detector algorithms:
with HiddenPrints(): # to avoid long verbose of the peak detector functions
jqrs_peaks = fp.jqrs()
xqrs_peaks = fp.xqrs()
fiducials = fp.wavedet(matlab_path, peaks=jqrs_peaks)
bm = ecgBm.Biomarkers(filtered_signal, fs, fiducials)
ints, stat_i = bm.intervals()
waves, stat_w = bm.waves()
ecg_biomarker = {
"ints": ints,
"stat_i": stat_i,
"waves": waves,
"stat_w": stat_w,
}
return ecg_biomarker
def get_brv_metrics(signal, fs, filtering=True, fL=0.5000001, fH=12, order=4, sm_wins={'ppg':50,'vpg':10,'apg':10,'jpg':10}, correction=pd.DataFrame()):
ppg_signal = DotMap()
ppg_signal.v = signal
ppg_signal.fs = fs
ppg_signal.start_sig = 0
ppg_signal.end_sig = len(signal)
ppg_signal.name = "custom_ppg"
# Initialise the filters
prep = PP.Preprocess(fL=fL, fH=fH, order=order, sm_wins=sm_wins)
# Filter and calculate the PPG, PPG', PPG", and PPG'" signals
ppg_signal.filtering = filtering
ppg_signal.fL = fL
ppg_signal.fH = fH
ppg_signal.order = order
ppg_signal.sm_wins = sm_wins
ppg_signal.ppg, ppg_signal.vpg, ppg_signal.apg, ppg_signal.jpg = prep.get_signals(s=ppg_signal)
# Initialise the correction for fiducial points
corr_on = ['on', 'dn', 'dp', 'v', 'w', 'f']
correction.loc[0, corr_on] = True
ppg_signal.correction = correction
## Create a PPG class
s = PPG(s=ppg_signal, check_ppg_len=True)
## Get Fiducial points
# Initialise the fiducials package
fpex = FP.FpCollection(s=s)
# Extract fiducial points
fiducials = fpex.get_fiducials(s=s)
peaks = fiducials.sp
pp_intervals = np.diff(peaks) / fs
pp_intervals = np.array(pp_intervals, dtype=np.float64)
metrics = hrv.get_all_metrics(pp_intervals)
return metrics
def get_hrv_metrics(signal, fs):
pre = ecgPre.Preprocessing(signal, fs)
# Notch filter the powerline:
filtered_signal = pre.notch(n_freq=50) # 50 Hz for european powerline, 60 Hz for USA
# Bandpass for baseline wander and high-frequency noise:
filtered_signal = ecgPre.Preprocessing(filtered_signal, fs).bpfilt()
fp = ecgFp.FiducialPoints(filtered_signal, fs)
r_peaks = fp.jqrs()
ecg_rr_intervals = np.diff(r_peaks) / fs
all_metrics = hrv.get_all_metrics(ecg_rr_intervals)
return all_metrics
if __name__ == "__main__":
edf_path = "test03.edf"
channels = ["SpO2", "Pleth", "EKG"]
patient_name = "Patient_1"
matlab_pat = r'C://Program Files//MATLAB//MATLAB Runtime//v910//runtime//win64' # edit to the MATLAB Runtime path installed in your machine
signals = read_edf_signals(edf_path, channels)
ecg_signal = signals["EKG"]["signal"]
ecg_fs = signals["EKG"]["fs"]
ecg_results = get_ecg_biomarkers(ecg_signal, ecg_fs, matlab_pat)
spo2_signal = signals["SpO2"]["signal"]
spo2_fs = signals["SpO2"]["fs"]
spo2_results = get_spo2_biomarkers(spo2_signal, spo2_fs, patient_name)
ppg_signal = signals["Pleth"]["signal"]
ppg_fs = signals["Pleth"]["fs"]
ppg_results = get_ppg_biomarkers(ppg_signal, ppg_fs)
hrv_results = get_hrv_metrics(ecg_signal, ecg_fs)
brv_results = get_brv_metrics(ppg_signal, ppg_fs)
print("SpO2 biomarkers") #TODO: egy dataframebe a biomarkereket
print(spo2_results)
print("PPG biomarkers")
print(ppg_results)
print("ECG biomarkers")
print(ecg_results)
print("HRV biomarkers")
print(ppg_results)
print("BRV biomarkers")
print(brv_results)
......@@ -505,6 +505,8 @@ def comp_freq(segment, vlf_band = [0.003, 0.04], lf_band = [0.04, 0.15], hf_ban
def get_all_metrics(rr_intervals):
rr_intervals = np.asarray(rr_intervals, dtype=np.float64).flatten()
AVNN = comp_AVNN(rr_intervals)
SDNN = comp_SDNN(rr_intervals)
RMSSD = comp_RMSSD(rr_intervals)
......@@ -567,7 +569,7 @@ def get_all_metrics(rr_intervals):
if __name__ == "__main__":
a = 0
# edf_path = "test03.edf"
# edf_path = "sample.edf"
# channels = ["SpO2", "Pleth", "EKG"]
# patient_name = "Patient_1"
#
......
File moved
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment