java客户端Etcd官方仓库jetcd中KeepAlive接口实现

 更新时间:2022年02月24日 17:07:51   作者:kl  
这篇文章主要为大家介绍了java客户端Etcd官方仓库jetcd中KeepAlive接口实现,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,多多加薪

前言

Etcd的Java客户端有很多开源实现,Jetcd是Etcd官方仓库的Java客户端,整体api接口设计实现和官方go客户端类似,简洁易用。其中,租期续约的接口提供了两个分别是keepAliveOnce和keepAlive。功能如其名,keepAliveOnce是单次续约的接口,如果要保持租约,需要手动触发这个接口,所以这个接口基本不用。而keepAlive是自动续约保活的接口。大多数场景下,使用keepAlive即可,但是针对不同的场景,我们还需要考虑几个问题,如租约ttl的设置,以及keepAlive异常时的处理。

Jetcd项目地址:https://github.com/etcd-io/jetcd

背景问题

我们有一个基于mysql的binlog订阅数据变更的应用,线上有非常重要的应用基于这个服务,因为存在单点故障,后面使用了jetcd的lock + keepAlive的机制实现了主备服务秒级切换的功能,具体参见高可用架构etcd选主故障主备秒级切换实现,系统上线运行后发现,binlog的服务经常切换发生主备切换,而实际情况是,binlog的服务非常稳定,在没有上线主备切换服务前,从来没有发生过线上binlog服务宕掉的情况。最后查明问题出在了租约TTL的设置上面。这里先抛出问题和定位,下面先看下Jetcd的keepAlive具体实现,然后在分析为什么导致这个问题。

KeepAlive实现

先看下keepAlive的用法

private long acquireActiveLease() throws InterruptedException, ExecutionException {
        long leaseId = leaseClient.grant(leaseTTL).get().getID();
        logger.debug("LeaderSelector get leaseId:[{}] and ttl:[{}]", leaseId, leaseTTL);
        this.leaseCloser = leaseClient.keepAlive(leaseId, new StreamObserver() {
            @Override
            public void onNext(LeaseKeepAliveResponse value) {
                logger.debug("LeaderSelector lease keeps alive for [{}]s:", value.getTTL());
            }
            @Override
            public void onError(Throwable t) {
                logger.debug("LeaderSelector lease renewal Exception!", t.fillInStackTrace());
                cancelTask();
            }
            @Override
            public void onCompleted() {
                logger.info("LeaderSelector lease renewal completed! start canceling task.");
                cancelTask();
            }
        });
        return leaseId;
    }

租约实现都在LeaseImpl类里,通过EtcdClient拿到LeaseImpl实例后,首先通过grant方法设置ttl拿到租约的id,然后将租约作为入参调用keepAlive方法,第二个入参是一个观察者对象,内置了三个接口,分别是onNext:确定下一次租约续约时间后触发,onError:续约异常时触发,onCompleted:租约过期后触发。

keepAlive方法代码:

public synchronized CloseableClient keepAlive(long leaseId, StreamObserverobserver) {
    if (this.closed) {
      throw newClosedLeaseClientException();
    }
    KeepAlive keepAlive = this.keepAlives.computeIfAbsent(leaseId, (key) -> new KeepAlive(leaseId));
    keepAlive.addObserver(observer);

    if (!this.hasKeepAliveServiceStarted) {
      this.hasKeepAliveServiceStarted = true;
      this.start();
    }
    return new CloseableClient() {
      @Override
      public void close() {
        keepAlive.removeObserver(observer);
      }
    };
  }

LeaseImpl内部维护了一个以LeaseId为key,KeepAlive对象为value的map,KeepAlive的类中维护了一个StreamObserver集合,到期时间deadLine,下次续约时间nextKeepAlive和续约leaseId。第一次调用keepAlive方法时会触发start,启动续约的线程(sendKeepAliveExecutor())和检查是否过期的线程(deadLineExecutor())。

private void sendKeepAliveExecutor() {
    this.keepAliveResponseObserver = Observers.observer(
      response -> processKeepAliveResponse(response),
      error -> processOnError()
    );
    this.keepAliveRequestObserver = this.leaseStub.leaseKeepAlive(this.keepAliveResponseObserver);
    this.keepAliveFuture = scheduledExecutorService.scheduleAtFixedRate(
        () -> {
            // send keep alive req to the leases whose next keep alive is before now.
            this.keepAlives.entrySet().stream()
                .filter(entry -> entry.getValue().getNextKeepAlive() < System.currentTimeMillis())
                .map(Entry::getKey)
                .map(leaseId -> LeaseKeepAliveRequest.newBuilder().setID(leaseId).build())
                .forEach(keepAliveRequestObserver::onNext);
        },
        0,
        500,
        TimeUnit.MILLISECONDS
    );
  }

sendKeepAliveExecutor方法是整个keepAlive功能实现的核心,这个方法在LeaseImpl实例里只会被触发一次,开启了一个时间间隔为500毫秒的的定时任务调度。每次从keepAlives中筛选出nextkeepAlive时间小于当前时间的KeepAlive对象,触发续约。

nextkeepAlive初始化值就是创建KeepAlive实例时的当前时间,然后在续约的响应流观察者实例中,执行了processKeepAliveResponse方法,在这个里面维护了KeepAlive对象的nextkeepAlive。

private synchronized void processKeepAliveResponse(io.etcd.jetcd.api.LeaseKeepAliveResponse leaseKeepAliveResponse) {
    if (this.closed) {
      return;
    }
    final long leaseID = leaseKeepAliveResponse.getID();
    final long ttl = leaseKeepAliveResponse.getTTL();
    final KeepAlive ka = this.keepAlives.get(leaseID);
    if (ka == null) {
      // return if the corresponding keep alive has closed.
      return;
    }
    if (ttl > 0) {
      long nextKeepAlive = System.currentTimeMillis() + ttl * 1000 / 3;
      ka.setNextKeepAlive(nextKeepAlive);
      ka.setDeadLine(System.currentTimeMillis() + ttl * 1000);
      ka.onNext(leaseKeepAliveResponse);
    } else {
      // lease expired; close all keep alive
      this.removeKeepAlive(leaseID);
      ka.onError(
          newEtcdException(
            ErrorCode.NOT_FOUND,
            "etcdserver: requested lease not found"
          )
      );
    }
  }

可以看到,在首次续约后的响应处理中,nextKeepAlive被设置为当前时间加上ttl的1/3时间后,也就是说如果我们设置一个key的过期时间为6s,那么在使用keepAlive时续期的时间间隔为,每2s执行续约一次。如果ttl小于零,说明key已经过期被删除了,就直接触发onError,传递了一个requested lease not found的异常对象。

文末小结

1、回到最上面binlog的主备频繁切换的问题,由于我们将ttl的时间设置的过小5s。只要client和etcd 服务失联5s以上,期间可能由于各种原因导致keepAlive没有正常续约上,就会触发主备切换。这个时候binlog服务本身是没有任何问题的,却要因为失去领导权,而选择自杀。后面将ttl调整到了20s后,主备切换就没有那么敏感了。

2、还有一个场景,在将etcd作为服务注册中心时,也会使用到keepAlive,即使设置了ttl为20s,还是有可能没有续约上,导致注册的服务过期被删了,这个时候,我们的服务进程还是健康的。这个场景下,需要在onError、onCompleted事件中重新获取租约以及添加新的KeepAlive。

以上就是java客户端Etcd官方仓库jetcd中KeepAlive接口实现的详细内容,更多关于jetcd中的KeepAlive实现的资料请关注脚本之家其它相关文章!

相关文章

  • Java String初始化String域例题解析

    Java String初始化String域例题解析

    这篇文章主要介绍了Java String初始化String域例题解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • 基于指针pointers和引用references的区别分析

    基于指针pointers和引用references的区别分析

    本篇文章介绍了,基于指针pointers和引用references的区别分析。需要的朋友参考下
    2013-05-05
  • Java深入浅出讲解多线程的概念到使用

    Java深入浅出讲解多线程的概念到使用

    哈哈!经过一个阶段的学习,Java基础知识学习终于到多线程了!Java多线程以及后面互斥锁的概念都是Java基础学习的难点,所以我做了一个总结,希望对大家也有帮助
    2022-05-05
  • Freemarker 最简单的例子程序

    Freemarker 最简单的例子程序

    Freemarker最简单的例子程序是通过String来创建模版对象,并执行插值处理。
    2016-04-04
  • SpringBoot实现热部署的三种方式

    SpringBoot实现热部署的三种方式

    本文主要介绍了SpringBoot实现热部署的三种方式,主要包括配置pom.xml文件,使用插件的执行命令mvn spring-boot:run启动项,使用springloader本地启动修改jvm参数,使用devtools工具包,感兴趣的可以了解一下
    2023-12-12
  • Java集合框架入门之泛型和包装类

    Java集合框架入门之泛型和包装类

    Java 泛型(generics)是 JDK 5 中引入的一个新特性, 泛型提供了编译时类型安全检测机制,该机制允许程序员在编译时检测到非法的类型。泛型的本质是参数化类型,也就是说所操作的数据类型被指定为一个参数
    2021-10-10
  • Seata AT模式前后镜像是如何生成详解

    Seata AT模式前后镜像是如何生成详解

    这篇文章主要为大家介绍了Seata AT模式前后镜像是如何生成的方法详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-11-11
  • SpringMvc自定义拦截器(注解)代码实例

    SpringMvc自定义拦截器(注解)代码实例

    这篇文章主要介绍了SpringMvc自定义拦截器(注解)代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-08-08
  • Java如何不解压读取.zip的文件内容

    Java如何不解压读取.zip的文件内容

    这篇文章主要给大家介绍了关于Java如何不解压读取.zip的文件内容的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-03-03
  • Java中不用第三个变量来互换两个变量的值

    Java中不用第三个变量来互换两个变量的值

    在程序运行期间,随时可能产生一些临时数据,应用程序会将这些数据保存在一些内存单元中,每个内存单元都用一个标识符来标识。这些内存单元被称为变量,定义的标识符就是变量名,内存单元中存储的数据就是变量的值
    2021-10-10

最新评论