elasticsearch索引的创建过程index create逻辑分析

 更新时间:2022年04月22日 11:12:05   作者:zziawan  
这篇文章主要介绍了elasticsearch索引核心index create,索引的创建过程解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

索引的创建过程

从本篇开始,就进入了Index的核心代码部分。这里首先分析一下索引的创建过程。elasticsearch中的索引是多个分片的集合,它只是逻辑上的索引,并不具备实际的索引功能,所有对数据的操作最终还是由每个分片完成。

创建索引的过程,从elasticsearch集群上来说就是写入索引元数据的过程,这一操作只能在master节点上完成。这是一个阻塞式动作,在加上分配在集群上均衡的过程也非常耗时,因此在一次创建大量索引的过程master节点会出现单点性能瓶颈,能够看到响应过程很慢。

在开始具体源码分析之前,首先回顾一下Action部分的内容(参考index action分析),elasticsearch的每一个功能都对应两个Action,*action和Transport*action。*action中定义了每个功能对应的路径,同时Action的instance绑定对应的Transport*Action。所有功能请求都需要在集群上转发,这大概也是每个功能都有Transport*Action的原因吧。对于create当然也不例外,它的开始点也是TransportCreateAction。另外,在action support分析中分析过,不同的action需要经过和需要操作的节点也不同。create index只能由master节点进行,而且也只在master节点上进行,保证集群数据的一致性。

materOperation方法实现

因此TransportCreateAction继承了TransportMasterNodeOperationAction,并实现了materOperation方法。它的方法如下所示:

protected void masterOperation(final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener) throws ElasticsearchException {
        String cause = request.cause();
        if (cause.length() == 0) {
            cause = "api";
        }
        final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, request.index())
                .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
                .settings(request.settings()).mappings(request.mappings())
                .aliases(request.aliases()).customs(request.customs());
        createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
            @Override
            public void onResponse(ClusterStateUpdateResponse response) {
                listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
            }
            @Override
            public void onFailure(Throwable t) {
                if (t instanceof IndexAlreadyExistsException) {
                    logger.trace("[{}] failed to create", t, request.index());
                } else {
                    logger.debug("[{}] failed to create", t, request.index());
                }
                listener.onFailure(t);
            }
        });
    }

这里看上很简单,只是调用了createIndexService(它其实是MetaDataCreateIndexService)的方法,就是修改集群matedata过程。

clusterservice处理

修改前首先获取到index名称对应的lock,这样保证操作数据一致性,然后生成updatetask,交给clusterservice处理。代码如下所示:

public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
        // 获取锁,只对该索引的操作加锁,而不是整个cluster
        final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index());
        // 如果能够获取锁离开创建索引,否则在下面启动新的线程进行
        if (mdLock.tryAcquire()) {
            createIndex(request, listener, mdLock);
            return;
        }
        threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) {
            @Override
            public void doRun() throws InterruptedException {
                if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) {
                    listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
                    return;
                }
                createIndex(request, listener, mdLock);
            }
        });
    }

createIndex方法,会封装create请求,然后向cluster发送一个updatetask。代码如下所示:

private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {
        ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
        updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
        request.settings(updatedSettingsBuilder.build());
        clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener)

建立索引 修改配置

增加或者修改mapping都是对集群状态修改,它们的过程都很相似,都是通过clusterService提交一个更新操作,同时附带有优先级。clusterservice会根据优先级和更新状态task的类型来进行对应的操作。如下所示:

public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) {
        if (!lifecycle.started()) {
            return;
        }
        try {
            final UpdateTask task = new UpdateTask(source, priority, updateTask);//根据优先级新建不同的task
            if (updateTask instanceof TimeoutClusterStateUpdateTask) {//超时任务,这类任务需要即时返回,因此立刻执行。
                final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask;
                updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() {
                    @Override
                    public void run() {
                        threadPool.generic().execute(new Runnable() {
                            @Override
                            public void run() {
                                timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source()));
                            }
                        });
                    }
                });
            } else {//其它类型,可以延迟执行,则交给线程池来执行。
                updateTasksExecutor.execute(task);
            }
        } catch (EsRejectedExecutionException e) {
            // ignore cases where we are shutting down..., there is really nothing interesting
            // to be done here...
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }
    }

