/**
* @file Manages Streaming APIs
* @author Shinichi Tomita <shinichi.tomita@gmail.com>
*/
var events = require('events'),
util = require('util'),
request = require('request'),
async = require('async'),
_ = require('underscore')._,
Faye = require('faye');
/**
* Streaming API topic class
*
* @class Streaming~Topic
* @param {Streaming} steaming - Streaming API object
* @param {String} name - Topic name
*/
var Topic = module.exports = function(streaming, name) {
this._streaming = streaming;
this.name = name;
};
/**
* @typedef {Object} Streaming~StreamingMessage
* @prop {Object} event
* @prop {Object} event.type - Event type
* @prop {Record} sobject - Record information
*/
/**
* Subscribe listener to topic
*
* @method Streaming~Topic#subscribe
* @param {Callback.<Streaming~StreamingMesasge>} listener - Streaming message listener
* @returns {Streaming~Topic}
*/
Topic.prototype.subscribe = function(listener) {
this._streaming.subscribe(this.name, listener);
return this;
};
/**
* Unsubscribe listener from topic
*
* @method Streaming~Topic#unsubscribe
* @param {Callback.<Streaming~StreamingMesasge>} listener - Streaming message listener
* @returns {Streaming~Topic}
*/
Topic.prototype.unsubscribe = function(listener) {
this._streaming.unsubscribe(this.name, listener);
return this;
};
/*--------------------------------------------*/
/**
* Streaming API class
*
* @class
* @extends events.EventEmitter
* @param {Connection} conn - Connection object
*/
var Streaming = function(conn) {
this._conn = conn;
};
util.inherits(Streaming, events.EventEmitter);
/** @private **/
Streaming.prototype._baseUrl = function(name) {
return [ this._conn.instanceUrl, "cometd", this._conn.version ].join('/');
};
/**
* Get named topic
*
* @param {String} name - Topic name
* @returns {Streaming~Topic}
*/
Streaming.prototype.topic = function(name) {
this._topics = this._topics || {};
var topic = this._topics[name] =
this._topics[name] || new Topic(this, name);
return topic;
};
/**
* Subscribe topic
*
* @param {String} name - Topic name
* @param {Callback.<Streaming~StreamingMessage>} listener - Streaming message listener
* @returns {Streaming}
*/
Streaming.prototype.subscribe = function(name, listener) {
if (!this._fayeClient) {
Faye.Transport.NodeHttp.prototype.batching = false; // prevent streaming API server error
this._fayeClient = new Faye.Client(this._baseUrl(), {});
this._fayeClient.setHeader('Authorization', 'OAuth '+this._conn.accessToken);
}
this._fayeClient.subscribe("/topic/"+name, listener);
return this;
};
/**
* Unsubscribe topic
*
* @param {String} name - Topic name
* @param {Callback.<Streaming~StreamingMessage>} listener - Streaming message listener
* @returns {Streaming}
*/
Streaming.prototype.unsubscribe = function(name, listener) {
if (this._fayeClient) {
this._fayeClient.unsubscribe("/topic/"+name, listener);
}
return this;
};
module.exports = Streaming;