diff --git a/audio_feedback.py b/audio_feedback.py index 81cc8095a5a2bbb0f8ba646edac73eb0360b8d4b..97783d96b55694ecabd51dfa70e164439efb8dcf 100644 --- a/audio_feedback.py +++ b/audio_feedback.py @@ -4,6 +4,7 @@ import time import threading import constants as c + # Globális változók sample_rate = 44100 phase_left = 0 @@ -12,6 +13,9 @@ is_playing = False last_call_time = 0 controller_lock = threading.Lock() stream = None +volume = 0.2 +max_volume = 1 +past_first_N3 = False def check_pip(tester_list): @@ -22,16 +26,21 @@ def check_pip(tester_list): - Itt implementáljuk a logikát, ami dönt a feedbackról. Ez visszamenőleg tudja elemezni a listát és ennek megfelelően dönt - Meghívhatja az audio_controller és az analyze_file is + - Addig nincsen feedback, amég az első N3 ciklus meg nem történt, hogy az alvás megszilárduljon. :param tester_list: Yasa_output_list, vagy kimentett lista :return: bool, szükséges e az audiofeedback az aktuális epochra """ - if len(tester_list) > 60: + global past_first_N3 + if not past_first_N3: + last_ten = [{k: v for k, v in elem.items() if k != 'Stage'} for elem in tester_list[-10:]] + deep_sleep_met = all((elem['N3'] > 0.6) for elem in last_ten) + if deep_sleep_met: + past_first_N3 = True + else: last_probas = [{k: v for k, v in elem.items() if k != 'Stage'} for elem in tester_list[-c.CHECK_LEN:]] - all_conditions_met = all((elem['N2'] + elem['N3'] > 0.7) and (elem['N3'] <= 0.5) for elem in last_probas) - pre_check_conditions_met = all( - (elem['N2'] + elem['N3'] + elem['R'] >= 0.6) for elem in tester_list[-c.CHECK_LEN - 15:-c.CHECK_LEN]) - return all_conditions_met and pre_check_conditions_met + all_conditions_met = all((elem['N2'] + elem['N3'] > c.N2_N3) and (elem['N3'] <= c.N3) for elem in last_probas) + return all_conditions_met def callback(outdata, frames, time, status): @@ -42,10 +51,10 @@ def callback(outdata, frames, time, status): - Létrehozza és betölti folyamatosan a binautális ütemet a streameléshez, és követi a fázist - Korrigálja a túlcsordulást a végén """ - global phase_left, phase_right + global phase_left, phase_right, volume t = (np.arange(frames) / sample_rate).reshape(-1, 1) - left = np.sin(2 * np.pi * c.FREQUENCY_LEFT * t + phase_left) - right = np.sin(2 * np.pi * c.FREQUENCY_RIGHT * t + phase_right) + left = np.sin(2 * np.pi * c.FREQUENCY_LEFT * t + phase_left) * volume + right = np.sin(2 * np.pi * c.FREQUENCY_RIGHT * t + phase_right) * volume outdata[:] = np.hstack([left, right]) phase_left += 2 * np.pi * c.FREQUENCY_LEFT * frames / sample_rate phase_right += 2 * np.pi * c.FREQUENCY_RIGHT * frames / sample_rate @@ -64,8 +73,9 @@ def start_stream(): lesz kellően, és nem recseg amikor nagyobb számításigény merül fel a kódban """ - global is_playing, stream + global is_playing, stream, volume if not is_playing: + volume = 0.2 stream = sd.OutputStream(channels=2, callback=callback, samplerate=sample_rate, blocksize=sample_rate) stream.start() is_playing = True @@ -93,7 +103,7 @@ def audio_controller(yasa_output_list): lejátszás - Ha valami miatt egy percig nincs hívás leállítja a lejátszást """ - global is_playing, last_call_time, controller_lock + global is_playing, last_call_time, controller_lock, volume should_play = check_pip(yasa_output_list) with controller_lock: current_time = time.time() @@ -106,5 +116,9 @@ def audio_controller(yasa_output_list): start_stream() elif not should_play and is_playing: stop_stream() + elif should_play and is_playing: + volume = min(volume + 0.2, max_volume) last_call_time = current_time + +