Programio

Jak funguje replikace v Kafce

Co se stane, když nám server s Kafka Brokerem shoří? Není Kafka Broker single point of failure? Asi všichni tušíme, že není… Pro každý topic totiž lze nastavit počet replikací. Pokud pro topic nastavíme replikaci na hodnotu tři, znamená to, že všechny zprávy, které do topicu pošleme, se zkopírují na další dva servery (respektive Kafka brokery) a jedna zpráva se bude celkem nacházet na třech serverech.

Důležité je, že se o replikaci nemusí starat producer – ten stále odesílá zprávy na jeden server a Kafka cluster už si sám řeší replikaci dat na další servery. Flow vypadá takto:

Producer pošle zprávu brokerovi #1, ten zprávu zapíše k sobě na disk a zároveň se zpráva replikuje u brokerů #2 a #3, které si ji také zapíší na disk. Tím dojde k záloze a pokud Broker #1 vypadne, provoz se automaticky přesměruje na jeden ze zbývajících brokerů:

Replikací nicméně nezvýšíme propustnost služby. Máme-li nastavenou replikaci 3, neznamená to, že když budeme číst zprávy z tohoto topicu, že budeme číst zprávy paralelně ze všech tří brokerů. Vždy se zvolí jeden broker jako leader, z něj se čtou zprávy a zbylé dva servery se pro čtení nepoužijí. V předchozím obrázku například consumer čte zprávy z brokeru #2 a z brokeru #3 žádné zprávy nedostává, přestože tento broker má zprávy z daného topicu uložené u sebe na disku.

ISR

Teď trochu detailněji o tom, jak replikce funguje. Tímto příkazem založíme topic:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bin/kafka-topics.sh --create --zookeeper zookeeper01:2181 \
--replication-factor 3 --partitions 2 --topic bidrequests
```

Kafka musí založit dvě partitions, pro každou partition musí vybrat celkem tři brokery, které budou tyto partitiony obsluhovat a replikovat. Dejme tomu, že pro partitionu 0 zvolí brokery kafka00, kafka01 a kafka02 a pro partition 1 kafka03, kafka04 a kafka05. V každé skupině se zvolí leader pro danou partition. Leaderem se tak mohou stát například stroje kafka01 a kafka05. Ostatní Kafky slouží v tuto chvíli jako repliky.

Kafka dále definuje pojem *in sync replica*, což je broker, který je "živý" (= udržuje svou session se ZooKeeperem) a který stíhá dostatečně rychle replikovat zprávy z leaderu (= offset poslední replikované zprávy je maximálně o x zpráv pozadu oproti leaderovi).

Kafka si neustále udržuje informace o tom, které repliky jsou "aktuální". Může se totiž stát, že replikace spadne nebo že replikace přestane stíhat stahovat a ukládat data. Ty servery, které to stíhají, jsou nazývány *in sync replica*, zkráceně ISR. Kafka si pak pro každou partitionu udržuje v ZooKeeperu množinu ISR, můžete se na ně podívat v ZooKeeper konzoli (`bin/zkCli.sh`):

```plain
$ get /brokers/topics/bidrequests/partitions/0/state
{"leader":1, "version":1, "isr":[2,1,0]}

$ get /brokers/topics/bidrequests/partitions/0/state
{"leader":5, "version":1, "isr":[5,3,4]}
```

Záznam v ZooKeeperu říká, že leaderem partitiony 0 topicu *bidrequests* je kafka01 a synchronizované repliky jsou kafka02, kafka01 a kafka00 (i samotný leader je ISR). Tyto repliky pravidelně stahují nové zprávy z leadera a replikují je k sobě. Leader poskytne zprávu ke konzumaci pouze ve chvíli, kdy všechny ISR potvrdí, že zprávu přijaly.

Pokud ISR spadne nebo pokud nestíhá stahovat nové zprávy z leadera, je tento uzel odstraněn z množiny ISR v ZooKeeperu a pokračuje se dále. Dejme tomu, že by kafka00 kompletně spadla:

```plain
$ get /brokers/topics/bidrequests/partitions/0/state
{"leader":1, "version":1, "isr":[2,1]}

Leader by od této chvíle čekal na potvrzení jen od kafka02. Pokud by spadnul leader, stala by se leaderem kafka02:

1
$ get /brokers/topics/bidrequests/partitions/0/state
{"leader":2, "version":1, "isr":[2]}

Všechny zprávy by v tuto chvíli tekly jen přes kafka02 a docházelo by k nulové záloze. Je přitom jedno, že vedle máme stroje kafka03, kafka04 a kafka05, které žijí – partition 0 má nastaveno, že poběží na strojích kafka00, kafka01 a kafka02, takže dokud tyto stroje neoživíme, nebude docházet k žádné replikaci. Nemusí to být samozřejmě stejné servery, stačí spustit Kafka broker se stejným ID.

Pokud jakýmkoliv způsobem přivedeme k životu kafka01, ta zjistí, že by měla obsluhovat partition 0 topicu bidrequests, takže si začne z leaderu tahat chybějící data. Až je natáhne, stane se zase ISR, což se uloží do ZooKeeperu. Leader se nezmění.

$ get /brokers/topics/bidrequests/partitions/0/state
{"leader":2, "version":1, "isr":[2,1]}

Máme-li nastavenou replikaci r, náš systém bude tolerovat až r-1 nedostupných ISR. Pokud vypadne všech r strojů, máme samozřejmě smůlu.

Nastavení Kafky týkající se replikace

Kafka umožňuje více specifikovat, jak se v určitých situacích týkajících se replikace chovat:

  • request.required.acks určuje, jestli bude producer čekat na potvrzení doručení zprávy. Možné hodnoty:
    • 0: Producer nečeká na žádné potvrzení, rychlost odesílání je nejvyšší.
    • 1: Producer čeká na potvrzení leadera.
    • -1: Producer čeká na potvrzení leadera a všech replik v ISR. Při tomto nastavení bue producer nejpomalejší, protože bude relativně dlouho čekat na potvrzení doručení zpráv, ale zase máte největší jistotu, že o žádnou zprávu nepřijdete.
  • min.insync.replicas určuje minimální počet ISR. Pokud například nastavíte request.required.acks na -1, tj. producer bude čekat na potvrzení od všech ISR, může se stejně stát, že všechny replicy spadnou a ISR bude obsahovat pouze leadera. Tímto nastavením můžete říci, že pokud je velikost ISR menší než min.insync.replicas, začne producer vyhazovat při odesílání výjimky.