php连接kafka
php连接kafka
果树啊 发表于10个月前
php连接kafka
  • 发表于 10个月前
  • 阅读 215
  • 收藏 0
  • 点赞 0
  • 评论 0

【腾讯云】买域名送云解析+SSL证书+建站!>>>   

安装kafka

yum -y install java-1.8.0-openjdk.i686 java-1.8.0-openjdk-devel.i686
wget http://mirror.its.dal.ca/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xzvf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0/bin
#启动zookeeper
./zookeeper-server-start.sh  ../config/zookeeper.properties  &
#启动kafka
 ./kafka-server-start.sh ../config/server.properties &
ps -ef | grep kafka




创建一个叫"test1234"的topic,它只有一个分区,一个副本:
 ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1234

可以用list查看创建的topic,当前创建了4个topic
./kafka-topics.sh --list --zookeeper localhost:2181

发送消息。运行producer并在控制台中输一些消息,这些消息将被发送到服务端
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1234

开启consumer,可以读取到刚才发出的消息并输出。
 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1234 --from-beginning

 

安装可视化kafka管理工具

curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
mv bintray-sbt-rpm.repo /etc/yum.repos.d/
yum install sbt
git clone https://github.com/yahoo/kafka-manager
cd kafka-manager
./sbt clean dist
cp target/universal/kafka-manager-1.3.3.13.zip /usr/local/kafka-manager.zip
unzip kafka-manager.zip
vi conf/application.conf
kafka-manager.zkhosts="127.0.0.1:2181"
nohup bin/kafka-manager -Dconfig.file=/conf/application.conf -Dhttp.port=9999 &

安装扩展

wget https://github.com/edenhill/librdkafka/archive/master.zip
unzip master.zip
cd librdkafka-master/
./configure
make
sudo make install
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/php/bin/php-config 
make all -j 5
sudo make install
vi /usr/local/php/etc/php.ini
extension=rdkafka.so

生产者

$conf = new \RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
  if ($message->err) {
    // message permanently failed to be delivered
    $error_info = rd_kafka_err2str ($message->err);
  } else {
    // message successfully delivered
    print_r($message);
  }
});
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
        });
        $rk = new \RdKafka\Producer($conf);
        $rk->setLogLevel(LOG_DEBUG);
        $rk->addBrokers('IP:PORT');
        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('message.timeout.ms', 100);//设置超时时间避免发送不成功长期堵住
        $topic = $rk->newTopic('TOPIC_ID',$topicConf);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'message');

消费者

<?php
$config = require('config.php');
if(!extension_loaded('Rdkafka')){
    //如果扩展没有被加载
    file_put_contents("./extension.txt", "RdKafka extension is not load",FILE_APPEND);
}
$conf = new \RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
        $error_info = rd_kafka_err2str ($message->err);
        echo $error_info;
    } else {
    }
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});
$rk = new \RdKafka\Consumer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers($config['kafka']['consumer']['brokers']);
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$topicConf->set('auto.commit.enable', true);
$topicConf->set('offset.store.path', './kafka_offset');
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.sync.interval.ms', 0);
$topic = $rk->newTopic($config['kafka']['consumer']['topic'], $topicConf);
do{
    $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
    $msg = $topic->consume(0, 1000);
    if($msg != null && $msg->err == 0){
        $data = $msg->payload;
    }
    $topic->consumeStop(0);
}while(true);
return array(
    'kafka' => array(
        'produce' => array(
            'brokers' => '127.0.0.1:9092',
            'topic' => '1'
        ),
        'consumer' => array(
            'brokers' => '127.0.0.1:9092',
            'topic' => '1'
        )
    ),
);

可配置属性

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

文档

https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html

 

shell命令 http://www.cnblogs.com/xiaodf/p/6093261.html#3

 

  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 10
博文 196
码字总数 48114
×
果树啊
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: