import { cloneDeep } from 'lodash-es'
const triggerDisconnect = (options) => {
const {connectionId, code, socket} = options
if (p3xrs.redisConnections.hasOwnProperty(connectionId)) {
delete p3xrs.redisConnections[connectionId]
socket.p3xrs.io.emit('redis-disconnected', {
connectionId: connectionId,
status: 'code',
code: code
})
sendStatus({
socket: socket
})
}
}
const sendStatus = (options) => {
const {socket} = options
const redisConnections = {}
Object.keys(p3xrs.redisConnections).forEach((redisConnectionKey) => {
redisConnections[redisConnectionKey] = {}
Object.keys(p3xrs.redisConnections[redisConnectionKey]).forEach(redisConnectionKey2 => {
redisConnections[redisConnectionKey][redisConnectionKey2] = p3xrs.redisConnections[redisConnectionKey][redisConnectionKey2]
})
})
socket.p3xrs.io.emit('redis-status', {
redisConnections: redisConnections,
})
}
const consolePrefixDisconnectRedis = 'socket.io shared disconnect redis'
const disconnectRedis = (options) => {
const {socket} = options
//console.warn(consolePrefixDisconnectRedis, `${socket.p3xrs.connectionId} !== ${connection.id}`)
if (p3xrs.redisConnections.hasOwnProperty(socket.p3xrs.connectionId)) {
console.warn(consolePrefixDisconnectRedis, `includes ${p3xrs.redisConnections[socket.p3xrs.connectionId].clients.includes(socket.id)} length === 1 ${p3xrs.redisConnections[socket.p3xrs.connectionId].clients.length}`)
if (p3xrs.redisConnections[socket.p3xrs.connectionId].clients.includes(socket.id) && p3xrs.redisConnections[socket.p3xrs.connectionId].clients.length === 1) {
//console.warn(consolePrefixDisconnectRedis, p3xrs.redisConnections[socket.p3xrs.connectionId])
//p3xrs.redisConnections[socket.p3xrs.connectionId].ioredis.disconnect()
delete p3xrs.redisConnections[socket.p3xrs.connectionId]
} else {
let connectionIndexExisting = p3xrs.redisConnections[socket.p3xrs.connectionId].clients.indexOf(socket.id);
console.warn(consolePrefixDisconnectRedis, socket.p3xrs.connectionId, p3xrs.redisConnections[socket.p3xrs.connectionId].clients, socket.id, connectionIndexExisting)
if (connectionIndexExisting > -1) {
p3xrs.redisConnections[socket.p3xrs.connectionId].clients.splice(connectionIndexExisting, 1)
}
}
}
if (p3xrs.redisConnections.hasOwnProperty(socket.p3xrs.connectionId) && p3xrs.redisConnections[socket.p3xrs.connectionId].hasOwnProperty('clients') && p3xrs.redisConnections[socket.p3xrs.connectionId].clients.length === 0) {
delete p3xrs.redisConnections[socket.p3xrs.connectionId]
}
disconnectRedisIo(options)
socket.p3xrs.connectionId = undefined
}
const sendConnections = (options) => {
const {socket} = options
const connections = cloneDeep(p3xrs.connections);
let connectionsList = connections.list.map(connection => {
delete connection.password
delete connection.tlsCrt
delete connection.tlsKey
delete connection.tlsCa
delete connection.sshPassword
delete connection.sshPrivateKey
//TODO fix secured nodes password
if (Array.isArray(connection.nodes)) {
connection.nodes = connection.nodes.map(node => {
delete node.password
return node
})
}
return connection
})
connections.list = connectionsList
socket.p3xrs.io.emit('connections', {
status: 'ok',
connections: connections
})
}
const disconnectRedisIo = (options) => {
const {socket} = options
console.warn('shared disconnectRedisIo', 'try')
if (socket.p3xrs.ioredis !== undefined) {
console.warn('shared disconnectRedisIo', 'executed')
socket.p3xrs.ioredis.disconnect()
socket.p3xrs.ioredisSubscriber.disconnect()
socket.p3xrs.ioredis = undefined
socket.p3xrs.ioredisSubscriber = undefined
}
if (Array.isArray(socket.p3xrs.tunnels) && socket.p3xrs.tunnels.length > 0) {
for (const server of socket.p3xrs.tunnels) {
server.close()
}
socket.p3xrs.tunnels = []
}
if (socket.p3xrs.sshClient) {
socket.p3xrs.sshClient.end()
socket.p3xrs.sshClient = undefined
}
}
const getStreamKeys = (options) => {
const {redis, } = options
let {dbsize, maxKeys} = options
return new Promise(async (resolve, reject) => {
try {
/*
if (dbsize === undefined) {
dbsize = await redis.dbsize()
}
*/
if (isNaN(maxKeys) || maxKeys < 5 || maxKeys > 100000) {
maxKeys = 10000
}
//console.warn('check if received max keys', maxKeys, typeof maxKeys, !isNaN(maxKeys), maxKeys < 5, maxKeys > 100000)
/*
let count = 100
if (dbsize > 110000) {
count = 10000
} else if (dbsize > 11000) {
count = 1000
}
*/
let count = Math.round(maxKeys / 10)
if (count < 5) {
count = 5
}
//console.warn('socket.io getStreamKeys dbsize', dbsize, 'count', count, 'maxKeys', maxKeys)
const stream = redis.scanStream({
match: options.match,
count: count
});
let keys = [];
let ended = false
stream.on('data', (resultKeys) => {
/*
keys = keys.concat(resultKeys);
if (maxKeys && keys.length >= maxKeys && !ended) {
ended = true
console.warn('reached max key count', maxKeys, 'found', keys.length, 'keys our of unknown total')
//stream.pause()
//stream.destroy()
stream.emit('end')
}
*/
if (maxKeys && keys.length < maxKeys) {
keys = keys.concat(resultKeys);
if (keys.length >= maxKeys) {
ended = true
resolve(keys)
//stream.emit('end')
}
} else if (!ended) {
ended = true
resolve(keys)
}
});
stream.on('end', () => {
if (ended) {
return
}
resolve(keys);
});
/*
stream.on('error', (error) => {
console.error('getStreamKeys stream', error)
reject(error)
})
*/
} catch (e) {
reject(e)
}
})
}
/*
const getStreamTypedKeys = (options) => {
const { redis, key, match } = options
let { scan } = options
if (scan === undefined) {
scan = 'scanStream'
}
return new Promise((resolve, reject) => {
let stream;
if (scan === 'scanStream') {
stream = redis[scan]({
match: match
});
} else {
stream = redis[scan](key, {
match: match
});
}
let keys = [];
stream.on('data', (resultKeys) => {
keys = keys.concat(resultKeys);
});
stream.on('end', async () => {
try {
resolve(keys);
} catch (e) {
console.error(e);
reject(e)
}
});
})
}
*/
const getKeysInfo = async (options) => {
const {redis, keys} = options;
const keyTypePipeline = redis.pipeline()
for (let key of keys) {
keyTypePipeline.type(key)
keyTypePipeline.ttl(key)
}
const keyTypeAndTtlResults = await keyTypePipeline.exec();
const result = {}
const complexLengthPipeline = redis.pipeline()
for (let keysIndex in keys) {
const pipelineIndex = keysIndex * 2
const keyType = keyTypeAndTtlResults[pipelineIndex]
const keyTtl = keyTypeAndTtlResults[pipelineIndex + 1]
const key = keys[keysIndex]
const obj = {
type: keyType[1],
ttl: keyTtl[1],
}
// Normalize ReJSON-RL to json for the client
if (obj.type === 'ReJSON-RL') {
obj.type = 'json'
}
// Normalize TSDB-TYPE to timeseries for the client
if (obj.type === 'TSDB-TYPE') {
obj.type = 'timeseries'
}
switch (obj.type) {
case 'stream':
complexLengthPipeline.xlen(key)
break;
case 'hash':
complexLengthPipeline.hlen(key)
break;
case 'list':
complexLengthPipeline.llen(key)
break;
case 'set':
complexLengthPipeline.scard(key)
break;
case 'zset':
complexLengthPipeline.zcard(key)
break;
case 'timeseries':
complexLengthPipeline.call('TS.INFO', key)
break;
}
result[key] = obj
}
const lengthsPipeline = await complexLengthPipeline.exec()
for (let keysIndex in keys) {
const key = keys[keysIndex]
const obj = result[key]
if (obj.type === 'string' || obj.type === 'none' || obj.type === 'json') {
continue
}
const lengthPipelineElement = lengthsPipeline.shift()
if (lengthPipelineElement === undefined) {
continue
}
if (obj.type === 'timeseries') {
// TS.INFO returns flat array [field, value, ...]; extract totalSamples
const tsInfo = lengthPipelineElement[1]
if (Array.isArray(tsInfo)) {
for (let i = 0; i < tsInfo.length; i += 2) {
if (tsInfo[i] === 'totalSamples') {
obj.length = tsInfo[i + 1]
break
}
}
}
} else {
obj.length = lengthPipelineElement[1]
}
}
return result;
}
const ensureReadonlyConnections = () => {
if (p3xrs.cfg.readonlyConnections === true) {
const errorCode = new Error('readonly-connection-mode')
throw errorCode;
}
}
const ensureReadonlyConnection = ({ socket }) => {
if (socket.p3xrs.readonly === true) {
const errorCode = new Error('readonly-connection-mode')
throw errorCode;
}
}
const getFullInfo = async (options) => {
const {redis} = options;
let {payload} = options
if (payload === undefined) {
payload = {}
}
const dbsize = await redis.dbsize()
const results = await Promise.all([
redis.info(),
getStreamKeys({
dbsize: dbsize,
redis: redis,
match: payload.match,
maxKeys: payload.maxKeys,
}),
redis.pubsub('channels', '*'),
// redis.infoObject(),
])
const keys = results[1]
let keysInfo = {}
if (keys.length < 110000) {
keysInfo = await getKeysInfo({
redis: redis,
keys: keys,
})
}
// const keysInfo = []
const result = {
info: results[0],
// infoObject: results[3],
keys: keys,
keysInfo: keysInfo,
dbsize: dbsize,
channels: results[2]
}
//console.log('get full info', result)
return result
}
const getFullInfoAndSendSocket = async (options) => {
const {redis, socket, payload, setDb} = options
if (setDb === true) {
try {
await redis.call('select', payload.db || 0)
} catch(e) {
console.warn(e)
}
}
const result = await getFullInfo({
redis: redis,
payload: payload,
})
let {extend} = options
if (extend === undefined) {
extend = {}
}
socket.emit(options.responseEvent, Object.assign(extend, {
status: 'ok',
info: result.info,
// infoObject: result.infoObject,
keys: result.keys,
keysInfo: result.keysInfo,
keysInfoFetchedAt: Date.now(),
dbsize: result.dbsize,
}))
}
const argumentParser = (input, sep, keepQuotes) => {
const separator = sep || /\s/g;
let singleQuoteOpen = false;
let doubleQuoteOpen = false;
let tokenBuffer = [];
const ret = [];
console.log('argumentParser input', input)
const arr = input.split('');
for (let i = 0; i < arr.length; ++i) {
let element = arr[i];
let matches = element.match(separator);
if (element === "'" && !doubleQuoteOpen) {
if (keepQuotes === true) {
tokenBuffer.push(element);
}
singleQuoteOpen = !singleQuoteOpen;
continue;
} else if (element === '"' && !singleQuoteOpen) {
if (keepQuotes === true) {
tokenBuffer.push(element);
}
doubleQuoteOpen = !doubleQuoteOpen;
continue;
}
if (!singleQuoteOpen && !doubleQuoteOpen && matches) {
if (tokenBuffer.length > 0) {
ret.push(tokenBuffer.join(''));
tokenBuffer = [];
} else if (!!sep) {
ret.push(element);
}
} else {
tokenBuffer.push(element);
}
}
if (tokenBuffer.length > 0) {
ret.push(tokenBuffer.join(''));
} else if (!!sep) {
ret.push('');
}
return ret;
}
const parseModuleList = (rawList) => {
if (!Array.isArray(rawList)) return []
const modules = []
for (const entry of rawList) {
if (!Array.isArray(entry)) continue
const mod = {}
for (let i = 0; i < entry.length; i += 2) {
mod[entry[i]] = entry[i + 1]
}
if (mod.name) modules.push(mod)
}
return modules
}
const detectModules = async (redis) => {
try {
const rawList = await redis.call('MODULE', 'LIST')
return parseModuleList(rawList)
} catch (e) {
// MODULE LIST not supported or disabled
return []
}
}
export {
argumentParser,
ensureReadonlyConnections,
triggerDisconnect,
getStreamKeys,
disconnectRedisIo,
sendConnections,
sendStatus,
disconnectRedis,
getKeysInfo,
getFullInfo,
getFullInfoAndSendSocket,
ensureReadonlyConnection,
parseModuleList,
detectModules,
}