Star

Created With

banner

banner

An experimental mash-up of RxJS and Express.

1npm i rxxpress


RxXpress provides a Router class that behaves similar to Express's router, except that instead of accepting callbacks it returns an Observable:


router.ts
1import { Router, respond } from 'rxxpress';

2

3const router = new Router();

4router.get('/').pipe(respond(() => 'Hellow World!')).subscribe(); // --> listen on / route

5

6export default router;

index.ts
1import * as express from 'express'; // @see [Express](http://expressjs.com/)

2import router from './router';

3

4const app = express(); // --> create your express app

5app.use(router.core); // --> register the RxXpress router

6app.listen(3000); // --> start listening on port 3000

Try It!

linkUse Cases

RxXpress allows you to easily treat your end-points like streams. For example, you can conduct rate-limiting on a particular end-point on a per-user basis (e.g. each user can invoke that particular end-point only once per second):

1import { Router, respond } from 'rxxpress';

2import { debounceTime, mergeMap, groupBy } from 'rxjs/operators';

3

4import { authenticate } from './auth';

5

6const router = new Router();

7router.post('/rate-limited-endpoint')

8.pipe(

9 authenticate(), // --> authenticates the request, adding `req._.user_id`

10 groupBy(({req}) => req._.user_id), // --> group incoming request by user

11 mergeMap(group => group.pipe(debounceTime(1000))), // --> debounce each group to allow one each second

12 respond(() => 'Halo meine liebe!'), // --> respond

13)

14.subscribe();


Or you can create an end-point that only responds when both ALICE and BOB request it with their respective keys at the same time (max 5 seconds apart):

1import { Router, timeout } from 'rxxpress';

2import { zip } from 'rxjs'; // @see [RxJS zip](https://www.learnrxjs.io/learn-rxjs/operators/combination/zip)

3import { filter, retry, tap } from 'rxjs/operators';

4

5

6const router = new Router();

7const endpoint = router.get('/endpoint').pipe(timeout(5000)); // --> let the endpoint remain waiting for max 5 seconds

8

9zip( // --> pair corresponding requests

10 endpoint.pipe(filter(({req}) => req.query.key === ALICE_KEY)), // --> ALICE requesting with her key

11 endpoint.pipe(filter(({req}) => req.query.key === BOB_KEY)), // --> BOB requesting with his key

12)

13.pipe(

14 tap(([alice, bob]) => { // --> Respond when both have requested

15 alice.res.send('You guys made it!'); // --> Respond when both have requested

16 bob.res.send('You guys made it!'); // --> Respond when both have requested

17 }),

18 retry() // --> retry when it fails (for example, due to timeout)

19)

20.subscribe();

free_breakfast    TO BE HONEST ...

I actually just made this to be able to do weird stuff. I have no proper idea of where it would be practically useful.


linkInter-Operability

RxXpress is fully inter-operable with Express. You can seamlessly use RxXpress alongside existing Express code, by passing RxXpress routers to Express apps and routers. You can also use the use() operator to integrate existing middlewares, Express routers and other RxXpress routers into an RxXpress router:

1import * as express from 'express'; // @see [Express](http://expressjs.com/)

2import * as bodyParser from 'body-parser'; // @see [body-parser](https://github.com/expressjs/body-parser#readme)

3import { Router, use, respond } from 'rxxpress';

4

5const expressRouter = express.Router(); // --> a typical express router

6expressRouter.get('/express', (req, res) => res.send('From an express router!')); // --> a typical express router

7

8const subRouter = new Router(); // --> an RxXpress sub-router

9router.get('/sub').pipe(respond(() => 'From a sub-route')).subscribe(); // --> an RxXpress sub-router

10

11const router = new Router();

12router.use('/').pipe(

13 use(bodyParser), // --> hook in body-parser middleware

14 use(expressRouter), // --> hook in an express router

15 use(subRouter), // --> hook in a sub-router

16 use((req, res) => res.send('Ok no one respondend to this')) // --> a fallback response

17).subscribe();

18

19const app = express(); // --> setup express, hook our main router, run it

20app.use(router.core); // --> setup express, hook our main router, run it

21app.listen(3000); // --> setup express, hook our main router, run it


linkConvenience Operators

RxXpress also provides some other convenient operators, making building web-servers much more enjoyable:

1import { verify } from 'jsonwebtoken'; // @see [Json Web Token](https://www.npmjs.com/package/jsonwebtoken)

2import { Router, validate, allow, find, json } from 'rxxpress';

3

4import { User } from './user.model';

5import { SECRET } from './secrets';

6

7

8const router = new Router();

9router.get('/user-info/:user_id')

10.pipe(

11 validate(({req}) => { // --> validate that request has a token and the token is valid

12 if (!req.query.token) return false; // --> validate that request has a token and the token is valid

13 try { // --> validate that request has a token and the token is valid

14 req._.payload = verify(req.query.token, SECRET); // --> validate that request has a token and the token is valid

15 return true; // --> validate that request has a token and the token is valid

16 } catch { return false; } // --> validate that request has a token and the token is valid

17 }), // --> validate that request has a token and the token is valid

18 allow(({req}) => req._.payload.user_id === req.params.user_id), // --> allow only if the user owning the token is the requested user

19 find(async ({req}) => { // --> check if requested user exists

20 try { // --> check if requested user exists

21 req._.user = await User.findOne({ _id: req.params.user_id }); // --> check if requested user exists

22 return true; // --> check if requested user exists

23 } catch { return false; } // --> check if requested user exists

24 }), // --> check if requested user exists

25 json(({req}) => req._.user) // --> respond with the JSON object of the user

26)

27.subscribe();

28

29export default router;

Use CasesInter-OperabilityConvenience Operators

Home Router

Operatorschevron_right

Error Handling