0%

Android笔记—Rxjava2


资料来源如下
给初学者的RxJava2.0教程(demo代码来源)

http://www.jianshu.com/u/c50b715ccaeb

编程环境

  • Android Studio 2.2.3

  • 在Gradle配置:
    1
    2
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

观察者模式

  • 在Eventbus中亦涉及了相关概念,比较简单.包括Observable(被观察者)、Observer(观察者)、subscribe().事件由Observable(被观察者)开始发出,通过subscribe()最终被传递到Observer(观察者).而整个过程中你是站在Observer(观察者)的位置,也就是事件的末尾,观察Observable(被观察者).

  • 上图

  • demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    emitter.onNext(1);
    emitter.onComplete();
    }
    }).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
    Log.d(TAG, "subscribe");
    }

    @Override
    public void onNext(Integer value) {
    Log.d(TAG, "" + value);
    }

    @Override
    public void onError(Throwable e) {
    Log.d(TAG, "error");
    }

    @Override
    public void onComplete() {
    Log.d(TAG, "complete");
    }
    });
  • 事件由emitter.onNext(1);开始,最终被public void onNext(Integer value)相应.被观察者事件发送结束调用emitter.onComplete();,同时观察者最终以public void onComplete()相应.


  • note: 上下游以.subscribe建立连接后,事件才会开始发送.

Observable(被观察者) Observer(观察者)

  • ObservableEmitter
    ObservableEmitter: Emitter意为发射器,事件发送.onNext(T value)、onComplete()和onError(Throwable error)分别对应next事件、complete事件和error事件。
  • Observer中onNext(Integer value)、onError(Throwable e)、onComplete()对应接受next事件、complete事件和error事件
  • 被观察者发送complete事件和error事件后,观察者接受后不再继续响应事件,即使被观察者还在发送事件.complete事件和error事件互斥.
  • 在Observer中,调用Disposable.dispose(),切断管道.被观察者继续发送,但观察者不再响应.

subscribe

  • 建立Observable(被观察者) Observer(观察者)之间的管道.有多个重载.

  • 重载

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public final Disposable subscribe() {}

    public final Disposable subscribe(Consumer<? super T> onNext) {}

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}

    public final void subscribe(Observer<? super T> observer) {}
  • 重载说明

    • 不带参数 : 观察者不关心任何事件(有卵用😵)
    • 只带onNext : 观察者只响应next事件.
    • 其他类似….演绎推理….
    • 最后一个是传入完整的Observer对象.(demo就是🙃)

操作符

  • 基于Rxjava的观察者模式可以拆分大多数的业务逻辑,即使再增加很多功能整体也不会过于混乱.
  • 但Rxjava的强大并不局限在拆分逻辑.由被观察者到观察者的整个事件传递过程,基于Rxjava我们可以任意拆分 合并 转换 事件、切换线程等.

  • note: 操作符搭配 Lambda 表达式食用更佳 🤣

创建

.just

  • 对于简单的几个数据,直接使用just发送即可,无需创建 Obserable 对象.just最多可以接收 10 个参数.

  • demo

    1
    2
    Observable.just("test","test2")
    .subscribe(str -> Log.i("tag", str));

    相当于顺序调用onNext(“test”)和onNext(“test2”),最后调用onComplete方法。

.fromArray

  • 功能与just类似但fromArray来接收任意长度的数据数组,也可以直接传入数组fromArray(new int[]{1, 2, 3})

  • demo

    1
    2
    Observable.fromArray(1, 2, 3, 4, 5)
    .subscribe(integer -> Log.i("tag", String.valueOf(integer)));

    fromArray不支持直接传入list进,list会被当作一个整体发送.

.fromIterable

  • 功能与fromArray类似,但是可以接收 list 类型,遍历可迭代数据集合.
  • demo
    1
    2
    3
    4
    5
    6
    7
    8
    List<String> list = new ArrayList<>();
    list.add("a");
    list.add("b");
    list.add("c");

    Flowable.fromIterable(list).subscribe(
    s -> Log.i("tag", s)
    );

