RxJava中多种场景的实现总结

 更新时间:2016年10月08日 11:01:38   作者:lluo2010  
这篇文章给大家详细介绍了RxJava中多种场景的实现,对大家学习使用RxJava具有一定的参考借鉴价值,有需要的朋友们可以参考学习,下面来一起看看吧。

一、推迟执行动作

可以使用timer+map方法实现.代码如下:

Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{
   return doSomething();
  }).subscribe(System.out::println);
 }

二、推迟发送执行的结果

这种场景要求产生数据的动作是马上执行,但是结果推迟发送.这和上面场景的是不一样的.

这种场景可以使用Observable.zip来实现.

zip操作符将多个Observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。

对于各个observable相同位置的数据,需要相互等待,也就说,第一个observable第一个位置的数据产生后,要等待第二个observable第一个位置的数据产生,等各个Observable相同位置的数据都产生后,才能按指定规则进行组合.这真是我们要利用的.

zip有很多种声明,但大致上是一样的,就是传入几个observable,然后指定一个规则,对每个observable对应位置的数据进行处理,产生一个新的数据, 下面是其中一个最简单的:

 public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);

用zip实现推送发送执行结果如下:

 Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS)
         ,Observable.just(doSomething()), (x,y)->y)
   .subscribe(System.out::println));

三、使用defer在指定线程里执行某种动作

如下面的代码,虽然我们指定了线程的运行方式,但是doSomething()这个函数还是在当前代码调用的线程中执行的.

 Observable.just(doSomething())
     .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.computation())
     .subscribe(v->Utils.printlnWithThread(v.toString()););

通常我们采用下面的方法达到目的:

 Observable.create(s->{s.onNext(doSomething());})
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v->{
     Utils.printlnWithThread(v.toString());
  });

但其实我们采用defer也能达到相同的目的.

关于defer

defer 操作符与create、just、from等操作符一样,是创建类操作符,不过所有与该操作符相关的数据都是在订阅是才生效的。

声明:

 public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);

defer的Func0里的Observable是在订阅(subscribe)的时候才创建的.

作用:

Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.

也就说observable是在订阅的时候才创建的.

上面的问题用defer实现:

 Observable.defer(()->Observable.just(doSomething()))
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v->{Utils.printlnWithThread(v.toString());
  });

四、使用compose不要打断链式结构

我们经常看到下面的代码:

 Observable.just(doSomething())
    .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.computation())
    .subscribe(v->{Utils.printlnWithThread(v.toString());

上面的代码中,subscribeOn(xxx).observeOn(xxx)可能在很多地方都是一样的, 如果我们打算把它统一在某一个地方实现, 我们可以这么写:

 private static <T> Observable<T> applySchedulers(Observable<T> observable) {
  return observable.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation());
 }

但是这样每次我们需要调用上面的方法, 大致会像下面这样,最外面是一个函数,等于打破了链接结构:

 applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() {
   @Override public Data call(Data data) {
   return manipulate(data);
   }
  })
 ).subscribe(new Action1<Data>() {
  @Override public void call(Data data) {
  doSomething(data);
  }
 });

可以使用compose操作符达到不打破链接结构的目的.

compose的申明如下:

 public Observable compose(Transformer<? super T, ? extends R> transformer);

它的入参是一个Transformer接口,输出是一个Observable. 而Transformer实际上就是一个Func1<Observable<T>, Observable<R>> ,换言之就是:可以通过它将一种类型的Observable转换成另一种类型的Observable.

简单的说,compose可以通过指定的转化方式(输入参数transformer),将原来的observable转化为另外一种Observable.

通过compose, 采用下面方式指定线程方式:

 private static <T> Transformer<T, T> applySchedulers() {
   return new Transformer<T, T>() {
    @Override
    public Observable<T> call(Observable<T> observable) {
     return observable.subscribeOn(Schedulers.io())
       .observeOn(Schedulers.computation());
    }
   };
  }

 Observable.just(doSomething()).compose(applySchedulers())
    .subscribe(v->{Utils.printlnWithThread(v.toString());
   });

函数applySchedulers可以使用lambda表达式进一步简化为下面为:

 private static <T> Transformer<T, T> applySchedulers() { 
  return observable->observable.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation());
 }

五、按优先级使用不同的执行结果

上面这个标题估计没表达清楚我想表达的场景. 其实我想表达的场景类似于平常的获取网络数据场景:如果缓存有,从缓存获取,如果没有,再从网络获取.

这里要求,如果缓存有,不会做从网络获取数据的动作.

这个可以采用concat+first实现.

concat将几个Observable合并成一个Observable,返回最终的一个Observable. 而那些数据就像从一个Observable发出来一样. 参数可以是多个Observable,也可以是包含Observalbe的Iterator.

新的observable内的数据排列按原来concat里的observable顺序排列,即新结果内的数据是按原来的顺序排序的.

