分流即将一个流分成多个流!应该属于组合操作符。发现并没有类似功能操作符,合并流居多,如果有读者发现有类似分流操作符,可以告知我!在此感谢!
类似功能 filter,filter的作用是过滤掉部分杂质,而我需要的是杂质也有用!把杂质当成额外的流引走!相当于挖一个杂质排放通道!下面自己实现一个!
实现 division 分流器
predicate: (value: T, index: number) => boolean 开始截取,也可叫做前缀函数 suffixdicate: (value: T, index: number) => boolean 终止截取,也可叫做后缀函数 observer: Observer 垃圾排放通道
import { MonoTypeOperatorFunction, Observable, Operator, Subscriber, TeardownLogic, Observer} from "rxjs";export function division( predicate: (value: T, index: number) => boolean, suffixdicate: (value: T, index: number) => boolean, observer: Observer ): MonoTypeOperatorFunction { return function divisionOperatorFunction(source: Observable ): Observable { return source.lift(new DivisionOperator(predicate, suffixdicate, observer)); }}class DivisionOperator implements Operator { constructor( private predicate: (value: T, index: number) => boolean, private suffixdicate: (value: T, index: number) => boolean, private observer: Observer ) { } call(subscriber: Subscriber , source: any): TeardownLogic { return source.subscribe(new DivisionSubscriber(subscriber, this.predicate, this.suffixdicate, this.observer)); }}class DivisionSubscriber extends Subscriber { count: number = 0; __isStart: boolean; constructor( destination: Subscriber , private predicate: (value: T, index: number) => boolean, private suffixdicate: (value: T, index: number) => boolean, private observer: Observer , private thisOrgs?: any ) { super(destination); } protected _next(value: T) { let predicate: boolean; let suffixdicate: boolean; try { this.count++; predicate = this.predicate.call(this.thisOrgs, value); suffixdicate = this.suffixdicate.call(this.thisOrgs, value); } catch (err) { this.destination.error(err); return; } if (predicate) { this.__isStart = true; this.observer.next(value); } else if (this.__isStart && !suffixdicate) { this.observer.next(value); } else if (suffixdicate) { this.__isStart = false; this.observer.next(value); this.observer.complete(); } else { this.destination.next(value); } }}复制代码
使用
import { interval, Subject } from "rxjs";import { division } from './index';const oneTo100 = new Subject();interval(100).pipe( division( (value, index) => { return value === 1; }, (value, index) => { return value === 100; }, oneTo100 )).subscribe(res => { console.log(res)})oneTo100.subscribe(res => { console.log(`1-100:${res}`)});复制代码
不仅能实现filter的过滤主流的功能,还实现了将过滤的部分数据转移到其他流中!
深入
问题来了,这仅仅实现了单次过滤,并没有循环利用,相当于一次性卫生纸,擦完就废了!下面是对上面的方案改造升级
class DivisionSubscriberextends Subscriber { count: number = 0; __isStart: boolean; __division: Subject ; constructor( destination: Subscriber , private predicate: (value: T, index: number) => boolean, private suffixdicate: (value: T, index: number) => boolean, private observer: Observer >, private thisOrgs?: any ) { super(destination); } protected _next(value: T) { let predicate: boolean; let suffixdicate: boolean; try { this.count++; predicate = this.predicate.call(this.thisOrgs, value); suffixdicate = this.suffixdicate.call(this.thisOrgs, value); } catch (err) { this.destination.error(err); return; } // 前缀开始 if (predicate) { this.__isStart = true; // 开启一个流 this._createNewDivisionItem(); this.__division.next(value); // 开始但没有结束 } else if (this.__isStart && !suffixdicate) { this.__division.next(value); // 结束 } else if (suffixdicate) { this.__isStart = false; this.__division.next(value); this.__division.complete(); } else { this.destination.next(value); } } private _createNewDivisionItem(): void { this.__division = new Subject(); // 这里没有直接返回this.__division是为了防止下游改变上游数据 const divition = Observable.create((obser: Observer ) => { this.__division.subscribe(obser); }); this.observer.next(divition); }}复制代码
使用
import { interval, Subject, Observable } from "rxjs";import { concatAll, tap } from "rxjs/operators";import { division } from './division';const oneTo100 = new Subject>();interval(100).pipe( division( (value, index) => { return value === 1 || value === 105; }, (value, index) => { return value === 100 || value === 205; }, oneTo100 )).subscribe(res => { console.log(res)})oneTo100.pipe( concatAll()).subscribe(res => { console.log(`垃圾处理:${res}`)});复制代码
继续深入
当匹配字符串时,有时候需要两个或多个一起匹配!比如开始的条件是/结束的条件是/2个最近字符,或者html中开始的是最近四个字符,这种怎么实现呢!苦恼ing!想到了一个操作符叫scan的!模仿他的思路利用bufferCount进行改进
class DivisionSubscriberextends Subscriber { constructor( ... // 加入缓存长度控制 private length: number = 1 ... ) { super(destination); } protected _next(value: T) { let predicate: boolean; let suffixdicate: boolean; try { this.count++; if (this._buffer.length < this.length) { // 如果长度不够 继续push this._buffer.push(value); } else { // 否则去掉第一个后push this._buffer.push(value); // 保存 前this.length个数据 this._buffer = this._buffer.slice(this._buffer.length - this.length); } predicate = this.predicate.call(this.thisOrgs, ...this._buffer); suffixdicate = this.suffixdicate.call(this.thisOrgs, ...this._buffer); } catch (err) { this.destination.error(err); return; } ... 其他不变 } private _createNewDivisionItem(): void { this.__division = new Subject(); // 这里没有直接返回this.__division是为了防止下游改变上游数据 const divition = Observable.create((obser: Observer ) => { this.__division.subscribe(obser); }); this.observer.next(divition); }}复制代码
使用
interval(100).pipe( division( (...values: any[]) => equals([2, 3, 4, 5])(values), (...values: any[]) => equals([99, 100, 101, 102])(values), oneTo100, 4 )).subscribe(res => { console.log(res)})oneTo100.pipe( concatAll()).subscribe(res => { console.log(`垃圾处理:${res}`)});复制代码
到此为止就够我用的了,仅把开发思路整理分享,代码有待优化,欢迎各位大佬指正!共同进步学习
最终完整代码!
import { MonoTypeOperatorFunction, Observable, Operator, Subscriber, TeardownLogic, Observer, Subject} from "rxjs";export function division( predicate: (value: T, index: number) => boolean, suffixdicate: (value: T, index: number) => boolean, observer: Observer >, length: number = 1): MonoTypeOperatorFunction { return function divisionOperatorFunction(source: Observable ): Observable { return source.lift(new DivisionOperator(predicate, suffixdicate, observer, length)); }}class DivisionOperator implements Operator { constructor( private predicate: (value: T, index: number) => boolean, private suffixdicate: (value: T, index: number) => boolean, private observer: Observer >, private length: number = 1 ) { } call(subscriber: Subscriber , source: any): TeardownLogic { return source.subscribe(new DivisionSubscriber(subscriber, this.predicate, this.suffixdicate, this.observer, this.length)); }}class DivisionSubscriber extends Subscriber { count: number = 0; __isStart: boolean; __division: Subject ; _buffer: any[] = []; constructor( destination: Subscriber , private predicate: (value: T, index: number) => boolean, private suffixdicate: (value: T, index: number) => boolean, private observer: Observer >, private length: number = 1, private thisOrgs?: any ) { super(destination); } protected _next(value: T) { let predicate: boolean; let suffixdicate: boolean; try { this.count++; if (this._buffer.length < this.length) { // 如果长度不够 继续push this._buffer.push(value); } else { // 否则去掉第一个后push this._buffer.push(value); // 保存 前this.length个数据 this._buffer = this._buffer.slice(this._buffer.length - this.length); } predicate = this.predicate.call(this.thisOrgs, ...this._buffer); suffixdicate = this.suffixdicate.call(this.thisOrgs, ...this._buffer); } catch (err) { this.destination.error(err); return; } // 前缀开始 predicate = true if (predicate) { this.__isStart = true; // 开启一个流 this._createNewDivisionItem(); this.__division.next(value); // 开始但没有结束 __isStart = true suffixdicate = false } else if (this.__isStart && !suffixdicate) { this.__division.next(value); // 结束 } else if (suffixdicate) { this.__isStart = false; this.__division.next(value); this.__division.complete(); // 没有开始 长度不够 } else if (!predicate && this._buffer.length < this.length) { // console.log('没有开始 长度不够', value); // 没有开始 长度够了 } else if (!predicate && this._buffer.length === this.length) { this.destination.next(this._buffer[0]); } else { this.destination.next(value); } } private _createNewDivisionItem(): void { this.__division = new Subject(); // 这里没有直接返回this.__division是为了防止下游改变上游数据 const divition = Observable.create((obser: Observer ) => { this.__division.subscribe(obser); }); this.observer.next(divition); }}复制代码