join()
To run several operations in parallel, use join():
import { join, allow, find, json } from 'rxxpress';

// `router`, `authenticate`, `articles` and `payment` are assumed to be defined elsewhere.

const endpoint = router.get('/article/:id').pipe(authenticate()); // --> get the endpoint

// --> find the requested article
const findArticle = endpoint.pipe(
  find(async ({req}) => {
    req._.article = await articles.findById(req.params.id);
    return !!req._.article;
  })
);

// --> check if the user is subscribed or still has free articles
const checkUserAccess = endpoint.pipe(
  find(async ({req}) => {
    req._.payment = await payment.findOne({ userId: req._.userId });
    return !!req._.payment;
  }),
  allow(({req}) => req._.payment.subscribed || req._.payment.freeArticles > 0)
);

join(findArticle, checkUserAccess).pipe( // --> use `join()` to do two things in parallel ...
  json(({req}) => req._.article)         // --> respond with the article
)
.subscribe();
Or if you prefer to write everything in one go:
import { join, allow, find, json } from 'rxxpress';

// `router`, `authenticate`, `articles` and `payment` are assumed to be defined elsewhere.

router.get('/article/:id')
.pipe(
  authenticate(),                        // --> authenticate the user
  endpoint => join(                      // --> use `join()` to do two things in parallel ...
    // --> find the requested article
    endpoint.pipe(
      find(async ({req}) => {
        req._.article = await articles.findById(req.params.id);
        return !!req._.article;
      })
    ),
    // --> check if the user is subscribed or still has free articles
    endpoint.pipe(
      find(async ({req}) => {
        req._.payment = await payment.findOne({ userId: req._.userId });
        return !!req._.payment;
      }),
      allow(({req}) => req._.payment.subscribed || req._.payment.freeArticles > 0),
    )
  ),                                     // --> both things done
  json(({req}) => req._.article)         // --> respond with the article
)
.subscribe();
By default, join() ignores requests that have already been responded to. It also stops
tracking a packet at the moment it is responded to.
You can disable this behavior by passing the { safe: false } option:
join({ safe: false }, someTask$, someOtherTask$)
.pipe(
  // ...
).subscribe();
Be careful: when safe mode is turned off, join() keeps track of every incoming packet
until all of the tasks are done with it, which in some cases might lead to memory leaks.
join() starts tracking each incoming packet (which it receives when the first task finishes
with it) if the packet has not been responded to. It keeps tracking the packet until all
other tasks have also finished with it, and then passes the packet down the observable sequence.
Unless safe mode is turned off, join() ignores packets that have been responded to and does
not pass them down the observable sequence.
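To make the tracking behavior more concrete, here is a minimal sketch in plain TypeScript. It is not RxXpress source code: JoinTracker, Packet, taskFinished and trackedCount are hypothetical names invented for illustration, and the responded flag stands in for "the response has already been sent". As a simplification, the model drops a responded packet the next time a task finishes with it, not at the exact moment of the response.

```typescript
// Hypothetical model of the tracking described above; not the library's implementation.
interface Packet {
  id: number;
  responded: boolean; // stands in for "the response has already been sent"
}

class JoinTracker {
  // For each tracked packet, the indices of the tasks that have finished with it.
  private readonly pending = new Map<Packet, Set<number>>();

  constructor(
    private readonly taskCount: number,
    private readonly safe: boolean = true,
  ) {}

  // Call when task `taskIndex` finishes with `packet`.
  // Returns the packet once every task has finished with it, otherwise undefined.
  taskFinished(taskIndex: number, packet: Packet): Packet | undefined {
    if (this.safe && packet.responded) {
      // Safe mode: a responded packet is forgotten and never passed down.
      this.pending.delete(packet);
      return undefined;
    }

    let done = this.pending.get(packet);
    if (!done) {
      done = new Set<number>();
      this.pending.set(packet, done);
    }
    done.add(taskIndex);

    if (done.size < this.taskCount) {
      return undefined; // still waiting for the other tasks
    }
    this.pending.delete(packet);
    return packet; // all tasks are done: pass the packet down
  }

  // Number of packets currently held in memory.
  get trackedCount(): number {
    return this.pending.size;
  }
}

const tracker = new JoinTracker(2);
const packet: Packet = { id: 1, responded: false };
tracker.taskFinished(0, packet); // undefined: still waiting for task 1
tracker.taskFinished(1, packet); // returns packet: both tasks are done
```

In this model, a tracker created with safe set to false keeps an entry for a packet until every task has reported on it. If one task never does, the entry is never released, which is one way the memory leak mentioned above could arise.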