Show:

Constructs a ProAct.SubscribableStream. This is a Stream that has a custom subscribe function, used to subscribe to a source.

This can be used to stream sources like browser events. The stream is lazy, when there are no listeners to it, it is not subscribed to the source, on the first listener it is subscribed, when every listener is unsubscibed, it is unsubscribed.

ProAct.SubscribableStream is part of the proact-streams module of ProAct.js.

Constructor

ProAct.SubscribableStream

(
  • subscribe
  • queueName
  • source
  • transforms
)

Parameters:

  • subscribe Function

    A function used to subscribe to a source, when the first listener to this stream is attached.

  • queueName String

    The name of the queue all the updates should be pushed to.

    If this parameter is null/undefined the default queue of flow is used.

    If this parameter is not a string it is used as the source.

  • source ProAct.Actor

    A default source of the stream, can be null.

    If this is the only one passed argument and it is a number - it becomes the size of the buffer.

  • transforms Array

    A list of transformation to be used on all incoming chages.

    If the arguments passed are two and this is a number - it becomes the size of the buffer.

Methods

accumulate

(
  • initVal
  • accumulationFunction
)
ProAct.Stream

Inherited from ProAct.Actor but overwritten in src/js/streams/stream.js:321

Creates a new ProAct.Stream instance with source this and accumulation the passed accumulation function.

 var acc = stream.accumulate(0, function (p, v) {
   return p + v;
 });

 acc.on(console.log); // sums

Parameters:

  • initVal Object

    Initial value for the accumulation. For example '0' for sum.

  • accumulationFunction Object

    The function to accumulate.

Returns:

ProAct.Stream:

A new ProAct.Stream instance with the accumulation applied.

accumulation

(
  • initVal
  • accumulationFunction
)
ProAct.Actor protected

Adds an accumulation transformation to this actor.

Accumulation is used to compute a value based on the previous one.

Parameters:

  • initVal Object

    Initial value for the accumulation. For example '0' for sum.

  • accumulationFunction Object

    The function to accumulate.

Returns:

ProAct.Actor:

this

afterInit

() protected

Called automatically after initialization of this actor.

By default it changes the state of this to ready.

It can be overridden to define more complex initialization logic.

beforeDestroy

() protected

Called immediately before destruction.

The idea is to be implemented by extenders to free additional resources on destroy.

bufferit

(
  • size
)
ProAct.SizeBufferedStream

Creates a new ProAct.SizeBufferedStream instance having as source this.

Parameters:

  • size Number

    The size of the buffer of the new ProAct.SizeBufferedStream.

canClose

() protected

Inherited from ProAct.Actor but overwritten in src/js/streams/stream.js:416

Checks if this can be closed.

Uses the number of the active sources to decide if this stream is ready to be closed. If the active sources are zero - it can.

close

() ProAct.Actor

Closes this actor => it state becomes closed.

This sends a close event to all the subscribers to closing.

After closing the actor it can't emit events anymore.

Example:

 var actor = new ProAct.Actor();
 actor.onClose(function () {
   console.log('Done!');
 });

 actor.close(); // We will see 'Done!' on the console output.

Returns:

ProAct.Actor:

This instance - can be chained.

defaultActions

() Array | String protected

A list of actions or action to be used when no action is passed for the methods working with actions.

Returns:

Array | String:

The actions to be used if no actions are provided to action related methods, like on, off, update.

defaultListeners

() Object protected

Generates the initial listeners object. It can be overridden for alternative listeners collections. It is used for resetting all the listeners too.

The default types of listeners are:

 {
   change: [],
   error: [],
   close: []
 }

Returns:

Object:

A map containing the default listeners collections.

defer

(
  • event
  • listener
)
ProAct.Actor protected

Inherited from ProAct.Actor but overwritten in src/js/streams/stream.js:232

Defers a ProAct.Actor listener.

For streams this means pushing it to active flow using push. If the listener is object with 'property' field, it is done using defer. That way the reactive environment is updated only once, but the streams are not part of it.

Parameters:

  • event Object

    The event/value to pass to the listener.

  • listener Object

    The listener to defer. It should be a function or object defining the call method.

Returns:

ProAct.Actor:

this

delay

(
  • delay
)
ProAct.DelayedStream

