Create a line from an array, a promise, or an Observable using the line() function:
import { line } from 'rxline';

line([1, 2, 3, 4]);                  // --> a line from an array
async function whatever() {
  await stuff();
  return someArray;
}

line(whatever());                    // --> a line from a promised array
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

line(interval(1000).pipe(take(23))); // --> a line from an `Observable`
A line is basically some content plus a transform that is to be applied to that content. Applying the transform to the line's content is called processing the line.
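The idea of "content plus a transform" can be sketched with plain arrays and functions. This is only an illustrative model, not RxLine's implementation; `makeLine` and `processLine` are hypothetical names introduced here.

```javascript
// Hypothetical model: a "line" is just some content plus a transform.
// The default transform is the identity function.
function makeLine(content, transform = (x) => x) {
  return { content, transform };
}

// "Processing" the line means applying the transform to each item of the content.
function processLine(l) {
  return l.content.map((x) => l.transform(x));
}

const doubled = makeLine([1, 2, 3, 4], (x) => x * 2);

console.log(doubled.content);      // [1, 2, 3, 4] (the content itself is untouched)
console.log(processLine(doubled)); // [2, 4, 6, 8]
```

In this model, nothing is computed until the line is processed, which mirrors the distinction the text draws between a line's content and its transform.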
You can access the content of a line via its .content$ property, which returns an Observable:
line([1, 2, 3, 4]).content$.subscribe(console.log);

// Result:
// 1, 2, 3, 4
Use the .pipe() method to add a transform to a line:
line([1, 2, 3, 4]).pipe(x => x * 2);
This yields a new line that has the same content but a different transform.
Incoming transforms are combined with previous ones, so you can chain .pipe() multiple times:
line([1, 2, 3, 4])    // --> transform is x => x
  .pipe(x => x * 2)   // --> transform is x => x * 2
  .pipe(x => x + 3);  // --> transform is x => (x * 2) + 3
Or provide multiple arguments (which is equivalent to chaining):
line([1, 2, 3, 4])    // --> transform is x => x
  .pipe(x => x * 2,   // --> transform is x => x * 2
        x => x + 3);  // --> transform is x => (x * 2) + 3
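Combining transforms in this way amounts to left-to-right function composition. The following is a minimal sketch of that idea in plain JavaScript; `combine` is a hypothetical helper, not part of RxLine.

```javascript
// Hypothetical helper: combine transforms so they run left to right.
function combine(...transforms) {
  return (x) => transforms.reduce((value, fn) => fn(value), x);
}

// Like chaining two .pipe() calls: the second transform wraps the first.
const chained = combine(combine((x) => x * 2), (x) => x + 3);

// Like a single .pipe() call with two arguments.
const multi = combine((x) => x * 2, (x) => x + 3);

console.log([1, 2, 3, 4].map((x) => chained(x))); // [5, 7, 9, 11]
console.log([1, 2, 3, 4].map((x) => multi(x)));   // [5, 7, 9, 11]
```

Both forms produce the same combined function, which is why chaining and passing multiple arguments are interchangeable.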
You can also use the .pipe() method to add asynchronous transforms:
line([1, 2, 3, 4])
  .pipe(async x => {
    await something();
    return x * 3;
  });
Or functions that return an Observable:
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
import { line } from 'rxline';

line([1, 2, 3, 4]).pipe(x => of(x).pipe(delay(100)));
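To see how synchronous and asynchronous transforms can share one chain, here is a rough sketch using only promises. It does not cover Observable-returning functions and is not how RxLine works internally; `combineAsync` and `wait` are hypothetical helpers.

```javascript
// Hypothetical helper: combine transforms that return plain values or promises.
function combineAsync(...transforms) {
  return async (x) => {
    let value = x;
    for (const fn of transforms) {
      value = await fn(value); // `await` passes plain values through unchanged
    }
    return value;
  };
}

// Hypothetical helper: a promise that resolves after `ms` milliseconds.
const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const transform = combineAsync(
  (x) => x * 2,                                  // synchronous step
  async (x) => { await wait(10); return x + 3; } // asynchronous step
);

const results = Promise.all([1, 2, 3, 4].map((x) => transform(x)));
results.then(console.log); // [5, 7, 9, 11]
```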
You can use the .collect() method to apply the final transform and collect the results as an array
(more on .collect()'s details later):
line([1, 2, 3, 4])
  .pipe(x => x * 2)
  .pipe(x => x + 3)
  .collect(console.log);

// Result:
// [5, 7, 9, 11]
Use the .pick() method to keep only the items that satisfy a condition:
line([1, 2, 3, 4])
  .pick(x => x % 2 == 1)   // --> only pick odd numbers
  .pipe(x => x * 2)
  .collect(console.log);

// Result:
// [2, 6]
Use the .drop() method to discard the items that satisfy a condition:
line([1, 2, 3, 4])
  .drop(x => x % 2 == 1)   // --> drop odd numbers
  .pipe(x => x * 2)
  .collect(console.log);

// Result:
// [4, 8]
Similar to .pipe(), you can provide async functions to .pick() or .drop():
line(...).pick(async x => {
  await stuff();
  return x % 2 == 1;
});
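On plain arrays, .pick() and .drop() behave like a filter and its negation. The sketch below models only the synchronous case; `pick` and `drop` here are hypothetical stand-alone functions, not the RxLine methods.

```javascript
// Hypothetical helpers modelling .pick() and .drop() on a plain array.
const pick = (items, predicate) => items.filter((x) => predicate(x));  // keep matches
const drop = (items, predicate) => items.filter((x) => !predicate(x)); // discard matches

const isOdd = (x) => x % 2 === 1;

console.log(pick([1, 2, 3, 4], isOdd).map((x) => x * 2)); // [2, 6]
console.log(drop([1, 2, 3, 4], isOdd).map((x) => x * 2)); // [4, 8]
```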
Use the .peek() method to observe items as they pass through, without changing them:
line([1, 2, 3, 4])
  .peek(console.log)
  .pipe(x => x * 2)
  .peek(console.log)
  .collect(console.log);

// Result:
// 1, 2, 2, 4, 3, 6, 4, 8
// [2, 4, 6, 8]
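The interleaved output above follows from each item running through the whole chain before the next item starts. A minimal model of that behavior, assuming one-by-one processing; `peek` here is a hypothetical wrapper, not the RxLine method:

```javascript
// Hypothetical helper: wrap a side effect so it can sit in a chain of
// transforms and pass each value through unchanged.
const peek = (effect) => (x) => { effect(x); return x; };

const seen = [];
const steps = [
  peek((x) => seen.push(x)), // record the value before doubling
  (x) => x * 2,
  peek((x) => seen.push(x)), // record the value after doubling
];

// Each item goes through every step before the next item starts.
const result = [1, 2, 3, 4].map((x) => steps.reduce((value, fn) => fn(value), x));

console.log(seen);   // [1, 2, 2, 4, 3, 6, 4, 8]
console.log(result); // [2, 4, 6, 8]
```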
.collect()
The line's transform will be applied to its content and the result, gathered in an array, passed to the given callback:
line([1, 2, 3, 4])
  .pipe(x => x * 10)
  .drop(x => x > 35)
  .collect(r => console.log(r.length));

// Result:
// 3
The way the transform is applied to the line's content is called a processing strategy.
RxLine comes with two simple processing strategies: sequentially (one by one) and concurrently
(all at once, in parallel). By default, .collect() uses sequentially, but you can pass it the processing strategy you need:
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
import { line, concurrently, sequentially } from 'rxline';

line([1, 2, 3, 4])
  .pipe(x => of(x * 3).pipe(delay(100 - (10 * x))))
  .collect(sequentially, console.log);

// Result:
// [3, 6, 9, 12]

line([1, 2, 3, 4])
  .pipe(x => of(x * 3).pipe(delay(100 - (10 * x))))
  .collect(concurrently, console.log);

// Result:
// [12, 9, 6, 3]
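The difference in result order can be reproduced with plain promises. This is a sketch of the two ideas only, not RxLine's strategies; `runSequentially`, `runConcurrently`, and `wait` are hypothetical names.

```javascript
// Hypothetical helper: a promise that resolves after `ms` milliseconds.
const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Same shape as the example above: larger inputs finish sooner.
const transform = async (x) => { await wait(100 - 10 * x); return x * 3; };

// Hypothetical strategy: finish each item before starting the next.
async function runSequentially(items, fn) {
  const results = [];
  for (const item of items) results.push(await fn(item));
  return results;
}

// Hypothetical strategy: start every item at once and record
// results in the order in which they complete.
async function runConcurrently(items, fn) {
  const results = [];
  await Promise.all(items.map(async (item) => { results.push(await fn(item)); }));
  return results;
}

runSequentially([1, 2, 3, 4], transform).then(console.log); // [3, 6, 9, 12]
runConcurrently([1, 2, 3, 4], transform).then(console.log); // [12, 9, 6, 3]
```

In the sequential case the input order is preserved regardless of delays; in the concurrent case the item with the shortest delay (4) completes first, so the results arrive in completion order.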
.process()
Similar to .collect(), but instead of collecting the results in an array,
it returns a new line whose content is the processed content of the original line:
const l1 = line([1, 2, 3, 4]).pipe(x => x * 2);
l1.content$.subscribe(console.log);
// 1, 2, 3, 4

const l2 = l1.process();
l2.content$.subscribe(console.log);
// 2, 4, 6, 8
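One way to picture this is that processing moves the transform's effect into the content. The sketch below models that with plain arrays; it is an assumption-laden illustration, not RxLine's implementation, and `makeLine` and `processLine` are hypothetical names.

```javascript
// Hypothetical model: a line is some content plus a transform (identity by default).
const makeLine = (content, transform = (x) => x) => ({ content, transform });

// Modelled .process(): apply the transform now and return a new line whose
// content is the result. In this model the new line starts with the identity transform.
const processLine = (l) => makeLine(l.content.map((x) => l.transform(x)));

const l1 = makeLine([1, 2, 3, 4], (x) => x * 2);
const l2 = processLine(l1);

console.log(l1.content);      // [1, 2, 3, 4] (original line is unchanged)
console.log(l2.content);      // [2, 4, 6, 8]
console.log(l2.transform(5)); // 5
```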
Similar to .collect(), you can provide a processing strategy to .process(). 
By default it will use sequentially.
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
import { line, concurrently } from 'rxline';

line([1, 2, 3, 4])
  .pipe(x => of(x * 3).pipe(delay(100 - (10 * x))))
  .peek(console.log)
  .process(concurrently);

// Result:
// 12, 9, 6, 3