RSS Git Download  Clone
Raw Blame History 7kB 218 lines
"""Application state, WebSocket broadcasting, audio capture loop, and control functions."""

import asyncio
import os
import tempfile
import threading
import time
from datetime import datetime

from meet_assistant import BASE_DIR, diarizer
from meet_assistant import engines
from meet_assistant.engines import AVAILABLE_ENGINES, ENGINE_CLASSES

# ── Session auto-save ─────────────────────────────────────────────────────────

_session_lock = threading.Lock()  # serializes access to _session_file across threads
_session_file = None  # active session's transcript path; None until chosen lazily
_SESSIONS_DIR = os.path.join(BASE_DIR, "sessions")  # directory holding one transcript file per session


def _session_path():
    """Return the path of the current session's transcript file.

    The path is chosen lazily on the first call after start-up or after
    ``_reset_session_file()``, and is then reused until the next reset. The
    file name is the creation time to the minute (``YYYY-MM-DD-HH-MM.txt``).
    If that name is already taken, for example by a reset or restart within
    the same minute, a numeric suffix (``-2``, ``-3``, ...) is added. This
    keeps a new session from appending to an earlier session's transcript.
    """
    global _session_file
    with _session_lock:
        if _session_file is None:
            os.makedirs(_SESSIONS_DIR, exist_ok=True)
            stamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
            candidate = os.path.join(_SESSIONS_DIR, f"{stamp}.txt")
            suffix = 2
            # Never reuse an existing file: that would merge two sessions.
            while os.path.exists(candidate):
                candidate = os.path.join(_SESSIONS_DIR, f"{stamp}-{suffix}.txt")
                suffix += 1
            _session_file = candidate
        return _session_file


