Skip to content
Snippets Groups Projects
Commit 06958a5c authored by Péter Szabolcs Mátyás's avatar Péter Szabolcs Mátyás
Browse files

pyPPG and pobm examples

parent b2e3bc5c
Branches
No related tags found
No related merge requests found
temp_dir
.idea
.venv
PPG.mat
ICU_COVID_patient1.csv
pobm_env.yml
\ No newline at end of file
# onlab_pypsg
# Szabi onlab_pypsg
......
main.py 0 → 100644
# -*- coding: utf-8 -*-
"""Edf_IO.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1UtbYz101dv-sZ4SdllLNl3oYvF8rORzg
# Excel to EDF
Install required libraries
"""
"""The code below converts an Excel file into an EDF file.
---
"""
import pandas as pd
import pyedflib
import numpy as np
from datetime import datetime
import mne
import matplotlib.pyplot as plt
def _patient_field(patient_data, key, default='XX'):
    """Return ``patient_data[key]``, or ``default`` if the column is absent or the cell is empty."""
    value = patient_data.get(key, default)
    # Empty Excel cells are read by pandas as NaN/NaT; treat them like a missing column
    # so the 'XX' placeholder is used instead of the text "nan".
    return default if pd.isna(value) else value


def _parse_record_start(record_date):
    """Turn the 'Recording_date' cell into a ``datetime``; fall back to the current time."""
    # pandas usually parses Excel date cells into Timestamp (a datetime subclass);
    # passing those to strptime would raise TypeError and silently discard a valid date.
    if isinstance(record_date, pd.Timestamp):
        return record_date.to_pydatetime()
    if isinstance(record_date, datetime):
        return record_date
    try:
        return datetime.strptime(record_date, '%Y-%m-%d')
    except (ValueError, TypeError):
        return datetime.now()  # Default to current datetime if invalid


def convert_excel_to_edf(excel_path, edf_output_path):
    """Convert an Excel workbook into an EDF+ file.

    The workbook must contain two sheets:

    * ``Records`` (read without a header): row 0 holds the signal labels,
      row 1 the sampling frequencies, and every following row one sample per signal.
    * ``Patient``: only its first data row is used. The columns ``Name``, ``ID``,
      ``Birthdate``, ``Height``, ``Weight``, ``Notes`` and ``Recording_date`` are
      looked up; missing columns or empty cells are replaced by ``'XX'``.

    Every channel is written with the dimension ``'mV'`` and the physical range
    taken from the minimum and maximum of its samples.

    :param excel_path: path of the input Excel workbook
    :type excel_path: str
    :param edf_output_path: path of the EDF+ file to create
    :type edf_output_path: str
    :return: None; the EDF+ file is written to ``edf_output_path``
    """
    # Load the Records and Patient sheets; the context manager closes the workbook
    with pd.ExcelFile(excel_path) as excel_data:
        records_data = excel_data.parse('Records', header=None)
        patients_data = excel_data.parse('Patient')

    # Extract patient information (use 'XX' for missing values)
    patient_data = patients_data.iloc[0].to_dict()
    patient_name = str(_patient_field(patient_data, 'Name'))
    patient_birthdate = str(_patient_field(patient_data, 'Birthdate'))
    patient_ID = str(_patient_field(patient_data, 'ID'))
    patient_height = str(_patient_field(patient_data, 'Height'))
    patient_weight = str(_patient_field(patient_data, 'Weight'))
    notes = str(_patient_field(patient_data, 'Notes'))
    # NOTE(review): the 'Sex' column was read but never stored in the EDF header
    # by the original code; it is still not written here.

    record_start_datetime = _parse_record_start(_patient_field(patient_data, 'Recording_date'))

    # Extract signal data and sampling frequencies
    header_row = records_data.iloc[0].tolist()      # signal labels (e.g., 'EKG', 'PPG')
    frequency_row = records_data.iloc[1].tolist()   # sampling frequencies
    signal_data = records_data.iloc[2:].to_numpy()  # samples start from the third row

    signals = [np.asarray(signal_data[:, i], dtype=np.float64) for i in range(len(header_row))]
    sampling_frequencies = [int(freq) for freq in frequency_row]

    # One header dictionary per channel
    channel_headers = [
        {
            'label': str(label),
            'dimension': 'mV',
            'sample_frequency': sampling_frequencies[i],
            'physical_min': np.min(signals[i]),
            'physical_max': np.max(signals[i]),
            'digital_min': -32768,
            'digital_max': 32767,
            'transducer': 'N/A',
            'prefilter': 'None',
        }
        for i, label in enumerate(header_row)
    ]

    # Clean and truncate additional metadata
    additional_info = f"Birthdate_{patient_birthdate}_Height_{patient_height}_Weight_{patient_weight}_Notes_{notes}"
    additional_info = ''.join(c if c.isascii() and c != ' ' else '_' for c in additional_info)  # ASCII only
    additional_info = additional_info[:50]  # Limit to 50 chars to leave room for other fields

    # Create EDF+ file
    with pyedflib.EdfWriter(edf_output_path, len(signals), file_type=pyedflib.FILETYPE_EDFPLUS) as edf:
        # All header fields are set BEFORE the samples are written: the underlying
        # EDFlib only accepts header setters before the first sample write, so
        # values set afterwards would not end up in the file.
        edf.setSignalHeaders(channel_headers)
        edf.setPatientName(patient_name)
        edf.setPatientCode(patient_ID)
        edf.setRecordingAdditional(additional_info)
        # Leave the remaining fields blank to fit within the 80-character limit
        edf.setTechnician("")
        edf.setEquipment("")
        edf.setAdmincode("")
        # Set the start date and time for the recording
        edf.setStartdatetime(record_start_datetime)

        # Write the samples to the EDF file
        edf.writeSamples(signals)
if __name__ == "__main__":
    # Destination of the generated EDF+ file and the workbook it is built from
    edf_output_path = "output_sample.edf"
    excel_path = "simulated_signals.xlsx"

    # Run the conversion, then report where the result was written
    convert_excel_to_edf(excel_path=excel_path, edf_output_path=edf_output_path)
    print(f"EDF+ file created at: {edf_output_path}")
"""# Plot the channel
With this code, we can validate the EDF file. The code loads and visualizes the desired data.
"""
import pyedflib
import matplotlib.pyplot as plt
def plot_edf(edf_path, channel_index, start_sample, num_samples):
    """Plot a window of one channel of an EDF file.

    :param edf_path: path of the EDF file to read
    :type edf_path: str
    :param channel_index: 0-based index of the channel to plot
    :type channel_index: int
    :param start_sample: 0-based index of the first sample to plot
    :type start_sample: int
    :param num_samples: maximum number of samples to plot
    :type num_samples: int
    :return: None; a message is printed and nothing is plotted when the channel
        index or the requested sample range is invalid
    """
    # Read everything that is needed, then close the file before the
    # (blocking) plot window is shown.
    with pyedflib.EdfReader(edf_path) as edf:
        n_signals = edf.signals_in_file  # Get the number of signals in the file

        # Check if the channel index is valid
        if not 0 <= channel_index < n_signals:
            print(f"Invalid channel index. The file has {n_signals} channels.")
            return

        signal_labels = edf.getSignalLabels()
        signal = edf.readSignal(channel_index)  # Read the selected channel data

    # A negative start would silently wrap around to the end of the signal,
    # and a start past the end would produce an empty plot.
    if start_sample < 0 or num_samples <= 0 or start_sample >= len(signal):
        print(f"Invalid sample range. The channel has {len(signal)} samples.")
        return

    window = signal[start_sample:start_sample + num_samples]  # Slice the desired range
    end_sample = start_sample + len(window)  # actual end; may be truncated at the end of the signal
    # Plot against the sample positions in the file so the x axis matches
    # its "Sample Index" label and the range given in the title.
    sample_indices = range(start_sample, end_sample)

    label = signal_labels[channel_index]
    plt.figure(figsize=(12, 4))
    plt.plot(sample_indices, window, label=f"Channel: {label}")
    plt.title(f"Channel {channel_index + 1}: {label} (Samples {start_sample} to {end_sample})")
    plt.xlabel("Sample Index")
    plt.ylabel("Amplitude")
    plt.legend()
    plt.show()
