/**
 * The `proact-streams` module provides stateless, FRP-style reactive streams
 * to the ProAct.js API.
 *
 * @module proact-streams
 * @main proact-streams
 */
-
// PRIVATE

/**
 * Internal helpers that implement the triggering logic of the streams.
 * <p>
 *  Every function here expects a stream as its `this` value: it is either invoked
 *  through `call`/`apply`, or temporarily assigned to a stream as a method.
 * </p>
 */
var StreamUtil = {

  /**
   * Pushes an event through the stream passed as `this`.
   * <p>
   *  Nothing happens if the stream has no <i>change</i> listeners.
   *  If a transformation throws, the error is emitted through `triggerErr`.
   *  If the (transformed) event is `P.Actor.BadValue`, it is dropped.
   * </p>
   *
   * @param {Object} event
   *      The event/value to emit.
   * @param {Boolean} useTransformations
   *      If truthy, the event is passed through `P.Actor.transform` first.
   * @return {Object}
   *      <i>this</i> when nothing is emitted, otherwise the result of `ActorUtil.update`.
   */
  go: function (event, useTransformations) {
    if (this.listeners.change.length === 0) {
      return this;
    }

    if (useTransformations) {
      try {
        event = P.Actor.transform(this, event);
      } catch (e) {
        StreamUtil.triggerErr.call(this, e);
        return this;
      }
    }

    if (event === P.Actor.BadValue) {
      return this;
    }

    return ActorUtil.update.call(this, event);
  },

  /**
   * Triggers every passed argument as a separate event, with transformations turned on.
   *
   * @param [...]
   *      Zero or more events/values to emit, in order.
   * @return {Object}
   *      <i>this</i>
   */
  triggerMany: function () {
    var events = slice.call(arguments),
        count = events.length,
        i;

    for (i = 0; i < count; i++) {
      // A stream only has its own `trigger` in some situations (for example while
      // a `fromLambda` callback is running), so fall back to the shared helper,
      // the same way the listeners created by the streams do.
      if (this.trigger) {
        this.trigger(events[i], true);
      } else {
        StreamUtil.trigger.call(this, events[i], true);
      }
    }

    return this;
  },

  /**
   * Triggers a single event.
   *
   * @param {Object} event
   *      The event/value to emit.
   * @param {Boolean} useTransformations
   *      Optional, `true` when it is not passed (`undefined`).
   * @return {Object}
   *      The result of `StreamUtil.go`.
   */
  trigger: function (event, useTransformations) {
    if (useTransformations === undefined) {
      useTransformations = true;
    }

    return StreamUtil.go.call(this, event, useTransformations);
  },

  /**
   * Emits an error through `ActorUtil.update`, using the 'error' notification type.
   *
   * @param {Object} err
   *      The error to emit.
   * @return {Object}
   *      The result of `ActorUtil.update`.
   */
  triggerErr: function (err) {
    return ActorUtil.update.call(this, err, 'error');
  },

  /**
   * Emits a closing notification through `ActorUtil.update`, using the 'close' notification type.
   *
   * @param {Object} data
   *      The data to pass with the closing notification.
   * @return {Object}
   *      The result of `ActorUtil.update`.
   */
  triggerClose: function (data) {
    return ActorUtil.update.call(this, data, 'close');
  }

};
-
/**
 * <p>
 *  Constructs a `ProAct.Stream`.
 *  The stream is a simple {{#crossLink "ProAct.Actor"}}{{/crossLink}}, without state.
 * </p>
 * <p>
 *  The streams are meant to emit values, events and changes, and can be plugged into other `Actors`.
 *  For example it is possible to connect multiple streams, to merge them, to separate them
 *  and to plug them into properties.
 * </p>
 * <p>
 *  The reactive environment consists of the properties and the objects containing them, but
 *  the outside world is not reactive. It is possible to use the `ProAct.Streams` as connections from the
 *  outside world to the reactive environment.
 * </p>
 * <p>
 *  The transformations can be used to change the events or values emitted.
 * </p>
 * <p>
 *  `ProAct.Stream` is part of the proact-streams module of ProAct.js.
 * </p>
 *
 * @class ProAct.Stream
 * @extends ProAct.Actor
 * @constructor
 * @param {String} queueName
 *      The name of the queue all the updates should be pushed to.
 *      <p>
 *        If this parameter is null/undefined the default queue of
 *        {{#crossLink "ProAct/flow:property"}}{{/crossLink}} is used.
 *      </p>
 *      <p>
 *        If this parameter is not a string it is used as the
 *        <i>source</i>, and the second argument is used as the <i>transforms</i>.
 *      </p>
 * @param {ProAct.Actor} source
 *      A default source of the stream, can be null.
 * @param {Array} transforms
 *      A list of transformations to be used on all incoming changes.
 */
