"""Speaker diarization via voice fingerprinting. Computes a 256-dim embedding per audio chunk (resemblyzer / GE2E), then assigns a stable `Speaker N` label by cosine-similarity against running cluster centroids. Runs on GPU (CUDA) when available, falls back to CPU. """ import io import threading import wave as wavmod try: import numpy as np _HAS_NUMPY = True except ImportError: np = None _HAS_NUMPY = False _encoder = None _encoder_device = None _encoder_lock = threading.Lock() # Running speaker clusters: list of dicts { "centroid": np.ndarray, "count": int } _clusters = [] _clusters_lock = threading.Lock() # Cosine similarity threshold for matching an existing speaker. # Higher = stricter (more new speakers), lower = looser (more merging). # 0.75 is a reasonable default for resemblyzer / VoxCeleb embeddings. SIMILARITY_THRESHOLD = 0.75 def _load_encoder(): """Lazy-load the resemblyzer VoiceEncoder on CUDA if possible, else CPU.""" global _encoder, _encoder_device if _encoder is not None: return _encoder with _encoder_lock: if _encoder is not None: return _encoder if not _HAS_NUMPY: print(" Diarizer: disabled (install with: pip install p3x-meet-assistant[gpu])") _encoder = False return _encoder try: import torch from resemblyzer import VoiceEncoder device = "cuda" if torch.cuda.is_available() else "cpu" _encoder = VoiceEncoder(device=device) _encoder_device = device print(f" Diarizer: VoiceEncoder loaded on {device}") except Exception as e: print(f" Diarizer: disabled ({e})") _encoder = False # sentinel: unavailable return _encoder def is_available(): enc = _load_encoder() return enc is not None and enc is not False def device(): _load_encoder() return _encoder_device def _wav_bytes_to_float32(wav_bytes): """Decode WAV bytes to mono float32 at 16kHz expected by resemblyzer.""" with wavmod.open(io.BytesIO(wav_bytes), "rb") as wf: sample_rate = wf.getframerate() n_channels = wf.getnchannels() sample_width = wf.getsampwidth() frames = wf.readframes(wf.getnframes()) if sample_width != 2: return None, None audio = np.frombuffer(frames, dtype=np.int16).astype(np.float32) / 32768.0 if n_channels > 1: audio = audio.reshape(-1, n_channels).mean(axis=1) return audio, sample_rate def _resample_to_16k(audio, sample_rate): if sample_rate == 16000: return audio # Linear resample — good enough for voice embedding ratio = 16000 / sample_rate new_len = int(len(audio) * ratio) idx = np.linspace(0, len(audio) - 1, new_len) return np.interp(idx, np.arange(len(audio)), audio).astype(np.float32) def _cosine(a, b): na = np.linalg.norm(a) nb = np.linalg.norm(b) if na == 0 or nb == 0: return 0.0 return float(np.dot(a, b) / (na * nb)) def identify_speaker(wav_bytes): """Return a 1-based integer speaker id for the given WAV chunk, or None on failure.""" enc = _load_encoder() if not enc: return None try: audio, sr = _wav_bytes_to_float32(wav_bytes) if audio is None or len(audio) < 16000 * 0.5: # <0.5s return None audio = _resample_to_16k(audio, sr) embedding = enc.embed_utterance(audio) except Exception: return None with _clusters_lock: best_idx, best_sim = -1, -1.0 all_sims = [] for i, c in enumerate(_clusters): sim = _cosine(embedding, c["centroid"]) all_sims.append(sim) if sim > best_sim: best_sim, best_idx = sim, i if best_idx >= 0 and best_sim >= SIMILARITY_THRESHOLD: c = _clusters[best_idx] n = c["count"] c["centroid"] = (c["centroid"] * n + embedding) / (n + 1) c["count"] = n + 1 sims_str = ", ".join(f"{s:.2f}" for s in all_sims) print(f" [diarize] match S{best_idx + 1} (sim={best_sim:.2f}; all=[{sims_str}])") return best_idx + 1 _clusters.append({"centroid": embedding.astype(np.float32), "count": 1}) sims_str = ", ".join(f"{s:.2f}" for s in all_sims) if all_sims else "none" print(f" [diarize] NEW S{len(_clusters)} (best={best_sim:.2f}; all=[{sims_str}])") return len(_clusters) def reset(): """Clear all learned speakers (e.g. new session).""" with _clusters_lock: _clusters.clear() def speaker_count(): with _clusters_lock: return len(_clusters)