Creates a new ProAct.DelayedStream instance having as source this.

Parameters:

  • delay Number

    The time delay to be used for flushing the buffer of the new stream.

destroy

()

Destroys this ProAct.Actor instance.

The state of this is set to destroyed.

Calls beforeDestroy

diff

(
  • seed
  • differ
)

Creates a new ProAct.Stream with source - this. It emits the difference between the last update and the current incomming update from the source.


 source.diff(0, function(prev, v) {
     return v - prev;
 });

 // source :
 // --3---5------6---|->
 // diff:
 // --3---2------1---|->

Parameters:

  • seed Object

    A value to pass the differ function as previous on the inital notification from the source.

  • differ Function

    Creates the difference, receives two params - the previous update and the current. It can be skipped - the default differ function returns array with two elements - the previous and the curren updates.

doClose

() protected

This method is called when a close event is pushed to this Actor.

It removes all the subscriptions to the Actor and sets its state to closed.

Do not call this method; it is private!

doInit

() protected

Allocating of resources or initializing is done here.

Empty by default.

filter

(
  • filteringFunction
)
ProAct.Stream

Inherited from ProAct.Actor but overwritten in src/js/streams/stream.js:295

Creates a new ProAct.Stream instance with source this and filtering the passed filtering function.

  var filtered = stream.filter(function (v) {
    return v % 2 === 1;
  });

  filtered.on(function (v) {
    console.log(v); // odds
  });

Parameters:

  • filteringFunction Object

    The filtering function or object with a call method, should return boolean.

Returns:

ProAct.Stream:

A new ProAct.Stream instance with the filtering applied.

filtering

(
  • filteringFunction
)
ProAct.Actor protected

Adds a filtering transformation to this actor.

Filtering can be used to filter the incoming update values. For example you can filter by only odd numbers as update values.

Parameters:

  • filteringFunction Object

    The filtering function or object with a call method, should return boolean.

Returns:

ProAct.Actor:

this

flatMap

(
  • mapper
)

Creates a new ProAct.Stream with source - this. For every update incomming from the source, a new Actor is created using the mapper function. All the updates, emitted by the streams, returned by the mapper are emitted by the Actor created by flatMap

 source.flatMap(function (v) {
   return ProAct.seq(100, [v, v +1 ]);
 });

 // source:
 // -1---2----4-----3-----2-----1---->
 // flatMap
 // -1-2-2-3--4-5---3-4---2-3---1-2-->

Parameters:

  • mapper Function

    A function that returns an ProAct.Actor using the incomming notification.

flatMapFirst

(
  • mapper
)

Creates a new ProAct.Stream with source - this. For every update comming from this, a new ProAct.Actor is created using the logic passed through mapper. The first such Actor becomes the source of the Actor, returned by this method. When it finishes, if a new Actor is emitted, it becomes the source.

 source.flatMapLast(function (v) {
   return ProAct.seq(100, [v, v + 1, v + 2, v + 3]);
 });

 // source:
 // -1---2----4-----3-----2-----1----|->
 // flatMapFirst
 // -1-2-3-4--4-5-6-7-----2-3-4-5-|->

Parameters:

  • mapper Function

    A function that returns an ProAct.Actor using the incomming notification.

flatMapLast

(
  • mapper
)

Creates a new ProAct.Stream with source - this. For every update comming from this, a new ProAct.Actor is created using the logic passed through mapper. This new Actor becomes the current source of the ProAct.Stream, returned by this method. The next update will create a new source, which will become the current one and replace the old one. This is the same as flatMapLimited, with limit of 1.

 source.flatMapLast(function (v) {
   return ProAct.seq(100, [v, v + 1, v + 2, v + 3]);
 });

 // source:
 // -1---2----4-----3-----2-----1----|->
 // flatMapLast
 // -1-2-2-3-44-5-6-3-4-5-2-3-4-1-2-3-4-|->

Parameters:

  • mapper Function

    A function that returns an ProAct.Actor using the incomming notification.

flatMapLimited

(
  • mapper
  • limit
)

Creates a new ProAct.Stream with source - this. For every update incomming from the source, a new Actor is created using the mapper function. ALl the updates, emitted by the streams, returned by the mapper are emitted by the Actor created by flatMap. The number of the currently active sources is limited by the passed limit. All the sources created after the limit is reached are queued and reused as sources later.

