Source: record-stream.js

/**
 * @file Defines a record stream abstraction that handles Salesforce records as stream data
 * @author Shinichi Tomita <shinichi.tomita@gmail.com>
 */
var events = require('events'),
    stream = require('stream'),
    Stream = stream.Stream,
    util   = require('util'),
    _      = require('underscore'),
    CSV    = require('./csv');

/**
 * Base class for record streams.
 *
 * A record stream is an event emitter carrying two capability flags:
 * `sendable` (records can be pushed into it with `send()`) and
 * `receivable` (it emits `record` events that can be consumed).
 * Both flags start out `false`; concrete streams switch on what they support.
 *
 * @abstract
 * @class
 * @constructor
 * @extends events.EventEmitter
 */
var RecordStream = module.exports = function() {
  // Initialize emitter state on the instance, as recommended for util.inherits.
  events.EventEmitter.call(this);
  this.sendable = false;
  this.receivable = false;
  // An error shuts the stream down in both directions.
  this.on('error', function() {
    this.sendable = false;
    this.receivable = false;
  });
  // Once 'end' has been emitted there is nothing more to receive.
  this.on('end', function() {
    this.receivable = false;
  });
};

util.inherits(RecordStream, events.EventEmitter);


/*--- Output Record Stream methods (Sendable) ---*/

/**
 * Output record into stream.
 *
 * The base implementation is an empty placeholder. Streams built in this
 * file (for example by `RecordStream.map` / `RecordStream.filter`) assign
 * their own `send`. `pipe()` treats a return value of exactly `false` as a
 * request to pause the source.
 *
 * @abstract
 * @param {Record} record - Record object
 */
RecordStream.prototype.send = function(record) {
  // abstract: intentionally does nothing in the base class
};

/**
 * End sending records into stream.
 *
 * The base implementation only clears the `sendable` flag. `pipe()` calls
 * this on the destination when the source emits `end`, unless it was
 * given `options.end === false`.
 */
RecordStream.prototype.end = function() {
  this.sendable = false;
};

/**
 * Destroy record stream.
 *
 * Clears both capability flags so the stream is neither receivable nor
 * sendable afterwards. `pipe()` calls this on the destination when the
 * source emits `close` before `end`.
 */
RecordStream.prototype.destroy = function() {
  this.receivable = false;
  this.sendable = false;
};

/**
 * Destroy record stream after all record submission in the queue.
 *
 * Currently an empty placeholder: the base implementation performs no
 * work and changes no state.
 */
RecordStream.prototype.destroySoon = function() {
  // intentionally empty
};


/*--- Input Record Stream methods (Receivable) ---*/

/**
 * Pause record fetch.
 *
 * Empty in the base class. `pipe()` calls this on the source stream when
 * the destination's `send()` returns `false`.
 *
 * @abstract
 */
RecordStream.prototype.pause = function() {
  // abstract: intentionally does nothing in the base class
};

/**
 * Resume record fetch and query execution.
 *
 * Empty in the base class. `pipe()` calls this on the source stream when
 * the destination emits `drain`.
 *
 * @abstract
 */
RecordStream.prototype.resume = function() {
  // abstract: intentionally does nothing in the base class
};

/**
 * Connect this stream (the source) to a destination record stream.
 * Adapted from Node.js's Stream#pipe
 * https://github.com/joyent/node/blob/master/lib/stream.js
 *
 * Every `record` event of the source is handed to `dest.send()`. When
 * `send()` returns `false` the source is paused, and it is resumed on the
 * destination's `drain` event. Unless `options.end === false`, the
 * destination is ended on the source's `end` event, or destroyed on its
 * `close` event, whichever comes first. All listeners installed here are
 * removed again on `end`, `close` or `error` of either side.
 *
 * @param {RecordStream} dest - Destination output stream for records
 * @param {Object} [options]
 * @param {Boolean} [options.end] - Pass `false` to keep the destination open when the source finishes
 * @returns {RecordStream} The destination, so calls can be chained: A.pipe(B).pipe(C)
 */
