PHP RabbitMQ通信是一種常見的消息隊列通信方式,適用于大量的異步處理需求。消息隊列就像是一個中間人,它負責接收、存儲并轉(zhuǎn)發(fā)消息,使得不同的應用程序可以異步交互,提高整體性能和可擴展性。
假設我們有一個在線商城系統(tǒng),用戶可以通過添加商品到購物車來完成在線購物。當用戶選擇完商品后,我們可以在購物車里添加一個消息,通知庫存系統(tǒng)減去相應商品的庫存。
// 生產(chǎn)者代碼 $connection = new AMQPConnection([ 'host' => 'localhost', 'port' => 5672, 'vhost' => '/', 'login' => 'guest', 'password' => 'guest' ]); $channel = $connection->channel(); $channel->queue_declare('inventory', false, true, false, false); $message = json_encode(['id' => 1, 'quantity' => 5]); $channel->basic_publish( new AMQPMessage($message), '', 'inventory' ); $channel->close(); $connection->close();
上面的代碼中,我們使用AMQP擴展連接到RabbitMQ服務器,聲明一個名為inventory的隊列,發(fā)送一條消息表明持有id為1的商品的數(shù)量減少了5。AMQPMessage類代表一條消息。
// 消費者代碼 $connection = new AMQPConnection([ 'host' => 'localhost', 'port' => 5672, 'vhost' => '/', 'login' => 'guest', 'password' => 'guest' ]); $channel = $connection->channel(); $channel->queue_declare('inventory', false, true, false, false); $callback = function($message){ $data = json_decode($message->body, true); $id = $data['id']; $quantity = $data['quantity']; echo "商品$id已經(jīng)減少了$quantity的庫存"; }; $channel->basic_consume('inventory', '', false, true, false, false, $callback); while(count($channel->callbacks)){ $channel->wait(); } $channel->close(); $connection->close();
我們還需要一個消費者代碼來處理這條消息。實現(xiàn)步驟與用戶下單類似,通過回調(diào)函數(shù)處理消息即可。
RabbitMQ也支持 Exchange,它類似于消息隊列中的路由器,它會將不同類型的消息路由到不同的隊列中。例如當用戶進行評論時,他的評論可能會被推送到好評Top、差評Top兩個隊列中。Exchange類型有三種:Fanout、Direct、Topic。Fanout類型將所有消息轉(zhuǎn)發(fā)到所有隊列中,Direct類型將匹配綁定鍵的消息轉(zhuǎn)發(fā)到相關(guān)隊列,Topic類型可以根據(jù)通配符對消息進行路由。
// 生產(chǎn)者代碼 $connection = new AMQPConnection([ 'host' => 'localhost', 'port' => 5672, 'vhost' => '/', 'login' => 'guest', 'password' => 'guest' ]); $channel = $connection->channel(); $channel->exchange_declare('comment', 'direct', false, true, false); $message_data = [ 'id' => 1, 'content' => '非常好的商品!', 'rating' => 5 ]; $message = json_encode($message_data); $channel->basic_publish( new AMQPMessage($message), 'comment', 'positive' ); $channel->basic_publish( new AMQPMessage($message), 'comment', 'negative' ); $channel->close(); $connection->close();
上面的代碼中,我們創(chuàng)建了一個名為comment的Exchange,類型為Direct,將一條評論消息同時路由到Positive和Negative兩個隊列。注意,我們需要使用basic_publish函數(shù)代替queue_declare函數(shù),傳遞Exchange名稱和綁定鍵(即第二個參數(shù))。
// 消費者代碼 $connection = new AMQPConnection([ 'host' => 'localhost', 'port' => 5672, 'vhost' => '/', 'login' => 'guest', 'password' => 'guest' ]); $channel = $connection->channel(); $channel->exchange_declare('comment', 'direct', false, true, false); $channel->queue_bind('positive', 'comment', 'positive'); $channel->queue_bind('negative', 'comment', 'negative'); $callback = function($message){ $data = json_decode($message->body, true); $id = $data['id']; $content = $data['content']; $rating = $data['rating']; echo "$content(評分$rating)被推薦到了好評榜" . PHP_EOL; }; $channel->basic_consume('positive', '', false, true, false, false, $callback); while(count($channel->callbacks)){ $channel->wait(); } $channel->close(); $connection->close();
以上代碼演示了如何將評論存儲到好評和差評隊列中,并將評論存儲到不同的隊列中。用戶可以針對存儲的每個隊列寫不同的處理邏輯,例如將好評評論展示在商品詳情頁,將差評評論推送給客服人員等等。
最后,我們需要注意在使用RabbitMQ時,一定要根據(jù)需求設置合適的隊列持久化、消息持久化和確認機制。隊列持久化可以保證在消息隊列服務器發(fā)生故障時,消息隊列不會丟失。消息持久化可以保證在消息發(fā)送時,如果隊列尚未準備好,這條消息會被寫入磁盤中,待隊列準備好后再進行發(fā)送。確認機制則可以保證在消息被消費者正常處理后,才會將該消息從隊列中刪除,防止消息丟失。
以上就是我們的PHP RabbitMQ通信介紹。希望大家在實際項目中能夠靈活運用,提高系統(tǒng)性能和可維護性。