# Page header captured with the file (not part of the module):
# marcellopoliti's picture - commit 9da994b "fix dockerfile" - raw / history / blame - 7 kB
"""Utils module"""
from datetime import datetime
import logging
import pyaudio
import wave
import logging
import os
import functools
import yaml
from types import SimpleNamespace
import pygame
import time
import pyaudio
import math
import struct
import wave
import time
import os
from threading import Thread
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Maps the sample-format name stored in the recording config (a string) to the
# PyAudio format constant; record_audio looks the configured name up here.
format_mapping = {"pyaudio.paInt16": pyaudio.paInt16}
def yaml_file_decorator(yaml_file_path):
    """Build a decorator that injects a YAML config as the first argument.

    The YAML file is parsed again on every call of the decorated function and
    its content is passed ahead of the caller's own arguments.

    Args:
        yaml_file_path (str): path to the YAML configuration file

    Returns:
        A decorator; the function it wraps ends up being called as
        ``func(config, *args, **kwargs)``.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Read the file at call time, not at decoration time
            with open(yaml_file_path, "r") as stream:
                loaded = yaml.safe_load(stream)
            # Hand the parsed config to the wrapped function as first argument
            return func(loaded, *args, **kwargs)

        return wrapper

    return decorator
@yaml_file_decorator("conf/recording.yaml")
def record_audio(config) -> str:
    """Record audio from the input device and save it as a WAV file.

    The recording parameters come from conf/recording.yaml, injected as
    ``config`` by ``yaml_file_decorator``.

    Args:
        config (dict): recording parameters. Keys read here: ``sample_format``,
            ``fs``, ``chunk``, ``seconds``, ``channels``, ``recording_folder``.

    Returns:
        recording_path (str): destination path of the recorded audio
    """
    config = SimpleNamespace(**config)

    p = pyaudio.PyAudio()
    try:
        # Format names missing from format_mapping fall back to the raw value 8
        # (presumably pyaudio.paInt16 - TODO confirm)
        sample_format = format_mapping.get(config.sample_format, 8)
        # Resolve the sample width while the PortAudio interface is still open
        sample_width = p.get_sample_size(sample_format)

        # NOTE(review): the stream is opened mono at twice config.fs, while the
        # WAV header below uses config.channels and config.fs - confirm that
        # this pairing is intentional.
        stream = p.open(
            format=sample_format,
            channels=1,  # config.channels,
            rate=config.fs * 2,
            frames_per_buffer=config.chunk,
            input=True,
        )
        try:
            # Number of chunk reads derived from fs, chunk and seconds
            n_chunks = int(config.fs / config.chunk * config.seconds)
            frames = [stream.read(config.chunk) for _ in range(n_chunks)]
        finally:
            # Stop and close the stream even when a read fails
            stream.stop_stream()
            stream.close()
    finally:
        # Always terminate the PortAudio interface
        p.terminate()

    logging.info("Finished recording")

    # Save the recorded data as a WAV file named after the current time
    recording_path = os.path.join(config.recording_folder, get_current_time() + ".wav")
    with wave.open(recording_path, "wb") as wf:
        wf.setnchannels(config.channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(config.fs)
        wf.writeframes(b"".join(frames))

    return recording_path
def play_mp3(mp3_file_path):
    """Play an audio file through pygame and block until playback ends.

    Any error raised while initialising, loading or playing is printed and
    swallowed, so this function never raises.

    Args:
        mp3_file_path (str): path of the file to play
    """
    try:
        mixer = pygame.mixer
        mixer.init()
        player = mixer.music
        player.load(mp3_file_path)
        player.play()
        # Poll until the mixer reports that playback has finished
        while player.get_busy():
            pygame.time.Clock().tick(10)
    except Exception as e:
        print(f"Error: {e}")
def get_current_time():
    """Return the current local date and time as ``DD-MM-YYYY_HH:MM:SS``."""
    return datetime.now().strftime("%d-%m-%Y_%H:%M:%S")
def read_text(file_path: str) -> str:
    """Return the content of a text file, or an empty string if it is missing.

    Args:
        file_path (str): path of the file to read

    Returns:
        str: the whole file content; an empty string when the file does not
            exist (the error is logged, not raised)
    """
    try:
        with open(file_path, "r") as file_in:
            text = file_in.read()
    except FileNotFoundError as e:
        logging.error(f"Error: File {file_path} not found - {str(e)}")
        return ""
    # Reached only when the file was opened and read without error
    logging.info(f"Success: file {file_path} read correctly")
    return text
def get_pyaudio_format(subtype):
    """Translate a PCM subtype name into the matching PyAudio format constant.

    Args:
        subtype (str): subtype name such as "PCM_16", "PCM_8" or "PCM_32"

    Returns:
        pyaudio.paInt8 for "PCM_8", pyaudio.paInt32 for "PCM_32", and
        pyaudio.paInt16 for "PCM_16" as well as for any other value.
    """
    if subtype == "PCM_8":
        return pyaudio.paInt8
    if subtype == "PCM_32":
        return pyaudio.paInt32
    # "PCM_16" and every unrecognised subtype share the 16-bit format
    return pyaudio.paInt16
class Recorder:
    """Continuously listen to user input and save audio when noise is detected.

    Once a recording has been written, ``function_to_call`` is invoked with the
    path of the created file.

    Args:
        function_to_call: (function) func to be called with the generated file
            path as parameter
        f_name_directory: (str, optional) folder where recordings are written;
            defaults to ``DEFAULT_DIRECTORY``
    """

    # Default destination folder for recordings
    DEFAULT_DIRECTORY = r"/Users/marcellopoliti/Documents/Coding/pischool/lux-voice-processing/data/audio_recordings"

    def __init__(self, function_to_call, f_name_directory=None):
        self.Threshold = 10  # rms() level above which a chunk counts as noise
        self.SHORT_NORMALIZE = 1.0 / 32768.0  # scales 16-bit samples to [-1, 1)
        self.chunk = 1024  # frames read from the stream per call
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16_000
        self.swidth = 2  # bytes per sample, used by rms()
        self.TIMEOUT_LENGTH = 2  # seconds without noise that end a recording
        self.f_name_directory = (
            self.DEFAULT_DIRECTORY if f_name_directory is None else f_name_directory
        )
        self.function_to_call = function_to_call
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            output=True,
            frames_per_buffer=self.chunk,
        )

    def rms(self, frame):
        """Return the RMS level of a buffer of 16-bit samples, scaled by 1000.

        Args:
            frame (bytes): raw audio data, ``self.swidth`` bytes per sample

        Returns:
            float: root mean square of the normalized samples times 1000;
                0.0 when the buffer holds no complete sample
        """
        count = len(frame) // self.swidth
        if count == 0:
            # Nothing to average; avoids a division by zero below
            return 0.0
        # Drop a trailing partial sample so unpack receives an exact size
        shorts = struct.unpack("%dh" % count, frame[: count * self.swidth])
        sum_squares = sum((sample * self.SHORT_NORMALIZE) ** 2 for sample in shorts)
        return math.sqrt(sum_squares / count) * 1000

    def record(self):
        """Record until ``TIMEOUT_LENGTH`` seconds pass without noise, then save.

        Returns:
            str: path of the WAV file produced by ``write``
        """
        print("Noise detected, recording beginning")
        rec = []
        end = time.time() + self.TIMEOUT_LENGTH
        while time.time() <= end:
            # Same overflow policy as listen(): a dropped input buffer must
            # not abort a recording in progress
            data = self.stream.read(self.chunk, exception_on_overflow=False)
            if self.rms(data) >= self.Threshold:
                # Noise is still present: push the deadline back
                end = time.time() + self.TIMEOUT_LENGTH
            rec.append(data)
        return self.write(b"".join(rec))

    def write(self, recording):
        """Save raw audio data as a new WAV file in ``f_name_directory``.

        Args:
            recording (bytes): raw frames to store

        Returns:
            str: path of the file that was written
        """
        os.makedirs(self.f_name_directory, exist_ok=True)
        # Start from the number of files in the folder, then skip every name
        # that is already taken so an existing recording is never overwritten
        index = len(os.listdir(self.f_name_directory))
        filename = os.path.join(self.f_name_directory, "{}.wav".format(index))
        while os.path.exists(filename):
            index += 1
            filename = os.path.join(self.f_name_directory, "{}.wav".format(index))
        with wave.open(filename, "wb") as wf:
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(self.p.get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(recording)
        logging.info("Written to file: {}".format(filename))
        return filename

    def listen(self):
        """Monitor the input stream forever, recording whenever noise is detected.

        Each finished recording is passed to ``function_to_call`` as a file
        path. This method does not return.
        """
        print("Listening beginning")
        while True:
            data = self.stream.read(self.chunk, exception_on_overflow=False)
            if self.rms(data) > self.Threshold:
                filename = self.record()
                self.function_to_call(filename)

    def close(self):
        """Stop the audio stream and terminate the PortAudio interface."""
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()
def check_for_new_recordings(function_to_call, directory="audio_recordings", poll_interval=2):
    """Poll a folder forever and report every file that appears in it.

    The folder content is snapshotted once, then rescanned in a loop; each
    name missing from the previous scan is printed and passed to
    ``function_to_call``. This function does not return.

    Args:
        function_to_call (function): called with the name of each new file, as
            listed by ``os.listdir`` (not joined with ``directory``)
        directory (str): folder to watch
        poll_interval (float): seconds to wait between two scans

    Raises:
        FileNotFoundError: if ``directory`` does not exist
    """
    previous_files = set(os.listdir(directory))
    while True:
        current_files = set(os.listdir(directory))
        # sorted() gives a stable processing order; set order is arbitrary
        for new_file in sorted(current_files - previous_files):
            print(f"New recording detected: {new_file}")
            function_to_call(new_file)
            print("Returning to listening")
        previous_files = current_files
        time.sleep(poll_interval)  # Wait before checking for new recordings again
# Script entry point: running this module directly makes one recording with
# record_audio (configured by conf/recording.yaml) and keeps the returned path.
if __name__ == "__main__":
    recordings_path = record_audio()