最新消息: USBMI致力于为网友们分享Windows、安卓、IOS等主流手机系统相关的资讯以及评测、同时提供相关教程、应用、软件下载等服务。

Nacos配置中心设计分析

IT圈 admin 2浏览 0评论

Nacos配置中心设计分析

Nacos配置中心设计分析-客户端

  • 主要功能
  • 客户端初始化
  • 重要数据结构
    • NacosConfigService
    • EventDispatcher
    • ServerListManager
    • ClientWorker
    • CacheData
  • 典型场景
    • 获取配置文件内容
    • 发布配置文件内容
    • 添加配置文件监听
    • 删除配置文件监听
  • 线程模型
    • Nacos服务器登录线程
    • Nacos服务器列表更新线程
    • 客户端工作线程
    • 配置文件拉取线程
  • 版本

Nacos配置中心按照NameSpace、Group、DataId三级结构来组织配置文件,其中,NameSpace可以用于区分环境,例如dev、test等;Group是服务分组,可以用于标识应用;DataId是配置文件名。在实际应用中,可以将NameSpace和Group与应用绑定,固定写入应用的启动参数。

主要功能

Nacos配置中心客户端主要具有如下能力

  1. 获取配置文件内容
  2. 发布配置文件内容
  3. 添加配置文件监听
  4. 删除配置文件监听

客户端初始化

  • 初始化HTTP代理MetricsHttpAgent:对HttpAgent的进一步封装,增加了统计响应时间的能力,HttpAgent中维护了服务器列表,并启动一个线程不断向服务器发起登录请求,保证客户端的合法性。此外,如果没有指定服务器列表,还会启动一个线程不断查询服务器列表,如果服务器有变动,则发布服务器变更事件,并在本线程调用相应的监听器
  • 创建客户端工作线程ClientWorker:定时检查是否添加了需要监控的配置文件,如果有,则把这个文件更新的监控任务分配给拉取线程,不断从服务端拉取最新文件内容,如果发现文件变更,则更新客户端配置文件内容,并调用相应的监听器(如果有指定的线程池则使用该线程池,没有指定则用自身线程执行监听器调用)

初始化代码如下

    // NacosConfigServicepublic NacosConfigService(Properties properties) throws NacosException {String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);if (StringUtils.isBlank(encodeTmp)) {encode = Constants.ENCODE;} else {encode = encodeTmp.trim();}initNamespace(properties);// 初始化HTTP代理MetricsHttpAgentagent = new MetricsHttpAgent(new ServerHttpAgent(properties));agent.start();// 创建客户端工作线程ClientWorkerworker = new ClientWorker(agent, configFilterChainManager, properties);}

重要数据结构

NacosConfigService

// NacosConfigService
public class NacosConfigService implements ConfigService {......// http代理MetricsHttpAgentprivate HttpAgent agent; // 客户端工作线程private ClientWorker worker;......
}

EventDispatcher

public class EventDispatcher {......// key是事件类型,value是监听器列表static final Map<Class<? extends AbstractEvent>, CopyOnWriteArrayList<AbstractEventListener>> LISTENER_MAP= new HashMap<Class<? extends AbstractEvent>, CopyOnWriteArrayList<AbstractEventListener>>();......
}

ServerListManager

// ServerListManager 
public class ServerListManager {......// 服务列表(拼接成一行)private String serverAddrsStr;// 服务器列表(分解serverAddrsStr得到)volatile List<String> serverUrls = new ArrayList<String>();// 服务端点地址(查询服务列表请求地址)public String addressServerUrl;......
}

ClientWorker

// ClientWorker
public class ClientWorker {// 配置文件监听器,key是配置文件的标识,value是CacheDataprivate final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(new HashMap<String, CacheData>());
}

CacheData

// CacheData
public class CacheData {// 配置文件内容private volatile String content;// 监听器列表private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
}

CacheData是配置文件缓存,应用从CacheData中获取文件内容,CacheData由客户端线程不断拉取更新

典型场景

获取配置文件内容

