/**
* @file Manages Salesforce Bulk API related operations
* @author Shinichi Tomita <shinichi.tomita@gmail.com>
*/
var util = require('util'),
stream = require('stream'),
Stream = stream.Stream,
events = require('events'),
_ = require('underscore')._,
Connection = require('./connection'),
RecordStream = require('./record-stream'),
CSV = require('./csv'),
Promise = require('./promise');
/*--------------------------------------------*/
/**
 * Class for Bulk API Job
 *
 * Holds the owning Bulk object, the target SObject type, the load operation and
 * its options, the job ID with its last known state, and a registry of batch
 * instances keyed by batch ID.
 *
 * @protected
 * @class Bulk~Job
 * @extends events.EventEmitter
 *
 * @param {Bulk} bulk - Bulk API object
 * @param {String} [type] - SObject type
 * @param {String} [operation] - Bulk load operation ('insert', 'update', 'upsert', 'delete', or 'hardDelete')
 * @param {Object} [options] - Options for bulk loading operation
 * @param {String} [options.extIdField] - External ID field name (used when upsert operation).
 * @param {String} [jobId] - Job ID (if already available)
 */
var Job = function(bulk, type, operation, options, jobId) {
  this._bulk = bulk;
  this.type = type;
  this.operation = operation;
  this.options = options ? options : {};
  this.id = jobId;
  // A job constructed with a known ID starts out as 'Open'; otherwise its state is 'Unknown'.
  if (jobId) {
    this.state = 'Open';
  } else {
    this.state = 'Unknown';
  }
  // Batch instances of this job, keyed by batch ID
  this._batches = {};
};

util.inherits(Job, events.EventEmitter);
/**
 * Open new job and get jobinfo
 *
 * The underlying promise is created once and cached in `_jobInfo`, so repeated
 * calls share the same result. When the job already has an ID (i.e. it refers
 * to a job that exists on the server) no create request is sent; the promise
 * resolves with the locally known `{ id, state }` instead.
 *
 * On success the job's `id` and `state` are updated and then an "open" event
 * is emitted with the job info. On failure an "error" event is emitted and the
 * promise is rejected with the same error.
 *
 * @method Bulk~Job#open
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype.open = function(callback) {
  var self = this;
  var bulk = this._bulk;
  // Escape XML markup characters so that field values cannot corrupt the request document
  var escapeXml = function(value) {
    return String(value)
      .replace(/&/g, '&amp;')
      .replace(/</g, '&lt;')
      .replace(/>/g, '&gt;');
  };
  // if not requested opening job
  if (!this._jobInfo) {
    if (this.id) {
      // The job ID is already known: never create a second job on the server.
      var known = Promise.defer();
      known.resolve({ id: this.id, state: this.state });
      this._jobInfo = known.promise;
    } else {
      var operation = this.operation.toLowerCase();
      // Lower-casing would turn 'hardDelete' into 'harddelete'; restore the camel-case spelling.
      if (operation === 'harddelete') {
        operation = 'hardDelete';
      }
      var body = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">',
        '<operation>' + operation + '</operation>',
        '<object>' + escapeXml(this.type) + '</object>',
        (this.options.extIdField ?
          '<externalIdFieldName>' + escapeXml(this.options.extIdField) + '</externalIdFieldName>' :
          ''),
        '<contentType>CSV</contentType>',
        '</jobInfo>'
      ].join('');
      this._jobInfo = bulk._request({
        method : 'POST',
        path : "/job",
        body : body,
        headers : {
          "Content-Type" : "application/xml; charset=utf-8"
        },
        responseType: "application/xml"
      }).then(function(res) {
        // Assign id/state first so that "open" listeners observe an already usable job
        self.id = res.jobInfo.id;
        self.state = res.jobInfo.state;
        self.emit("open", res.jobInfo);
        return res.jobInfo;
      }, function(err) {
        self.emit("error", err);
        throw err;
      });
    }
  }
  return this._jobInfo.thenCall(callback);
};
/**
 * Create a new batch instance in the job
 *
 * The new batch has no ID yet; it is added to this job's batch registry when
 * it emits its 'queue' event, keyed by the ID it carries at that moment.
 *
 * @method Bulk~Job#createBatch
 * @returns {Bulk~Batch}
 */
Job.prototype.createBatch = function() {
  var job = this;
  var created = new Batch(job);
  created.on('queue', function registerBatch() {
    job._batches[created.id] = created;
  });
  return created;
};
/**
 * Get a batch instance specified by given batch ID
 *
 * Returns the instance already registered under that ID, or creates one,
 * registers it and returns it.
 *
 * @method Bulk~Job#batch
 * @param {String} batchId - Batch ID
 * @returns {Bulk~Batch}
 */
Job.prototype.batch = function(batchId) {
  var registry = this._batches;
  if (!registry[batchId]) {
    registry[batchId] = new Batch(this, batchId);
  }
  return registry[batchId];
};
/**
 * Check the job status from server
 *
 * Waits for `open()` to settle, requests the job info for this job's ID,
 * logs it at debug level and stores the reported state on the job.
 *
 * @method Bulk~Job#check
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype.check = function(callback) {
  var job = this;
  var api = job._bulk;
  var log = api._logger;

  // Issue the status request; the job ID is read only after open() has settled
  var fetchJobInfo = function() {
    return api._request({
      method : 'GET',
      path : "/job/" + job.id,
      responseType: "application/xml"
    });
  };

  // Remember the server-side state on the job and pass the job info on
  var recordState = function(res) {
    var info = res.jobInfo;
    log.debug(info);
    job.state = info.state;
    return info;
  };

  return job.open().then(fetchJobInfo).then(recordState).thenCall(callback);
};
/**
 * List all registered batch info in job
 *
 * Waits for `open()` to settle and requests the batch list of this job.
 * The result is always an array: a single batch info is wrapped in an array,
 * and a response without any batch info yields an empty array.
 *
 * @method Bulk~Job#list
 * @param {Callback.<Array.<Bulk~BatchInfo>>} [callback] - Callback function
 * @returns {Promise.<Array.<Bulk~BatchInfo>>}
 */
