// Socket.IO handler that runs a RedisTimeSeries TS.MRANGE query and emits the parsed series.
const consolePrefix = 'socket.io timeseries mrange'

/**
 * Normalizes the payload's filter into a flat list of non-empty filter expressions.
 * Accepts either a single expression or an array of expressions.
 *
 * @param {*} filter - payload.filter as received from the client
 * @returns {Array} filter expressions; empty when nothing usable was supplied
 */
const normalizeFilters = (filter) => {
    const list = Array.isArray(filter) ? filter : [filter]
    return list.filter(Boolean)
}

/**
 * Builds the TS.MRANGE argument list (everything after the command name).
 *
 * @param {object} payload - request payload (from, to, count, aggregation)
 * @param {Array} filters - non-empty list of filter expressions
 * @returns {Array} arguments in the order the command syntax documents
 */
const buildMrangeArgs = (payload, filters) => {
    // '-' and '+' are the open-ended range bounds used when none is supplied.
    const args = [payload.from || '-', payload.to || '+']

    if (payload.count) {
        args.push('COUNT', payload.count)
    }

    const aggregation = payload.aggregation
    if (aggregation && aggregation.type && aggregation.timeBucket) {
        args.push('AGGREGATION', aggregation.type, aggregation.timeBucket)
    }

    // FILTER must come last: the documented TS.MRANGE syntax places COUNT and
    // AGGREGATION before FILTER, whose filter expressions are variadic and run
    // to the end of the command (or to GROUPBY).
    args.push('FILTER', ...filters)

    return args
}

/**
 * Converts a raw TS.MRANGE reply into plain objects.
 * Expected raw shape: [[key, labels, [[ts, val], ...]], ...]
 *
 * @param {*} raw - reply returned by redis.call
 * @returns {Array<{key: *, labels: object, data: Array<{timestamp: number, value: number}>}>}
 */
const parseMrangeReply = (raw) => {
    // Anything that is not an array (e.g. null) is treated as "no series".
    if (!Array.isArray(raw)) {
        return []
    }

    return raw.map(entry => {
        const key = entry[0]

        // NOTE(review): the command is sent without WITHLABELS, so the label list
        // is presumably empty in practice — confirm against the server reply.
        const labels = {}
        if (Array.isArray(entry[1])) {
            for (const pair of entry[1]) {
                if (Array.isArray(pair) && pair.length === 2) {
                    labels[pair[0]] = pair[1]
                }
            }
        }

        // Timestamps and values may arrive as strings; numbers pass through untouched.
        const points = (entry[2] || []).map(point => ({
            timestamp: typeof point[0] === 'number' ? point[0] : Number.parseInt(point[0], 10),
            value: typeof point[1] === 'number' ? point[1] : Number.parseFloat(point[1]),
        }))

        return { key, labels, data: points }
    })
}

/**
 * Handles a time series multi-range request.
 *
 * Reads `from`, `to`, `filter`, `aggregation` and `count` from `options.payload`,
 * runs TS.MRANGE through `options.socket.p3xrs.ioredis`, and emits the result on
 * `options.responseEvent`:
 *   - `{status: 'ok', data: [...]}` on success (empty `data` when no filter is given)
 *   - `{status: 'error', error: message}` when anything throws
 *
 * @param {object} options
 * @param {object} options.socket - socket exposing `emit` and `p3xrs.ioredis`
 * @param {object} options.payload - request parameters
 * @param {string} options.responseEvent - event name used for the reply
 * @returns {Promise<void>}
 */
const timeseriesMrange = async (options) => {
    const {socket, payload} = options;

    try {
        const redis = socket.p3xrs.ioredis
        const filters = normalizeFilters(payload.filter)

        // Without a filter there is nothing to query; answer with an empty result.
        if (filters.length === 0) {
            socket.emit(options.responseEvent, {
                status: 'ok',
                data: [],
            })
            return
        }

        const args = buildMrangeArgs(payload, filters)

        console.info(consolePrefix, args)

        const raw = await redis.call('TS.MRANGE', ...args)

        socket.emit(options.responseEvent, {
            status: 'ok',
            data: parseMrangeReply(raw),
        })
    } catch (e) {
        console.error(e)
        socket.emit(options.responseEvent, {
            status: 'error',
            error: e.message,
        })
    }
}

export default timeseriesMrange