Create an arbitrary line using the line() function:
import { line } from 'rxline';

line([1, 2, 3, 4]); // --> a line from an array

async function whatever() {
  await stuff();
  return someArray;
}

line(whatever()); // --> a line from a promised array

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

line(interval(1000).pipe(take(23))); // --> a line from an `Observable`
A line is essentially some content plus a transform to be applied to that content. Applying the transform to the line's content is called processing the line.
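To make the split between content and transform concrete, here is a minimal plain-JavaScript sketch of the idea. It is a hypothetical model, not rxline's implementation: makeLine and processAll are names invented for illustration, and the content is a plain array rather than an Observable.

```javascript
// Hypothetical model: a "line" is content plus a transform that has not run yet.
function makeLine(content, transform = x => x) {
  return { content, transform };
}

// "Processing" applies the transform to each item of the content.
function processAll(l) {
  return l.content.map(x => l.transform(x));
}

const doubled = makeLine([1, 2, 3, 4], x => x * 2);

console.log(doubled.content);     // [ 1, 2, 3, 4 ]  (content is untouched)
console.log(processAll(doubled)); // [ 2, 4, 6, 8 ]  (transform applied on processing)
```

The point of the sketch is that attaching a transform does nothing by itself; the work happens only when the line is processed.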
You can access the content of a line via its .content$ property, which returns an Observable:
line([1, 2, 3, 4]).content$.subscribe(console.log);

// Result:
// 1, 2, 3, 4
Use the .pipe() method to add a transform to a line:
line([1, 2, 3, 4]).pipe(x => x * 2);
This yields a new line that has the same content but a different transform.
Incoming transforms are combined with previous ones, so you can chain .pipe() multiple times:
line([1, 2, 3, 4])   // --> transform is x => x
  .pipe(x => x * 2)  // --> transform is x => x * 2
  .pipe(x => x + 3); // --> transform is x => (x * 2) + 3
Or provide multiple arguments (which is equivalent to chaining):
line([1, 2, 3, 4])   // --> transform is x => x
  .pipe(x => x * 2,  // --> transform is x => x * 2
        x => x + 3); // --> transform is x => (x * 2) + 3
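The way successive transforms combine can be sketched with ordinary function composition. This is an illustration in plain JavaScript, not rxline's code; compose is a hypothetical helper introduced only for this example.

```javascript
// Hypothetical helper: combine transforms left to right, like chained .pipe() calls.
function compose(...fns) {
  return x => fns.reduce((value, fn) => fn(value), x);
}

const double = x => x * 2;
const addThree = x => x + 3;

// Adding the transforms one at a time...
const chained = compose(compose(double), addThree);
// ...gives the same function as passing both at once.
const together = compose(double, addThree);

console.log([1, 2, 3, 4].map(x => chained(x)));  // [ 5, 7, 9, 11 ]
console.log([1, 2, 3, 4].map(x => together(x))); // [ 5, 7, 9, 11 ]
```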
You can also use the .pipe() method to add asynchronous transforms:
line([1, 2, 3, 4])
  .pipe(async x => {
    await something();
    return x * 3;
  });
Or functions that return an Observable:
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
import { line } from 'rxline';

line([1, 2, 3, 4]).pipe(x => of(x).pipe(delay(100)));
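One way to picture how synchronous and asynchronous transforms can be mixed is to await every step. The sketch below is a plain-JavaScript illustration under that assumption, not rxline's implementation; composeAsync and wait are hypothetical helpers, and Observables are left out to keep the example dependency-free.

```javascript
// Hypothetical helper: combine transforms that return a plain value or a Promise.
function composeAsync(...fns) {
  return async x => {
    let value = x;
    for (const fn of fns) {
      value = await fn(value); // await passes plain values through unchanged
    }
    return value;
  };
}

// Resolve after `ms` milliseconds.
const wait = ms => new Promise(resolve => setTimeout(resolve, ms));

const transform = composeAsync(
  x => x * 2,   // synchronous step
  async x => {  // asynchronous step
    await wait(10);
    return x + 3;
  }
);

const results = Promise.all([1, 2, 3, 4].map(x => transform(x)));
results.then(r => console.log(r)); // [ 5, 7, 9, 11 ]
```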
You can use the .collect() method to apply the final transform and collect the results in an array (more on .collect() later):
line([1, 2, 3, 4])
  .pipe(x => x * 2)
  .pipe(x => x + 3)
  .collect(console.log);

// Result:
// [5, 7, 9, 11]
Use the .pick() method to keep only the content that matches a predicate:
line([1, 2, 3, 4])
  .pick(x => x % 2 == 1) // --> only pick odd numbers
  .pipe(x => x * 2)
  .collect(console.log);

// Result:
// [2, 6]
Use the .drop() method to discard the content that matches a predicate:
line([1, 2, 3, 4])
  .drop(x => x % 2 == 1) // --> drop odd numbers
  .pipe(x => x * 2)
  .collect(console.log);

// Result:
// [4, 8]
Similar to .pipe(), you can provide async functions to .pick() or .drop():
line(...).pick(async x => {
  await stuff();
  return x % 2 == 1;
});
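The relationship between the two filters can be sketched on a plain array: dropping with a predicate behaves like picking with its negation. The pick and drop functions below are hypothetical stand-ins written for this illustration, not rxline's methods.

```javascript
// Hypothetical helpers modelling the filtering shown above on a plain array.
const pick = (items, predicate) => items.filter(x => predicate(x));
const drop = (items, predicate) => items.filter(x => !predicate(x));

const isOdd = x => x % 2 === 1;
const double = x => x * 2;

console.log(pick([1, 2, 3, 4], isOdd).map(x => double(x))); // [ 2, 6 ]
console.log(drop([1, 2, 3, 4], isOdd).map(x => double(x))); // [ 4, 8 ]
```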
Use the .peek() method to inspect values at a given point in the transform without changing them:
line([1, 2, 3, 4])
  .peek(console.log)
  .pipe(x => x * 2)
  .peek(console.log)
  .collect(console.log);

// Result:
// 1, 2, 2, 4, 3, 6, 4, 8
// [2, 4, 6, 8]
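The interleaved output above follows from each item passing through every step before the next item starts. Here is a plain-JavaScript sketch of that ordering; peek is a hypothetical helper defined for this example, and the sketch models the behavior rather than reproducing rxline's code.

```javascript
// Hypothetical helper: run a side effect, then return the value unchanged.
const peek = effect => x => {
  effect(x);
  return x;
};

const seen = [];
const steps = [
  peek(x => seen.push(x)), // observe the original value
  x => x * 2,              // transform it
  peek(x => seen.push(x)), // observe the transformed value
];

// Each item travels through all steps before the next item starts.
const results = [1, 2, 3, 4].map(x => steps.reduce((value, step) => step(value), x));

console.log(seen);    // [ 1, 2, 2, 4, 3, 6, 4, 8 ]
console.log(results); // [ 2, 4, 6, 8 ]
```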
.collect()
The line's transform is applied to its content, and the results, gathered in an array, are passed to the given callback:
line([1, 2, 3, 4])
  .pipe(x => x * 10)
  .drop(x => x > 35)
  .collect(r => console.log(r.length));

// Result:
// 3
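In the example above, the filter comes after the transform, so its predicate sees the transformed values (10, 20, 30, 40) rather than the original ones. A plain-array sketch of why the position matters, using only built-in array methods as a stand-in for the line's steps:

```javascript
const content = [1, 2, 3, 4];
const tooBig = x => x > 35;

// Filter after the transform: the predicate sees 10, 20, 30, 40.
const filteredAfter = content.map(x => x * 10).filter(x => !tooBig(x));
// Filter before the transform: the predicate sees 1, 2, 3, 4.
const filteredBefore = content.filter(x => !tooBig(x)).map(x => x * 10);

console.log(filteredAfter.length);  // 3
console.log(filteredBefore.length); // 4
```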
The method with which the transform is applied to the line's content is called a processing strategy.
RxLine comes with two simple processing strategies: sequentially (one by one) and concurrently (all at once, in parallel). By default, .collect() uses sequentially, but you can pass it the processing strategy you need:
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
import { line, concurrently, sequentially } from 'rxline';

line([1, 2, 3, 4])
  .pipe(x => of(x * 3).pipe(delay(100 - (10 * x))))
  .collect(sequentially, console.log);

// Result:
// [3, 6, 9, 12]

line([1, 2, 3, 4])
  .pipe(x => of(x * 3).pipe(delay(100 - (10 * x))))
  .collect(concurrently, console.log);

// Result:
// [12, 9, 6, 3]
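The difference in result order comes from when each item's work starts. The following sketch reproduces both orderings with Promises and timers in plain JavaScript; runSequentially, runConcurrently and later are hypothetical names, and the code is a model of the two strategies as described above, not rxline's implementation.

```javascript
// Resolve with `value` after `ms` milliseconds.
const later = (value, ms) => new Promise(resolve => setTimeout(() => resolve(value), ms));

// Same shape as the transform above: larger inputs finish sooner.
const transform = x => later(x * 3, 100 - 10 * x);

// One by one: start the next item only after the previous one has finished.
async function runSequentially(items, fn) {
  const results = [];
  for (const item of items) results.push(await fn(item));
  return results;
}

// All at once: start every item immediately and record results as they finish.
async function runConcurrently(items, fn) {
  const results = [];
  await Promise.all(items.map(item => fn(item).then(r => results.push(r))));
  return results;
}

const demo = (async () => {
  const seq = await runSequentially([1, 2, 3, 4], transform);
  const con = await runConcurrently([1, 2, 3, 4], transform);
  console.log(seq); // [ 3, 6, 9, 12 ]
  console.log(con); // [ 12, 9, 6, 3 ]
  return { seq, con };
})();
```

In the sequential run the order of the results follows the order of the content; in the concurrent run it follows completion time, which is why the item with the shortest delay comes first.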
.process()
Similar to .collect(), but instead of collecting the results in an array, it returns a new line whose content is the processed content of the original line:
const l1 = line([1, 2, 3, 4]).pipe(x => x * 2);
l1.content$.subscribe(console.log);
// 1, 2, 3, 4

const l2 = l1.process();
l2.content$.subscribe(console.log);
// 2, 4, 6, 8
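The same idea can be modelled on plain arrays. In the sketch below, makeLine and processLine are hypothetical names, and the new line's transform is assumed to start over as the identity; that detail is an assumption of this model and is not stated above.

```javascript
// Hypothetical model on plain arrays: a line is { content, transform }.
const makeLine = (content, transform = x => x) => ({ content, transform });

// Model of processing: the results become the content of a new line.
// Assumption: the new line's transform starts over as the identity.
const processLine = l => makeLine(l.content.map(x => l.transform(x)));

const l1 = makeLine([1, 2, 3, 4], x => x * 2);
const l2 = processLine(l1);

console.log(l1.content); // [ 1, 2, 3, 4 ]
console.log(l2.content); // [ 2, 4, 6, 8 ]
```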
Similar to .collect(), you can provide a processing strategy to .process(). By default it uses sequentially.
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
import { line, concurrently } from 'rxline';

line([1, 2, 3, 4])
  .pipe(x => of(x * 3).pipe(delay(100 - (10 * x))))
  .peek(console.log)
  .process(concurrently);

// Result:
// 12, 9, 6, 3