Job.prototype.list = function(callback) {
  var self = this;
  var bulk = this._bulk;
  var logger = bulk._logger;
  return this.open().then(function() {
    return bulk._request({
      method : 'GET',
      path : "/job/" + self.id + "/batch",
      responseType: "application/xml"
    });
  }).then(function(res) {
    // The list element may be missing or empty when the job has no batch yet
    var batchInfo = res.batchInfoList ? res.batchInfoList.batchInfo : undefined;
    logger.debug(batchInfo);
    if (batchInfo === undefined || batchInfo === null) {
      return [];
    }
    return _.isArray(batchInfo) ? batchInfo : [ batchInfo ];
  }).thenCall(callback);
};
/**
 * Close opened job
 *
 * Requests the "Closed" state for the job. On success the local job ID is
 * cleared and a "close" event is emitted with the job info; on failure an
 * "error" event is emitted and the error is propagated.
 *
 * @method Bulk~Job#close
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype.close = function(callback) {
  var self = this;
  return this._changeState("Closed").then(function(jobInfo) {
    self.id = null;
    self.emit("close", jobInfo);
    return jobInfo;
  }, function(err) {
    self.emit("error", err);
    throw err;
  }).thenCall(callback); // honour the documented node-style callback
};
/**
 * Set the status to abort
 *
 * Requests the "Aborted" state for the job. On success the local job ID is
 * cleared, an "abort" event is emitted with the job info and the local state
 * is set to "Aborted"; on failure an "error" event is emitted and the error is
 * propagated.
 *
 * @method Bulk~Job#abort
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype.abort = function(callback) {
  var self = this;
  return this._changeState("Aborted").then(function(jobInfo) {
    self.id = null;
    self.emit("abort", jobInfo);
    self.state = "Aborted";
    return jobInfo;
  }, function(err) {
    self.emit("error", err);
    throw err;
  }).thenCall(callback); // honour the documented node-style callback
};
/**
 * Post a state change for this job and store the state reported back.
 *
 * Waits for `open()` to settle, posts a jobInfo document carrying the
 * requested state to the job's URL, logs the returned job info at debug level
 * and copies its state onto the job.
 *
 * @private
 * @param {String} state - State to request (this file uses "Closed" and "Aborted")
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype._changeState = function(state, callback) {
  var job = this;
  var api = job._bulk;
  var log = api._logger;

  // Build and send the state-change document; the job ID is read after open() has settled
  var postState = function() {
    var xml =
      '<?xml version="1.0" encoding="UTF-8"?>' +
      '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">' +
      '<state>' + state + '</state>' +
      '</jobInfo>';
    return api._request({
      method : 'POST',
      path : "/job/" + job.id,
      body : xml,
      headers : {
        "Content-Type" : "application/xml; charset=utf-8"
      },
      responseType: "application/xml"
    });
  };

  // Mirror the state reported by the server on the job
  var applyState = function(res) {
    var info = res.jobInfo;
    log.debug(info);
    job.state = info.state;
    return info;
  };

  return job.open().then(postState).then(applyState).thenCall(callback);
};
/*--------------------------------------------*/
/**
 * Batch (extends RecordStream implements Sendable)
 *
 * Records sent to the batch go through an internal CSV stream whose output is
 * piped into the batch's own stream (see `stream()`). The batch also owns a
 * deferred whose promise backs the `then` / `thenCall` methods.
 *
 * @protected
 * @class Bulk~Batch
 * @extends {RecordStream}
 * @implements {Promise.<Array.<RecordResult>>}
 * @param {Bulk~Job} job - Bulk job object
 * @param {String} [batchId] - Batch ID (if already available)
 */
var Batch = function(job, batchId) {
  Batch.super_.call(this);
  this.sendable = true;
  this.job = job;
  this.id = batchId;
  this._bulk = job._bulk;

  // Record -> CSV conversion, fed straight into the batch's own stream
  var csv = new RecordStream.CSVStream();
  this._csvStream = csv;
  csv.stream().pipe(this.stream());

  // Settled by execute() with the batch result
  this._deferred = Promise.defer();
};

util.inherits(Batch, RecordStream);
/**
 * Execute batch operation
 *
 * Also available as `run` and `exec`. A batch can be executed only once.
 * The result promise is settled by the batch's 'response' event (resolve) or
 * 'error' event (reject) and is forwarded to the batch's own deferred, which
 * backs `then` / `thenCall`.
 *
 * Input handling:
 * - stream: piped into the batch stream
 * - array of records: every record is sent, then the batch is ended so that
 *   the upload can complete
 * - string: written to the batch stream as is, then the stream is ended
 * - omitted: nothing is written; the caller feeds the batch afterwards
 *
 * @method Bulk~Batch#execute
 * @param {Array.<Record>|stream.Stream|String} [input] - Input source for batch operation. Accepts array of records, CSV string, and CSV data input stream.
 * @param {Callback.<Array.<RecordResult>>} [callback] - Callback function
 * @returns {Bulk~Batch}
 * @throws {Error} when the batch has already been executed
 */
Batch.prototype.run =
Batch.prototype.exec =
Batch.prototype.execute = function(input, callback) {
  var self = this;
  if (typeof input === 'function') { // if input argument is omitted
    callback = input;
    input = null;
  }
  // if batch is already executed
  if (this._result) {
    throw new Error("Batch already executed.");
  }
  var rdeferred = Promise.defer();
  this._result = rdeferred.promise;
  this._result.thenCall(callback).then(function(res) {
    self._deferred.resolve(res);
  }, function(err) {
    self._deferred.reject(err);
  });
  this.once('response', function(res) {
    rdeferred.resolve(res);
  });
  this.once('error', function(err) {
    rdeferred.reject(err);
  });
  if (input instanceof Stream) {
    input.pipe(this.stream());
  } else if (_.isArray(input)) {
    _.forEach(input, function(record) { self.send(record); });
    // All records are known up front: finish the batch, otherwise the upload never ends
    self.end();
  } else if (_.isString(input)) {
    var stream = this.stream();
    stream.write(input);
    stream.end();
  }
  // return Batch instance for chaining
  return this;
};
/**
 * Promise/A+ interface
 * http://promises-aplus.github.io/promises-spec/
 *
 * Forwards all handlers to the promise of the batch's internal deferred and
 * returns whatever that promise's `then` returns.
 *
 * @method Bulk~Batch#then
 */
