代做课程设计的网站挖掘关键词工具
参考:教你轻松理解Rxjava之线程切换流程(observeOn与subscribeOn)
observable
        // subscribeOn() may be called several times, but only the first call takes effect;
        // the remaining calls are ignored.
        .subscribeOn(Schedulers.newThread())
        // Every observeOn() call switches the downstream thread once more.
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(consumer);
订阅,向上走的一个过程
数据流,向下走的一个过程
1:当我们调用 subscribe 时,发起订阅
2:先向上走,我只需要关心subscribeOn和订阅的操作符
3:再向下走,我只需要关心observeOn和数据流的操作符
- subscribeOn
指定的是Observable自身在哪个调度器上执行,而且跟调用的位置没有关系。
- observeOn
指定一个观察者在哪个调度器上观察这个Observable,当每次调用了observeOn这个操作符时,之后都会在选择的调度器上进行观察,直到再次调用observeOn切换了调度器。
多次使用 subscribeOn
// subscribeOn: switches the thread the subscription runs on. Only the first call (the one
// closest to the source) takes effect -- in the output below every map() and the observer's
// onNext/onComplete run on the thread chosen by the first subscribeOn.
Observable.just("source")
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map1 thread========" + Thread.currentThread().getName());
                return "map1:" + s;
            }
        })
        .subscribeOn(Schedulers.newThread()) // first switch
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map2 thread========" + Thread.currentThread().getName());
                return "map2:" + s;
            }
        })
        .subscribeOn(Schedulers.io()) // second switch
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map3 thread========" + Thread.currentThread().getName());
                return "map3:" + s;
            }
        })
        .subscribeOn(Schedulers.computation()) // third switch
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map4 thread========" + Thread.currentThread().getName());
                return "map4:" + s;
            }
        })
        .subscribeOn(Schedulers.single()) // fourth switch
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe thread========" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext thread========" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError thread========" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete thread========" + Thread.currentThread().getName());
            }
        });
// Output:
//onSubscribe thread========main
//map1 thread========RxNewThreadScheduler-1
//map2 thread========RxNewThreadScheduler-1
//map3 thread========RxNewThreadScheduler-1
//map4 thread========RxNewThreadScheduler-1
//onNext thread========RxNewThreadScheduler-1
//onComplete thread========RxNewThreadScheduler-1
多次使用 observeOn
// observeOn: switches the thread used by the operators that follow it; it can be applied
// repeatedly, and each call takes effect (see the output below).
Observable.just("source")
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map1 thread========" + Thread.currentThread().getName());
                return "map1:" + s;
            }
        })
        .observeOn(Schedulers.newThread()) // first switch
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map2 thread========" + Thread.currentThread().getName());
                return "map2:" + s;
            }
        })
        .observeOn(Schedulers.io()) // second switch
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map3 thread========" + Thread.currentThread().getName());
                return "map3:" + s;
            }
        })
        .observeOn(Schedulers.computation()) // third switch
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map4 thread========" + Thread.currentThread().getName());
                return "map4:" + s;
            }
        })
        .observeOn(Schedulers.single()) // fourth switch
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe thread========" + Thread.currentThread().getName()); // main
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext thread========" + Thread.currentThread().getName()); // RxSingleScheduler-1
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError thread========" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete thread========" + Thread.currentThread().getName()); // RxSingleScheduler-1
            }
        });
