Spaces:
Sleeping
Sleeping
"""Utils module""" | |
from datetime import datetime | |
import logging | |
import pyaudio | |
import wave | |
import logging | |
import os | |
import functools | |
import yaml | |
from types import SimpleNamespace | |
import pygame | |
import time | |
import pyaudio | |
import math | |
import struct | |
import wave | |
import time | |
import os | |
from threading import Thread | |
from watchdog.observers import Observer | |
from watchdog.events import FileSystemEventHandler | |
# Lookup table from sample-format names (record_audio looks up
# config.sample_format here) to the corresponding PyAudio format constants.
format_mapping = {
    "pyaudio.paInt16": pyaudio.paInt16,
}
def yaml_file_decorator(yaml_file_path):
    """Build a decorator that injects a YAML config as the first argument.

    The YAML file is read again on every call of the decorated function.

    Args:
        yaml_file_path (str): path to the YAML config file.

    Returns:
        A decorator. The function it wraps is called as
        ``func(config, *args, **kwargs)``, where ``config`` is the object
        returned by ``yaml.safe_load``; the wrapper returns whatever
        ``func`` returns.
    """

    def decorator(func):
        # functools.wraps keeps the wrapped function's __name__ and __doc__,
        # so logs, tracebacks and help() still show the real function.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Load the YAML file at call time.
            with open(yaml_file_path, "r", encoding="utf-8") as file:
                config = yaml.safe_load(file)
            # Pass the loaded YAML data to the decorated function.
            return func(config, *args, **kwargs)

        return wrapper

    return decorator