Batch.prototype.then = function(onResolved, onReject, onProgress) {
  var resultPromise = this._deferred.promise;
  return resultPromise.then(onResolved, onReject, onProgress);
};
/**
 * Promise/A+ extension
 * Call "then" using given node-style callback function
 *
 * When `callback` is a function it is invoked as `callback(null, result)` on
 * success and `callback(err)` on failure, and the chained promise is returned.
 * Otherwise the batch itself is returned.
 *
 * @method Bulk~Batch#thenCall
 */
Batch.prototype.thenCall = function(callback) {
  if (!_.isFunction(callback)) {
    return this;
  }
  var onSuccess = function(res) {
    return callback(null, res);
  };
  var onFailure = function(err) {
    return callback(err);
  };
  return this.then(onSuccess, onFailure);
};
/**
 * Send a record to the batch.
 *
 * Works on a shallow copy, so the caller's record is not modified. Depending
 * on the job operation (compared case-insensitively, since the operation name
 * is lower-cased when the job is opened):
 * - insert: the `Id` field is dropped
 * - delete / hardDelete: only the `Id` field is kept
 * The `type` and `attributes` properties are always dropped before the record
 * is handed to the CSV stream.
 *
 * @override
 * @param {Record} record - Record to send
 * @returns {*} whatever the CSV stream's `send` returns
 */
Batch.prototype.send = function(record) {
  record = _.clone(record);
  var operation = String(this.job.operation || '').toLowerCase();
  if (operation === "insert") {
    delete record.Id;
  } else if (operation === "delete" || operation === "harddelete") {
    record = { Id: record.Id };
  }
  delete record.type;
  delete record.attributes;
  return this._csvStream.send(record);
};
/**
 * Finish feeding records to the batch.
 *
 * Sends the optional final record, marks the batch as no longer sendable and
 * ends the internal CSV stream.
 *
 * @override
 * @param {Record} [record] - Last record to send before ending
 */
Batch.prototype.end = function(record) {
  var hasFinalRecord = !!record;
  if (hasFinalRecord) {
    this.send(record);
  }
  this.sendable = false;
  var csv = this._csvStream;
  csv.end();
};
/**
 * Check batch status in server
 *
 * Requests the batch info for this batch, logs it at debug level and resolves
 * with it.
 *
 * @method Bulk~Batch#check
 * @param {Callback.<Bulk~BatchInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~BatchInfo>}
 * @throws {Error} synchronously, when the job ID or the batch ID is not known yet
 */
Batch.prototype.check = function(callback) {
  var api = this._bulk;
  var log = api._logger;
  var jobId = this.job.id;
  var batchId = this.id;
  if (!(jobId && batchId)) {
    throw new Error("Batch not started.");
  }

  var reportInfo = function(res) {
    log.debug(res.batchInfo);
    return res.batchInfo;
  };

  var statusRequest = {
    method : 'GET',
    path : "/job/" + jobId + "/batch/" + batchId,
    responseType: "application/xml"
  };
  return api._request(statusRequest).then(reportInfo).thenCall(callback);
};
/**
 * Polling the batch result and retrieve
 *
 * Every `interval` milliseconds the batch status is checked:
 * - "Completed": the result is retrieved
 * - "Failed": the result is retrieved when at least one record was processed,
 *   otherwise an 'error' event carrying the state message is emitted
 * - any other state: another check is scheduled
 * A failing status check emits 'error'. Once more than `timeout` milliseconds
 * have passed since polling started, 'error' ("polling time out") is emitted
 * instead of checking again.
 *
 * @method Bulk~Batch#poll
 * @param {Number} interval - Polling interval in milliseconds
 * @param {Number} timeout - Polling timeout in milliseconds
 * @throws {Error} synchronously, when the job ID or the batch ID is not known yet
 */
