Last active
August 1, 2017 11:20
-
-
Save matt212/9aceee8d314b2a533c3598806ed06d74 to your computer and use it in GitHub Desktop.
promise based codeflow for unziping , reading, formating and bulk streaming into postgres
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*my normal promise function looks like this | |
normalfunc.then(function (data){ | |
}).error(function (){ | |
}) | |
function normalfunc(data) | |
{ | |
return new Promise(function(resolve, reject) { | |
resolve/reject | |
}) | |
} | |
*/ | |
//////////////////Please scroll down for code flow part///////////////////////////// | |
/******************************streambulkinsert codebase looks like this****************************************************/ | |
function streambulkinsert(data) { | |
return new Promises(function(resolve, reject) { | |
//resolve(data[0]); | |
pg.connect(connectionString, function(err, client, done) { | |
if (err) { | |
console.log(err) | |
} | |
var sqlcopysyntax = 'COPY srt(starttime, endtime,content,showname,season,ep, createdat, updatedat) FROM STDIN '; | |
var stream = client.query(copyFrom(sqlcopysyntax)); | |
console.log(data[0]) | |
var started = false; | |
var started = false; | |
var internmap = through2.obj(function(arr, enc, cb) { | |
var rowText = (started ? '\n' : '') + arr.join('\t'); | |
started = true; | |
cb(null, rowText); | |
}) | |
data.forEach(function(r) { | |
internmap.write(r); | |
}) | |
internmap.end(); | |
internmap.pipe(stream); | |
stream.on('finish', function() { | |
console.log("stream man") | |
resolve(0); | |
}) | |
stream.on('error', function(err) { | |
console.log(err) | |
console.log("here man2") | |
reject(0); | |
}) | |
internmap.on('finish', function() { | |
console.log("internmap man2") | |
done(); | |
resolve(0); | |
}) | |
internmap.on('error', function(err) { | |
console.log(err) | |
reject(0); | |
}) | |
}) | |
}) | |
} | |
//////////////////////////code flow part commences////////////////////////////////////////////////////////////////////////// | |
/*put the whole thing in queue for background process and socket.io instance for update-ing dom when queues are finished !*/ | |
var baseobj=new Object(); | |
baseobj.testFolder = "folder path" | |
baseobj.ref = req.app.io | |
q.push(baseobj, jobcompleted); | |
function jobcompleted(argument) | |
{ | |
argument.ref.emit('news', "completed"); | |
} | |
var q = new Queue(function(testFolder, cb) { | |
//unzip files and readsrtfiles will read files and perform modification as per req specs | |
unzipfiles(baseobj.testFolder).then(readsrtfiles).then(function(resultset) { | |
//got formatted object array | |
//using mapseries to process single files data i.e each resultset contain data from single files which is read from | |
Promises.mapSeries(resultset, function(filemeh) { | |
if (filemeh != undefined) { | |
//return conventionaldatadump(filemeh) | |
return streambulkinsert(filemeh) | |
.then(function(a) { | |
console.log(a); | |
return a; | |
}).catch(function(err) { | |
console.log("Promise Rejected" + err); | |
}) | |
} | |
}).then(function(allItems) { | |
console.log("*************************************all process completed**********************************************************") | |
cb(baseobj); | |
}) | |
}) | |
}) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment