elasticsearch集群cluster discovery可配式模块示例分析

 更新时间:2022年04月21日 16:07:15   作者:zziawan  
这篇文章主要为大家介绍了elasticsearch集群cluster discovery可配式模块示例分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

elasticsearch cluster实现了自己发现机制zen。Discovery功能主要包括以下几部分内容:master选举,master错误探测,集群中其它节点探测,单播多播ping。本篇会首先概述以下Discovery这一部分的功能,然后介绍节点检测。其它内容会在接下来介绍。

Discovery模块的概述

discovery是可配式模块,官方支持亚马逊的Azure discovery,Google Compute Engine,EC2 Discovery三种发现机制,根据插件规则完全可以自己实现其它的发现机制。整个模块通过实现guice的DiscoveryModule对外提供模块的注册和启动, 默认使用zen discovery。发现模块对外接口为DiscoveryService,它的方法如下所示:

它本质上是discovery的一个代理,所有的功能最终都是由所绑定的discovery所实现的。节点启动时通过DiscoveryModule获取DiscoveryService,然后启动DiscoveryService,DiscoveryService启动绑定的Discovery,整个功能模块就完成了加载和启动。这也是elasticsearch所有模块的实现方式,通过module对外提供绑定和获取,通过service接口对外提供模块的功能,在后面的分析中会经常遇到。

cluster节点探测

接下来分析cluster的一个重要功能就是节点探测。cluster中不能没有master节点,因此集群中所有节点都要周期探测master节点,一旦无法检测到,将会进行master选举。同时作为master,对于节点变动也要时刻关注,因此它需要周期性探测集群中所有节点,确保及时剔除已经宕机的节点。这种相互间的心跳检测就是cluster的faultdetection。下图是faultdetection的继承关系:

有两种实现方式,分别是master探测集群中其它节点和其它节点对master节点的探测。

FaultDetection只要一个抽象方法handleTransportDisconnect,该方法在内部类FDConnectionListener中被调用。在elasticsearch中大量使用了listener的异步方式,异步可以极大提升系统性能。它的代码如下所示:

private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeConnected(DiscoveryNode node) {
        }
        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            handleTransportDisconnect(node);
        }
    }

faultdetection启动时会注册相应的FDConnetionListener,当探测到节点丢失,会通过onNodeDisconnected方法回调对于的handleTransportDisconnect进行处理。

MasterFaultDetection的启动代码

private void innerStart(final DiscoveryNode masterNode) {
        this.masterNode = masterNode;
        this.retryCount = 0;
        this.notifiedMasterFailure.set(false);
        // 尝试连接master节点
        try {
            transportService.connectToNode(masterNode);
        } catch (final Exception e) {
            // 连接失败通知masterNode失败
            notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
            return;
        }
    //关闭之前的masterping,重启新的masterping
        if (masterPinger != null) {
            masterPinger.stop();
        }
        this.masterPinger = new MasterPinger();
        // 周期之后启动masterPing,这里并没有周期启动masterPing,只是设定了延迟时间。
        threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
    }

代码有有详细注释,就不再过多解释。

master连接失败的逻辑

代码如下:

private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
        if (notifiedMasterFailure.compareAndSet(false, true)) {
            threadPool.generic().execute(new Runnable() {
                @Override
                public void run() {
            //通知所有listener master丢失
                    for (Listener listener : listeners) {
                        listener.onMasterFailure(masterNode, reason);
                    }
                }
            });
            stop("master failure, " + reason);
        }
    }

 在ZenDiscovery中实现了listener.onMasterFailure接口。会进行master丢失的相关处理,在后面再分析。

MasterPing的关键代码

private class MasterPinger implements Runnable {
        private volatile boolean running = true;
        public void stop() {
            this.running = false;
        }
        @Override
        public void run() {
            if (!running) {
                // return and don't spawn...
                return;
            }
            final DiscoveryNode masterToPing = masterNode;
   final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
            final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
                        @Override
                        public MasterPingResponseResponse newInstance() {
                            return new MasterPingResponseResponse();
                        }
                        @Override
                        public void handleResponse(MasterPingResponseResponse response) {
                            if (!running) {
                                return;
                            }
                            // reset the counter, we got a good result
                            MasterFaultDetection.this.retryCount = 0;
                            // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                            if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                // 启动新的ping周期
                                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                            }
                        }
                        @Override
                        public void handleException(TransportException exp) {
                            if (!running) {
                                return;
                            }
                            synchronized (masterNodeMutex) {
                                // check if the master node did not get switched on us...
                                if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                    if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                        handleTransportDisconnect(masterToPing);
                                        return;
                                    } else if (exp.getCause() instanceof NoLongerMasterException) {
                                        logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                        notifyMasterFailure(masterToPing, "no longer master");
                                        return;
                                    } else if (exp.getCause() instanceof NotMasterException) {
                                        logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                        notifyMasterFailure(masterToPing, "not master");
                                        return;
                                    } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                        logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
                                        notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
                                        return;
                                    }
                                    int retryCount = ++MasterFaultDetection.this.retryCount;
                                    logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
                                    if (retryCount >= pingRetryCount) {
                                        logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
                                        // not good, failure
                                        notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                                    } else {
                                         // resend the request, not reschedule, rely on send timeout
                                        transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                                    }
                                }
                            }
                        }
            );
        }
    }

