Show:
/**
 * The `proact-streams` module provides stateless streams to the ProAct.js API.
 * FRP reactive streams.
 *
 * @module proact-streams
 * @main proact-streams
 */

// PRIVATE
var StreamUtil = {
  go: function (event, useTransformations) {
    if (this.listeners.change.length === 0) {
      return this;
    }
    if (useTransformations) {
      try {
        event = P.Actor.transform(this, event);
      } catch (e) {
        StreamUtil.triggerErr.call(this, e);
        return this;
      }
    }

    if (event === P.Actor.BadValue) {
      return this;
    }

    return ActorUtil.update.call(this, event);
  },

  triggerMany: function () {
    var i, args = slice.call(arguments), ln = args.length;

    for (i = 0; i < ln; i++) {
      this.trigger(args[i], true);
    }

    return this;
  },

  trigger: function (event, useTransformations) {
    if (useTransformations === undefined) {
      useTransformations = true;
    }

    return StreamUtil.go.call(this, event, useTransformations);
  },

  triggerErr: function (err) {
    return ActorUtil.update.call(this, err, 'error');
  },

  triggerClose: function (data) {
    return ActorUtil.update.call(this, data, 'close');
  }

};

/**
 * <p>
 *  Constructs a `ProAct.Stream`.
 *  The stream is a simple {{#crossLink "ProAct.Actor"}}{{/crossLink}}, without state.
 * </p>
 * <p>
 *  The streams are ment to emit values, events, changes and can be plugged into other `Actors`.
 *  For example it is possible to connect multiple streams, to merge them and to separate them,
 *  to plug them into properties.
 * </p>
 * <p>
 *  The reactive environment consists of the properties and the objects containing them, but
 *  the outside world is not reactive. It is possible to use the `ProAct.Streams` as connections from the
 *  outside world to the reactive environment.
 * </p>
 * <p>
 *    The transformations can be used to change the events or values emitetted.
 * </p>
 * <p>
 *  `ProAct.Stream` is part of the proact-streams module of ProAct.js.
 * </p>
 *
 * @class ProAct.Stream
 * @extends ProAct.Actor
 * @constructor
 * @param {String} queueName
 *      The name of the queue all the updates should be pushed to.
 *      <p>
 *        If this parameter is null/undefined the default queue of
 *        {{#crossLink "ProAct/flow:property"}}{{/crossLink}} is used.
 *      </p>
 *      <p>
 *        If this parameter is not a string it is used as the
 *        <i>source</i>.
 *      </p>
 * @param {ProAct.Actor} source
 *      A default source of the stream, can be null.
 * @param {Array} transforms
 *      A list of transformation to be used on all incoming chages.
 */
function Stream (queueName, source, transforms) {
  if (queueName && !P.U.isString(queueName)) {
    transforms = source;
    source = queueName;
    queueName = null;
  }
  P.Actor.call(this, queueName, transforms);

  this.sourceNumber = 0;

  if (source) {
    this.into(source);
  }
}
ProAct.Stream = ProAct.S = Stream;

P.U.ex(P.S, {
  fromString: function (str, args) {
    throw new Error('Stream.fromString is not implemented!');
  }
});

