RSS Git Download  Clone
Raw Blame History 5kB 175 lines
const Redis = require('ioredis')
const {EventEmitter} = require('events')
const redisInfo = require('redis-info')

const setDefaultPasswordOptionFromServer = require('./setDefaultPasswordOptionFromServer')

module.exports = class Cluster extends Redis.Cluster{
    constructor(server, options={}){
      server = Array.isArray(server) ? server : [server]
      options = setDefaultPasswordOptionFromServer(options, server)
      super(server, options)
    }
    async infoObject(...args){
      const [key] = args
      if(key==='keyspace'){
        return {
          databases: [await this._getClusterInfoKeyspace()]
        }
      }
      const info = await this.info(...args)
      const infoObject = redisInfo.parse(info)
      if(!key){
        infoObject.databases[0] = await this._getClusterInfoKeyspace()
      }
      return infoObject
    }
    async _getClusterInfoKeyspace(info){
      const keyspaceList = await Promise.all( this.nodes('master').map(node => {
        return node.info('keyspace')
      }) )
      let keys = 0
      let expires = 0
      for(const nodeKeyspace of keyspaceList){
        const parsed = redisInfo.parse(nodeKeyspace)
        const db0 = parsed.databases[0]
        const {
          keys:nodeKeys = 0,
          expires:nodeExpires = 0,
        } = db0
        keys += nodeKeys
        expires += nodeExpires
      }
      const avg_ttl = keyspaceList.reduce((tt, {avg_ttl:nodeAvgTtl = 0})=>{
        return tt + nodeAvgTtl
      },0)/keyspaceList.length
      const clusterKeyspace = {
        keys,
        expires,
        avg_ttl,
      }
      // console.log({clusterKeyspace})
      return clusterKeyspace
    }
    async originalDbsize(...args){
      return super.dbsize(...args)
    }
    async dbsize(){
        const nodeCounts = await Promise.all( this.nodes('master').map(node => {
          return node.dbsize()
        }) )
        const count = nodeCounts.reduce((tt, c) => tt+c ,0)
        // console.log({count})
        return count
    }
    originalRename(...args){
        return super.rename(...args)
    }
    async rename(key, keyNew, callback){
        let res = null
        let err = null
        try{
            let [ value, ttl ] = await Promise.all([
                this.dumpBuffer(key),
                this.ttl(key),
            ])
            ttl = parseInt(ttl)
            if(ttl<0){
                ttl = 0
            }
            await this.del(keyNew)
            await this.restore(keyNew, ttl, value)
            await this.del(key)
            res = 'OK'
        }
        catch(e){
            err = e
        }

        if(typeof callback === 'function'){
            callback(err, res)
        }
        else if (err){
            throw err
        }
        return res

    }
    originalPipeline(...args){
        return super.pipeline(...args)
    }
    pipeline(...pipelineArgs){
        const calls = []
        async function exec(calls){
            const results = []
            for(let c of calls){
                const result = await c()
                results.push(result)
            }
            // console.log({results})
            return results
        }
        const proxy = new Proxy(calls, {
            get: (calls, method)=>{
                return (...callArgs)=>{
                    switch(method){
                        case 'exec':
                            return exec(calls)
                            break
                    }
                    const callback = async () => {
                        let err = null
                        let result = null
                        try{
                            result = await this[method](...callArgs)
                        }
                        catch(e){
                            err = e
                        }
                        return [err, result]
                    }
                    calls.push(callback)
                    return proxy
                }
            }
        })
        return proxy

    }
    scanStream(...args){
        const stream = new EventEmitter()
        this._streamNodes({
            stream,
            method: 'scanStream',
            params: args,
        })
        return stream
    }
    async _streamNodes(options={}){
        let {
            nodes = this.nodes('master'),
            stream = new EventEmitter(),
            method,
            params = [],
        } = options
        for(let node of nodes){
            await new Promise((resolve, reject)=>{
                const nodeStream = node[method](...params)
                nodeStream.on('data', (resultKeys) => {
                    // console.log({resultKeys})
                    stream.emit('data', resultKeys)
                })
                nodeStream.on('end', async () => {
                    try {
                        resolve()
                    } catch (e) {
                        stream.emit('error',e)
                        reject(e)
                    }
                })
            })
        }
        stream.emit('end')
    }
}