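// (Web-viewer residue, not part of the module: "RSS Git Download  Clone")
// (Web-viewer residue, not part of the module: "Raw Blame History 7kB 204 lines")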
const consolePrefix = 'socket.io connection-connect';
const Redis = require('../../../lib/ioredis-cluster')

const sharedIoRedis = require('../shared')

/**
 * Reads the database count and the command list from a Redis client and
 * hands both to `sharedIoRedis.getFullInfoAndSendSocket` as the `extend`
 * payload.
 *
 * @param {object} options
 * @param {object} options.socket - socket forwarded to the shared helper.
 * @param {object} options.redis - client exposing `command()` and `config()`.
 * @param {string} options.responseEvent - event name forwarded to the shared helper.
 * @param {object} options.payload - request payload; `payload.connection.awsElastiCache`
 *     and `payload.connection.azure` are inspected.
 * @returns {Promise<void>} resolves once the shared helper has finished.
 */
const generateConnectInfo = async (options) => {
    const {socket, redis, responseEvent, payload} = options
    const {connection} = payload

    // For connections flagged as awsElastiCache / azure the CONFIG command is
    // not issued and the database count is reported as 0
    // (presumably because CONFIG is unavailable there - TODO confirm).
    const skipConfig = connection.awsElastiCache === true || connection.azure === true

    let databases = 0
    let commands
    if (skipConfig) {
        commands = await redis.command()
    } else {
        // The two lookups are independent, so run them in parallel.
        const [databasesReply, commandsReply] = await Promise.all([
            redis.config('get', 'databases'),
            redis.command(),
        ])
        // The value is read from index 1 of the CONFIG GET reply; always parse
        // it as base 10 so the result never depends on a prefix in the string.
        databases = Number.parseInt(databasesReply[1], 10)
        commands = commandsReply
    }

    await sharedIoRedis.getFullInfoAndSendSocket({
        redis: redis,
        responseEvent: responseEvent,
        socket: socket,
        extend: {
            databases: databases,
            commands: commands,
        },
    })
}

module.exports = async (options) => {
    const {socket, payload} = options;

    const {connection, db} = payload

    try {
        if (socket.p3xrs.connectionId !== connection.id) {
            sharedIoRedis.disconnectRedis({
                socket: socket,
            })
        }

        if (!p3xrs.redisConnections.hasOwnProperty(connection.id)) {
            p3xrs.redisConnections[connection.id] = {
                connection: connection,
                clients: []
            }
        }
        if (!p3xrs.redisConnections[connection.id].clients.includes(socket.id)) {
            console.info(consolePrefix, 'added new socket.id', socket.id, 'to', connection.id, 'name with', connection.name)
            p3xrs.redisConnections[connection.id].clients.push(socket.id)
        }

        if (socket.p3xrs.ioredis !== undefined) {
            console.info(consolePrefix, 'redis was already connected')
            socket.p3xrs.connectionId = connection.id
            await generateConnectInfo({
                redis: socket.p3xrs.ioredis,
                socket: socket,
                responseEvent: options.responseEvent,
                payload: payload
            })

            sharedIoRedis.sendStatus({
                socket: socket,
            })
        } else {
            const actualConnection = p3xrs.connections.list.find(con => options.payload.connection.id === con.id)
            let redisConfig = Object.assign({}, actualConnection);
            delete redisConfig.name
            delete redisConfig.id
            redisConfig.retryStrategy = () => {
                return false
            }


            if (db !== undefined) {
                redisConfig.db = db
            }

            if (redisConfig.cluster === true) {
                redisConfig = [redisConfig].concat(actualConnection.nodes)
            }

            let redis = new Redis(redisConfig)
            let redisSubscriber = new Redis(redisConfig)
            // let redis = await new Redis(redisConfig, {autoDetectCluster: true})
            // let redisSubscriber = await new Redis(redisConfig, {autoDetectCluster: true})
            socket.p3xrs.connectionId = connection.id
            socket.p3xrs.ioredis = redis
            socket.p3xrs.ioredisSubscriber = redisSubscriber
            let didConnected = false

            const redisErrorFun = async function (error) {
                const consolePrefix = 'socket.io connection-connect redis error fun'
                console.warn(consolePrefix, connection.id, connection.name, 'error')
                console.error(error)
                console.warn(consolePrefix, 'didConnected', didConnected)
                if (!didConnected) {
                    socket.emit(options.responseEvent, {
                        status: 'error',
                        error: error
                    })
                }
                const disconnectedData = {
                    connectionId: socket.p3xrs.connectionId,
                    error: error,
                    status: 'error',
                }
                console.warn(consolePrefix, 'disconnectedData', disconnectedData)
                socket.p3xrs.io.emit('redis-disconnected', disconnectedData)

                try {
                    await sharedIoRedis.disconnectRedis({
                        socket: socket,
                    })
                } catch (e) {
                    console.warn(consolePrefix, 'disconnectRedis')
                    console.error(e)
                }
                delete p3xrs.redisConnections[socket.connectionId]

                socket.p3xrs.connectionId = undefined
                socket.p3xrs.ioredis = undefined
                socket.p3xrs.ioredisSubscriber = undefined

                sharedIoRedis.sendStatus({
                    socket: socket,
                })
            }

            redis.on('error', redisErrorFun)
            redisSubscriber.on('error', redisErrorFun)

            //console.warn('create psubscribe', actualConnection.id)
            redisSubscriber.psubscribe('*', function (error, count) {
                if (error) {
                    console.error(error)
                }
            })

            //console.warn('create pmessage', actualConnection.id)
            redisSubscriber.on('pmessage', function (channel, pattern, message) {
                console.log(`receive pmessage channel: ${channel} - pattern: ${pattern}, message: ${message}`);
                //console.log('list clients', actualConnection.id, JSON.stringify(p3xrs.redisConnections[actualConnection.id].clients, null, 4))
                socket.emit('pubsub-message', {
                    channel: pattern,
                    message: message,
                })
            });

            redis.on('connect', async function () {

                try {
                    console.info(consolePrefix, options.payload.connection.id, options.payload.connection.name, 'connected')
                    didConnected = true


                    await generateConnectInfo({
                        redis: redis,
                        socket: socket,
                        responseEvent: options.responseEvent,
                        payload: options.payload,
                    })

                } catch (e) {
                    console.error(e)
                    socket.emit(options.responseEvent, {
                        status: 'error',
                        error: e,
                    })
                } finally {
                    sharedIoRedis.sendStatus({
                        socket: socket,
                    })

                }


            })

        }

    } catch (error) {
        console.error(error)
        socket.emit(options.responseEvent, {
            status: 'error',
            error: error
        })

    }

}