MasterPing是一个线程,在innerStart的方法中没有设定周期启动masterping,但是masterping需要周期进行,这个秘密就在run 方法中,如果ping成功就会重启一个新的ping。这样既保证了ping线程的唯一性同时也保证了ping的顺序和间隔。

ping的方式跟之前一样是也是通过transport发送一个masterpingrequest,进行一个连接。节点收到该请求后,如果已不再是master会抛出NotMasterException,状态更新出差会抛出其它异常,异常会通过。否则会正常响应notifyMasterFailure方法处理跟启动逻辑一样。

对于网络问题导致的无响应情况,会调用handleTransportDisconnect(masterToPing)方法处理。masterfaultDetection对该方法的实现如下:

protected void handleTransportDisconnect(DiscoveryNode node) {
    //这里需要同步
        synchronized (masterNodeMutex) {
        //master 已经换成其它节点,就没必要再连接
            if (!node.equals(this.masterNode)) {
                return;
            }
            if (connectOnNetworkDisconnect) {
                try {
            //尝试再次连接
                    transportService.connectToNode(node);
                    // if all is well, make sure we restart the pinger
                    if (masterPinger != null) {
                        masterPinger.stop();
                    }
            //连接成功启动新的masterping
                    this.masterPinger = new MasterPinger();
                    // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                    threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
                } catch (Exception e) {
            //连接出现异常,启动master节点丢失通知
                    logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                    notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
                }
            } else {
          //不需要重连,通知master丢失。
                logger.trace("[master] [{}] transport disconnected", node);
                notifyMasterFailure(node, "transport disconnected");
            }
        }
    }

这就是masterfaultDetection的整个流程:启动中如果master丢失则通知节点丢失,否则在一定延迟(3s)后启动masterping,masterping线程尝试连接master节点,如果master节点网络失联,尝试再次连接。master节点收到masterpingrequest后首先看一下自己还是不是master,如果不是则抛出异常,否则正常回应。节点如果收到响应是异常则启动master丢失通知,否则此次ping结束。在一定延迟后启动新的masterping线程。

NodeFaultDetection的逻辑跟实现上跟MasterFualtDetetion相似,区别主要在于ping异常处理上。当某个节点出现异常或者没有响应时,会启动节点丢失机制,只是受到通知后的处理逻辑不通。就不再详细分析,有兴趣可以参考具体代码,希望大家以后多多支持脚本之家!

相关文章

  • java版十大排序经典算法:完整代码(3)

    java版十大排序经典算法:完整代码(3)

    优秀的文章也不少,但是Java完整版的好像不多,我把所有的写一遍巩固下,同时也真诚的希望阅读到这篇文章的小伙伴们可以自己去从头敲一遍,不要粘贴复制!希望我的文章对你有所帮助,每天进步一点点
    2021-07-07
  • java中@JsonValue和@JsonCreator使用

    java中@JsonValue和@JsonCreator使用

    本文主要介绍了java中@JsonValue和@JsonCreator使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-06-06
  • 利用Java快速查找21位花朵数示例代码

    利用Java快速查找21位花朵数示例代码

    这篇文章主要给大家介绍了关于利用Java快速查找21位花朵数的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习,需要的朋友们下面随着小编来一起学习学习吧。
    2017-10-10
  • SpringBoot集成Mybatis实现对多数据源访问原理

    SpringBoot集成Mybatis实现对多数据源访问原理

    本文主要分析讨论在SpringBoot应用中我们该如何配置SqlSessionFactoryBean对象,进而实现对多个不同的数据源的操纵,文章通过代码示例介绍的非常详细,需要的朋友可以参考下
    2023-11-11
  • 浅谈Spring嵌套事务是怎么回滚的

    浅谈Spring嵌套事务是怎么回滚的

    本文主要介绍了Spring嵌套事务是怎么回滚的,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-11-11
  • Mybatis使用MySQL模糊查询时输入中文检索不到结果怎么办

    Mybatis使用MySQL模糊查询时输入中文检索不到结果怎么办

    这篇文章主要介绍了Mybatis使用MySQL模糊查询时输入中文检索不到结果的解决办法的相关资料,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2016-07-07
  • Java实现微信网页授权的示例代码

    Java实现微信网页授权的示例代码

    这篇文章主要介绍了Java实现微信网页授权的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-07-07
  • java 转发和重定向区别及实例代码

    java 转发和重定向区别及实例代码

    这篇文章主要介绍了java 转发和重定向区别及实例代码的相关资料,需要的朋友可以参考下
    2016-11-11
  • jOOQ串联字符串拒绝使用的原因实例

    jOOQ串联字符串拒绝使用的原因实例

    这篇文章主要为大家介绍了jOOQ串联字符串拒绝使用的原因实例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • spring boot Rabbit高级教程(最新推荐)

    spring boot Rabbit高级教程(最新推荐)

    RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理,本篇文章给大家介绍spring boot Rabbit高级教程,感兴趣的朋友一起看看吧
    2023-10-10

最新评论