Show:
  1. /**
  2. * The `proact-streams` module provides stateless streams to the ProAct.js API.
  3. * FRP reactive streams.
  4. *
  5. * @module proact-streams
  6. * @main proact-streams
  7. */
  8.  
  9. // PRIVATE
  10. var StreamUtil = {
  11. go: function (event, useTransformations) {
  12. if (this.listeners.change.length === 0) {
  13. return this;
  14. }
  15. if (useTransformations) {
  16. try {
  17. event = P.Actor.transform(this, event);
  18. } catch (e) {
  19. StreamUtil.triggerErr.call(this, e);
  20. return this;
  21. }
  22. }
  23.  
  24. if (event === P.Actor.BadValue) {
  25. return this;
  26. }
  27.  
  28. return ActorUtil.update.call(this, event);
  29. },
  30.  
  31. triggerMany: function () {
  32. var i, args = slice.call(arguments), ln = args.length;
  33.  
  34. for (i = 0; i < ln; i++) {
  35. this.trigger(args[i], true);
  36. }
  37.  
  38. return this;
  39. },
  40.  
  41. trigger: function (event, useTransformations) {
  42. if (useTransformations === undefined) {
  43. useTransformations = true;
  44. }
  45.  
  46. return StreamUtil.go.call(this, event, useTransformations);
  47. },
  48.  
  49. triggerErr: function (err) {
  50. return ActorUtil.update.call(this, err, 'error');
  51. },
  52.  
  53. triggerClose: function (data) {
  54. return ActorUtil.update.call(this, data, 'close');
  55. }
  56.  
  57. };
  58.  
  59. /**
  60. * <p>
  61. * Constructs a `ProAct.Stream`.
  62. * The stream is a simple {{#crossLink "ProAct.Actor"}}{{/crossLink}}, without state.
  63. * </p>
  64. * <p>
  65. * The streams are ment to emit values, events, changes and can be plugged into other `Actors`.
  66. * For example it is possible to connect multiple streams, to merge them and to separate them,
  67. * to plug them into properties.
  68. * </p>
  69. * <p>
  70. * The reactive environment consists of the properties and the objects containing them, but
  71. * the outside world is not reactive. It is possible to use the `ProAct.Streams` as connections from the
  72. * outside world to the reactive environment.
  73. * </p>
  74. * <p>
  75. * The transformations can be used to change the events or values emitetted.
  76. * </p>
  77. * <p>
  78. * `ProAct.Stream` is part of the proact-streams module of ProAct.js.
  79. * </p>
  80. *
  81. * @class ProAct.Stream
  82. * @extends ProAct.Actor
  83. * @constructor
  84. * @param {String} queueName
  85. * The name of the queue all the updates should be pushed to.
  86. * <p>
  87. * If this parameter is null/undefined the default queue of
  88. * {{#crossLink "ProAct/flow:property"}}{{/crossLink}} is used.
  89. * </p>
  90. * <p>
  91. * If this parameter is not a string it is used as the
  92. * <i>source</i>.
  93. * </p>
  94. * @param {ProAct.Actor} source
  95. * A default source of the stream, can be null.
  96. * @param {Array} transforms
  97. * A list of transformation to be used on all incoming chages.
  98. */
  99. function Stream (queueName, source, transforms) {
  100. if (queueName && !P.U.isString(queueName)) {
  101. transforms = source;
  102. source = queueName;
  103. queueName = null;
  104. }
  105. P.Actor.call(this, queueName, transforms);
  106.  
  107. this.sourceNumber = 0;
  108.  
  109. if (source) {
  110. this.into(source);
  111. }
  112. }
  113. ProAct.Stream = ProAct.S = Stream;
  114.  
  115. P.U.ex(P.S, {
  116. fromString: function (str, args) {
  117. throw new Error('Stream.fromString is not implemented!');
  118. }
  119. });
  120.  
  121. ProAct.Stream.prototype = P.U.ex(Object.create(P.Actor.prototype), {
  122.  
  123. /**
  124. * Reference to the constructor of this object.
  125. *
  126. * @property constructor
  127. * @type ProAct.Stream
  128. * @final
  129. * @for ProAct.Stream
  130. */
  131. constructor: ProAct.Stream,
  132.  
  133. /**
  134. * Creates the <i>event</i> to be send to the listeners on update.
  135. * <p>
  136. * Streams don't create new events by default, the event is the source.
  137. * </p>
  138. *
  139. * @for ProAct.Stream
  140. * @protected
  141. * @instance
  142. * @method makeEvent
  143. * @param {ProAct.Event} source
  144. * The source event of the event. It can be null
  145. * @return {ProAct.Event}
  146. * The event.
  147. */
  148. makeEvent: function (source) {
  149. return source;
  150. },
  151.  
  152. /**
  153. * Creates the <i>listener</i> of this stream.
  154. *
  155. * @for ProAct.Stream
  156. * @protected
  157. * @instance
  158. * @method makeListener
  159. * @return {Object}
  160. * The <i>listener of this stream</i>.
  161. */
  162. makeListener: function () {
  163. if (!this.listener) {
  164. var stream = this;
  165. this.listener = function (event) {
  166. if (stream.trigger) {
  167. stream.trigger(event, true);
  168. } else {
  169. StreamUtil.trigger.call(stream, event, true);
  170. }
  171. };
  172. }
  173.  
  174. return this.listener;
  175. },
  176.  
  177. /**
  178. * Creates the <i>error listener</i> of this stream.
  179. * <p>
  180. * The listener pushes the incoming event into `this Stream` by default.
  181. * </p>
  182. *
  183. * @for ProAct.Stream
  184. * @protected
  185. * @instance
  186. * @method makeErrListener
  187. * @return {Object}
  188. * The <i>error listener of this stream</i>.
  189. */
  190. makeErrListener: function () {
  191. if (!this.errListener) {
  192. var stream = this;
  193. this.errListener = function (error) {
  194. if (stream.triggerErr) {
  195. stream.triggerErr(event);
  196. } else {
  197. StreamUtil.triggerErr.call(stream, error);
  198. }
  199. };
  200. }
  201.  
  202. return this.errListener;
  203. },
  204.  
  205. /**
  206. * Creates the <i>closing listener</i> of this stream.
  207. *
  208. * Pushes a closing notification into the stream by default.
  209. *
  210. * @for ProAct.Stream
  211. * @instance
  212. * @protected
  213. * @method makeCloseListener
  214. * @return {Object}
  215. * The <i>closing listener of this stream</i>.
  216. */
  217. makeCloseListener: function () {
  218. if (!this.closeListener) {
  219. var stream = this;
  220. this.closeListener = function (data) {
  221. if (stream.triggerClose) {
  222. stream.triggerClose(data);
  223. } else {
  224. StreamUtil.triggerClose.call(stream, data);
  225. }
  226. };
  227. }
  228.  
  229. return this.closeListener;
  230. },
  231.  
  232. /**
  233. * Defers a `ProAct.Actor` listener.
  234. * <p>
  235. * For streams this means pushing it to active flow using {{#crossLink "ProAct.Flow/push:method"}}{{/crossLink}}.
  236. * If the listener is object with 'property' field, it is done using {{#crossLink "ProAct.Actor/defer:method"}}{{/crossLink}}.
  237. * That way the reactive environment is updated only once, but the streams are not part of it.
  238. * </p>
  239. *
  240. * @for ProAct.Stream
  241. * @protected
  242. * @instance
  243. * @method defer
  244. * @param {Object} event
  245. * The event/value to pass to the listener.
  246. * @param {Object} listener
  247. * The listener to defer. It should be a function or object defining the <i>call</i> method.
  248. * @return {ProAct.Actor}
  249. * <i>this</i>
  250. */
  251. defer: function (event, listener) {
  252. if (!listener) {
  253. return;
  254. }
  255.  
  256. if (listener.property) {
  257. P.Actor.prototype.defer.call(this, event, listener);
  258. return;
  259. }
  260. var queueName = (listener.queueName) ? listener.queueName : this.queueName;
  261.  
  262. if (P.U.isFunction(listener)) {
  263. P.flow.push(queueName, listener, [event]);
  264. } else {
  265. P.flow.push(queueName, listener, listener.call, [event]);
  266. }
  267. },
  268.  
  269. /**
  270. * Creates a new `ProAct.Stream` instance with source <i>this</i> and mapping
  271. * the passed <i>mapping function</i>.
  272. *
  273. * ```
  274. * var mapped = stream.map(function (v) {
  275. * return v * v;
  276. * });
  277. *
  278. * mapped.on(function (v) {
  279. * console.log(v); // squares
  280. * });
  281. * ```
  282. *
  283. * @for ProAct.Stream
  284. * @instance
  285. * @method map
  286. * @param {Object} mappingFunction
  287. * Function or object with a <i>call method</i> to use as map function.
  288. * @return {ProAct.Stream}
  289. * A new `ProAct.Stream` instance with the <i>mapping</i> applied.
  290. */
  291. map: function (mappingFunction) {
  292. return new P.S(this).mapping(mappingFunction);
  293. },
  294.  
  295. /**
  296. * Creates a new `ProAct.Stream` instance with source <i>this</i> and filtering
  297. * the passed <i>filtering function</i>.
  298. *
  299. * ```
  300. * var filtered = stream.filter(function (v) {
  301. * return v % 2 === 1;
  302. * });
  303. *
  304. * filtered.on(function (v) {
  305. * console.log(v); // odds
  306. * });
  307. * ```
  308. *
  309. * @for ProAct.Stream
  310. * @instance
  311. * @method filter
  312. * @param {Object} filteringFunction
  313. * The filtering function or object with a call method, should return boolean.
  314. * @return {ProAct.Stream}
  315. * A new `ProAct.Stream` instance with the <i>filtering</i> applied.
  316. */
  317. filter: function (filteringFunction) {
  318. return new P.S(this).filtering(filteringFunction);
  319. },
  320.  
  321. /**
  322. * Creates a new `ProAct.Stream` instance with source <i>this</i> and accumulation
  323. * the passed <i>accumulation function</i>.
  324. *
  325. * ```
  326. * var acc = stream.accumulate(0, function (p, v) {
  327. * return p + v;
  328. * });
  329. *
  330. * acc.on(console.log); // sums
  331. * ```
  332. *
  333. * @for ProAct.Stream
  334. * @instance
  335. * @method accumulate
  336. * @param {Object} initVal
  337. * Initial value for the accumulation. For example '0' for sum.
  338. * @param {Object} accumulationFunction
  339. * The function to accumulate.
  340. * @return {ProAct.Stream}
  341. * A new `ProAct.Stream` instance with the <i>accumulation</i> applied.
  342. */
  343. accumulate: function (initVal, accumulationFunction) {
  344. return new P.S(this).accumulation(initVal, accumulationFunction);
  345. },
  346.  
  347. /**
  348. * Creates a new `ProAct.Stream` instance that merges this with other streams.
  349. * The new instance will have new value on value from any of the source streams.
  350. *
  351. * ```
  352. * var merged = stream1.merge(stream2);
  353. * ```
  354. *
  355. * Here if `stream1` emits:
  356. * 1--2---3----5-----X
  357. *
  358. * and `steam2` emits:
  359. * ----A-----B-----C-----D--X
  360. *
  361. * `merged` will emit:
  362. * 1--2A--3--B-5---C-----D--X
  363. *
  364. * @for ProAct.Stream
  365. * @instance
  366. * @method merge
  367. * @param [...]
  368. * A list of streams to be set as sources.
  369. * @return {ProAct.Stream}
  370. * A new `ProAct.Stream` instance with the sources this and all the passed streams.
  371. */
  372. merge: function () {
  373. var sources = [this].concat(slice.call(arguments)),
  374. result = new P.S();
  375.  
  376. return P.S.prototype.into.apply(result, sources);
  377. },
  378.  
  379. /**
  380. * Links source actors into this `ProAct.Stream`. This means that <i>this stream</i>
  381. * is listening for changes from the <i>sources</i>.
  382. *
  383. * The streams count their sources and when the sources are zero, they become inactive.
  384. *
  385. * ```
  386. * var stream1 = ProAct.stream();
  387. * var stream2 = ProAct.stream();
  388. * var stream = ProAct.stream();
  389. *
  390. * stream.into(stream1, stream2);
  391. * stream.on(function (v) {
  392. * console.log(v);
  393. * });
  394. *
  395. * ```
  396. *
  397. * Now if the any of the source streams is emits,
  398. * the notification will be printed on the output.
  399. *
  400. * @for ProAct.Stream
  401. * @instance
  402. * @method into
  403. * @param [...]
  404. * Zero or more source {{#crossLink "ProAct.Actor"}}{{/crossLink}}s to set as sources.
  405. * @return {ProAct.Stream}
  406. * <b>this</b>
  407. */
  408. into: function () {
  409. ProAct.Actor.prototype.into.apply(this, arguments);
  410.  
  411. this.sourceNumber += arguments.length;
  412.  
  413. return this;
  414. },
  415.  
  416. /**
  417. * Checks if <i>this</i> can be closed.
  418. *
  419. * Uses the number of the active sources to decide if `this stream` is ready to be closed.
  420. * If the active sources are zero - it can.
  421. *
  422. * @for ProAct.Stream
  423. * @protected
  424. * @instance
  425. * @method canClose
  426. */
  427. canClose: function () {
  428. this.sourceNumber -= 1;
  429.  
  430. return this.sourceNumber <= 0;
  431. }
  432. });
  433.  
  434. // Methods added to the ProAct.Actor from the proact-streams module.
  435. P.U.ex(P.Actor.prototype, {
  436.  
  437. /**
  438. * Turns this `ProAct.Actor` to a {{#crossLink "ProAct.Stream"}}{{/crossLink}}.
  439. *
  440. * In reality this method creates a new `Stream` with source this.
  441. *
  442. * @for ProAct.Actor
  443. * @instance
  444. * @method toStream
  445. */
  446. toStream: function () {
  447. return new P.S(this.queueName, this);
  448. },
  449.  
  450. /**
  451. * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
  452. * It skips the first `n` updates incoming from `this`.
  453. *
  454. * source : --3---4---5--4---3---4---5--|->
  455. * skip(3): -------------4---3---4---5--|->
  456. *
  457. * @for ProAct.Actor
  458. * @instance
  459. * @method skip
  460. * @param {Number} n The number of notifications to skip.
  461. */
  462. skip: function (n) {
  463. var i = n, self = this;
  464. return this.fromLambda(function (stream, event) {
  465. if (event === ProAct.Actor.Close) {
  466. stream.close();
  467. return;
  468. }
  469.  
  470. i--;
  471. if (i < 0) {
  472. self.offAll(stream.lambda);
  473. stream.into(self);
  474. stream.trigger(event);
  475. }
  476. });
  477. },
  478.  
  479. /**
  480. * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
  481. * It skips notifications from its source, while a condition is true.
  482. *
  483. * ```
  484. *
  485. * source.skipWhile(function (v) {
  486. * return v % 2 === 1;
  487. * });
  488. *
  489. * // source :
  490. * // --3---5---2--4---3---4---5--|->
  491. * // skipWhile:
  492. * // ----------2--4---3---4---5--|->
  493. *
  494. * ```
  495. *
  496. * @for ProAct.Actor
  497. * @instance
  498. * @method skipWhile
  499. * @param {Function} condition
  500. * A condition function, which is called for each of the incoming values
  501. * While it returns true, the elements are skipped,
  502. * after it returns false for the first time, the current and all the following values are emitted.
  503. */
  504. skipWhile: function (condition) {
  505. var self = this,
  506. cond = condition ? condition : function (e) {
  507. return e;
  508. };
  509. return this.fromLambda(function (stream, event) {
  510. if (event === ProAct.Actor.close) {
  511. stream.close();
  512. return;
  513. }
  514.  
  515. if (!cond(event)) {
  516. self.offAll(stream.lambda);
  517. stream.into(self);
  518. stream.trigger(event);
  519. }
  520. });
  521. },
  522.  
  523. /**
  524. * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
  525. * It skips dublicating elements, comming one after another.
  526. *
  527. * ```
  528. *
  529. * source.skipDuplicates();
  530. *
  531. * // source :
  532. * // --3---5---5--4---3---3---5--|->
  533. * // skipDuplicates:
  534. * // --3---5------4---3-------5--|->
  535. *
  536. * ```
  537. *
  538. * @for ProAct.Actor
  539. * @instance
  540. * @method skipDuplicates
  541. * @param {Function} comparator
  542. * A function used to compare the elements.
  543. * If nothing is passed it defaults to comparing using `===`.
  544. */
  545. skipDuplicates: function (comparator) {
  546. var last = undefined,
  547. cmp = comparator ? comparator : function (a, b) {
  548. return a === b;
  549. };
  550. return this.fromLambda(function (stream, event) {
  551. if (!cmp(last, event)) {
  552. stream.trigger(event);
  553. last = event;
  554. }
  555. });
  556. },
  557.  
  558. /**
  559. * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
  560. * It emits the difference between the last update and the current incomming update from the source.
  561. *
  562. * ```
  563. *
  564. * source.diff(0, function(prev, v) {
  565. * return v - prev;
  566. * });
  567. *
  568. * // source :
  569. * // --3---5------6---|->
  570. * // diff:
  571. * // --3---2------1---|->
  572. *
  573. * ```
  574. *
  575. * @for ProAct.Actor
  576. * @instance
  577. * @method diff
  578. * @param {Object} seed
  579. * A value to pass the `differ` function as previous on the inital notification from the source.
  580. * @param {Function} differ
  581. * Creates the difference, receives two params - the previous update and the current.
  582. * It can be skipped - the default `differ` function returns array with two elements - the previous and the curren updates.
  583. */
  584. diff: function(seed, differ) {
  585. var last = seed,
  586. fn = differ ? differ : function (last, next) {
  587. return [last, next];
  588. };
  589. return this.fromLambda(function (stream, event) {
  590. if (event === ProAct.Actor.close) {
  591. stream.close();
  592. return;
  593. }
  594.  
  595. if (last === undefined) {
  596. last = event;
  597. return;
  598. }
  599.  
  600. stream.trigger(differ(last, event));
  601. last = event;
  602. });
  603. },
  604.  
  605. /**
  606. * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
  607. * It takes the first `limit` updates incoming from `this`.
  608. *
  609. * source : --3---4---5--4---3---4---5--|->
  610. * skip(3): --3---4---5--|->
  611. *
  612. * @for ProAct.Actor
  613. * @instance
  614. * @method take
  615. * @param {Number} limit The number of notifications to emit.
  616. */
  617. take: function (limit) {
  618. var left = limit;
  619. return this.fromLambda(function (stream, event) {
  620. left--;
  621. if (left >= 0) {
  622. stream.trigger(event, true);
  623. }
  624. if (left <= 0 && stream.state === ProAct.States.ready) {
  625. stream.close();
  626. }
  627. });
  628. },
  629.  
  630. /**
  631. * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
  632. * It emits notifications from its source, while a condition is true.
  633. *
  634. * ```
  635. *
  636. * source.takeWhile(function (v) {
  637. * return v % 2 === 1;
  638. * });
  639. *
  640. * // source :
  641. * // --3---5---2--4---3---4---5--|->
  642. * // takeWhile:
  643. * // --3---5--|->
  644. *
  645. * ```
  646. *
  647. * @for ProAct.Actor
  648. * @instance
  649. * @method takeWhile
  650. * @param {Function} condition
  651. * A condition function, which is called for each of the incoming values
  652. * While it returns true, the elements are emitted,
  653. * after it returns false for the first time, the stream created by takeWhile closes.
  654. */
  655. takeWhile: function (condition) {
  656. return this.fromLambda(function (stream, event) {
  657. if (condition.call(null, event)) {
  658. stream.trigger(event);
  659. } else {
  660. stream.close();
  661. }
  662. });
  663. },
  664.  
  665. /**
  666. * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
  667. * The logic of the stream is implemented through the passed `lambda` parameter.
  668. *
  669. * TODO The first parameter of the lambda should be called something else and not stream.
  670. *
  671. * ```
  672. * source.fromLambda(function (stream, notification) {
  673. * stream.trigger(notification);
  674. * });
  675. *
  676. * // Just forwards notifications..
  677. *
  678. * ```
  679. *
  680. * @for ProAct.Actor
  681. * @instance
  682. * @method fromLambda
  683. * @param {Function} lambda
  684. * A function, with two arguments - the returned by this function stream and notification.
  685. * For every update comming from `this`, the lambda is called with the update and the stream in it.
  686. * Has the `trigger`, `triggerErr` and `triggerClose` methods.
  687. */
  688. fromLambda: function (lambda) {
  689. var stream = new ProAct.Stream(this.queueName),
  690. listener = function (e) {
  691. stream.trigger = StreamUtil.trigger;
  692. stream.triggerErr = StreamUtil.triggerErr;
  693. stream.triggerClose = StreamUtil.triggerClose;
  694.  
  695. lambda.call(null, stream, e);
  696.  
  697. stream.trigger = undefined;
  698. stream.triggerErr = undefined;
  699. stream.triggerClose = undefined;
  700. };
  701. this.onAll(listener);
  702. stream.lambda = listener;
  703.  
  704. return stream;
  705. },
  706.  
  707. /**
  708. * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
  709. * For every update incomming from the source, a new `Actor` is created using the `mapper`
  710. * function. All the updates, emitted by the streams, returned by the `mapper` are emitted by the
  711. * `Actor` created by `flatMap`
  712. *
  713. *
  714. * ```
  715. * source.flatMap(function (v) {
  716. * return ProAct.seq(100, [v, v +1 ]);
  717. * });
  718. *
  719. * // source:
  720. * // -1---2----4-----3-----2-----1---->
  721. * // flatMap
  722. * // -1-2-2-3--4-5---3-4---2-3---1-2-->
  723. *
  724. * ```
  725. *
  726. * @for ProAct.Actor
  727. * @instance
  728. * @method flatMap
  729. * @param {Function} mapper
  730. * A function that returns an `ProAct.Actor` using the incomming notification.
  731. */
  732. flatMap: function (mapper) {
  733. return this.fromLambda(function (stream, e) {
  734. if (e !== P.Actor.Close) {
  735. var actor = mapper ? mapper.call(null, e) : e;
  736. stream.into(actor);
  737. }
  738. });
  739. },
  740.  
  741. /**
  742. * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
  743. * For every update incomming from the source, a new `Actor` is created using the `mapper`
  744. * function. ALl the updates, emitted by the streams, returned by the `mapper` are emitted by the
  745. * `Actor` created by `flatMap`. The number of the currently active sources is limited by the
  746. * passed `limit`. All the sources created after the limit is reached are queued and reused as sources later.
  747. *
  748. *
  749. * @for ProAct.Actor
  750. * @instance
  751. * @method flatMapLimited
  752. * @param {Function} mapper
  753. * A function that returns an `ProAct.Actor` using the incomming notification.
  754. * @param {Number} limit
  755. * The number of the currently active sources.
  756. */
  757. flatMapLimited: function (mapper, limit) {
  758. var queue = [], current = 0, addActor = function (stream, actor) {
  759. if (!actor) {
  760. return;
  761. }
  762. if (current < limit) {
  763. current++;
  764. stream.into(actor);
  765.  
  766. actor.onClose(function () {
  767. current--;
  768. actor.offAll(stream.makeListener());
  769.  
  770. addActor(stream, queue.shift());
  771. });
  772. } else {
  773. queue.push(actor);
  774. }
  775. };
  776.  
  777. return this.fromLambda(function (stream, e) {
  778. var actor = mapper ? mapper.call(null, e) : e;
  779.  
  780. addActor(stream, actor);
  781. });
  782. },
  783.  
  784. /**
  785. * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
  786. * For every update comming from `this`, a new `ProAct.Actor` is created using the logic
  787. * passed through `mapper`. This new `Actor` becomes the current source of the `ProAct.Stream`,
  788. * returned by this method. The next update will create a new source, which will become
  789. * the current one and replace the old one. This is the same as {{#crossLink "ProAct.Actor/flatMapLimited:method"}}{{/crossLink}},
  790. * with `limit` of `1`.
  791. *
  792. * ```
  793. * source.flatMapLast(function (v) {
  794. * return ProAct.seq(100, [v, v + 1, v + 2, v + 3]);
  795. * });
  796. *
  797. * // source:
  798. * // -1---2----4-----3-----2-----1----|->
  799. * // flatMapLast
  800. * // -1-2-2-3-44-5-6-3-4-5-2-3-4-1-2-3-4-|->
  801. *
  802. * ```
  803. *
  804. * @for ProAct.Actor
  805. * @instance
  806. * @method flatMapLast
  807. * @param {Function} mapper
  808. * A function that returns an `ProAct.Actor` using the incomming notification.
  809. */
  810. flatMapLast: function (mapper) {
  811. var oldActor;
  812. return this.fromLambda(function (stream, e) {
  813. var actor = mapper ? mapper.call(null, e) : e;
  814. if (oldActor) {
  815. oldActor.offAll(stream.makeListener());
  816. }
  817. oldActor = actor;
  818. stream.into(actor);
  819. });
  820. },
  821.  
  822. /**
  823. * Creates a new {{#crossLink "ProAct.Stream"}}{{/crossLink}} with source - `this`.
  824. * For every update comming from `this`, a new `ProAct.Actor` is created using the logic
  825. * passed through `mapper`. The first such `Actor` becomes the source of the `Actor`, returned by this
  826. * method. When it finishes, if a new `Actor` is emitted, it becomes the source.
  827. *
  828. * ```
  829. * source.flatMapLast(function (v) {
  830. * return ProAct.seq(100, [v, v + 1, v + 2, v + 3]);
  831. * });
  832. *
  833. * // source:
  834. * // -1---2----4-----3-----2-----1----|->
  835. * // flatMapFirst
  836. * // -1-2-3-4--4-5-6-7-----2-3-4-5-|->
  837. *
  838. * ```
  839. *
  840. * @for ProAct.Actor
  841. * @instance
  842. * @method flatMapFirst
  843. * @param {Function} mapper
  844. * A function that returns an `ProAct.Actor` using the incomming notification.
  845. */
  846. flatMapFirst: function (mapper) {
  847. var oldActor;
  848. return this.fromLambda(function (stream, e) {
  849. if (oldActor && oldActor.state !== ProAct.States.closed) {
  850. return;
  851. }
  852.  
  853. var actor = mapper ? mapper.call(null, e) : e;
  854. if (oldActor) {
  855. oldActor.offAll(stream.makeListener());
  856. }
  857. oldActor = actor;
  858. stream.into(actor);
  859. });
  860. }
  861. });
  862.  
  863. P.S.prototype.t = P.S.prototype.trigger;
  864. P.S.prototype.tt = P.S.prototype.triggerMany;
  865.