Programio

Jak funguje Kafka consumer

Kafka je napsaná ve Scale a poskytuje nějaké normální třídy pro práci v Javě. Máme na výběr ze dvou druhů implementací lišících se mírou abstrakce: high level Kafka consumer a low level. My téměř všude používáme high level consumera a o něm bude celý tento článek. Existují implementace i v jiných jazycích, které mohou poskytovat ještě jiné úrovně abstrakce, než jaké poskytuje původní Scala implementace, těmi se nebudu zabývat vůbec. Obecně je nejlepší používat nativní Javové knihovny přímo od LinkedInu, tam máte největší jistotu, že bude vše fungovat jak má. Implementace v ostatních jazycích už LinkedIn na svědomí nemá.

ZooKeeper

Kafka interně používá ZooKeeper. ZooKeeper je hodně jednoduše řečeno distribuovaná replikovaná databáze, kterou můžeme použít na ukládání konfiguračních souborů, ale také na takové věci jako je volba lídra v distribuované síti. V tuto chvíli není důležité vědět o ZooKeeperu nic víc než to, že Kafka do něj ukládá všechna metadata o brokerech, topicích a partitonech.

Ukládání offsetu naposledy přečtené zprávy

Consumer je obecně nějaká aplikace, která se umí napojit na Kafka broker a číst z něj zprávy z nějakého topicu.

Všechny consumery pojí jeden problém: jak si pamatovat, které zprávy už byly zpracovány a které ne. Představme si, že náš topic bidrequests má jednu partition a že máme jednoho consumera, který přečetl a zpracoval 47 zpráv, když program náhle spadnul a znovu se nastartoval. Jak se consumer doví, že má začít číst od 48. zprávy?

Musíme si někam ukládat offset naposledy přečtené zprávy. Otázkou je kam.

  • Samozřejmě nemůže být v paměti, kterou po pádu aplikace ztratíme.
  • Můžeme ho ukládat do souboru nebo do nějaké lokální databáze typu Redis. To má zásadní nevýhodu v tom, že consumera musíme následně spustit na stejném stroji. Pokud bychom ho chtěli pustit na úplně jiném serveru, neměli bychom tato data k dispozici. Pokud by nastala chyba disku, na kterém je soubor uložený, máme opět problém.
  • Můžeme ho ukládat do vzdálené databáze. Ne náhodou jsme zmiňovali ZooKeeper, takže můžeme pro uložení offsetu použít právě ZooKeeper, který běží na nějakém vlastním stroji. Consumera můžeme pustit na jakémkoliv stroji, který má do ZooKeepera přístup, a ten si po startu může vytáhnout offset naposledy přečtené zprávy.

Jenomže to jsme se dostali z deště pod okap. Posílat s každou přečtenou zprávou požadavek do ZooKeeperu je velmi náročné – jak pro ZooKeeper, který musí obvykle ještě všechna data udržovat synchronizovaná na nejméně třech serverech – tak pro samotný consumer. A jako bonus to ani zcela neřeší náš problém.

At most once delivery

Představme si, že náš consumer dělá to, že přečte zprávu z topicu a zapíše ji do databáze, nic víc. Pokud by consumer po přečtení zprávy uložil do ZooKeeperu offset této zprávy, dělo by se při pádu aplikace toto:

  1. Consumer přečte z topicu zprávu #47.
  2. Consumer uloží do ZooKeeperu informaci o tom, že přečetl zprávu #47.
  3. Consumer pošle zprávu do databáze.
  4. Consumer kvůli nějaké chybě spadne; my ho po chvíli znovu spustíme.
  5. Consumer se podívá do ZooKeeperu a vidí, že naposledy přečetl zprávu #47.
  6. Consumer zažádá o zprávu #48.

