如何把一個運行完好的Kafka搞崩?
kafka是一個分布式的發布和訂閱的消息系統。也就是消息的發布者把消息進行分類,然后發送到kafka上。而訂閱者去讀取也就是去消費一個特定類型的消息。當然在這樣的一個系統中一般都會存在一個節點來做一個中心的作用。
生產者kafka是流式的處理數據,那么這個數據的來源就是由生產者發送到kafka的。整個消息的發送流程如下圖:
大致上來看我們需要構建一個ProducerRecord 一個對象。然后把這個對象發送到對應的一個broker上。
首先一個ProducerRecord對象包含了topic,parition,key ,value。生產者需要把鍵值對象序列化成字節數據,這樣才能夠在網上傳播。然后如果指定了分區,數據就會往分區寫,如果沒有指定就會根據這個key,把數據往指定的分區寫。在成功寫入之后,服務器響應會發回一個RecordMetaData對象,包含記錄在分區里面的偏移量。接下來,我們來看看 producer.send() 怎么使用:
這里,我們關注了發送者的兩種發送方式:同步和異步。send方法會返回一個Future對象,如果使用這個對象的get方法,等到kafka的返回,那么會阻塞在這里。如果你不關注這個返回值得話,我們可以用完send就跑。然后使用回掉方法,特殊的關注一些異常就好了。
需要注意的是,消息是被放進緩沖區中,然后使用單獨的線程發送到服務端。我們會等到緩沖區到達一定大小統一發送,發送者在發送的時候會把屬于同一分區的消息依次發送,這樣分區里面的消息就是有序的。關于緩沖區這個是有具體的參數可以配置的。使用的時候搜索一下就行。
最后,發送到broker的消息類型是字節數組,所以我們在發送之前需要把key-value進行相應的序列化,kafka有自帶的一些序列化方法,假如你使用的是你自己定義類,那么你可以自己寫序列化方法,也可以使用向thrift這樣來生成數據。
消費者消費者消費它訂閱的那個topic的消息。因為我們消費之后可能要做一些耗時的處理工作,所以一個消費者可能消費不過來這么多消息,導致消息阻塞在broker上。但是Kafka是很好橫向擴展的,橫向擴展的意思是你多來幾個消費者來消費同一個分區不就好了。那么這些消費同一個topic的消費者,就屬于一個消費者群組。topic下面消息的最小單位應該是分區(partition),每個消費者消費的都是一個partition。如果說消費者的數量大于了partition的數量,那么會有一些消費者,消費不到數據。另一個是,kafka能保證的消息有序只有paritition這個級別了,不能保證topic級別的消息有序性。
假如說生產者崩了一個那也就崩了,只不過是沒有消息進來而已。但是消費者崩了的話,可能消息就會堵在broker上,對整個業務造成影響。所以在消費的離開或者加入群組的時候。kafka會進行一項rebalance的操作,就是把分區在重新分配給消費者。所以就得知道在rebalance之前各個消費者消費patition的情況,得知道他們的offset。kafka 0.8 及之前都是把這個信息放在Zookeeper上。但實際上Zookeeper并不能承受住大規模的讀寫。所以經常rebalance失敗,消息堵住,影響業務。但是kafka 0.9及之后,consumer在消費的時候會把偏移信息發送給一個特殊的topic->__consumer_offsets。這樣broker就可以直接維護這些信息了。下面我們來看一下如何使用kafka consumer:
poll()這個方法是很重要的,消費者在每次輪詢的時候都會檢查是否該提交偏移量了,如果是,那么就會使用poll來提交一次偏移量到_consumer_offsets 這個topic上。當然了,提交的時機也是我們可以設置的配置。
當然,出了自動提交還有手動提交的api:commitSync 和 commitAsync。
還有兩個特殊的api:
public void onPartitionsRevoked(Collection partitions)public void onPartitionsAssigned(Collection partitions)這兩個方法,都是在consumer.subscribe的時候調用,這樣發生rebalance的時候,或者重新分配區之后和消費者讀取消息之前被調用。可以執行這兩個方法里面的操作。
最后,在另一個線程里面調用consumer.wakeup() 就可以優雅的退出consumer.poll() 輪詢。