限韩令什么时候解除需要解除RxJava的订阅

RxJava学习总结 - 简书
RxJava学习总结
什么是RxJava
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
RxJava是JVM的响应式扩展(ReactiveX),它是通过使用可观察的序列将异步和基于事件的程序组合起来的一个库。
(1)观察者模式
RxJava用到了设计模式中的观察者模式。支持数据或事件序列,允许对序列进行组合,并对线程、同步和并发数据结构进行了抽象。
无依赖库、Jar包小于1M
(3)支持多语言
支持Java 6+和Android 2.3+。RxJava设计初衷就是兼容所有JVM语言,目前支持的JVM语言有Groovy,Clojure,JRuby,Kotlin和Scala。
(4)多线程支持
封装了各种并发实现,如threads, pools, event loops, fibers, actors。
3. RxJava vs. Java
为了便于大家更好的理解这个新伙伴,学姐总结了RxJava和Java的异同。下面从关于异步序列,数据获取方式,数据传递方式,增强功能4个方面来阐述。
(1)关于异步序列
通常我们获取一个同步对象,可以这么写T getData();获取一个异步对象,可以这么写Future&T& getData();而获取一个同步序列,可以这么写Iterable&T& getData()。那获取一个异步序列呢,Java没有提供相应方法,RxJava填充了这一空白,我们可以这么写Observable&T& getData(),关于Observable的相关介绍稍后会有。
(2)数据获取方式
Java中如果不使用观察者模式,数据都是主动获取,即Pull方式,对于列表数据,也是使用Iterator轮询获取。RxJava由于用到了观察者模式,数据是被动获取,由被观察者向观察者发出通知,即Push方式。
(3)数据传递方式
对于同步数据操作,Java中可以顺序传递结果,即operation1 -& operation2 -& operation3。异步操作通常则需要使用Callback回调,然后在回调中继续后续操作,即Callback1 -& Callback2 -& Callback3,可能会存在很多层嵌套。而RxJava同步和异步都是链式调用,即operation1 -& operation2 -& operation3,这种做法的好处就是即时再复杂的逻辑都简单明了,不容易出错。
(4)增强功能
比观察者模式功能更强大,在onNext()回调方法基础上增加了onCompleted()和OnError(),当事件执行完或执行出错时回调。此外还可以很方便的切换事件生产和消费的线程。事件还可以组合处理。
说了这么多,然而好像并没有什么用,还是来个例子吧。
假设需要找出某个本地url列表中的图片本地目录,并且加载对应的图片,展现在UI上。
new Thread() {
public void run() {
super.run();
for (String url : urls) {
if (url.endsWith(".png")) {
final Bitmap bitmap = getBitmap(url);
getActivity().runOnUiThread(new Runnable() {
public void run() {
imageView.setImageBitmap(bitmap);
}.start();
而使用RxJava后,代码是这样的:
Observable.from(urls)
.filter(new Func1&String, Boolean&() {
public Boolean call(String url) {
return url.endsWith(".png");
.map(new Func1&String, Bitmap&() {
public Bitmap call(String url) {
return getBitmap(url);
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1&Bitmap&() {
public void call(Bitmap bitmap) {
imageView.addImage(bitmap);
API介绍及使用
由于RxJava内容较多,学姐打算采用从基础到高级循序渐进的方式讲解。
RxJava很重要的思想就是观察者模式,对API的介绍也是根据这个模式划分。
(1)观察者(Observer、Subscriber)
Observer是一个接口,提供了3个方法:onNext(T t), onError(Throwable e), onCompleted()。
Subscriber是Observer的子类,class Subscriber&T& implements Observer&T&, Subscription。
Subscriber在Observer的基础上有如下扩展:
增加了onStart()。这个方法在观察者和被观察者建立订阅关系后,而被观察者向观察者发送消息前调用,主要用于做一些初始化工作,如数据的清零或重置。
增加了unsubscribe()。这个方法用于取消订阅,若isUnsubscribed()为true,则观察者不能收到被观察者的消息。
创建一个Observer:
Observer&String& observer = new Observer&String&() {
public void onCompleted() {
Log.d(TAG, "onCompleted");
public void onError(Throwable e) {
Log.d(TAG, "onError" + e);
public void onNext(String s) {
Log.d(TAG, "onNext -& " + s);
创建一个Subscriber:
Subscriber&String& subscriber = new Subscriber&String&() {
public void onCompleted() {
Log.d(TAG, "onCompleted");
public void onError(Throwable e) {
Log.d(TAG, "onError" + e);
public void onNext(String s) {
Log.d(TAG, "onNext -& " + s);
(2)被观察者(Observable)
Observable决定什么时候触发事件以及触发怎样的事件。常见的3种创建Observable的方式:
1 Observable.create(Observable.OnSubscribe)
Observable&String& observable = Observable.create(new Observable.OnSubscribe&String&() {
public void call(Subscriber&? super String& subscriber) {
subscriber.onNext("message 1");
subscriber.onNext("message 2");
subscriber.onCompleted();
当Observable与Observer/Subscriber建立订阅关系的时候,call()会被调用。
2 Observable.just(T...)
Observable&String& observable1 = Observable.just("message 1", "message 2");
这个方法可以传1-N个类型相同的参数,和上面的例子等价。最终也会调用Observeber/Subscriber的onNext("message 1"), onNext("message 2"), onCompleted()。
3 Observable.from(T[]), Observable.from(Iterable&? extends T&)
String[] array = {"message 1", "message 2"};
Observable&String& observable2 = Observable.from(array);
这个方法可以传数组或Iterable,和上面的例子等价。
(3)订阅(Subscription)
订阅关系建立有2种方式:1.Observable.subscribe(Subscriber); 2.Observable.subscribe(Action)
Observable和Observer/Subscriber通过Observable.subscribe(Subscriber)建立订阅关系,其内部实现抽取出来如下:
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
由源码可知,当订阅关系建立时,首先调用subscriber的onStart()方法,此处可进行一些初始化操作,如数据清零或重置。接着调用onSubscribe.call(subscriber),此处onSubscribe就是创建Observable时Observable.create(OnSubscribe)传入的OnSubscribe参数,说明Observable创建时传入的OnSubscribe的call()回调是在订阅关系建立后调用的。
Action这种方式,里面实现也还是用Subscriber进行了包装,本质上就是上面Subscriber的那种方式。只不过根据传入的参数不同回调的方法不同而已,下面代码分别调用Subscriber的onNext, onNext&onError, onNext&onError&onCompleted。
Subscription subscribe(final Action1&? super T& onNext)
Subscription subscribe(final Action1&? super T& onNext, final Action1&Throwable& onError)
Subscription subscribe(final Action1&? super T& onNext, final Action1&Throwable& onError, final Action0 onComplete)
这里学姐想讲讲Action,RxJava提供了Action0- Action9和ActionN,这里的数字表示参数的个数分别为0-9和N个。
关于Subscription这个接口,这个类提供了两个方法unsubscribe()和isUnsubscribed(),可以解除订阅关系和判断订阅关系。subscribe()订阅方法的返回值也是Subscription。
(4)场景示例
demo参考github
(1)线程(Scheduler)
Scheduler是RxJava的线程调度器,可以指定代码执行的线程。RxJava内置了几种线程:
AndroidSchedulers.mainThread() 主线程
Schedulers.immediate() 当前线程,即默认Scheduler
Schedulers.newThread() 启用新线程
Schedulers.io() IO线程,内部是一个数量无上限的线程池,可以进行文件、数据库和网络操作。
<putation() CPU计算用的线程,内部是一个数目固定为CPU核数的线程池,适合于CPU密集型计算,不能操作文件、数据库和网络。
subscribeOn()和observeOn()可以用来控制代码的执行线程。学姐学习这里的时候,很容易搞混这两个方法分别控制哪部分代码,其实咱们直接跑个demo就明白了。
Observable.create(new Observable.OnSubscribe&String&() {
public void call(Subscriber&? super String& subscriber) {
Log.d(TAG, "OnSubscribe.call Thread -& " + Thread.currentThread().getName());
subscriber.onNext("message");
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber&String&() {
public void onCompleted() {
public void onError(Throwable e) {
public void onNext(String s) {
Log.d(TAG, "Subscriber.onNext Thread -& " + Thread.currentThread().getName());
根据打印出的Log可以得出结论:
subscribeOn()指定OnSubscribe.call()的执行线程,即Observable通知Subscriber的线程;
observeOn()指定Subscriber回调的执行线程,即事件消费的线程。
(2)变换(map, flatMap)
RxJava提供了一个很牛逼的功能,可以对事件或事件序列进行变换,使之转换成不同的事件或事件序列。
有两个常用方法的方法支持变换:map()和flatMap()。
map为一对一变换。可以将一个对象转换成另一个对象,或者将对象数组的每单个对象转换成新的对象数组的每单个对象。
flatMap()为一对多变换。可以将一个对象转换成一组对象,或者将对象数组的每单个对象转换成新的对象数组的每单组对象。
以Person为例,一个Person对应一个身份证id,一个Person可以有多个Email。通过map()可以将Person转换成id,从而得到一个Person的身份证号码;通过flatMap()可以将 Person转换成一组Email,从而得到一个Person的所有Email。
示例如下:
* map: Person -& id(String)
* 打印某个人id
private void testMap0() {
Observable.just(getPersonArray()[0])
.map(new Func1&Person, String&() {
public String call(Person person) {
return person.
.subscribe(new Subscriber&String&() {
public void onCompleted() {
public void onError(Throwable e) {
public void onNext(String id) {
Log.d(TAG, "id -& " + id);
* map: array Person -& id(String)
* 打印每个人的id
private void testMap() {
Observable.from(getPersonArray())
.map(new Func1&Person, String&() {
public String call(Person person) {
return person.
.subscribe(new Subscriber&String&() {
public void onCompleted() {
public void onError(Throwable e) {
public void onNext(String id) {
Log.d(TAG, "id -& " + id);
* flatMap: array Person -& email数组(String[])
* 打印每个人的所有email
private void testFlatMap() {
Observable.from(getPersonArray())
.flatMap(new Func1&Person, Observable&Person.Email&&() {
public Observable&Person.Email& call(Person person) {
Log.d(TAG, "flatMap " + person.id);
return Observable.from(person.emails);
.subscribe(new Subscriber&Person.Email&() {
public void onCompleted() {
Log.d(TAG, "onCompleted");
public void onError(Throwable e) {
Log.d(TAG, "onError " + e.getMessage());
public void onNext(Person.Email email) {
Log.d(TAG, "onNext " + email.name);
学姐也是这周才开始学RxJava,对于一个全新的知识点,我认为还是以知道怎么使用和一些基础API的基本原理为主,对于更高级的应该是等到API使用较为熟练之后再去学习。因此这里也就不再阐述啦,不能一口吃成一个胖子。
RxJava与Retrofit组合
Retrofit是Square公司提供的一个类型安全的Http Client,由于Retrofit本身是支持RxJava的,因此这两者理所当然搭配使用。
先看下单独使用Retrofit进行网络操作的例子:
public interface GithubAPI {
@GET("/users/{user}")
public void getUserInfo(@Path("user") String user, Callback&UserInfo& callback);
private void fetchUserInfo() {
String username = mEditText.getText().toString();
getGithubAPI()
.getUserInfo(username, new Callback&UserInfo&() {
public void success(UserInfo userInfo, Response response) {
mTextView.setText(userInfo.email);
public void failure(RetrofitError error) {
mTextView.setText(error.getMessage());
单独使用Retrofit是需要有回调的,如果逻辑稍微在复杂点,可能又要在Callback里做很多事情,代码维护会很费劲。
下面看下Retrofit搭配RxJava使用的例子:
public interface GithubAPI {
@GET("/users/{user}")
public Observable&UserInfo& getUserInfo(@Path("user") String user);
private void fetchUserInfoRx() {
String username = mEditText.getText().toString();
getGithubAPI()
.getUserInfo(username);
.subscribe(new Observer&UserInfo&() {
public void onCompleted() {
public void onError(Throwable e) {
mTextView.setText(e.getMessage());
public void onNext(UserInfo userInfo) {
mTextView.setText(userInfo.email);
下面再看下多个操作的情况,比如username需要根据网络操作获取,然后才能通过username获取用户信息。
则Retrofit的代码是这样的:
public interface GithubAPI {
@GET("/username")
public void getUserName(Callback&String& callback);
@GET("/users/{user}")
public void getUserInfo(@Path("user") String user, Callback&UserInfo& callback);
private void fetchUserInfo() {
String username = mEditText.getText().toString();
getGithubAPI()
.getUserName(new Callback&String&() {//获取username
public void success(String username, Response response) {
/获取UserInfo
getUserInfo(username, new Callback&UserInfo&() {
public void success(UserInfo userInfo, Response response) {
mTextView.setText(userInfo.email);
public void failure(RetrofitError error) {
mTextView.setText(error.getMessage());
而Retrofit搭配RxJava使用的代码是这样的:
public interface GithubAPI {
@GET("/username")
public Observable&String& getUserName();
@GET("/users/{user}")
public Observable&UserInfo& getUserInfo(@Path("user") String user);
private void fetchUserInfoRx() {
String username = mEditText.getText().toString();
getGithubAPI()
.getUserName()
//获取username
.flatMap(new Func1&String, Observable&UserInfo&&() {
public Observable&UserInfo& onNext(String username) {
//获取UserInfo
return getGithubAPI().getUserInfo(username);
.subscribe(new Observer&UserInfo&() {
public void onCompleted() {
public void onError(Throwable e) {
mTextView.setText(e.getMessage());
public void onNext(UserInfo userInfo) {
mTextView.setText(userInfo.email);
有没有发现,使用RxJava结构更清晰明了。
补充下,使用Retrofit和RxJava是需要添加依赖的:
compile 'io.reactivex:rxjava:1.1.0'
compile 'io.reactivex:rxandroid:1.1.0'
compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'
compile 'com.squareup.retrofit:retrofit:1.9.0'
示例demo放到github上了,
1.RxJava最大特点是链式调用,使异步逻辑结构更清晰明了2.观察者模式:Obverable(被观察者), Observer/Subscriber(观察者), Subscription(订阅)3.真的很感谢,拜读他的文章学习的RxJava
微信公众号:学姐的IT专栏
热爱技术的Android程序媛,非典型高材生。
微信公众号“某学姐”
技术问答与分享社区talkcode.cc站长RxJava教程之生命周期管理 -解道Jdon
& & & &&& & &
  Rx主要的亮点是:它并不知道一系列事件什么时候发送或中断时,但是你能控制你什么时候开始接受这一系列事件或什么时候停止接受,这样,订阅者不再和原始事件系列发生源耦合,Rx能提给供你有关事件的各种订阅方式。你可以任意地订阅,也可以取消订阅,这种订阅到取消订阅之间的周期我们可以称为订阅的生命周期,RxJava提供各种灵活强大的订阅生命周期管理。
  我们看看Observable.subscribe几个重要方法参数:
Subscription
subscribe()
Subscription
subscribe(Action1&? super
T& onNext)
Subscription
subscribe(Action1&? super
T& onNext, Action1& java.lang .Throwable& onError)
Subscription
subscribe(Action1&? super
T& onNext, Action1& java.lang .Throwable& onError, Action0 onComplete)
Subscription
subscribe(Observer&? super
T& observer)
Subscription
subscribe(Subscriber&? super
T& subscriber)
上述各种subscribe()接口是消费使用各种推送的事件,但是不会执行任何动作action,具体真正action动作会作为参数提供给接口方法,如果你不提供任何action,事件实际上会被忽视,也就是不做任何处理。
Subject& Integer,
Integer& s = ReplaySubject.create();
s.subscribe(
v -& System.out.println(v),
e -& System.err.println(e));
s.onNext(0);
s.onError(new Exception(&Oops &));
java.lang.Exception: Oops
如果我们不提供错误处理的函数,那么在onError被调用时,OnErrorNotImplementedException 会被抛出。
  你也可以在一个系列事件中断前停止接受事件值,前面每个subscribe 方法返回都是一个Subscription实例,这是一个有两个方法的接口:
boolean isUnsubscribed()
void unsubscribe()
调用unsubscribe将会停止接受观察者向你发送的事件。
Subject& Integer,
values = ReplaySubject.create();
Subscription subscription = values.subscribe(
v -& System.out.println(v),
e -& System.err.println(e),
() -& System.out.println(&Done &)
values.onNext(0);
values.onNext(1);
subscription.unsubscribe();
values.onNext(2);
输出结果是两个数值:0和1
没有输出2数值,因为我们已经在发送数值2之前取消了订阅,一个观察者取消订阅并不影响对同一个被观察者的其他观察者的订阅接受情况。
onError 和 onCompleted
  这两个方法意味着中断一个事件系列,可观察者(被观察者)一旦在这两个方法以后就不再发射任何事件,尽管你的代码可能还会有发送事件:
Subject& Integer,
values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
v -& System.out.println(&First:
e -& System.out.println(&First:
() -& System.out.println(&Completed &)
values.onNext(0);
values.onNext(1);
values.onCompleted();
values.onNext(2);
上述代码尽管可观察者还在onCompleted以后发送事件数值2,但是运行时观察者并没有接受到。
  一个Subscription 将会和其使用的资源绑定,因此,你必须记得处理订阅的善后工作,,使用 这个静态工厂创建一个Subscription,可将Subscription与一个必需的资源绑定:
Subscription s = Subscriptions.create(() -& System.out.println(&Clean &));
s.unsubscribe();
输出结果: Clean
Subscriptions.create创建的订阅Subscription需要有一个取消订阅的动作,以便释放资源,这些方式有如下几种:
Subscriptions.empty()返回的一个订阅Subscription是不需要做善后工作的,当你需要一个订阅Subscription并不需要释放任何资源是有效。
Subscriptions.from(Subscription... subscriptions) 返回的订阅Subscription是当其结束时需要处理多个其他订阅的善后工作。
Subscriptions.unsubscribed() 返回一个已经取消订阅做好善后工作的订阅Subscription
订阅Subscription有如下几种实现子类:
BooleanSubscription
CompositeSubscription
MultipleAssignmentSubscription
RefCountSubscription
SafeSubscriber
Scheduler.Worker
SerializedSubscriber
SerialSubscription
Subscriber
TestSubscriber
It is interesting to note that Subscriber also implements Subscription. This means that we can also use a reference to a Subscriber to terminate a subscription.
请注意Subscriber也会实现接口Subscription,这意味着我们能使用一个指向Subscriber的引用来中断一个订阅。
| 网站地图 | 设为首页&nbsp&#8250&nbsp&nbsp&#8250&nbsp
RxJava2 浅析
原文地址:&Observable在RxJava1.x中,最熟悉的莫过于Observable这个类了,笔者刚使用RxJava2.x时,创建一个Observable后,顿时是懵逼的。因为我们熟悉的Subscriber居然没影了,取而代之的是ObservableEmitter,俗称发射器。此外,由于没有了Subscriber的踪影,我们创建观察者时需使用Observer。而Observer也不是我们熟悉的那个Observer,其回调的Disposable参数更是让人摸不到头脑。废话不多说,从会用开始,还记得使用RxJava的三部曲吗?&第一步:初始化一个Observable&&&&&&&Observable&Integer&&observable=Observable.create(new&ObservableOnSubscribe&Integer&()&{
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&subscribe(ObservableEmitter&Integer&&e)&throws&Exception&{
&&&&&&&&&&&&&&&&e.onNext(1);
&&&&&&&&&&&&&&&&e.onNext(2);
&&&&&&&&&&&&&&&&e.onComplete();
&&&&&&&&&&&&}
&&&&&&&&});第二步:初始化一个Observer
&&&&&&&&Observer&Integer&&observer=&new&Observer&Integer&()&{
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onSubscribe(Disposable&d)&{
&&&&&&&&&&&&}
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onNext(Integer&value)&{
&&&&&&&&&&&&}
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onError(Throwable&e)&{
&&&&&&&&&&&&}
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onComplete()&{
&&&&&&&&&&&&}
&&&&&&&&}第三部:建立订阅关系&&&&observable.subscribe(observer);&//建立订阅关系不难看出,与RxJava1.x还是存在着一些区别的。首先,创建Observable时,回调的是ObservableEmitter,字面意思即发射器,用于发射数据(onNext)和通知(onError/onComplete)。其次,创建的Observer中多了一个回调方法onSubscribe,传递参数为Disposable&,Disposable相当于RxJava1.x中的Subscription,用于解除订阅。你可能纳闷为什么不像RxJava1.x中订阅时返回Disposable,而是选择回调出来呢。官方说是为了设计成Reactive-Streams架构。不过仔细想想这么一个场景还是很有用的,假设Observer需要在接收到异常数据项时解除订阅,在RxJava2.x中则非常简便,如下操作即可。&&Observer&Integer&&observer&=&new&Observer&Integer&()&{
&&&&&&&&&&&&private&Disposable&
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onSubscribe(Disposable&d)&{
&&&&&&&&&&&&&&&&disposable&=&d;
&&&&&&&&&&&&}
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onNext(Integer&value)&{
&&&&&&&&&&&&&&&&Log.d(&JG&,&value.toString());
&&&&&&&&&&&&&&&&if&(value&&&3)&{&&&//&&3&时为异常数据,解除订阅
&&&&&&&&&&&&&&&&&&&&disposable.dispose();
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&}
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onError(Throwable&e)&{
&&&&&&&&&&&&}
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onComplete()&{
&&&&&&&&&&&&}
&&&&&&&&};此外,RxJava2.x中仍然保留了其他简化订阅方法,我们可以根据需求,选择相应的简化订阅。只不过传入的对象改为了Consumer。`&&&Disposable&disposable&=&observable.subscribe(new&Consumer&Integer&()&{
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&accept(Integer&integer)&throws&Exception&{
&&&&&&&&&&&&&&&&&&//这里接收数据项
&&&&&&&&&&&&}
&&&&&&&&},&new&Consumer&Throwable&()&{
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&accept(Throwable&throwable)&throws&Exception&{
&&&&&&&&&&&&&&//这里接收onError
&&&&&&&&&&&&}
&&&&&&&&},&new&Action()&{
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&run()&throws&Exception&{
&&&&&&&&&&&&&&//这里接收onComplete。
&&&&&&&&&&&&}
&&&&&&&&});不同于RxJava1.x,RxJava2.x中没有了一系列的Action/Func接口,取而代之的是与Java8命名类似的函数式接口,如下图:&其中Action类似于RxJava1.x中的Action0,区别在于Action允许抛出异常。public&interface&Action&{
&&&&&*&Runs&the&action&and&optionally&throws&a&checked&exception
&&&&&*&@throws&Exception&if&the&implementation&wishes&to&throw&a&checked&exception
&&&&void&run()&throws&E
}而Consumer即消费者,用于接收单个值,BiConsumer则是接收两个值,Function用于变换对象,Predicate用于判断。这些接口命名大多参照了Java8,熟悉Java8新特性的应该都知道意思,这里也就不再赘述了。线程调度关于线程切换这点,RxJava1.x和RxJava2.x的实现思路是一样的。这里就简单看下相关源码。subscribeOn同RxJava1.x一样,subscribeOn用于指定subscribe()时所发生的线程,从源码角度可以看出,内部线程调度是通过ObservableSubscribeOn来实现的。&&&public&final&Observable&T&&subscribeOn(Scheduler&scheduler)&{
&&&&&&&&ObjectHelper.requireNonNull(scheduler,&&scheduler&is&null&);
&&&&&&&&return&RxJavaPlugins.onAssembly(new&ObservableSubscribeOn&T&(this,&scheduler));
&&&&}ObservableSubscribeOn的核心源码在subscribeActual方法中,通过代理的方式使用SubscribeOnObserver包装Observer后,设置Disposable来将subscribe切换到Scheduler线程中&&&&@Override
&&&&public&void&subscribeActual(final&Observer&?&super&T&&s)&{
&&&&&&&&final&SubscribeOnObserver&T&&parent&=&new&SubscribeOnObserver&T&(s);
&&&&&&&&s.onSubscribe(parent);&//回调Disposable
&&&&&&&&parent.setDisposable(scheduler.scheduleDirect(new&Runnable()&{&//设置`Disposable`
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&run()&{
&&&&&&&&&&&&&&&&source.subscribe(parent);&//使Observable的subscribe发生在Scheduler线程中
&&&&&&&&&&&&}
&&&&&&&&}));
&&&&}observeOnobserveOn方法用于指定下游Observer回调发生的线程。&&&&public&final&Observable&T&&observeOn(Scheduler&scheduler,&boolean&delayError,&int&bufferSize)&{
&&&&&&&&&//..
&&&&&&&&&//验证安全
&&&&&&&&return&RxJavaPlugins.onAssembly(new&ObservableObserveOn&T&(this,&scheduler,&delayError,&bufferSize));
&&&&}主要实现在ObservableObserveOn中的subscribeActual,可以看出,不同于subscribeOn,没有将suncribe操作全部切换到Scheduler中,而是通过ObserveOnSubscriber与Scheduler配合,通过schedule()达到切换下游Observer回调发生的线程,这一点与RxJava1.x实现几乎相同。关于ObserveOnSubscriber的源码这里不再重复描述了,有兴趣的可以查看本人这篇文章&&&&@Override
&&&&protected&void&subscribeActual(Observer&?&super&T&&observer)&{
&&&&&&&&if&(scheduler&instanceof&TrampolineScheduler)&{
&&&&&&&&&&&&source.subscribe(observer);
&&&&&&&&}&else&{
&&&&&&&&&&&&Scheduler.Worker&w&=&scheduler.createWorker();
&&&&&&&&&&&&source.subscribe(new&ObserveOnSubscriber&T&(observer,&w,&delayError,&bufferSize));
&&&&}FlowableFlowable是RxJava2.x中新增的类,专门用于应对背压(Backpressure)问题,但这并不是RxJava2.x中新引入的概念。所谓背压,即生产者的速度大于消费者的速度带来的问题,比如在中常见的点击事件,点击过快则会造成点击两次的效果。&我们知道,在RxJava1.x中背压控制是由Observable完成的,使用如下:&&Observable.range(1,10000)
&&&&&&&&&&&&.onBackpressureDrop()
&&&&&&&&&&&&.subscribe(integer&-&&Log.d(&JG&,integer.toString()));而在RxJava2.x中将其独立了出来,取名为Flowable。因此,原先的Observable已经不具备背压处理能力。&通过Flowable我们可以自定义背压处理策略。&测试Flowable例子如下:&&Flowable.create(new&FlowableOnSubscribe&Integer&()&{
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&subscribe(FlowableEmitter&Integer&&e)&throws&Exception&{
&&&&&&&&&&&&&&&&for(int&i=0;i&10000;i++){
&&&&&&&&&&&&&&&&&&&&e.onNext(i);
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&&&&&e.onComplete();
&&&&&&&&&&&&}
&&&&&&&&},&FlowableEmitter.BackpressureMode.ERROR)&//指定背压处理策略,抛出异常
&&&&&&&&&&&&&&&&.putation())
&&&&&&&&&&&&&&&&.observeOn(Schedulers.newThread())
&&&&&&&&&&&&&&&&.subscribe(new&Consumer&Integer&()&{
&&&&&&&&&&&&&&&&&&&&@Override
&&&&&&&&&&&&&&&&&&&&public&void&accept(Integer&integer)&throws&Exception&{
&&&&&&&&&&&&&&&&&&&&&&&&Log.d(&JG&,&integer.toString());
&&&&&&&&&&&&&&&&&&&&&&&&Thread.sleep(1000);
&&&&&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&&&&&},&new&Consumer&Throwable&()&{
&&&&&&&&&&&&&&&&&&&&@Override
&&&&&&&&&&&&&&&&&&&&public&void&accept(Throwable&throwable)&throws&Exception&{
&&&&&&&&&&&&&&&&&&&&&&&&Log.d(&JG&,throwable.toString());
&&&&&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&&&&&});或者可以使用类似RxJava1.x的方式来控制。&&Flowable.range(1,10000)
&&&&&&&&&&&&&&&&.onBackpressureDrop()
&&&&&&&&&&&&&&&&.subscribe(integer&-&&Log.d(&JG&,integer.toString()));其中还需要注意的一点在于,Flowable并不是订阅就开始发送数据,而是需等到执行Subscription#request才能开始发送数据。当然,使用简化subscribe订阅方法会默认指定Long.MAX_VALUE。手动指定的例子如下:
&&&&&&&&Flowable.range(1,10).subscribe(new&Subscriber&Integer&()&{
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onSubscribe(Subscription&s)&{
&&&&&&&&&&&&&&&&s.request(Long.MAX_VALUE);//设置请求数
&&&&&&&&&&&&}
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onNext(Integer&integer)&{
&&&&&&&&&&&&}
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onError(Throwable&t)&{
&&&&&&&&&&&&}
&&&&&&&&&&&&@Override
&&&&&&&&&&&&public&void&onComplete()&{
&&&&&&&&&&&&}
&&&&&&&&});Single不同于RxJava1.x中的SingleSubscriber,RxJava2中的SingleObserver多了一个回调方法onSubscribe。interface&SingleObserver&T&&{
&&&&void&onSubscribe(Disposable&d);
&&&&void&onSuccess(T&value);
&&&&void&onError(Throwable&error);
}Completable同Single,Completable也被重新设计为Reactive-Streams架构,RxJava1.x的CompletableSubscriber改为CompletableObserver,源码如下:interface&CompletableObserver&T&&{
&&&&void&onSubscribe(Disposable&d);
&&&&void&onComplete();
&&&&void&onError(Throwable&error);
}Subject/ProcessorProcessor和Subject的作用是相同的。关于Subject部分,RxJava1.x与RxJava2.x在用法上没有显著区别,这里就不介绍了。其中Processor是RxJava2.x新增的,继承自Flowable,所以支持背压控制。而Subject则不支持背压控制。使用如下:&&&&&&&&//Subject
&&&&&&&&AsyncSubject&String&&subject&=&AsyncSubject.create();
&&&&&&&&subject.subscribe(o&-&&Log.d(&JG&,o));//three
&&&&&&&&subject.onNext(&one&);
&&&&&&&&subject.onNext(&two&);
&&&&&&&&subject.onNext(&three&);
&&&&&&&&subject.onComplete();
&&&&&&&//Processor
&&&&&&&&AsyncProcessor&String&&processor&=&AsyncProcessor.create();
&&&&&&&&processor.subscribe(o&-&&Log.d(&JG&,o));&//three
&&&&&&&&processor.onNext(&one&);
&&&&&&&&processor.onNext(&two&);
&&&&&&&&processor.onNext(&three&);
&&&&&&&&processor.onComplete();操作符关于操作符,RxJava1.x与RxJava2.x在命名和行为上大多数保持了一致,部分操作符请查阅文档。最后RxJava1.x 如何平滑升级到RxJava2.x?&由于RxJava2.x变化较大无法直接升级,幸运的是,官方提供了RxJava2Interop这个库,可以方便地将RxJava1.x升级到RxJava2.x,或者将RxJava2.x转回RxJava1.x。地址:
上一篇: 先看效果: 看到这个效果很多人可能会说 github上很多开源的呀。干嘛还要造轮子呢。github确实有很多这样效果的项目。用起来也都不错。可我偏偏就是造轮子了。。。最主要的原因是最近想优化app的渲染性能。想来app当中最主要的特性就是图片多和文字。图片优化
下一篇: It’s parfettitime! 实际名称是 Confetti ,但是的一个朋友认为parfetti是一个更好的名字。 在紧张开发 Robinhood Gold 之余,由于要等待设计等最终定稿,所以我有时间去弄我将用在Robinhood的第二个开源项目。今天很高兴宣布发布出来给大家试试。 额, 它是

我要回帖

更多关于 禁韩令什么时候解除 的文章

 

随机推荐