RSS Git Download  Clone
Raw Blame History 9kB 263 lines
/**
 * Monitoring data store — exact port of Angular MonitoringDataService.
 * Manages profiler (MONITOR) and pubsub (PSUBSCRIBE) data streams.
 */
import { create } from 'zustand'
import { request, getClient } from './socket.service'
import { decode as msgpackDecode } from '@msgpack/msgpack'

export interface ProfilerEntry {
    displayTime: string
    fullTimestamp: string
    database: string
    source: string
    command: string
}

export interface PubsubEntry {
    displayTime: string
    fullTimestamp: string
    channel: string
    message: string
}

const PROFILER_STORAGE_KEY = 'p3xr-profiler-entries'
const PUBSUB_STORAGE_KEY = 'p3xr-pubsub-entries'
const MAX_ENTRIES = 10000
const MAX_STORAGE_ENTRIES = 100
const SAVE_DEBOUNCE = 2000

type ProfilerCallback = (entry: ProfilerEntry) => void
type PubsubCallback = (entry: PubsubEntry) => void

interface MonitoringDataState {
    profilerEntries: ProfilerEntry[]
    pubsubEntries: PubsubEntry[]
    profilerStarted: boolean
    pubsubStarted: boolean
    pubsubPattern: string
}

// Subscribers (outside Zustand to avoid re-renders on every entry)
let profilerListeners: Set<ProfilerCallback> = new Set()
let pubsubListeners: Set<PubsubCallback> = new Set()
let profilerSaveTimeout: any = null
let pubsubSaveTimeout: any = null
let langFn: () => string = () => 'en'
let initialized = false
let profilerDesired = false
let pubsubDesired = false
let profilerGeneration = 0
let pubsubGeneration = 0
let profilerStartPromise: Promise<void> | null = null
let pubsubStartPromise: Promise<void> | null = null

function decodePubsubMessage(message: any): string {
    if (message instanceof ArrayBuffer) {
        try {
            const decoded = msgpackDecode(new Uint8Array(message))
            return JSON.stringify(decoded, null, 2)
        } catch {
            return new TextDecoder().decode(message)
        }
    }
    return String(message)
}

function saveToStorage(key: string, entries: any[]): void {
    const toSave = entries.slice(-MAX_STORAGE_ENTRIES)
    try { localStorage.setItem(key, JSON.stringify(toSave)) }
    catch { try { localStorage.removeItem(key) } catch {} }
}

function restoreFromStorage(): { profiler: ProfilerEntry[]; pubsub: PubsubEntry[] } {
    let profiler: ProfilerEntry[] = []
    let pubsub: PubsubEntry[] = []
    try {
        const pj = localStorage.getItem(PROFILER_STORAGE_KEY)
        if (pj) profiler = JSON.parse(pj)
    } catch {}
    try {
        const sj = localStorage.getItem(PUBSUB_STORAGE_KEY)
        if (sj) pubsub = JSON.parse(sj)
    } catch {}
    return { profiler, pubsub }
}

const onMonitorData = (data: any) => {
    const lang = langFn()
    const date = new Date(parseFloat(data.time) * 1000)
    const displayTime = date.toLocaleTimeString(lang, { hour: '2-digit', minute: '2-digit', second: '2-digit', hour12: false, fractionalSecondDigits: 3 } as any)
    const entry: ProfilerEntry = {
        displayTime,
        fullTimestamp: date.toISOString(),
        database: data.database,
        source: data.source,
        command: (data.args || []).join(' '),
    }
    const store = useMonitoringDataStore.getState()
    let entries = [...store.profilerEntries, entry]
    if (entries.length > MAX_ENTRIES) entries = entries.slice(-MAX_ENTRIES)
    useMonitoringDataStore.setState({ profilerEntries: entries })
    profilerListeners.forEach(cb => cb(entry))
    // Debounced save
    if (!profilerSaveTimeout) {
        profilerSaveTimeout = setTimeout(() => {
            profilerSaveTimeout = null
            saveToStorage(PROFILER_STORAGE_KEY, useMonitoringDataStore.getState().profilerEntries)
        }, SAVE_DEBOUNCE)
    }
}

const onPubSubMessage = (data: any) => {
    const lang = langFn()
    const date = new Date()
    const displayTime = date.toLocaleTimeString(lang, { hour: '2-digit', minute: '2-digit', second: '2-digit', hour12: false })
    const entry: PubsubEntry = {
        displayTime,
        fullTimestamp: date.toISOString(),
        channel: data.channel,
        message: decodePubsubMessage(data.message),
    }
    const store = useMonitoringDataStore.getState()
    let entries = [...store.pubsubEntries, entry]
    if (entries.length > MAX_ENTRIES) entries = entries.slice(-MAX_ENTRIES)
    useMonitoringDataStore.setState({ pubsubEntries: entries })
    pubsubListeners.forEach(cb => cb(entry))
    if (!pubsubSaveTimeout) {
        pubsubSaveTimeout = setTimeout(() => {
            pubsubSaveTimeout = null
            saveToStorage(PUBSUB_STORAGE_KEY, useMonitoringDataStore.getState().pubsubEntries)
        }, SAVE_DEBOUNCE)
    }
}

