博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
《记》rxjs分流操作符简单实现
阅读量:5973 次
发布时间:2019-06-19

本文共 9134 字,大约阅读时间需要 30 分钟。

分流即将一个流分成多个流!应该属于组合操作符。发现并没有类似功能操作符,合并流居多,如果有读者发现有类似分流操作符,可以告知我!在此感谢!

类似功能 filter,filter的作用是过滤掉部分杂质,而我需要的是杂质也有用!把杂质当成额外的流引走!相当于挖一个杂质排放通道!下面自己实现一个!

实现 division 分流器

predicate: (value: T, index: number) => boolean 开始截取,也可叫做前缀函数 suffixdicate: (value: T, index: number) => boolean 终止截取,也可叫做后缀函数 observer: Observer 垃圾排放通道

import {    MonoTypeOperatorFunction, Observable,    Operator, Subscriber, TeardownLogic, Observer} from "rxjs";export function division
( predicate: (value: T, index: number) => boolean, suffixdicate: (value: T, index: number) => boolean, observer: Observer
): MonoTypeOperatorFunction
{ return function divisionOperatorFunction(source: Observable
): Observable
{ return source.lift(new DivisionOperator(predicate, suffixdicate, observer)); }}class DivisionOperator
implements Operator
{ constructor( private predicate: (value: T, index: number) => boolean, private suffixdicate: (value: T, index: number) => boolean, private observer: Observer
) { } call(subscriber: Subscriber
, source: any): TeardownLogic { return source.subscribe(new DivisionSubscriber(subscriber, this.predicate, this.suffixdicate, this.observer)); }}class DivisionSubscriber
extends Subscriber
{ count: number = 0; __isStart: boolean; constructor( destination: Subscriber
, private predicate: (value: T, index: number) => boolean, private suffixdicate: (value: T, index: number) => boolean, private observer: Observer
, private thisOrgs?: any ) { super(destination); } protected _next(value: T) { let predicate: boolean; let suffixdicate: boolean; try { this.count++; predicate = this.predicate.call(this.thisOrgs, value); suffixdicate = this.suffixdicate.call(this.thisOrgs, value); } catch (err) { this.destination.error(err); return; } if (predicate) { this.__isStart = true; this.observer.next(value); } else if (this.__isStart && !suffixdicate) { this.observer.next(value); } else if (suffixdicate) { this.__isStart = false; this.observer.next(value); this.observer.complete(); } else { this.destination.next(value); } }}复制代码

使用

import { interval, Subject } from "rxjs";import { division } from './index';const oneTo100 = new Subject();interval(100).pipe(    division(        (value, index) => {            return value === 1;        },        (value, index) => {            return value === 100;        },        oneTo100    )).subscribe(res => {    console.log(res)})oneTo100.subscribe(res => {    console.log(`1-100:${res}`)});复制代码

不仅能实现filter的过滤主流的功能,还实现了将过滤的部分数据转移到其他流中!

深入

问题来了,这仅仅实现了单次过滤,并没有循环利用,相当于一次性卫生纸,擦完就废了!下面是对上面的方案改造升级

class DivisionSubscriber
extends Subscriber
{ count: number = 0; __isStart: boolean; __division: Subject
; constructor( destination: Subscriber
, private predicate: (value: T, index: number) => boolean, private suffixdicate: (value: T, index: number) => boolean, private observer: Observer
>, private thisOrgs?: any ) { super(destination); } protected _next(value: T) { let predicate: boolean; let suffixdicate: boolean; try { this.count++; predicate = this.predicate.call(this.thisOrgs, value); suffixdicate = this.suffixdicate.call(this.thisOrgs, value); } catch (err) { this.destination.error(err); return; } // 前缀开始 if (predicate) { this.__isStart = true; // 开启一个流 this._createNewDivisionItem(); this.__division.next(value); // 开始但没有结束 } else if (this.__isStart && !suffixdicate) { this.__division.next(value); // 结束 } else if (suffixdicate) { this.__isStart = false; this.__division.next(value); this.__division.complete(); } else { this.destination.next(value); } } private _createNewDivisionItem(): void { this.__division = new Subject(); // 这里没有直接返回this.__division是为了防止下游改变上游数据 const divition = Observable.create((obser: Observer
) => { this.__division.subscribe(obser); }); this.observer.next(divition); }}复制代码

使用

import { interval, Subject, Observable } from "rxjs";import { concatAll, tap } from "rxjs/operators";import { division } from './division';const oneTo100 = new Subject
>();interval(100).pipe( division( (value, index) => { return value === 1 || value === 105; }, (value, index) => { return value === 100 || value === 205; }, oneTo100 )).subscribe(res => { console.log(res)})oneTo100.pipe( concatAll()).subscribe(res => { console.log(`垃圾处理:${res}`)});复制代码

继续深入

当匹配字符串时,有时候需要两个或多个一起匹配!比如开始的条件是/结束的条件是/2个最近字符,或者html中开始的是最近四个字符,这种怎么实现呢!苦恼ing!想到了一个操作符叫scan的!模仿他的思路利用bufferCount进行改进