优先从本地缓存文件读取配置文件内容,失败则从服务器获取,仍然失败,则从历史快照文件获取

    // NacosConfigServiceprivate String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {...// 优先使用本地配置String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);if (content != null) {......configFilterChainManager.doFilter(null, cr);content = cr.getContent();......return content;}// 本地没有配置文件内容则请求服务端获取try {String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);cr.setContent(ct[0]);configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;            } catch (NacosException ioe) {if (NacosException.NO_RIGHT == ioe.getErrCode()) {throw ioe;}LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",agent.getName(), dataId, group, tenant, ioe.toString());}// 从快照获取LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),dataId, group, tenant, ContentUtils.truncateContent(content));content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);cr.setContent(content);configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;}

发布配置文件内容

调用API修改配置文件内容

添加配置文件监听

客户端定时拉取指定文件内容,如果发生变化,更新配置文件内容(存放在cacheMap里),并调用监听器进行相应处理。
增加文件监控代码如下

    // ClientWorkerpublic void checkConfigInfo() {// 分任务int listenerSize = cacheMap.get().size();// 向上取整为批数// ParamUtil.getPerTaskConfigSize()为每个线程处理的文件数目,添加文件监听时,会向cacheMap增加一条记录int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount = longingTaskCount;}}

拉取文件内容并更新代码如下

    // ClientWorkerclass LongPollingRunnable implements Runnable {private int taskId;public LongPollingRunnable(int taskId) {this.taskId = taskId;}@Overridepublic void run() {List<CacheData> cacheDatas = new ArrayList<CacheData>();List<String> inInitializingCacheList = new ArrayList<String>();try {// check failover configfor (CacheData cacheData : cacheMap.get().values()) {if (cacheData.getTaskId() == taskId) {cacheDatas.add(cacheData);try {checkLocalConfig(cacheData);if (cacheData.isUseLocalConfigInfo()) {cacheData.checkListenerMd5();}} catch (Exception e) {LOGGER.error("get local config info error", e);}}}// check server configList<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);LOGGER.info("get changedGroupKeys:" + changedGroupKeys);for (String groupKey : changedGroupKeys) {String[] key = GroupKey.parseKey(groupKey);String dataId = key[0];String group = key[1];String tenant = null;if (key.length == 3) {tenant = key[2];}try {String[] ct = getServerConfig(dataId, group, tenant, 3000L);CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));// 更新配置文件内容cache.setContent(ct[0]);if (null != ct[1]) {cache.setType(ct[1]);}LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",agent.getName(), dataId, group, tenant, cache.getMd5(),ContentUtils.truncateContent(ct[0]), ct[1]);} catch (NacosException ioe) {String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",agent.getName(), dataId, group, tenant);LOGGER.error(message, ioe);}}for (CacheData cacheData : cacheDatas) {if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {// 调用监听器处理cacheData.checkListenerMd5();cacheData.setInitializing(false);}}inInitializingCacheList.clear();executorService.execute(this);} catch (Throwable e) {// If the rotation training task is abnormal, the next execution time of the task will be punishedLOGGER.error("longPolling error : ", e);executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);}}}

删除配置文件监听

    // ClientWorkerpublic void removeListener(String dataId, String group, Listener listener) {group = null2defaultGroup(group);CacheData cache = getCache(dataId, group);if (null != cache) {cache.removeListener(listener);if (cache.getListeners().isEmpty()) {removeCache(dataId, group);}}}

线程模型

Nacos服务器登录线程

  • JVM中名称com.alibaba.nacos.client.config.security.updater
  • 定时向Nacos服务器发送登录请求,保证客户端的合法性
  • 实现类:ServerHttpAgent.匿名类

Nacos服务器列表更新线程

  • JVM中名称com.alibaba.nacos.client.Timer
  • 定时查询服务器列表(该线程只会在启动参数没有指定服务器列表的时候才会启动),如果有服务器变更,则更新客户端服务器列表信息,并调用该事件的监听器(如果有的话)
  • 实现类:ServerListManager.GetServerListTask

客户端工作线程

  • JVM中名称com.alibaba.nacos.client.Worker.{agentName}
  • 定时检查有没有新订阅的配置文件,如果有则启动一个拉取线程
  • 实现类:ClientWorker.匿名类

配置文件拉取线程

  • JVM中名称com.alibaba.nacos.client.Worker.longPolling.{agentName}
  • 定时拉取服务器配置文件内容,并依次调用该文件的监听器,如果添加监听器时指定了执行线程池,则使用指定线程线程池调用监听器
  • 实现类:ClientWorker.LongPollingRunnable