说完它们的执行过程,再来看一下create index的具体逻辑。这个逻辑在matedataservice所提交的AckedClusterStateUpdateTask中的execute方法中。总体来说,这一过程就是将request中关于索引的配置mapping等取出来加入到当前的clustermatedata中,构造一个新的matedata的过程。这一过程还是比较复杂,限于篇幅将在下次中进行分析。

总结

创建索引的过程就是master节点更新集群matedata的过程,为了保证数据一致性,需要获取锁。

因此存在单点瓶颈。对于外部调用来说,跟其它功能一样,外部接口调用CreateIndexAction的相关方法,然后通过TransPortCreateIndexAction讲请求发送到集群上,进行索引创建。

以上就是elasticsearch索引创建过程index create的详细内容,更多关于elasticsearch索引创建过程index create的资料请关注脚本之家其它相关文章!

相关文章

  • Intelli IDEA安装Scala插件并安装Scala软件和配置环境变量的详细教程

    Intelli IDEA安装Scala插件并安装Scala软件和配置环境变量的详细教程

    这篇文章主要介绍了Intelli IDEA安装Scala插件并安装Scala软件和配置环境变量的详细教程,需要的朋友可以参考下
    2020-10-10
  • Spring Cloud体系实现标签路由的方法示例

    Spring Cloud体系实现标签路由的方法示例

    这篇文章主要介绍了Spring Cloud体系实现标签路由的方法示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-05-05
  • SpringCloud微服务的调用与远程调用测试示例

    SpringCloud微服务的调用与远程调用测试示例

    这篇文章主要介绍了SpringCloud微服务的调用与远程调用测试示例,服务调用者-可以暂时认为是与用户交互的角色(因为存在微服务之间的调用),可以根据该用户的类型将其赋予不同的服务调用权限,通过一次http请求访问调用对应的微服务获取想要的数据
    2023-04-04
  • Spring Boot集成mongodb数据库过程解析

    Spring Boot集成mongodb数据库过程解析

    这篇文章主要介绍了Spring Boot集成mongodb数据库过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • SpringSecurity+OAuth2.0 搭建认证中心和资源服务中心流程分析

    SpringSecurity+OAuth2.0 搭建认证中心和资源服务中心流程分析

    OAuth 2.0 主要用于在互联网上安全地委托授权,广泛应用于身份验证和授权场景,这篇文章介绍SpringSecurity+OAuth2.0 搭建认证中心和资源服务中心,感兴趣的朋友一起看看吧
    2024-01-01
  • Java中Stream流对多个字段进行排序的方法

    Java中Stream流对多个字段进行排序的方法

    我们在处理数据的时候经常会需要进行排序后再返回给前端调用,比如按照时间升序排序,前端展示数据就是按时间先后进行排序,下面这篇文章主要给大家介绍了关于Java中Stream流对多个字段进行排序的相关资料,需要的朋友可以参考下
    2023-10-10
  • SpringBoot整合JWT Token的完整步骤

    SpringBoot整合JWT Token的完整步骤

    JSON Web Token是目前最流行的跨域认证解决方案,适合前后端分离项目通过Restful API进行数据交互时进行身份认证,这篇文章主要给大家介绍了关于SpringBoot整合JWT Token的相关资料,需要的朋友可以参考下
    2021-09-09
  • java定时任务实现的4种方式小结

    java定时任务实现的4种方式小结

    这篇文章主要介绍了java定时任务实现的4种方式小结,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • Java8 用Lambda表达式给List集合排序的实现

    Java8 用Lambda表达式给List集合排序的实现

    这篇文章主要介绍了Java8 用Lambda表达式给List集合排序的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08
  • springboot 获取工具类bean过程详解

    springboot 获取工具类bean过程详解

    这篇文章主要介绍了springboot 获取工具类bean过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-09-09

最新评论