文档章节

php连接kafka

 果树啊
发布于 2017/06/30 10:54
字数 690
阅读 292
收藏 0

安装kafka

yum search jdk
yum -y install java-1.8.0-openjdk.i686 java-1.8.0-openjdk-devel.i686
wget http://mirror.its.dal.ca/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xzvf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0/bin
#启动zookeeper
./zookeeper-server-start.sh  ../config/zookeeper.properties  &
#启动kafka
 ./kafka-server-start.sh ../config/server.properties &
ps -ef | grep kafka




创建一个叫"test1234"的topic,它只有一个分区,一个副本:
 ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1234

可以用list查看创建的topic,当前创建了4个topic
./kafka-topics.sh --list --zookeeper localhost:2181

发送消息。运行producer并在控制台中输一些消息,这些消息将被发送到服务端
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1234

开启consumer,可以读取到刚才发出的消息并输出。
 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1234 --from-beginning

安装可视化工具

wget http://ftp.cuhk.edu.hk/pub/packages/apache.org/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.zip
unzip apache-maven-3.5.4-bin.zip
mv apache-maven-3.5.4 apache-maven
vim /etc/profile
export MAVEN_HOME=/root/dev/apache-maven-3.5.4
export PATH=$PATH:$MAVEN_HOME/bin
source /etc/profile
wget https://github.com/HomeAdvisor/Kafdrop/archive/kafdrop-2.0.0.zip
unzip kafdrop-2.0.0.zip
cd kafdrop
mvn clean package
java -jar ./target/kafdrop-2.0.0.jar --zookeeper.connect=127.0.0.1:2181 --server.port=9999

安装扩展

wget https://github.com/edenhill/librdkafka/archive/master.zip
unzip master.zip
cd librdkafka-master/
./configure
make
sudo make install
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/php/bin/php-config 
make all -j 5
sudo make install
vi /usr/local/php/etc/php.ini
extension=rdkafka.so

生产者

$conf = new \RdKafka\Conf();
$conf->set('socket.timeout.ms', 50);//网络请求的超时时间,默认60s,改成50ms
        if (function_exists('pcntl_sigprocmask')) {
            pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
            $conf->set('internal.termination.signal', SIGIO);//设置kafka客户端线程在其完成后立即终止
        } else {
            $conf->set('queue.buffering.max.ms', 1);//确保消息尽快发送
        }
$conf->setDrMsgCb(function ($kafka, $message) {//每次都会调用
  if ($message->err) {
    // message permanently failed to be delivered
    $error_info = rd_kafka_err2str ($message->err);
  } else {
    // message successfully delivered
    print_r($message);
  }
});
        $conf->setErrorCb(function ($kafka, $err, $reason) {//发送失败后调用
            printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
        });
        $rk = new \RdKafka\Producer($conf);
        $rk->setLogLevel(LOG_DEBUG);
        $rk->addBrokers('IP:PORT');
        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('message.timeout.ms', 100);//设置超时时间避免发送不成功长期堵住
        $topic = $rk->newTopic('TOPIC_ID',$topicConf);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'message');
$rk->poll(0);//立即调用DrMsgCb

消费者

<?php
$config = require('config.php');
if(!extension_loaded('Rdkafka')){
    //如果扩展没有被加载
    file_put_contents("./extension.txt", "RdKafka extension is not load",FILE_APPEND);
}
$conf = new \RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
        $error_info = rd_kafka_err2str ($message->err);
        echo $error_info;
    } else {
    }
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});
$rk = new \RdKafka\Consumer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers($config['kafka']['consumer']['brokers']);
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$topicConf->set('auto.commit.enable', true);
$topicConf->set('offset.store.path', './kafka_offset');
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.sync.interval.ms', 0);
$topic = $rk->newTopic($config['kafka']['consumer']['topic'], $topicConf);
do{
    $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
    $msg = $topic->consume(0, 1000);
    if($msg != null && $msg->err == 0){
        $data = $msg->payload;
    }
    $topic->consumeStop(0);
}while(true);
return array(
    'kafka' => array(
        'produce' => array(
            'brokers' => '127.0.0.1:9092',
            'topic' => '1'
        ),
        'consumer' => array(
            'brokers' => '127.0.0.1:9092',
            'topic' => '1'
        )
    ),
);

可配置属性

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

文档

https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html

可视化工具https://github.com/HomeAdvisor/Kafdrop

shell命令 http://www.cnblogs.com/xiaodf/p/6093261.html#3

 

© 著作权归作者所有

共有 人打赏支持
上一篇: swoole
粉丝 11
博文 220
码字总数 51408
作品 0
福州
高级程序员
私信 提问
简单舒适的 PHP-KafKa 拓展--See-KafKa

[喵咪KafKa(3)]PHP拓展See-KafKa 前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通量(同...

喵了_个咪
2016/09/27
2.1K
1
喵了个咪/See-KafKa

#See-KafKa 简单舒适的PHP-KafKa拓展 ##前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通...

喵了个咪
2016/09/27
0
0
[喵咪KafKa(3)]PHP拓展See-KafKa

[喵咪KafKa(3)]PHP拓展See-KafKa 前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通量(同...

喵了_个咪
2016/09/27
800
1
【官方速报】360开源又一力作——KafkaBridge:让操作kafka更简单!

女主宣言 KafkaBridge 封装了对Kafka集群的读写操作,接口极少,简单易用,稳定可靠,支持c++/c、php、python、golang等多种语言,并特别针对php-fpm场景中作了长连接复用的优化,已在360公司...

ZVAyIVqt0UFji
10/12
0
0
[喵咪KafKa(2)]单机模式运行KafKa

在上节我们介绍完KafKa之后,今天我们来搭建KafKa三种模式(单机模式,伪集群,集群)中的一种单机模式的搭建,在正常的使用中我们一般吧单机模式作为开发环境的标配,今天就来和喵咪一同搭建一个K...

喵了_个咪
2016/09/12
190
0

没有更多内容

加载失败,请刷新页面

加载更多

【Flutter教程】从零构建电商应用(一)

在这个系列中,我们将学习如何使用google的移动开发框架flutter创建一个电商应用。本文是flutter框架系列教程的第一部分,将学习如何安装Flutter开发环境并创建第一个Flutter应用,并学习Flu...

笔阁
19分钟前
5
0
什么是以太坊DAO?(三)

Decentralized Autonomous Organization,简称DAO,以太坊中重要的概念。一般翻译为去中心化的自治组织。 投票支付合约的所有费用和行动需要时间,并要求用户始终保持活跃,知情和专注。另一...

geek12345
20分钟前
2
0
一个本科学生对Linux的认知

一个本科学生对Linux的认知 我是一名大三的普通一本大学的软件工程的一名学生,学校开设了一些关于系统开发的课程,纸上得来终觉浅,学校的课程课时较短,想要在56个课时之内学会一些公司需要...

linuxCool
今天
3
0
CentOS 安装Tomcat

Tomcat 介绍 Tomcat是Apache软件基金会(Apache Software Foundation)的Jakarta项目中的一个核心项目,由Apache、Sun和其他一些公司及个人共同开发而成。 Java 程序写的网站用tomcat+jdk来运...

野雪球
今天
1
0
OSChina 周四乱弹 —— 每天都迟到是种什么样的体验

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @开源中国首席机器人 :《Too Good At Goodbyes (Acoustic) - Sam Smith - 单曲》 《Too Good At Goodbyes (Acoustic) - Sam Smith - 单曲》 ...

小小编辑
今天
798
13

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部