Nacos源码分析专题(二)-服务注册

网友投稿 735 2022-10-06 08:40:29

Nacos源码分析专题(二)-服务注册

文章目录

​​1.引言​​​​2.服务注册接口​​​​3.客户端​​

​​3.1.NacosServiceRegistryAutoConfiguration​​​​3.2.NacosAutoServiceRegistration​​​​3.3.NacosServiceRegistry​​​​3.4.NacosNamingService​​

​​4.客户端注册的流程图​​​​5.服务端​​

​​5.1.InstanceController​​​​5.2.ServiceManager​​

​​5.2.1.更服务列表​​​​5.2.2.Nacos集群一致性​​​​5.2.3.DistroConsistencyServiceImpl​​​​5.2.4.更新本地实例列表​​​​5.2.5.集群数据同步​​

​​5.3.服务端流程图​​

​​6.总结​​

​​6.1.Nacos的注册表结构是什么样的?​​​​6.2.Nacos如何保证并发写的安全性?​​​​6.3.Nacos如何避免并发读写的冲突?​​​​6.4.Nacos如何应对阿里内部数十万服务的并发写请求?​​

1.引言

服务注册到​​Nacos​​​以后,会保存在一个​​本地注册表​​中,其结构如下:

首先最外层是一个​​Map​​​,结构为:​​Map>​​:

​​key​​​:是​​namespace_id​​,起到环境隔离的作用。​​namespace​​下可以有多个​​group​​​​value​​​:又是一个​​Map​​,代表分组及组内的服务。一个组内可以有多个服务

​​key​​​:代表​​group​​分组,不过作为​​key​​时格式是​​group_name:service_name​​​​value​​​:分组下的某一个服务,例如​​userservice​​,用户服务。类型为​​Service​​,内部也包含一个​​Map​​,一个服务下可以有多个集群

​​key​​:集群名称​​value​​​:​​Cluster​​​类型,包含集群的具体信息。一个集群中可能包含多个实例,也就是具体的节点信息,其中包含一个​​Set​​​,就是该集群下的实例的集合-​​​Instance​​​:实例信息,包含实例的​​IP​​​、​​Port​​、健康状态、权重等等信息

每一个服务去注册到Nacos时,就会把信息组织并存入这个Map中。

2.服务注册接口

​​Nacos​​提供了服务注册的API接口,客户端只需要向该接口发送请求,即可实现服务注册。

接口说明:注册一个实例到Nacos服务。

请求类型:​​POST​​

请求路径:​​/nacos/v1/ns/instance​​

请求参数:

名称

类型

是否必选

描述

ip

字符串

​是​

服务实例IP

port

int

​是​

服务实例port

namespaceId

字符串


命名空间ID

weight

double


权重

enabled

boolean


是否上线

healthy

boolean


是否健康

metadata

字符串


扩展信息

clusterName

字符串


集群名

serviceName

字符串

​是​

服务名

groupName

字符串


分组名

ephemeral

boolean


是否临时实例

错误编码:

错误代码

描述

语义

400

Bad Request

客户端请求中的语法错误

403

Forbidden

没有权限

404

Not Found

无法找到资源

500

Internal Server Error

服务器内部错误

200

OK

正常

3.客户端

首先,我们需要找到服务注册的入口。

3.1.NacosServiceRegistryAutoConfiguration

因为​​Nacos​​​的客户端是基于​​SpringBoot​​​的自动装配实现的,我们可以在​​nacos-discovery​​依赖:

spring-cloud-starter-alibaba-nacos-discovery-2.2.6.RELEASE.jar

这个包中找到​​Nacos​​自动装配信息:

可以看到,有很多个自动配置类被加载了,其中跟服务注册有关的就是​​NacosServiceRegistryAutoConfiguration​​这个类,我们跟入其中。

可以看到,在​​NacosServiceRegistryAutoConfiguration​​这个类中,包含一个跟自动注册有关的Bean:

3.2.NacosAutoServiceRegistration

​​NacosAutoServiceRegistration​​源码如图:

可以看到在初始化时,其父类​​AbstractAutoServiceRegistration​​​也被初始化了。​​AbstractAutoServiceRegistration​​如图:

可以看到它实现了​​ApplicationListener​​​接口,监听Spring容器启动过程中的事件。在监听到​​WebServerInitializedEvent​​​(web服务初始化完成)的事件后,执行了​​bind​​ 方法。

其中的bind方法如下:

public void bind(WebServerInitializedEvent event) { // 获取 ApplicationContext ApplicationContext context = event.getApplicationContext(); // 判断服务的 namespace,一般都是null if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals(((ConfigurableWebServerApplicationContext) context) .getServerNamespace())) { return; } } // 记录当前 web 服务的端口 this.port.compareAndSet(0, event.getWebServer().getPort()); // 启动当前服务注册流程 this.start();}

其中的​​start​​方法流程:

public void start() { if (!isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } return; } // 当前服务处于未运行状态时,才进行初始化 if (!this.running.get()) { // 发布服务开始注册的事件 this.context.publishEvent( new InstancePreRegisteredEvent(this, getRegistration())); // ☆☆☆☆开始注册☆☆☆☆ register(); if (shouldRegisterManagement()) { registerManagement(); } // 发布注册完成事件 this.context.publishEvent( new InstanceRegisteredEvent<>(this, getConfiguration())); // 服务状态设置为运行状态,基于AtomicBoolean this.running.compareAndSet(false, true); } }

其中最关键的​​register()​​方法就是完成服务注册的关键,代码如下:

protected void register() { this.serviceRegistry.register(getRegistration());}

此处的​​this.serviceRegistry​​​就是​​NacosServiceRegistry​​:

3.3.NacosServiceRegistry

而​​NacosServiceRegistry​​​对​​register​​的实现如下:

@Overridepublic void register(Registration registration) { // 判断serviceId是否为空,也就是spring.application.name不能为空 if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } // 获取Nacos的命名服务,其实就是注册中心服务 NamingService namingService = namingService(); // 获取 serviceId 和 Group String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); // 封装服务实例的基本信息,如 cluster-name、是否为临时实例、权重、IP、端口等 Instance instance = getNacosInstanceFromRegistration(registration); try { // 开始注册服务 namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { if (nacosDiscoveryProperties.isFailFast()) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); rethrowRuntimeException(e); } else { log.warn("Failfast is false. {} register failed...{},", serviceId, registration.toString(), e); } }}

可以看到方法中最终是调用​​NamingService​​​的​​registerInstance​​方法实现注册的。

而​​NamingService​​​接口的默认实现就是​​NacosNamingService​​。

3.4.NacosNamingService

​​NacosNamingService​​提供了服务注册、订阅等功能。

其中​​registerInstance​​就是注册服务实例,源码如下:

@Overridepublic void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { // 检查超时参数是否异常。心跳超时时间(默认15秒)必须大于心跳周期(默认5秒) NamingUtils.checkInstanceIsLegal(instance); // 拼接得到新的服务名,格式为:groupName@@serviceId String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); // 判断是否为临时实例,默认为 true。 if (instance.isEphemeral()) { // 如果是临时实例,需要定时向 Nacos 服务发送心跳 BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); beatReactor.addBeatInfo(groupedServiceName, beatInfo); } // 发送注册服务实例的请求 serverProxy.registerService(groupedServiceName, groupName, instance);}

最终,由​​NacosProxy​​​的​​registerService​​方法,完成服务注册。

代码如下:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); // 组织请求参数 final Map params = new HashMap(16); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); // 通过POST请求将上述参数,发送到 /nacos/v1/ns/instance reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);}

这里提交的信息就是​​Nacos​​服务注册接口需要的完整参数,核心参数有:

namespace_id:环境service_name:服务名称group_name:组名称cluster_name:集群名称ip: 当前实例的ip地址port: 当前实例的端口

而在​​NacosNamingService​​​的​​registerInstance​​方法中,有一段是与服务心跳有关的代码,我们在后续会继续学习。

4.客户端注册的流程图

如图:

5.服务端

在​​nacos-console​​​的模块中,会引入​​nacos-naming​​这个模块

模块结构如下:

其中的​​com.alibaba.nacos.naming.controllers​​​包下就有服务注册、发现等相关的各种接口,其中的服务注册是在​​InstanceController​​类中:

5.1.InstanceController

进入​​InstanceController​​​类,可以看到一个​​register​​方法,就是服务注册的方法了:

@CanDistro@PostMapping@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public String register(HttpServletRequest request) throws Exception { // 尝试获取namespaceId final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 尝试获取serviceName,其格式为 group_name@@service_name final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); // 解析出实例信息,封装为Instance对象 final Instance instance = parseInstance(request); // 注册实例 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok";}

这里,进入到了​​serviceManager.registerInstance()​​方法中。

5.2.ServiceManager

​​ServiceManager​​​就是​​Nacos​​​中管理服务、实例信息的核心API,其中就包含​​Nacos​​的服务注册表:

而其中的​​registerInstance​​方法就是注册服务实例的方法:

/** * Register an instance to a service in AP mode. * *

This method creates service or cluster silently if they don't exist. * * @param namespaceId id of namespace * @param serviceName service name * @param instance instance to register * @throws Exception any error occurred in the process */public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { // 创建一个空的service(如果是第一次来注册实例,要先创建一个空service出来,放入注册表) // 此时不包含实例信息 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); // 拿到创建好的service Service service = getService(namespaceId, serviceName); // 拿不到则抛异常 if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // 添加要注册的实例到service中 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);}

创建好了服务,接下来就要添加实例到服务中:

/** * Add instance to service. * * @param namespaceId namespace * @param serviceName service name * @param ephemeral whether instance is ephemeral * @param ips instances * @throws NacosException nacos exception */public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 监听服务列表用到的key,服务唯一标识,例如:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 获取服务 Service service = getService(namespaceId, serviceName); // 同步锁,避免并发修改的安全问题 synchronized (service) { // 1)获取要更新的实例列表 List instanceList = addIpAddresses(service, ephemeral, ips); // 2)封装实例列表到Instances对象 Instances instances = new Instances(); instances.setInstanceList(instanceList); // 3)完成 注册表更新 以及 Nacos集群的数据同步 consistencyService.put(key, instances); }}

该方法中对修改服务列表的动作加锁处理,确保线程安全。而在同步代码块中,包含下面几步:

1)先获取要更新的实例列表,​​addIpAddresses(service, ephemeral, ips);​​​ 2)然后将更新后的数据封装到​​Instances​​对象中,后面更新注册表时使用 3)最后,调用​​consistencyService.put()​​方法完成​​Nacos​​集群的数据同步,保证集群一致性。

注意:在第1步的addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中。在第3步中,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。 这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。这种方案称为CopyOnWrite方案。

5.2.1.更服务列表

我们来看看实例列表的更新,对应的方法是​​addIpAddresses(service, ephemeral, ips);​​:

private List addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);}