ProAct.Stream.prototype = P.U.ex(Object.create(P.Actor.prototype), {

  /**
   * Reference to the constructor of this object.
   *
   * @property constructor
   * @type ProAct.Stream
   * @final
   * @for ProAct.Stream
   */
  constructor: ProAct.Stream,

  /**
   * Creates the <i>event</i> to be send to the listeners on update.
   * <p>
   *  Streams don't create new events by default, the event is the source.
   * </p>
   *
   * @for ProAct.Stream
   * @protected
   * @instance
   * @method makeEvent
   * @param {ProAct.Event} source
   *      The source event of the event. It can be null
   * @return {ProAct.Event}
   *      The event.
   */
  makeEvent: function (source) {
    return source;
  },

  /**
   * Creates the <i>listener</i> of this stream.
   *
   * @for ProAct.Stream
   * @protected
   * @instance
   * @method makeListener
   * @return {Object}
   *      The <i>listener of this stream</i>.
   */
  makeListener: function () {
    if (!this.listener) {
      var stream = this;
      this.listener = function (event) {
        if (stream.trigger) {
          stream.trigger(event, true);
        } else {
          StreamUtil.trigger.call(stream, event, true);
        }
      };
    }

    return this.listener;
  },

  /**
   * Creates the <i>error listener</i> of this stream.
   * <p>
   *  The listener pushes the incoming event into `this Stream` by default.
   * </p>
   *
   * @for ProAct.Stream
   * @protected
   * @instance
   * @method makeErrListener
   * @return {Object}
   *      The <i>error listener of this stream</i>.
   */
  makeErrListener: function () {
    if (!this.errListener) {
      var stream = this;
      this.errListener = function (error) {
        if (stream.triggerErr) {
          stream.triggerErr(event);
        } else {
          StreamUtil.triggerErr.call(stream, error);
        }
      };
    }

    return this.errListener;
  },

  /**
   * Creates the <i>closing listener</i> of this stream.
   *
   * Pushes a closing notification into the stream by default.
   *
   * @for ProAct.Stream
   * @instance
   * @protected
   * @method makeCloseListener
   * @return {Object}
   *      The <i>closing listener of this stream</i>.
   */
  makeCloseListener: function () {
    if (!this.closeListener) {
      var stream = this;
      this.closeListener = function (data) {
        if (stream.triggerClose) {
          stream.triggerClose(data);
        } else {
          StreamUtil.triggerClose.call(stream, data);
        }
      };
    }

    return this.closeListener;
  },

  /**
   * Defers a `ProAct.Actor` listener.
   * <p>
   *  For streams this means pushing it to active flow using {{#crossLink "ProAct.Flow/push:method"}}{{/crossLink}}.
   *  If the listener is object with 'property' field, it is done using {{#crossLink "ProAct.Actor/defer:method"}}{{/crossLink}}.
   *  That way the reactive environment is updated only once, but the streams are not part of it.
   * </p>
   *
   * @for ProAct.Stream
   * @protected
   * @instance
   * @method defer
   * @param {Object} event
   *      The event/value to pass to the listener.
   * @param {Object} listener
   *      The listener to defer. It should be a function or object defining the <i>call</i> method.
   * @return {ProAct.Actor}
   *      <i>this</i>
   */
  defer: function (event, listener) {
    if (!listener) {
      return;
    }

    if (listener.property) {
      P.Actor.prototype.defer.call(this, event, listener);
      return;
    }
    var queueName = (listener.queueName) ? listener.queueName : this.queueName;

    if (P.U.isFunction(listener)) {
      P.flow.push(queueName, listener, [event]);
    } else {
      P.flow.push(queueName, listener, listener.call, [event]);
    }
  },

  /**
   * Creates a new `ProAct.Stream` instance with source <i>this</i> and mapping
   * the passed <i>mapping function</i>.
   *
   * ```
   *   var mapped = stream.map(function (v) {
   *     return v * v;
   *   });
   *
   *   mapped.on(function (v) {
   *     console.log(v); // squares
   *   });
   * ```
   *
   * @for ProAct.Stream
   * @instance
   * @method map
   * @param {Object} mappingFunction
   *      Function or object with a <i>call method</i> to use as map function.
   * @return {ProAct.Stream}
   *      A new `ProAct.Stream` instance with the <i>mapping</i> applied.
   */
  map: function (mappingFunction) {
    return new P.S(this).mapping(mappingFunction);
  },

  /**
   * Creates a new `ProAct.Stream` instance with source <i>this</i> and filtering
   * the passed <i>filtering function</i>.
   *
   * ```
   *   var filtered = stream.filter(function (v) {
   *     return v % 2 === 1;
   *   });
   *
   *   filtered.on(function (v) {
   *     console.log(v); // odds
   *   });
   * ```
   *
   * @for ProAct.Stream
   * @instance
   * @method filter
   * @param {Object} filteringFunction
   *      The filtering function or object with a call method, should return boolean.
   * @return {ProAct.Stream}
   *      A new `ProAct.Stream` instance with the <i>filtering</i> applied.
   */
  filter: function (filteringFunction) {
    return new P.S(this).filtering(filteringFunction);
  },

  /**
   * Creates a new `ProAct.Stream` instance with source <i>this</i> and accumulation
   * the passed <i>accumulation function</i>.
   *
   * ```
   *  var acc = stream.accumulate(0, function (p, v) {
   *    return p + v;
   *  });
   *
   *  acc.on(console.log); // sums
   * ```
   *
   * @for ProAct.Stream
   * @instance
   * @method accumulate
   * @param {Object} initVal
   *      Initial value for the accumulation. For example '0' for sum.
   * @param {Object} accumulationFunction
   *      The function to accumulate.
   * @return {ProAct.Stream}
   *      A new `ProAct.Stream` instance with the <i>accumulation</i> applied.
   */
  accumulate: function (initVal, accumulationFunction) {
    return new P.S(this).accumulation(initVal, accumulationFunction);
  },

  /**
   * Creates a new `ProAct.Stream` instance that merges this with other streams.
   * The new instance will have new value on value from any of the source streams.
   *
   * ```
   *  var merged = stream1.merge(stream2);
   * ```
   *
   * Here if `stream1` emits:
   * 1--2---3----5-----X
   *
   * and `steam2` emits:
   * ----A-----B-----C-----D--X
   *
   * `merged` will emit:
   * 1--2A--3--B-5---C-----D--X
   *
   * @for ProAct.Stream
   * @instance
   * @method merge
   * @param [...]
   *      A list of streams to be set as sources.
   * @return {ProAct.Stream}
   *      A new `ProAct.Stream` instance with the sources this and all the passed streams.
   */
  merge: function () {
    var sources = [this].concat(slice.call(arguments)),
        result = new P.S();

    return P.S.prototype.into.apply(result, sources);
  },

  /**
   * Links source actors into this `ProAct.Stream`. This means that <i>this stream</i>
   * is listening for changes from the <i>sources</i>.
   *
   * The streams count their sources and when the sources are zero, they become inactive.
   *
   * ```
   *  var stream1 = ProAct.stream();
   *  var stream2 = ProAct.stream();
   *  var stream = ProAct.stream();
   *
   *  stream.into(stream1, stream2);
   *  stream.on(function (v) {
   *    console.log(v);
   *  });
   *
   * ```
   *
   * Now if the any of the source streams is emits,
   * the notification will be printed on the output.
   *
   * @for ProAct.Stream
   * @instance
   * @method into
   * @param [...]
   *      Zero or more source {{#crossLink "ProAct.Actor"}}{{/crossLink}}s to set as sources.
   * @return {ProAct.Stream}
   *      <b>this</b>
   */
  into: function () {
    ProAct.Actor.prototype.into.apply(this, arguments);

    this.sourceNumber += arguments.length;

    return this;
  },

  /**
   * Checks if <i>this</i> can be closed.
   *
   * Uses the number of the active sources to decide if `this stream` is ready to be closed.
   * If the active sources are zero - it can.
   *
   * @for ProAct.Stream
   * @protected
   * @instance
   * @method canClose
   */
  canClose: function () {
    this.sourceNumber -= 1;

    return this.sourceNumber <= 0;
  }
});