Batch.prototype.poll = function(interval, timeout) {
  var batch = this;
  if (!this.job.id || !this.id) {
    throw new Error("Batch not started.");
  }
  var startedAt = new Date().getTime();

  var fail = function(error) {
    batch.emit('error', error);
  };

  // React to one status check result
  var onStatus = function(err, res) {
    if (err) {
      fail(err);
      return;
    }
    switch (res.state) {
      case "Failed":
        // A failed batch may still carry per-record results
        if (parseInt(res.numberRecordsProcessed, 10) > 0) {
          batch.retrieve();
        } else {
          fail(new Error(res.stateMessage));
        }
        break;
      case "Completed":
        batch.retrieve();
        break;
      default:
        setTimeout(tick, interval);
    }
  };

  // One polling round: give up after the timeout, otherwise ask the server
  var tick = function() {
    var elapsedLimit = startedAt + timeout;
    if (elapsedLimit < new Date().getTime()) {
      fail(new Error("polling time out"));
      return;
    }
    batch.check(onStatus);
  };

  setTimeout(tick, interval);
};
/**
 * Retrieve batch result
 *
 * Requests the batch result and converts each returned row into
 * `{ id, success, errors }`. The converted list is emitted as the batch's
 * 'response' event and resolved; a failing request emits 'error' and rejects.
 *
 * @method Bulk~Batch#retrieve
 * @param {Callback.<Array.<RecordResult>>} [callback] - Callback function
 * @returns {Promise.<Array.<RecordResult>>}
 * @throws {Error} synchronously, when the job ID or the batch ID is not known yet
 */
Batch.prototype.retrieve = function(callback) {
  var batch = this;
  var api = this._bulk;
  var jobId = this.job.id;
  var batchId = this.id;
  if (!(jobId && batchId)) {
    throw new Error("Batch not started.");
  }

  // Convert one result row into a record result
  var toRecordResult = function(row) {
    var errors = [];
    if (row.Error) {
      errors = [ row.Error ];
    }
    return {
      id: row.Id || null,
      success: row.Success === "true",
      errors: errors
    };
  };

  var onResults = function(rows) {
    var recordResults = _.map(rows, toRecordResult);
    batch.emit('response', recordResults);
    return recordResults;
  };

  var onFailure = function(err) {
    batch.emit('error', err);
    throw err;
  };

  return api._request({
    method : 'GET',
    path : "/job/" + jobId + "/batch/" + batchId + "/result"
  }).then(onResults, onFailure).thenCall(callback);
};
/**
 * Get the upload stream of this batch.
 *
 * The BatchStream is created on first use and cached in `_stream`; later calls
 * return the same instance.
 *
 * @override
 * @returns {Bulk~BatchStream}
 */
Batch.prototype.stream = function() {
  var cached = this._stream;
  if (cached) {
    return cached;
  }
  this._stream = new BatchStream(this);
  return this._stream;
};
/*--------------------------------------------*/
/**
 * Batch uploading stream (extends WritableStream)
 *
 * Writable stream facade for a batch: it remembers the batch it uploads for
 * and flags itself as writable.
 *
 * @private
 * @class Bulk~BatchStream
 * @extends stream.Stream
 * @param {Bulk~Batch} batch - Batch whose data is uploaded through this stream
 */
var BatchStream = function(batch) {
  // Initialise the base stream first
  BatchStream.super_.call(this);
  this.batch = batch;
  this.writable = true;
};