继续进入​​updateIpAddresses​​方法:

public List updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { // 根据namespaceId、serviceName获取当前服务的实例列表,返回值是Datum // 第一次来,肯定是null Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); // 得到服务中现有的实例列表 List currentIPs = service.allIPs(ephemeral); // 创建map,保存实例列表,key为ip地址,value是Instance对象 Map currentInstances = new HashMap<>(currentIPs.size()); // 创建Set集合,保存实例的instanceId Set currentInstanceIds = Sets.newHashSet(); // 遍历要现有的实例列表 for (Instance instance : currentIPs) { // 添加到map中 currentInstances.put(instance.toIpAddr(), instance); // 添加instanceId到set中 currentInstanceIds.add(instance.getInstanceId()); } // 创建map,用来保存更新后的实例列表 Map instanceMap; if (datum != null && null != datum.value) { // 如果服务中已经有旧的数据,则先保存旧的实例列表 instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { // 如果没有旧数据,则直接创建新的map instanceMap = new HashMap<>(ips.length); } // 遍历实例列表 for (Instance instance : ips) { // 判断服务中是否包含要注册的实例的cluster信息 if (!service.getClusterMap().containsKey(instance.getClusterName())) { // 如果不包含,创建新的cluster Cluster cluster = new Cluster(instance.getClusterName(), service); cluster.init(); // 将集群放入service的注册表 service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); } // 删除实例 or 新增实例 ? if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { // 新增实例,instance生成全新的instanceId Instance oldInstance = instanceMap.get(instance.getDatumKey()); if (oldInstance != null) { instance.setInstanceId(oldInstance.getInstanceId()); } else { instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } // 放入instance列表 instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException( "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils .toJson(instanceMap.values())); } // 将instanceMap中的所有实例转为List返回 return new ArrayList<>(instanceMap.values());}

简单来讲,就是先获取旧的实例列表,然后把新的实例信息与旧的做对比,新的实例就添加,老的实例同步ID。然后返回最新的实例列表。

5.2.2.Nacos集群一致性

在完成本地服务列表更新后,Nacos又实现了集群一致性更新,调用的是:

consistencyService.put(key, instances);

这里的​​ConsistencyService​​接口,代表集群一致性的接口,有很多中不同实现:

我们进入​​DelegateConsistencyServiceImpl​​来看:

@Overridepublic void put(String key, Record value) throws NacosException { // 根据实例是否是临时实例,判断委托对象 mapConsistencyService(key).put(key, value);}

其中的​​mapConsistencyService(key)​​方法就是选择委托方式的:

private ConsistencyService mapConsistencyService(String key) { // 判断是否是临时实例: // 是,选择 ephemeralConsistencyService,也就是 DistroConsistencyServiceImpl类 // 否,选择 persistentConsistencyService,也就是PersistentConsistencyServiceDelegateImpl return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;}

默认情况下,所有实例都是临时实例,我们关注​​DistroConsistencyServiceImpl​​即可。

5.2.3.DistroConsistencyServiceImpl

我们来看临时实例的一致性实现:​​DistroConsistencyServiceImpl​​​类的​​put​​方法:

public void put(String key, Record value) throws NacosException { // 先将要更新的实例信息写入本地实例列表 onPut(key, value); // 开始集群同步 distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);}

这里方法只有两行:

​​onPut(key, value)​​:其中value就是Instances,要更新的服务信息。这行主要是基于线程池方式,异步的将Service信息写入注册表中(就是那个多重Map)​​distroProtocol.sync()​​​:就是通过​​Distro​​​协议将数据同步给集群中的其它​​Nacos​​节点

我们先看​​onPut​​方法

5.2.4.更新本地实例列表

1)放入阻塞队列

​​onPut​​方法如下:

public void onPut(String key, Record value) { // 判断是否是临时实例 if (KeyBuilder.matchEphemeralInstanceListKey(key)) { // 封装 Instances 信息到 数据集:Datum Datum datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); // 放入DataStore dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } // 放入阻塞队列,这里的 notifier维护了一个阻塞队列,并且基于线程池异步执行队列中的任务 notifier.addTask(key, DataOperation.CHANGE);}

​​notifier​​​的类型就是​​DistroConsistencyServiceImpl​​​.​​Notifier​​,内部维护了一个阻塞队列,存放服务列表变更的事件:

​​addTask​​时,将任务加入该阻塞队列:

// DistroConsistencyServiceImpl.Notifier类的 addTask 方法:public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } // 任务放入阻塞队列 tasks.offer(Pair.with(datumKey, action));}

2)Notifier异步更新 同时,​​​notifier​​​还是一个​​Runnable​​,通过一个单线程的线程池来不断从阻塞队列中获取任务,执行服务列表的更新。来看下其中的run方法:

// DistroConsistencyServiceImpl.Notifier类的run方法:@Overridepublic void run() { Loggers.DISTRO.info("distro notifier started"); // 死循环,不断执行任务。因为是阻塞队列,不会导致CPU负载过高 for (; ; ) { try { // 从阻塞队列中获取任务 Pair pair = tasks.take(); // 处理任务,更新服务列表 handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } }}

来看看handle方法:

// DistroConsistencyServiceImpl.Notifier类的 handle 方法:private void handle(Pair pair) { try { String datumKey = pair.getValue0(); DataOperation action = pair.getValue1(); services.remove(datumKey); int count = 0; if (!listeners.containsKey(datumKey)) { return; } // 遍历,找到变化的service,这里的 RecordListener就是 Service for (RecordListener listener : listeners.get(datumKey)) { count++; try { // 服务的实例列表CHANGE事件 if (action == DataOperation.CHANGE) { // 更新服务列表 listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } // 服务的实例列表 DELETE 事件 if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); }}

3)覆盖实例列表 而在​​​Service​​​的​​onChange​​方法中,就可以看到更新实例列表的逻辑了:

@Overridepublic void onChange(String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); // 更新实例列表 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum();}

updateIPs方法:

public void updateIPs(Collection instances, boolean ephemeral) { // 准备一个Map,key是cluster,值是集群下的Instance集合 Map> ipMap = new HashMap<>(clusterMap.size()); // 获取服务的所有cluster名称 for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } // 遍历要更新的实例 for (Instance instance : instances) { try { if (instance == null) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null"); continue; } // 判断实例是否包含clusterName,没有的话用默认cluster if (StringUtils.isEmpty(instance.getClusterName())) { instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } // 判断cluster是否存在,不存在则创建新的cluster if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); Cluster cluster = new Cluster(instance.getClusterName(), this); cluster.init(); getClusterMap().put(instance.getClusterName(), cluster); } // 获取当前cluster实例的集合,不存在则创建新的 List clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } // 添加新的实例到 Instance 集合 clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } for (Map.Entry> entry : ipMap.entrySet()) { //make every ip mine List entryIPs = entry.getValue(); // 将实例集合更新到 clusterMap(注册表) clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } setLastModifiedMillis(System.currentTimeMillis()); // 发布服务变更的通知消息 getPushService().serviceChanged(this); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(","); } Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString());}

在第45行的代码中:​​clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);​​

就是在更新注册表:

public void updateIps(List ips, boolean ephemeral) { // 获取旧实例列表 Set toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances; HashMap oldIpMap = new HashMap<>(toUpdateInstances.size()); for (Instance ip : toUpdateInstances) { oldIpMap.put(ip.getDatumKey(), ip); } // 检查新加入实例的状态 List newIPs = subtract(ips, oldIpMap.values()); if (newIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(), getName(), newIPs.size(), newIPs.toString()); for (Instance ip : newIPs) { HealthCheckStatus.reset(ip); } } // 移除要删除的实例 List deadIPs = subtract(oldIpMap.values(), ips); if (deadIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(), getName(), deadIPs.size(), deadIPs.toString()); for (Instance ip : deadIPs) { HealthCheckStatus.remv(ip); } } toUpdateInstances = new HashSet<>(ips); // 直接覆盖旧实例列表 if (ephemeral) { ephemeralInstances = toUpdateInstances; } else { persistentInstances = toUpdateInstances; }}

5.2.5.集群数据同步

在​​DistroConsistencyServiceImpl​​的put方法中分为两步:

其中的​​onPut​​方法已经分析过了。

下面的​​distroProtocol.sync()​​就是集群同步的逻辑了。

​​DistroProtocol​​​类的​​sync​​方法如下:

public void sync(DistroKey distroKey, DataOperation action, long delay) { // 遍历 Nacos 集群中除自己以外的其它节点 for (Member each : memberManager.allMembersWithoutSelf()) { DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress()); // 定义一个Distro的同步任务 DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); // 交给线程池去执行 distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress()); } }}

其中同步的任务封装为一个​​DistroDelayTask​​对象。

交给了​​distroTaskEngineHolder.getDelayTaskExecuteEngine()​​执行,这行代码的返回值是:

​​NacosDelayTaskExecuteEngine​​,这个类维护了一个线程池,并且接收任务,执行任务。

执行任务的方法为​​processTasks()​​方法:

protected void processTasks() { Collection keys = getAllTaskKeys(); for (Object taskKey : keys) { AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { // 尝试执行同步任务,如果失败会重试 if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } }}

可以看出来基于​​Distro​​模式的同步是异步进行的,并且失败时会将任务重新入队并充实,因此不保证同步结果的强一致性,属于AP模式的一致性策略。

5.3.服务端流程图

6.总结

6.1.Nacos的注册表结构是什么样的?

答:​​Nacos​​​是多级存储模型,最外层通过namespace来实现环境隔离,然后是​​group​​分组,分组下就是服务,一个服务有可以分为不同的集群,集群中包含多个实例。因此其注册表结构为一个Map,类型是:

​​Map>​​,

外层​​key​​​是​​namespace_id​​​,内层​​key​​​是​​group+serviceName​​.

​​Service​​​内部维护一个​​Map​​​,结构是:​​Map​​​,​​key​​​是​​clusterName​​,值是集群信息

​​Cluster​​​内部维护一个Set集合,元素是​​Instance​​类型,代表集群中的多个实例。

6.2.Nacos如何保证并发写的安全性?

答:首先,在注册实例时,会对​​service​​​加锁,不同service之间本身就不存在并发写问题,互不影响。相同​​service​​时通过锁来互斥。并且,在更新实例列表时,是基于异步的线程池来完成,而线程池的线程数量为1.

6.3.Nacos如何避免并发读写的冲突?

答:Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将Old实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表。

6.4.Nacos如何应对阿里内部数十万服务的并发写请求?

答:Nacos内部会将服务注册的任务放入阻塞队列,采用线程池异步来完成实例更新,从而提高并发写能力。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:如何解决微信小程序请求服务器手机预览请求不到数据的问题(微信小程序预览无法获取数据)
下一篇:关于微信小程序中弹框和模态框的实现(微信小程序模态框组件)
相关文章