import * as sharedIoRedis from '../shared.mjs'

const BATCH_SIZE = 500
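
// Imports a list of exported keys into the connected Redis instance.
// Writes are batched and pipelined where possible; a summary of
// { created, skipped, errors } is emitted on the response event when done.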

export default async (options) => {
    const {socket, payload} = options

    try {
        sharedIoRedis.ensureReadonlyConnection({ socket })

        const redis = socket.p3xrs.ioredis
        const { keys, conflictMode } = payload
        // conflictMode: 'overwrite' | 'skip'
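        // Each entry in `keys` is expected to be { key, type, value, pttl }.
        // Values are base64-encoded for string/list/set/zset/hash payloads,
        // plain field objects for stream entries, and a string or object for json.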

        if (!Array.isArray(keys) || keys.length === 0) {
            socket.emit(options.responseEvent, {
                status: 'error',
                error: 'No keys to import',
            })
            return
        }

        // Check existing keys if skip mode (pipelined in batches)
        const existingKeys = new Set()
        if (conflictMode === 'skip') {
            for (let i = 0; i < keys.length; i += BATCH_SIZE) {
                const batch = keys.slice(i, i + BATCH_SIZE)
                const pipeline = redis.pipeline()
                for (const entry of batch) {
                    pipeline.exists(entry.key)
                }
                const results = await pipeline.exec()
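                // pipeline.exec() resolves to an array of [err, reply] pairs in command order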
                for (let j = 0; j < batch.length; j++) {
                    if (results[j][0]) {
                        // exists check itself failed — treat as not existing
                        continue
                    }
                    if (results[j][1] === 1) {
                        existingKeys.add(batch[j].key)
                    }
                }
            }
        }

        let created = 0
        let skipped = 0
        let errors = 0

        // Process keys in batches
        for (let i = 0; i < keys.length; i += BATCH_SIZE) {
            const batch = keys.slice(i, i + BATCH_SIZE)

            // Filter out skipped keys
            const toImport = []
            for (const entry of batch) {
                if (conflictMode === 'skip' && existingKeys.has(entry.key)) {
                    skipped++
                } else {
                    toImport.push(entry)
                }
            }

            if (toImport.length === 0) continue

            // Separate pipelineable types from sequential (stream, json)
            const pipelineable = []
            const sequential = []
            for (const entry of toImport) {
                if (entry.type === 'stream' || entry.type === 'json') {
                    sequential.push(entry)
                } else {
                    pipelineable.push(entry)
                }
            }

            // --- Pipelineable keys (string, list, set, zset, hash) ---
            if (pipelineable.length > 0) {
                // Delete existing keys if overwriting
                if (conflictMode === 'overwrite') {
                    const delPipeline = redis.pipeline()
                    for (const entry of pipelineable) {
                        delPipeline.del(entry.key)
                    }
                    await delPipeline.exec()
                }

                // Build write pipeline — track which entries map to which pipeline slot
                const writePipeline = redis.pipeline()
                const pipelineEntries = [] // entries that actually got a command
                for (const entry of pipelineable) {
                    try {
                        switch (entry.type) {
                            case 'string':
                                writePipeline.set(entry.key, Buffer.from(entry.value, 'base64'))
                                pipelineEntries.push(entry)
                                break
                            case 'list':
                                if (Array.isArray(entry.value) && entry.value.length > 0) {
                                    writePipeline.rpush(entry.key, ...entry.value.map(v => Buffer.from(v, 'base64')))
                                    pipelineEntries.push(entry)
                                } else {
                                    created++ // empty list, nothing to write
                                }
                                break
                            case 'set':
                                if (Array.isArray(entry.value) && entry.value.length > 0) {
                                    writePipeline.sadd(entry.key, ...entry.value.map(v => Buffer.from(v, 'base64')))
                                    pipelineEntries.push(entry)
                                } else {
                                    created++ // empty set, nothing to write
                                }
                                break
                            case 'zset':
                                if (Array.isArray(entry.value) && entry.value.length > 0) {
                                    const args = []
                                    for (const e of entry.value) {
                                        args.push(e.score, Buffer.from(e.member, 'base64'))
                                    }
                                    writePipeline.zadd(entry.key, ...args)
                                    pipelineEntries.push(entry)
                                } else {
                                    created++ // empty zset, nothing to write
                                }
                                break
                            case 'hash':
                                if (entry.value && typeof entry.value === 'object') {
                                    const args = []
                                    for (const [field, val] of Object.entries(entry.value)) {
                                        args.push(field, Buffer.from(val, 'base64'))
                                    }
                                    if (args.length > 0) {
                                        writePipeline.hset(entry.key, ...args)
                                        pipelineEntries.push(entry)
                                    } else {
                                        created++ // empty hash, nothing to write
                                    }
                                } else {
                                    errors++ // malformed hash value
                                }
                                break
                            default:
                                errors++
                        }
                    } catch (e) {
                        console.error(`key-import: failed to prepare key "${entry.key}":`, e.message)
                        errors++
                    }
                }

                if (pipelineEntries.length > 0) {
                    const writeResults = await writePipeline.exec()

                    // Collect successful entries for TTL restore
                    const successEntries = []
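                    // writeResults[j] is the [err, reply] pair for pipelineEntries[j]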
                    for (let j = 0; j < writeResults.length; j++) {
                        if (writeResults[j][0]) {
                            console.error(`key-import: write failed for key "${pipelineEntries[j].key}":`, writeResults[j][0].message)
                            errors++
                        } else {
                            created++
                            if (pipelineEntries[j].pttl && pipelineEntries[j].pttl > 0) {
                                successEntries.push(pipelineEntries[j])
                            }
                        }
                    }

                    // Pipeline TTL restore only for successfully written keys
                    if (successEntries.length > 0) {
                        const ttlPipeline = redis.pipeline()
                        for (const entry of successEntries) {
                            ttlPipeline.pexpire(entry.key, entry.pttl)
                        }
                        await ttlPipeline.exec()
                    }
                }
            }

            // --- Sequential keys (stream, json) ---
            for (const entry of sequential) {
                try {
                    if (conflictMode === 'overwrite') {
                        await redis.del(entry.key)
                    }

                    if (entry.type === 'stream') {
                        if (Array.isArray(entry.value) && entry.value.length > 0) {
                            const streamPipeline = redis.pipeline()
                            for (const streamEntry of entry.value) {
                                const { id, ...fields } = streamEntry
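                                // the exported entry ID is dropped; '*' lets Redis assign a new one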
                                const args = []
                                for (const [k, v] of Object.entries(fields)) {
                                    args.push(k, v)
                                }
                                if (args.length > 0) {
                                    streamPipeline.xadd(entry.key, '*', ...args)
                                }
                            }
                            await streamPipeline.exec()
                        }
                    } else if (entry.type === 'json') {
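                        // JSON.SET requires the RedisJSON module on the target server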
                        if (entry.value !== null && entry.value !== undefined) {
                            const json = typeof entry.value === 'string' ? entry.value : JSON.stringify(entry.value)
                            await redis.call('JSON.SET', entry.key, '$', json)
                        }
                    }

                    if (entry.pttl && entry.pttl > 0) {
                        await redis.pexpire(entry.key, entry.pttl)
                    }

                    created++
                } catch (e) {
                    console.error(`key-import: failed to import key "${entry.key}" (type=${entry.type}):`, e.message)
                    errors++
                }
            }
        }

        socket.emit(options.responseEvent, {
            status: 'ok',
            data: { created, skipped, errors },
        })

    } catch (e) {
        console.error(e)
        socket.emit(options.responseEvent, {
            status: 'error',
            error: e.message,
        })
    }
}