Potud vše v pořádku. Jenomže co by se stalo, kdyby consumer spadl o kousek dřív – těsně předtím, než by odeslal zprávu do databáze:

  1. Consumer přečte zprávu #47 z topicu.
  2. Consumer informuje ZooKeeper, že přečetl zprávu #47.
  3. Consumer spadne; znovu ho spustíme.
  4. Consumer se podívá do ZooKeeperu a vidí, že naposledy přečetl zprávu #47.
  5. Consumer zažádá o zprávu #48.

…přitom zprávu #47 v databázi uloženou nemáme, přišli jsme o ni. Takovému způsobu zpracování zpráv říkáme at most once, tedy že každou zprávu zpracujeme maximálně jednou, ale může se stát, že nám nějaké zprávy po pádu aplikace utečou.

At least once delivery

Dobře, můžeme změnit strategii ukládání do ZooKeeperu tak, že budeme offset ukládat až když nám databáze potvrdí, že uložení proběhlo korektně. Tj. takto:

  1. Consumer přečte z topicu zprávu #47.
  2. Consumer vloží zprávu do databáze.
  3. Consumer čeká na potvrzení zápisu z databáze…
  4. …které právě přišlo.
  5. Consumer informuje ZooKeeper, že zpracoval zprávu #47.
  6. Consumer spadne; znovu ho pustíme.
  7. Consumer se podívá do ZooKeeperu a vidí, že naposledy zpracoval zprávu #47.
  8. Consumer zažádá o zprávu #48.

Potud opět vše v pořádku. Samozřejmě i v tomto případě může pád aplikace nastat v blbější chvíli a pak nastane problém:

  1. Consumer přečte zprávu #47 z topicu.
  2. Consumer vloží zprávu do databáze.
  3. Consumer čeká na potvrzení zápisu z databáze…
  4. …ale mezitím spadne; znovu ho pustíme.
  5. Consumer se podívá do ZooKeeperu a vidí, že naposledy zpracoval zprávu #46.
  6. Consumer zažádá o zprávu #47.

Protože si consumer nestihl uložit, že zprávu #47 už zpracoval, zpracuje ji znova. Zprávu #47 tak zpracoval dvakrát a pokud nedošlo během ukládání do databáze k chybě, budemem tam mít zprávu uloženou dvakrát. Takovému způsobu zpracovávání říkáme at least once.

Exactly once

Ale my samozřejmě chceme, abychom každou zprávu přečetli právě jednou. Jak to zařídit? No… těžko. Dost těžko.

Pokud je exactly once něco, co je opravdu potřeba, nezbývá, než nějak kooperovat s databází, kam se data ukládají. Například místo toho, abychom ukládali do databáze pouze samotnou zprávu, uložíme tam atomicky jak zprávu, tak offset zprávy, kterou zpracováváme. Při restartu consumera se pak podíváme do databáze a vytáhneme offset z naposledy uloženého záznamu. Tím máme zajištěno, že offset se do databáze uloží právě a jen tehdy, pokud se tam uloží i zpráva.

Jenomže to není moc pružné řešení – ve chvíli, kdy bychom chtěli vyměnit cílovou databázi za nějaké jiné úložiště, museli bychom znova implementovat exactly once chování, přičemž do některých databází ani nemusí být možné uložit takto atomicky dva různé záznamy.

Jednou z dalších cest je navrhnout systém tak, aby nevadilo, že se některé zprávy zpracovaly dvakrát. Třeba zprávu “nastav mzdu zaměstnance XYZ na 42 000 Kč” lze aplikovat vícekrát bez toho, aniž by to mělo vliv na výsledný stav systému. Naopak aplikovat vícekrát zprávu “zvyš mzdu zaměstnance XYZ o 5000 Kč” by už byl průšvih. I když jak pro koho.

