An experimental mash-up of RxJS and Express.
npm i rxxpress
RxXpress provides a Router class that behaves similarly to Express's router, except that instead of accepting callbacks it returns an Observable:
import { Router, respond } from 'rxxpress';

const router = new Router();
router.get('/').pipe(respond(() => 'Hello World!')).subscribe(); // --> listen on / route

export default router;

import * as express from 'express'; // @see [Express](http://expressjs.com/)
import router from './router';

const app = express(); // --> create your express app
app.use(router.core); // --> register the RxXpress router
app.listen(3000); // --> start listening on port 3000
RxXpress allows you to easily treat your end-points like streams. For example, you can rate-limit a particular end-point on a per-user basis (e.g. each user can invoke that end-point only once per second):
import { Router, respond } from 'rxxpress';
import { debounceTime, mergeMap, groupBy } from 'rxjs/operators';

import { authenticate } from './auth';

const router = new Router();
router.post('/rate-limited-endpoint')
.pipe(
  authenticate(), // --> authenticates the request, adding `req._.user_id`
  groupBy(({req}) => req._.user_id), // --> group incoming requests by user
  mergeMap(group => group.pipe(debounceTime(1000))), // --> debounce each group to allow one each second
  respond(() => 'Halo meine liebe!'), // --> respond
)
.subscribe();
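The per-user grouping can also be pictured without RxJS: every user gets an independent one-second window. The following is a dependency-free sketch of that bookkeeping only. `createPerUserLimiter` is a hypothetical helper, not an RxXpress or RxJS API, and it accepts the first request of a window, whereas `debounceTime` emits the most recent request once the group has been quiet for the given time.

```typescript
// Hypothetical helper (not part of RxXpress): decides, per user, whether a
// request arriving at time `now` (in milliseconds) is within the allowed rate.
function createPerUserLimiter(windowMs: number) {
  // Time of the last accepted request for each user.
  const lastAccepted = new Map<string, number>();

  return (userId: string, now: number): boolean => {
    const last = lastAccepted.get(userId);
    // Reject if this user was already served within the current window.
    if (last !== undefined && now - last < windowMs) return false;
    lastAccepted.set(userId, now);
    return true;
  };
}

const permit = createPerUserLimiter(1000);
const decisions = [
  permit('alice', 0),    // true: first request from alice
  permit('alice', 400),  // false: alice was served 400 ms ago
  permit('bob', 500),    // true: bob has his own window
  permit('alice', 1000), // true: alice's window has elapsed
];
console.log(decisions);
```

Keeping the state in a map keyed by user mirrors what `groupBy` does in the stream version: requests from one user never affect the window of another.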
Or you can create an end-point that only responds when both ALICE and BOB request it with their respective keys at the same time (max 5 seconds apart):
import { Router, timeout } from 'rxxpress';
import { zip } from 'rxjs'; // @see [RxJS zip](https://www.learnrxjs.io/learn-rxjs/operators/combination/zip)
import { filter, retry, tap } from 'rxjs/operators';

// ALICE_KEY and BOB_KEY are assumed to be defined elsewhere (e.g. loaded from configuration)

const router = new Router();
const endpoint = router.get('/endpoint').pipe(timeout(5000)); // --> let the endpoint remain waiting for max 5 seconds

zip( // --> pair corresponding requests
  endpoint.pipe(filter(({req}) => req.query.key === ALICE_KEY)), // --> ALICE requesting with her key
  endpoint.pipe(filter(({req}) => req.query.key === BOB_KEY)), // --> BOB requesting with his key
)
.pipe(
  tap(([alice, bob]) => { // --> respond when both have requested
    alice.res.send('You guys made it!');
    bob.res.send('You guys made it!');
  }),
  retry() // --> retry when it fails (for example, due to timeout)
)
.subscribe();
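To make the pairing rule concrete, here is a dependency-free sketch of the idea behind combining `zip` with a 5-second limit: a request waits until its counterpart arrives, and a counterpart that has waited too long no longer counts. `createPairer` and its return shape are hypothetical and only model arrival times. The real example relies on `timeout()` and `retry()` rather than on explicit queues, so treat this as an approximation of the behaviour, not as how RxXpress implements it.

```typescript
type Side = 'alice' | 'bob';

// Hypothetical helper: pairs arrivals from the two sides when they are at
// most `maxGapMs` milliseconds apart.
function createPairer(maxGapMs: number) {
  // Arrival times of requests still waiting for a counterpart.
  const waiting: Record<Side, number[]> = { alice: [], bob: [] };

  // Returns the arrival times [alice, bob] of a completed pair,
  // or null when the caller has to keep waiting.
  return (side: Side, now: number): [number, number] | null => {
    const other: Side = side === 'alice' ? 'bob' : 'alice';
    // Forget counterparts that have waited longer than the allowed gap.
    waiting[other] = waiting[other].filter(at => now - at <= maxGapMs);
    const partner = waiting[other].shift();
    if (partner === undefined) {
      waiting[side].push(now);
      return null;
    }
    return side === 'alice' ? [now, partner] : [partner, now];
  };
}

const arrive = createPairer(5000);
const results = [
  arrive('alice', 0),     // null: alice waits for bob
  arrive('bob', 3000),    // [0, 3000]: paired within 5 seconds
  arrive('bob', 10000),   // null: bob waits for alice
  arrive('alice', 16000), // null: bob's request expired, alice waits
  arrive('bob', 17000),   // [16000, 17000]: paired again
];
console.log(results);
```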
TO BE HONEST ...
I actually just made this to be able to do weird stuff. I have no proper idea of where it would be practically useful.
RxXpress is fully interoperable with Express.
You can seamlessly use RxXpress alongside existing Express code by passing
RxXpress routers to Express apps and routers.
You can also use the use() operator to integrate existing middlewares, Express routers and
other RxXpress routers into an RxXpress router:
import * as express from 'express'; // @see [Express](http://expressjs.com/)
import * as bodyParser from 'body-parser'; // @see [body-parser](https://github.com/expressjs/body-parser#readme)
import { Router, use, respond } from 'rxxpress';

const expressRouter = express.Router(); // --> a typical express router
expressRouter.get('/express', (req, res) => res.send('From an express router!'));

const subRouter = new Router(); // --> an RxXpress sub-router
subRouter.get('/sub').pipe(respond(() => 'From a sub-route')).subscribe();

const router = new Router();
router.use('/').pipe(
  use(bodyParser.json()), // --> hook in body-parser middleware (here its JSON parser)
  use(expressRouter), // --> hook in an express router
  use(subRouter), // --> hook in a sub-router
  use((req, res) => res.send('Ok no one responded to this')) // --> a fallback response
).subscribe();

const app = express(); // --> setup express, hook our main router, run it
app.use(router.core);
app.listen(3000);
RxXpress also provides other convenient operators that make building web servers more enjoyable:
import { verify } from 'jsonwebtoken'; // @see [Json Web Token](https://www.npmjs.com/package/jsonwebtoken)
import { Router, validate, allow, find, json } from 'rxxpress';

import { User } from './user.model';
import { SECRET } from './secrets';

const router = new Router();
router.get('/user-info/:user_id')
.pipe(
  validate(({req}) => { // --> validate that request has a token and the token is valid
    if (!req.query.token) return false;
    try {
      req._.payload = verify(req.query.token, SECRET);
      return true;
    } catch { return false; }
  }),
  allow(({req}) => req._.payload.user_id === req.params.user_id), // --> allow only if the user owning the token is the requested user
  find(async ({req}) => { // --> check if requested user exists
    try {
      req._.user = await User.findOne({ _id: req.params.user_id });
      return !!req._.user; // --> also covers a lookup that resolves to null instead of throwing
    } catch { return false; }
  }),
  json(({req}) => req._.user) // --> respond with the JSON object of the user
)
.subscribe();

export default router;
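The order of the operators matters: a request only reaches the final `json()` step if it has passed every check before it. The dependency-free sketch below illustrates that short-circuiting order with plain functions. Everything in it (`firstFailure`, the `Req` shape, the fake tokens and the `knownUsers` set) is hypothetical and stands in for the JWT verification and database lookup. It does not model how RxXpress itself responds to a rejected request.

```typescript
// Hypothetical request shape for illustration only.
type Req = { token?: string; userId: string };
type Check = { name: string; pass: (req: Req) => boolean };

// Runs the checks in order and reports the name of the first one that
// fails, or null if the request passes all of them.
function firstFailure(req: Req, checks: Check[]): string | null {
  for (const check of checks) {
    if (!check.pass(req)) return check.name;
  }
  return null;
}

const knownUsers = new Set(['u1']); // stand-in for a database lookup

const checks: Check[] = [
  // A token must be present (stand-in for JWT verification).
  { name: 'validate', pass: req => req.token !== undefined },
  // The token must belong to the requested user.
  { name: 'allow', pass: req => req.token === `token-of-${req.userId}` },
  // The requested user must exist.
  { name: 'find', pass: req => knownUsers.has(req.userId) },
];

const outcomes = [
  firstFailure({ userId: 'u1' }, checks),                       // 'validate'
  firstFailure({ token: 'token-of-u2', userId: 'u1' }, checks), // 'allow'
  firstFailure({ token: 'token-of-u9', userId: 'u9' }, checks), // 'find'
  firstFailure({ token: 'token-of-u1', userId: 'u1' }, checks), // null
];
console.log(outcomes);
```

Placing the cheap checks first means the lookup only runs for requests that are already known to be well-formed and authorized.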