Problem
I have about a million records in Redis that I want to dump into Elasticsearch periodically. I just want to make sure that my script is reasonably fast and free of memory leaks.
'use strict';

const redis = require('redis');
const bluebird = require('bluebird');
const request = require('request');
const elasticsearch = require('elasticsearch');
const fs = require('fs');
const _ = require('lodash');
const async = require('async');
const sh = require('shorthash');
const sleep = require('sleep');
const config = require('../config');

bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

let client = redis.createClient({
    host: config.redis.url,
    port: config.redis.port
});

let ES = elasticsearch.Client({
    host: config.elasticsearch.url,
    requestTimeout: 30000000
});

var keys = fs.readFileSync('no-keys').toString().split('\n');
keys = keys.filter((e) => e);

let chunkedKeys = _.chunk(keys, 1000);
console.log('We have ' + chunkedKeys.length + ' keys');

_.each(chunkedKeys, (chunkedKey) => {
    client.mget(chunkedKey, (mgetError, replies) => {
        if (mgetError) {
            console.error(mgetError);
        }
        console.log('MGET complete from Redis');
        console.log('We have ' + replies.length + ' documents');
        async.mapLimit(replies, 5, (reply, callback) => {
            try {
                let content = JSON.parse(reply);
                let k = sh.unique(content.url);
                let body = [{
                    index: {
                        _index: config.elasticsearch.index,
                        _type: 'article',
                        _id: k,
                        _timestamp: (new Date()).toISOString()
                    }
                }];
                body.push(content);
                callback(null, body);
            } catch (e) {
                console.error(e);
                callback(e, []);
            }
        }, (err, results) => {
            if (err) {
                console.log(err);
            }
            let mergedResult = _.flatten(results.filter((e) => e));
            console.log('Export complete with ' + mergedResult.length);
            ES.bulk({body: mergedResult}, () => {
                console.log('Import complete');
            });
        });
    });
});
Solution
I can see two problems with your script:
- You are doing synchronous work inside async, which is not recommended. The async library is meant for asynchronous operations, but in your async.mapLimit block neither JSON.parse nor sh.unique is asynchronous, so async only adds overhead. It is critical that you understand the difference; please read the async docs on synchronous operations. A minimal sketch of the synchronous alternative follows this list.
- You are buffering all your keys into memory. I guess that is a smaller problem, but it is less efficient than using a readStream.
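For the first point, a plain loop (or Array.prototype.map) over the replies does the same work without async. This is only a minimal sketch, assuming the same config and sh (shorthash) requires as in your script; buildBulkBody is just an illustrative name:

// A plain synchronous loop replaces async.mapLimit here
function buildBulkBody(replies) {
    let body = [];
    replies.forEach((reply) => {
        try {
            let content = JSON.parse(reply);
            body.push({
                index: {
                    _index: config.elasticsearch.index,
                    _type: 'article',
                    _id: sh.unique(content.url),
                    _timestamp: (new Date()).toISOString()
                }
            });
            body.push(content); // the bulk body is a flat list: action line, then document
        } catch (e) {
            console.error(e); // a reply that fails to parse is simply skipped
        }
    });
    return body;
}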
Please take a look at my full implementation below, and feel free to use any parts of it.
'use strict';

const redis = require('redis');
const bluebird = require('bluebird');
const request = require('request');
const elasticsearch = require('elasticsearch');
const fs = require('fs');
const _ = require('lodash');
const async = require('async');
const sh = require('shorthash');
const sleep = require('sleep');
const config = require('../config');
const readline = require('readline'); // handy wrapper around a readStream

bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

let client = redis.createClient({
    host: config.redis.url,
    port: config.redis.port
});

let ES = elasticsearch.Client({
    host: config.elasticsearch.url,
    requestTimeout: 30000000
});

const readLineStream = readline.createInterface({ // this stream serves keys one per line
    input: fs.createReadStream('no-keys')
});

let keysBuffer = [];

readLineStream.on('line', (line) => { // you were splitting on '\n', so I assume one key per line
    if (line) { // skip empty lines
        keysBuffer.push(line);
        if (keysBuffer.length === 1000) {
            migrateKeys(keysBuffer);
            keysBuffer = [];
        }
    }
});

readLineStream.on('close', () => { // readline emits 'close' (not 'end') once the input is exhausted
    if (keysBuffer.length > 0) {
        migrateKeys(keysBuffer); // remember to flush your local buffer
    }
});

function migrateKeys(chunkOfKeys) {
    client.mget(chunkOfKeys, (mgetError, replies) => {
        if (mgetError) {
            console.error(mgetError); // consider returning early here
        }
        console.log('MGET complete from Redis');
        console.log('We have ' + replies.length + ' documents');
        let parsedReplies = [];
        replies.forEach((reply) => { // plain forEach: using async for sync code is not recommended
            try {
                let content = JSON.parse(reply);
                parsedReplies.push({
                    index: {
                        _index: config.elasticsearch.index,
                        _type: 'article',
                        _id: sh.unique(content.url),
                        _timestamp: (new Date()).toISOString()
                    }
                }, content); // push action line and document flat; no need to filter out parse errors
            } catch (e) {
                console.error(e);
            }
        });
        console.log('Export complete with ' + parsedReplies.length);
        ES.bulk({body: parsedReplies}, () => {
            console.log('Import complete');
        });
    });
}
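As a final note, ES.bulk can succeed at the transport level while individual items are rejected; those failures show up in the response body (its errors flag and items list) rather than in the error argument, so the 'Import complete' log can hide rejected documents. A minimal sketch of how the callback inside migrateKeys could check for that:

ES.bulk({body: parsedReplies}, (bulkError, response) => {
    if (bulkError) {
        return console.error(bulkError); // request/transport-level failure
    }
    if (response && response.errors) { // true when at least one item in the batch was rejected
        console.error('Import finished, but some documents were rejected');
    } else {
        console.log('Import complete');
    }
});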