文档章节

php连接kafka

 果树啊
发布于 2017/06/30 10:54
字数 554
阅读 230
收藏 0
点赞 0
评论 0

安装kafka

yum search jdk
yum -y install java-1.8.0-openjdk.i686 java-1.8.0-openjdk-devel.i686
wget http://mirror.its.dal.ca/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xzvf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0/bin
#启动zookeeper
./zookeeper-server-start.sh  ../config/zookeeper.properties  &
#启动kafka
 ./kafka-server-start.sh ../config/server.properties &
ps -ef | grep kafka




创建一个叫"test1234"的topic,它只有一个分区,一个副本:
 ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1234

可以用list查看创建的topic,当前创建了4个topic
./kafka-topics.sh --list --zookeeper localhost:2181

发送消息。运行producer并在控制台中输一些消息,这些消息将被发送到服务端
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1234

开启consumer,可以读取到刚才发出的消息并输出。
 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1234 --from-beginning

 

安装可视化kafka管理工具

curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
mv bintray-sbt-rpm.repo /etc/yum.repos.d/
yum install sbt
git clone https://github.com/yahoo/kafka-manager
cd kafka-manager
./sbt clean dist
cp target/universal/kafka-manager-1.3.3.13.zip /usr/local/kafka-manager.zip
unzip kafka-manager.zip
vi conf/application.conf
kafka-manager.zkhosts="127.0.0.1:2181"
nohup bin/kafka-manager -Dconfig.file=/conf/application.conf -Dhttp.port=9999 &

安装扩展

wget https://github.com/edenhill/librdkafka/archive/master.zip
unzip master.zip
cd librdkafka-master/
./configure
make
sudo make install
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/php/bin/php-config 
make all -j 5
sudo make install
vi /usr/local/php/etc/php.ini
extension=rdkafka.so

生产者

$conf = new \RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
  if ($message->err) {
    // message permanently failed to be delivered
    $error_info = rd_kafka_err2str ($message->err);
  } else {
    // message successfully delivered
    print_r($message);
  }
});
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
        });
        $rk = new \RdKafka\Producer($conf);
        $rk->setLogLevel(LOG_DEBUG);
        $rk->addBrokers('IP:PORT');
        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('message.timeout.ms', 100);//设置超时时间避免发送不成功长期堵住
        $topic = $rk->newTopic('TOPIC_ID',$topicConf);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'message');

消费者

<?php
$config = require('config.php');
if(!extension_loaded('Rdkafka')){
    //如果扩展没有被加载
    file_put_contents("./extension.txt", "RdKafka extension is not load",FILE_APPEND);
}
$conf = new \RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
        $error_info = rd_kafka_err2str ($message->err);
        echo $error_info;
    } else {
    }
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});
$rk = new \RdKafka\Consumer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers($config['kafka']['consumer']['brokers']);
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$topicConf->set('auto.commit.enable', true);
$topicConf->set('offset.store.path', './kafka_offset');
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.sync.interval.ms', 0);
$topic = $rk->newTopic($config['kafka']['consumer']['topic'], $topicConf);
do{
    $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
    $msg = $topic->consume(0, 1000);
    if($msg != null && $msg->err == 0){
        $data = $msg->payload;
    }
    $topic->consumeStop(0);
}while(true);
return array(
    'kafka' => array(
        'produce' => array(
            'brokers' => '127.0.0.1:9092',
            'topic' => '1'
        ),
        'consumer' => array(
            'brokers' => '127.0.0.1:9092',
            'topic' => '1'
        )
    ),
);

可配置属性

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

文档

https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html

 

shell命令 http://www.cnblogs.com/xiaodf/p/6093261.html#3

 

© 著作权归作者所有

共有 人打赏支持
粉丝 11
博文 207
码字总数 49313
作品 0
福州
高级程序员
喵了个咪/See-KafKa

#See-KafKa 简单舒适的PHP-KafKa拓展 ##前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通...

喵了个咪
2016/09/27
0
0
简单舒适的 PHP-KafKa 拓展--See-KafKa

[喵咪KafKa(3)]PHP拓展See-KafKa 前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通量(同...

喵了_个咪
2016/09/27
2.1K
1
[喵咪KafKa(3)]PHP拓展See-KafKa

[喵咪KafKa(3)]PHP拓展See-KafKa 前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通量(同...

喵了_个咪
2016/09/27
800
1
[转]使用PHP处理Kafka消息

Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用...

daos
01/03
0
0
weiboad/kafka-php

Kafka-php English Document Kafka-php 使用纯粹的PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka,该项目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 Ka...

weiboad
2017/04/28
0
0
PHP 编写的 kafka 客户端--kafka-php

Kafka-php 使用纯粹的 PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka,该项目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 Kafka PHP v0.1.x Document, 不...

nmred_2008
2017/04/28
465
0
[喵咪KafKa(2)]单机模式运行KafKa

在上节我们介绍完KafKa之后,今天我们来搭建KafKa三种模式(单机模式,伪集群,集群)中的一种单机模式的搭建,在正常的使用中我们一般吧单机模式作为开发环境的标配,今天就来和喵咪一同搭建一个K...

喵了_个咪
2016/09/12
190
0
kafka kerberos 认证访问与非认证访问共存下的ACL问题

在一个正在运行的kafka集群中添加kerberos认证和ACL权限控制,同时保证以前所有的producerconsumer服务不中断 解决方式: 使kafka集群监听两个端口,一个为无认证连接,另一个为kerberos的认...

落花非有意
2017/08/22
0
0
spring的cloud之kafka配置

Kafka配置 在上面的例子中,由于Kafka、ZooKeeper均运行于本地,所以我们没有在测试程序中通过配置信息来指定Kafka和ZooKeeper的配置信息,就完成了本地消息总线的试验。但是我们实际应用中,...

wilesun
2016/11/18
950
0
Kafka 生产者守护进程--Bruce

Bruce 是 Apache Kafka 的生产者守护进程,它简化了客户端发送消息到 Kafka ,无需关注后端的 Kafka 集群。Bruce 主要处理: Routing messages to the proper brokers, and spreading the lo...

小编辑
2014/09/10
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

JPA @MappedSuperclass 注解说明

基于代码复用和模型分离的思想,在项目开发中使用JPA的@MappedSuperclass注解将实体类的多个属性分别封装到不同的非实体类中。 1.@MappedSuperclass注解只能标准在类上:@Target({java.lang....

海博1600
6分钟前
0
0
Scala Configuration 相关API

Play使用了 Typesafe config library,但是也提供了一个有着更多Scala高级特性的的 Configuration 封装。不熟悉Typesafe配置的开发者可以移步 configuration文件的语法和特性文档。 读取配置...

Landas
今天
1
0
使用cookie技术 记住账号

1. 效果 2. 实现过程 2.1 前端 将用户的选中传递给后台 这个参数的获取是 参考:https://my.oschina.net/springMVCAndspring/blog/1860498 // var rememberLogin = $("#rememberLoginId").i...

Lucky_Me
今天
1
0
《趣谈网络协议》02之网络分层的真实含义

一、提出问题 1.提出问题 当你听到什么二层设备、三层设备、四层 LB 和七层 LB 中层的时候,是否有点一头雾水,不知道这些所谓的层,对应的各种协议具体要做什么“工作”? 2.这四个问题你弄...

aibinxiao
今天
2
0
Python3学习日志二 Python中的集合set和字典dict

1.集合set 定义一个集合set 我们可以看到定义集合set有两种不同的形式,如果要定义一个空的集合set不能用{}而是要用set();另外,集合是无序的,而且set中的元素是不可重复的,如果你定义了一...

Mr_bullshit
今天
0
0
adb 操作指令详解

ADB,即 Android Debug Bridge,它是 Android 开发/测试人员不可替代的强大工具,也是 Android 设备玩家的好玩具。 注:有部分命令的支持情况可能与 Android 系统版本及定制 ROM 的实现有关。...

孟飞阳
今天
0
0
nodejs安装以及环境配置(很好的node安装和配置文章,少走很多弯路)

一、安装环境 1、本机系统:Windows 10 Pro(64位) 2、Node.js:v6.9.2LTS(64位) 二、安装Node.js步骤 1、下载对应你系统的Node.js版本:https://nodejs.org/en/download/ 2、选安装目录进...

sprouting
今天
1
0
Redisson

了解了Redisson,发现使用挺简单的,接下来准备深入学习一下。 Redisson介绍 Redisson是架设于Redis基础之上的一个Java驻内存数据网格(In-Memory Data Grid) Redisson在基于NIO的Netty框架上...

to_ln
今天
0
0
python有哪些好玩的应用实现,用python爬虫做一个二维码生成器

python爬虫不止可以批量下载数据,还可以有很多有趣的应用,之前也发过很多,比如天气预报实时查询、cmd版的实时翻译、快速浏览论坛热门帖等等,这些都可以算是爬虫的另一个应用方向! 今天给...

python玩家
今天
0
0
python爬虫日志(3)-爬去异步加载网页

在浏览器检查元素页面中,选取Network中的XHR选项即可观察每次加载页面,网页发出的请求,观察url的规律即可利用封装的函数对每一页进行爬取。

茫羽行
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部