文档章节

kafka 0.10.1.0 权限验证源码分析

纳兰清风
 纳兰清风
发布于 2017/01/05 15:00
字数 1139
阅读 379
收藏 1

初始化流程图

ChannelBuilders.create创建ChannelBuilder对应关系如下:

switch (securityProtocol) {
    case SSL:
        requireNonNullMode(mode, securityProtocol);
        channelBuilder = new SslChannelBuilder(mode);
        break;
    case SASL_SSL:
    case SASL_PLAINTEXT:
        requireNonNullMode(mode, securityProtocol);
        if (loginType == null)
            throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
        if (mode == Mode.CLIENT && clientSaslMechanism == null)
            throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
        channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable);
        break;
    case PLAINTEXT:
    case TRACE:
        channelBuilder = new PlaintextChannelBuilder();
        break;
    default:
        throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
}


这里我们配置的listeners值是
listeners=SASL_PLAINTEXT://0.0.0.0:9092
故解析出来的protocol就是SASL_PLAINTEXT,相应的ChannelBuilder也就是SaslChannelBuilder
到这里,权限验证相关的组件算是构建完毕了,然后我们看当一个连接接进来的时候,这些组件是怎么工作的。
入口是Accepter.accept

接下来是Processor线程

+ SaslChannelBuilder.buildChannel:
    * 这里会根据构造SaslChannelBuilder时传进来的mode参数的不同选择构造SaslServerAuthenticator还是SaslClientAuthenticator,我们这里是服务端,当然是构造SaslServerAuthenticator.
    * 构造好SaslServerAuthenticator后会调用它的configure函数,进行一些初始化配置。
    * 把SaslServerAuthenticator对象作为参数传给KafkaChannel返回。
+ 这里注册好新的kafkaChannel后会调用poll函数完成一些IO的读写操作,而权限验证的处理就以这里为入口。
+ pollSelectionKeys函数会处理所有可完成连接,可读或可写的KafkaChannel。而权限验证部分则出现在KafkaChannel的准备阶段。
+ authenticate这个函数内会根据目前握手所处的状态的不同而做不同的处理,
    * 首先是HANDSHAKE_REQUEST状态,调用handleKafkaRequest处理第一阶段的握手请求,解析出客户端发来的mechanism,
    根据mechanism创建SaslServer,代码如下:

saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
    public SaslServer run() throws SaslException {
        return Sasl.createSaslServer(saslMechanism, "kafka", host, configs, callbackHandler);
    }
});


这里具体根据我们在jaas配置文件中的配置,最后是返回了一个PlainSaslServer,具体为什么返回了一个PlainSaslServer稍后讲。
* 然后下一状态是AUTHENTICATE,验证客户端发来的明文用户名和密码,调用了PlainSaslServer的evaluateResponse,代码如下

String[] tokens;
try {
    tokens = new String(response, "UTF-8").split("\u0000");
} catch (UnsupportedEncodingException e) {
    throw new SaslException("UTF-8 encoding not supported", e);
}
if (tokens.length != 3)
    throw new SaslException("Invalid SASL/PLAIN response: expected 3 tokens, got " + tokens.length);
authorizationID = tokens[0];
String username = tokens[1];
String password = tokens[2];

if (username.isEmpty()) {
    throw new SaslException("Authentication failed: username not specified");
}
if (password.isEmpty()) {
    throw new SaslException("Authentication failed: password not specified");
}
if (authorizationID.isEmpty())
    authorizationID = username;
try {
    String expectedPassword = JaasUtils.jaasConfig(LoginType.SERVER.contextName(), JAAS_USER_PREFIX + username);
    if (!password.equals(expectedPassword)) {
        throw new SaslException("Authentication failed: Invalid username or password");
    }
} catch (IOException e) {
    throw new SaslException("Authentication failed: Invalid JAAS configuration", e);
}


所以这里可以根据我们的需求根据客户端传过来的username和password动态的去某一个数据源获取和匹配其合法性。

jaas配置文件:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xxxxx"
    password="yyyyy"
    user_xxxxx="yyyyy";
};


看到这里我们配置了一个org.apache.kafka.common.security.plain.PlainLoginModule

