php rdkafka是一個支持Apache Kafka的PHP擴展, 它是以librdkafka為基礎,使用C語言編寫,性能極佳。如果你在處理大規模的數據流或消息隊列上遇到了瓶頸,php rdkafka將是一個很好的解決方案。
下面我們來看一些實例,首先我們需要安裝php rdkafka擴展。安裝方法可以在官方文檔中查看。下面我們以簡單的生產者和消費者為例:
// 生產者 $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { if ($message->err) { echo "Produce failed: (" . $message->err . ") " . $message->errstr(); } else { echo "Produced message " . $message->payload . PHP_EOL; } }); $rk = new RdKafka\Producer($conf); $rk->addBrokers("127.0.0.1:9092"); $topic = $rk->newTopic("test"); $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Hello, World!"); // 消費者 $conf = new RdKafka\Conf(); $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); $conf->set('group.id', 'myGroup'); $conf->set('metadata.broker.list', '127.0.0.1:9092'); $consumer = new RdKafka\KafkaConsumer($conf); $consumer->subscribe(['test']); while (true) { $message = $consumer->consume(120 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo $message->payload . PHP_EOL; break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more" . PHP_EOL; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out" . PHP_EOL; break; default: throw new \Exception($message->errstr(), $message->err); break; } }
上面的代碼演示了如何創建一個生產者,向Kafka中的test主題發送一條消息,以及如何創建一個消費者,訂閱test主題并接收消息。
我們還可以使用事務,在生產者中將多個消息發送到一個分區,如果任何消息發送失敗,所有消息都將回滾。下面是一個使用事務的示例:
$conf = new RdKafka\Conf(); $rk = new RdKafka\Producer($conf); $rk->addBrokers("127.0.0.1:9092"); $topicConf = new RdKafka\TopicConf(); $topicConf->set("request.required.acks", 1); $topic = $rk->newTopic("test", $topicConf); $rk->initTransactions(5000); $rk->beginTransaction(); try { $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message 1"); $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message 2"); $rk->commitTransaction(5000); } catch (RdKafka\Exception $e) { $rk->abortTransaction(5000); }
最后,由于php rdkafka的性能和php的其他擴展不一樣,因此在使用時需要特別注意內存和CPU使用情況,尤其是在使用高并發的場景下。
下一篇css密碼提示框