色婷婷狠狠18禁久久YY,CHINESE性内射高清国产,国产女人18毛片水真多1,国产AV在线观看

php kafka 消費

馮子軒1年前9瀏覽0評論

在開發中,常常會涉及到對消息隊列的處理,而 Kafka 是當前熱門且實用的消息隊列系統,使用 PHP 操作 Kafka 也變得越來越流行。接下來,我們就來探討如何使用 PHP 操作 Kafka 消費。

首先,我們需要安裝 Kafka 擴展,使用 pecl 類似如下命令:

pecl install rdkafka

安裝完成后,我們就可以開始編寫 PHP 消費者了。下面是一個簡單的消費者示例:

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '127.0.0.1:9092');
$consumer = new RdKafka\Consumer($conf);    
$consumer->addBrokers('127.0.0.1:9092');
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$topicName = 'test';
$topic = $consumer->newTopic($topicName, $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 100);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo $message->payload . PHP_EOL;
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}

上述代碼為一個簡單的 Kafka 消費者消費消息并輸出到控制臺。其中,metadata.broker.list 指定 Kafka 集群的服務器,auto.offset.reset 指定消費的起始點,RD_KAFKA_OFFSET_STORED 指從消息的最后一個偏移量開始消費。

接下來,我們再來探討如何針對 Kafka 的多個分區進行消費。我們可以自己指定分區及其起始點:

$topic = $consumer->newTopic($topicName, $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
$topic->consumeStart(1, RD_KAFKA_OFFSET_STORED);
$topic->consumeStart(2, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 100);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo $message->payload . PHP_EOL;
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}

上述代碼中,我們分別訂閱了 3 個不同的分區進行消費。

最后,我們再來探討如何通過配置文件來配置 Kafka 消費者。下面是一個使用 yaml 配置文件的例子:

parameters:
kafka.broker.list: '127.0.0.1:9092'
kafka.topic.name: 'test'
services:
rd_kafka.producer.configuration:
class: RdKafka\Conf
calls:
- [ set, [ 'metadata.broker.list', '%kafka.broker.list%' ]]
rd_kafka.consumer.topic.configuration:
class: RdKafka\TopicConf
calls:
- [ set, [ 'auto.offset.reset', 'smallest' ]]
rd_kafka.consumer.configuration:
class: RdKafka\Conf
calls:
- [ set, [ 'metadata.broker.list', '%kafka.broker.list%' ]]
rd_kafka.consumer:
class: RdKafka\Consumer
arguments:
- '@rd_kafka.consumer.configuration'
calls:
- [ addBrokers, [ '%kafka.broker.list%' ]]
rd_kafka.topic:
class: RdKafka\ProducerTopic
arguments:
- '@rd_kafka.producer'
- '%kafka.topic.name%'
event_listener.kafka_consumer:
class: App\EventListener\KafkaConsumer
arguments:
- '@rd_kafka.consumer'
- '@rd_kafka.topic'
tags:
- { name: kernel.event_listener, event: kernel.request }

上述配置文件定義了 Kafka 的 broker 以及 topic name 等信息,并定義了 rd_kafka.producer.configuration、rd_kafka.consumer.topic.configuration、rd_kafka.consumer.configuration 等服務,這里不再贅述每個服務的作用,讀者可以參考 Kafka 的官方文檔進行了解。

以上就是關于 PHP Kafka 消費的一些常見用法介紹,希望能幫助大家更好地使用 Kafka 進行開發。