# Usage: validate the generated EDF file by plotting a window of one channel
edf_path = "output_sample.edf"  # EDF file to be plotted
channel_index = 0               # channel to show (0-based)
start_sample = 5000             # first sample of the window
num_samples = 10000             # length of the window in samples
plot_edf(
    edf_path=edf_path,
    channel_index=channel_index,
    start_sample=start_sample,
    num_samples=num_samples,
)
\ No newline at end of file
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pobm.obm.desat import DesaturationsMeasures, desat_embedding
from pobm.prep import set_range, median_spo2
def features_all_desat(signal, time_signal, ODI_Threshold=6, hard_threshold=88, relative=True, desat_max_length=14400):
    """Detect desaturation events in an SpO2 signal and tabulate per-event features.

    Events are detected with ``pobm``'s ``DesaturationsMeasures``; their begin and
    end sample indices are then used to compute features on ``signal``.

    :param signal: SpO2 samples
    :param time_signal: time stamp of every sample, indexed like ``signal``
    :param ODI_Threshold: forwarded to ``DesaturationsMeasures``
    :param hard_threshold: forwarded to ``DesaturationsMeasures``
    :param relative: accepted for backward compatibility but NOT used; it is no
        longer forwarded to ``DesaturationsMeasures``
    :param desat_max_length: forwarded to ``DesaturationsMeasures``
    :return: DataFrame with one row per detected desaturation and the columns
        ``begin``/``end`` (values of ``time_signal`` at the event limits),
        ``begin_idx``/``end_idx`` (sample indices), ``depth`` (max - min SpO2
        inside the event), ``length`` (the event's 'Duration') and ``area``
        (sum of max - SpO2 over the event)
    :rtype: pandas.DataFrame
    """
    time_signal = np.array(time_signal)

    # NOTE(review): `relative` used to be passed here as well; presumably the
    # installed pobm version does not accept it - confirm before re-enabling.
    desat_class = DesaturationsMeasures(ODI_Threshold=ODI_Threshold, hard_threshold=hard_threshold,
                                        desat_max_length=desat_max_length)
    desat_class.compute(signal)
    begin_idx = desat_class.begin
    end_idx = desat_class.end

    (desaturations, desaturation_valid, desaturation_length_all, desaturation_int_100_all,
     desaturation_int_max_all, desaturation_depth_100_all, desaturation_depth_max_all,
     desaturation_slope_all) = desat_embedding(begin_idx, end_idx)

    # Convert once, outside the loop (the conversion does not depend on the event)
    signal = np.array(signal)
    time_spo2_array = np.arange(len(signal))

    for i, desaturation in enumerate(desaturations):
        # Samples belonging to this event (both limits included)
        desaturation_idx = (time_spo2_array >= desaturation['Start']) & (time_spo2_array <= desaturation['End'])
        if np.sum(desaturation_idx) == 0:
            continue

        desaturation_time = time_spo2_array[desaturation_idx]
        desaturation_spo2 = signal[desaturation_idx]
        desaturation_min = np.nanmin(desaturation_spo2)
        desaturation_max = np.nanmax(desaturation_spo2)

        # Only depth_max, length and int_max are returned below; the other
        # arrays are still filled in so they stay consistent with each other.
        desaturation_valid[i] = True
        desaturation_length_all[i] = desaturation['Duration']
        desaturation_int_100_all[i] = np.nansum(100 - desaturation_spo2)
        desaturation_int_max_all[i] = np.nansum(desaturation_max - desaturation_spo2)
        desaturation_depth_100_all[i] = 100 - desaturation_min
        desaturation_depth_max_all[i] = desaturation_max - desaturation_min

        # Slope of a straight line fitted from the first maximum to the last minimum
        desaturation_idx_max = np.where(desaturation_spo2 == desaturation_max)[0][0]
        desaturation_idx_min = np.where(desaturation_spo2 == desaturation_min)[0][-1]
        desaturation_idx_max_min = np.arange(desaturation_idx_max, desaturation_idx_min + 1)
        if len(desaturation_idx_max_min) > 0:
            p = np.polyfit(np.int64(desaturation_time[desaturation_idx_max_min]),
                           desaturation_spo2[desaturation_idx_max_min], 1)
            desaturation_slope_all[i] = p[0]

    begin_time = time_signal[begin_idx]
    end_time = time_signal[end_idx]

    desat_patient = pd.DataFrame({
        "begin": begin_time,
        "end": end_time,
        "begin_idx": begin_idx,
        "end_idx": end_idx,
        "depth": desaturation_depth_max_all,
        "length": desaturation_length_all,
        "area": desaturation_int_max_all
    })
    return desat_patient
# Load the ICU recording once (the CSV used to be parsed twice, with the
# first result discarded) and convert the columns to usable dtypes.
data_icu = pd.read_table('ICU_COVID_patient1.csv', delimiter=',')
data_icu['Time'] = pd.to_datetime(data_icu['Time'])
data_icu['SpO2'] = pd.to_numeric(data_icu['SpO2'], errors='coerce')  # unparsable values become NaN

# # Plot SpO2
# plt.figure(figsize=(5, 3))
# plt.plot(range(0, len(data_icu['SpO2'])), data_icu['SpO2'])
#
# plt.xlabel('Time[s]')
# plt.ylabel('SpO2 (%)')
# plt.title('SpO2 Variation Over Time')
# plt.xticks(rotation=45)
# plt.grid(True)
# plt.show()

# Preprocess the SpO2 signal with pobm's range filter, then its median filter
spo2_signal = data_icu['SpO2']
spo2_signal = set_range(spo2_signal)
spo2_signal = median_spo2(spo2_signal, FilterLength=301)

# # Plot filtered SpO2
# plt.figure(figsize=(5, 3))
# plt.plot(range(0, len(spo2_signal)), spo2_signal)
#
# plt.xlabel('Time[s]')
# plt.ylabel('SpO2 (%)')
# plt.title('SpO2 Variation Over Time')
# plt.xticks(rotation=45)
# plt.grid(True)
# plt.show()

# Detect the desaturations and collect their features
test_desat = features_all_desat(spo2_signal, data_icu['Time'], ODI_Threshold=4, hard_threshold=93, relative=False, desat_max_length=14400)
# Notebook-style display of the result; this bare expression has no effect
# when the file is run as a plain script.
test_desat
\ No newline at end of file
import pyPPG
from pyPPG import PPG, Fiducials, Biomarkers
from pyPPG.datahandling import load_data, plot_fiducials, save_data, load_fiducials
import pyPPG.preproc as PP
import pyPPG.fiducials as FP
import pyPPG.biomarkers as BM
import pyPPG.ppg_sqi as SQI
import numpy as np
import sys
import json
import pandas as pd
import scipy.io
###########################################################################
################################## EXAMPLE ################################
###########################################################################
#[docs]
def ppg_example(data_path="", fs=0, start_sig=0, end_sig=-1, fiducials=pd.DataFrame(), process_type="both", channel="Pleth",
                filtering=True, fL=0.5000001, fH=12, order=4, sm_wins={'ppg':50,'vpg':10,'apg':10,'jpg':10}, correction=pd.DataFrame(),
                plotfig=True, savingfolder="temp_dir", savefig=True, show_fig=True, savingformat="both", print_flag=True, use_tk=False,
                check_ppg_len=True, saved_fiducials="", savedata=True):
    '''
    This is an example code for PPG analysis. The main parts:
        1) Loading a raw PPG signal: various file formats such as .mat, .csv, .txt, or .edf.
        2) Get Fiducial points: extract the fiducial points of PPG, PPG', PPG'' and PPG'" signals
        3) Plot Fiducial Points
        4) Get Biomarkers: extract 74 PPG biomarkers in four categories:
            - PPG signal
            - Signal ratios
            - PPG derivatives
            - Derivatives ratios
        5) Get Statistics: summary of the 74 PPG biomarkers
        6) SQI calculation: calculates the PPG Signal Quality Index
        7) Save data: save the extracted Fiducial points, Biomarkers, and Statistics into .csv file

    :param data_path: path of the PPG signal
    :type data_path: str
    :param fs: sampling frequency
    :type fs: int
    :param start_sig: beginning of the signal in samples
    :type start_sig: int
    :param end_sig: end of the signal in samples
    :type end_sig: int
    :param fiducials: DataFrame of the fiducial points; only used as input when
        ``process_type`` is "biomarkers", otherwise it is replaced by the extracted points
    :type fiducials: pyPPG.Fiducials DataFrame
    :param process_type: the type of the process, which can be "fiducials", "biomarkers", or "both"
    :type process_type: str
    :param channel: channel of the .edf file
    :type channel: str
    :param filtering: a bool for filtering
    :type filtering: bool
    :param fL: Lower cutoff frequency (Hz)
    :type fL: float
    :param fH: Upper cutoff frequency (Hz)
    :type fH: float
    :param order: Filter order
    :type order: int
    :param sm_wins: dictionary of smoothing windows in millisecond:
        - ppg: window for PPG signal
        - vpg: window for PPG' signal
        - apg: window for PPG" signal
        - jpg: window for PPG'" signal
    :type sm_wins: dict
    :param correction: DataFrame where the key is the name of the fiducial points and the value is bool.
        The passed object is not modified; the columns 'on', 'dn', 'dp', 'v', 'w' and 'f'
        are always set to True on an internal copy.
    :type correction: DataFrame
    :param plotfig: a bool for plot figure
    :type plotfig: bool
    :param savingfolder: location of the saved data
    :type savingfolder: str
    :param savefig: a bool for current figure saving
    :type savefig: bool
    :param show_fig: a bool for show figure
    :type show_fig: bool
    :param savingformat: file format of the saved data, the provided file formats .mat, .csv, or both
    :type savingformat: str
    :param print_flag: a bool for print message
    :type print_flag: bool
    :param use_tk: a bool for using tkinter interface (forwarded to the plotting only)
    :type use_tk: bool
    :param check_ppg_len: a bool for checking ppg length and sampling frequency
    :type check_ppg_len: bool
    :param saved_fiducials: path of the file of the saved fiducial points
    :type saved_fiducials: str
    :param savedata: a bool for saving data
    :type savedata: bool

    :return: file_names: dictionary of the saved file names; empty when nothing was saved

    Example:

        .. code-block:: python

            from pyPPG.example import ppg_example

            # run example code
            ppg_example()
    '''
    # Returned as-is when no branch below saves anything (e.g. savedata=False);
    # previously the final `return` failed on an unbound name in that case.
    file_names = {}

    # Work on private copies so neither the shared default objects nor the
    # caller's arguments are modified or aliased by this call.
    sm_wins = dict(sm_wins)
    correction = correction.copy()

    ## Loading a raw PPG signal
    # NOTE(review): use_tk is hard-coded to True here, so the `use_tk` argument
    # does not affect loading - confirm whether that is intended.
    signal = load_data(data_path=data_path, fs=fs, start_sig=start_sig, end_sig=end_sig, channel=channel, use_tk=True, print_flag=print_flag)

    ## Preprocessing
    # Initialise the filters
    prep = PP.Preprocess(fL=fL, fH=fH, order=order, sm_wins=sm_wins)

    # Filter and calculate the PPG, PPG', PPG", and PPG'" signals
    signal.filtering = filtering
    signal.fL = fL
    signal.fH = fH
    signal.order = order
    signal.sm_wins = sm_wins
    signal.ppg, signal.vpg, signal.apg, signal.jpg = prep.get_signals(s=signal)

    # Initialise the correction for fiducial points
    corr_on = ['on', 'dn', 'dp', 'v', 'w', 'f']
    correction.loc[0, corr_on] = True
    signal.correction = correction

    ## Create a PPG class
    s = PPG(s=signal, check_ppg_len=check_ppg_len)

    ## Get Fiducial points
    if process_type == 'fiducials' or process_type == 'both':
        # Initialise the fiducials package
        fpex = FP.FpCollection(s=s)

        # Extract fiducial points
        fiducials = fpex.get_fiducials(s=s)
        if print_flag: print("Fiducial points:\n", fiducials + s.start_sig)

        # Create a fiducials class
        fp = Fiducials(fp=fiducials)

        # Save data (indices are shifted back to positions in the full recording)
        if savedata:
            fp_new = Fiducials(fp=fp.get_fp() + s.start_sig)
            file_names = save_data(savingformat=savingformat, savingfolder=savingfolder, print_flag=print_flag, s=s, fp=fp_new)

        # The SQI and the plot both need `fp`, which only exists in this branch.

        ## PPG SQI
        # Calculate SQI
        ppgSQI = round(np.mean(SQI.get_ppgSQI(ppg=s.ppg, fs=s.fs, annotation=fp.sp)) * 100, 2)
        if print_flag: print('Mean PPG SQI: ', ppgSQI, '%')

        ## Plot fiducial points
        if plotfig: plot_fiducials(s=s, fp=fp, savefig=savefig, savingfolder=savingfolder, show_fig=show_fig, print_flag=print_flag, use_tk=use_tk)

    ## Load saved fiducial points from MATLAB struct
    if ".mat" in saved_fiducials:
        tmp_fp1 = load_fiducials(saved_fiducials=saved_fiducials)
        # Keep only the pulses inside the analysed segment and make them segment-relative
        tmp_fp2 = tmp_fp1[(tmp_fp1['on'] >= s.start_sig) & (tmp_fp1['off'] <= s.end_sig)]
        fiducials = tmp_fp2 - s.start_sig
        fiducials.index = range(0, len(fiducials))

    ## Get Biomarkers and Statistics
    if (process_type == 'biomarkers' or process_type == 'both') and len(fiducials) > 0:
        # Initialise the biomarkers package
        fp = Fiducials(fp=fiducials)
        bmex = BM.BmCollection(s=s, fp=fp)

        # Extract biomarkers
        bm_defs, bm_vals, bm_stats = bmex.get_biomarkers()

        if print_flag:
            tmp_keys = bm_stats.keys()
            print('Statistics of the biomarkers:')
            for i in tmp_keys: print(i, '\n', bm_stats[i])

        # Create a biomarkers class
        bm = Biomarkers(bm_defs=bm_defs, bm_vals=bm_vals, bm_stats=bm_stats)

        # Save data
        if savedata:
            fp_new = Fiducials(fp=fp.get_fp() + s.start_sig)
            file_names = save_data(savingformat=savingformat, savingfolder=savingfolder, print_flag=print_flag, s=s, fp=fp_new, bm=bm)

    if print_flag: print('Program finished')

    return file_names
###########################################################################
############################## RUN EXAMPLE CODE ###########################
###########################################################################
if __name__ == "__main__":
    # Everything after the script name; the first item, if any, is a JSON request
    cli_args = sys.argv[1:]
    if not cli_args:
        # No request given: print the hint and run the example with figure saving
        print("Please provide function name and arguments as JSON string")
        ppg_example(savefig=True)
    else:
        # Expected shape: {"function": <name>, "args": {<keyword arguments>}}
        input_data = json.loads(cli_args[0])
        function_name = input_data['function']
        function_args = input_data['args']

        if function_name != 'ppg_example':
            print("Invalid function name")
        else:
            # Report the saved file names to the caller as JSON on stdout
            file_names = ppg_example(**function_args)
            print(json.dumps(file_names))
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment