Skip to content
Snippets Groups Projects
Commit 788cc90b authored by Köllőd Csaba's avatar Köllőd Csaba
Browse files

Dev: adding packets and plot

parent 23586c2c
No related branches found
No related tags found
No related merge requests found
......@@ -3,28 +3,27 @@ from pathlib import Path
from struct import unpack
import numpy as np
def _read_block_data(file, shape, format_, format_size):
sh = np.prod(shape)
data = unpack(format_ * sh, file.read(format_size * sh))
data = np.array(data)
data = data.reshape(shape)
return data
from matplotlib import pyplot as plt
class FlowPacket:
def __init__(self, date_, flow, hr, spo2, vbat_eeg, vbat_arm,
header_fs = 1
signal_fs = 250
gyro_acc_fs = 50
sound_fs = 6000
grs_fs = 10
def __init__(self, date_, flow, hr, spo2,
# vbat_eeg, vbat_arm,
eeg, eeg_giro, eeg_acc,
emg, emg_giro, emg_acc,
sound, gsr):
self.date = date_
self.flow = flow
self.hr = hr
self.spo2 = spo2
self.vbat_eeg = vbat_eeg
self.vbat_arm = vbat_arm
self.duration = 1
self.flow = [flow]
self.hr = [hr]
self.spo2 = [spo2]
self.eeg = eeg
self.eeg_giro = eeg_giro
self.eeg_acc = eeg_acc
......@@ -35,10 +34,47 @@ class FlowPacket:
self.gsr = gsr
def __add__(self, other):
pass
self.flow.extend(other.flow)
self.hr.extend(other.hr)
self.spo2.extend(other.spo2)
self.eeg = np.concatenate((self.eeg, other.eeg), axis=-1)
self.eeg_giro = np.concatenate((self.eeg_giro, other.eeg_giro), axis=-1)
self.eeg_acc = np.concatenate((self.eeg_acc, other.eeg_acc), axis=-1)
self.emg = np.concatenate((self.emg, other.emg), axis=-1)
self.emg_giro = np.concatenate((self.emg_giro, other.emg_giro), axis=-1)
self.emg_acc = np.concatenate((self.emg_acc, other.emg_acc), axis=-1)
self.sound = np.concatenate((self.sound, other.sound), axis=-1)
self.gsr = np.concatenate((self.gsr, other.gsr), axis=-1)
self.duration += other.duration
return self
def _plot(self, data, title):
plt.figure()
plt.suptitle(title)
chs = data.shape[0]
for i in range(chs):
plt.subplot(chs, 1, i + 1)
plt.plot(np.linspace(0, self.duration, data.shape[1]), data[i, :])
plt.title(f'ch{i + 1}')
plt.xlabel('time (s)')
def plot(self):
self._plot(self.eeg, 'EEG')
self._plot(self.eeg_giro, 'EEG Gyro')
self._plot(self.emg, 'EMG')
plt.show()
def process_packet(file):
def _read_block_data(file, shape, format_, format_size):
sh = np.prod(shape)
data = unpack(format_ * sh, file.read(format_size * sh))
data = np.array(data)
data = data.reshape(shape)
return data
def read_packet(file):
# header
date_, time_, flow, hr, spo2, vbat_eeg, vbat_arm = unpack('>II10sBHHH', file.read(25))
date_ = str(date_)
......@@ -65,17 +101,22 @@ def process_packet(file):
gsr = _read_block_data(file, (10, 1), 'h', 2).T
print(sound.shape, gsr.shape)
return date_, flow, hr, spo2, vbat_eeg, vbat_arm, eeg, eeg_giro, eeg_acc, emg, emg_giro
packet = FlowPacket(date_, flow, hr, spo2, # vbat_eeg, vbat_arm,
eeg, eeg_giro, eeg_acc,
emg, emg_giro, emg_acc,
sound, gsr)
return packet
def read_dat_file(file):
no_packets = unpack('>I', file.read(4))[0]
process_packet(file)
# def process_data(byte_data):
# n_packets = unpack('>i', byte_data.pop(4))[0]
# print(n_packets)
packet_holder = read_packet(file)
for _ in range(no_packets - 1):
packet_holder += read_packet(file)
remaining = file.read()
assert len(remaining) == 0, f'Remaining {len(remaining)}'
return packet_holder
def read_data(path):
......@@ -83,10 +124,11 @@ def read_data(path):
files = path.rglob('*.dat')
for file in files:
with open(file, 'rb') as f:
read_dat_file(f)
data = read_dat_file(f)
data.plot()
exit(12)
if __name__ == '__main__':
path = r'D:\Users\Csabi\dev\MCC Flow\database'
path = r'D:\Users\Csabi\OneDrive - Pázmány Péter Katolikus Egyetem\MCC Flow\database'
read_data(path)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment