在開發中,常常會涉及到對消息隊列的處理,而 Kafka 是當前熱門且實用的消息隊列系統,使用 PHP 操作 Kafka 也變得越來越流行。接下來,我們就來探討如何使用 PHP 操作 Kafka 消費。
首先,我們需要安裝 Kafka 擴展,使用 pecl 類似如下命令:
pecl install rdkafka
安裝完成后,我們就可以開始編寫 PHP 消費者了。下面是一個簡單的消費者示例:
$conf = new RdKafka\Conf(); $conf->set('metadata.broker.list', '127.0.0.1:9092'); $consumer = new RdKafka\Consumer($conf); $consumer->addBrokers('127.0.0.1:9092'); $topicConf = new RdKafka\TopicConf(); $topicConf->set('auto.offset.reset', 'smallest'); $topicName = 'test'; $topic = $consumer->newTopic($topicName, $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 100); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo $message->payload . PHP_EOL; break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } }
上述代碼為一個簡單的 Kafka 消費者消費消息并輸出到控制臺。其中,metadata.broker.list 指定 Kafka 集群的服務器,auto.offset.reset 指定消費的起始點,RD_KAFKA_OFFSET_STORED 指從消息的最后一個偏移量開始消費。
接下來,我們再來探討如何針對 Kafka 的多個分區進行消費。我們可以自己指定分區及其起始點:
$topic = $consumer->newTopic($topicName, $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); $topic->consumeStart(1, RD_KAFKA_OFFSET_STORED); $topic->consumeStart(2, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 100); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo $message->payload . PHP_EOL; break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } }
上述代碼中,我們分別訂閱了 3 個不同的分區進行消費。
最后,我們再來探討如何通過配置文件來配置 Kafka 消費者。下面是一個使用 yaml 配置文件的例子:
parameters: kafka.broker.list: '127.0.0.1:9092' kafka.topic.name: 'test' services: rd_kafka.producer.configuration: class: RdKafka\Conf calls: - [ set, [ 'metadata.broker.list', '%kafka.broker.list%' ]] rd_kafka.consumer.topic.configuration: class: RdKafka\TopicConf calls: - [ set, [ 'auto.offset.reset', 'smallest' ]] rd_kafka.consumer.configuration: class: RdKafka\Conf calls: - [ set, [ 'metadata.broker.list', '%kafka.broker.list%' ]] rd_kafka.consumer: class: RdKafka\Consumer arguments: - '@rd_kafka.consumer.configuration' calls: - [ addBrokers, [ '%kafka.broker.list%' ]] rd_kafka.topic: class: RdKafka\ProducerTopic arguments: - '@rd_kafka.producer' - '%kafka.topic.name%' event_listener.kafka_consumer: class: App\EventListener\KafkaConsumer arguments: - '@rd_kafka.consumer' - '@rd_kafka.topic' tags: - { name: kernel.event_listener, event: kernel.request }
上述配置文件定義了 Kafka 的 broker 以及 topic name 等信息,并定義了 rd_kafka.producer.configuration、rd_kafka.consumer.topic.configuration、rd_kafka.consumer.configuration 等服務,這里不再贅述每個服務的作用,讀者可以參考 Kafka 的官方文檔進行了解。
以上就是關于 PHP Kafka 消費的一些常見用法介紹,希望能幫助大家更好地使用 Kafka 進行開發。