Parameters:

  • mapper Function

    A function that returns an ProAct.Actor using the incomming notification.

  • limit Number

    The number of the currently active sources.

fromLambda

(
  • lambda
)

Creates a new ProAct.Stream with source - this. The logic of the stream is implemented through the passed lambda parameter.

TODO The first parameter of the lambda should be called something else and not stream.

 source.fromLambda(function (stream, notification) {
   stream.trigger(notification);
 });

 // Just forwards notifications..

Parameters:

  • lambda Function

    A function, with two arguments - the returned by this function stream and notification. For every update comming from this, the lambda is called with the update and the stream in it. Has the trigger, triggerErr and triggerClose methods.

init

()

Initializes this actor.

This method logic is run only if the current state of this is init.

Then afterInit is called to finish the initialization.

into

(
  • [...]
)
ProAct.Stream

Inherited from ProAct.Actor but overwritten in src/js/streams/stream.js:379

Links source actors into this ProAct.Stream. This means that this stream is listening for changes from the sources.

The streams count their sources and when the sources are zero, they become inactive.

 var stream1 = ProAct.stream();
 var stream2 = ProAct.stream();
 var stream = ProAct.stream();

 stream.into(stream1, stream2);
 stream.on(function (v) {
   console.log(v);
 });

Now if the any of the source streams is emits, the notification will be printed on the output.

Parameters:

  • [...] #crossLink "ProAct.Actor" optional

    Zero or more source }{{/crossLink}}s to set as sources.

Returns:

makeCloseListener

() Object protected

Inherited from ProAct.Actor but overwritten in src/js/streams/stream.js:205

Creates the closing listener of this stream.

Pushes a closing notification into the stream by default.

Returns:

Object:

The closing listener of this stream.

makeErrListener

() Object protected

Inherited from ProAct.Actor but overwritten in src/js/streams/stream.js:177

Creates the error listener of this stream.

The listener pushes the incoming event into this Stream by default.

Returns:

Object:

The error listener of this stream.

makeEvent

(
  • source
)
ProAct.Event protected

Inherited from ProAct.Actor but overwritten in src/js/streams/stream.js:133

Creates the event to be send to the listeners on update.

Streams don't create new events by default, the event is the source.

Parameters:

  • source ProAct.Event

    The source event of the event. It can be null

Returns:

ProAct.Event:

The event.

makeListener

() Object protected

Inherited from ProAct.Actor but overwritten in src/js/streams/stream.js:152

Creates the listener of this stream.

Returns:

Object:

The listener of this stream.

map

(
  • mappingFunction
)
ProAct.Stream

Inherited from ProAct.Actor but overwritten in src/js/streams/stream.js:269

Creates a new ProAct.Stream instance with source this and mapping the passed mapping function.

  var mapped = stream.map(function (v) {
    return v * v;
  });

  mapped.on(function (v) {
    console.log(v); // squares
  });

Parameters:

  • mappingFunction Object

    Function or object with a call method to use as map function.

Returns:

ProAct.Stream:

A new ProAct.Stream instance with the mapping applied.

mapping

(
  • mappingFunction
)
ProAct.Actor protected

Adds a mapping transformation to this actor.

Mapping transformations just transform one value into another. For example if we get update with the value of 3 and we have mapping transformation that returns the updating value powered by 2, we'll get 9 as actual updating value.

Parameters:

  • mappingFunction Object

    Function or object with a call method to use as map function.

Returns:

ProAct.Actor:

this

merge

(
  • [...]
)
ProAct.Stream

Creates a new ProAct.Stream instance that merges this with other streams. The new instance will have new value on value from any of the source streams.

 var merged = stream1.merge(stream2);

Here if stream1 emits: 1--2---3----5-----X

and steam2 emits: ----A-----B-----C-----D--X

merged will emit: 1--2A--3--B-5---C-----D--X

Parameters:

  • [...] Object optional

    A list of streams to be set as sources.

Returns:

ProAct.Stream:

A new ProAct.Stream instance with the sources this and all the passed streams.

off

(
  • actions
  • listener
)
ProAct.SubscribableStream

Removes a listener from the passed action.

If this method is called without parameters, all the listeners for all the actions are removed. The listeners are reset using defaultActions.

