RSS Git Download  Clone
Raw Blame History 5kB 222 lines
const IORedis = require('ioredis')
const redisInfo = require('redis-info')
const {EventEmitter} = require('events')

const redisInfoCache = new Map()
const redisNodesCache = new Map()
async function getInfo(server){
  if(redisInfoCache.has(server)){
    return redisInfoCache.get(server)
  }
  const redis = new Redis(server)
  const rawInfo = await redis.info()
  const info = redisInfo.parse(rawInfo)
  redisInfoCache.set(server, info)
  return info
}
async function isClusterEnabled(server){
  const {cluster_enabled} = await getInfo(server)
  return Boolean(parseInt(cluster_enabled))
}
async function getClusterNodes(server, force = false){
  if(redisNodesCache.has(server) && !force){
    return redisNode.get(server)
  }
  const redis = new Redis(server)

  const rawNodes = await new Promise((resolve, reject)=>{
    redis.sendCommand(
      new Redis.Command(
        'CLUSTER',
        ['NODES'],
        'utf-8',
        function(err,value) {
          if (err)
            reject(err)
          else
            resolve(value.toString())
        }
      )
    )
  })

  const lines = rawNodes.trim().split("\n")
  const nodes = lines.reduce((arr, line)=>{
    if(!line){
      return arr
    }
    const row = line.split(' ')
    const [
      node_id,
      server,
      flags,
    ] = row
    const [ target, slots ] = server.split('@')
    const [ host, port ] = target.split(':')
    const node = {
      host,
      port,
    }
    arr.push(node)
    return arr
  }, [])
  redisNodesCache.set(server, nodes)
  return nodes
}
async function createRedis(server, defaultConfig = {}){
  if(Array.isArray(server)){
    return new Redis.Cluster(server)
  }

  const clusterEnabled = await isClusterEnabled(server)
  if(!clusterEnabled){
    return new Redis(server)
  }

  const nodes = await getClusterNodes(server)

  // console.log({nodes})

  const servers = nodes.map(node =>{
    return {...defaultConfig, ...node}
  })

  // console.log({servers})
  return new RedisCluster(servers)
}

class Cluster extends IORedis.Cluster{
  static create = createRedis
  originalRename(...args){
    return parent.rename(...args)
  }
  async rename(key, keyNew, callback){
    let res = null
    let err = null
    try{
      let [ value, ttl ] = await Promise.all([
        this.dumpBuffer(key),
        this.ttl(key),
      ])
      ttl = parseInt(ttl)
      if(ttl<0){
        ttl = 0
      }
      await this.del(keyNew)
      await this.restore(keyNew, ttl, value)
      await this.del(key)
      res = 'OK'
    }
    catch(e){
      err = e
    }

    if(typeof callback === 'function'){
      callback(err, res)
    }
    else if (err){
      throw err
    }
    return res

  }
  originalPipeline(...args){
    return parent.pipeline(...args)
  }
  pipeline(...args){
    return this.multi(...args)
  }
  scanStream(...args){
    const stream = new EventEmitter()
    this.streamNodes({
      stream,
      method: 'scanStream',
      params: args,
    })
    return stream
  }
  async streamNodes(options={}){
    let {
      nodes = this.nodes('master'),
      stream = new EventEmitter(),
      method,
      params = [],
    } = options
    for(let node of nodes){
      await new Promise((resolve, reject)=>{
        const nodeStream = node[method](...params)
        nodeStream.on('data', (resultKeys) => {
          console.log({resultKeys})
          stream.emit('data', resultKeys)
        })
        nodeStream.on('end', async () => {
          try {
            resolve()
          } catch (e) {
            stream.emit('error',e)
            reject(e)
          }
        })
      })
    }
    stream.emit('end')
  }
}

function getServersFromEnv(prefix='', redisConfig = {}){

  const REDIS_SERVERS = process.env[prefix+'REDIS_SERVERS']
  const REDIS_PASS = process.env[prefix+'REDIS_PASS']
  const REDIS_PORT = process.env[prefix+'REDIS_PORT'] || '6379'

  const redisServersIpList = REDIS_SERVERS.split(',')

  const redisConfigDefault = {
    port: REDIS_PORT,
    password: REDIS_PASS,
    retryStrategy: function(){
      return false
    },
  }

  redisConfig = {
    ...redisConfigDefault,
    ...redisConfig,
  }

  const redisServers = redisServersIpList.map( server =>{
    const [host, port] = server.split(':')
    return {
      ...redisConfig,
      host,
      port,
    }
  })

  return redisServers
}

function Redis(server, options){
  if(!Array.isArray(server)){
    const {host} = server
    if(host.slice(0,4).toUpperCase()==='!ENV'){
      let ENV_KEY_PREFIX = ''
      if(host.slice(4,5)==='='){
        ENV_KEY_PREFIX = host.slice(5)
      }
      server = getServersFromEnv(ENV_KEY_PREFIX)
    }
  }

  if(Array.isArray(server)){
    return new Cluster(server, options)
  }
  else{
    return new IORedis(server, options)
  }
  return
}
Redis.Cluster = Cluster


module.exports = Redis