// Output:
//onSubscribe thread========main
//map1 thread========main
//map2 thread========RxNewThreadScheduler-1
//map3 thread========RxCachedThreadScheduler-1
//map4 thread========RxComputationThreadPool-1
//onNext thread========RxSingleScheduler-1
//onComplete thread========RxSingleScheduler-1
多次使用 subscribeOn 和 observeOn示例一:
// Example 1: subscribeOn and observeOn interleaved. Per the output below, map1 runs on the
// scheduler of the first subscribeOn after the source (single), while every later map runs
// on the scheduler of the nearest observeOn above it.
Observable.just("source")
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map1 thread========" + Thread.currentThread().getName());
                return "map1:" + s; // RxSingleScheduler-1
            }
        })
        .observeOn(Schedulers.newThread()) // first observeOn switch
        .subscribeOn(Schedulers.single()) // first subscribeOn switch
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map2 thread========" + Thread.currentThread().getName());
                return "map2:" + s; // RxNewThreadScheduler-2
            }
        })
        .subscribeOn(Schedulers.newThread()) // second subscribeOn switch
        .observeOn(Schedulers.io()) // second observeOn switch
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map3 thread========" + Thread.currentThread().getName());
                return "map3:" + s; // RxCachedThreadScheduler-2
            }
        })
        .observeOn(Schedulers.computation()) // third observeOn switch
        .subscribeOn(Schedulers.io()) // third subscribeOn switch
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("map4 thread========" + Thread.currentThread().getName());
                return "map4:" + s; // RxComputationThreadPool-2
            }
        })
        .subscribeOn(Schedulers.computation()) // fourth subscribeOn switch
        .observeOn(Schedulers.single()) // fourth observeOn switch
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe thread========" + Thread.currentThread().getName()); // main
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext thread========" + Thread.currentThread().getName()); // RxSingleScheduler-1
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError thread========" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete thread========" + Thread.currentThread().getName()); // RxSingleScheduler-1
            }
        });
// Output:
//onSubscribe thread========main
//map1 thread========RxSingleScheduler-1
//map2 thread========RxNewThreadScheduler-2
//map3 thread========RxCachedThreadScheduler-2
//map4 thread========RxComputationThreadPool-2
//onNext thread========RxSingleScheduler-1
//onComplete thread========RxSingleScheduler-1
多次使用 subscribeOn 和 observeOn示例二:
// Example 2: doOnSubscribe / doOnNext callbacks mixed with subscribeOn and observeOn.
// The trailing comment on each line is the thread that callback was observed on
// (taken from the output listing below).
// NOTE(review): getConsumerDisposable(int) is not defined in the visible part of this file;
// presumably it prints "<i>--accept Disposable thread========<thread name>" -- confirm.
Observable.just(1)
        .doOnNext(getConsumer(1)) // RxCachedThreadScheduler-1
        .doOnSubscribe(getConsumerDisposable(1)) // RxCachedThreadScheduler-1
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.io())
        .doOnSubscribe(getConsumerDisposable(2)) // RxNewThreadScheduler-1
        .doOnNext(getConsumer(2)) // RxNewThreadScheduler-2
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.newThread())
        .doOnSubscribe(getConsumerDisposable(3)) // RxSingleScheduler-1
        .doOnNext(getConsumer(3)) // RxComputationThreadPool-1
        .subscribeOn(Schedulers.single())
        .doOnSubscribe(getConsumerDisposable(4)) // main
        .observeOn(Schedulers.io())
        .subscribe(getObserver()); // RxCachedThreadScheduler-2
// Output:
//4--accept Disposable thread========main
//onSubscribe thread========main
//3--accept Disposable thread========RxSingleScheduler-1
//2--accept Disposable thread========RxNewThreadScheduler-1
//1--accept Disposable thread========RxCachedThreadScheduler-1
//1--accept o=============1
//1--accept thread========RxCachedThreadScheduler-1
//2--accept o=============1
//2--accept thread========RxNewThreadScheduler-2
//3--accept o=============1
//3--accept thread========RxComputationThreadPool-1
//onNext o=============1
//onNext thread========RxCachedThreadScheduler-2
//onComplete thread========RxCachedThreadScheduler-2private Observer<Integer> getObserver() {return new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("onSubscribe thread========" + Thread.currentThread().getName());}@Overridepublic void onNext(Integer o) {System.out.println("onNext o=============" + o);System.out.println("onNext thread========" + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {System.out.println("onError e=============" + e.getMessage());System.out.println("onError thread========" + Thread.currentThread().getName());}@Overridepublic void onComplete() {System.out.println("onComplete thread========" + Thread.currentThread().getName());}};
}private Consumer<Integer> getConsumer(final int i) {return new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println(i + "--accept o=============" + integer);System.out.println(i + "--accept thread========"+ Thread.currentThread().getName());}};
}