A nesmíme zapomenout ani na další řešení: vykašlat se na to, že se některé zprávy nezpracují nebo se zpracují vícekrát. Zní to děsivě, ale v realitě to není tak hrozné:

  • Nepřesné výsledky dostáváme jen tehdy, když Kafka spadne. Pokud Kafka běží v pohodě, získáváme 100% správné výsledky. V praxi se nám nestává, že by Kafka cluster byl nějak nestabilní a každou chvíli padal.
  • V bankovnictví si asi nemůžeme dovolit, abychom nějakou transakci zpracovali vícekrát nebo naopak abychom nějakou vynechali. Ale například v oblasti měření návštěvnosti tato kritéria tak přísná nebudou.
  • Minule jsme si představovali Lambda architekturu, jejíž součástí je i batch větev, která má za úkol opravit data z realtime větve – nepřesné výsledky vzniklé díky pádům Kafky může ještě později opravit tato větev.

Angličtina má pro toto hezké slovo, které neumím hezky přeložit: trade-off. Je to zkrátka na vás. Můžete si naimplementovat exactly once zpracování, ale nebude to zadarmo, nebude to jednoduché a pro spoustu případů stačí to, co Kafka poskytuje v základu.

Náročnost ukládání offsetu

Už jsme zmínili, že ukládat offset s každou přečtenou zprávou je samo o sobě velmi náročné. Posílat do ZooKeeperu desítky tisíc požadavků za sekundu prostě není dobrý nápad. V realitě tak dokonce ani neoznamujeme změnu offsetu s každou jednotlivou přečtenou zprávou, ale po nějakých větších celcích. Typicky co x sekund. Pokud použijete javový high level consumer, bude ukládat offsety co x sekund do ZooKeeperu.

My zatím nemáme tak velký provoz, takže si můžeme dovolit ukládat offsety každou sekundu, ale z příspěvků ostatních, které jsme četli, jsme vypozorovali, že používají i daleko větší intervaly, například celou jednu minutu. To znamená, že když consumer spadne sekundu předtím, než měl uložit offset, pak ty zprávy, které přečetl za 59 sekund, přečte znova.

Kam jinam ukládat offsety consumerů?

ZooKeeper je dobrý, ale má jeden problém: není absolutně horizontálně škálovatelný. Což je vlastně docela ironické, když člověk uváží, že ho spousta podobných distribuovaných systému používá.

Pokud máme tři servery se ZooKeeperem, funguje to tak, že jeden z nich je leader, všechny ostatní servery komunikují právě s tímto leaderem a zbylé dva uzly slouží jen jako replikace. Přidáme-li další dva servery do ZooKeeper clusteru, nijak nezvýšíme propustnost, ba právě naopak – bude se muset replikovat na více serverů, což bude více zdržovat. Všechny aplikace budou stále komunikovat s jedním serverem, s leaderem.

Přestane-li ZooKeeper stíhat, můžeme buď upgradovat hardware … nebo změnit architekturu. V LinkedInu už si problémů se ZooKeeperem všimli a rozhodli se, že offsety consumerů už nebudou ukládat do ZooKeeperu, ale do něčeho jiného. Jenomže kam? Obzvlášť, když potenciálně můžeme mít v běhu tisíce Kafka consumerů?

Ideálně by to chtělo systém, který zvládne zpracovávat tisíce požadavků za sekundu, je distribuovaný, má dobře vyřešené replikace a je odolný vůči výpadkům a snadno z něj zpětně přečteme uložené hodnoty. Plot twist – taková je přece Kafka!

Ano, v novějších verzích Kafky se offsety ukládají zpátky do Kafky, do topicu s nějakým specifickým jménem. Offsety consumerů, které čtou topic bidrequests by se tak mohly ukládat třeba do topicu bidrequests_offsets apod. Consumer pak po startu přečte všechny zprávy v daném topicu a z úplně poslední zprávy zjistí naposledy uložený offset a odtud bude číst dále.

Rebalancing a group id

Dejme tomu, že náš topic bidrequests má čtyři partitiony a spustíme jednoho high level consumera, který tento topic čte. Consumer bude automaticky číst zprávy ze všech partition.

Co se stane, když spustíme druhého consumera, který bude číst stejný topic?

