/**
* @module proact-streams
*/
/**
* <p>
* Constructs a `ProAct.SubscribableStream`. This is a `Stream` that has a custom `subscribe` function, used to subscribe to a source.
* </p>
*
* This can be used to stream sources like browser events. The stream is lazy, when there are no listeners to it,
* it is not subscribed to the source, on the first listener it is subscribed, when every listener is unsubscibed, it is unsubscribed.
*
* <p>
* `ProAct.SubscribableStream` is part of the `proact-streams` module of ProAct.js.
* </p>
*
* @class ProAct.SubscribableStream
* @constructor
* @extends ProAct.Stream
* @param {Function} subscribe
* A function used to subscribe to a source, when the first listener to this stream is attached.
* @param {String} queueName
* The name of the queue all the updates should be pushed to.
* <p>
* If this parameter is null/undefined the default queue of
* {{#crossLink "ProAct/flow:property"}}{{/crossLink}} is used.
* </p>
* <p>
* If this parameter is not a string it is used as the
* <i>source</i>.
* </p>
* @param {ProAct.Actor} source
* A default source of the stream, can be null.
* <p>
* If this is the only one passed argument and it is a number - it becomes the size of the buffer.
* </p>
* @param {Array} transforms
* A list of transformation to be used on all incoming chages.
* <p>
* If the arguments passed are two and this is a number - it becomes the size of the buffer.
* </p>
*/
function SubscribableStream (subscribe, queueName, source, transforms) {
P.S.call(this, queueName, source, transforms);
this.subscribe = subscribe;
this.unsubscribe = null;
this.subscriptions = 0;
}
ProAct.SubscribableStream = P.SUS = SubscribableStream;
ProAct.SubscribableStream.prototype = P.U.ex(Object.create(P.S.prototype), {
/**
* Reference to the constructor of this object.
*
* @property constructor
* @type ProAct.SubscribableStream
* @final
* @for ProAct.SubscribableStream
*/
constructor: ProAct.SubscribableStream,
/**
* Attaches a new listener to this `ProAct.SubscribableStream`.
*
* The listener may be function or object that defines a <i>call</i> method.
* On the first attached listener the `subscribe` function passed to the constructor will be called.
* That way the stream will be subscribed to custom data source.
*
* ```
* stream.on(function (v) {
* console.log(v);
* });
*
* stream.on('error', function (v) {
* console.error(v);
* });
*
* stream.on({
* call: function (v) {
* console.log(v);
* }
* });
* ```
*
* @for ProAct.SubscribableStream
* @instance
* @method on
* @param {Array|String} actions
* The action/actions to listen for. If this parameter is skipped or null/undefined,
* the actions from {{#crossLink "ProAct.Actor/defaultActions:method"}}{{/crossLink}} are used.
* <p>
* The actions can be skipped and on their place as first parameter to be passed the <i>listener</i>.
* </p>
* @param {Object} listener
* The listener to attach. It must be instance of Function or object with a <i>call</i> method.
* @return {ProAct.SubscribableStream}
* <b>this</b>
*/
on: function (actions, listener) {
if (this.subscriptions === 0) {
this.unsubscribe = this.subscribe(this);
}
this.subscriptions++;
return P.S.prototype.on.call(this, actions, listener);
},
/**
* Removes a <i>listener</i> from the passed <i>action</i>.
*
* If this method is called without parameters, all the listeners for all the actions are removed.
* The listeners are reset using {{#crossLink "ProAct.Actor/defaultActions:method"}}{{/crossLink}}.
*
* If the last listener is removed using this method, `this stream` authomatically unsubscribes
* from the source, using the function, returned by the `subscribe` function passed to the constructor.
*
* Examples are:
*
* Removing a listener:
* ```
* var listener = function (v) {
* console.log(v);
* };
* stream.on(listener);
* stream.off(listener);
* ```
*
* Or for removing all the listeners attached to an stream:
* ```
* stream.off();
* ```
*
* Or for removing all the listeners of a given type attached to an stream:
* ```
* stream.off('error');
* ```
*
* Or for removing a listener from different type of actions:
* ```
* var listener = function (v) {
* console.log(v);
* };
* stream.on(listener);
* stream.onErr(listener);
*
* stream.off(['error', 'change'], listener);
* ```
*
* @for ProAct.SubscribableStream
* @instance
* @method off
* @param {Array|String} actions
* The action/actions to stop listening for. If this parameter is skipped or null/undefined,
* the actions from {{#crossLink "ProAct.Actor/defaultActions:method"}}{{/crossLink}} are used.
* <p>
* The actions can be skipped and on their place as first parameter to be passed the <i>listener</i>.
* </p>
* @param {Object} listener
* The listener to detach. If it is skipped, null or undefined all the listeners are removed from this actor.
* @return {ProAct.SubscribableStream}
* <b>this</b>
*/
off: function (actions, listener) {
this.subscriptions--;
if (!actions && !listener) {
this.subscriptions = 0;
}
if (this.subscriptions < 0) {
this.subscriptions = 0;
}
if (this.subscriptions === 0 && this.unsubscribe) {
this.unsubscribe(this);
}
return P.S.prototype.off.call(this, actions, listener);
}
});