NodeJS script to dump data from Redis to Elasticsearch

Problem

I have about a million records in Redis that I want to dump into Elasticsearch periodically. I just want to make sure that my script is decent in terms of speed and has no memory leaks.

'use strict';

const redis = require('redis');
const bluebird = require('bluebird');
const request = require('request');
const elasticsearch = require('elasticsearch');
const fs = require('fs');
const _ = require('lodash');
const async = require('async');
const sh = require('shorthash');
const sleep = require('sleep');
const config = require('../config');


bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

let client = redis.createClient({
  host: config.redis.url,
  port: config.redis.port
});

let ES = new elasticsearch.Client({
  host: config.elasticsearch.url,
  requestTimeout: 30000000
});

var keys = fs.readFileSync('no-keys').toString().split('\n');
keys = keys.filter((e) => e);

let chunkedKeys = _.chunk(keys, 1000);
console.log('We have ' + chunkedKeys.length + ' chunks of keys');

_.each(chunkedKeys, (chunkedKey) => {
  client.mget(chunkedKey, (mgetError, replies) => {
    if (mgetError) {
      console.error(mgetError);
    }
    console.log('MGET complete from Redis');
    console.log('We have ' + replies.length + ' documents');
    async.mapLimit(replies, 5, (reply, callback) => {
      try {
        let content = JSON.parse(reply);
        let k = sh.unique(content.url);

        let body = [{index: {_index: config.elasticsearch.index, _type: 'article', _id: k, _timestamp: (new Date()).toISOString() }}];

        body.push(content);
        callback(null, body);
      } catch(e) {
        console.error(e);
        callback(e, []);
      }
    }, (err, results) => {
      if(err) {
        console.log(err);
      }
      let mergedResult = _.flatten(results.filter((e) => e));

      console.log('Export complete with ' + mergedResult.length);

      ES.bulk({body: mergedResult}, () => {
        console.log('Import complete');
      });
    });
  });
});

Solution

I can see two problems with your script:

  • You are doing synchronous work inside async, which is not recommended. The async library is meant for asynchronous operations, but everything in your async.mapLimit block is synchronous: neither JSON.parse nor sh.unique is asynchronous. It is critical that you understand the difference; see the short sketch after this list, and please read the async docs on synchronous operations.
  • You are buffering all your keys into memory. That is a smaller problem, but it is less efficient than using a read stream.
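
To make the first point concrete, here is a minimal sketch (with stand-in data, not your real replies) showing that the parsing step is a plain synchronous map, with no callbacks involved:

'use strict';

const sh = require('shorthash');

// Stand-in data; in your script these would be the replies from client.mget.
const replies = ['{"url":"http://example.com/a"}', '{"url":"http://example.com/b"}'];

// JSON.parse and sh.unique are both synchronous, so a plain Array#map
// replaces async.mapLimit entirely; the results exist on the very next line.
const parsed = replies.map((reply) => {
  const content = JSON.parse(reply);
  return { id: sh.unique(content.url), content: content };
});

console.log(parsed.length + ' documents parsed');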

Please take a look at my implementation and feel free to use any parts of it:

'use strict';

const redis = require('redis');
const bluebird = require('bluebird');
const request = require('request');
const elasticsearch = require('elasticsearch');
const fs = require('fs');
const _ = require('lodash');
const async = require('async');
const sh = require('shorthash');
const sleep = require('sleep');
const config = require('../config');
const readline = require('readline'); //handy wrapper for readStream


bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

let client = redis.createClient({
    host: config.redis.url,
    port: config.redis.port
});

let ES = new elasticsearch.Client({
    host: config.elasticsearch.url,
    requestTimeout: 30000000
});

const readLineStream = readline.createInterface({ //this stream will serve keys one per line
    input: fs.createReadStream('no-keys')
});

let keysBuffer = [];
readLineStream.on('line', (line) => { // you were splitting on '\n', so I guess your input is one key per line
    if (line) { //check for empty lines
        keysBuffer.push(line);
        if (keysBuffer.length === 1000) {
            migrateKeys(keysBuffer);
            keysBuffer = [];
        }
    }
});

readLineStream.on('close', () => { // readline emits 'close', not 'end', when the input is exhausted
    if (keysBuffer.length > 0) {
        migrateKeys(keysBuffer); // remember to flush your local buffer
    }
});

function migrateKeys(chunkOfKeys) {
    client.mget(chunkOfKeys, (mgetError, replies) => {
        if (mgetError) {
            console.error(mgetError); // Consider returning early
        }
        console.log('MGET complete from Redis');
        console.log('We have ' + replies.length + ' documents');

        let parsedReplies = [];
        replies.forEach((reply)=> {
            try {
                let content = JSON.parse(reply);
                parsedReplies.push([{
                    index: {
                        _index: config.elasticsearch.index,
                        _type: 'article',
                        _id: sh.unique(content.url),
                        _timestamp: (new Date()).toISOString()
                    }
                }, content]); // no need to filter out replies with parse errors
            } catch (e) {
                console.error(e);
            }
        }); //using async for sync code is not recommended

        console.log('Export complete with ' + parsedReplies.length);

        ES.bulk({body: parsedReplies}, () => {
            console.log('Import complete');
        });
    });
}
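
One more note: the ES.bulk callback in both versions ignores errors. Assuming the client's standard (error, response) callback signature, a sketch of minimal error handling (reusing ES and parsedReplies from the code above) would look like this:

ES.bulk({body: parsedReplies}, (bulkError, response) => {
    if (bulkError) {
        console.error(bulkError);
        return;
    }
    if (response && response.errors) {
        // the bulk API reports per-item failures even when the request as a whole succeeds
        console.error('Some documents failed to index');
    }
    console.log('Import complete');
});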
