Android笔记—Rxjava2
资料来源如下
给初学者的RxJava2.0教程(demo代码来源)
http://www.jianshu.com/u/c50b715ccaeb
编程环境
- Android Studio 2.2.3
- 在Gradle配置:
1
2compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
观察者模式
在Eventbus中亦涉及了相关概念,比较简单.包括Observable(被观察者)、Observer(观察者)、subscribe().事件由Observable(被观察者)开始发出,通过subscribe()最终被传递到Observer(观察者).而整个过程中你是站在Observer(观察者)的位置,也就是事件的末尾,观察Observable(被观察者).
上图
demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});事件由
emitter.onNext(1);
开始,最终被public void onNext(Integer value)
相应.被观察者事件发送结束调用emitter.onComplete();
,同时观察者最终以public void onComplete()
相应.
- note: 上下游以
.subscribe
建立连接后,事件才会开始发送.
Observable(被观察者) Observer(观察者)
- ObservableEmitter
ObservableEmitter: Emitter意为发射器,事件发送.onNext(T value)、onComplete()和onError(Throwable error)分别对应next事件、complete事件和error事件。 - Observer
中onNext(Integer value)、onError(Throwable e)、onComplete()对应接受next事件、complete事件和error事件 - 被观察者发送complete事件和error事件后,观察者接受后不再继续响应事件,即使被观察者还在发送事件.complete事件和error事件互斥.
- 在Observer
中,调用Disposable.dispose(),切断管道.被观察者继续发送,但观察者不再响应.
subscribe
- 建立Observable(被观察者) Observer(观察者)之间的管道.有多个重载.
- 重载
1
2
3
4
5
6
7
8
9
10
11public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {} - 重载说明
- 不带参数 : 观察者不关心任何事件(有卵用😵)
- 只带onNext : 观察者只响应next事件.
- 其他类似….演绎推理….
- 最后一个是传入完整的Observer对象.(demo就是🙃)
操作符
- 基于Rxjava的观察者模式可以拆分大多数的业务逻辑,即使再增加很多功能整体也不会过于混乱.
- 但Rxjava的强大并不局限在拆分逻辑.由被观察者到观察者的整个事件传递过程,基于Rxjava我们可以任意拆分 合并 转换 事件、切换线程等.
- note: 操作符搭配 Lambda 表达式食用更佳 🤣
创建
- 产生并发送 Obserable 事件.
- 仅常用,详细在 RxJava 2.x 使用详解(二) 创建操作符
.creat
- 前面demo中已经实际使用过了
- 用于产生一个 Obserable 被观察者对象,demo如上所示.
.just
- 对于简单的几个数据,直接使用just发送即可,无需创建 Obserable 对象.just最多可以接收 10 个参数.
- demo相当于顺序调用onNext(“test”)和onNext(“test2”),最后调用onComplete方法。
1
2Observable.just("test","test2")
.subscribe(str -> Log.i("tag", str));
.fromArray
- 功能与just类似但fromArray来接收任意长度的数据数组,也可以直接传入数组
fromArray(new int[]{1, 2, 3})
- demofromArray不支持直接传入list进,list会被当作一个整体发送.
1
2Observable.fromArray(1, 2, 3, 4, 5)
.subscribe(integer -> Log.i("tag", String.valueOf(integer)));
.fromIterable
- 功能与fromArray类似,但是可以接收 list 类型,遍历可迭代数据集合.
- demo
1
2
3
4
5
6
7
8List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");
Flowable.fromIterable(list).subscribe(
s -> Log.i("tag", s)
);
.timer
- 指定一段时间间隔后发送数据(一次性),不太常用.
线程切换
默认事件传递的双方在同一线程工作.
demo
1
2
3
4
5observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(consumer);方法:
- .subscribeOn() : 指定被观察者发送事件线程,仅第一次调用时有效!
- .observeOn() : 指定观察者/流变换(对发送的事件处理🖖)线程,多次调用,多次切换有效
参数:
- Schedulers.io() : 适用于io密集型操作,网络通信、磁盘操作等.
- Schedulers.computation() : CPU密集操作,需要大量CPU计算的操作.
- Schedulers.newThread() : 创建新线程.
- AndroidSchedulers.mainThread() : Android主线程,通常为更新UI等.
过滤
- 对 Obserable 事件筛选.
- 仅常用,详细在RxJava 2.x 使用详解(三) 过滤操作符
.filter
- 基本过滤操作符,按照任意自定规则过滤.
组合
转换
.map
- 处理前后事件数量之比1:1,事件变换前后顺序不变
- map作用是对Observable发送的每一个事件,应用处理变换函数,再继续像下游发送.中间过程可以转换事件类型、改变事件内容等等.只需要变换后的事件类型与下游接收的类型匹配即可.
- demo这里是把int类型转换为了string类型,与观察者接收的类型匹配即可.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
.flatMap
- 处理前后事件数量之比 1:n,事件变换前后顺序不保证
- flatMap,通俗点就是把Observable发送的事件拆散变换再,继续像下游发送.1个Observable事件可拆成任意个.只需要变换后的事件类型与下游接收的类型匹配即可.
- demo这里是flatMap把一个int类型事件拆成了3个String类型,运行结果看,最终事件到达顺序与onNext(1);onNext(2);的发送顺序无关
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
.concatMap
- 处理前后事件数量之比 1:n,事件变换前后顺序按顺序
- 与flatMap作用相同,只是保证了事件严格按顺序达到下游.
- demo 就不上了,直接替换flatMap的位置就好.
.zip
- 处理前后事件数量之比 n:1,事件变换前后顺序严格按顺序
- zip.最常见的压缩文件格式,在这里也是类似的意思,zip可以严格按照顺序合并多个 不同类型 Observable发送的事件.总的发送事件数量与上游Observable发送最少的那个数量相同.
- demo这里两个事件发送在同一线程中.当两个事件发送不再同一线程时,情况类似,不过当异步时,数量较少的事件发送完成,发送Complete事件后,通道随即被切断.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
.Concat
- 处理前后事件数量之比 n:1,事件变换前后顺序严格按顺序
- Concat,可以严格按照顺序合并 相同类型 Observable发送的事件.
Backpressure
- 被翻译为背压…(如此文不达意的直译,能忍?往下都是因为原文..😈)
- 其实概念有够简单:将整个事件产生/传递/处理的过程想象为一条河流由上而下, Backpressure 指的是上游产生的事件太快,远远超过了下游的处理速度,以至于缓冲区溢出.上游来了洪水,下游径流量不够,以至于中间河道跨过了堤岸,溢出.
Flowable基础
Rxjava 1.x中需要自行通过操作符处理,到了2.0中,则有了专门对付发洪水上游的被观察者- Flowable .我们常用的 observable 在2.x中一般用于不涉及 Backpressure 的情况.而对应与 observable 的 Observer ,改为了 Subscriber .
demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一个参数
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE); //注意这句代码
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
upstream.subscribe(downstream);注意两个地方
- Flowable创建时比 observable 多了一个参数.(参数作用下节说明)
- Subscriber中调用了 s.request .
Flowable 与 Observable 最大的不同就是 Flowable再次发送事件需要等待 Subscriber 中调用 .request
.request() 实质上是下游告知上游自己的处理能力,使得上游根据下游处理能力发送事件.多次调用,上游表示处理能力的数字会叠加,上游每发送一个事件,该数字减一,到0抛出异常
- 上下游在同一线程时,下游没有或没有及时调用 .request ,上游会抛出异常
- 异步线程时,下游即使没有调用 .request 会有128个事件的缓存区.上游可继续发出事件,缓存区超出128个事件后,抛出异常.
Flowable拓展
- 这里对 Flowable 多的参数进行说明.
- 参数
- BackpressureStrategy.BUFFER : 默认缓存区128,这个参数极大拓展了缓存区,使得 Flowable 表现与 Observable 差不多.
- BackpressureStrategy.DROP : 128缓存区满了,就丢弃上游事件,直到下游处理了一个事件,缓存区 -1 ,再允许存入新的上游事件.
- BackpressureStrategy.LATEST : 永远保存最后达到的128个上游事件,上游有新的事件到达满载的缓存区时,丢弃第一个存入缓存区的上游事件.
- 对于不是由我们编写的 Flowable 也可以通过 interval 操作符来加工.
1
2Flowable.interval(1, TimeUnit.MICROSECONDS)
.onBackpressureDrop() //加上 Backpressure 策略 - 对应上文,指定参数有3,意思同上.
- onBackpressureBuffer()
- onBackpressureDrop()
- onBackpressureLatest()
相关文章