Java
反射使我们能在程序运行时动态调用某个对象的方法/构造函数、获取某个对象的属性,经常用于实现动态代理、工厂模式、Java JDBC
加载连接驱动类等,近期阅读开源项目源码发现,它还有一种重要的用途——状态同步。
开源项目NNAnalytics是一个用Java
实现的HDFS
文件分析项目,能为我们实时分析、展示HDFS
中各类文件(小文件、中文件、大文件等)的数据。之所以能做到实时,与使用反射同步HDFS
元数据对象变化脱不开关系,接下我们来剖析一下该项目源码并使用一个测试用例来理解如何使用反射实现状态同步。
源码剖析
首先,在NNAnalytics
服务启动的时候,会加载fsimage
元数据镜像文件在JVM
内存中构建org.apache.hadoop.hdfs.server.namenode.INodeMap
对象,该对象保存了HDFS
中所有的INode
信息,INodeMap
在JVM
内存中保存了所有的HDFS
元数据信息。接着,在启动逻辑中,NNAnalytics
中的org.apache.hadoop.hdfs.server.namenode.AbstractQueryEngine
类会使用反射的getDeclaredField
方法获取INodeMap
类中的map
属性,该属性实际保存了INode
信息:
INodeMap inodeMap = fsDirectory.getINodeMap();
Field mapField = inodeMap.getClass().getDeclaredField("map");
mapField.setAccessible(true);
GSet<INode, INodeWithAdditionalFields> gset =
(GSet<INode, INodeWithAdditionalFields>) mapField.get(inodeMap);
在反射获取了保存INode
信息的map
对象之后,AbstractQueryEngine
类使用这些信息得到所有文件(files
)和目录(dirs
)信息:
files =
StreamSupport.stream(gset.spliterator(), true)
.filter(INode::isFile)
.collect(Collectors.toConcurrentMap(node -> node, node -> node));
dirs =
StreamSupport.stream(gset.spliterator(), true)
.filter(INode::isDirectory)
.collect(Collectors.toConcurrentMap(node -> node, node -> node));
all = CollectionsView.combine(files.keySet(), dirs.keySet());
以上代码中三个变量在AbstractQueryEngine
类中的声明如下:
protected Collection<INode> all;
protected Map<INode, INodeWithAdditionalFields> files;
protected Map<INode, INodeWithAdditionalFields> dirs;
由于反射操作的是JVM
内存中的对象信息,反射获取的对象和被反射的对象在JVM
中指向同一内存地址,因此,当INodeMap
中的map
属性更新时,all
、files
、dirs
变量也会同时更新,我们就能获取到HDFS
文件元数据信息对象的变化信息。
为了证实这一点,我们写段代码测试一下。
测试
接下来我们使用以下代码复现一下使用反射实现状态同步的场景,以下代码由Test1
和Test2
两个类组成:
Test1
类的定义如下:
public class Test1 {
// 保存随机数字
private final List<Integer> list;
static Test1 newInstance(List<Integer> list) {
return new Test1(list);
}
// 类初始化时调用启动add和remove方法
private Test1(List<Integer> list) {
add();
remove();
this.list = list;
}
public List<Integer> getList() {
return list;
}
/**
* 定时添加随机数字
*/
public final void add() {
ScheduledThreadPoolExecutor executors = new ScheduledThreadPoolExecutor(1);
executors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
list.add(new Random().nextInt(100));
}
}, 1, 2, TimeUnit.SECONDS);
}
/**
* 定时移除随机数字
*/
public final void remove() {
ScheduledThreadPoolExecutor executors = new ScheduledThreadPoolExecutor(21);
executors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
list.remove(0);
}
}, 1, 2, TimeUnit.SECONDS);
}
}
Test1
类很简单,主要有一个属性和两个方法:
list
属性,保存随机数字add
和remove
方法:每隔两秒添加或者移除list
中的元素
Test2
类的定义如下,主要逻辑见注释:
public class Test2 {
// 保存反射获取的Test1中list字段的值
protected static List<Integer> list2;
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
List<Integer> list1 = new ArrayList<>();
list1.add(1);
list1.add(2);
Test1 test1 = Test1.newInstance(list1);
// 1、反射获取list对象
Field field = test1.getClass().getDeclaredField("list");
// 2、设置访问字段时跳过安全检查,否则会抛出java.lang.IllegalAccessException异常
field.setAccessible(true);
// 3、反射获取Test1中list字段的值
list2 = (List) field.get(test1);
// 4、比较list1和list2是否指向同一地址,输出结果true
System.out.println(list2 == list1);
// 5、打印输出list的值,观察list2是否与list1的数据同步
while (true) {
System.out.println(list2);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出结果:
[40, 59]
[94, 43]
[43, 93]
[30, 90]
[90, 2]
[53, 56]
[56, 87]
[70, 56]
[56, 42]
[40, 8]
[8, 53]
[58, 91]
[91, 85]
[99, 73]
[73, 70]
[9, 1]
......
以上输出证明list2
能感知到list1
的变化,与list1
实现了状态同步。
总结
反射不仅可以帮我们实现动态代理、工厂模式、Java JDBC
加载数据库驱动类等操作,还可以帮我们实现状态同步,其根本原因在于反射获取的对象与被反射对象指向同一内存地址。在使用反射操作私有(private
)字段时,特别要注意设置字段跳过安全检查,也就是setAccessible(true)
,否则会抛出java.lang.IllegalAccessException
异常。