Android -- RxAndroid 2.0 学习笔记
Rxjava 2.x正式版出来已经快两个月了。在之前的项目中也在使用Rx。但却一直没有时间对整个的知识进行梳理,恰好今天抽出时间,也系统的再学习一遍RxJava/RxAndroid
RxJava的使用
一、观察者/被观察者
1、前奏:
在观察者之前就要先提下backpressure这个概念。简单来说,backpressure是在异步场景中,被观察者发送事件速度远快于观察者的处理速度时,告诉被观察者降低发送速度的策略。
2、在2.0中有以下几种观察者
- Observable/Observer
- Flowable/Subscriber
- Single/SingleObserver
- Completable/CompletableObserver
- Maybe/MaybeObserver
依次的来看一下:
Observable
1 | Observable |
这里要提的就是onSubscribe(Disposable d),disposable用于取消订阅。
就用简单的just这个操作符来分析一下。
1 | @SuppressWarnings("unchecked") |
1 | (SchedulerSupport.NONE) |
1 |
|
just实际调用了
1 |
|
LambdaSubscriber 瞅瞅
@Override
public void dispose() {
cancel();
}
@Override
public boolean isDisposed() {
return get() == SubscriptionHelper.CANCELLED;
}
1 |
|
Flowable
.just(1, 2, 3, 4)
.subscribe(new Subscriber < Integer > () {
@Override public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override public void onNext(Integer integer) {}
@Override public void onError(Throwable t) {}
@Override public void onComplete() {}
});
1 | ```onSubscribe``` 这个回调传出了一个Subscription, 我们要指定他传出数据的大小, 调用他的```request()``` 方法。如没有要求可以传入一个Long的最大数值```Long.MAX_VALUE```。 |
@Override void slowPath(long r) {
long e = 0;
T[] arr = array;
int f = arr.length;
int i = index;
Subscriber < ?super T > a = actual;
for (;;) {
while (e != r && i != f) {
if (cancelled) {
return;
}
T t = arr[i];
if (t == null) {
a.onError(new NullPointerException(“array element is null”));
return;
} else {
a.onNext(t);
}
e++;
i++;
}
if (i == f) {
if (!cancelled) {
a.onComplete();
}
return;
}
r = get();
if (e == r) {
index = i;
r = addAndGet( - e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
}
1 | 需要if (i == f) f 是这个数据的大小,i是当前发送数据的个数,所以不会调用onComplete |
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}
));
1 | 在这个过程中,会把source也就是ObservableSource在线程中订阅,同时也把把传入的Observer变成SubscribeOnObserver。若指定的是io线程,可以在```IoScheduler```中看见对线程的管理 |
public interface MaybeSource {
void subscribe(MaybeObserver observer);
}
1 | 2、创建一个MaybeObserver, 这就是最后绑定的时候的接口 |
public interface MaybeObserver {
void onSuccess(int value);
}
1 | 3、创建Function, 这个在操作符中用于实现 |
public interface Function {
int apply(int t);
}
1 | 4、当然少不了Maybe, 这里就实现just和map两个方法吧 |
public abstract class Maybe implements MaybeSource {
public static Maybe just(int item) {
return new MaybeJust(item);
}
public final Maybe map(Function mapper) {
return new MaybeMap(this, mapper);
}
}
1 | 5、just实际返回的对象是MaybeJust,他的父类是Maybe |
public class MaybeJust extends Maybe {
final int value;
public MaybeJust(int value) {
this.value = value;
}
@Override
public void subscribe(MaybeObserver observer) {
observer.onSuccess(value);
}
}
1 |
|
public class MaybeMap extends Maybe {
final Function mapper;
final MaybeSource source;
public MaybeMap(MaybeSource source, Function mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
public void subscribe(MaybeObserver observer) {
source.subscribe(new MapMaybeObserver(observer, mapper));
}
static final class MapMaybeObserver implements MaybeObserver {
final MaybeObserver actual;
final Function mapper;
MapMaybeObserver(MaybeObserver actual, Function mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onSuccess(int value) {
this.actual.onSuccess(this.mapper.apply(value));
}
}
}
1 | 7、在main中可以这么运行 |
Maybe
.just(1)
.map(new Function() {
@Override
public int apply(int t) {
return t + 1;
}
}).map(new Function() {
@Override
public int apply(int t) {
return t * 4;
}
}).subscribe(new MaybeObserver() {
@Override
public void onSuccess(int value) {
System.out.println(value);
}
});
1 | 8、运行结果,传入1,先+1, 在 * 4,最后结果应该是8 |
@GET(“/“)
Flowable
1 |
|
public <T> void addSubscription(Flowable flowable,
final RxSubscriber<T> subscriber) {
if (mCompositeDisposable == null) {
mCompositeDisposable = new CompositeDisposable();
}
if (subscriber == null) {
Log.e(TAG, "rx callback is null");
return;
}
Disposable disposable = flowable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<T>() {
@Override
public void accept(T o) throws Exception {
subscriber.onNext(o);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable)
throws Exception {
subscriber.onError(throwable);
}
},
new Action() {
@Override
public void run() throws Exception {
subscriber.onComplete();
}
});
`
此外,之前的项目后台接口也是奇葩,同一个人写的接口,接口的返回格式更是多种多样,还不改,没办法,客户端只能将就着服务端,谁叫我们是新来的呢。遇到这种问题,就不直接转成对象格式了,先转成ResponseBody得到Body,再拿出string来。
okhttp中response的body对象就是这个ResponseBody,他的string() 方法就可以获得整个body,然后再做json解析吧