"""Utils module""" from datetime import datetime import logging import pyaudio import wave import logging import os import functools import yaml from types import SimpleNamespace import pygame import time import pyaudio import math import struct import wave import time import os from threading import Thread from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler format_mapping = {"pyaudio.paInt16": pyaudio.paInt16} def yaml_file_decorator(yaml_file_path): """Decorator to pass a config file to other functions Args: yaml_file_path (_type_): path to config file """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): # Load YAML file with open(yaml_file_path, "r") as file: config = yaml.safe_load(file) # Pass the loaded YAML data to the decorated function result = func(config, *args, **kwargs) return result return wrapper return decorator @yaml_file_decorator("conf/recording.yaml") def record_audio(config) -> str: """function to record audio. Configuaration file is in conf/recordings.yaml Args: config (dict): configuration of recording parameters Returns: recording_path (str): destination path of recorded audio """ config = SimpleNamespace(**config) p = pyaudio.PyAudio() sample_format = format_mapping.get(config.sample_format, 8) stream = p.open( format=sample_format, channels=1, # config.channels, rate=config.fs * 2, frames_per_buffer=config.chunk, input=True, ) frames = [] # Initialize array to store frames # Store data in chunks for 3 seconds for i in range(0, int(config.fs / config.chunk * config.seconds)): data = stream.read(config.chunk) frames.append(data) # Stop and close the stream stream.stop_stream() stream.close() # Terminate the PortAudio interface p.terminate() logging.info("Finished recording") # Save the recorded data as a WAV file recording_path = os.path.join(config.recording_folder, get_current_time() + ".wav") recording_path with wave.open(recording_path, "wb") as wf: wf.setnchannels(config.channels) wf.setsampwidth(p.get_sample_size(sample_format)) # config.sample_format wf.setframerate(config.fs) wf.writeframes(b"".join(frames)) wf.close() return recording_path def play_mp3(mp3_file_path): try: pygame.mixer.init() pygame.mixer.music.load(mp3_file_path) pygame.mixer.music.play() # Wait for the music to finish playing while pygame.mixer.music.get_busy(): pygame.time.Clock().tick(10) except Exception as e: print(f"Error: {e}") def get_current_time(): now = datetime.now() dt_string = now.strftime("%d-%m-%Y_%H:%M:%S") return dt_string def read_text(file_path: str) -> str: try: with open(file_path, "r") as file_in: text = file_in.read() file_in.close() logging.info(f"Success: file {file_path} read correctly") return text except FileNotFoundError as e: logging.error(f"Error: File {file_path} not found - {str(e)}") return "" def get_pyaudio_format(subtype): if subtype == "PCM_16": return pyaudio.paInt16 elif subtype == "PCM_8": return pyaudio.paInt8 elif subtype == "PCM_32": return pyaudio.paInt32 else: return pyaudio.paInt16 class Recorder: """Class to continuosly listen to user input, ans save audio when noise is detected. Once noise is detected it will be run the function_to_call with the create filename as parameter Args: function_to_call: (function) func to be called with the generated file path as parameter """ def __init__(self, function_to_call): self.Threshold = 10 self.SHORT_NORMALIZE = 1.0 / 32768.0 self.chunk = 1024 self.FORMAT = pyaudio.paInt16 self.CHANNELS = 1 self.RATE = 16_000 self.swidth = 2 self.TIMEOUT_LENGTH = 2 self.f_name_directory = r"/Users/marcellopoliti/Documents/Coding/pischool/lux-voice-processing/data/audio_recordings" self.function_to_call = function_to_call self.p = pyaudio.PyAudio() self.stream = self.p.open( format=self.FORMAT, channels=self.CHANNELS, rate=self.RATE, input=True, output=True, frames_per_buffer=self.chunk, ) # @staticmethod def rms(self, frame): count = len(frame) / self.swidth format = "%dh" % (count) shorts = struct.unpack(format, frame) sum_squares = 0.0 for sample in shorts: n = sample * self.SHORT_NORMALIZE sum_squares += n * n rms = math.pow(sum_squares / count, 0.5) return rms * 1000 def record(self): print("Noise detected, recording beginning") rec = [] current = time.time() end = time.time() + self.TIMEOUT_LENGTH while current <= end: data = self.stream.read(self.chunk) if self.rms(data) >= self.Threshold: end = time.time() + self.TIMEOUT_LENGTH current = time.time() rec.append(data) filename = self.write(b"".join(rec)) return filename def write(self, recording): n_files = len(os.listdir(self.f_name_directory)) filename = os.path.join(self.f_name_directory, "{}.wav".format(n_files)) wf = wave.open(filename, "wb") wf.setnchannels(self.CHANNELS) wf.setsampwidth(self.p.get_sample_size(self.FORMAT)) wf.setframerate(self.RATE) wf.writeframes(recording) wf.close() logging.info("Written to file: {}".format(filename)) return filename def listen(self): print("Listening beginning") while True: input = self.stream.read(self.chunk, exception_on_overflow=False) rms_val = self.rms(input) if rms_val > self.Threshold: filename = self.record() self.function_to_call(filename) # Function to check for new recordings and print a message def check_for_new_recordings(function_to_call): previous_files = set(os.listdir("audio_recordings")) while True: current_files = set(os.listdir("audio_recordings")) new_files = current_files - previous_files for new_file in new_files: print(f"New recording detected: {new_file}") function_to_call(new_file) print("Returning to listening") previous_files = current_files time.sleep(2) # Check for new recordings every 2 seconds if __name__ == "__main__": recordings_path = record_audio()