文档章节

hdp-storm的kafka kerberos认证

 如风达
发布于 2017/09/05 17:10
字数 1024
阅读 86
收藏 0

1、使用Apache Storm-Kafka的插件包

一直使用apache storm-kafka的工具包去消费kafka,索性直接配置好对应的zookeeper集群的host、port,kafka集群的host、port。

直接放到HDP-storm的环境上提交。果然直接报错查看异常发现拿不到kafka leader所在机器host、port信息。查看apache storm-kafka源码发现它是从zk的/brokers/ids/0下获得的.

//自己维护和构建所有topic的partition对应的host与port信息,因kafka管理集群都是通过将topic、分区、副本信息写入zk
//中监听更新或删除的。所以在zk中可以读取到kafka所有的状态信息
public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {
      List<String> topics =  getTopics();//获得所有的topic从zk的/brokers/topics下获得
      List<GlobalPartitionInformation> partitions =  new ArrayList<GlobalPartitionInformation>();

      for (String topic : topics) {
          GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic);
          try {
         //获得当前topic中有几个partition从zk的/brokers/topics/distributedTopic/partitions/下获得.
              int numPartitionsForTopic = getNumPartitions(topic);
              String brokerInfoPath = brokerPath();//一个broker路径默认是/brokers/ids
              //找到每个partition的leader对应的host和port为后面创建consumer做准备
              for (int partition = 0; partition < numPartitionsForTopic; partition++) {
                //获得partition的leader在zk中保存的路径,默认0、1、2....可在hdp中1001、1002、1003....
                  int leader = getLeaderFor(topic,partition);
                  String path = brokerInfoPath + "/" + leader;// /brokers/ids/1001
                  try {
                      //获得/brokers/ids/1001 znode的信息
                      byte[] brokerData = _curator.getData().forPath(path);
                      Broker hp = getBrokerHost(brokerData);//拿到host与port
                      //构建partition与broker对应关系简单说就是partition所在的host机器和port
                      globalPartitionInformation.addPartition(partition, hp);
                  } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                      LOG.error("Node {} does not exist ", path);
                  }
              }
          } catch (SocketTimeoutException e) {
              throw e;
          } catch (Exception e) {
              throw new RuntimeException(e);
          }
          LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
          partitions.add(globalPartitionInformation);
      }
        return partitions;
    }
/**
     * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
     * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
     *
     * @param contents
     * @return
     */
    private Broker getBrokerHost(byte[] contents) {
        try {
            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parseWithException(new String(contents, "UTF-8"));
            String host = (String) value.get("host");
            Integer port = ((Long) value.get("port")).intValue();
            return new Broker(host, port);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

可是上HDP-zookeeper中查看发现/brokers/ids/1001下面的host:null、port:-1.

找到获取partition映射host与port,发现无非就是在对应zk的znode上获取到他的JSONValue值拿到host与port,虽然这个/brokers/ids/1001的znode下面host:null,port:-1.不知道出于什么原因HDP的kafka要将host与port变成null与-1,但是我发现这个endpoints中一样可以拿到很显然host : woker11-cs.zuhu2.com , port : 6667.

修改后使用maven打包.结果测试发现host与port确实拿到了,可是还是无法消费.

google发现apache storm-kafka安全认证的API,只有hdp二次开发storm-kafka才有

    public SimpleConsumer register(Broker host, String topic, int partition) {
        if (!this._connections.containsKey(host)) {
            this._connections.put(host, new DynamicPartitionConnections.ConnectionInfo(
new SimpleConsumer(host.host, host.port, this._config.socketTimeoutMs,
 this._config.bufferSizeBytes, this._config.clientId, this._config.securityProtocol)));
        }

        DynamicPartitionConnections.ConnectionInfo info = (DynamicPartitionConnections.ConnectionInfo)this._connections.get(host);
        info.partitions.add(this.getHashKey(topic, partition));
        return info.consumer;
    }

调用register获取SompleConsumer,hdp版本的比apache storm-kafka的多一个securityProtocol变量.这个值默认是"PLAINTEXT".

hdp的kafka_2.10中的SimpleConsumer默认它是开启认证的所以你在创建时只给host、port、soTimeout、bufferSize、clientId,一样会默认给securityProtocol赋值为“PLAINTEXT”

apache的kafka_2.10中的SimpleConsumer

最根本的还是storm创建new SimpleConsumer使用的kafka_2.10 API也二次开发了.它提供了安全认证的API.如下apache kafka_2.10没有提供认证API

最后发现自己修改apache storm-kafka是不行了.kafka_2.10和kafka-clients都是经过二次开发了的.

由于网络原因这些包都下载不了.我去

http://repo.hortonworks.com/content/repositories/releases/

官网去下载

并手动的install到本地maven库中.

mvn install:install-file -Dfile=F:\sougoDownload\storm-kafka-1.0.1.2.5.3.0-37.jar -DgroupId=org.apache.storm -DartifactId=storm-kafka -Dversion=1.0.1.2.5.3.0-37 -Dpackaging=jar

最后打包观察可以消费并发送数据到kafka.

注意下载的hdp版本的包必须和环境中的版本一直.hdp各个版本存在差异并不像java向后兼容.最开始我下的最新版本storm-kafka 1.1.0.3.0.1.3-1,KafkaBolt继承的BaseTickTupleAwareRichBolt但是环境的storm-core是1.0.1.2.5.3.0-37中根本没有BaseTickTupleAwareRichBolt这个类.

storm-kafka 1.0.1.2.5.3.0-37版本的KafkaBolt继承的是BaseRichBolt

© 著作权归作者所有

共有 人打赏支持
粉丝 7
博文 255
码字总数 22313
作品 0
深圳
私信 提问
Apache Storm 0.10.0-beta 发布

Apache Storm 0.10.0-beta 发布,值得关注的更新如下: 安全,多租户部署: Kerberos Authentication with Automatic Credential Push and Renewal Pluggable Authorization and ACLs Multi-......

oschina
2015/06/16
1K
0
基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats)

基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats) 网盘地址:https://pan.baidu.com/s/1-59o76H32Jfp0Vp-o8zLYg 提取码: im8f 备用地址(腾讯微云):https://share...

