/**
* @module proact-streams
*/
/**
* Creates a {{#crossLink "ProAct.Stream"}}{{/crossLink}} instance.
*
* This method is capable of creating various `source` streams.
*
* For example if the method is called like that:
* ```
* var stream = ProAct.stream();
* ```
*
* A sourceless stream will be created, but it will be possible to invoke `trigger*` on it:
* ```
* stream.trigger(val);
* stream.triggerErr(new Error());
* stream.triggerClose();
* ```
*
* The method can be called with a subscribe function too:
* ```
* var stream = ProAct.stream(function (source) {
* // ... logic using the source - the source is a stream, that has trigger/triggerErr/triggerClose
* $('.sel').on('click.myClick', function (e) {
* source.trigger(e);
* });
*
* return function () {
* // unsubscribing logic
* $('.sel').off('click.myClick');
* };
* });
* ```
*
* So subscribe/unsubscribe to an even source can be programmed using this method.
*
* The first argument can be a string too and if that's the case, {{#crossLink "ProAct.Stream"}}{{/crossLink}}'s
* `fromString` method will be used for the stream construction.
*
* @for ProAct
* @method stream
* @param {String|Function} [subscribe]
* Can be null for no subsbcribe functon, can function to be used for subscribtion to a source or
* can be string to use it with {{#crossLink "ProAct.Stream/fromString:method"}}{{/crossLink}}
* @param {Array} [transformations]
* A list of transformation to be used on all incoming chages.
* @param {ProAct.Actor} source
* A default source of the stream, can be null.
* @param {String} queueName
* The name of the queue all the updates should be pushed to.
* <p>
* If this parameter is null/undefined the default queue of
* {{#crossLink "ProAct/flow:property"}}{{/crossLink}} is used.
* </p>
* <p>
* If this parameter is not a string it is used as the
* <i>source</i>.
* </p>
* @static
* @return {ProAct.Stream}
* A {{#crossLink "ProAct.Stream"}}{{/crossLink}} instance.
*/
function stream (subscribe, transformations, source, queueName) {
var stream;
if (!subscribe) {
stream = new ProAct.Stream(queueName, source, transformations);
} else if (P.U.isFunction(subscribe)) {
stream = new ProAct.SubscribableStream(subscribe, queueName, source, transformations);
} else if (P.U.isString(subscribe)) {
stream = Stream.fromString(subscribe, slice.call(arguments, 1));
}
stream.trigger = StreamUtil.trigger;
stream.triggerErr = StreamUtil.triggerErr;
stream.triggerClose= StreamUtil.triggerClose;
return stream;
}
ProAct.stream = stream;
/**
* Creates a closed {{#crossLink "ProAct.Stream"}}{{/crossLink}}.
*
* @for ProAct
* @method closed
* @static
* @return {ProAct.Stream}
* A closed {{#crossLink "ProAct.Stream"}}{{/crossLink}} instance.
*/
function closed () {
return P.stream().close();
}
ProAct.closed = P.never = closed;
/**
* Creates a {{#crossLink "ProAct.Stream"}}{{/crossLink}}, which emits the passed "value" once and then closes.
* <p>Example:</p>
* <pre>
var stream = ProAct.timeout(1000, 7);
stream.on(function (v) {
console.log(v);
});
// This will print '7' after 1s and will close.
* </pre>
*
* @for ProAct
* @method timeout
* @static
* @param {Number} timeout
* The time to wait (in milliseconds) before emitting the <i>value</i> and close.
* @param {Object} value
* The value to emit.
* @return {ProAct.Stream}
* A {{#crossLink "ProAct.Stream"}}{{/crossLink}} instance.
*/
function timeout (timeout, value) {
var stream = P.stream();
window.setTimeout(function () {
stream.trigger(value);
stream.close();
}, timeout);
return stream;
}
ProAct.timeout = ProAct.later = timeout;
/**
* Creates a {{#crossLink "ProAct.Stream"}}{{/crossLink}}, which emits the passed "value" over and over again at given time interval.
* <p>Example:</p>
* <pre>
var stream = ProAct.interval(1000, 7);
stream.on(function (v) {
console.log(v);
});
// This will print one number on every 1s and the numbers will be 7,7,7,7,7....
* </pre>
*
* @for ProAct
* @method interval
* @static
* @param {Number} interval
* The time in milliseconds on which the <i>value</i> will be emitted.
* @param {Object} value
* The value to emit.
* @return {ProAct.Stream}
* A {{#crossLink "ProAct.Stream"}}{{/crossLink}} instance.
*/
function interval (interval, value) {
var stream = P.stream();
window.setInterval(function () {
stream.trigger(value);
}, interval);
return stream;
}
ProAct.interval = interval;
/**
* Creates a {{#crossLink "ProAct.Stream"}}{{/crossLink}}, which emits values of the passed <i>vals</i> array on the passed <i>interval</i> milliseconds.
* <p>
* When every value is emitted through the stream it is closed.
* <p>
* <p>Example:</p>
* <pre>
var stream = ProAct.seq(1000, [4, 5]);
stream.on(function (v) {
console.log(v);
});
// This will print one number on every 1s and the numbers will be 4 5 and the stream will be closed.
* </pre>
*
* @for ProAct
* @method seq
* @static
* @param {Number} interval
* The time in milliseconds on which a value of the passed <i>vals</i> array will be emitted.
* @param {Array} vals
* The array containing the values to be emitted on the passed <i>interval</i>.
* @return {ProAct.Stream}
* A {{#crossLink "ProAct.Stream"}}{{/crossLink}} instance.
*/
function seq (interval, vals) {
var stream = P.stream(),
operation;
if (vals.length > 0) {
operation = function () {
var value = vals.shift();
stream.trigger(value);
if (vals.length === 0) {
stream.close();
} else {
window.setTimeout(operation, interval);
}
};
window.setTimeout(operation, interval);
}
return stream;
}
ProAct.seq = seq;
/**
* Creates a {{#crossLink "ProAct.Stream"}}{{/crossLink}}, which emits values of the passed <i>vals</i> array on the passed interval.
* <p>
* When every value is emitted through the stream they are emitted again and again and so on...
* <p>
* <p>Example:</p>
* <pre>
var stream = ProAct.repeat(1000, [4, 5]);
stream.on(function (v) {
console.log(v);
});
// This will print one number on every 1s and the numbers will be 4 5 4 5 4 5 4 5 4 5 .. and so on
* </pre>
*
* @for ProAct
* @method interval
* @static
* @param {Number} interval
* The time in milliseconds on which a value of the passed <i>vals</i> array will be emitted.
* @param {Array} vals
* The array containing the values to be emitted on the passed <i>interval</i>.
* @return {ProAct.Stream}
* A {{#crossLink "ProAct.Stream"}}{{/crossLink}} instance.
*/
function repeat (interval, vals) {
var stream = P.stream(), i = 0;
if (vals.length > 0) {
window.setInterval(function () {
if (i === vals.length) {
i = 0;
}
var value = vals[i++];
stream.trigger(value);
}, interval);
}
return stream;
}
ProAct.repeat = repeat;
/**
* The {{#crossLink "ProAct/fromInvoke:method"}}{{/crossLink}} creates a {{#crossLink "ProAct.Stream"}}{{/crossLink}}, which emits the result of the passed
* <i>func</i> argument on every <i>interval</i> milliseconds.
* <p>
* If <i>func</i> returns {{#crossLink "ProAct/closed:method"}}{{/crossLink}} the stream is closed.
* </p>
* <p>Example:</p>
* <pre>
var stream = ProAct.fromInvoke(1000, function () {
return 5;
});
stream.on(function (v) {
console.log(v);
});
// After 1s we'll see '5' in the log, after 2s we'll see a second '5' in the log and so on...
* </pre>
*
* @for ProAct
* @method fromInvoke
* @static
* @param {Number} interval
* The interval on which <i>func</i> will be called and its returned value will
* be triggered into the stream.
* @param {Function} func
* The function to invoke in order to get the value to trigger into the stream.
* @return {ProAct.Stream}
* A {{#crossLink "ProAct.Stream"}}{{/crossLink}} instance.
*/
function fromInvoke (interval, func) {
var stream = P.stream(), id;
id = window.setInterval(function () {
var value = func.call();
if (value !== ProAct.close) {
stream.trigger(value);
} else {
stream.close();
window.clearInterval(id);
}
}, interval);
return stream;
}
ProAct.fromInvoke = fromInvoke;
/**
* Creates a {{#crossLink "ProAct.Stream"}}{{/crossLink}}, which emits the result of an action that uses a callback
* to notify that it is finished.
*
* This can be used to create streams from http requests for example.
*
* Example:
* ```
* var stream = ProAct.fromCallback(action);
* stream.on(function (v) {
* console.log(v);
* });
*
* ```
*
* @for ProAct
* @method fromCallback
* @static
* @param {Function} callbackCaller
* The action that receives a callback.
* @return {ProAct.Stream}
* A {{#crossLink "ProAct.Stream"}}{{/crossLink}} instance.
*/
function fromCallback (callbackCaller) {
var stream = P.stream();
callbackCaller(function (result) {
stream.trigger(result);
stream.close();
});
return stream;
}
ProAct.fromCallback = fromCallback;
attachers = {
addEventListener: 'removeEventListener',
addListener: 'removeListener',
on: 'off'
};
attacherKeys = Object.keys(attachers);
/**
* Creates a {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source an evet emitter or dispatcher,
* it can be used with jQuery for example for example.
*
* Example:
* ```
* var stream = ProAct.fromEventDispatcher($('.some-input'), 'keydown');
* stream.on(function (e) {
* console.log(e.which);
* });
*
* ```
*
* @for ProAct
* @method fromEventDispatcher
* @static
* @param {Object} target
* The event dispatcher, can be a jQuery button, or a DOM element, or somethnig like that.
* @param {String} eventType
* The type of the event - for example 'click'.
* @return {ProAct.Stream}
* A {{#crossLink "ProAct.Stream"}}{{/crossLink}} instance.
*/
function fromEventDispatcher (target, eventType) {
var i, ln = attacherKeys.length,
on, off,
attacher, current;
for (i = 0; i < ln; i++) {
attacher = attacherKeys[i];
current = target[attacher];
if (current && P.U.isFunction(current)) {
on = attacher;
off = attachers[attacher];
break;
}
}
if (on === undefined) {
return null;
}
return new ProAct.SubscribableStream(function (stream) {
target[on](eventType, stream.trigger);
return function (stream) {
target[off](eventType, stream.trigger);
};
});
}
ProAct.fromEventDispatcher = fromEventDispatcher;