文档章节

聊聊nacos config的publishConfig

go4it
 go4it
发布于 10/21 23:00
字数 1413
阅读 14
收藏 0

本文主要研究一下nacos config的publishConfig

ConfigController

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java

@Controller
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {

    private static final Logger log = LoggerFactory.getLogger(ConfigController.class);

    private static final String NAMESPACE_PUBLIC_KEY = "public";

    public static final String EXPORT_CONFIG_FILE_NAME = "nacos_config_export_";

    public static final String EXPORT_CONFIG_FILE_NAME_EXT = ".zip";

    public static final String EXPORT_CONFIG_FILE_NAME_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    private final transient ConfigServletInner inner;

    private final transient PersistService persistService;

    private final transient ConfigSubService configSubService;

    @Autowired
    public ConfigController(ConfigServletInner configServletInner, PersistService persistService,
                            ConfigSubService configSubService) {
        this.inner = configServletInner;
        this.persistService = persistService;
        this.configSubService = configSubService;
    }

    /**
     * 增加或更新非聚合数据。
     *
     * @throws NacosException
     */
    @RequestMapping(method = RequestMethod.POST)
    @ResponseBody
    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
                                 @RequestParam("dataId") String dataId, @RequestParam("group") String group,
                                 @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
                                     String tenant,
                                 @RequestParam("content") String content,
                                 @RequestParam(value = "tag", required = false) String tag,
                                 @RequestParam(value = "appName", required = false) String appName,
                                 @RequestParam(value = "src_user", required = false) String srcUser,
                                 @RequestParam(value = "config_tags", required = false) String configTags,
                                 @RequestParam(value = "desc", required = false) String desc,
                                 @RequestParam(value = "use", required = false) String use,
                                 @RequestParam(value = "effect", required = false) String effect,
                                 @RequestParam(value = "type", required = false) String type,
                                 @RequestParam(value = "schema", required = false) String schema)
        throws NacosException {
        final String srcIp = RequestUtil.getRemoteIp(request);
        String requestIpApp = RequestUtil.getAppName(request);
        ParamUtils.checkParam(dataId, group, "datumId", content);
        ParamUtils.checkParam(tag);

        Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
        if (configTags != null) {
            configAdvanceInfo.put("config_tags", configTags);
        }
        if (desc != null) {
            configAdvanceInfo.put("desc", desc);
        }
        if (use != null) {
            configAdvanceInfo.put("use", use);
        }
        if (effect != null) {
            configAdvanceInfo.put("effect", effect);
        }
        if (type != null) {
            configAdvanceInfo.put("type", type);
        }
        if (schema != null) {
            configAdvanceInfo.put("schema", schema);
        }
        ParamUtils.checkParam(configAdvanceInfo);

        if (AggrWhitelist.isAggrDataId(dataId)) {
            log.warn("[aggr-conflict] {} attemp to publish single data, {}, {}",
                RequestUtil.getRemoteIp(request), dataId, group);
            throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
        }

        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else { // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
            EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),
            LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);

        return true;
    }

    //......
}
  • publishConfig根据入参构造configAdvanceInfo及configInfo,对于前者会执行ParamUtils.checkParam(configAdvanceInfo)校验
  • 对于有betaIps的则执行persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false),然后发布ConfigDataChangeEvent
  • 对于没有betaIps的则判断tag是否为空,为空则执行persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false),不为空则执行persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);二者都会发布ConfigDataChangeEvent

PersistService

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/PersistService.java

@Repository
public class PersistService {

    @Autowired
    private DynamicDataSource dynamicDataSource;

    private DataSourceService dataSourceService;

    //......