function Stream (queueName, source, transforms) {
  var queue = queueName,
      origin = source,
      mappers = transforms;

  // Called as Stream(source, transforms): shift the arguments one position to the right.
  if (queue && !P.U.isString(queue)) {
    mappers = origin;
    origin = queue;
    queue = null;
  }

  P.Actor.call(this, queue, mappers);

  // Number of the sources plugged in, see `into` and `canClose`.
  this.sourceNumber = 0;

  if (origin) {
    this.into(origin);
  }
}
// Expose the constructor under its short alias and under its full name.
ProAct.S = Stream;
ProAct.Stream = Stream;

// Static members of `ProAct.Stream`.
P.U.ex(P.S, {

  /**
   * Placeholder for building a stream from a string description.
   * It is not implemented, every call throws.
   *
   * @for ProAct.Stream
   * @static
   * @method fromString
   * @param {String} str
   *      Not used.
   * @param {Array} args
   *      Not used.
   * @throws {Error}
   *      Always, with the message 'Stream.fromString is not implemented!'.
   */
  fromString: function (str, args) {
    throw new Error('Stream.fromString is not implemented!');
  }
});
-
ProAct.Stream.prototype = P.U.ex(Object.create(P.Actor.prototype), {

  /**
   * Reference to the constructor of this object.
   *
   * @property constructor
   * @type ProAct.Stream
   * @final
   * @for ProAct.Stream
   */
  constructor: ProAct.Stream,

  /**
   * Creates the <i>event</i> to be sent to the listeners on update.
   * <p>
   *  Streams don't create new events by default, the event is the source.
   * </p>
   *
   * @for ProAct.Stream
   * @protected
   * @instance
   * @method makeEvent
   * @param {ProAct.Event} source
   *      The source event of the event. It can be null.
   * @return {ProAct.Event}
   *      The event - the passed <i>source</i>, unchanged.
   */
  makeEvent: function (source) {
    return source;
  },

  /**
   * Creates the <i>listener</i> of this stream.
   * <p>
   *  The listener is created once and cached in `this.listener`.
   *  It pushes the incoming event into `this Stream`, with the transformations turned on.
   * </p>
   *
   * @for ProAct.Stream
   * @protected
   * @instance
   * @method makeListener
   * @return {Object}
   *      The <i>listener of this stream</i>.
   */
  makeListener: function () {
    if (!this.listener) {
      var stream = this;
      this.listener = function (event) {
        // The stream has its own `trigger` only in some situations,
        // otherwise the private helper is used.
        if (stream.trigger) {
          stream.trigger(event, true);
        } else {
          StreamUtil.trigger.call(stream, event, true);
        }
      };
    }

    return this.listener;
  },

  /**
   * Creates the <i>error listener</i> of this stream.
   * <p>
   *  The listener is created once and cached in `this.errListener`.
   *  It pushes the incoming error into `this Stream` by default.
   * </p>
   *
   * @for ProAct.Stream
   * @protected
   * @instance
   * @method makeErrListener
   * @return {Object}
   *      The <i>error listener of this stream</i>.
   */
  makeErrListener: function () {
    if (!this.errListener) {
      var stream = this;
      this.errListener = function (error) {
        if (stream.triggerErr) {
          // Forward the received error (previously an undeclared `event` was passed here).
          stream.triggerErr(error);
        } else {
          StreamUtil.triggerErr.call(stream, error);
        }
      };
    }

    return this.errListener;
  },

  /**
   * Creates the <i>closing listener</i> of this stream.
   * <p>
   *  The listener is created once and cached in `this.closeListener`.
   *  It pushes a closing notification into the stream by default.
   * </p>
   *
   * @for ProAct.Stream
   * @instance
   * @protected
   * @method makeCloseListener
   * @return {Object}
   *      The <i>closing listener of this stream</i>.
   */
  makeCloseListener: function () {
    if (!this.closeListener) {
      var stream = this;
      this.closeListener = function (data) {
        if (stream.triggerClose) {
          stream.triggerClose(data);
        } else {
          StreamUtil.triggerClose.call(stream, data);
        }
      };
    }

    return this.closeListener;
  },

  /**
   * Defers a `ProAct.Actor` listener.
   * <p>
   *  For streams this means pushing it to the active flow using {{#crossLink "ProAct.Flow/push:method"}}{{/crossLink}}.
   *  If the listener is an object with a 'property' field, it is done using {{#crossLink "ProAct.Actor/defer:method"}}{{/crossLink}}.
   *  That way the reactive environment is updated only once, but the streams are not part of it.
   * </p>
   * <p>
   *  The queue used is the `queueName` of the listener, if it has one, otherwise the `queueName` of this stream.
   * </p>
   *
   * @for ProAct.Stream
   * @protected
   * @instance
   * @method defer
   * @param {Object} event
   *      The event/value to pass to the listener.
   * @param {Object} listener
   *      The listener to defer. It should be a function or object defining the <i>call</i> method.
   *      If it is missing, nothing is deferred.
   * @return {ProAct.Actor}
   *      <i>this</i>
   */
  defer: function (event, listener) {
    if (!listener) {
      return this;
    }

    if (listener.property) {
      P.Actor.prototype.defer.call(this, event, listener);
      return this;
    }

    var queueName = (listener.queueName) ? listener.queueName : this.queueName;

    if (P.U.isFunction(listener)) {
      P.flow.push(queueName, listener, [event]);
    } else {
      P.flow.push(queueName, listener, listener.call, [event]);
    }

    return this;
  },

  /**
   * Creates a new `ProAct.Stream` instance with source <i>this</i> and mapping
   * the passed <i>mapping function</i>.
   *
   * ```
   *   var mapped = stream.map(function (v) {
   *     return v * v;
   *   });
   *
   *   mapped.on(function (v) {
   *     console.log(v); // squares
   *   });
   * ```
   *
   * @for ProAct.Stream
   * @instance
   * @method map
   * @param {Object} mappingFunction
   *      Function or object with a <i>call method</i> to use as map function.
   * @return {ProAct.Stream}
   *      A new `ProAct.Stream` instance with the <i>mapping</i> applied.
   */
  map: function (mappingFunction) {
    return new P.S(this).mapping(mappingFunction);
  },

  /**
   * Creates a new `ProAct.Stream` instance with source <i>this</i> and filtering
   * the passed <i>filtering function</i>.
   *
   * ```
   *   var filtered = stream.filter(function (v) {
   *     return v % 2 === 1;
   *   });
   *
   *   filtered.on(function (v) {
   *     console.log(v); // odds
   *   });
   * ```
   *
   * @for ProAct.Stream
   * @instance
   * @method filter
   * @param {Object} filteringFunction
   *      The filtering function or object with a call method, should return boolean.
   * @return {ProAct.Stream}
   *      A new `ProAct.Stream` instance with the <i>filtering</i> applied.
   */
  filter: function (filteringFunction) {
    return new P.S(this).filtering(filteringFunction);
  },

  /**
   * Creates a new `ProAct.Stream` instance with source <i>this</i> and accumulation
   * the passed <i>accumulation function</i>.
   *
   * ```
   *  var acc = stream.accumulate(0, function (p, v) {
   *    return p + v;
   *  });
   *
   *  acc.on(console.log); // sums
   * ```
   *
   * @for ProAct.Stream
   * @instance
   * @method accumulate
   * @param {Object} initVal
   *      Initial value for the accumulation. For example '0' for sum.
   * @param {Object} accumulationFunction
   *      The function to accumulate.
   * @return {ProAct.Stream}
   *      A new `ProAct.Stream` instance with the <i>accumulation</i> applied.
   */
  accumulate: function (initVal, accumulationFunction) {
    return new P.S(this).accumulation(initVal, accumulationFunction);
  },

  /**
   * Creates a new `ProAct.Stream` instance that merges this with other streams.
   * The new instance will have a new value on a value from any of the source streams.
   *
   * ```
   *  var merged = stream1.merge(stream2);
   * ```
   *
   * Here if `stream1` emits:
   * 1--2---3----5-----X
   *
   * and `stream2` emits:
   * ----A-----B-----C-----D--X
   *
   * `merged` will emit:
   * 1--2A--3--B-5---C-----D--X
   *
   * @for ProAct.Stream
   * @instance
   * @method merge
   * @param [...]
   *      A list of streams to be set as sources.
   * @return {ProAct.Stream}
   *      A new `ProAct.Stream` instance with the sources this and all the passed streams.
   */
  merge: function () {
    var sources = [this].concat(slice.call(arguments)),
        result = new P.S();

    return P.S.prototype.into.apply(result, sources);
  },

  /**
   * Links source actors into this `ProAct.Stream`. This means that <i>this stream</i>
   * is listening for changes from the <i>sources</i>.
   *
   * The streams count their sources and when the sources are zero, they become inactive.
   *
   * ```
   *  var stream1 = ProAct.stream();
   *  var stream2 = ProAct.stream();
   *  var stream = ProAct.stream();
   *
   *  stream.into(stream1, stream2);
   *  stream.on(function (v) {
   *    console.log(v);
   *  });
   *
   * ```
   *
   * Now if any of the source streams emits,
   * the notification will be printed on the output.
   *
   * @for ProAct.Stream
   * @instance
   * @method into
   * @param [...]
   *      Zero or more source {{#crossLink "ProAct.Actor"}}{{/crossLink}}s to set as sources.
   * @return {ProAct.Stream}
   *      <b>this</b>
   */
  into: function () {
    ProAct.Actor.prototype.into.apply(this, arguments);

    // Every passed argument counts as one more source.
    this.sourceNumber += arguments.length;

    return this;
  },

  /**
   * Checks if <i>this</i> can be closed.
   *
   * Uses the number of the active sources to decide if `this stream` is ready to be closed.
   * If the active sources are zero - it can.
   * <p>
   *  Note that every call decrements the counter of the sources by one before the check.
   * </p>
   *
   * @for ProAct.Stream
   * @protected
   * @instance
   * @method canClose
   * @return {Boolean}
   *      True if, after the decrement, there are no sources left.
   */
  canClose: function () {
    this.sourceNumber -= 1;

    return this.sourceNumber <= 0;
  }
});
-
// Methods added to the ProAct.Actor from the proact-streams module.
P.U.ex(P.Actor.prototype, {

  /**
   * Turns this `ProAct.Actor` to a {{#crossLink "ProAct.Stream"}}{{/crossLink}}.
   *
   * In reality this method creates a new `Stream` with source this.
   *
   * @for ProAct.Actor
   * @instance
   * @method toStream
   * @return {ProAct.Stream}
   *      A new stream using the queue name of this actor, with this actor as its source.
   */
  toStream: function () {
    return new P.S(this.queueName, this);
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It skips the first `n` updates incoming from `this`.
   *
   * source : --3---4---5--4---3---4---5--|->
   * skip(3): -------------4---3---4---5--|->
   *
   * @for ProAct.Actor
   * @instance
   * @method skip
   * @param {Number} n The number of notifications to skip.
   * @return {ProAct.Stream}
   *      The new stream.
   */
  skip: function (n) {
    var i = n, self = this;
    return this.fromLambda(function (stream, event) {
      if (event === ProAct.Actor.Close) {
        stream.close();
        return;
      }

      i--;
      if (i < 0) {
        // Skipping is over: replace the lambda with a direct link to the source.
        self.offAll(stream.lambda);
        stream.into(self);
        stream.trigger(event);
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It skips notifications from its source, while a condition is true.
   *
   * ```
   *
   *  source.skipWhile(function (v) {
   *      return v % 2 === 1;
   *  });
   *
   *  // source :
   *  // --3---5---2--4---3---4---5--|->
   *  // skipWhile:
   *  // ----------2--4---3---4---5--|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method skipWhile
   * @param {Function} condition
   *      A condition function, which is called for each of the incoming values.
   *      While it returns true, the elements are skipped,
   *      after it returns false for the first time, the current and all the following values are emitted.
   *      If it is not passed, the incoming value itself is used as the condition.
   * @return {ProAct.Stream}
   *      The new stream.
   */
  skipWhile: function (condition) {
    var self = this,
        cond = condition ? condition : function (e) {
          return e;
        };
    return this.fromLambda(function (stream, event) {
      // The closing marker is `Actor.Close`, the same one `skip` and `flatMap` check for.
      if (event === ProAct.Actor.Close) {
        stream.close();
        return;
      }

      if (!cond(event)) {
        // Skipping is over: replace the lambda with a direct link to the source.
        self.offAll(stream.lambda);
        stream.into(self);
        stream.trigger(event);
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It skips duplicating elements, coming one after another.
   *
   * ```
   *
   *  source.skipDuplicates();
   *
   *  // source :
   *  // --3---5---5--4---3---3---5--|->
   *  // skipDuplicates:
   *  // --3---5------4---3-------5--|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method skipDuplicates
   * @param {Function} comparator
   *      A function used to compare the elements, it should return true for equal elements.
   *      If nothing is passed it defaults to comparing using `===`.
   * @return {ProAct.Stream}
   *      The new stream.
   */
  skipDuplicates: function (comparator) {
    var last = undefined,
        cmp = comparator ? comparator : function (a, b) {
          return a === b;
        };
    // NOTE(review): unlike `skip`, `skipWhile` and `diff`, the closing notification
    // is not handled separately here - confirm this is intended.
    return this.fromLambda(function (stream, event) {
      if (!cmp(last, event)) {
        stream.trigger(event);
        last = event;
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It emits the difference between the last update and the current incoming update from the source.
   *
   * ```
   *
   *  source.diff(0, function(prev, v) {
   *      return v - prev;
   *  });
   *
   *  // source :
   *  // --3---5------6---|->
   *  // diff:
   *  // --3---2------1---|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method diff
   * @param {Object} seed
   *      A value to pass the `differ` function as previous on the initial notification from the source.
   *      If it is `undefined`, the first notification is only remembered and nothing is emitted for it.
   * @param {Function} differ
   *      Creates the difference, receives two params - the previous update and the current.
   *      It can be skipped - the default `differ` function returns array with two elements - the previous and the current updates.
   * @return {ProAct.Stream}
   *      The new stream.
   */
  diff: function (seed, differ) {
    var last = seed,
        fn = differ ? differ : function (prev, next) {
          return [prev, next];
        };
    return this.fromLambda(function (stream, event) {
      // The closing marker is `Actor.Close`, the same one `skip` and `flatMap` check for.
      if (event === ProAct.Actor.Close) {
        stream.close();
        return;
      }

      if (last === undefined) {
        last = event;
        return;
      }

      // Use `fn`, so the default differ is applied when `differ` is not passed.
      stream.trigger(fn(last, event));
      last = event;
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It takes the first `limit` updates incoming from `this`.
   *
   * source : --3---4---5--4---3---4---5--|->
   * take(3): --3---4---5--|->
   *
   * @for ProAct.Actor
   * @instance
   * @method take
   * @param {Number} limit The number of notifications to emit.
   * @return {ProAct.Stream}
   *      The new stream.
   */
  take: function (limit) {
    var left = limit;
    return this.fromLambda(function (stream, event) {
      left--;
      if (left >= 0) {
        stream.trigger(event, true);
      }
      // Close only once - while the stream is still in the ready state.
      if (left <= 0 && stream.state === ProAct.States.ready) {
        stream.close();
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It emits notifications from its source, while a condition is true.
   *
   * ```
   *
   *  source.takeWhile(function (v) {
   *      return v % 2 === 1;
   *  });
   *
   *  // source :
   *  // --3---5---2--4---3---4---5--|->
   *  // takeWhile:
   *  // --3---5--|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method takeWhile
   * @param {Function} condition
   *      A condition function, which is called for each of the incoming values.
   *      While it returns true, the elements are emitted,
   *      after it returns false for the first time, the stream created by takeWhile closes.
   *      It is required - there is no default.
   * @return {ProAct.Stream}
   *      The new stream.
   */
  takeWhile: function (condition) {
    return this.fromLambda(function (stream, event) {
      if (condition.call(null, event)) {
        stream.trigger(event);
      } else {
        stream.close();
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * The logic of the stream is implemented through the passed `lambda` parameter.
   *
   * TODO The first parameter of the lambda should be called something else and not stream.
   *
   * ```
   *  source.fromLambda(function (stream, notification) {
   *    stream.trigger(notification);
   *  });
   *
   *  // Just forwards notifications..
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method fromLambda
   * @param {Function} lambda
   *      A function, with two arguments - the returned by this function stream and notification.
   *      For every update coming from `this`, the lambda is called with the stream and the update.
   *      The stream has the `trigger`, `triggerErr` and `triggerClose` methods only while the lambda is running.
   * @return {ProAct.Stream}
   *      The new stream. The listener attached to `this` is stored in its `lambda` field.
   */
  fromLambda: function (lambda) {
    var stream = new ProAct.Stream(this.queueName),
        listener = function (e) {
          stream.trigger = StreamUtil.trigger;
          stream.triggerErr = StreamUtil.triggerErr;
          stream.triggerClose = StreamUtil.triggerClose;

          try {
            lambda.call(null, stream, e);
          } finally {
            // Always take the triggering methods away, even if the lambda throws,
            // so the stream can't be triggered from the outside afterwards.
            stream.trigger = undefined;
            stream.triggerErr = undefined;
            stream.triggerClose = undefined;
          }
        };
    this.onAll(listener);
    stream.lambda = listener;

    return stream;
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * For every update incoming from the source, a new `Actor` is created using the `mapper`
   * function. All the updates, emitted by the streams, returned by the `mapper` are emitted by the
   * `Actor` created by `flatMap`.
   *
   *
   * ```
   *  source.flatMap(function (v) {
   *    return ProAct.seq(100, [v, v +1 ]);
   *  });
   *
   *  // source:
   *  // -1---2----4-----3-----2-----1---->
   *  // flatMap
   *  // -1-2-2-3--4-5---3-4---2-3---1-2-->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method flatMap
   * @param {Function} mapper
   *      A function that returns an `ProAct.Actor` using the incoming notification.
   *      If it is not passed, the notification itself is used as the new source.
   * @return {ProAct.Stream}
   *      The new stream.
   */
  flatMap: function (mapper) {
    return this.fromLambda(function (stream, e) {
      if (e !== P.Actor.Close) {
        var actor = mapper ? mapper.call(null, e) : e;
        stream.into(actor);
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * For every update incoming from the source, a new `Actor` is created using the `mapper`
   * function. All the updates, emitted by the streams, returned by the `mapper` are emitted by the
   * `Actor` created by `flatMapLimited`. The number of the currently active sources is limited by the
   * passed `limit`. All the sources created after the limit is reached are queued and used as sources later,
   * when an active source closes.
   *
   *
   * @for ProAct.Actor
   * @instance
   * @method flatMapLimited
   * @param {Function} mapper
   *      A function that returns an `ProAct.Actor` using the incoming notification.
   *      If it is not passed, the notification itself is used as the new source.
   * @param {Number} limit
   *      The maximum number of the currently active sources.
   * @return {ProAct.Stream}
   *      The new stream.
   */
  flatMapLimited: function (mapper, limit) {
    var queue = [], current = 0, addActor = function (stream, actor) {
      if (!actor) {
        return;
      }
      if (current < limit) {
        current++;
        stream.into(actor);

        actor.onClose(function () {
          // A slot was freed: unlink the closed actor and plug in the next queued one.
          current--;
          actor.offAll(stream.makeListener());

          addActor(stream, queue.shift());
        });
      } else {
        queue.push(actor);
      }
    };

    return this.fromLambda(function (stream, e) {
      var actor = mapper ? mapper.call(null, e) : e;

      addActor(stream, actor);
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * For every update coming from `this`, a new `ProAct.Actor` is created using the logic
   * passed through `mapper`. This new `Actor` becomes the current source of the `ProAct.Stream`,
   * returned by this method. The next update will create a new source, which will become
   * the current one and replace the old one. This is the same as {{#crossLink "ProAct.Actor/flatMapLimited:method"}}{{/crossLink}},
   * with `limit` of `1`.
   *
   * ```
   *  source.flatMapLast(function (v) {
   *    return ProAct.seq(100, [v, v + 1, v + 2, v + 3]);
   *  });
   *
   *  // source:
   *  // -1---2----4-----3-----2-----1----|->
   *  // flatMapLast
   *  // -1-2-2-3-44-5-6-3-4-5-2-3-4-1-2-3-4-|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method flatMapLast
   * @param {Function} mapper
   *      A function that returns an `ProAct.Actor` using the incoming notification.
   *      If it is not passed, the notification itself is used as the new source.
   * @return {ProAct.Stream}
   *      The new stream.
   */
  flatMapLast: function (mapper) {
    var oldActor;
    return this.fromLambda(function (stream, e) {
      var actor = mapper ? mapper.call(null, e) : e;
      if (oldActor) {
        // Stop listening to the previous source before switching to the new one.
        oldActor.offAll(stream.makeListener());
      }
      oldActor = actor;
      stream.into(actor);
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * For every update coming from `this`, a new `ProAct.Actor` is created using the logic
   * passed through `mapper`. The first such `Actor` becomes the source of the `Actor`, returned by this
   * method. When it finishes, if a new `Actor` is emitted, it becomes the source.
   * The updates coming while the current source is not closed are ignored.
   *
   * ```
   *  source.flatMapFirst(function (v) {
   *    return ProAct.seq(100, [v, v + 1, v + 2, v + 3]);
   *  });
   *
   *  // source:
   *  // -1---2----4-----3-----2-----1----|->
   *  // flatMapFirst
   *  // -1-2-3-4--4-5-6-7-----2-3-4-5-|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method flatMapFirst
   * @param {Function} mapper
   *      A function that returns an `ProAct.Actor` using the incoming notification.
   *      If it is not passed, the notification itself is used as the new source.
   * @return {ProAct.Stream}
   *      The new stream.
   */
  flatMapFirst: function (mapper) {
    var oldActor;
    return this.fromLambda(function (stream, e) {
      // The current source is still active - ignore the update.
      if (oldActor && oldActor.state !== ProAct.States.closed) {
        return;
      }

      var actor = mapper ? mapper.call(null, e) : e;
      if (oldActor) {
        oldActor.offAll(stream.makeListener());
      }
      oldActor = actor;
      stream.into(actor);
    });
  }
});
-
// Short aliases: `t` for `trigger` and `tt` for `triggerMany`.
// NOTE(review): neither `trigger` nor `triggerMany` is defined on the stream
// prototype in the visible part of this file - confirm they are provided elsewhere
// (e.g. inherited), otherwise both aliases are `undefined`.
(function (proto) {
  proto.t = proto.trigger;
  proto.tt = proto.triggerMany;
}(P.S.prototype));
-
-