export const useMonitoringDataStore = create<MonitoringDataState>(() => {
    const { profiler, pubsub } = restoreFromStorage()
    return {
        profilerEntries: profiler,
        pubsubEntries: pubsub,
        profilerStarted: false,
        pubsubStarted: false,
        pubsubPattern: '*',
    }
})

export function initMonitoringData(getLang: () => string): void {
    langFn = getLang
    if (!initialized) {
        initialized = true
    }
}

export async function startProfiler(): Promise<void> {
    profilerDesired = true
    if (useMonitoringDataStore.getState().profilerStarted) return
    if (profilerStartPromise) return profilerStartPromise

    const generation = ++profilerGeneration
    const startPromise = (async () => {
        await request({ action: 'monitor/set', payload: { enabled: true } })

        // Ignore stale async completions from a previous mount/unmount cycle.
        if (generation !== profilerGeneration || !profilerDesired) return

        const client = getClient()
        client?.removeListener('monitor-data', onMonitorData)
        client?.on('monitor-data', onMonitorData)
        useMonitoringDataStore.setState({ profilerStarted: true })
    })().finally(() => {
        if (profilerStartPromise === startPromise) profilerStartPromise = null
    })
    profilerStartPromise = startPromise

    return profilerStartPromise
}

export function stopProfiler(): void {
    const wasStarted = useMonitoringDataStore.getState().profilerStarted
    profilerDesired = false
    profilerGeneration++
    profilerStartPromise = null

    getClient()?.removeListener('monitor-data', onMonitorData)
    useMonitoringDataStore.setState({ profilerStarted: false })
    if (wasStarted) {
        request({ action: 'monitor/set', payload: { enabled: false } }).catch(() => {})
    }
    if (profilerSaveTimeout) { clearTimeout(profilerSaveTimeout); profilerSaveTimeout = null }
    saveToStorage(PROFILER_STORAGE_KEY, useMonitoringDataStore.getState().profilerEntries)
}

export async function startPubSub(): Promise<void> {
    pubsubDesired = true
    if (useMonitoringDataStore.getState().pubsubStarted) return
    if (pubsubStartPromise) return pubsubStartPromise

    const generation = ++pubsubGeneration
    const pattern = useMonitoringDataStore.getState().pubsubPattern
    const startPromise = (async () => {
        await request({ action: 'settings/subscription', payload: { subscription: true, subscriberPattern: pattern } })

        // Ignore stale async completions from a previous mount/unmount cycle.
        if (generation !== pubsubGeneration || !pubsubDesired) return

        const client = getClient()
        client?.removeListener('pubsub-message', onPubSubMessage)
        client?.on('pubsub-message', onPubSubMessage)
        useMonitoringDataStore.setState({ pubsubStarted: true })
    })().finally(() => {
        if (pubsubStartPromise === startPromise) pubsubStartPromise = null
    })
    pubsubStartPromise = startPromise

    return pubsubStartPromise
}

export function stopPubSub(): void {
    const wasStarted = useMonitoringDataStore.getState().pubsubStarted
    pubsubDesired = false
    pubsubGeneration++
    pubsubStartPromise = null

    getClient()?.removeListener('pubsub-message', onPubSubMessage)
    useMonitoringDataStore.setState({ pubsubStarted: false })
    if (wasStarted) {
        request({ action: 'settings/subscription', payload: { subscription: false, subscriberPattern: '*' } }).catch(() => {})
    }
    if (pubsubSaveTimeout) { clearTimeout(pubsubSaveTimeout); pubsubSaveTimeout = null }
    saveToStorage(PUBSUB_STORAGE_KEY, useMonitoringDataStore.getState().pubsubEntries)
}

export async function restartPubSub(): Promise<void> {
    stopPubSub()
    await startPubSub()
}

export function clearProfiler(): void {
    useMonitoringDataStore.setState({ profilerEntries: [] })
    try { localStorage.removeItem(PROFILER_STORAGE_KEY) } catch {}
}

export function clearPubSub(): void {
    useMonitoringDataStore.setState({ pubsubEntries: [] })
    try { localStorage.removeItem(PUBSUB_STORAGE_KEY) } catch {}
}

export function destroyMonitoringData(): void {
    if (profilerSaveTimeout) { clearTimeout(profilerSaveTimeout); profilerSaveTimeout = null }
    if (pubsubSaveTimeout) { clearTimeout(pubsubSaveTimeout); pubsubSaveTimeout = null }
    saveToStorage(PROFILER_STORAGE_KEY, useMonitoringDataStore.getState().profilerEntries)
    saveToStorage(PUBSUB_STORAGE_KEY, useMonitoringDataStore.getState().pubsubEntries)
}

export function onProfilerEntry(cb: ProfilerCallback): () => void {
    profilerListeners.add(cb)
    return () => profilerListeners.delete(cb)
}

export function onPubsubEntry(cb: PubsubCallback): () => void {
    pubsubListeners.add(cb)
    return () => pubsubListeners.delete(cb)
}