RecordStream.prototype.pipe = function (dest, options) {
  var source = this;
  // Flipped once the destination has been ended or destroyed by this pipe.
  var finished = false;

  // Hand a record to the destination and apply back-pressure if it asks for it.
  function forwardRecord(record) {
    if (!dest.send) { return; }
    if (dest.send(record) === false) { source.pause(); }
  }

  function resumeSource() {
    source.resume();
  }

  function endDest() {
    if (!finished) {
      finished = true;
      dest.end();
    }
  }

  function destroyDest() {
    if (finished) { return; }
    finished = true;
    if (typeof dest.destroy === 'function') { dest.destroy(); }
  }

  // Remove every listener this pipe has installed.
  function detach() {
    source.removeListener('record', forwardRecord);
    dest.removeListener('drain', resumeSource);

    source.removeListener('end', endDest);
    source.removeListener('close', destroyDest);

    source.removeListener('error', failPipe);
    dest.removeListener('error', failPipe);

    source.removeListener('end', detach);
    source.removeListener('close', detach);

    dest.removeListener('end', detach);
    dest.removeListener('close', detach);
  }

  // Don't leave dangling pipes when there are errors. `this` is the emitter
  // (source or dest) on which the error was raised.
  function failPipe(err) {
    detach();
    if (this.listeners('error').length === 0) {
      throw err; // Unhandled stream error in pipe.
    }
  }

  source.on('record', forwardRecord);
  dest.on('drain', resumeSource);

  // Ending the destination is the default; only an explicit `end: false`
  // opts out. The destination is ended or destroyed at most once.
  var propagateEnd = !(options && options.end === false);
  if (propagateEnd) {
    source.on('end', endDest);
    source.on('close', destroyDest);
  }

  source.on('error', failPipe);
  dest.on('error', failPipe);

  source.on('end', detach);
  source.on('close', detach);

  dest.on('end', detach);
  dest.on('close', detach);

  dest.emit('pipe', source);

  return dest;
};


/**
 * Pipe this stream into a new mapping stream.
 *
 * Each record coming from upstream is transformed by `fn` and handed on
 * to whatever is attached downstream of the returned stream.
 *
 * @param {RecordMapFunction} fn - Record mapping function
 * @returns {RecordStream} The mapping stream this stream was piped into
 */
RecordStream.prototype.map = function(fn) {
  var mapper = RecordStream.map(fn);
  return this.pipe(mapper);
};

/**
 * Pipe this stream into a new filtering stream.
 *
 * Only records for which `fn` returns a truthy value are handed on to
 * whatever is attached downstream of the returned stream.
 *
 * @param {RecordFilterFunction} fn - Record filtering function
 * @returns {RecordStream} The filtering stream this stream was piped into
 */
RecordStream.prototype.filter = function(fn) {
  var sieve = RecordStream.filter(fn);
  return this.pipe(sieve);
};

/**
 * Create a Node.js stream instance for serializing/deserializing records.
 *
 * A serializer stream of the requested type is created and wired to this
 * record stream: when this stream is receivable its records are piped
 * into the serializer; otherwise, when it is sendable, the serializer is
 * piped into this stream. If neither flag is set, no wiring is done.
 *
 * @param {String} [type='csv'] - Serialization type; only 'csv' is defined
 * @returns {stream.Stream} The serializer's underlying Node.js stream
 * @throws {Error} When no serializer exists for the given type
 */
RecordStream.prototype.stream = function(type) {
  var format = type || 'csv';
  var serializer;

  switch (format) {
    case "csv":
      serializer = new RecordStream.CSVStream();
      break;
    default:
      throw new Error("No stream type defined for '"+format+"'.");
  }

  if (this.receivable) {
    this.pipe(serializer);
  } else if (this.sendable) {
    serializer.pipe(this);
  }

  // Hand back the plain Node.js stream that the serializer delegates to.
  return serializer.stream();
};

/* --------------------------------------------------- */

/**
 * @callback RecordMapFunction
 * @param {Record} record - Source record to map
 * @returns {Record}
 */

/**
 * Build a record stream that transforms each record it is sent.
 *
 * For every record passed to `send()`, the stream emits a `record` event
 * carrying the mapping function's result. When the function returns a
 * falsy value (for example because it modified the record in place and
 * returned nothing), the original record is emitted instead.
 *
 * @param {RecordMapFunction} fn - Record mapping function
 * @returns {RecordStream}
 */