.timer

  • 指定一段时间间隔后发送数据(一次性),不太常用.

线程切换

  • 默认事件传递的双方在同一线程工作.

  • demo

    1
    2
    3
    4
    5
    observable.subscribeOn(Schedulers.newThread())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .observeOn(Schedulers.io())
    .subscribe(consumer);
  • 方法:

    • .subscribeOn() : 指定被观察者发送事件线程,仅第一次调用时有效!
    • .observeOn() : 指定观察者/流变换(对发送的事件处理🖖)线程,多次调用,多次切换有效
  • 参数:

    • Schedulers.io() : 适用于io密集型操作,网络通信、磁盘操作等.
    • Schedulers.computation() : CPU密集操作,需要大量CPU计算的操作.
    • Schedulers.newThread() : 创建新线程.
    • AndroidSchedulers.mainThread() : Android主线程,通常为更新UI等.

过滤

.filter

  • 基本过滤操作符,按照任意自定规则过滤.

组合

转换

.map

  • 处理前后事件数量之比1:1,事件变换前后顺序不变

  • map作用是对Observable发送的每一个事件,应用处理变换函数,再继续像下游发送.中间过程可以转换事件类型、改变事件内容等等.只需要变换后的事件类型与下游接收的类型匹配即可.

  • demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    }
    }).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
    return "This is result " + integer;
    }
    }).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
    Log.d(TAG, s);
    }
    });

    这里是把int类型转换为了string类型,与观察者接收的类型匹配即可.

.flatMap

  • 处理前后事件数量之比 1:n,事件变换前后顺序不保证

  • flatMap,通俗点就是把Observable发送的事件拆散变换再,继续像下游发送.1个Observable事件可拆成任意个.只需要变换后的事件类型与下游接收的类型匹配即可.

  • demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    }
    }).flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
    final List<String> list = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
    list.add("I am value " + integer);
    }
    return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
    }
    }).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
    Log.d(TAG, s);
    }
    });

    这里是flatMap把一个int类型事件拆成了3个String类型,运行结果看,最终事件到达顺序与onNext(1);onNext(2);的发送顺序无关

.concatMap

  • 处理前后事件数量之比 1:n,事件变换前后顺序按顺序
  • 与flatMap作用相同,只是保证了事件严格按顺序达到下游.
  • demo 就不上了,直接替换flatMap的位置就好.

.zip

  • 处理前后事件数量之比 n:1,事件变换前后顺序严格按顺序

  • zip.最常见的压缩文件格式,在这里也是类似的意思,zip可以严格按照顺序合并多个 不同类型 Observable发送的事件.总的发送事件数量与上游Observable发送最少的那个数量相同.

  • demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    Log.d(TAG, "emit 1");
    emitter.onNext(1);
    Thread.sleep(1000);

    Log.d(TAG, "emit 2");
    emitter.onNext(2);
    Thread.sleep(1000);

    Log.d(TAG, "emit 3");
    emitter.onNext(3);
    Thread.sleep(1000);

    Log.d(TAG, "emit 4");
    emitter.onNext(4);
    Thread.sleep(1000);

    Log.d(TAG, "emit complete1");
    emitter.onComplete();
    }
    }).subscribeOn(Schedulers.io());

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    Log.d(TAG, "emit A");
    emitter.onNext("A");
    Thread.sleep(1000);

    Log.d(TAG, "emit B");
    emitter.onNext("B");
    Thread.sleep(1000);

    Log.d(TAG, "emit C");
    emitter.onNext("C");
    Thread.sleep(1000);

    Log.d(TAG, "emit complete2");
    emitter.onComplete();
    }
    }).subscribeOn(Schedulers.io());

    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
    @Override
    public String apply(Integer integer, String s) throws Exception {
    return integer + s;
    }
    }).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    Log.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String value) {
    Log.d(TAG, "onNext: " + value);
    }

    @Override
    public void onError(Throwable e) {
    Log.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    });

    这里两个事件发送在同一线程中.当两个事件发送不再同一线程时,情况类似,不过当异步时,数量较少的事件发送完成,发送Complete事件后,通道随即被切断.