def _append_to_session(line):
    """Append ``line`` plus a newline to the current session file (best effort).

    Any failure, such as a missing or unwritable sessions directory or a full
    disk, is logged with its traceback and otherwise ignored. A session-save
    error must never break transcription.
    """
    try:
        path = _session_path()
        with open(path, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception:
        # Still best effort, but make lost transcript lines visible instead of
        # dropping them silently.
        import logging  # local: only needed on the failure path

        logging.getLogger(__name__).warning(
            "Could not append to session file", exc_info=True
        )


def _reset_session_file():
    """Forget the cached session path so ``_session_path()`` chooses it afresh on next use."""
    global _session_file
    with _session_lock:  # same lock _session_path() holds while choosing the path
        _session_file = None

# ── Mutable state ──────────────────────────────────────────────────────────────

# Event loop that broadcast() schedules WebSocket sends on; None until assigned.
loop = None
# Connected WebSocket clients (objects with an async ``send_json``).
clients = set()
# Active transcription engine and its name; capture paths skip audio while
# the engine is None, and send_state() reports the name.
current_engine = None
current_engine_name = None

# Supported transcription languages. A session uses exactly one of them, to
# avoid cross-language hallucinations. Values are the native names shown in
# the UI.
SUPPORTED_LANGUAGES = dict(
    en="English",
    hu="Magyar",
    de="Deutsch",
    fr="Fran\u00e7ais",
    es="Espa\u00f1ol",
    it="Italiano",
    pt="Portugu\u00eas",
    nl="Nederlands",
    pl="Polski",
    cs="\u010ce\u0161tina",
)
current_language = "en"  # must be a key of SUPPORTED_LANGUAGES

server_capture = False  # True when PulseAudio capture is active (localhost mode)
speaker_stop = threading.Event()  # set to make speaker_capture_loop() exit

# ── Broadcasting ───────────────────────────────────────────────────────────────


async def _broadcast(data):
    """Send ``data`` as JSON to every connected client, dropping the ones that fail.

    Clients are sent to one after another, over a snapshot of ``clients``. A
    client whose ``send_json`` raises is treated as gone and is removed from
    ``clients`` after the pass.
    """
    failed = []
    for client in list(clients):
        try:
            await client.send_json(data)
        except Exception:
            failed.append(client)
    for client in failed:
        clients.discard(client)


def broadcast(data):
    """Schedule ``_broadcast(data)`` on ``loop`` from any thread.

    This is a no-op when no event loop is set or no client is connected. It
    returns immediately; the sends happen later on the loop.
    """
    if not loop or not clients:
        return
    asyncio.run_coroutine_threadsafe(_broadcast(data), loop)


_recent = []  # dedup buffer: (time.time() timestamp, text) of recently accepted lines


def send_transcript(text, lang=None, speaker=None):
    """Broadcast a transcript line to clients and append it to the session file.

    A line whose text exactly matches one accepted less than 3 seconds earlier
    is dropped: it is neither broadcast nor saved. On each accepted line the
    dedup buffer is pruned to entries from the last 5 seconds.

    Args:
        text: transcribed text.
        lang: language code, upper-cased for display; falsy omits the tag.
        speaker: speaker label from the diarizer; ``None`` means unknown.
    """
    now = time.time()
    if any(now - ts < 3 and prev == text for ts, prev in _recent):
        return
    _recent.append((now, text))
    _recent[:] = [(ts, t) for ts, t in _recent if now - ts < 5]

    stamp = datetime.now().strftime("%H:%M:%S")
    lang_code = lang.upper() if lang else None
    broadcast({
        "type": "transcript",
        "speaker": speaker,
        "text": text,
        "lang": lang_code,
        "time": stamp,
    })

    # Auto-save to disk. Test against None, not truthiness, so that a falsy
    # speaker id (e.g. 0) keeps its number in the file, as it does in the
    # broadcast above.
    spk_label = f"Speaker {speaker}" if speaker is not None else "Speaker"
    lang_tag = f" [{lang_code}]" if lang_code else ""
    _append_to_session(f"[{stamp}] {spk_label}{lang_tag}: {text}")


def send_status(status):
    """Broadcast a status message (e.g. "listening", "processing") to all clients."""
    message = {"type": "status", "status": status}
    broadcast(message)


def send_state():
    """Broadcast the selected engine and language together with all available choices."""
    snapshot = dict(
        type="state",
        engine=current_engine_name,
        engines=AVAILABLE_ENGINES,
        language=current_language,
        languages=SUPPORTED_LANGUAGES,
    )
    broadcast(snapshot)


# ── Diarization helpers ────────────────────────────────────────────────────────


def _audio_data_to_wav_bytes(audio_data):
    """Convert a speech_recognition AudioData into complete WAV file bytes.

    ``AudioData.get_wav_data()`` already wraps the clip's raw samples in a WAV
    container using the clip's own sample rate and sample width. Copying the
    samples into a second, identical AudioData first (as this used to do) is
    unnecessary.
    """
    return audio_data.get_wav_data()


def _diarize(wav_bytes):
    """Identify the speaker of a WAV clip, best effort.

    Returns ``diarizer.identify_speaker(wav_bytes)``, or ``None`` if that
    raises, so a diarization failure never costs the transcript itself. The
    failure is logged at DEBUG level with its traceback rather than dropped
    silently. DEBUG keeps a persistently failing diarizer from flooding the
    log on every utterance.
    """
    try:
        return diarizer.identify_speaker(wav_bytes)
    except Exception:
        import logging  # local: only needed on the failure path

        logging.getLogger(__name__).debug(
            "Speaker identification failed", exc_info=True
        )
        return None


# ── Capture loop ───────────────────────────────────────────────────────────────


def speaker_capture_loop(recognizer, speaker):
    """Capture speaker audio via PulseAudio monitor and transcribe it until stopped.

    Loops until ``speaker_stop`` is set. Each iteration does the following:
    - broadcasts "listening";
    - calibrates for ambient noise (0.5 s);
    - waits up to 10 s for a phrase to start, capturing at most 30 s of it;
    - broadcasts "processing";
    - transcribes the phrase with the engine selected at that moment.
    If no engine is selected, the phrase is skipped. Transcription errors are
    broadcast to clients. Capture errors are logged at DEBUG level and retried
    after a 1 s back-off that ends early if a stop is requested.

    Args:
        recognizer: speech_recognition recognizer used to listen.
        speaker: audio source context manager. Presumably an ``sr.Microphone``
            bound to the PulseAudio monitor; it is set up by the caller, which
            is not visible here.
    """
    import logging
    from meet_assistant.audio import _suppress_stderr, _restore_stderr
    import speech_recognition as sr

    log = logging.getLogger(__name__)

    while not speaker_stop.is_set():
        try:
            send_status("listening")
            old_err = _suppress_stderr()
            try:
                with speaker as source:
                    recognizer.adjust_for_ambient_noise(source, duration=0.5)
                    audio = recognizer.listen(source, timeout=10, phrase_time_limit=30)
            finally:
                _restore_stderr(old_err)
        except sr.WaitTimeoutError:
            continue
        except Exception:
            log.debug("Speaker capture failed; retrying", exc_info=True)
            # Back off, but wake up immediately if a stop is requested.
            speaker_stop.wait(1)
            continue

        engine = current_engine
        if not engine:
            continue
        # Read the language once so the engine call and the fallback tag agree
        # even if set_language() runs concurrently.
        language = current_language
        send_status("processing")
        try:
            text, lang = engine.transcribe_audio(audio, language=language)
            if text:
                spk = _diarize(_audio_data_to_wav_bytes(audio))
                send_transcript(text, lang or language, speaker=spk)
        except Exception as e:
            broadcast({"type": "error", "error": str(e)})


# ── Control functions ──────────────────────────────────────────────────────────


def set_language(lang):
    """Switch the transcription language and broadcast the new state.

    Codes that are not keys of ``SUPPORTED_LANGUAGES`` are ignored: nothing
    changes and nothing is broadcast.
    """
    global current_language
    if lang in SUPPORTED_LANGUAGES:
        current_language = lang
        send_state()


def reset_speakers():
    """Start over for a new conversation.

    In order, this:
    - clears the diarizer's memory of speakers;
    - drops the engines' rolling prompt context;
    - forgets the cached session-file path, so ``_session_path()`` chooses it
      afresh on the next transcript write.
    """
    for reset in (diarizer.reset, engines.reset_context, _reset_session_file):
        reset()


def transcribe_browser_audio(wav_bytes):
    """Transcribe WAV audio received from browser capture (getUserMedia / getDisplayMedia).

    Does nothing when no engine is selected. Otherwise the bytes are written
    to a temporary ``.wav`` file for the engine's ``transcribe_wav``, and that
    file is always removed afterwards, even if writing it fails. A non-empty
    result is diarized and sent as a transcript. Exceptions raised while
    transcribing or sending are broadcast to clients as
    ``{"type": "error", ...}`` messages instead of being raised. A failure to
    write the temporary file still propagates to the caller.

    Args:
        wav_bytes: complete WAV file contents.
    """
    engine = current_engine
    if not engine:
        return
    # Read the language once so the engine call and the fallback tag agree.
    language = current_language
    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    path = tmp.name
    try:
        # Close the file before transcribing so the engine can reopen it by
        # name on every platform. The outer ``finally`` removes it even if
        # this write fails.
        with tmp:
            tmp.write(wav_bytes)
        try:
            text, lang = engine.transcribe_wav(path, language=language)
            if text:
                spk = _diarize(wav_bytes)
                send_transcript(text, lang or language, speaker=spk)
        except Exception as e:
            broadcast({"type": "error", "error": str(e)})
    finally:
        try:
            os.unlink(path)
        except FileNotFoundError:
            pass  # already gone; nothing left to clean up