版本

本文基于Nacos的1.2.0版本

Nacos配置中心设计分析

Nacos配置中心设计分析-客户端

  • 主要功能
  • 客户端初始化
  • 重要数据结构
    • NacosConfigService
    • EventDispatcher
    • ServerListManager
    • ClientWorker
    • CacheData
  • 典型场景
    • 获取配置文件内容
    • 发布配置文件内容
    • 添加配置文件监听
    • 删除配置文件监听
  • 线程模型
    • Nacos服务器登录线程
    • Nacos服务器列表更新线程
    • 客户端工作线程
    • 配置文件拉取线程
  • 版本

Nacos配置中心按照NameSpace、Group、DataId三级结构来组织配置文件,其中,NameSpace可以用于区分环境,例如dev、test等;Group是服务分组,可以用于标识应用;DataId是配置文件名。在实际应用中,可以将NameSpace和Group与应用绑定,固定写入应用的启动参数。

主要功能

Nacos配置中心客户端主要具有如下能力

  1. 获取配置文件内容
  2. 发布配置文件内容
  3. 添加配置文件监听
  4. 删除配置文件监听

客户端初始化

  • 初始化HTTP代理MetricsHttpAgent:对HttpAgent的进一步封装,增加了统计响应时间的能力,HttpAgent中维护了服务器列表,并启动一个线程不断向服务器发起登录请求,保证客户端的合法性。此外,如果没有指定服务器列表,还会启动一个线程不断查询服务器列表,如果服务器有变动,则发布服务器变更事件,并在本线程调用相应的监听器
  • 创建客户端工作线程ClientWorker:定时检查是否添加了需要监控的配置文件,如果有,则把这个文件更新的监控任务分配给拉取线程,不断从服务端拉取最新文件内容,如果发现文件变更,则更新客户端配置文件内容,并调用相应的监听器(如果有指定的线程池则使用该线程池,没有指定则用自身线程执行监听器调用)

初始化代码如下

    // NacosConfigServicepublic NacosConfigService(Properties properties) throws NacosException {String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);if (StringUtils.isBlank(encodeTmp)) {encode = Constants.ENCODE;} else {encode = encodeTmp.trim();}initNamespace(properties);// 初始化HTTP代理MetricsHttpAgentagent = new MetricsHttpAgent(new ServerHttpAgent(properties));agent.start();// 创建客户端工作线程ClientWorkerworker = new ClientWorker(agent, configFilterChainManager, properties);}

重要数据结构

NacosConfigService

// NacosConfigService
public class NacosConfigService implements ConfigService {......// http代理MetricsHttpAgentprivate HttpAgent agent; // 客户端工作线程private ClientWorker worker;......
}

EventDispatcher

public class EventDispatcher {......// key是事件类型,value是监听器列表static final Map<Class<? extends AbstractEvent>, CopyOnWriteArrayList<AbstractEventListener>> LISTENER_MAP= new HashMap<Class<? extends AbstractEvent>, CopyOnWriteArrayList<AbstractEventListener>>();......
}

ServerListManager

// ServerListManager 
public class ServerListManager {......// 服务列表(拼接成一行)private String serverAddrsStr;// 服务器列表(分解serverAddrsStr得到)volatile List<String> serverUrls = new ArrayList<String>();// 服务端点地址(查询服务列表请求地址)public String addressServerUrl;......
}

ClientWorker

// ClientWorker
public class ClientWorker {// 配置文件监听器,key是配置文件的标识,value是CacheDataprivate final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(new HashMap<String, CacheData>());
}

CacheData

// CacheData
public class CacheData {// 配置文件内容private volatile String content;// 监听器列表private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
}

CacheData是配置文件缓存,应用从CacheData中获取文件内容,CacheData由客户端线程不断拉取更新

典型场景

获取配置文件内容

优先从本地缓存文件读取配置文件内容,失败则从服务器获取,仍然失败,则从历史快照文件获取

    // NacosConfigServiceprivate String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {...// 优先使用本地配置String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);if (content != null) {......configFilterChainManager.doFilter(null, cr);content = cr.getContent();......return content;}// 本地没有配置文件内容则请求服务端获取try {String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);cr.setContent(ct[0]);configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;            } catch (NacosException ioe) {if (NacosException.NO_RIGHT == ioe.getErrCode()) {throw ioe;}LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",agent.getName(), dataId, group, tenant, ioe.toString());}// 从快照获取LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),dataId, group, tenant, ContentUtils.truncateContent(content));content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);cr.setContent(content);configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;}

发布配置文件内容

调用API修改配置文件内容

添加配置文件监听

客户端定时拉取指定文件内容,如果发生变化,更新配置文件内容(存放在cacheMap里),并调用监听器进行相应处理。
增加文件监控代码如下

    // ClientWorkerpublic void checkConfigInfo() {// 分任务int listenerSize = cacheMap.get().size();// 向上取整为批数// ParamUtil.getPerTaskConfigSize()为每个线程处理的文件数目,添加文件监听时,会向cacheMap增加一条记录int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount = longingTaskCount;}}

拉取文件内容并更新代码如下

    // ClientWorkerclass LongPollingRunnable implements Runnable {private int taskId;public LongPollingRunnable(int taskId) {this.taskId = taskId;}@Overridepublic void run() {List<CacheData> cacheDatas = new ArrayList<CacheData>();List<String> inInitializingCacheList = new ArrayList<String>();try {// check failover configfor (CacheData cacheData : cacheMap.get().values()) {if (cacheData.getTaskId() == taskId) {cacheDatas.add(cacheData);try {checkLocalConfig(cacheData);if (cacheData.isUseLocalConfigInfo()) {cacheData.checkListenerMd5();}} catch (Exception e) {LOGGER.error("get local config info error", e);}}}// check server configList<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);LOGGER.info("get changedGroupKeys:" + changedGroupKeys);for (String groupKey : changedGroupKeys) {String[] key = GroupKey.parseKey(groupKey);String dataId = key[0];String group = key[1];String tenant = null;if (key.length == 3) {tenant = key[2];}try {String[] ct = getServerConfig(dataId, group, tenant, 3000L);CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));// 更新配置文件内容cache.setContent(ct[0]);if (null != ct[1]) {cache.setType(ct[1]);}LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",agent.getName(), dataId, group, tenant, cache.getMd5(),ContentUtils.truncateContent(ct[0]), ct[1]);} catch (NacosException ioe) {String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",agent.getName(), dataId, group, tenant);LOGGER.error(message, ioe);}}for (CacheData cacheData : cacheDatas) {if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {// 调用监听器处理cacheData.checkListenerMd5();cacheData.setInitializing(false);}}inInitializingCacheList.clear();executorService.execute(this);} catch (Throwable e) {// If the rotation training task is abnormal, the next execution time of the task will be punishedLOGGER.error("longPolling error : ", e);executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);}}}

删除配置文件监听

    // ClientWorkerpublic void removeListener(String dataId, String group, Listener listener) {group = null2defaultGroup(group);CacheData cache = getCache(dataId, group);if (null != cache) {cache.removeListener(listener);if (cache.getListeners().isEmpty()) {removeCache(dataId, group);}}}

线程模型

Nacos服务器登录线程

  • JVM中名称com.alibaba.nacos.client.config.security.updater
  • 定时向Nacos服务器发送登录请求,保证客户端的合法性
  • 实现类:ServerHttpAgent.匿名类

Nacos服务器列表更新线程

  • JVM中名称com.alibaba.nacos.client.Timer
  • 定时查询服务器列表(该线程只会在启动参数没有指定服务器列表的时候才会启动),如果有服务器变更,则更新客户端服务器列表信息,并调用该事件的监听器(如果有的话)
  • 实现类:ServerListManager.GetServerListTask

客户端工作线程

  • JVM中名称com.alibaba.nacos.client.Worker.{agentName}
  • 定时检查有没有新订阅的配置文件,如果有则启动一个拉取线程
  • 实现类:ClientWorker.匿名类

配置文件拉取线程

  • JVM中名称com.alibaba.nacos.client.Worker.longPolling.{agentName}
  • 定时拉取服务器配置文件内容,并依次调用该文件的监听器,如果添加监听器时指定了执行线程池,则使用指定线程线程池调用监听器
  • 实现类:ClientWorker.LongPollingRunnable

版本

本文基于Nacos的1.2.0版本

与本文相关的文章

发布评论

评论列表 (0)

  1. 暂无评论