文档章节

hdp-storm的kafka kerberos认证

 如风达
发布于 2017/09/05 17:10
字数 1024
阅读 910
收藏 0

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

1、使用Apache Storm-Kafka的插件包

一直使用apache storm-kafka的工具包去消费kafka,索性直接配置好对应的zookeeper集群的host、port,kafka集群的host、port。

直接放到HDP-storm的环境上提交。果然直接报错查看异常发现拿不到kafka leader所在机器host、port信息。查看apache storm-kafka源码发现它是从zk的/brokers/ids/0下获得的.

//自己维护和构建所有topic的partition对应的host与port信息,因kafka管理集群都是通过将topic、分区、副本信息写入zk
//中监听更新或删除的。所以在zk中可以读取到kafka所有的状态信息
public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {
      List<String> topics =  getTopics();//获得所有的topic从zk的/brokers/topics下获得
      List<GlobalPartitionInformation> partitions =  new ArrayList<GlobalPartitionInformation>();

      for (String topic : topics) {
          GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic);
          try {
         //获得当前topic中有几个partition从zk的/brokers/topics/distributedTopic/partitions/下获得.
              int numPartitionsForTopic = getNumPartitions(topic);
              String brokerInfoPath = brokerPath();//一个broker路径默认是/brokers/ids
              //找到每个partition的leader对应的host和port为后面创建consumer做准备
              for (int partition = 0; partition < numPartitionsForTopic; partition++) {
                //获得partition的leader在zk中保存的路径,默认0、1、2....可在hdp中1001、1002、1003....
                  int leader = getLeaderFor(topic,partition);
                  String path = brokerInfoPath + "/" + leader;// /brokers/ids/1001
                  try {
                      //获得/brokers/ids/1001 znode的信息
                      byte[] brokerData = _curator.getData().forPath(path);
                      Broker hp = getBrokerHost(brokerData);//拿到host与port
                      //构建partition与broker对应关系简单说就是partition所在的host机器和port
                      globalPartitionInformation.addPartition(partition, hp);
                  } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                      LOG.error("Node {} does not exist ", path);
                  }
              }
          } catch (SocketTimeoutException e) {
              throw e;
          } catch (Exception e) {
              throw new RuntimeException(e);
          }
          LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
          partitions.add(globalPartitionInformation);
      }
        return partitions;
    }