If the last listener is removed using this method, this stream authomatically unsubscribes from the source, using the function, returned by the subscribe function passed to the constructor.

Examples are:

Removing a listener:

 var listener = function (v) {
   console.log(v);
 };
 stream.on(listener);
 stream.off(listener);

Or for removing all the listeners attached to an stream:

 stream.off();

Or for removing all the listeners of a given type attached to an stream:

 stream.off('error');

Or for removing a listener from different type of actions:

 var listener = function (v) {
   console.log(v);
 };
 stream.on(listener);
 stream.onErr(listener);

 stream.off(['error', 'change'], listener);

Parameters:

  • actions Array | String

    The action/actions to stop listening for. If this parameter is skipped or null/undefined, the actions from defaultActions are used.

    The actions can be skipped and on their place as first parameter to be passed the listener.

  • listener Object

    The listener to detach. If it is skipped, null or undefined all the listeners are removed from this actor.

Returns:

offAll

(
  • listener
)
ProAct.Actor

Removes all notifications listener from the passed action.

Parameters:

  • listener Object

    The listener to detach. If it is skipped, null or undefined all the listeners are removed from this actor.

Returns:

ProAct.Actor:

this

offClose

(
  • listener
)
ProAct.Actor

Removes a close notification listener from the passed action.

This is the same as calling off('close', listener) on an Actor...

Parameters:

  • listener Object

    The listener to detach. If it is skipped, null or undefined all the listeners are removed from this actor.

Returns:

ProAct.Actor:

this

offErr

(
  • listener
)
ProAct.Actor

Removes an error listener from the passed action.

This is the same as calling off('error', listener) on an Actor...

Parameters:

  • listener Object

    The listener to detach. If it is skipped, null or undefined all the listeners are removed from this actor.

Returns:

ProAct.Actor:

this

on

(
  • actions
  • listener
)
ProAct.SubscribableStream

Attaches a new listener to this ProAct.SubscribableStream.

The listener may be function or object that defines a call method. On the first attached listener the subscribe function passed to the constructor will be called. That way the stream will be subscribed to custom data source.

  stream.on(function (v) {
   console.log(v);
  });

  stream.on('error', function (v) {
   console.error(v);
  });

  stream.on({
   call: function (v) {
     console.log(v);
   }
  });

Parameters:

  • actions Array | String

    The action/actions to listen for. If this parameter is skipped or null/undefined, the actions from defaultActions are used.

    The actions can be skipped and on their place as first parameter to be passed the listener.

  • listener Object

    The listener to attach. It must be instance of Function or object with a call method.

Returns:

onAll

(
  • listener
)
ProAct.Actor

Attaches the passed listener to listen to values, errors and the close notification from this ProAct.Actor.

The listener may be function or object that defines a call method.

Parameters:

  • listener Object

    The listener to attach. It must be instance of Function or object with a call method.

Returns:

ProAct.Actor:

this

onClose

(
  • listener
)
ProAct.Actor

Attaches a new close notifcation listener to this ProAct.Actor.

The listener may be function or object that defines a call method.

This is the same as calling on('close', listener) on an Actor...

Parameters:

  • listener Object

    The listener to attach. It must be instance of Function or object with a call method.

Returns:

ProAct.Actor:

this

onErr

(
  • listener
)
ProAct.Actor

Attaches a new error listener to this ProAct.Actor.

The listener may be function or object that defines a call method.

This is the same as calling on('error', listener) on an Actor...

Parameters:

  • listener Object

    The listener to attach. It must be instance of Function or object with a call method.

Returns:

ProAct.Actor:

this

out

(
  • destination
)
ProAct.Actor

The reverse of into - sets this actor as a source to the passed destination actor.

 var sourceActor = <Actor implementation>;
 var actor = <Actor implementation>;

 sourceActor.out(actor);
 actor.on(function (v) {
   console.log(v);
 });

 Now if the any of the source actors is updated, the update will be printed on the console by the actor.

Parameters:

  • destination ProAct.Actor

    The actor to set as source this to.

Returns:

ProAct.Actor:

this

reduce

(
  • initVal
  • accumulationFunction
)
ProAct.Property

Generates a new ProAct.Property containing the state of an accumulations.

The value will be updated with every update coming to this actor.