RecordStream.map = function(fn) {
  var mapper = new RecordStream();
  mapper.receivable = true;
  mapper.send = function(record) {
    var result = fn(record);
    // Fall back to the incoming record when nothing usable was returned.
    this.emit('record', result ? result : record);
  };
  return mapper;
};

/**
 * Create mapping stream using given record template.
 *
 * Every mapped record starts with the source record's `Id`, followed by
 * one entry per own property of the template. Unless `noeval` is true,
 * string values in the template may use `${FieldName}` notation to refer
 * to field values of the source record:
 *  - a value that consists of exactly one placeholder is replaced by the
 *    raw source value, keeping its type;
 *  - placeholders embedded in a longer string are interpolated as text,
 *    with null/undefined source values rendered as an empty string.
 * Non-string template values are copied as they are.
 *
 * @param {Record} record - Mapping record object. In a mapping field value, template notation can be used to refer to a field value in the source record, unless the noeval param is true.
 * @param {Boolean} [noeval] - Disable template evaluation in mapping record.
 * @returns {RecordStream}
 */
RecordStream.recordMapStream = function(record, noeval) {
  var hasOwn = Object.prototype.hasOwnProperty;

  return RecordStream.map(function(rec) {
    var mapped = { Id: rec.Id };
    for (var prop in record) {
      // Only the template's own fields are mapped; enumerable properties
      // inherited through the prototype chain must not leak into records.
      if (!hasOwn.call(record, prop)) { continue; }
      mapped[prop] = noeval ? record[prop] : evalMapping(record[prop], rec);
    }
    return mapped;
  });

  // Resolve `${Field}` placeholders in a template value against a source record.
  function evalMapping(value, mapping) {
    if (_.isString(value)) {
      // A lone placeholder yields the raw field value rather than a string.
      var m = /^\$\{(\w+)\}$/.exec(value);
      if (m) { return mapping[m[1]]; }
      return value.replace(/\$\{(\w+)\}/g, function($0, prop) {
        var v = mapping[prop];
        return _.isNull(v) || _.isUndefined(v) ? "" : String(v);
      });
    } else {
      return value;
    }
  }
};

/**
 * @callback RecordFilterFunction
 * @param {Record} record - Source record to filter
 * @returns {Boolean}
 */

/**
 * Build a record stream that lets through only selected records.
 *
 * For every record passed to `send()`, the stream emits a `record` event
 * when the filtering function returns a truthy value; otherwise the
 * record is dropped.
 *
 * @param {RecordFilterFunction} fn - Record filtering function
 * @returns {RecordStream}
 */
RecordStream.filter = function(fn) {
  var sieve = new RecordStream();
  sieve.receivable = true;
  sieve.send = function(record) {
    // Rejected records are dropped without emitting anything.
    if (!fn(record)) { return; }
    this.emit('record', record);
  };
  return sieve;
};


/* --------------------------------------------------- */

/**
 * CSVStream (extends RecordStream implements Receivable, Sendable)
 *
 * Wraps a plain Node.js stream: records sent in are emitted on that stream
 * as CSV text, and CSV text seen on that stream is buffered and, on `end`,
 * parsed back into `record` events.
 *
 * NOTE(review): the RecordStream constructor is not invoked here, so its
 * flag-resetting 'error'/'end' listeners are not installed on CSVStream
 * instances - confirm that this is intended.
 *
 * @protected
 * @class RecordStream.CSVStream
 * @extends RecordStream
 * @param {Array} [headers] - CSV column headers; when omitted they are extracted from the first record sent
 */
var CSVStream = RecordStream.CSVStream = function(headers) {
  var csv = this;
  var delegate = new Stream();

  // Everything observed on the delegate stream is routed back into this
  // object. Handlers are looked up at call time, not captured up front.
  delegate.on('data', function(chunk) { csv._handleData(chunk); });
  delegate.on('end', function(chunk) { csv._handleEnd(chunk); });

  this.sendable = true;
  this.receivable = true;
  this.headers = headers;
  this.wroteHeaders = false;
  this._stream = delegate;
  this._buffer = [];
};