.Concat

  • 处理前后事件数量之比 n:1,事件变换前后顺序严格按顺序
  • Concat,可以严格按照顺序合并 相同类型 Observable发送的事件.

Backpressure


  • 被翻译为背压…(如此文不达意的直译,能忍?往下都是因为原文..😈)

  • 其实概念有够简单:将整个事件产生/传递/处理的过程想象为一条河流由上而下, Backpressure 指的是上游产生的事件太快,远远超过了下游的处理速度,以至于缓冲区溢出.上游来了洪水,下游径流量不够,以至于中间河道跨过了堤岸,溢出.

Flowable基础

  • Rxjava 1.x中需要自行通过操作符处理,到了2.0中,则有了专门对付发洪水上游的被观察者- Flowable .我们常用的 observable 在2.x中一般用于不涉及 Backpressure 的情况.而对应与 observable 的 Observer ,改为了 Subscriber .

  • demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
    Log.d(TAG, "emit 1");
    emitter.onNext(1);
    Log.d(TAG, "emit 2");
    emitter.onNext(2);
    Log.d(TAG, "emit 3");
    emitter.onNext(3);
    Log.d(TAG, "emit complete");
    emitter.onComplete();
    }
    }, BackpressureStrategy.ERROR); //增加了一个参数

    Subscriber<Integer> downstream = new Subscriber<Integer>() {

    @Override
    public void onSubscribe(Subscription s) {
    Log.d(TAG, "onSubscribe");
    s.request(Long.MAX_VALUE); //注意这句代码
    }

    @Override
    public void onNext(Integer integer) {
    Log.d(TAG, "onNext: " + integer);

    }

    @Override
    public void onError(Throwable t) {
    Log.w(TAG, "onError: ", t);
    }

    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    };

    upstream.subscribe(downstream);
  • 注意两个地方

    • Flowable创建时比 observable 多了一个参数.(参数作用下节说明)
    • Subscriber中调用了 s.request .
  • Flowable 与 Observable 最大的不同就是 Flowable再次发送事件需要等待 Subscriber 中调用 .request

  • .request() 实质上是下游告知上游自己的处理能力,使得上游根据下游处理能力发送事件.多次调用,上游表示处理能力的数字会叠加,上游每发送一个事件,该数字减一,到0抛出异常

    • 上下游在同一线程时,下游没有或没有及时调用 .request ,上游会抛出异常
    • 异步线程时,下游即使没有调用 .request 会有128个事件的缓存区.上游可继续发出事件,缓存区超出128个事件后,抛出异常.

Flowable拓展

  • 这里对 Flowable 多的参数进行说明.

  • 参数
    • BackpressureStrategy.BUFFER : 默认缓存区128,这个参数极大拓展了缓存区,使得 Flowable 表现与 Observable 差不多.
    • BackpressureStrategy.DROP : 128缓存区满了,就丢弃上游事件,直到下游处理了一个事件,缓存区 -1 ,再允许存入新的上游事件.
    • BackpressureStrategy.LATEST : 永远保存最后达到的128个上游事件,上游有新的事件到达满载的缓存区时,丢弃第一个存入缓存区的上游事件.

  • 对于不是由我们编写的 Flowable 也可以通过 interval 操作符来加工.

    1
    2
    Flowable.interval(1, TimeUnit.MICROSECONDS)
    .onBackpressureDrop() //加上 Backpressure 策略
  • 对应上文,指定参数有3,意思同上.

    • onBackpressureBuffer()
    • onBackpressureDrop()
    • onBackpressureLatest()