"""Application state, WebSocket broadcasting, audio capture loop, and control functions."""

import asyncio
import os
import tempfile
import threading
import time
from datetime import datetime

from meet_assistant import BASE_DIR, diarizer
from meet_assistant import engines
from meet_assistant.engines import AVAILABLE_ENGINES, ENGINE_CLASSES

# ── Session auto-save ─────────────────────────────────────────────────────────

_SESSIONS_DIR = os.path.join(BASE_DIR, "sessions")
_session_file = None  # path of the active session file; None until first use
_session_lock = threading.Lock()  # guards reads/writes of _session_file


def _session_path():
    """Return the path of the active session transcript file.

    On the first call (or the first call after ``_reset_session_file``), the
    sessions directory is created if needed and a file name is derived from
    the current local time at minute resolution. Later calls return that
    same path until the session is reset.
    """
    global _session_file
    with _session_lock:
        if _session_file is not None:
            return _session_file
        os.makedirs(_SESSIONS_DIR, exist_ok=True)
        file_name = f"{datetime.now().strftime('%Y-%m-%d-%H-%M')}.txt"
        _session_file = os.path.join(_SESSIONS_DIR, file_name)
        return _session_file


def _append_to_session(line):
    """Append *line* plus a newline to the active session file (best effort)."""
    try:
        with open(_session_path(), "a", encoding="utf-8") as session_out:
            session_out.write(line + "\n")
    except Exception:
        # Never let session-save errors break transcription
        pass


def _reset_session_file():
    """Forget the active session file so the next append starts a new one."""
    global _session_file
    with _session_lock:
        _session_file = None


# ── Mutable state ──────────────────────────────────────────────────────────────

loop = None  # event loop that broadcast() schedules coroutines on; assigned elsewhere
clients = set()  # connected client objects; broadcast() calls send_json() on each
current_engine = None  # active transcription engine instance, or None when unset
current_engine_name = None  # name reported to clients by send_state()

# Supported transcription languages. One per session to avoid cross-language
# hallucinations. Native names shown in UI.
SUPPORTED_LANGUAGES = {  # language code -> native display name
    "en": "English",
    "hu": "Magyar",
    "de": "Deutsch",
    "fr": "Fran\u00e7ais",
    "es": "Espa\u00f1ol",
    "it": "Italiano",
    "pt": "Portugu\u00eas",
    "nl": "Nederlands",
    "pl": "Polski",
    "cs": "\u010ce\u0161tina",
}
current_language = "en"

server_capture = False  # True when PulseAudio capture is active (localhost mode)
speaker_stop = threading.Event()  # set to make speaker_capture_loop() exit


# ── Broadcasting ───────────────────────────────────────────────────────────────

async def _broadcast(data):
    """Send *data* as JSON to every client, dropping clients whose send fails."""
    dead = set()
    # Iterate over a snapshot: the client set may change while we await.
    for ws in list(clients):
        try:
            await ws.send_json(data)
        except Exception:
            dead.add(ws)
    clients.difference_update(dead)


def broadcast(data):
    """Schedule *data* to be sent to all clients on the event loop.

    Safe to call from a non-loop thread. Does nothing when no loop has been
    assigned or no client is connected.
    """
    if loop and clients:
        asyncio.run_coroutine_threadsafe(_broadcast(data), loop)


_recent = []  # dedup buffer: (timestamp, text)


def send_transcript(text, lang=None, speaker=None):
    """Broadcast a transcript line and append it to the session file.

    A text identical to one sent less than 3 seconds ago is dropped. Entries
    older than 5 seconds are pruned from the dedup buffer on every accepted
    line.

    Args:
        text: Transcribed text.
        lang: Language code; upper-cased in the output. May be None.
        speaker: Speaker label from diarization. May be None.
    """
    now = time.time()
    if any(now - ts < 3 and prev == text for ts, prev in _recent):
        return
    _recent.append((now, text))
    _recent[:] = [(ts, t) for ts, t in _recent if now - ts < 5]

    stamp = datetime.now().strftime("%H:%M:%S")
    lang_code = lang.upper() if lang else None
    broadcast({
        "type": "transcript",
        "speaker": speaker,
        "text": text,
        "lang": lang_code,
        "time": stamp,
    })

    # Auto-save to disk
    # NOTE(review): a falsy speaker id (e.g. 0) is rendered as plain "Speaker";
    # confirm against what diarizer.identify_speaker returns.
    spk_label = f"Speaker {speaker}" if speaker else "Speaker"
    lang_tag = f" [{lang_code}]" if lang_code else ""
    _append_to_session(f"[{stamp}] {spk_label}{lang_tag}: {text}")