util.inherits(CSVStream, RecordStream);

/**
 * Serialize a record as one CSV line and emit it as `data` on the
 * delegate Node.js stream.
 *
 * The first call also emits the header line beforehand; if no headers
 * were supplied to the constructor they are extracted from that first
 * record.
 *
 * @override
 * @method RecordStream.CSVStream#send
 * @param {Record} record - Record object
 */
CSVStream.prototype.send = function(record) {
  var csv = this;

  // Emit one newline-terminated CSV line on the delegate stream.
  function writeLine(line) {
    csv._stream.emit("data", line + "\n");
  }

  if (!this.wroteHeaders) {
    this.headers = this.headers || CSV.extractHeaders([ record ]);
    writeLine(CSV.arrayToCSV(this.headers));
    // Flag is set only after the header line has gone out.
    this.wroteHeaders = true;
  }
  writeLine(CSV.recordToCSV(record, this.headers));
};

/**
 * Finish sending records.
 *
 * Optionally sends one last record, then clears the `readable` and
 * `sendable` flags on this object and emits `end` on the delegate
 * Node.js stream.
 *
 * @override
 * @method RecordStream.CSVStream#end
 * @param {Record} [record] - Final record to send before ending
 */
CSVStream.prototype.end = function(record) {
  var hasFinalRecord = !!record;
  if (hasFinalRecord) {
    this.send(record);
  }

  this.readable = false;
  this.sendable = false;

  var delegate = this._stream;
  delegate.emit("end");
};

/**
 * Buffer a chunk seen on the delegate Node.js stream until `end`.
 *
 * @private
 * @param {*} data - Chunk to keep
 * @param {String} [enc] - Encoding stored alongside the chunk
 */
CSVStream.prototype._handleData = function(data, enc) {
  var chunk = [ data, enc ];
  this._buffer.push(chunk);
};

/**
 * Turn buffered `[data, encoding]` chunks into a single string.
 *
 * Consecutive Buffer chunks that share an encoding are concatenated
 * before decoding, so a multi-byte character split across a chunk
 * boundary is decoded intact. Decoding each Buffer on its own would
 * produce replacement characters for the split bytes. Non-Buffer chunks
 * are converted with their own `toString`.
 *
 * @private
 * @param {Array} chunks - Buffered chunks, each as `[data, encoding]`
 * @returns {String}
 */
function decodeBufferedChunks(chunks) {
  var text = '';
  var run = [];   // pending Buffer chunks that share `runEnc`
  var runEnc;

  function flushRun() {
    if (run.length > 0) {
      text += Buffer.concat(run).toString(runEnc);
      run = [];
    }
  }

  chunks.forEach(function(chunk) {
    var data = chunk[0];
    var enc = chunk[1] || 'utf-8';
    if (Buffer.isBuffer(data)) {
      // A change of encoding ends the current run.
      if (run.length > 0 && enc !== runEnc) { flushRun(); }
      runEnc = enc;
      run.push(data);
    } else {
      flushRun();
      text += data.toString(enc);
    }
  });
  flushRun();

  return text;
}

/**
 * Handle `end` of the delegate Node.js stream: decode everything buffered
 * so far, parse it as CSV, emit one `record` event per parsed record and
 * finally emit `end` on this record stream.
 *
 * @private
 * @param {*} [data] - Optional last chunk, buffered before parsing
 * @param {String} [enc] - Encoding of the last chunk
 */
CSVStream.prototype._handleEnd = function(data, enc) {
  var self = this;
  if (data) {
    this._buffer.push([ data, enc ]);
  }
  var records = CSV.parseCSV(decodeBufferedChunks(this._buffer));
  records.forEach(function(record) {
    self.emit('record', record);
  });
  this.emit('end');
};

/**
 * Get the Node.js stream this CSV stream delegates to.
 *
 * @override
 * @method RecordStream.CSVStream#stream
 * @param {*} [record] - Unused
 * @returns {stream.Stream} The delegate stream created by the constructor
 */
CSVStream.prototype.stream = function(record) {
  var delegate = this._stream;
  return delegate;
};