TLDR, A callback is a function we pass to another, source function, in order to give the source function a way to talk back to us (call us back). Callbag is a standard for callbacks that enables working with streams. A callbag is any callback that follows that standard.
Take this code:
1linkconsole.log(source());
When we run this code:
source()
is called. We wait for its output.source()
.What if source()
takes some time to produce data? Instead of waiting for it,
we could tell it to "give us data" (by calling it), and give it a way to call us back
when it has some data:
1linksource(data => console.log(data));
Here, data => console.log(data)
is a callback, as its the method we provide
source()
to call us back when it has data. source
is basically a source of data, and we can communicate
with it as follows:
1link=> [GREETING] "Give me data when you have it" # --> us to source
2link<= [DATA] "Here is some data" # --> source to us
Now what if our source (e.g. source()
)
produces an indeterminate number of data entries? For example, our source might be
a function responsible for calculating the position of the cursor on X-axis, or it might be a function who is supposed
to give us messages coming from a web-socket.
👉 A source that produces an indeterminate number of data entries at indeterminate time intervals is called a stream.
In this case our simplistic callback (or the communication scheme) will be rather limiting:
None of these are available under our previous communication scheme, and we need an expanded communication scheme to be able to work properly with streams:
1link=> [GREETING] "Give me data whenever you have some" # --> us to source
2link<= [GREETING] "I will give you data whenever I have some. Tell me when to stop" # --> source to us
3link
4link<= [DATA] "Here is some data" # --> source to us
5link=> [DATA] "Give me more data" # --> us to source, when it needs to be pulled
6link
7link=> [END] "Stop sending more data" # --> us to source
8link<= [END] "I won't be sending more data (because of X)" # --> source to us
To accomodate it we can have our callback accept two arguments instead of one: The first argument denoting the type of the message, the second one denoting the content (or payload):
1linksource((type, payload) => {
2link if (type === GREET) console.log('Listening ...');
3link if (type === DATA) console.log(payload);
4link if (type === END) console.log('Source ended!');
5link});
Callbag is just a term to denote any callback (or function) that looks like
the callback we just designed for talking with source()
. In other words, any function with the following
signature is a callbag:
1link(type: GREET | DATA | END, payload?: any) => void;
👉 In the callbag spec, message types are denoted by numbers:
0
stands forGREET
(also calledSTART
)1
stands forDATA
2
stands forEND
.
Now lets look at the above example again:
1linksource((type, payload) => {
2link if (type === GREET) console.log('Listening ...');
3link if (type === DATA) console.log(payload);
4link if (type === END) console.log('Source ended!');
5link});
Here, source
is NOT a callbag, since it only accepts one argument. We can fix that by making source
accept two
arguments as well, in which case our code would change like this:
1linksource(GREET, (type, payload) => {
2link if (type === GREET) console.log('Listeing ...');
3link if (type === DATA) console.log(payload);
4link if (type === END) console.log('Source ended!');
5link});
Now what if we want to receive a limited number of data entries (say 5) from source
?
We greeted source
by calling it with GREET
alongside a callbag. According to our communication scheme,
source
needs to also greet us by sending us GREET
alongside a way to tell it to stop, i.e. another callbag:
1linklet talkback;
2linklet N = 0;
3link
4linksource(GREET, (type, payload) => {
5link if (type === GREET) {
6link talkback = payload; // --> when type is GREET, payload is a callbag
7link console.log('Listening ...');
8link }
9link
10link if (type === DATA) {
11link console.log(payload); // --> when type is DATA, payload is the data sent by source
12link N++;
13link if (N >= 5) talkback(END); // --> telling the source to stop
14link }
15link
16link if (type === END) console.log('Source ended!');
17link});
👉 So whenever someone greets someone (us greeting the source, the source greeting us), the payload should be another callbag, acting as a way to talk back to the greeter. In this example,
talkback
plays that role.
So far we've just worked with source()
as a stream, without looking inside it. But how would a callbag source
actually look like? To see that, lets build a simple callbag source that outputs an increasing number every second:
1linkconst source = (type, payload) => {
2link if (type === GREET) { // --> everything starts with a GREET
3link let talkback = payload; // --> when greeted, the payload is a way to talk back to the greeter
4link let i = 0;
5link
6link setInterval(() => talkback(DATA, i++), 1000); // --> lets tell the greeter about our increasing number every second
7link }
8link}
☝️ Here, we are not giving the caller any method of telling the source to stop sending more data. This is because we are not following the communication protocol properly: the source MUST greet back and provide a way of talking back (i.e. another callbg):
1linkconst source = (type, payload) => {
2link if (type === GREET) {
3link let talkback = payload;
4link let i = 0;
5link
6link const interval = setInterval(() => talkback(DATA, i++), 1000);
7link
8link talkback(GREET, (_type, _payload) => {
9link if (_type === END) clearInterval(interval);
10link });
11link }
12link}
In practice, you rarely need to greet sources or handle talkbacks manually. Utilities such as those provided in callbag-common take care of that for you:
1linkimport { interval, pipe, map, filter, subscribe } from 'callbag-common'
2link
3linkconst source = interval(1000) // --> emits every second
4linkpipe(
5link source,
6link map(x => x * 3), // --> multiply by 3
7link filter(x => x % 2), // --> only allow odd numbers
8link subscribe(console.log) // --> log any incoming number
9link)
10link
3
9
15
21
27
The workflow is typically like this:
👉 You create some callbag sources, using source factories:
1linkimport { interval } from 'callbag-common';
2link
3linkconst source = interval(1000);
👉 You then transform these sources using operators.
For example,
you might want to multiply each received number by 3
:
1linkimport { interval, map } from 'callbag-common';
2link
3linklet source = interval(1000);
4linksource = map(n => n * 3)(source);
Or you might want to only pick odd numbers:
1linkimport { interval, map, filter } from 'callbag-common';
2link
3linklet source = interval(1000);
4linksource = map(n => n * 3)(source);
5linksource = filter(n => n % 2)(source);
👉 Finally, you start listening to your transformed source by subscribing to it:
1linkimport { interval, map, filter, subscribe } from 'callbag-common';
2link
3linklet source = interval(1000);
4linksource = map(n => n * 3)(source);
5linksource = filter(n => n % 2)(source);
6link
7linksubscribe(console.log)(source);
👉 It is also highly recommended to use the
pipe()
utility for transforming your sources and subscribing to them, as it makes the code much easier to read:
1linkimport { interval, map, filter, subscribe, pipe } from 'callbag-common';2link3linkpipe(4link interval(1000),5link map(n => n * 3),6link filter(n => n % 2),7link subscribe(console.log)8link)