To záleží na tom, jaké má group id. Pokud oběma consumerům nastavíme stejné group id, automaticky dojde k rebalancingu: consumeři si mezi sebe rovnoměrně rozdělí všechny partitiony z daného topicu a každý consumer tedy bude číst polovinu partitionů. První consumer bude číst partitiony 0 a 1, druhý 2 a 3.

Přidáme-li další dva consumery se stejným group id, dojde k dalšímu rebalancingu a každý consumer bude číst data z právě jedné partition. Přidáním pátého consumera už by se na situaci nic nezměnilo – čtyři consumeři by četli po jedné partition a pátý by nedělal nic.

Tímto způsobem lze efektivně škálovat consumery. Klidně bychom mohli mít v provozu pět consumerů na pěti různých serverech, na každém by se zpracovávala data z jedné partitiony a pátý consumer by sloužil čistě jako záloha. Ve chvíli, kdy by jeden z consumerů spadl, mohl by tento pátý po rebalancingu začít číst tu partition, kterou četl předchozí consumer.

Consumer, kterému bychom nastavili jiné group id, by četl zprávy nezávisle na ostatních consumerech, které mají odlišné group id. Ti consumeři, které mají stejné group id, se mezi sebou dělí o partitiony. Ti consumeři, které mají rozdílné group id, pracují nezávisle na sobě.

Můžeme mít skupinu consumerů s group id live-consumers a další skupinu s group id backup-consumers. Přitom live-consumers by četli aktuální data z topicu a backup-consumers by četli den staré zprávy a zálohovaly by je třeba do Hadoopu. Obě skupiny by četly zprávy nezávisle na sobě a nezávisle na sobě by si ukládaly offsety.

Offsety se vždy ukládají pro trojici [topic, partition, group_id]. Na offsety se můžeme podívat do ZooKeeperu:

1
2
$ get /kafka/consumers/some-group-id/offsets/bidrequests/0
27631000

Číslo 27631000 značí naposledy uložený offset pro topic bidrequests, pro partitionu 0 a pro group id some-group-id.

Rebalancing může být mrcha

Rebalancing se děje vždy, když se změní počet consumerů pro daný topic pro dané group id. Algoritmus je přibližně takový:

  1. Všichni aktuální consumeři daného topicu a daného group id přestanou číst zprávy.
  2. Všichni consumeři uloží svůj aktuální offset.
  3. Rozhodne se, kteří consumeři budou číst jaké partitiony.
  4. Začne se číst.

To zní jednoduše. Problém je, když chcete napsat aplikaci, která si sama řídí ukládání offsetů. Standardně se offsety ukládají co x sekund. Ale to můžete vypnout a můžete si ty offsety ukládat jak chcete pomocí metody commitOffsets. Třeba ty offsety ukládat po každé stovce zpracovaných zpráv a podobně.

Jenomže pokud vypnete automatické ukládání offsetů co x sekund, vypnete tím i ukládání offsetu, který probíhá v bodě 2) u rebalancingu. Což je trochu problémeček – při každém rebalancingu se v tuto chvíli neuloží offset aktuálně zpracované zprávy a při startu consumerů v bodě 4) se začne číst od offsetu, který byl uložený někdy předtím. Jinými slovy – některé zprávy nutně přečtete a zpracujete vícekrát.

Řeklo by se, že stačí nějak reagovat na událost rebalancing, jenže … Kafka žádnou takovou událost neemituje. Takže smolíček.

To jsme obešli tím, že jsme sice povolili automatické ukládání offsetů co x sekund, ale to x jsme nastavili šíleně vysoké, takže offsety se nám během rebalancingu uloží, ale pravidelné ukládání offsetů se dostane ke hře jednou za hodně dlouhou dobu, což už není takový problém.

Je možné, že nějaké novější verze Kafky toto nějak vyřeší (LinkedIn má v plánu přepracovat consumer).

Příště…

Příště se podíváme na to, jak v Kafce fungují replikace.