Nacos集群数据同步方式

 更新时间:2024年12月28日 08:55:16   作者:91猿说编程  
文章主要介绍了Nacos集群中服务注册信息的同步机制,涉及到负责节点和非负责节点之间的数据同步过程,以及DistroProtocol协议在同步中的应用

引言

在Nacos属于集群时,当服务器收到服务注册请求后,发生了ClientEvent.ClientChangedEvent事件,就会触发将注册的服务信息同步给集群中的其他Nacos-server节点。

// DistroClientDataProcessor
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

同步时,会涉及到一个负责节点和非负责节点

负责节点(发起同步)

也就是收到客户端事件ClientChangedEvent后负责同步信息给其他非负责节点, 所以这里只能有负责节点来进行同步,非负责节点只能接收同步事件

// DistroClientDataProcessor
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
    return;
}

DistroProtocol

Distro是阿里巴巴的私有协议,distro协议是为了注册中心而创造出的协议;

DistroProtocol会循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s,其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改,对于Delete操作,由DistroSyncDeleteTask处理,对于Change操作,由DistroSyncChangeTask处理,这里我们从DistroSyncChangeTask来看

// DistroProtocol
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

在调用syncToTarget后,会触发任务DistroDelayTaskProcessor处理任务,这是Distro协议的一个默认延迟任务处理器,可以看到。 对于删除类型的任务,触发任务DistroSyncDeleteTask , 对于删除的任务:DistroSyncChangeTask

public class DistroDelayTaskProcessor implements NacosTaskProcessor {
    @Override
    public boolean process(NacosTask task) {
        
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        switch (distroDelayTask.getAction()) {
            case DELETE:
                DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
                return true;
            case CHANGE:
            case ADD:
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            default:
                return false;
        }
    }
}

DistroSyncChangeTask

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    ...
    
    // 无回调
    @Override
    protected boolean doExecute() {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return true;
        }
        return getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer());
    }
    
    // 有回调
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        //将得到的数据同步给其他服务节点
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    
    // 从DistroClientDataProcessor获取DistroData
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
}

获取同步数据getDistroData

这里获取同步数据其实是从DistroClientDataProcessor 中获取的,所以为Client的相关注册服务信息

// DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor
@Override
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}

可以看到generateSyncData 方法是关键获取服务的方法,该方法提供了同步数据,包含Client的注册信息,包括客户端注册了哪些namespace,哪些group,哪些service,哪些instance。

// AbstractClient implements Client 
@Override
public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        namespaces.add(entry.getKey().getNamespace());
        groupNames.add(entry.getKey().getGroup());
        serviceNames.add(entry.getKey().getName());
        instances.add(entry.getValue());
    }
    return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

执行同步数据syncData

这里的同步实际是由DistroClientTransportAgent来负责的,将数据分装成DistroDataRequest 然后查询到对于的服务节点Member然后调用asyncRequest异步方法执行同步,后面的方法我就不跟了, 这时我们主要关注非负责节点收到同步请求后如何处理。

// DistroClientTransportAgent
@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
        return;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    Member member = memberManager.find(targetServer);
    try {
        clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}

非负责节点(接收请求)

当负责节点将数据发送给非负责节点以后,将要处理发送过来的Client数据。通过DistroController收到数据后, 然后最终会DistroClientDataProcessor.processData方法来进行处理

// DistroController.java
@PutMapping("/datum")
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
    ...
    DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
    distroProtocol.onReceive(distroHttpData);
    ...
}
// DistroClientDataProcessor.java
@Override
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

可以看出,这里分别对ADD/CHANGE和DELETE进行了处理,这里我主要关注ADD/CHANGE,所以主要关注handlerClientSyncData方法。

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
    // 同步客户端连接,此时如果客户端不存在,则会注册一个非负责节点client,后面就会获取到该客户端操作
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    // 获取Client(此时注册到的是ConnectionBasedClient)
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // 更新Client数据
    upgradeClient(client, clientSyncData);
}

**注意:**这里要注意下此时的Client实现类ConnectionBasedClient,它的isNative属性为false,这是非负责节点和负责节点的主要区别。

其实判断当前nacos节点是否为负责节点的依据就是这个**isNative属性**,如果是客户端直接注册在这个nacos节点上的ConnectionBasedClient,它的isNative属性为true;如果是由Distro协议,同步到这个nacos节点上的ConnectionBasedClient,它的isNative属性为false。

那其实我们都知道2.x的版本以后使用了长连接,所以**通过长连接建立在哪个节点上,哪个节点就是责任节点,客户端也只会向这个责任节点发送请求**。

DistroClientDataProcessor的upgradeClient方法,更新Client里的注册表信息,发布对应事件

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    Set<Service> syncedService = new HashSet<>();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

Distro协议负责集群数据统一

Distro为了确保集群间数据一致,不仅仅依赖于数据发生改变时的实时同步,后台有定时任务做数据同步。

在1.x版本中,责任节点每5s同步所有Service的Instance列表的摘要(md5)给非责任节点,非责任节点用对端传来的服务md5比对本地服务的md5,如果发生改变,需要反查责任节点。

在2.x版本中,对这个流程做了改造,责任节点会发送Client全量数据,非责任节点定时检测同步过来的Client是否过期,减少1.x版本中的反查。

责任节点每5s向其他节点发送DataOperation=VERIFY类型的DistroData,来维持非责任节点的Client数据不过期。

// DistroVerifyTimedTask.java
@Override
public void run() {
    // 所有其他节点
    List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        // 遍历这些节点发送Client.isNative=true的DistroData,type = VERIFY
        verifyForDataStorage(each, targetServer);
    }
}

非责任节点每5s扫描isNative=false的client,如果client 30s内没有被VERIFY的DistroData更新过续租时间,会删除这个同步过来的Client数据。

//ConnectionBasedClientManager->ExpiredClientCleaner
private static class ExpiredClientCleaner implements Runnable {
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && client.isExpire(currentTime)) {
                clientManager.clientDisconnected(each);
            }
        }
    }
} 

// ConnectionBasedClient.java
@Override
public boolean isExpire(long currentTime) {
    // 判断30s内没有续租 认为过期
    return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime();
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java面向对象选择题总结归纳

    Java面向对象选择题总结归纳

    今天小编就为大家分享一篇关于Java面向对象选择题总结归纳,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • Java实现整数的逆序输出的三种方法

    Java实现整数的逆序输出的三种方法

    这篇文章主要介绍了Java实现整数的逆序输出的三种方法,第一种是无限制整数的逆序输出,第二种是非负整数的逆序输出,第三种是非特殊情况的逆序输出,每种方法给大家讲解的非常详细需要的朋友可以参考下
    2022-11-11
  • 拳皇(Java简单的小程序)代码实例

    拳皇(Java简单的小程序)代码实例

    这篇文章主要介绍了拳皇Java简单小程序,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • Java集合之HashMap/hashTable详解

    Java集合之HashMap/hashTable详解

    这篇文章主要介绍了Java集合之HashMap/hashTable详解,Map是映射键值的对象,map不能包含重复键:每个键最多只能映射一个值,它模拟了数学函数的抽象,需要的朋友可以参考下
    2023-09-09
  • JAVA初级项目——实现图书管理系统

    JAVA初级项目——实现图书管理系统

    这篇文章主要介绍了JAVA如何实现图书管理系统,文中示例代码非常详细,供大家参考和学习,感兴趣的朋友可以了解下
    2020-06-06
  • java高级用法之JNA中的Function

    java高级用法之JNA中的Function

    这篇文章主要介绍了java高级用法之JNA中的Function,JAVA中的映射在JNA中就是一个function。通过或者function对象,我们可以实现一些非常强大的功能,下面一起进入文章看看具体内容吧
    2022-04-04
  • 解决HttpServletResponse和HttpServletRequest取值的2个坑

    解决HttpServletResponse和HttpServletRequest取值的2个坑

    这篇文章主要介绍了解决HttpServletResponse和HttpServletRequest取值的2个坑问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • 详细解读Druid数据库连接池的使用

    详细解读Druid数据库连接池的使用

    这篇文章主要介绍了Druid数据库连接池的使用,数据库连接池负责分配、管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库连接,而不是重新建立一个,需要的朋友可以参考下
    2023-03-03
  • IDEA查看所有的断点(Breakpoints)并关闭的方式

    IDEA查看所有的断点(Breakpoints)并关闭的方式

    我们在使用IDEA开发Java应用时,基本上都需要进行打断点的操作,这方便我们排查BUG,也方便我们查看设计的是否正确,不过有时候,我们不希望进入断点,所以我们需要快速关闭所有断点,故本文给大家介绍了IDEA查看所有的断点(Breakpoints)并关闭的方式
    2024-10-10
  • 关于java.lang.NumberFormatException: null的问题及解决

    关于java.lang.NumberFormatException: null的问题及解决

    这篇文章主要介绍了关于java.lang.NumberFormatException: null的问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09

最新评论