    /**
     * 写入主表,插入或更新
     */
    public void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time,
                               Map<String, Object> configAdvanceInfo, boolean notify) {
        try {
            addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify);
        } catch (DataIntegrityViolationException ive) { // 唯一性约束冲突
            updateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify);
        }
    }

    public void insertOrUpdateTag(final ConfigInfo configInfo, final String tag, final String srcIp,
                                  final String srcUser, final Timestamp time, final boolean notify) {
        try {
            addConfigInfo4Tag(configInfo, tag, srcIp, null, time, notify);
        } catch (DataIntegrityViolationException ive) { // 唯一性约束冲突
            updateConfigInfo4Tag(configInfo, tag, srcIp, null, time, notify);
        }
    }

    public void insertOrUpdateBeta(final ConfigInfo configInfo, final String betaIps, final String srcIp,
                                   final String srcUser, final Timestamp time, final boolean notify) {
        try {
            addConfigInfo4Beta(configInfo, betaIps, srcIp, null, time, notify);
        } catch (DataIntegrityViolationException ive) { // 唯一性约束冲突
            updateConfigInfo4Beta(configInfo, srcIp, null, time, notify);
        }
    }

    //......

    /**
     * 添加普通配置信息,发布数据变更事件
     */
    public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
                              final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
        tjt.execute(new TransactionCallback<Boolean>() {
            @Override
            public Boolean doInTransaction(TransactionStatus status) {
                try {
                    long configId = addConfigInfoAtomic(srcIp, srcUser, configInfo, time, configAdvanceInfo);
                    String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
                    addConfiTagsRelationAtomic(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
                        configInfo.getTenant());
                    insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");
                    if (notify) {
                        EventDispatcher.fireEvent(
                            new ConfigDataChangeEvent(false, configInfo.getDataId(), configInfo.getGroup(),
                                configInfo.getTenant(), time.getTime()));
                    }
                } catch (CannotGetJdbcConnectionException e) {
                    fatalLog.error("[db-error] " + e.toString(), e);
                    throw e;
                }
                return Boolean.TRUE;
            }
        });
    }

    /**
     * 更新配置信息
     */
    public void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser,
                                 final Timestamp time, final Map<String, Object> configAdvanceInfo,
                                 final boolean notify) {
        tjt.execute(new TransactionCallback<Boolean>() {
            @Override
            public Boolean doInTransaction(TransactionStatus status) {
                try {
                    ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(),
                        configInfo.getTenant());
                    String appNameTmp = oldConfigInfo.getAppName();
                    // 用户传过来的appName不为空,则用持久化用户的appName,否则用db的;清空appName的时候需要传空串
                    if (configInfo.getAppName() == null) {
                        configInfo.setAppName(appNameTmp);
                    }
                    updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo);
                    String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
                    if (configTags != null) {
                        // 删除所有tag,然后再重新创建
                        removeTagByIdAtomic(oldConfigInfo.getId());
                        addConfiTagsRelationAtomic(oldConfigInfo.getId(), configTags, configInfo.getDataId(),
                            configInfo.getGroup(), configInfo.getTenant());
                    }
                    insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U");
                    if (notify) {
                        EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, configInfo.getDataId(),
                            configInfo.getGroup(), configInfo.getTenant(), time.getTime()));
                    }
                } catch (CannotGetJdbcConnectionException e) {
                    fatalLog.error("[db-error] " + e.toString(), e);
                    throw e;
                }
                return Boolean.TRUE;
            }
        });
    }

    /**
     * 添加普通配置信息,发布数据变更事件
     */
    public void addConfigInfo4Tag(ConfigInfo configInfo, String tag, String srcIp, String srcUser, Timestamp time,
                                  boolean notify) {
        String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
        String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
        String tagTmp = StringUtils.isBlank(tag) ? StringUtils.EMPTY : tag.trim();
        try {
            String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
            jt.update(
                "INSERT INTO config_info_tag(data_id,group_id,tenant_id,tag_id,app_name,content,md5,src_ip,src_user,"
                    + "gmt_create,gmt_modified) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
                configInfo.getDataId(), configInfo.getGroup(), tenantTmp, tagTmp, appNameTmp, configInfo.getContent(),
                md5,
                srcIp, srcUser, time, time);
            if (notify) {
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, configInfo.getDataId(),
                    configInfo.getGroup(), tenantTmp, tagTmp, time.getTime()));
            }
        } catch (CannotGetJdbcConnectionException e) {
            fatalLog.error("[db-error] " + e.toString(), e);
            throw e;
        }
    }

    /**
     * 更新配置信息
     */
    public void updateConfigInfo4Tag(ConfigInfo configInfo, String tag, String srcIp, String srcUser, Timestamp time,
                                     boolean notify) {
        String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
        String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
        String tagTmp = StringUtils.isBlank(tag) ? StringUtils.EMPTY : tag.trim();
        try {
            String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
            jt.update(
                "UPDATE config_info_tag SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=? WHERE "
                    + "data_id=? AND group_id=? AND tenant_id=? AND tag_id=?",
                configInfo.getContent(), md5, srcIp, srcUser, time, appNameTmp, configInfo.getDataId(),
                configInfo.getGroup(), tenantTmp, tagTmp);
            if (notify) {
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(),
                    tenantTmp, tagTmp, time.getTime()));
            }

        } catch (CannotGetJdbcConnectionException e) {
            fatalLog.error("[db-error] " + e.toString(), e);
            throw e;
        }
    }

    /**
     * 添加普通配置信息,发布数据变更事件
     */
    public void addConfigInfo4Beta(ConfigInfo configInfo, String betaIps,
                                   String srcIp, String srcUser, Timestamp time, boolean notify) {
        String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
        String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
        try {
            String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
            jt.update(
                "INSERT INTO config_info_beta(data_id,group_id,tenant_id,app_name,content,md5,beta_ips,src_ip,"
                    + "src_user,gmt_create,gmt_modified) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
                configInfo.getDataId(), configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(), md5,
                betaIps, srcIp, srcUser, time, time);
            if (notify) {
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(),
                    tenantTmp, time.getTime()));
            }

        } catch (CannotGetJdbcConnectionException e) {
            fatalLog.error("[db-error] " + e.toString(), e);
            throw e;
        }
    }

    /**
     * 更新配置信息
     */
    public void updateConfigInfo4Beta(ConfigInfo configInfo, String srcIp, String srcUser, Timestamp time,
                                      boolean notify) {
        String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
        String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
        try {
            String md5 = MD5.getInstance().getMD5String(configInfo.getContent());
            jt.update(
                "UPDATE config_info_beta SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=? WHERE "
                    + "data_id=? AND group_id=? AND tenant_id=?",
                configInfo.getContent(), md5, srcIp, srcUser, time, appNameTmp, configInfo.getDataId(),
                configInfo.getGroup(), tenantTmp);
            if (notify) {
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(),
                    tenantTmp, time.getTime()));
            }

        } catch (CannotGetJdbcConnectionException e) {
            fatalLog.error("[db-error] " + e.toString(), e);
            throw e;
        }
    }

    //......
}
  • insertOrUpdate、insertOrUpdateTag、insertOrUpdateBeta三者的执行逻辑都是先执行insert操作,捕获到DataIntegrityViolationException时执行update操作

小结

  • publishConfig根据入参构造configAdvanceInfo及configInfo,对于前者会执行ParamUtils.checkParam(configAdvanceInfo)校验
  • 对于有betaIps的则执行persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false),然后发布ConfigDataChangeEvent
  • 对于没有betaIps的则判断tag是否为空,为空则执行persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false),不为空则执行persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);二者都会发布ConfigDataChangeEvent

doc

© 著作权归作者所有

go4it
粉丝 88
博文 1152
码字总数 1085438
作品 0
深圳
私信 提问
聊聊nacos config的EventDispatcher

序 本文主要研究一下nacos config的EventDispatcher EventDispatcher nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/utils/event/EventDispatcher.java EventDispatch......

go4it
09/06
31
0
聊聊nacos config的RequestLogAspect

序 本文主要研究一下nacos config的RequestLogAspect RequestLogAspect nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/aspect/RequestLogAspect.java RequestLogAspec......

go4it
09/05
20
0
聊聊nacos client的MetricsMonitor

序 本文主要研究一下nacos client的MetricsMonitor MetricsMonitor nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/monitor/MetricsMonitor.java MetricsMonitor内置了nacosMo......

go4it
09/03
32
0
聊聊nacos的DistroMapper

序 本文主要研究一下nacos的DistroMapper ServerChangeListener nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/servers/ServerChangeListener.java ServerChangeListe......

go4it
09/09
37
0
Spring Cloud Alibaba迁移指南(三):极简的 Config

自 Spring Cloud 官方宣布 Spring Cloud Netflix 进入维护状态后,我们开始制作《Spring Cloud Alibaba迁移指南》系列文章,向开发者提供更多的技术选型方案,并降低迁移过程中的技术难度。 ...

中间件小哥
02/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

BigDecimal 去后面无用的0的方法

BigDecimal a=new BigDecimal("0.1000"); System.out.println(a.stripTrailingZeros().toPlainString());...

xiaodong16
16分钟前
4
0
JAVA--高级基础开发

[集合版双色球] 十二、双色球规则:双色球每注投注号码由6个红色球号码和1个蓝色球号码组成。红色球号码从1—33中选择;蓝色球号码从1—16中选择;请随机生成一注双色球号码。(要求同色号码...

李文杰-yaya
昨天
14
0
聊聊rocketmq broker的CONSUMER_SEND_MSG_BACK

序 本文主要研究一下rocketmq broker的CONSUMER_SEND_MSG_BACK CONSUMER_SEND_MSG_BACK rocketmq/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java public class......

go4it
昨天
3
0
API常见接口(下)

system类 StringBuilder和StringBuffer 包装类 1.System类 (java.lang包中) 提供了大量的静态方法,可以获取与系统相关的信息或系统级操作。 常用方法: public static long currentTimeMi...

Firefly-
昨天
4
0
MySQL系列:一句SQL,MySQL是怎么工作的?

对于MySQL而言,其实分为客户端与服务端。 服务端,就是MySQL应用,当我们使用net start mysql命令启动的服务,其实就是启动了MySQL的服务端。 客户端,负责发送请求到服务端并从服务端获取数...

杨小格子
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部