Kafka Log Compaction
Log Compaction
Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition. It addresses use cases and scenarios such as restoring state after application crashes or system failure, or reloading caches after application restarts during operational maintenance. Let's dive into these use cases in more detail and then describe how compaction works.
So far we have described only the simpler approach to data retention where old log data is discarded after a fixed period of time or when the log reaches some predetermined size. This works well for temporal event data such as logging where each record stands alone. However an important class of data streams are the log of changes to keyed, mutable data (for example, the changes to a database table).
Let's discuss a concrete example of such a stream. Say we have a topic containing user email addresses; every time a user updates their email address we send a message to this topic using their user id as the primary key. Now say we send the following messages over some time period for a user with id 123, each message corresponding to a change in email address (messages for other ids are omitted):
123 => bill@microsoft.com
        .
        .
        .
123 => bill@gatesfoundation.org
        .
        .
        .
123 => bill@gmail.com
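To make the keyed-update pattern concrete, here is a minimal producer sketch in Java. The topic name user-emails, the broker address localhost:9092, and the use of string keys and values are illustrative assumptions, not part of the original example:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class EmailUpdateProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each update is keyed by the user id, so compaction can later discard
            // all but the most recent address for key "123".
            producer.send(new ProducerRecord<>("user-emails", "123", "bill@microsoft.com"));
            producer.send(new ProducerRecord<>("user-emails", "123", "bill@gatesfoundation.org"));
            producer.send(new ProducerRecord<>("user-emails", "123", "bill@gmail.com"));
        }
    }
}
```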
Log compaction gives us a more granular retention mechanism so that we are guaranteed to retain at least the last update for each primary key (e.g. bill@gmail.com). By doing this we guarantee that the log contains a full snapshot of the final value for every key not just keys that changed recently. This means downstream consumers can restore their own state off this topic without us having to retain a complete log of all changes.
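A downstream consumer can rebuild that state simply by replaying the compacted topic from the beginning into a map. The sketch below reuses the hypothetical user-emails topic with a single partition 0 and a hypothetical group id; it also handles tombstones (null values), which are described later in this document:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EmailStateRestorer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");        // assumed broker address
        props.put("group.id", "email-state-restorer");           // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        Map<String, String> latestEmailByUser = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("user-emails", 0);
            consumer.assign(List.of(tp));
            consumer.seekToBeginning(List.of(tp));                // replay the compacted log from the start

            long end = consumer.endOffsets(List.of(tp)).get(tp);
            while (consumer.position(tp) < end) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    if (record.value() == null) {
                        latestEmailByUser.remove(record.key());   // tombstone: the key was deleted
                    } else {
                        latestEmailByUser.put(record.key(), record.value());
                    }
                }
            }
        }
        System.out.println(latestEmailByUser);
    }
}
```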
Let's start by looking at a few use cases where this is useful, then we'll see how it can be used.
- Database change subscription. It is often necessary to have a data set in multiple data systems, and often one of these systems is a database of some kind (either a RDBMS or perhaps a new-fangled key-value store). For example you might have a database, a cache, a search cluster, and a Hadoop cluster. Each change to the database will need to be reflected in the cache, the search cluster, and eventually in Hadoop. In the case that one is only handling the real-time updates you only need recent log. But if you want to be able to reload the cache or restore a failed search node you may need a complete data set.
- Event sourcing. This is a style of application design which co-locates query processing with application design and uses a log of changes as the primary store for the application.
- Journaling for high-availability. A process that does local computation can be made fault-tolerant by logging out changes that it makes to its local state so another process can reload these changes and carry on if it should fail. A concrete example of this is handling counts, aggregations, and other "group by"-like processing in a stream query system. Samza, a real-time stream-processing framework, uses this feature for exactly this purpose.
In each of these cases one needs primarily to handle the real-time feed of changes, but occasionally, when a machine crashes or data needs to be re-loaded or re-processed, one needs to do a full load. Log compaction allows feeding both of these use cases off the same backing topic. This style of usage of a log is described in more detail in this blog post.
The general idea is quite simple. If we had infinite log retention, and we logged each change in the above cases, then we would have captured the state of the system at each time from when it first began. Using this complete log we could restore to any point in time by replaying the first N records in the log. This hypothetical complete log is not very practical for systems that update a single record many times as the log will grow without bound even for a stable dataset. The simple log retention mechanism which throws away old updates will bound space but the log is no longer a way to restore the current state—now restoring from the beginning of the log no longer recreates the current state as old updates may not be captured at all.
Log compaction is a mechanism to give finer-grained per-record retention, rather than the coarser-grained time-based retention. The idea is to selectively remove records where we have a more recent update with the same primary key. This way the log is guaranteed to have at least the last state for each key.
This retention policy can be set per-topic, so a single cluster can have some topics where retention is enforced by size or time and other topics where retention is enforced by compaction.
This functionality is inspired by one of LinkedIn's oldest and most successful pieces of infrastructure—a database changelog caching service called Databus. Unlike most log-structured storage systems Kafka is built for subscription and organizes data for fast linear reads and writes. Unlike Databus, Kafka acts as a source-of-truth store so it is useful even in situations where the upstream data source would not otherwise be replayable.
Log Compaction Basics
Here is a high-level picture that shows the logical structure of a Kafka log with the offset for each message.
The head of the log is identical to a traditional Kafka log. It has dense, sequential offsets and retains all messages. Log compaction adds an option for handling the tail of the log. The picture above shows a log with a compacted tail. Note that the messages in the tail of the log retain the original offset assigned when they were first written—that never changes. Note also that all offsets remain valid positions in the log, even if the message with that offset has been compacted away; in this case this position is indistinguishable from the next highest offset that does appear in the log. For example, in the picture above the offsets 36, 37, and 38 are all equivalent positions and a read beginning at any of these offsets would return a message set beginning with 38.
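As an illustration of this offset behaviour, the sketch below seeks a consumer to offset 36 of a hypothetical compacted partition; if offsets 36 and 37 have been compacted away, the first record returned has offset 38. The topic name, group id, and broker address are assumptions for the example:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CompactedOffsetRead {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "offset-demo");                // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("user-emails", 0);  // hypothetical compacted topic
            consumer.assign(List.of(tp));
            // Offsets 36 and 37 may have been compacted away; seeking there is still a
            // valid position, and the first record returned is the next surviving offset
            // (38 in the example above).
            consumer.seek(tp, 36);
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.offset() + " => " + r.key() + " = " + r.value()));
        }
    }
}
```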
Compaction also allows for deletes. A message with a key and a null payload will be treated as a delete from the log. Such a record is sometimes referred to as a tombstone. This delete marker will cause any prior message with that key to be removed (as would any new message with that key), but delete markers are special in that they will themselves be cleaned out of the log after a period of time to free up space. The point in time at which deletes are no longer retained is marked as the "delete retention point" in the above diagram.
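A tombstone is just an ordinary produce request whose value is null. As a minimal sketch, again assuming the hypothetical user-emails topic and a broker at localhost:9092:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DeleteUserEmail {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A record with a key and a null value is a tombstone: compaction will
            // remove earlier records for key "123", and eventually the tombstone itself.
            producer.send(new ProducerRecord<>("user-emails", "123", null));
        }
    }
}
```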
The compaction is done in the background by periodically recopying log segments. Cleaning does not block reads and can be throttled to use no more than a configurable amount of I/O throughput to avoid impacting producers and consumers. The actual process of compacting a log segment looks something like this:
Log compaction guarantees the following:
- Any consumer that stays caught-up to within the head of the log will see every message that is written; these messages will have sequential offsets. The topic's min.compaction.lag.ms can be used to guarantee the minimum length of time must pass after a message is written before it could be compacted. I.e. it provides a lower bound on how long each message will remain in the (uncompacted) head. The topic's max.compaction.lag.ms can be used to guarantee the maximum delay between the time a message is written and the time the message becomes eligible for compaction. (A configuration sketch for these settings follows this list.)
- Ordering of messages is always maintained. Compaction will never re-order messages, just remove some.
- The offset for a message never changes. It is the permanent identifier for a position in the log.
- Any consumer progressing from the start of the log will see at least the final state of all records in the order they were written. Additionally, all delete markers for deleted records will be seen, provided the consumer reaches the head of the log in a time period less than the topic's delete.retention.ms setting (the default is 24 hours). In other words: since the removal of delete markers happens concurrently with reads, it is possible for a consumer to miss delete markers if it lags by more than delete.retention.ms.
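As a hedged example of the topic-level settings mentioned above, the sketch below creates a compacted topic with explicit min.compaction.lag.ms, max.compaction.lag.ms and delete.retention.ms values using Kafka's Java Admin client. The topic name, broker address, and the specific millisecond values are illustrative choices, not recommendations:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");             // assumed broker address

        // Hypothetical values: tune these to how far consumers are allowed to lag.
        Map<String, String> configs = Map.of(
                TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, "60000",    // stay in the uncompacted head >= 1 minute
                TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, "86400000", // eligible for compaction within 1 day
                TopicConfig.DELETE_RETENTION_MS_CONFIG, "86400000");  // keep tombstones for 1 day (the default)

        try (Admin admin = Admin.create(props)) {
            admin.createTopics(List.of(
                    new NewTopic("user-emails", 1, (short) 1).configs(configs))).all().get();
        }
    }
}
```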
Log Compaction Details
Log compaction is handled by the log cleaner, a pool of background threads that recopy log segment files, removing records whose key appears in the head of the log. Each compactor thread works as follows:
- It chooses the log that has the highest ratio of log head to log tail.
- It creates a succinct summary of the last offset for each key in the head of the log.
- It recopies the log from beginning to end removing keys which have a later occurrence in the log. New, clean segments are swapped into the log immediately so the additional disk space required is just one additional log segment (not a full copy of the log).
The summary of the log head is essentially just a space-compact hash table. It uses exactly 24 bytes per entry. As a result with 8GB of cleaner buffer one cleaner iteration can clean around 366GB of log head (assuming 1k messages).
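The following toy model illustrates the two phases described above, summarizing the head and then recopying the log. It is only a conceptual sketch over in-memory lists, not Kafka's actual log cleaner (which works on segment files and a space-compact offset map); the comment also shows where the ~366GB figure comes from: 8GB / 24 bytes per entry is roughly 358 million keys, times roughly 1KB per message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A toy model of one cleaning pass, NOT Kafka's actual implementation:
// records are (key, value, offset) tuples and the "head" is the uncleaned suffix.
public class ToyLogCleaner {
    record LogRecord(String key, String value, long offset) {}

    static List<LogRecord> clean(List<LogRecord> log, long firstHeadOffset) {
        // Phase 1: summarize the head as the last offset seen for each key.
        // (In Kafka this summary uses 24 bytes per entry, so an 8GB cleaner buffer
        // holds about 8GB / 24 ≈ 358 million keys; at ~1KB per message that is
        // roughly 366GB of log head cleaned in one iteration.)
        Map<String, Long> lastOffsetByKey = new HashMap<>();
        for (LogRecord r : log) {
            if (r.offset() >= firstHeadOffset) {
                lastOffsetByKey.put(r.key(), r.offset());
            }
        }
        // Phase 2: recopy from beginning to end, dropping records superseded by a
        // later record with the same key. Surviving records keep their offsets.
        List<LogRecord> cleaned = new ArrayList<>();
        for (LogRecord r : log) {
            Long last = lastOffsetByKey.get(r.key());
            if (last == null || r.offset() >= last) {
                cleaned.add(r);
            }
        }
        return cleaned;
    }
}
```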
Configuring the Log Cleaner
The log cleaner is enabled by default. This will start the pool of cleaner threads. To enable log cleaning on a particular topic you can add the log-specific property
log.cleanup.policy=compact
This can be done either at topic creation time or using the alter topic command.
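Creation-time configuration was sketched in the earlier Admin-client example; for the "alter topic" path, one option is incrementalAlterConfigs on the same Admin client, as sketched below. The topic name and broker address are again assumptions:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class EnableCompactionOnExistingTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address

        try (Admin admin = Admin.create(props)) {
            // Switch an existing (hypothetical) topic to the compact cleanup policy.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "user-emails");
            AlterConfigOp setCompact = new AlterConfigOp(
                    new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,
                                    TopicConfig.CLEANUP_POLICY_COMPACT),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setCompact))).all().get();
        }
    }
}
```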
The log cleaner can be configured to retain a minimum amount of the uncompacted "head" of the log. This is enabled by setting the compaction time lag.
log.cleaner.min.compaction.lag.ms
This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.
Further cleaner configurations are described here.