class DivisionSubscriber
extends Subscriber
{ constructor( ... // 加入缓存长度控制 private length: number = 1 ... ) { super(destination); } protected _next(value: T) { let predicate: boolean; let suffixdicate: boolean; try { this.count++; if (this._buffer.length < this.length) { // 如果长度不够 继续push this._buffer.push(value); } else { // 否则去掉第一个后push this._buffer.push(value); // 保存 前this.length个数据 this._buffer = this._buffer.slice(this._buffer.length - this.length); } predicate = this.predicate.call(this.thisOrgs, ...this._buffer); suffixdicate = this.suffixdicate.call(this.thisOrgs, ...this._buffer); } catch (err) { this.destination.error(err); return; } ... 其他不变 } private _createNewDivisionItem(): void { this.__division = new Subject(); // 这里没有直接返回this.__division是为了防止下游改变上游数据 const divition = Observable.create((obser: Observer
) => { this.__division.subscribe(obser); }); this.observer.next(divition); }}复制代码

使用

interval(100).pipe(    division(        (...values: any[]) => equals([2, 3, 4, 5])(values),        (...values: any[]) => equals([99, 100, 101, 102])(values),        oneTo100,        4    )).subscribe(res => {    console.log(res)})oneTo100.pipe(    concatAll()).subscribe(res => {    console.log(`垃圾处理:${res}`)});复制代码

到此为止就够我用的了,仅把开发思路整理分享,代码有待优化,欢迎各位大佬指正!共同进步学习

最终完整代码!

import {    MonoTypeOperatorFunction, Observable,    Operator, Subscriber, TeardownLogic, Observer, Subject} from "rxjs";export function division
( predicate: (value: T, index: number) => boolean, suffixdicate: (value: T, index: number) => boolean, observer: Observer
>, length: number = 1): MonoTypeOperatorFunction
{ return function divisionOperatorFunction(source: Observable
): Observable
{ return source.lift(new DivisionOperator(predicate, suffixdicate, observer, length)); }}class DivisionOperator
implements Operator
{ constructor( private predicate: (value: T, index: number) => boolean, private suffixdicate: (value: T, index: number) => boolean, private observer: Observer
>, private length: number = 1 ) { } call(subscriber: Subscriber
, source: any): TeardownLogic { return source.subscribe(new DivisionSubscriber(subscriber, this.predicate, this.suffixdicate, this.observer, this.length)); }}class DivisionSubscriber
extends Subscriber
{ count: number = 0; __isStart: boolean; __division: Subject
; _buffer: any[] = []; constructor( destination: Subscriber
, private predicate: (value: T, index: number) => boolean, private suffixdicate: (value: T, index: number) => boolean, private observer: Observer
>, private length: number = 1, private thisOrgs?: any ) { super(destination); } protected _next(value: T) { let predicate: boolean; let suffixdicate: boolean; try { this.count++; if (this._buffer.length < this.length) { // 如果长度不够 继续push this._buffer.push(value); } else { // 否则去掉第一个后push this._buffer.push(value); // 保存 前this.length个数据 this._buffer = this._buffer.slice(this._buffer.length - this.length); } predicate = this.predicate.call(this.thisOrgs, ...this._buffer); suffixdicate = this.suffixdicate.call(this.thisOrgs, ...this._buffer); } catch (err) { this.destination.error(err); return; } // 前缀开始 predicate = true if (predicate) { this.__isStart = true; // 开启一个流 this._createNewDivisionItem(); this.__division.next(value); // 开始但没有结束 __isStart = true suffixdicate = false } else if (this.__isStart && !suffixdicate) { this.__division.next(value); // 结束 } else if (suffixdicate) { this.__isStart = false; this.__division.next(value); this.__division.complete(); // 没有开始 长度不够 } else if (!predicate && this._buffer.length < this.length) { // console.log('没有开始 长度不够', value); // 没有开始 长度够了 } else if (!predicate && this._buffer.length === this.length) { this.destination.next(this._buffer[0]); } else { this.destination.next(value); } } private _createNewDivisionItem(): void { this.__division = new Subject(); // 这里没有直接返回this.__division是为了防止下游改变上游数据 const divition = Observable.create((obser: Observer
) => { this.__division.subscribe(obser); }); this.observer.next(divition); }}复制代码

转载地址:http://pzdox.baihongyu.com/

你可能感兴趣的文章
我的友情链接
查看>>
Linux软RAID操作与实现学习笔记
查看>>
为Angularjs ngOptions加上index解决方案
查看>>
hive启动问题 Unable to start Hive Cli
查看>>
虚拟化系列-Windows server 2012 网络管理
查看>>
gridview 获取当前行的index ,按钮的click事件
查看>>
Linux常用命令
查看>>
vmstat命令的含义
查看>>
无线自主AP漫游
查看>>
【翻译】在Ext JS应用程序中构建可维护的控制器
查看>>
C#开发学习——SqlHelper的应用
查看>>
第四周作业
查看>>
Chem 3D中怎么创建立体模型
查看>>
5月8号打卡
查看>>
[转]各种排序算法的分析及java实现
查看>>
用扩展开发一个PHP类
查看>>
How Delete File with Readonly Permission?
查看>>
[MSSQL2005]再看CTE
查看>>
Ubuntu安装Fcitx(小企鹅五笔输入法)
查看>>
例:进店买衣服案例
查看>>