上面讲到返回了一个PlainSaslServer,具体是怎么返回的呢?我们需要从SaslChannelBuilder的configure说起,上流程图:
+ LoginContext.init这里会初始化ConfigFile,读取jaas配置文件中的内容
+ LoginContext.login这里会实例化jaas配置文件中的PlainLoginModule,并依次调用其initialize和login函数,
其中initialize函数会把username和password配置到subject里面去,这个subject最终会通过LoginManager的subject函数在SaslChannelBuilder的buildChannel函数中获取到,设置到SaslClientAuthenticator中用于和其他服务器通讯验证使用。
+ 而PlainLoginModule可不仅仅只做了这一件事情,该类定义了一个静态块初始化代码,调用了PlainSaslServerProvider的initialize函数用于注册创建负责做PLAIN协议验证的类的工厂类PlainSaslServerFactory。代码如下:

protected PlainSaslServerProvider() {
    super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN Server Provider for Kafka");
    super.put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName());
}


我们看到注册的是一个SaslServerFactory.PLAIN -> org.apache.kafka.common.security.plain.PlainSaslServer.PlainSaslServerFactory的对应关系
而再看前面调用的Sasl的createSaslServer的代码:

String mechFilter = "SaslServerFactory." + mechanism;
Provider[] provs = Security.getProviders(mechFilter);
for (int j = 0; provs != null && j < provs.length; j++) {
    className = provs[j].getProperty(mechFilter);
    if (className == null) {
        throw new SaslException("Provider does not support " +
            mechFilter);
    }
    fac = (SaslServerFactory) loadFactory(provs[j], className);
    if (fac != null) {
        mech = fac.createSaslServer(
            mechanism, protocol, serverName, props, cbh);
        if (mech != null) {
            return mech;
        }
    }
}


其中mechFilter的值即是SaslServerFactory.PLAIN,这里取到PlainSaslServerFactory然后调用createSaslServer方法返回了一个PlainSaslServer

综上,kafka内部的整个权限验证的初始化流程和验证逻辑已经比较清晰了(可能讲的比较乱,反正我是清晰了),但是我们发现,跟网上的其他自己编写LoginModule模块做验证的方式不同,kafka的PlainLoginModule这个类本身并没有做什么跟验证有关的逻辑,

只是做了一些初始化和注册provider的工作,而真正做权限验证的是从provider间接生产出来的PlainSaslServer类。


 

© 著作权归作者所有

纳兰清风

纳兰清风

粉丝 33
博文 36
码字总数 37100
作品 0
朝阳
程序员
私信 提问
Apache Kafka 0.10.1.0 发布,大量更新

Apache Kafka 0.10.1.0 发布了,该版本更新了大量内容,主要改进如下: 新特性 [KAFKA-1464] - Add a throttling option to the Kafka replication tool [KAFKA-3176] - Allow console cons......

局长
2016/10/21
5.4K
4
kafka_0.10.1.0监控及管理

kafka_0.10.1.0监控及管理 1. kafka监控 kafka自身没有监控管理页面,无论是进行一些管理操作还是状态的监控都要命令加一大堆记不住的参数,实在是很不方便,不过好在在github上开源了一些工...

舒文joven
2018/07/19
215
1
windows下kafka的认证配置总结

config目录下创建kafkaserverjaas.conf文件:内容如下: KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user......

思想的行走
2017/11/14
0
0
Kafka压力测试(写入MQ消息压测和消费MQ消息压测)

1.测试目的 本次性能测试在正式环境下单台服务器上Kafka处理MQ消息能力进行压力测试。测试包括对Kafka写入MQ消息和消费MQ消息进行压力测试,根据10w、100w和1000w级别的消息处理结果,评估K...

数据架构师
2018/11/06
0
0
基于docker部署的微服务架构(六): 日志统一输出到kafka中间件

前言 上一篇 基于docker部署的微服务架构(五): docker环境下的zookeeper和kafka部署 中,已经成功部署了 kafka 环境,现在我们要改造之前的项目,使用 log4j2 的 kafka appender 把日志统...

月冷X心寒
2016/11/22
1K
4

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
今天
1K
12
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
22
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
17
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
30
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
12
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部