小小搞笑弟
11/16
0
0
Apache Storm 0.9.6/0.10.0 发布

Apache Storm 0.10.0 发布,此版本是个稳定版本,相比之前的 Beta 版本主要包括 bug 修复和改进: STORM-1108: Fix NPE in simulated time STORM-1106: Netty should not limit attempts to...

oschina
2015/11/06
4.7K
8
Apache Storm 1.2.0,1.1.2 和 1.0.6 发布

Apache Storm 1.2.0,1.1.2 和 1.0.6 发布了。主要更新内容及下载地址如下: 1.2.0 New Feature [STORM-2383] - [storm-hbase] Support HBase as state backend [STORM-2484] - Flux: suppo......

达尔文
02/18
923
0
Apache Storm 1.0.1 发布,分布式实时计算

Apache Storm 1.0.1 发布了,这是一个维护版本,主要修复了一些重要的Bugs,提升性能、稳定性、以及容错能力。内部通信机制消息吞吐量是之前的一倍多,这使得任务间通信效率大大提升。 完整改...

oschina
2016/05/07
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

selenium 结合 docker 构建分布式测试环境

随着自动化测试越学越深,深深觉得有太多的东西需要总结。 1.记录下学习中遇到的坑,当做学习笔记。 2.有前人路过看到文章中比较落后的做法,请务必一定要指教。(因为是初学者视角,很多东西...

呐呐丶嘿
5分钟前
0
0
PostgreSQL 安装启动使用一条龙教程——Ubuntu 16.04

今天想尝试下 PostgreSQL,分享一下在 Ubuntu 16.04 下安装启动使用 PostgreSQL 一条龙方法。 添加第三方 apt 仓库: sudo add-apt-repository "deb http://apt.postgresql.org/pub/repos/a...

宇润
6分钟前
0
0
对于json文件的读写操作

对json文件的读操作 返回的一个列表,里面是多个字典 def read_json(self,jsonname): with open(r"./{}.json".format(jsonname),"r") as json_f: text_list = json......

鹏灬
9分钟前
0
0
Date-Time API简介

  Date-Time API简介      在Java8之前的版本中,我们处理时间类型常常使用的是java.util包下的Date类。但使用Date类却有诸多的弊端,如: java.util.Date 是非线程安全的,所有的日期...

SEOwhywhy
9分钟前
1
0
实体类生成对应的建表语句

通过实体类生成对应的建表语句 用java代码根据实体类自动生成对应的建表语句或生成某个包下的所有类的建表语句 根据实体类反射生成SQL java 根据实体对象生成 增删改的SQL语句 ModelToSQL...

miaojiangmin
12分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部