def send_status(status):
    """Broadcast a status message (e.g. "listening", "processing")."""
    broadcast({"type": "status", "status": status})


def send_state():
    """Broadcast the current engine and language selection plus the available options."""
    broadcast({
        "type": "state",
        "engine": current_engine_name,
        "engines": AVAILABLE_ENGINES,
        "language": current_language,
        "languages": SUPPORTED_LANGUAGES,
    })


# ── Diarization helpers ────────────────────────────────────────────────────────

def _audio_data_to_wav_bytes(audio_data):
    """Convert a speech_recognition AudioData into raw WAV bytes."""
    import speech_recognition as sr

    raw = audio_data.get_raw_data()
    return sr.AudioData(raw, audio_data.sample_rate, audio_data.sample_width).get_wav_data()


def _diarize(wav_bytes):
    """Return the diarizer's speaker for *wav_bytes*, or None if it raises."""
    try:
        return diarizer.identify_speaker(wav_bytes)
    except Exception:
        # Diarization is best effort; a failure must not drop the transcript.
        return None


# ── Capture loop ───────────────────────────────────────────────────────────────

def speaker_capture_loop(recognizer, speaker):
    """Capture speaker audio via PulseAudio monitor and transcribe.

    Runs until ``speaker_stop`` is set. Each iteration listens for one phrase,
    transcribes it with the current engine, and sends the result to clients.

    Args:
        recognizer: Object providing ``adjust_for_ambient_noise`` and
            ``listen`` (presumably a ``speech_recognition.Recognizer``).
        speaker: Audio source usable as a context manager.
    """
    from meet_assistant.audio import _suppress_stderr, _restore_stderr
    import speech_recognition as sr

    while not speaker_stop.is_set():
        try:
            send_status("listening")
            old_err = _suppress_stderr()
            try:
                with speaker as source:
                    recognizer.adjust_for_ambient_noise(source, duration=0.5)
                    audio = recognizer.listen(source, timeout=10, phrase_time_limit=30)
            finally:
                _restore_stderr(old_err)
        except sr.WaitTimeoutError:
            # Nothing was said within the timeout; listen again.
            continue
        except Exception:
            # Back off for up to a second, but wake immediately on stop.
            speaker_stop.wait(1)
            continue

        engine = current_engine
        if not engine:
            continue

        send_status("processing")
        try:
            text, lang = engine.transcribe_audio(audio, language=current_language)
            if text:
                spk = _diarize(_audio_data_to_wav_bytes(audio))
                send_transcript(text, lang or current_language, speaker=spk)
        except Exception as e:
            broadcast({"type": "error", "error": str(e)})


# ── Control functions ──────────────────────────────────────────────────────────

def set_language(lang):
    """Switch the transcription language and notify clients.

    Codes not present in ``SUPPORTED_LANGUAGES`` are ignored.
    """
    global current_language
    if lang not in SUPPORTED_LANGUAGES:
        return
    current_language = lang
    send_state()


def reset_speakers():
    """Clear diarizer memory, rolling prompt context, and start a new session file."""
    diarizer.reset()
    engines.reset_context()
    _reset_session_file()


def transcribe_browser_audio(wav_bytes):
    """Transcribe WAV audio received from browser capture (getUserMedia / getDisplayMedia).

    The bytes are written to a temporary ``.wav`` file for the engine and the
    file is always removed afterwards. Transcription errors are broadcast to
    clients instead of being raised. Does nothing when no engine is active.

    Args:
        wav_bytes: Complete WAV file content.
    """
    engine = current_engine
    if not engine:
        return

    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    path = tmp.name
    try:
        # Write inside the try so a failed write still removes the file.
        with tmp:
            tmp.write(wav_bytes)
        try:
            text, lang = engine.transcribe_wav(path, language=current_language)
            if text:
                spk = _diarize(wav_bytes)
                send_transcript(text, lang or current_language, speaker=spk)
        except Exception as e:
            broadcast({"type": "error", "error": str(e)})
    finally:
        try:
            os.unlink(path)
        except OSError:
            # Cleanup is best effort; the file may already be gone.
            pass