join()

To run several operations in parallel, use join():

import { join, allow, find, json } from 'rxxpress';

// `router`, `authenticate()`, `articles` and `payment` are assumed to be defined elsewhere.
const endpoint = router.get('/article/:id').pipe(authenticate()); // --> get the endpoint

// --> find the requested article
const findArticle = endpoint.pipe(
  find(async ({req}) => {
    req._.article = await articles.findById(req.params.id);
    return !!req._.article;
  })
);

// --> check if the user is subscribed or still has free articles
const checkUserAccess = endpoint.pipe(
  find(async ({req}) => {
    req._.payment = await payment.findOne({ userId: req._.userId });
    return !!req._.payment;
  }),
  allow(({req}) => req._.payment.subscribed || req._.payment.freeArticles > 0)
);

join(findArticle, checkUserAccess).pipe( // --> use `join()` to do two things in parallel ...
  json(({req}) => req._.article)         // --> respond with the article
)
.subscribe();

Or if you prefer to write everything in one go:

import { join, allow, find, json } from 'rxxpress';

router.get('/article/:id')
.pipe(
  authenticate(),                        // --> authenticate the user
  endpoint => join(                      // --> use `join()` to do two things in parallel ...
    // --> find the requested article
    endpoint.pipe(
      find(async ({req}) => {
        req._.article = await articles.findById(req.params.id);
        return !!req._.article;
      })
    ),
    // --> check if the user is subscribed or still has free articles
    endpoint.pipe(
      find(async ({req}) => {
        req._.payment = await payment.findOne({ userId: req._.userId });
        return !!req._.payment;
      }),
      allow(({req}) => req._.payment.subscribed || req._.payment.freeArticles > 0),
    )
  ),                                     // --> both things done
  json(({req}) => req._.article)         // --> respond with the article
)
.subscribe();


Safety

By default, join() ignores requests that have already been responded to. It also stops tracking packets the moment they are responded to.

You can disable this behavior by passing the { safe: false } option:

join({ safe: false }, someTask$, someOtherTask$)
.pipe(
  // ...
).subscribe();

Be aware that when safe mode is turned off, join() keeps track of every incoming packet until all of the tasks are done with it, which in some cases might lead to memory leaks.


Packet Flow

join() starts tracking an incoming packet (which it first receives when the first task finishes with it) only if the packet has not been responded to. It keeps tracking the packet until all other tasks have also finished with it, and then passes the packet down the observable sequence.

Unless safe mode is turned off, join() will ignore packets that are responded to, not passing them down the observable sequence.
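
The bookkeeping described above can be pictured with a small stand-alone model. This is only a conceptual sketch and not how rxxpress implements join() internally: `Packet`, `JoinTracker`, `receive()`, `markResponded()` and `pending` are hypothetical names made up for illustration, and a plain `responded` flag stands in for "the request has been responded to".

```typescript
// Conceptual model only: NOT rxxpress's actual implementation.
// All names below are hypothetical and exist only for this illustration.

interface Packet {
  id: number;
  responded: boolean; // true once a response has been sent for this request
}

class JoinTracker {
  // For each tracked packet, the indices of the tasks that have finished with it.
  private finished = new Map<Packet, Set<number>>();
  private taskCount: number;
  private safe: boolean;

  constructor(taskCount: number, safe: boolean = true) {
    this.taskCount = taskCount;
    this.safe = safe;
  }

  // Called when task number `task` emits `packet`.
  // Returns the packet if it should be passed downstream, otherwise undefined.
  receive(task: number, packet: Packet): Packet | undefined {
    // Safe mode: packets that already have a response are ignored.
    if (this.safe && packet.responded) {
      this.finished.delete(packet);
      return undefined;
    }

    const done = this.finished.get(packet) ?? new Set<number>();
    done.add(task);
    this.finished.set(packet, done);

    // Still waiting for at least one other task.
    if (done.size < this.taskCount) return undefined;

    // Every task has finished: stop tracking and release the packet.
    this.finished.delete(packet);
    return packet;
  }

  // Called at the moment a response is sent for `packet`.
  markResponded(packet: Packet): void {
    packet.responded = true;
    // Safe mode stops tracking right away; unsafe mode keeps the entry.
    if (this.safe) this.finished.delete(packet);
  }

  // Number of packets currently held in memory.
  get pending(): number {
    return this.finished.size;
  }
}

// Safe mode (the default): the packet is released once both tasks are done.
const safeJoin = new JoinTracker(2);
const a: Packet = { id: 1, responded: false };
const firstA = safeJoin.receive(0, a);  // first task done, second still running
const secondA = safeJoin.receive(1, a); // second task done, packet is released

// A response is sent before the second task ever emits the packet.
const b: Packet = { id: 2, responded: false };
safeJoin.receive(0, b);
safeJoin.markResponded(b);
const safePending = safeJoin.pending;   // nothing left to track

// Same situation with safe mode off: the packet stays tracked.
const unsafeJoin = new JoinTracker(2, false);
const c: Packet = { id: 3, responded: false };
unsafeJoin.receive(0, c);
unsafeJoin.markResponded(c);
const unsafePending = unsafeJoin.pending;
```

In this model, `safePending` ends up at 0 while `unsafePending` stays at 1. The entry for `c` remains until every task emits that packet, which may never happen, and that is the kind of situation the memory-leak warning in the Safety section refers to.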