下面是上述需求的实现:

 Observable.concat(getDataFromCache(),getDataFromNetwork()).first()
    .subscribe(v->System.out.println("result:"+v));
 //从缓存获取数据
 private static Observable<String> getDataFromCache(){
  return Observable.create(s -> {
   //dosomething to get data
   int value = new Random().nextInt();
   value = value%2;
   if (value!=0){
    s.onNext("data from cache:"+value); //产生数据
   }
   //s.onError(new Throwable("none"));
   s.onCompleted();
  }
    );
 }
 //从网络获取数据
 private static Observable<String> getDataFromNetwork(){
  return Observable.create(s -> {
   for (int i = 0; i < 10; i++) {
    Utils.println("obs2 generate "+i);
    s.onNext("data from network:" + i); //产生数据
   }
   s.onCompleted();
  }
    );
 }

上面的实现,如果getDataFromCache有数据, getDataFromNetwork这里的代码是不会执行的, 这正是我们想要的.

上面实现有几个需要注意:

    1、有可能从两个地方都获取不到数据, 这种场景下使用first会抛出异常NoSuchElementException,如果是这样的场景,需要用firstOrDefault替换上面的first.

    2、上面getDataFromCache()里,如果没有数据,我们直接调用onCompleted,如果不调用onCompleted,而是调用onError,则上述采用concat是得不到任何结果的.因为concat在收到任何一个error,合并就会停止.所以,如果要用onError, 则需要用concatDelayError替代concat.concatDelayError会先忽略error,将error推迟到最后在处理.

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流。

相关文章

  • Java文件操作之IO流 File类的使用详解

    Java文件操作之IO流 File类的使用详解

    在java中提供有对于文件操作系统的支持,这个支持在java.io.File类中进行了定义,也就是说在整个java.io包中File类是唯一一个与文件本身操作有关的类(创建,删除,重命名)有关的类,而如果想要进行File类的操作,我们需要提供有完整的路径支持,而后可以调用相应的方法进行处理
    2021-09-09
  • SpringSecurity默认登录页的使用示例教程

    SpringSecurity默认登录页的使用示例教程

    Spring 是非常流行和成功的 Java 应用开发框架,Spring Security 正是 Spring 家族中的成员,Spring Security 基于 Spring 框架,提供了一套 Web 应用安全性的完整解决方案,本文给大家介绍SpringSecurity的默认登录页的使用教程,感兴趣的朋友一起看看吧
    2023-12-12
  • java后端实现信息分页查询的示例代码

    java后端实现信息分页查询的示例代码

    在一个页面展示大量的用户信息不便于观看,因此就需要采用分页展示的方法,本文就来为大家介绍一下java后端如何实现信息分页查询,需要的小伙伴可以参考下
    2023-11-11
  • JAVA中反射机制和模块化的深入讲解

    JAVA中反射机制和模块化的深入讲解

    很多刚学Java反射的同学可能对反射技术一头雾水,为什么要学习反射,学习反射有什么作用,下面这篇文章主要给大家介绍了关于JAVA中反射机制和模块化的相关资料,需要的朋友可以参考下
    2021-09-09
  • Java中的内存分配图解

    Java中的内存分配图解

    这篇文章主要介绍了Java中的内存分配图解,Java 程序运行时,需要在内存中分配空间。为了提高运算效率,就对空间进行了不同区域的划分,因为每一片区域都有特定的处理数据方式和内存管理方式,需要的朋友可以参考下
    2023-08-08
  • maven仓库中心mirrors配置多个下载中心(执行最快的镜像)

    maven仓库中心mirrors配置多个下载中心(执行最快的镜像)

    这篇文章主要介绍了maven仓库中心mirrors配置多个下载中心(执行最快的镜像),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • Java多线程与线程池技术分享

    Java多线程与线程池技术分享

    这篇文章主要介绍了Java多线程与线程池技术分享,线程池本质是池化技术的应用,和连接池类似,创建连接与关闭连接属于耗时操作,下文相关介绍需要的小伙伴可以参考一下
    2022-03-03
  • Java实现鼠标拖拽移动界面组件

    Java实现鼠标拖拽移动界面组件

    在Java中,Frame或者JFrame自身已经实现了鼠标拖拽标题栏移动窗口的功能。但是Jframe的样式实在无法令人满意,那你又该怎么实现鼠标拖拽移动窗口的目的呢?今天我们来探讨下
    2014-09-09
  • Java获取CPU和内存占用率最简单的方法

    Java获取CPU和内存占用率最简单的方法

    这篇文章主要介绍了Java获取CPU和内存占用率最简单的方法,文中通过代码示例和图文结合的方式给大家讲解的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2024-06-06
  • Java随机生成姓名,手机号,住址代码示例

    Java随机生成姓名,手机号,住址代码示例

    这篇文章主要介绍了Java随机生成姓名,手机号,住址代码示例,属于Java基础方面的内容,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11

最新评论