Parameters:

  • initVal Object

    Initial value for the accumulation. For example '0' for sum.

  • accumulationFunction Object

    The function to accumulate.

Returns:

ProAct.Property:

A ProAct.Property instance observing this with the accumulation applied.

skip

(
  • n
)

Creates a new ProAct.Stream with source - this. It skips the first n updates incoming from this.

source : --3---4---5--4---3---4---5--|-> skip(3): -------------4---3---4---5--|->

Parameters:

  • n Number

    The number of notifications to skip.

skipDuplicates

(
  • comparator
)

Creates a new ProAct.Stream with source - this. It skips dublicating elements, comming one after another.


 source.skipDuplicates();

 // source :
 // --3---5---5--4---3---3---5--|->
 // skipDuplicates:
 // --3---5------4---3-------5--|->

Parameters:

  • comparator Function

    A function used to compare the elements. If nothing is passed it defaults to comparing using ===.

skipWhile

(
  • condition
)

Creates a new ProAct.Stream with source - this. It skips notifications from its source, while a condition is true.


 source.skipWhile(function (v) {
     return v % 2 === 1;
 });

 // source :
 // --3---5---2--4---3---4---5--|->
 // skipWhile:
 // ----------2--4---3---4---5--|->

Parameters:

  • condition Function

    A condition function, which is called for each of the incoming values While it returns true, the elements are skipped, after it returns false for the first time, the current and all the following values are emitted.

take

(
  • limit
)

Creates a new ProAct.Stream with source - this. It takes the first limit updates incoming from this.

source : --3---4---5--4---3---4---5--|-> skip(3): --3---4---5--|->

Parameters:

  • limit Number

    The number of notifications to emit.

takeWhile

(
  • condition
)

Creates a new ProAct.Stream with source - this. It emits notifications from its source, while a condition is true.


 source.takeWhile(function (v) {
     return v % 2 === 1;
 });

 // source :
 // --3---5---2--4---3---4---5--|->
 // takeWhile:
 // --3---5--|->

Parameters:

  • condition Function

    A condition function, which is called for each of the incoming values While it returns true, the elements are emitted, after it returns false for the first time, the stream created by takeWhile closes.

throttle

(
  • delay
)
ProAct.ThrottlingStream

Creates a new ProAct.ThrottlingStream instance having as source this.

Parameters:

  • delay Number

    The time delay to be used for flushing the buffer of the new stream.

toProArray

() ProAct.Array

Creates and returns a ProAct.Array instance, which tracks the changes of this. Uses the current queue for queueing changes.

Returns:

ProAct.Array:

A ProAct.Array instance tracking the changes of this.

toProperty

()

Creates a {{{#crossLink "ProAct.Property"}}{{/crossLink}} instance, dependent on this. Comes from the proact-properties module.

toStream

()

Turns this ProAct.Actor to a ProAct.Stream.

In reality this method creates a new Stream with source this.

transform

(
  • transformation
)
ProAct.Actor protected

Adds a new transformation to the list of transformations of this actor.

A transformation is a function or an object that has a call method defined. This function or call method should have one argument and to return a transformed version of it. If the returned value is {@link ProAct.Actor.BadValue}, the next transformations are skipped and the updating value/event becomes - bad value.

Every value/event that updates this actor will be transformed using the new transformation.

Parameters:

  • transformation Object

    The transformation to add.

Returns:

ProAct.Actor:

this

transformStored

(
  • transformation
  • type
)
ProAct.Actor protected

Inherited from ProAct.Actor but overwritten in src/js/core/actor.js:796

Adds a new transformation to the list of transformations of this actor.

A transformation is a function or an object that has a call method defined. This function or call method should have one argument and to return a transformed version of it. If the returned value is {@link ProAct.Actor.BadValue}, the next transformations are skipped and the updating value/event becomes - bad value.

Every value/event that updates this actor will be transformed using the new transformation.

The idea of this method is that it just calls transform, but it can be overidden from another module.

TODO Maybe transformStored is a bad name

Parameters:

  • transformation Object

    The transformation to add. Can be string - to be retrieved by name.

  • type String

    The type of the transformation, for example mapping.

Returns:

ProAct.Actor:

this

Properties

constructor

ProAct.SubscribableStream final

Reference to the constructor of this object.