ProAct.SubscribableStream Class
Constructs a ProAct.SubscribableStream
. This is a Stream
that has a custom subscribe
function, used to subscribe to a source.
This can be used to stream sources such as browser events. The stream is lazy: while it has no listeners, it is not subscribed to the source. It subscribes when the first listener is attached and unsubscribes when the last listener is removed.
ProAct.SubscribableStream
is part of the proact-streams
module of ProAct.js.
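The lazy behaviour described above can be illustrated without ProAct.js at all. The following is a minimal plain-JavaScript sketch, not the library's implementation: LazySource, fakeSubscribe and the lazy* variables are hypothetical names introduced only for illustration. It assumes, as the documentation of off states, that the subscribe function returns the function used to unsubscribe.

```javascript
// Hypothetical stand-in for a lazily subscribed stream (not ProAct.js code).
function LazySource(subscribe) {
  this.subscribe = subscribe;   // called when the first listener is attached
  this.unsubscribe = null;      // the function returned by subscribe
  this.listeners = [];
}

LazySource.prototype.on = function (listener) {
  this.listeners.push(listener);
  if (this.listeners.length === 1) {
    var self = this;
    // First listener: subscribe to the underlying source.
    this.unsubscribe = this.subscribe(function (value) {
      self.listeners.slice().forEach(function (l) { l(value); });
    });
  }
  return this;
};

LazySource.prototype.off = function (listener) {
  var i = this.listeners.indexOf(listener);
  if (i !== -1) {
    this.listeners.splice(i, 1);
  }
  if (i !== -1 && this.listeners.length === 0 && this.unsubscribe) {
    // Last listener removed: unsubscribe from the underlying source.
    this.unsubscribe();
    this.unsubscribe = null;
  }
  return this;
};

// A fake source that records when it is subscribed and unsubscribed.
var lazyLog = [];
var lazyEmit = null;
function fakeSubscribe(push) {
  lazyLog.push('subscribed');
  lazyEmit = push;
  return function () {
    lazyLog.push('unsubscribed');
    lazyEmit = null;
  };
}

var lazy = new LazySource(fakeSubscribe);
var lazyReceived = [];
var lazyListener = function (v) { lazyReceived.push(v); };

lazy.on(lazyListener);   // subscribes to the fake source
lazyEmit(1);
lazyEmit(2);
lazy.off(lazyListener);  // unsubscribes from the fake source
```

In this sketch the source is touched only between the first on and the last off, which is the property that makes such a stream suitable for wrapping browser event handlers.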
Constructor
ProAct.SubscribableStream
-
subscribe
-
queueName
-
source
-
transforms
Parameters:
-
subscribe
Function: A function used to subscribe to a source when the first listener is attached to this stream.
-
queueName
String: The name of the queue all the updates should be pushed to.
If this parameter is null/undefined the default queue of flow is used.
If this parameter is not a string it is used as the source.
-
source
ProAct.Actor: A default source of the stream; can be null.
If this is the only argument passed and it is a number, it becomes the size of the buffer.
-
transforms
Array: A list of transformations to be applied to all incoming changes.
If two arguments are passed and this one is a number, it becomes the size of the buffer.
Item Index
Methods
- accumulate
- accumulation
- afterInit
- beforeDestroy
- bufferit
- canClose
- close
- defaultActions
- defaultListeners
- defer
- delay
- destroy
- diff
- doClose
- doInit
- filter
- filtering
- flatMap
- flatMapFirst
- flatMapLast
- flatMapLimited
- fromLambda
- init
- into
- makeCloseListener
- makeErrListener
- makeEvent
- makeListener
- map
- mapping
- merge
- off
- offAll
- offClose
- offErr
- on
- onAll
- onClose
- onErr
- out
- reduce
- skip
- skipDuplicates
- skipWhile
- take
- takeWhile
- throttle
- toProArray
- toProperty
- toStream
- transform
- transformStored
Properties
Methods
accumulate
-
initVal
-
accumulationFunction
Creates a new ProAct.Stream
instance with source this and accumulation
the passed accumulation function.
var acc = stream.accumulate(0, function (p, v) {
return p + v;
});
acc.on(console.log); // sums
Parameters:
Returns:
A new ProAct.Stream
instance with the accumulation applied.
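To make the emitted values concrete, accumulation can be modelled on a plain array of updates. This is only a sketch, not ProAct.js code: accumulateUpdates is a hypothetical helper, and it assumes that every intermediate result is emitted, as the "// sums" comment above suggests.

```javascript
// Model: each incoming update is combined with the previously accumulated
// value, and every intermediate result is emitted.
function accumulateUpdates(updates, initVal, accumulationFunction) {
  var previous = initVal;
  return updates.map(function (v) {
    previous = accumulationFunction(previous, v);
    return previous;
  });
}

var runningSums = accumulateUpdates([1, 2, 3, 4], 0, function (p, v) {
  return p + v;
});
// runningSums is [1, 3, 6, 10]
```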
accumulation
-
initVal
-
accumulationFunction
Adds an accumulation transformation to this actor.
Accumulation is used to compute a value based on the previous one.
Parameters:
Returns:
this
afterInit
()
protected
Called automatically after initialization of this actor.
By default it changes the state of this to ready.
It can be overridden to define more complex initialization logic.
beforeDestroy
()
protected
Called immediately before destruction.
It is meant to be implemented by extenders to free additional resources on destroy.
bufferit
-
size
Creates a new ProAct.SizeBufferedStream instance having as source this.
Parameters:
-
size
Number: The size of the buffer of the new ProAct.SizeBufferedStream.
Returns:
A ProAct.SizeBufferedStream instance.
canClose
()
protected
Checks if this can be closed.
Uses the number of the active sources to decide if this stream
is ready to be closed.
If the active sources are zero - it can.
close
()
ProAct.Actor
Closes this actor; its state becomes closed.
This sends a close
event to all the listeners subscribed for closing.
After closing the actor it can't emit events anymore.
Example:
var actor = new ProAct.Actor();
actor.onClose(function () {
console.log('Done!');
});
actor.close(); // We will see 'Done!' on the console output.
Returns:
This instance - can be chained.
defaultActions
()
Array | String
protected
A list of actions or action to be used when no action is passed for the methods working with actions.
defaultListeners
()
Object
protected
Generates the initial listeners object. It can be overridden for alternative listeners collections. It is used for resetting all the listeners too.
The default types of listeners are:
{
change: [],
error: [],
close: []
}
Returns:
A map containing the default listeners collections.
defer
-
event
-
listener
Defers a ProAct.Actor
listener.
For streams this means pushing it to the active flow using push. If the listener is an object with a 'property' field, it is done using defer. That way the reactive environment is updated only once, but the streams are not part of it.
Parameters:
Returns:
this
delay
-
delay
Creates a new ProAct.DelayedStream instance having as source this.
Parameters:
-
delay
Number: The time delay to be used for flushing the buffer of the new stream.
Returns:
A ProAct.DelayedStream instance.
destroy
()
diff
-
seed
-
differ
Creates a new ProAct.Stream with source - this
.
It emits the difference between the last update and the current incoming update from the source.
source.diff(0, function(prev, v) {
return v - prev;
});
// source :
// --3---5------6---|->
// diff:
// --3---2------1---|->
Parameters:
-
seed
Object: A value to pass to the differ function as the previous value on the initial notification from the source.
-
differ
Function: Creates the difference; it receives two parameters, the previous update and the current one. It can be skipped, in which case the default differ function returns an array with two elements: the previous and the current updates.
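The roles of seed and differ can be checked against the diagram with a small model that treats the source as a plain array of updates. This is a sketch under that assumption, not ProAct.js code; diffUpdates is a hypothetical helper.

```javascript
// Model of diff: the differ receives the previous and the current update;
// the seed plays the role of "previous" for the very first update.
function diffUpdates(updates, seed, differ) {
  // Default differ as described above: a two-element array [previous, current].
  differ = differ || function (prev, v) { return [prev, v]; };
  var previous = seed;
  return updates.map(function (v) {
    var result = differ(previous, v);
    previous = v;
    return result;
  });
}

var diffs = diffUpdates([3, 5, 6], 0, function (prev, v) { return v - prev; });
// diffs is [3, 2, 1], the same values as in the diagram

var pairs = diffUpdates([3, 5, 6], 0);
// pairs is [[0, 3], [3, 5], [5, 6]]
```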
doClose
()
protected
This method is called when a close
event is pushed to this Actor
.
It removes all the subscriptions to the Actor
and sets its
state to closed.
Do not call this method; it is private!
doInit
()
protected
Allocating of resources or initializing is done here.
Empty by default.
filter
-
filteringFunction
Creates a new ProAct.Stream
instance with source this and filtering
the passed filtering function.
var filtered = stream.filter(function (v) {
return v % 2 === 1;
});
filtered.on(function (v) {
console.log(v); // odds
});
Parameters:
-
filteringFunction
Object: The filtering function or object with a call method; it should return a boolean.
Returns:
A new ProAct.Stream
instance with the filtering applied.
filtering
-
filteringFunction
Adds a filtering transformation to this actor.
Filtering can be used to filter the incoming update values. For example you can filter by only odd numbers as update values.
Parameters:
-
filteringFunction
Object: The filtering function or object with a call method; it should return a boolean.
Returns:
this
flatMap
-
mapper
Creates a new ProAct.Stream with source - this
.
For every update incoming from the source, a new Actor
is created using the mapper
function. All the updates emitted by the streams returned by the mapper
are emitted by the
Actor
created by flatMap.
source.flatMap(function (v) {
return ProAct.seq(100, [v, v + 1]);
});
// source:
// -1---2----4-----3-----2-----1---->
// flatMap
// -1-2-2-3--4-5---3-4---2-3---1-2-->
Parameters:
-
mapper
Function: A function that returns a
ProAct.Actor
using the incoming notification.
flatMapFirst
-
mapper
Creates a new ProAct.Stream with source - this
.
For every update coming from this
, a new ProAct.Actor
is created using the logic
passed through mapper
. The first such Actor
becomes the source of the Actor
, returned by this
method. When it finishes, if a new Actor
is emitted, it becomes the source.
source.flatMapFirst(function (v) {
return ProAct.seq(100, [v, v + 1, v + 2, v + 3]);
});
// source:
// -1---2----4-----3-----2-----1----|->
// flatMapFirst
// -1-2-3-4--4-5-6-7-----2-3-4-5-|->
Parameters:
-
mapper
Function: A function that returns a
ProAct.Actor
using the incoming notification.
flatMapLast
-
mapper
Creates a new ProAct.Stream with source - this
.
For every update coming from this
, a new ProAct.Actor
is created using the logic
passed through mapper
. This new Actor
becomes the current source of the ProAct.Stream
,
returned by this method. The next update will create a new source, which will become
the current one and replace the old one. This is the same as flatMapLimited,
with limit
of 1
.
source.flatMapLast(function (v) {
return ProAct.seq(100, [v, v + 1, v + 2, v + 3]);
});
// source:
// -1---2----4-----3-----2-----1----|->
// flatMapLast
// -1-2-2-3-44-5-6-3-4-5-2-3-4-1-2-3-4-|->
Parameters:
-
mapper
Function: A function that returns a
ProAct.Actor
using the incoming notification.
flatMapLimited
-
mapper
-
limit
Creates a new ProAct.Stream with source - this
.
For every update incoming from the source, a new Actor
is created using the mapper
function. All the updates emitted by the streams returned by the mapper
are emitted by the
Actor
created by flatMap
. The number of the currently active sources is limited by the
passed limit
. All the sources created after the limit is reached are queued and reused as sources later.
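The queueing rule can be sketched in plain JavaScript. This is an illustration only, not the ProAct.js implementation: flatMapLimitedSketch and manualSource are hypothetical helpers, inner sources are modelled as objects with a subscribe(onValue, onDone) method, and the sketch assumes that a queued source is started as soon as one of the active sources finishes.

```javascript
// Builds the handler to call for every update coming from the source.
function flatMapLimitedSketch(mapper, limit, emit) {
  var active = 0;    // number of inner sources currently subscribed
  var queued = [];   // inner sources waiting for a free slot

  function start(inner) {
    active += 1;
    inner.subscribe(emit, function onDone() {
      active -= 1;
      if (queued.length > 0) {
        start(queued.shift());   // a slot was freed, start the next one
      }
    });
  }

  return function onUpdate(v) {
    var inner = mapper(v);
    if (active < limit) {
      start(inner);
    } else {
      queued.push(inner);
    }
  };
}

// Hypothetical inner source that is driven by hand after subscription.
function manualSource() {
  var src = {
    subscribe: function (onValue, onDone) {
      src.emit = onValue;
      src.finish = onDone;
    }
  };
  return src;
}

var created = {};
var limitedOut = [];
var pushUpdate = flatMapLimitedSketch(function (v) {
  created[v] = manualSource();
  return created[v];
}, 2, function (x) { limitedOut.push(x); });

pushUpdate('a');
pushUpdate('b');
pushUpdate('c');                 // limit of 2 reached, 'c' is queued
created.a.emit('a1');
created.b.emit('b1');
var cStartedEarly = typeof created.c.emit === 'function';
created.a.finish();              // frees a slot, 'c' is started
created.c.emit('c1');
// limitedOut is ['a1', 'b1', 'c1'] and cStartedEarly is false
```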
fromLambda
-
lambda
Creates a new ProAct.Stream with source - this
.
The logic of the stream is implemented through the passed lambda
parameter.
TODO The first parameter of the lambda should be called something else and not stream.
source.fromLambda(function (stream, notification) {
stream.trigger(notification);
});
// Just forwards notifications..
Parameters:
-
lambda
Function: A function with two arguments: the stream returned by this method and the notification. For every update coming from this, the lambda is called with the stream and the update. The stream has the trigger, triggerErr and triggerClose methods.
init
()
into
-
[...]
Links source actors into this ProAct.Stream
. This means that this stream
is listening for changes from the sources.
The streams count their sources and when the sources are zero, they become inactive.
var stream1 = ProAct.stream();
var stream2 = ProAct.stream();
var stream = ProAct.stream();
stream.into(stream1, stream2);
stream.on(function (v) {
console.log(v);
});
Now if any of the source streams emits, the notification will be printed on the output.
Parameters:
-
[...]
ProAct.Actor optional: Zero or more source ProAct.Actors to set as sources.
Returns:
this
makeCloseListener
()
Object
protected
Creates the closing listener of this stream.
Pushes a closing notification into the stream by default.
Returns:
The closing listener of this stream.
makeErrListener
()
Object
protected
Creates the error listener of this stream.
The listener pushes the incoming event into this Stream
by default.
Returns:
The error listener of this stream.
makeEvent
-
source
Creates the event to be sent to the listeners on update.
Streams don't create new events by default, the event is the source.
Parameters:
-
source
ProAct.Event: The source event of the event. It can be null.
Returns:
The event.
makeListener
()
Object
protected
Creates the listener of this stream.
Returns:
The listener of this stream.
map
-
mappingFunction
Creates a new ProAct.Stream
instance with source this and mapping
the passed mapping function.
var mapped = stream.map(function (v) {
return v * v;
});
mapped.on(function (v) {
console.log(v); // squares
});
Parameters:
-
mappingFunction
Object: Function or object with a call method to use as map function.
Returns:
A new ProAct.Stream
instance with the mapping applied.
mapping
-
mappingFunction
Adds a mapping transformation to this actor.
Mapping transformations just transform one value into another. For example, if we get an update with the value 3 and we have a mapping transformation that returns the updating value raised to the power of 2, we'll get 9 as the actual updating value.
Parameters:
-
mappingFunction
Object: Function or object with a call method to use as map function.
Returns:
this
merge
-
[...]
Creates a new ProAct.Stream
instance that merges this with other streams.
The new instance emits a new value whenever any of the source streams emits one.
var merged = stream1.merge(stream2);
Here if stream1
emits:
1--2---3----5-----X
and stream2
emits:
----A-----B-----C-----D--X
merged
will emit:
1--2A--3--B-5---C-----D--X
Parameters:
-
[...]
Object optional: A list of streams to be set as sources.
Returns:
A new ProAct.Stream
instance with the sources this and all the passed streams.
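The diagrams above can be verified mechanically by reading each character position as a point in time. The following sketch is not ProAct.js code: parseMarbles and mergeEvents are hypothetical helpers, and the close marker X is ignored so that only values are compared.

```javascript
// Parse a marble line into {time, value} events.
// '-' means nothing was emitted; 'X' (close) is ignored in this model.
function parseMarbles(line) {
  var events = [];
  for (var i = 0; i < line.length; i++) {
    if (line[i] !== '-' && line[i] !== 'X') {
      events.push({ time: i, value: line[i] });
    }
  }
  return events;
}

// Merging: the result contains every event of both sources, ordered by time.
function mergeEvents(a, b) {
  return a.concat(b).sort(function (x, y) { return x.time - y.time; });
}

var mergedValues = mergeEvents(
  parseMarbles('1--2---3----5-----X'),
  parseMarbles('----A-----B-----C-----D--X')
).map(function (e) { return e.value; }).join('');
// mergedValues is '12A3B5CD', the order shown in the merged diagram
```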
off
-
actions
-
listener
Removes a listener from the passed action.
If this method is called without parameters, all the listeners for all the actions are removed. The listeners are reset using defaultListeners.
If the last listener is removed using this method, this stream
automatically unsubscribes
from the source, using the function, returned by the subscribe
function passed to the constructor.
Examples are:
Removing a listener:
var listener = function (v) {
console.log(v);
};
stream.on(listener);
stream.off(listener);
Or for removing all the listeners attached to a stream:
stream.off();
Or for removing all the listeners of a given type attached to a stream:
stream.off('error');
Or for removing a listener from different types of actions:
var listener = function (v) {
console.log(v);
};
stream.on(listener);
stream.onErr(listener);
stream.off(['error', 'change'], listener);
Parameters:
-
actions
Array | String: The action/actions to stop listening for. If this parameter is skipped or null/undefined, the actions from defaultActions are used.
The actions can be skipped, in which case the listener is passed as the first parameter.
-
listener
Object: The listener to detach. If it is skipped, null or undefined, all the listeners are removed from this actor.
Returns:
offAll
-
listener
Removes a listener for all notification types from this actor.
Parameters:
-
listener
Object: The listener to detach. If it is skipped, null or undefined, all the listeners are removed from this actor.
Returns:
this
offClose
-
listener
Removes a close notification listener from the passed action.
This is the same as calling off('close', listener)
on an Actor
...
Parameters:
-
listener
Object: The listener to detach. If it is skipped, null or undefined, all the listeners are removed from this actor.
Returns:
this
offErr
-
listener
Removes an error listener from the passed action.
This is the same as calling off('error', listener)
on an Actor
...
Parameters:
-
listener
Object: The listener to detach. If it is skipped, null or undefined, all the listeners are removed from this actor.
Returns:
this
on
-
actions
-
listener
Attaches a new listener to this ProAct.SubscribableStream
.
The listener may be a function or an object that defines a call method.
On the first attached listener the subscribe
function passed to the constructor will be called.
That way the stream will be subscribed to a custom data source.
stream.on(function (v) {
console.log(v);
});
stream.on('error', function (v) {
console.error(v);
});
stream.on({
call: function (v) {
console.log(v);
}
});
Parameters:
-
actions
Array | String: The action/actions to listen for. If this parameter is skipped or null/undefined, the actions from defaultActions are used.
The actions can be skipped, in which case the listener is passed as the first parameter.
-
listener
Object: The listener to attach. It must be an instance of Function or an object with a call method.
Returns:
onAll
-
listener
Attaches the passed listener to listen to values, errors and the close notification from this ProAct.Actor
.
The listener may be a function or an object that defines a call method.
Parameters:
-
listener
Object: The listener to attach. It must be an instance of Function or an object with a call method.
Returns:
this
onClose
-
listener
Attaches a new close notification listener to this ProAct.Actor
.
The listener may be a function or an object that defines a call method.
This is the same as calling on('close', listener)
on an Actor
...
Parameters:
-
listener
Object: The listener to attach. It must be an instance of Function or an object with a call method.
Returns:
this
onErr
-
listener
Attaches a new error listener to this ProAct.Actor.
The listener may be a function or an object that defines a call method.
This is the same as calling on('error', listener)
on an Actor
...
Parameters:
-
listener
Object: The listener to attach. It must be an instance of Function or an object with a call method.
Returns:
this
out
-
destination
The reverse of into - sets this actor as a source to the passed destination actor.
var sourceActor = <Actor implementation>;
var actor = <Actor implementation>;
sourceActor.out(actor);
actor.on(function (v) {
console.log(v);
});
Now if any of the source actors is updated, the update will be printed on the console by the actor
.
Parameters:
-
destination
ProAct.Actor: The actor to set this as a source to.
Returns:
this
reduce
-
initVal
-
accumulationFunction
Generates a new ProAct.Property containing the state of an accumulation.
The value will be updated with every update coming to this actor.
Parameters:
Returns:
A ProAct.Property instance observing this with the accumulation applied.
skip
-
n
Creates a new ProAct.Stream with source - this
.
It skips the first n
updates incoming from this
.
source : --3---4---5--4---3---4---5--|->
skip(3): -------------4---3---4---5--|->
Parameters:
-
n
Number: The number of notifications to skip.
skipDuplicates
-
comparator
Creates a new ProAct.Stream with source - this
.
It skips duplicate elements that come one after another.
source.skipDuplicates();
// source :
// --3---5---5--4---3---3---5--|->
// skipDuplicates:
// --3---5------4---3-------5--|->
Parameters:
-
comparator
Function: A function used to compare the elements. If nothing is passed it defaults to comparing using
===
.
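The effect of the comparator can be modelled on a plain array of updates. This is a sketch, not ProAct.js code: skipConsecutiveDuplicates is a hypothetical helper, and it assumes that the comparator returns true when two neighbouring elements should be treated as equal.

```javascript
// Model of skipDuplicates: an update is dropped when the comparator says
// it is equal to the update that came directly before it.
function skipConsecutiveDuplicates(updates, comparator) {
  comparator = comparator || function (a, b) { return a === b; };
  var out = [];
  updates.forEach(function (v, i) {
    if (i === 0 || !comparator(updates[i - 1], v)) {
      out.push(v);
    }
  });
  return out;
}

var withoutDuplicates = skipConsecutiveDuplicates([3, 5, 5, 4, 3, 3, 5]);
// withoutDuplicates is [3, 5, 4, 3, 5], as in the diagram

// A custom comparator (an assumption of this sketch): ignore letter case.
var ignoringCase = skipConsecutiveDuplicates(['a', 'A', 'b'], function (x, y) {
  return x.toLowerCase() === y.toLowerCase();
});
// ignoringCase is ['a', 'b']
```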
skipWhile
-
condition
Creates a new ProAct.Stream with source - this
.
It skips notifications from its source, while a condition is true.
source.skipWhile(function (v) {
return v % 2 === 1;
});
// source :
// --3---5---2--4---3---4---5--|->
// skipWhile:
// ----------2--4---3---4---5--|->
Parameters:
-
condition
Function: A condition function, which is called for each of the incoming values. While it returns true, the elements are skipped; after it returns false for the first time, the current and all the following values are emitted.
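The point that the condition stops being consulted after its first false result can be seen in a small array-based model. This is a sketch, not ProAct.js code; skipWhileModel is a hypothetical helper.

```javascript
// Model of skipWhile: values are dropped until the condition fails once;
// from then on everything is emitted, even values the condition would match.
function skipWhileModel(updates, condition) {
  var skipping = true;
  return updates.filter(function (v) {
    if (skipping && condition(v)) {
      return false;
    }
    skipping = false;
    return true;
  });
}

var afterSkipWhile = skipWhileModel([3, 5, 2, 4, 3, 4, 5], function (v) {
  return v % 2 === 1;
});
// afterSkipWhile is [2, 4, 3, 4, 5]: the later odd values 3 and 5 are kept
```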
take
-
limit
Creates a new ProAct.Stream with source - this
.
It takes the first limit
updates incoming from this
.
source : --3---4---5--4---3---4---5--|->
take(3): --3---4---5--|->
Parameters:
-
limit
Number: The number of notifications to emit.
takeWhile
-
condition
Creates a new ProAct.Stream with source - this
.
It emits notifications from its source, while a condition is true.
source.takeWhile(function (v) {
return v % 2 === 1;
});
// source :
// --3---5---2--4---3---4---5--|->
// takeWhile:
// --3---5--|->
Parameters:
-
condition
Function: A condition function, which is called for each of the incoming values. While it returns true, the elements are emitted; after it returns false for the first time, the stream created by takeWhile closes.
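Modelled on a plain array of updates, the closing behaviour looks like the following. This is a sketch, not ProAct.js code; takeWhileModel is a hypothetical helper, and closing the stream is represented simply by stopping the loop.

```javascript
// Model of takeWhile: values are emitted until the condition fails once;
// nothing is emitted after that point, because the stream is closed.
function takeWhileModel(updates, condition) {
  var out = [];
  for (var i = 0; i < updates.length; i++) {
    if (!condition(updates[i])) {
      break;   // corresponds to the resulting stream closing
    }
    out.push(updates[i]);
  }
  return out;
}

var takenWhileOdd = takeWhileModel([3, 5, 2, 4, 3, 4, 5], function (v) {
  return v % 2 === 1;
});
// takenWhileOdd is [3, 5]
```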
throttle
-
delay
Creates a new ProAct.ThrottlingStream instance having as source this.
Parameters:
-
delay
Number: The time delay to be used for flushing the buffer of the new stream.
Returns:
A ProAct.ThrottlingStream instance.
toProArray
()
ProAct.Array
Creates and returns a ProAct.Array instance, which tracks the changes of this. Uses the current queue for queueing changes.
Returns:
A ProAct.Array
instance tracking the changes of this
.
toProperty
()
Creates a ProAct.Property instance,
dependent on this.
Comes from the proact-properties
module.
toStream
()
Turns this ProAct.Actor
to a ProAct.Stream.
In reality this method creates a new Stream
with source this.
transform
-
transformation
Adds a new transformation to the list of transformations of this actor.
A transformation is a function or an object that has a call method defined. This function or call method should take one argument and return a transformed version of it. If the returned value is ProAct.Actor.BadValue, the next transformations are skipped and the updating value/event becomes a bad value.
Every value/event that updates this actor will be transformed using the new transformation.
Parameters:
-
transformation
Object: The transformation to add.
Returns:
this
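How a chain of transformations short-circuits on a bad value can be sketched in plain JavaScript. This is an illustration, not ProAct.js code: BAD_VALUE is a hypothetical sentinel standing in for ProAct.Actor.BadValue, applyTransformations is a hypothetical helper, and an object transformation is assumed to receive the value as the only argument of its call method.

```javascript
// Hypothetical sentinel standing in for ProAct.Actor.BadValue.
var BAD_VALUE = {};

// Applies the transformations in order; stops as soon as one returns BAD_VALUE.
function applyTransformations(transformations, value) {
  var current = value;
  for (var i = 0; i < transformations.length; i++) {
    var t = transformations[i];
    // A transformation is a function or an object with a call method.
    current = (typeof t === 'function') ? t(current) : t.call(current);
    if (current === BAD_VALUE) {
      return BAD_VALUE;   // the remaining transformations are skipped
    }
  }
  return current;
}

var calls = [];
var chain = [
  function (v) { calls.push('double'); return v * 2; },
  function (v) { calls.push('limit'); return v > 10 ? BAD_VALUE : v; },
  { call: function (v) { calls.push('inc'); return v + 1; } }
];

var okResult = applyTransformations(chain, 3);    // 3 -> 6 -> 6 -> 7
var badResult = applyTransformations(chain, 8);   // 8 -> 16 -> BAD_VALUE
// For the second value the 'inc' transformation is never called.
```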
transformStored
-
transformation
-
type
Adds a new transformation to the list of transformations of this actor.
A transformation is a function or an object that has a call method defined. This function or call method should take one argument and return a transformed version of it. If the returned value is ProAct.Actor.BadValue, the next transformations are skipped and the updating value/event becomes a bad value.
Every value/event that updates this actor will be transformed using the new transformation.
The idea of this method is that it just calls transform, but it can be overridden from another module.
TODO Maybe transformStored is a bad name
Parameters:
Returns:
this