RxJava 触发流基本原理源码解析

 更新时间:2022年12月30日 16:14:59   作者:itbird01  
这篇文章主要为大家介绍了RxJava 触发流基本原理源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

正文

本节,我们从Rxjava使用代码入手,去结合自己已有的知识体系,加查阅部分源码验证的方式,来一起探索一下Rxjava实现的基本原理。

为了本文原理分析环节,可以被更多的人理解、学习,所以小编从初学者的角度,从使用入手,一点点的分析了其中的源码细节、思想,建议大家随着本文的章节步骤,一步一步的来阅读,才能更快、更好的理解Rxjava的真正的思想精髓,也为我们之后的实践课程留一个好的底子。

触发流

到目前为止,我们讲了构建流、订阅流,但是依然没有触发真正的observer中的事件,例如:

@Override
   public void onSubscribe(@NonNull Disposable d) {
       Log.d(TAG, "onSubscribe");
   }
   @Override
   public void onNext(@NonNull String s) {
       Log.d(TAG, "onNext s = " + s);
   }
   @Override
   public void onError(@NonNull Throwable e) {
       Log.d(TAG, "onError");
   }
   @Override
   public void onComplete() {
       Log.d(TAG, "onComplete");
   }

各位看官,莫急莫急,且听老衲娓娓道来。

还记得上面的订阅流吗?订阅流从右往左执行的,执行到最后的observable,执行了它的subscribe方法。我们从使用代码知道,最左端的observable是啥来着,大家还记得吗?当然是ObservableJust

private void test() {
	//第一步:just调用
    Observable.just("https://img-blog.csdn.net/20160903083319668")
    //第二步:map调用
            .map(new Function<String, Bitmap>() {
                @Override
                public Bitmap apply(String s) throws Exception {
                    //Bitmap bitmap = downloadImage(s);
                    return null;
                }
            })
            //第三步:subscribeOn、observeOn调用
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            //第四步:subscribe调用
            .subscribe(new Observer<Bitmap>() {
                @Override
                public void onSubscribe() {
                    Log.d(TAG, "onSubscribe");
                }
                @Override
                public void onNext(Bitmap s) {
                    Log.d(TAG, "onNext s = " + s);
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError ", e);
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

我们就顺坡下驴,看一下ObservableJust的subscribe方法做啥了

public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }
    @Override
    public T get() {
        return value;
    }
}

仔细一看,这里面没有subscribe方法,那么肯定就是调用父类observable的subscribe方法了

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //对象封装,暂时不是重点,我们跳过
        observer = RxJavaPlugins.onSubscribe(this, observer);
        //判空
        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

大家看到这里,其实关键在于,最终调用了一个subscribeActual方法,所以我们继续看子类ObservableJust的subscribeActual方法干啥了?

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

接续根据ScalarDisposable的run方法

   public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable {
        private static final long serialVersionUID = 3880992722410194083L;
        final Observer<? super T> observer;
        final T value;
		//...省略很多代码
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
            	//可以看到这里执行了onNext、onComplete方法
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }
    }

小结

看到这里,我们知道了,开始一层一层的从左往右去调用observer的相关方法了。 由订阅流可知,每层的observable实际上拥有下一层的observer的代理类,所以自然而然,从最左边开始调用observer的相关方法开始,触发流,就是从左往右,一层一层的剥开之前包裹的observer,然后顺序调用里面的onNext、onComplete等方法。 不信,我们挑一个ObservableMap来验证一下。

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //此处调用了下游的observer的onNext方法
            downstream.onNext(v);
        }
    }
}

可以看到里面,的确调用了下游的observer的onNext方法。

总结

整个过程,分为构建流、订阅流、触发流。

构建流:从左到右执行不同的操作符的过程,其实很简单,就是根据不同的操作符,对原始的 observable进行逐层包装,这里可以看出,每层的节点 N* 就持有了上一层的observable。

订阅流:从右到左的 subscribe 调用过程,这个过程中,每个observable内部的subscribeActual执行两个关键操作,一个是对自己已有的observer进行一层重新包装,另外一个就是使用前面节点的observable,订阅包装好的observer。

触发流:在订阅流执行完成之后,执行到最左端的observable,我们发现它内部的subscribeActual实现,实际上就是调用里面拥有的observer的相关回调方法(onNext、onComplete、onError等),那么这层回调流就简单了,就是一层一层的调用里面的observer,最终执行到最右端的observer。

篇幅所限,大家也发现了,我们本节课,我们详细讲解Rxjava线程切换的实现原理,这个有两个原因,一是篇幅所限,本节内容已经够多了,大家先吃透框架,另外一方面是,线程切换我相信我们后面实践环节,待框架自我搭建实现之后,里面的线程切换功能就是水到渠成的事情,相信凭借大家已有的知识,都可以做到的。

所以建议大家,先别看这块Rxjava是如何实现线程切换的,而是想一下,它是怎么实现的?到时我们自己的Rxjava框架搭建起来之后,填充实现一下。

提个醒儿,大家还记得我们之前EventBus源码分析、实践环节吗?其中也说到了线程切换。其实原理差不多,大家先想一下。

以上就是RxJava 触发流基本原理源码解析的详细内容,更多关于RxJava 触发流原理的资料请关注脚本之家其它相关文章!

相关文章

  • 利用kotlin实现一个打方块的小游戏实例教程

    利用kotlin实现一个打方块的小游戏实例教程

    最近在学习kotlin,利用其实现了一个小游戏,觉着有必要和大家分享下,所以下面这篇文章主要给大家介绍了关于利用kotlin实现一个打方块的小游戏的相关资料,文中给出了详细的示例代码供大家参考学习,需要的朋友们下面随着小编来一起学习学习吧。
    2017-12-12
  • Android开发中遇到端口号占用问题解决方法

    Android开发中遇到端口号占用问题解决方法

    这篇文章主要介绍了Android开发中遇到端口号占用问题解决方法,本文给出了一个简洁实用的方法来解决这个烦人的问题,需要的朋友可以参考下
    2015-06-06
  • android WindowManager的简单使用实例详解

    android WindowManager的简单使用实例详解

    这篇文章主要介绍了android WindowManager的简单使用,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-08-08
  • Android自定义View获取注册验证码倒计时按钮

    Android自定义View获取注册验证码倒计时按钮

    这篇文章主要介绍了Android自定义View获取验证码倒计时按钮的制作方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-10-10
  • Android实现轮播图无限循环效果

    Android实现轮播图无限循环效果

    这篇文章主要为大家详细介绍了Android实现轮播图无限循环效果,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-02-02
  • Android开发笔记XML数据解析方法及优缺点

    Android开发笔记XML数据解析方法及优缺点

    XML数据是一种常见的数据格式,Android开发中需要对其进行解析。常用的XML解析方式有DOM、SAX、Pull和Json等,每种方式都有其优缺点。开发者可以根据具体需求选择合适的解析方式,提高数据解析效率和性能
    2023-05-05
  • 用Android Location获取当前地理位置的方法

    用Android Location获取当前地理位置的方法

    本篇文章小编为大家介绍,用Android Location获取当前地理位置的方法。需要的朋友参考下
    2013-04-04
  • Android开发框架MVC-MVP-MVVM-MVI的演变Demo

    Android开发框架MVC-MVP-MVVM-MVI的演变Demo

    这篇文章主要为大家介绍了Android开发框架MVC-MVP-MVVM-MVI的演变Demo,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-10-10
  • RecyclerView使用payload实现局部刷新

    RecyclerView使用payload实现局部刷新

    这篇文章主要为大家详细介绍了RecyclerView使用payload实现局部刷新,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-10-10
  • 详解Flutter中视频播放器插件的使用教程

    详解Flutter中视频播放器插件的使用教程

    视频播放器插件是可用于Flutter的常用插件之一,在这篇文章中,将学习如何应用视频播放器插件以及控制视频播放器的不同功能,感兴趣的可以了解一下
    2022-02-02

最新评论