def record_audio(config) -> str:
    """Record audio from an input stream and save it as a WAV file.

    The project's configuration file is in conf/recordings.yaml.

    Args:
        config (dict): configuration of recording parameters. The keys read
            here are ``sample_format``, ``fs``, ``chunk``, ``seconds``,
            ``channels`` and ``recording_folder``.

    Returns:
        recording_path (str): destination path of the recorded audio.
    """
    config = SimpleNamespace(**config)
    # Unknown format names fall back to 8 (the numeric value of PortAudio's
    # paInt16 constant).
    sample_format = format_mapping.get(config.sample_format, 8)
    # Number of chunk reads needed to cover config.seconds at config.fs.
    n_chunks = int(config.fs / config.chunk * config.seconds)

    p = pyaudio.PyAudio()
    try:
        # Resolve the sample width while PortAudio is still initialised.
        sample_width = p.get_sample_size(sample_format)
        # NOTE(review): the stream is opened mono at twice config.fs, while the
        # WAV header below uses config.channels and config.fs. Kept exactly as
        # in the original; confirm this is intentional.
        stream = p.open(
            format=sample_format,
            channels=1,  # config.channels,
            rate=config.fs * 2,
            frames_per_buffer=config.chunk,
            input=True,
        )
        try:
            # Store data in chunks for config.seconds seconds.
            frames = [stream.read(config.chunk) for _ in range(n_chunks)]
        finally:
            # Stop and close the stream even when a read fails.
            stream.stop_stream()
            stream.close()
    finally:
        # Terminate the PortAudio interface.
        p.terminate()
    logging.info("Finished recording")

    # Save the recorded data as a WAV file; the context manager closes it.
    recording_path = os.path.join(config.recording_folder, get_current_time() + ".wav")
    with wave.open(recording_path, "wb") as wf:
        wf.setnchannels(config.channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(config.fs)
        wf.writeframes(b"".join(frames))
    return recording_path
def play_mp3(mp3_file_path):
    """Play an audio file through pygame's mixer and block until it finishes.

    Any exception raised while initialising the mixer, loading the file or
    playing it is caught and printed; nothing is raised to the caller.

    Args:
        mp3_file_path (str): path of the file to play.
    """
    try:
        pygame.mixer.init()
        pygame.mixer.music.load(mp3_file_path)
        pygame.mixer.music.play()
        # One clock for the whole wait loop instead of a new one per iteration.
        clock = pygame.time.Clock()
        # Wait for the music to finish playing.
        while pygame.mixer.music.get_busy():
            clock.tick(10)
    except Exception as e:
        print(f"Error: {e}")
def get_current_time():
    """Return the current local time formatted as ``DD-MM-YYYY_HH:MM:SS``."""
    return datetime.now().strftime("%d-%m-%Y_%H:%M:%S")
def read_text(file_path: str) -> str:
    """Read a whole text file and return its content.

    Args:
        file_path (str): path of the file to read.

    Returns:
        str: the file content, or an empty string when the file does not
        exist (the error is logged instead of raised).
    """
    try:
        # Explicit encoding so the result does not depend on the platform
        # locale; the context manager closes the file, so no manual close().
        with open(file_path, "r", encoding="utf-8") as file_in:
            text = file_in.read()
    except FileNotFoundError as e:
        logging.error(f"Error: File {file_path} not found - {str(e)}")
        return ""
    logging.info(f"Success: file {file_path} read correctly")
    return text
def get_pyaudio_format(subtype):
    """Translate a PCM subtype name into the matching PyAudio format constant.

    Args:
        subtype (str): subtype name; "PCM_16", "PCM_8" and "PCM_32" are
            recognised.

    Returns:
        The PyAudio format constant for ``subtype``, or ``pyaudio.paInt16``
        for any unrecognised value.
    """
    known_formats = {
        "PCM_16": pyaudio.paInt16,
        "PCM_8": pyaudio.paInt8,
        "PCM_32": pyaudio.paInt32,
    }
    return known_formats.get(subtype, pyaudio.paInt16)
class Recorder:
    """Continuously listen to user input and save audio when noise is detected.

    Each time a recording has been written, ``function_to_call`` is invoked
    with the path of the created file.

    Args:
        function_to_call (callable): called with the generated file path.
        f_name_directory (str, optional): folder the WAV files are written to.
            Defaults to ``DEFAULT_RECORDINGS_DIR``.
    """

    # Folder used when no directory is passed to the constructor.
    DEFAULT_RECORDINGS_DIR = r"/Users/marcellopoliti/Documents/Coding/pischool/lux-voice-processing/data/audio_recordings"

    def __init__(self, function_to_call, f_name_directory=None):
        self.Threshold = 10  # rms() level above which input counts as noise
        self.SHORT_NORMALIZE = 1.0 / 32768.0  # scale factor applied to each sample
        self.chunk = 1024  # frames requested per stream.read() call
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16_000
        self.swidth = 2  # bytes per sample assumed by rms()
        self.TIMEOUT_LENGTH = 2  # seconds of quiet that end a recording
        self.f_name_directory = (
            self.DEFAULT_RECORDINGS_DIR if f_name_directory is None else f_name_directory
        )
        self.function_to_call = function_to_call
        self.p = pyaudio.PyAudio()
        # NOTE(review): output=True is kept from the original although this
        # class only reads from the stream; confirm it is needed.
        self.stream = self.p.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            output=True,
            frames_per_buffer=self.chunk,
        )

    def rms(self, frame):
        """Return the root-mean-square level of a raw audio chunk, times 1000.

        Args:
            frame (bytes): raw samples, ``self.swidth`` bytes each, unpacked
                as native signed shorts.

        Returns:
            float: RMS of the normalised samples multiplied by 1000; ``0.0``
            when the chunk holds no complete sample.
        """
        # Integer sample count; a trailing partial sample is ignored.
        count = len(frame) // self.swidth
        if count == 0:
            # Avoid dividing by zero on an empty read.
            return 0.0
        shorts = struct.unpack_from("%dh" % count, frame)
        sum_squares = 0.0
        for sample in shorts:
            n = sample * self.SHORT_NORMALIZE
            sum_squares += n * n
        return math.sqrt(sum_squares / count) * 1000

    def record(self):
        """Record until the input stays below the threshold for the timeout.

        Returns:
            str: path of the WAV file written by ``write``.
        """
        print("Noise detected, recording beginning")
        rec = []
        current = time.time()
        end = time.time() + self.TIMEOUT_LENGTH
        while current <= end:
            # Same overflow handling as listen(): a dropped buffer must not
            # abort a recording in progress.
            data = self.stream.read(self.chunk, exception_on_overflow=False)
            if self.rms(data) >= self.Threshold:
                # Still noisy: push the deadline forward.
                end = time.time() + self.TIMEOUT_LENGTH
            current = time.time()
            rec.append(data)
        return self.write(b"".join(rec))

    def write(self, recording):
        """Save raw audio bytes as a numbered WAV file in the target folder.

        Args:
            recording (bytes): raw frames to store.

        Returns:
            str: path of the file that was written.
        """
        n_files = len(os.listdir(self.f_name_directory))
        filename = os.path.join(self.f_name_directory, "{}.wav".format(n_files))
        # The directory count can match a name that is already taken (for
        # example after a file was deleted); move on instead of overwriting.
        while os.path.exists(filename):
            n_files += 1
            filename = os.path.join(self.f_name_directory, "{}.wav".format(n_files))
        with wave.open(filename, "wb") as wf:
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(self.p.get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(recording)
        logging.info("Written to file: {}".format(filename))
        return filename

    def listen(self):
        """Monitor the input stream forever and record whenever noise appears.

        When a chunk's level exceeds the threshold, a recording is made and
        ``function_to_call`` is invoked with its file path. Never returns.
        """
        print("Listening beginning")
        while True:
            data = self.stream.read(self.chunk, exception_on_overflow=False)
            if self.rms(data) > self.Threshold:
                filename = self.record()
                self.function_to_call(filename)
# Poll the recordings folder and report every file that appears in it.
def check_for_new_recordings(function_to_call):
    """Watch the "audio_recordings" folder forever and react to new files.

    Every 2 seconds the folder listing is compared with the previous one. For
    each name that was not present before, a message is printed and
    ``function_to_call`` is invoked with that bare file name (as returned by
    ``os.listdir``, not joined with the folder). The function never returns.

    Args:
        function_to_call (callable): called with the name of each new file.
    """
    known = set(os.listdir("audio_recordings"))
    while True:
        snapshot = set(os.listdir("audio_recordings"))
        for fresh in snapshot.difference(known):
            print(f"New recording detected: {fresh}")
            function_to_call(fresh)
            print("Returning to listening")
        known = snapshot
        # Check for new recordings every 2 seconds.
        time.sleep(2)
if __name__ == "__main__":
    # record_audio requires the recording config as its argument; calling it
    # bare raises TypeError. Load the config from the YAML file named in the
    # record_audio docstring and inject it through yaml_file_decorator.
    recordings_path = yaml_file_decorator("conf/recordings.yaml")(record_audio)()