Java扩展库RxJava的基本结构与适用场景小结

 更新时间:2016年06月18日 16:09:47   作者:hi大头鬼hi  
RxJava(GitHub: https://github.com/ReactiveX/RxJava)能够帮助Java进行异步与事务驱动的程序编写,这里我们来作一个Java扩展库RxJava的基本结构与适用场景小结,刚接触RxJava的同学不妨看一下^^

基本结构

我们先来看一段最基本的代码,分析这段代码在RxJava中是如何实现的。

Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
  @Override
  public void call(Subscriber<? super String> subscriber) {
    subscriber.onNext("1");
    subscriber.onCompleted();
  }
};
Subscriber<String> subscriber1 = new Subscriber<String>() {
  @Override
  public void onCompleted() {

  }

  @Override
  public void onError(Throwable e) {

  }

  @Override
  public void onNext(String s) {

  }
};

Observable.create(onSubscriber1)
    .subscribe(subscriber1);

首先我们来看一下Observable.create的代码

public final static <T> Observable<T> create(OnSubscribe<T> f) {
  return new Observable<T>(hook.onCreate(f));
}

protected Observable(OnSubscribe<T> f) {
  this.onSubscribe = f;
}

直接就是调用了Observable的构造函数来创建一个新的Observable对象,这个对象我们暂时标记为observable1,以便后面追溯。
同时,会将我们传入的OnSubscribe对象onSubscribe1保存在observable1的onSubscribe属性中,这个属性在后面的上下文中很重要,大家留心一下。

接下来我们来看看subscribe方法。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
  return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
  ...
  subscriber.onStart();
  hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
  return hook.onSubscribeReturn(subscriber);
}

可以看到,subscribe之后,就直接调用了observable1.onSubscribe.call方法,也就是我们代码中的onSubscribe1对象的call方法
,传入的参数就是我们代码中定义的subscriber1对象。call方法中所做的事情就是调用传入的subscriber1对象的onNext和onComplete方法。
这样就实现了观察者和被观察者之间的通讯,是不是很简单?

public void call(Subscriber<? super String> subscriber) {
  subscriber.onNext("1");
  subscriber.onCompleted();
}

RxJava使用场景小结

1.取数据先检查缓存的场景
取数据,首先检查内存是否有缓存
然后检查文件缓存中是否有
最后才从网络中取
前面任何一个条件满足,就不会执行后面的

final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
  @Override
  public void call(Subscriber<? super String> subscriber) {
    if (memoryCache != null) {
      subscriber.onNext(memoryCache);
    } else {
      subscriber.onCompleted();
    }
  }
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
  @Override
  public void call(Subscriber<? super String> subscriber) {
    String cachePref = rxPreferences.getString("cache").get();
    if (!TextUtils.isEmpty(cachePref)) {
      subscriber.onNext(cachePref);
    } else {
      subscriber.onCompleted();
    }
  }
});

Observable<String> network = Observable.just("network");

//主要就是靠concat operator来实现
Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
  memoryCache = "memory";
  System.out.println("--------------subscribe: " + s);
});

2.界面需要等到多个接口并发取完数据,再更新

//拼接两个Observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者
private void testMerge() {
  Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
  Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());

  Observable.merge(observable1, observable2)
      .subscribeOn(Schedulers.newThread())
      .subscribe(System.out::println);
}

3.一个接口的请求依赖另一个API请求返回的数据

举个例子,我们经常在需要登陆之后,根据拿到的token去获取消息列表。

这里用RxJava主要解决嵌套回调的问题,有一个专有名词叫Callback hell

NetworkService.getToken("username", "password")
  .flatMap(s -> NetworkService.getMessage(s))
  .subscribe(s -> {
    System.out.println("message: " + s);
  });

4.界面按钮需要防止连续点击的情况

RxView.clicks(findViewById(R.id.btn_throttle))
  .throttleFirst(1, TimeUnit.SECONDS)
  .subscribe(aVoid -> {
    System.out.println("click");
  });

5.响应式的界面

比如勾选了某个checkbox,自动更新对应的preference

SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);

CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
    .subscribe(checked.asAction());

6.复杂的数据变换

Observable.just("1", "2", "2", "3", "4", "5")
  .map(Integer::parseInt)
  .filter(s -> s > 1)
  .distinct()
  .take(3)
  .reduce((integer, integer2) -> integer.intValue() + integer2.intValue())
  .subscribe(System.out::println);//9

相关文章

  • Java中synchronized关键字修饰方法同步的用法详解

    Java中synchronized关键字修饰方法同步的用法详解

    synchronized可以用来同步静态和非静态方法,下面就具体来看一下Java中synchronized关键字修饰方法同步的用法详解:
    2016-06-06
  • Java SQL注入案例教程及html基础入门

    Java SQL注入案例教程及html基础入门

    这篇文章主要介绍了前端开发每天必学之SQL及HTML入门基础知识,介绍了学习web前端开发需要掌握的基础技术,感兴趣的小伙伴们可以参考一下
    2021-07-07
  • java 中 request.getSession(true、false、null)的区别

    java 中 request.getSession(true、false、null)的区别

    这篇文章主要介绍了java 中 request.getSession(true/false/null)的区别的相关资料,需要的朋友可以参考下
    2017-02-02
  • Java 批量生成条码的示例代码

    Java 批量生成条码的示例代码

    这篇文章主要介绍了Java 批量生成条码的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-08-08
  • 链表的原理及java实现代码示例

    链表的原理及java实现代码示例

    这篇文章主要介绍了链表的原理及java实现代码示例,涉及单向链表的基本介绍,单向链表的Java实现代码分享等相关内容,具有一定参考价值,需要的朋友可以参考下。
    2017-11-11
  • Java中注解@Async实现异步及导致失效原因分析

    Java中注解@Async实现异步及导致失效原因分析

    Async注解用于声明一个方法是异步的,当在方法上加上这个注解时将会在一个新的线程中执行该方法,而不会阻塞原始线程,这篇文章主要给大家介绍了关于Java中注解@Async实现异步及导致失效原因分析的相关资料,需要的朋友可以参考下
    2024-07-07
  • resultMap标签中里的collection标签详解

    resultMap标签中里的collection标签详解

    这篇文章主要介绍了resultMap标签中里的collection标签,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • SpringBoot使用过滤器、拦截器和监听器的案例代码(Springboot搭建java项目)

    SpringBoot使用过滤器、拦截器和监听器的案例代码(Springboot搭建java项目)

    这篇文章主要介绍了SpringBoot使用过滤器、拦截器和监听器(Springboot搭建java项目),本文是基于Springboot搭建java项目,结合案例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-02-02
  • 使用FeignClient调用POST表单Body内没有参数问题

    使用FeignClient调用POST表单Body内没有参数问题

    这篇文章主要介绍了使用FeignClient调用POST表单Body内没有参数问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • 图文详解Java中class的初始化顺序

    图文详解Java中class的初始化顺序

    网上有很多关于Java中class的初始化顺序文章,但是本文通过图文更加详细的介绍了Java中class的初始化顺序,并对class的装载顺序进行了讲解,下面一起来看看。
    2016-08-08

最新评论