util.inherits(BatchStream, Stream);
/**
 * Get the stream of the batch creation request, starting the request on first use.
 *
 * The request posts CSV to the batch endpoint of the batch's job. When it
 * completes successfully the returned batch info is logged at debug level,
 * its ID is stored on the batch and the batch emits 'queue' with the batch
 * info; on failure the batch emits 'error'.
 *
 * @private
 * @returns {Object} the cached result of calling `stream()` on the request
 */
BatchStream.prototype._getRequestStream = function() {
  var batch = this.batch;
  var api = batch._bulk;
  var log = api._logger;
  if (this._reqStream) {
    return this._reqStream;
  }

  var onBatchCreated = function(err, res) {
    if (err) {
      batch.emit('error', err);
      return;
    }
    var info = res.batchInfo;
    log.debug(info);
    batch.id = info.id;
    batch.emit('queue', info);
  };

  var request = api._request({
    method : 'POST',
    path : "/job/" + batch.job.id + "/batch",
    headers: {
      "Content-Type": "text/csv"
    },
    responseType: "application/xml"
  }, onBatchCreated);
  this._reqStream = request.stream();
  return this._reqStream;
};
/**
 * Write a chunk of data to the batch.
 *
 * While the job has no ID yet the chunk is queued and nothing is returned;
 * otherwise it is written to the request stream and that stream's return
 * value is passed on.
 *
 * @override
 * @param {*} data - Chunk to upload
 */
BatchStream.prototype.write = function(data) {
  var jobIsOpen = !!this.batch.job.id;
  if (jobIsOpen) {
    return this._getRequestStream().write(data);
  }
  this._queue(data);
};
/**
 * End the batch upload.
 *
 * While the job has no ID yet, the stream is only flagged as ending and the
 * optional final chunk is queued. Otherwise the stream is marked as not
 * writable and the request stream is ended with the optional final chunk.
 *
 * @override
 * @param {*} [data] - Final chunk to upload
 */
BatchStream.prototype.end = function(data) {
  if (this.batch.job.id) {
    this.writable = false;
    this._getRequestStream().end(data);
    return;
  }
  // Job not opened yet: remember the end request for when the queue is flushed
  this._ending = true;
  if (data) {
    this._queue(data);
  }
};
/**
 * Buffer a chunk until the job has been opened.
 *
 * The first call creates the buffer and asks the job to open. Once the job is
 * open, every buffered chunk is written, the stream is ended if an end was
 * requested in the meantime, and the buffer is emptied. If opening the job
 * fails, the batch emits 'error' and the buffered chunks are left untouched.
 *
 * @private
 * @param {*} data - Chunk to buffer
 */
BatchStream.prototype._queue = function(data) {
  var stream = this;
  var batch = stream.batch;
  var job = batch.job;
  var isFirstChunk = !stream._buffer;
  if (isFirstChunk) {
    stream._buffer = [];
    job.open(function onJobOpened(err) {
      if (err) {
        batch.emit("error", err);
        return;
      }
      // Replay everything collected while the job was being opened
      stream._buffer.forEach(function(chunk) {
        stream.write(chunk);
      });
      if (stream._ending) {
        stream.end();
      }
      stream._buffer = [];
    });
  }
  stream._buffer.push(data);
};
/*--------------------------------------------*/
/**
 * Class for Bulk API
 *
 * Keeps the connection and reuses the connection's logger.
 *
 * @class
 * @param {Connection} conn - Connection object
 */
var Bulk = function(conn) {
  var logger = conn._logger;
  this._conn = conn;
  this._logger = logger;
};

/**
 * Polling interval in milliseconds (default shared through the prototype)
 * @type {Number}
 */
Bulk.prototype.pollInterval = 1000;

/**
 * Polling timeout in milliseconds (default shared through the prototype)
 * @type {Number}
 */