/**
     * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
     * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
     *
     * @param contents
     * @return
     */
    private Broker getBrokerHost(byte[] contents) {
        try {
            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parseWithException(new String(contents, "UTF-8"));
            String host = (String) value.get("host");
            Integer port = ((Long) value.get("port")).intValue();
            return new Broker(host, port);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

可是上HDP-zookeeper中查看发现/brokers/ids/1001下面的host:null、port:-1.

找到获取partition映射host与port,发现无非就是在对应zk的znode上获取到他的JSONValue值拿到host与port,虽然这个/brokers/ids/1001的znode下面host:null,port:-1.不知道出于什么原因HDP的kafka要将host与port变成null与-1,但是我发现这个endpoints中一样可以拿到很显然host : woker11-cs.zuhu2.com , port : 6667.

修改后使用maven打包.结果测试发现host与port确实拿到了,可是还是无法消费.

google发现apache storm-kafka安全认证的API,只有hdp二次开发storm-kafka才有

    public SimpleConsumer register(Broker host, String topic, int partition) {
        if (!this._connections.containsKey(host)) {
            this._connections.put(host, new DynamicPartitionConnections.ConnectionInfo(
new SimpleConsumer(host.host, host.port, this._config.socketTimeoutMs,
 this._config.bufferSizeBytes, this._config.clientId, this._config.securityProtocol)));
        }

        DynamicPartitionConnections.ConnectionInfo info = (DynamicPartitionConnections.ConnectionInfo)this._connections.get(host);
        info.partitions.add(this.getHashKey(topic, partition));
        return info.consumer;
    }

调用register获取SompleConsumer,hdp版本的比apache storm-kafka的多一个securityProtocol变量.这个值默认是"PLAINTEXT".

hdp的kafka_2.10中的SimpleConsumer默认它是开启认证的所以你在创建时只给host、port、soTimeout、bufferSize、clientId,一样会默认给securityProtocol赋值为“PLAINTEXT”

apache的kafka_2.10中的SimpleConsumer

最根本的还是storm创建new SimpleConsumer使用的kafka_2.10 API也二次开发了.它提供了安全认证的API.如下apache kafka_2.10没有提供认证API

最后发现自己修改apache storm-kafka是不行了.kafka_2.10和kafka-clients都是经过二次开发了的.

由于网络原因这些包都下载不了.我去

http://repo.hortonworks.com/content/repositories/releases/

官网去下载

并手动的install到本地maven库中.

mvn install:install-file -Dfile=F:\sougoDownload\storm-kafka-1.0.1.2.5.3.0-37.jar -DgroupId=org.apache.storm -DartifactId=storm-kafka -Dversion=1.0.1.2.5.3.0-37 -Dpackaging=jar

最后打包观察可以消费并发送数据到kafka.

注意下载的hdp版本的包必须和环境中的版本一直.hdp各个版本存在差异并不像java向后兼容.最开始我下的最新版本storm-kafka 1.1.0.3.0.1.3-1,KafkaBolt继承的BaseTickTupleAwareRichBolt但是环境的storm-core是1.0.1.2.5.3.0-37中根本没有BaseTickTupleAwareRichBolt这个类.

storm-kafka 1.0.1.2.5.3.0-37版本的KafkaBolt继承的是BaseRichBolt

© 著作权归作者所有

粉丝 7
博文 256
码字总数 23157
作品 0
深圳
私信 提问
加载中

评论(0)

Storm Kafka与配置和代码集成

1.目标 - 风暴卡夫卡整合 在本Kafka教程中,我们将学习Storm Kafka Integration的概念。此外,我们将在此Kafka Storm集成教程中讨论Storm架构,Storm Cluster。因此,为了使Kafka开发人员更容...

osc_qgmf9fjw
2019/05/12
3
0
zookeeper+kafka集群部署+storm集群

zookeeper+kafka集群部署+storm集群 一、环境安装前准备: 准备三台机器 操作系统:centos6.8 jdk:jdk-8u111-linux-x64.gz zookeeper:zookeeper-3.4.11.tar.gz kafka: kafka_2.11-1.0.1.tgz ......

jxzhfei
2018/09/04
0
0
使用不同的namespace让不同的kafka/Storm连接同一个zookeeper

背景介绍: 需要部署2个kafka独立环境,但是只有一个zookeeper集群。 需要部署2个独立的storm环境,但是只有一个zookeeper集群。 -------------------------- kafka配置 ------------------...

osc_lteogyh9
2018/08/22
2
0
Flume+Kafka+Storm整合

Flume+Kafka+Storm整合 1. 需求: 有一个客户端Client可以产生日志信息,我们需要通过Flume获取日志信息,再把该日志信息放入到Kafka的一个Topic:flume-to-kafka 再由Storm读取该topic:flu...

Hongten
2018/12/18
0
0
Storm集成Kafka应用的开发

Storm集成Kafka应用的开发   我们知道storm的作用主要是进行流式计算,对于源源不断的均匀数据流流入处理是非常有效的,而现实生活中大部分场景并不是均匀的数据流,而是时而多时而少的数据...

osc_l26z0337
2018/05/21
4
0

没有更多内容

加载失败,请刷新页面

加载更多

QT 执行shell命令

(1)首先包含头文件: #include <QProcess> (2)执行shell命令: QProcess::execute("ls");

悲催的古灵武士
27分钟前
22
0
osgEarth使用笔记3——加载倾斜摄影数据

目录 1. 概述 2. 详论 2.1. 位置 2.2. 着色 2.3. 其他 3. 结果 4. 参考 1. 概述 我在《OSG加载倾斜摄影数据》这篇博文中论述了如何通过OSG生成一个整体的索引文件,通过这个索引文件来正确显...

osc_7oc4d1en
28分钟前
19
0
cesium加载gltf模型点击以及列表点击定位弹窗

前言 cesium 官网的api文档介绍地址cesium官网api,里面详细的介绍 cesium 各个类的介绍,还有就是在线例子:cesium 官网在线例子,这个也是学习 cesium 的好素材。 之前有部分订阅者咨询我,...

osc_cx8uhydz
29分钟前
14
0
思维导图软件如何插入图片?具体步骤?

学习思维导图制作的过程中,会遇到很多没有学过的知识,需要我们不断地去改进和学习,这样增强自己的学习能力,才能更好地掌握制图软件。以后帮助我们快速方便地完成制图,今天我们就要来看看...

深蓝月上
30分钟前
25
0
Notepad++ 列块模式编辑,替换换行符

一、列块模式编辑: 1、数据准备 2、按住 “Alt + 鼠标左键” 选择需要列块模式编辑的区域,可以看到多了一条竖线 3、之后批量可以添加,修改内容 二、替换换行符 上面说了列块模式的编辑,后...

osc_itgved4p
30分钟前
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部