// Methods added to the ProAct.Actor from the proact-streams module.
P.U.ex(P.Actor.prototype, {

  /**
   * Turns this `ProAct.Actor` to a {{#crossLink "ProAct.Stream"}}{{/crossLink}}.
   *
   * In reality this method creates a new `Stream` with source this.
   *
   * @for ProAct.Actor
   * @instance
   * @method toStream
   */
  toStream: function () {
    return new P.S(this.queueName, this);
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It skips the first `n` updates incoming from `this`.
   *
   * source : --3---4---5--4---3---4---5--|->
   * skip(3): -------------4---3---4---5--|->
   *
   * @for ProAct.Actor
   * @instance
   * @method skip
   * @param {Number} n The number of notifications to skip.
   */
  skip: function (n) {
    var i = n, self = this;
    return this.fromLambda(function (stream, event) {
      if (event === ProAct.Actor.Close) {
        stream.close();
        return;
      }

      i--;
      if (i < 0) {
        self.offAll(stream.lambda);
        stream.into(self);
        stream.trigger(event);
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It skips notifications from its source, while a condition is true.
   *
   * ```
   *
   *  source.skipWhile(function (v) {
   *      return v % 2 === 1;
   *  });
   *
   *  // source :
   *  // --3---5---2--4---3---4---5--|->
   *  // skipWhile:
   *  // ----------2--4---3---4---5--|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method skipWhile
   * @param {Function} condition
   *        A condition function, which is called for each of the incoming values
   *        While it returns true, the elements are skipped,
   *        after it returns false for the first time, the current and all the following values are emitted.
   */
  skipWhile: function (condition) {
    var self = this,
        cond = condition ? condition : function (e) {
          return e;
        };
    return this.fromLambda(function (stream, event) {
      if (event === ProAct.Actor.close) {
        stream.close();
        return;
      }

      if (!cond(event)) {
        self.offAll(stream.lambda);
        stream.into(self);
        stream.trigger(event);
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It skips dublicating elements, comming one after another.
   *
   * ```
   *
   *  source.skipDuplicates();
   *
   *  // source :
   *  // --3---5---5--4---3---3---5--|->
   *  // skipDuplicates:
   *  // --3---5------4---3-------5--|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method skipDuplicates
   * @param {Function} comparator
   *      A function used to compare the elements.
   *      If nothing is passed it defaults to comparing using `===`.
   */
  skipDuplicates: function (comparator) {
    var last = undefined,
        cmp = comparator ? comparator : function (a, b) {
          return a === b;
        };
    return this.fromLambda(function (stream, event) {
      if (!cmp(last, event)) {
        stream.trigger(event);
        last = event;
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It emits the difference between the last update and the current incomming update from the source.
   *
   * ```
   *
   *  source.diff(0, function(prev, v) {
   *      return v - prev;
   *  });
   *
   *  // source :
   *  // --3---5------6---|->
   *  // diff:
   *  // --3---2------1---|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method diff
   * @param {Object} seed
   *      A value to pass the `differ` function as previous on the inital notification from the source.
   * @param {Function} differ
   *      Creates the difference, receives two params - the previous update and the current.
   *      It can be skipped - the default `differ` function returns array with two elements - the previous and the curren updates.
   */
  diff: function(seed, differ) {
    var last = seed,
        fn = differ ? differ : function (last, next) {
          return [last, next];
        };
    return this.fromLambda(function (stream, event) {
      if (event === ProAct.Actor.close) {
        stream.close();
        return;
      }

      if (last === undefined) {
        last = event;
        return;
      }

      stream.trigger(differ(last, event));
      last = event;
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It takes the first `limit` updates incoming from `this`.
   *
   * source : --3---4---5--4---3---4---5--|->
   * skip(3): --3---4---5--|->
   *
   * @for ProAct.Actor
   * @instance
   * @method take
   * @param {Number} limit The number of notifications to emit.
   */
  take: function (limit) {
    var left = limit;
    return this.fromLambda(function (stream, event) {
      left--;
      if (left >= 0) {
        stream.trigger(event, true);
      }
      if (left <= 0 && stream.state === ProAct.States.ready) {
        stream.close();
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * It emits notifications from its source, while a condition is true.
   *
   * ```
   *
   *  source.takeWhile(function (v) {
   *      return v % 2 === 1;
   *  });
   *
   *  // source :
   *  // --3---5---2--4---3---4---5--|->
   *  // takeWhile:
   *  // --3---5--|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method takeWhile
   * @param {Function} condition
   *        A condition function, which is called for each of the incoming values
   *        While it returns true, the elements are emitted,
   *        after it returns false for the first time, the stream created by takeWhile closes.
   */
  takeWhile: function (condition) {
    return this.fromLambda(function (stream, event) {
      if (condition.call(null, event)) {
        stream.trigger(event);
      } else {
        stream.close();
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * The logic of the stream is implemented through the passed `lambda` parameter.
   *
   * TODO The first parameter of the lambda should be called something else and not stream.
   *
   * ```
   *  source.fromLambda(function (stream, notification) {
   *    stream.trigger(notification);
   *  });
   *
   *  // Just forwards notifications..
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method fromLambda
   * @param {Function} lambda
   *      A function, with two arguments - the returned by this function stream and notification.
   *      For every update comming from `this`, the lambda is called with the update and the stream in it.
   *      Has the `trigger`, `triggerErr` and `triggerClose` methods.
   */
  fromLambda: function (lambda) {
    var stream = new ProAct.Stream(this.queueName),
        listener = function (e) {
          stream.trigger = StreamUtil.trigger;
          stream.triggerErr = StreamUtil.triggerErr;
          stream.triggerClose = StreamUtil.triggerClose;

          lambda.call(null, stream, e);

          stream.trigger = undefined;
          stream.triggerErr = undefined;
          stream.triggerClose = undefined;
        };
    this.onAll(listener);
    stream.lambda = listener;

    return stream;
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * For every update incomming from the source, a new `Actor` is created using the `mapper`
   * function. All the updates, emitted by the streams, returned by the `mapper` are emitted by the
   * `Actor` created by `flatMap`
   *
   *
   * ```
   *  source.flatMap(function (v) {
   *    return ProAct.seq(100, [v, v +1 ]);
   *  });
   *
   *  // source:
   *  // -1---2----4-----3-----2-----1---->
   *  // flatMap
   *  // -1-2-2-3--4-5---3-4---2-3---1-2-->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method flatMap
   * @param {Function} mapper
   *      A function that returns an `ProAct.Actor` using the incomming notification.
   */
  flatMap: function (mapper) {
    return this.fromLambda(function (stream, e) {
      if (e !== P.Actor.Close) {
        var actor = mapper ? mapper.call(null, e) : e;
        stream.into(actor);
      }
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * For every update incomming from the source, a new `Actor` is created using the `mapper`
   * function. ALl the updates, emitted by the streams, returned by the `mapper` are emitted by the
   * `Actor` created by `flatMap`. The number of the currently active sources is limited by the
   * passed `limit`. All the sources created after the limit is reached are queued and reused as sources later.
   *
   *
   * @for ProAct.Actor
   * @instance
   * @method flatMapLimited
   * @param {Function} mapper
   *      A function that returns an `ProAct.Actor` using the incomming notification.
   * @param {Number} limit
   *      The number of the currently active sources.
   */
  flatMapLimited: function (mapper, limit) {
    var queue = [], current = 0, addActor = function (stream, actor) {
      if (!actor) {
        return;
      }
      if (current < limit) {
        current++;
        stream.into(actor);

        actor.onClose(function () {
          current--;
          actor.offAll(stream.makeListener());

          addActor(stream, queue.shift());
        });
      } else {
        queue.push(actor);
      }
    };

    return this.fromLambda(function (stream, e) {
      var actor = mapper ? mapper.call(null, e) : e;

      addActor(stream, actor);
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * For every update comming from `this`, a new `ProAct.Actor` is created using the logic
   * passed through `mapper`. This new `Actor` becomes the current source of the `ProAct.Stream`,
   * returned by this method. The next update will create a new source, which will become
   * the current one and replace the old one. This is the same as {{#crossLink "ProAct.Actor/flatMapLimited:method"}}{{/crossLink}},
   * with `limit` of `1`.
   *
   * ```
   *  source.flatMapLast(function (v) {
   *    return ProAct.seq(100, [v, v + 1, v + 2, v + 3]);
   *  });
   *
   *  // source:
   *  // -1---2----4-----3-----2-----1----|->
   *  // flatMapLast
   *  // -1-2-2-3-44-5-6-3-4-5-2-3-4-1-2-3-4-|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method flatMapLast
   * @param {Function} mapper
   *      A function that returns an `ProAct.Actor` using the incomming notification.
   */
  flatMapLast: function (mapper) {
    var oldActor;
    return this.fromLambda(function (stream, e) {
      var actor = mapper ? mapper.call(null, e) : e;
      if (oldActor) {
        oldActor.offAll(stream.makeListener());
      }
      oldActor = actor;
      stream.into(actor);
    });
  },

  /**
   * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
   * For every update comming from `this`, a new `ProAct.Actor` is created using the logic
   * passed through `mapper`. The first such `Actor` becomes the source of the `Actor`, returned by this
   * method. When it finishes, if a new `Actor` is emitted, it becomes the source.
   *
   * ```
   *  source.flatMapLast(function (v) {
   *    return ProAct.seq(100, [v, v + 1, v + 2, v + 3]);
   *  });
   *
   *  // source:
   *  // -1---2----4-----3-----2-----1----|->
   *  // flatMapFirst
   *  // -1-2-3-4--4-5-6-7-----2-3-4-5-|->
   *
   * ```
   *
   * @for ProAct.Actor
   * @instance
   * @method flatMapFirst
   * @param {Function} mapper
   *      A function that returns an `ProAct.Actor` using the incomming notification.
   */
  flatMapFirst: function (mapper) {
    var oldActor;
    return this.fromLambda(function (stream, e) {
      if (oldActor && oldActor.state !== ProAct.States.closed) {
        return;
      }

      var actor = mapper ? mapper.call(null, e) : e;
      if (oldActor) {
        oldActor.offAll(stream.makeListener());
      }
      oldActor = actor;
      stream.into(actor);
    });
  }
});

P.S.prototype.t = P.S.prototype.trigger;
P.S.prototype.tt = P.S.prototype.triggerMany;