Kafka usage notes

Kafka project site: http://kafka.apache.org/

PHP client libraries:

https://github.com/quipo/kafka-php

https://github.com/michal-harish/kafka-php

To use them, the ZooKeeper PHP extension must be installed first:

https://github.com/andreiz/php-zookeeper

Building the ZooKeeper PHP extension in turn requires ZooKeeper itself (its C client library, built from src/c below):

http://zookeeper.apache.org/

wget http://mirrors.hust.edu.cn/apache/zookeeper/stable/zookeeper-3.4.6.tar.gz

tar zxvf zookeeper-3.4.6.tar.gz

cd zookeeper-3.4.6/src/c

./configure

make && make install

wget https://github.com/andreiz/php-zookeeper/archive/master.zip

unzip master.zip

cd php-zookeeper-master

phpize

./configure

make && make install

Then add zookeeper.so to php.ini: extension=zookeeper.so
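
To confirm that PHP actually loads the extension after you edit php.ini, you can run php -m and look for zookeeper. A small script like the sketch below also works. It assumes the extension registers under the name zookeeper and defines a Zookeeper class. The function name zookeeperStatus() is hypothetical.

```php
<?php
// Hypothetical helper: report whether the ZooKeeper extension is usable.
// Assumes the extension is named "zookeeper" and provides class Zookeeper.
function zookeeperStatus(): string
{
    if (extension_loaded('zookeeper') && class_exists('Zookeeper', false)) {
        return 'zookeeper extension loaded';
    }
    // Show which php.ini this PHP binary read (false means none was loaded),
    // since the CLI and the web server often use different ini files.
    $ini = php_ini_loaded_file();
    return 'zookeeper extension NOT loaded; php.ini in use: '
        . ($ini === false ? '(none)' : $ini);
}

echo zookeeperStatus(), PHP_EOL;
```

If it reports NOT loaded, check that the extension line was added to the php.ini path it prints.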

When writing a Kafka consumer in PHP, keep the following in mind:

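1. The consumer group must not be 0.

2. At the end of every loop iteration, unset the $zkconsumer and $zookeeper objects. Otherwise the fetched data stays in memory and the process eventually crashes.

3. Commit the offset every time data is fetched from the queue. Otherwise the offset never advances and the messages are never marked as consumed.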
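
The three rules above can be sketched as a loop. This is a minimal illustration, not kafka-php code. HypotheticalConsumer, fetch(), commitOffset() and consumeLoop() are hypothetical names standing in for the real $zkconsumer, whose actual method names are not given here. The sketch does three things:

- a factory builds a fresh consumer each round;
- the offset is committed after every fetch;
- the consumer is unset before the next round.

```php
<?php
// Hypothetical interface standing in for the real kafka-php consumer;
// fetch() and commitOffset() are assumed names, not the library's API.
interface HypotheticalConsumer
{
    /** @return string[] messages fetched in this batch */
    public function fetch(): array;

    /** Persist the offset so the fetched messages count as consumed. */
    public function commitOffset(): void;
}

/**
 * Run $rounds fetch/process/commit cycles and return how many messages
 * were handled. A new consumer is created for every round and unset at
 * the end of it, so fetched data does not pile up across iterations.
 */
function consumeLoop(string $group, callable $makeConsumer, callable $handle, int $rounds): int
{
    // Rule 1: the group must not be 0.
    if ($group === '0') {
        throw new InvalidArgumentException('consumer group must not be "0"');
    }
    $processed = 0;
    for ($i = 0; $i < $rounds; $i++) {
        $consumer = $makeConsumer($group);
        foreach ($consumer->fetch() as $message) {
            $handle($message);
            $processed++;
        }
        // Rule 3: commit after every fetch, or the offset never advances.
        $consumer->commitOffset();
        // Rule 2: drop the consumer before the next round builds a new one.
        unset($consumer);
    }
    return $processed;
}
```

With the real library, the factory would also create the $zookeeper connection, and both objects would be unset at the end of each round, as rule 2 says.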

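For multi-process consumption, you can extend the Kafka_ZookeeperConsumer class and override its rewind() method so that each process consumes from one specified broker only. This avoids consuming the same data twice. The code is as follows: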

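The key is this condition: if ($host == $GLOBALS['host'] && $port == $GLOBALS['port']). Take host and port from the command line or a config file, and the process will consume only the partitions on that broker.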

<?php
// Only read the partitions hosted on the specified host/port.
// Kafka_ZookeeperConsumer comes from the kafka-php library listed above;
// $GLOBALS['host'] and $GLOBALS['port'] must be set before the consumer is iterated.
class Kafka_ZookeeperConsumerRewrite extends Kafka_ZookeeperConsumer
{
    /**
     * [Rewrite] Rewind the iterator, keeping only this broker's partitions.
     *
     * @return void
     */
    public function rewind() {
        $this->iterators = array();
        $this->nIterators = 0;
        // as used here, partitions() maps each broker id to its partition count for the topic
        foreach ($this->topicRegistry->partitions($this->topic) as $broker => $nPartitions) {
            for ($partition = 0; $partition < $nPartitions; ++$partition) {
                list($host, $port) = explode(':', $this->brokerRegistry->address($broker));
                // just keep the specified host/port
                if ($host == $GLOBALS['host'] && $port == $GLOBALS['port']) {
                    // start from the stored offset of this topic/broker/partition
                    $offset = $this->offsetRegistry->offset($this->topic, $broker, $partition);
                    $this->iterators[] = (object) array(
                        'consumer'          => null,
                        'host'              => $host,
                        'port'              => $port,
                        'broker'            => $broker,
                        'partition'         => $partition,
                        'offset'            => $offset,
                        'uncommittedOffset' => 0,
                        'messages'          => null,
                    );
                    ++$this->nIterators;
                }
            }
        }
        if (0 == count($this->iterators)) {
            // no partition of this topic lives on the configured broker
            throw new Kafka_Exception_InvalidTopic(
                'Cannot find topic ' . $this->topic . ' on ' . $GLOBALS['host'] . ':' . $GLOBALS['port']
            );
        }
        // get a random broker/partition every time
        $this->shuffle();
    }
}
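
The rewind() override above only works once $GLOBALS['host'] and $GLOBALS['port'] are set. Below is a sketch of reading them from the command line with PHP's built-in getopt(). The option names --host and --port and the helpers brokerFromOptions() and isTargetBroker() are hypothetical. The sketch also pulls the address check out of rewind() into a small function so that it can be tested on its own; it uses strict string comparison where rewind() uses ==.

```php
<?php
/**
 * Validate the broker options for this process and return [host, port].
 * Hypothetical option names --host and --port, e.g.
 *   php consumer.php --host=10.0.0.1 --port=9092
 */
function brokerFromOptions(array $opts): array
{
    $host = $opts['host'] ?? null;
    $port = $opts['port'] ?? null;
    // getopt() yields a string for an option given once with a value
    // (an array if the option is repeated), so insist on strings here.
    if (!is_string($host) || $host === '' || !is_string($port)
        || preg_match('/\A[0-9]+\z/', $port) !== 1) {
        throw new InvalidArgumentException('usage: --host=<broker host> --port=<broker port>');
    }
    return [$host, $port];
}

/**
 * Mirrors the host/port check in rewind(): does a broker address of the
 * form "host:port" belong to the broker this process is assigned to?
 */
function isTargetBroker(string $address, string $host, string $port): bool
{
    list($h, $p) = array_pad(explode(':', $address, 2), 2, '');
    return $h === $host && $p === $port;
}

// At the start of the consumer script (CLI), before iterating the consumer:
//   $target = brokerFromOptions(getopt('', ['host:', 'port:']));
//   $GLOBALS['host'] = $target[0];
//   $GLOBALS['port'] = $target[1];
```

Start one such process per broker, each with its own --host and --port. Each process then reads a disjoint set of partitions, which is how the override avoids duplicate consumption.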