RabbitMQ是一個高效穩定的消息隊列系統,可在分布式系統之間傳遞消息。RabbitMQ使用AMQP協議,支持消息路由,傳輸確認和ACK機制,在消息的可靠性和穩定性上表現出了優異的性能。
在實際應用中,我們常常需要實現延時任務,例如:訂單支付成功后需要5分鐘后才能發貨;用戶發表了文章需要5秒后才能顯示等待。RabbitMQ提供了一種解決方案——延時隊列。
在延時隊列中,消息先被發送到特定的隊列,該隊列中的每個消息都附帶一個過期時間, 直到該過期時間到達才會釋放消息到原始隊列,或流向失敗隊列。如圖所示:
// 創建延時隊列并綁定死信交換機 $delayExchangeName = 'delay.order'; $delayExchangeArgs = new AMQPTable(['x-delayed-type' => 'direct']); $delayExchange = new AMQPExchange($channel); $delayExchange->setType('x-delayed-message'); $delayExchange->setName($delayExchangeName); $delayExchange->setArguments($delayExchangeArgs); $delayExchange->declare(); // 創建綁定死信隊列 $queueName = 'order'; $deadExchangeName = 'dead.order'; $deadRoutingKey = 'order.dead'; $deadQueueName = 'dead.order'; $deadExchange = new AMQPExchange($channel); $deadExchange->setName($deadExchangeName); $deadExchange->setType(AMQP_EX_TYPE_DIRECT); $deadExchange->declare(); $queueArgs = new AMQPTable([ 'x-dead-letter-exchange' => $deadExchangeName, 'x-dead-letter-routing-key' => $deadRoutingKey, 'x-message-ttl' => $delayTime * 1000, ]); $queue = new AMQPQueue($channel); $queue->setArguments($queueArgs); $queue->setName($queueName); $queue->declare(); $queue->bind($delayExchangeName, $queueName);
本文中,我們創建了一個延遲隊列delay.order,綁定了死信交換機dead.order。 消息在該隊列中保留的時間是由消費者處理延遲消息的時間大于TTL的時長度,當過期時間到達時,消息將被發送到死信交換機,然后轉移到“order.dead”隊列。 如果在該期間內成功處理,則消息將被刪除,否則將被置于失敗隊列。
當我們發布一個延遲消息時,我們需要在消息的Header中添加一個x-delay參數來表示延遲時間,具體如下:
$delayTime = 5 * 60 * 1000; $msg = new AMQPMessage('order paid', [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'expiration' => strval($delayTime), 'application_headers' => new AMQPTable(['x-delay' => $delayTime]) ]); $exchange->publish($msg, 'order', AMQP_NOPARAM, [ 'delivery_mode' => 2, 'delay' => $delayTime, ]);
在這里,我們定義了一個在5分鐘后才能到達的消息,然后發布到名為“order”的交換器。 'delay'參數是AMQP協議的延時擴展,時間以毫秒為單位。
當消費者處理消息時,需要從“order”交換機中接收消息,但是此時應使用“對應的死信交換機和死信路由鍵”命名并創建消費隊列:
$deadQueue = new AMQPQueue($channel); $deadQueue->setName($deadQueueName); $deadQueue->declare(); $deadQueue->bind($deadExchangeName, $deadRoutingKey); while (true) { $msg = $queue->get(AMQP_AUTOACK); if (!$msg) { usleep(100000); continue; } // 處理消息 $deadExchange->publish($msg->getBody(), $deadRoutingKey, AMQP_NOPARAM, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); }
在這里,消費者從一個名為“order”的AMQP隊列接收消息。如前所述,隊列被聲明為具有針對"dead.order"隊列的TTL和x-dead-letter-exchange。 此消費者將消息送到"dead.order"隊列。
此時,就可以用它來處理延遲消息了。 只需在死信交換機中監聽即可。
$deadQueue = new AMQPQueue($channel); $deadQueue->setName($deadQueueName); $deadQueue->declare(); $deadQueue->bind($deadExchangeName, $deadRoutingKey); while (true) { $msg = $deadQueue->get(AMQP_AUTOACK); if (!$msg) { usleep(100000); continue; } // 處理死信消息 }
總的來說,延時隊列可以用來處理訂單支付成功后等待一段時間后再發貨等場景。而使用RabbitMQ的延時隊列可以有效的解決這類問題,提升系統的穩定性和可靠性。