Bulk.prototype.pollTimeout = 10000;
/**
 * Send a Bulk API request through the connection.
 *
 * Works on a shallow copy of `params`: `path` is appended to the async API
 * base URL (instance URL + "services/async" + API version) to form `url`, and
 * `path` / `responseType` are removed from the copy. The connection request
 * receives options that carry the expected response content type, a
 * `beforesend` hook that puts the access token into the "X-SFDC-SESSION"
 * header, and a `parseError` hook that maps the error body's exception code
 * and message to `{ code, message }`.
 *
 * @private
 * @param {Object} params - Request parameters (`method`, `path`, and optionally `body`, `headers`, `responseType`)
 * @param {Function} [callback] - Passed through to the connection's `_request`
 * @returns {*} whatever the connection's `_request` returns
 */
Bulk.prototype._request = function(params, callback) {
  var conn = this._conn;
  var request = _.clone(params);
  var asyncBaseUrl = [ conn.instanceUrl, "services/async", conn.version ].join('/');
  request.url = asyncBaseUrl + request.path;

  var options = {
    responseContentType: request.responseType,
    beforesend: function(conn, params) {
      params.headers["X-SFDC-SESSION"] = conn.accessToken;
    },
    parseError: function(err) {
      var detail = err.error;
      return {
        code: detail.exceptionCode,
        message: detail.exceptionMessage
      };
    }
  };

  // These are consumed here and must not reach the connection
  delete request.path;
  delete request.responseType;
  return conn._request(request, callback, options);
};
/**
 * Create and start bulkload job and batch
 *
 * `options` may be omitted for any operation: when the third argument looks
 * like an input source (array, string or stream) or like the callback, the
 * arguments are shifted accordingly.
 *
 * The job is closed when the batch emits 'response' or 'error', and polling
 * for the result (using `pollInterval` / `pollTimeout`) starts when the batch
 * emits 'queue'.
 *
 * @param {String} type - SObject type
 * @param {String} operation - Bulk load operation ('insert', 'update', 'upsert', 'delete', or 'hardDelete')
 * @param {Object} [options] - Options for bulk loading operation
 * @param {String} [options.extIdField] - External ID field name (used when upsert operation).
 * @param {Array.<Record>|stream.Stream|String} [input] - Input source for bulkload. Accepts array of records, CSV string, and CSV data input stream.
 * @param {Callback.<Array.<RecordResult>>} [callback] - Callback function
 * @returns {Bulk~Batch}
 * @throws {Error} when `type` or `operation` is missing
 */
Bulk.prototype.load = function(type, operation, options, input, callback) {
  var self = this;
  if (!type || !operation) {
    throw new Error("Insufficient arguments. At least, 'type' and 'operation' are required.");
  }
  // Detect an omitted options argument by its shape rather than by the operation name
  var optionsOmitted = _.isArray(options) || _.isString(options) ||
    _.isFunction(options) || options instanceof Stream;
  if (optionsOmitted) {
    callback = input;
    input = options;
    options = null;
  }
  var job = this.createJob(type, operation, options);
  var batch = job.createBatch();
  var cleanup = function() { job.close(); };
  batch.on('response', cleanup);
  batch.on('error', cleanup);
  batch.on('queue', function() { batch.poll(self.pollInterval, self.pollTimeout); });
  return batch.execute(input, callback);
};
/**
 * Create a new job instance
 *
 * The job is asked to open right away; the result of that call is not awaited
 * here.
 *
 * @param {String} type - SObject type
 * @param {String} operation - Bulk load operation ('insert', 'update', 'upsert', 'delete', or 'hardDelete')
 * @param {Object} [options] - Options for bulk loading operation
 * @returns {Bulk~Job}
 */
Bulk.prototype.createJob = function(type, operation, options) {
  var bulk = this;
  var created = new Job(bulk, type, operation, options);
  created.open();
  return created;
};
/**
 * Get a job instance specified by given job ID
 *
 * The returned job carries only the ID; type, operation and options are null.
 *
 * @param {String} jobId - Job ID
 * @returns {Bulk~Job}
 */
Bulk.prototype.job = function(jobId) {
  var bulk = this;
  var existing = new Job(bulk, null, null, null, jobId);
  return existing;
};
/*--------------------------------------------*/
module.exports = Bulk;