本文主要是希望通过分析Server.main()方法的源码,了解seata-server在启动时都做了哪些工作。
Main方法是seata-server的启动方法,在该方法中对很多服务进行了初始化,具体如下:
public static void main(String[] args) throws IOException { //初始化 metrics MetricsManager.get().init(); //initialize the parameter parser ParameterParser parameterParser = new ParameterParser(args); //设置server端的持久化模式,默认使用文件模式 System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode()); RpcServer rpcServer = new RpcServer(WORKING_THREADS); //server port 默认8091 rpcServer.setListenPort(parameterParser.getPort()); UUIDGenerator.init(parameterParser.getServerNode()); //初始化全局事务 SessionHolder.init(parameterParser.getStoreMode()); //初始化事务协调器TC DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer); coordinator.init(); rpcServer.setHandler(coordinator); // 注册钩子函数,用于服务退出时coordinator的处理 ShutdownHook.getInstance().addDisposable(coordinator); //127.0.0.1 and 0.0.0.0 are not valid here. if (NetUtil.isValidIp(parameterParser.getHost(), false)) { XID.setIpAddress(parameterParser.getHost()); } else { XID.setIpAddress(NetUtil.getLocalIp()); } XID.setPort(rpcServer.getListenPort()); rpcServer.init(); System.exit(0); }
1.初始化metrics
MetricsManager.get().init();
Metrics,作为 APM(Application Performance Management & Monitoring)三大基石之一,通过它可以快速详尽的获取到TC,TM和RM中事务的活动状态以及时延等重要统计信息。
metrics默认是关闭的,其配置信息在file.conf中,具体如下:
## metrics settings metrics { enabled = false registry-type = "compact" # multi exporters use comma divided exporter-list = "prometheus" exporter-prometheus-port = 9898 }
2.设置额外参数
ParameterParser parameterParser = new ParameterParser(args);
3.设置Server端的存储模式(store.mode)
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
Server端存储模式(store.mode)现有file和db两种。
file模式为单机模式,全局事务会话信息在内存中读写并持久化到本地文件root.data中,性能较高,seata-server默认是file模式。
db模式为高可用模式,全局事务会话信息通过db共享,性能较file模式稍差。
两种模式可以通过file.conf文件中的store.mode属性配置,具体配置信息如下:
## transaction log store store { ## store mode: file、db mode = "file" ## file store file { dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions max-branch-session-size = 16384 # globe session size , if exceeded throws exceptions max-global-session-size = 512 # file buffer size , if exceeded allocate new buffer file-write-buffer-cache-size = 16384 # when recover batch read size session.reload.read_size = 100 # async, sync flush-disk-mode = async } ## database store db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "dbcp" ## mysql/oracle/h2/oceanbase etc. db-type = "mysql" driver-class-name = "com.mysql.jdbc.Driver" url = "jdbc:mysql://127.0.0.1:3306/seata" user = "mysql" password = "mysql" min-conn = 1 max-conn = 3 global.table = "global_table" branch.table = "branch_table" lock-table = "lock_table" query-limit = 100 } }
配置为file模式时,会创建一个root.data文件,用于持久化事务信息,文件默认存储在项目的sessionStore路径下,可以在store.file.dir中修改该路径配置
配置为db模式时,需要创建global_table,branch_table,lock_table三张表,分别用于存储全局事务,分支事务以及全局锁。这三张表的表结构在db_store.sql文件中,执行即可。
4.rpc服务初始化
RpcServer rpcServer = new RpcServer(WORKING_THREADS); //server port rpcServer.setListenPort(parameterParser.getPort());
seata通过RpcServer类启动服务监听端口(默认是8091),该端口用于seata Server端和client端的数据通信
seata的rpc服务是通过netty实现的
WORKING_THREADS是一个线程池,默认的配置信息在Server类中,如下:
private static final int MIN_SERVER_POOL_SIZE = 100; private static final int MAX_SERVER_POOL_SIZE = 500; private static final int MAX_TASK_QUEUE_SIZE = 20000; private static final int KEEP_ALIVE_TIME = 500;
5.SessionHolder的初始化
//log store mode : file、db SessionHolder.init(parameterParser.getStoreMode());
SessionHolder 用来做全局事务树的管理,根据上文设置的store.mode类型分别进行初始化设置。
SessionHolder 内部拥有四种不同功能的sessionManager,分别用于不同的场景:
-
ROOT_SESSION_MANAGER:根Session,存储了所有的GlobalSession
-
ASYNC_COMMITTING_SESSION_MANAGER:管理需要做异步commit的Session。
-
RETRY_COMMITTING_SESSION_MANAGER:管理重试commit的Session。
-
RETRY_ROLLBACKING_SESSION_MANAGER:管理重试回滚的Session。
6.DefaultCoordinator初始化
DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer); coordinator.init(); rpcServer.setHandler(coordinator);
DefaultCoordinator 用来处理事务执行逻辑
7.钩子初始化
// register ShutdownHook ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook是seata中一个钩子类,继承了Thread类,在此处的作用主要是优雅的关闭DefaultCoordinator
DefaultCoordinator实现了Disposable接口的distory()方法,ShutdownHook内部维护了一个Disposable的集合,此处代码的作用就是将DefaultCoordinator添加到该集合中,在jvm退出时,ShutdownHook内的线程会调用DefaultCoordinator类的distory()方法,该方法会关闭DefaultCoordinator内部设置的各种线程池资源。
(注:Disposable是seata中定义的一个用于处理可回收资源的接口,接口中只有一个方法distory())
未完待续....