Programio

Kafka messaging system

Kafka je aplikace, která umožňuje posílat velké množství malých zpráv napříč servery, přičemž umožňuje horizontální škálování a zároveň všechny zprávy replikuje na více serverů – vypadne-li jeden z nich, jiný ho nahradí.

V celé sérii budu používat podivnou czenglish. Pojmu “partition” budu zkrátka říkat partition, protože překládat to jako oddíl mi přijde divné a navíc není na první pohled jasné, jaký původní termín myslím. Zkrátka se v práci bavíme o partitionech a když máme těch partitionů málo, zkusíme nějaké partitiony přidat. Sorry.

K čemu Kafku používáme

U nás ve firmě se nově snažíme Kafku používat na logování všech zpráv, které v systému vznikají a které chceme nějak zpracovávat. Na konkrétním příkladu si ukážeme, k čemu Kafka slouží a jak ji používáme.

Use case máme následující: z nějakého webu, například z iDnes.cz, nám přijde požadavek na zobrazení reklamy, my spustíme aukci a v té vybereme nejlepší reklamu, vrátíme ji jako odpověď a tato reklama se pak zobrazí na iDnesu. Aukci provedeme tak, že pošleme HTTP request několika serverům, kterým říkáme DSPéčka, ta pošlou nazpátek HTML kód reklamy a cenu, kterou nabízí za zobrazení. Z odpovědí vybereme nejvhodnější reklamu. DSPéčka jsou projekty úplně jiných firem. Tyto HTTP requesty zaslané DSPéčkům nazýváme bid request, DSPéčkové odpovědi nazýváme bid response. Aukce obsluhuje projekt AdSelectorProxy, což je HTTP server napsaný v Node.JS.

Další informace o tom, co vlastně stavíme, si můžete přečíst v předchozím článku

Prostě aukce, nic víc. iDnes prodává reklamní pozici na stránce, DSPéčka nabízí peníze za pozici a my to celé zprostředkováváme.

Rádi bychom všechny tyto bid requesty a bid responsy ukládali a později analyzovali. Například bychom chtěli jednoduše zjistit, které DSPéčka spolu nejčastěji soutěží, o kolik se v průměru liší nabídky u dvou nejvyšších nabídek a podobně. Konkrétně bychom mohli zjistit, že DSP 1 často soutěží s DSP 2 a většinou prohrává, ale vždy jen o několik halířů. To je zajímavá informace, se kterou lze dále pracovat.

V dokumentaci RTB se můžete podívat na to, jak takový reálný bid request vypadá.

Chceme tedy ukládat do nějaké databáze jak bid požadavky, tak bid odpovědi. Ale toto ukládání se nesmí dít přímo v AdSelectorProxy, protože před samotným uložením do databáze musíme ještě všechna data nějak předzpracovat, obohatit o data, která se sice neposílají DSPéčkům, ale v databázi je mít chceme a podobně a musíme nějak spojit request a response, aby bylo vidět, že patří k sobě.

To je poměrně dost práce na to, aby to dělal náš už tak velice vytížený HTTP server AdSelectorProxy a navíc to ani není jeho zodpovědnost. Místo toho chceme tato data dostat na úplně jiný server, který bude zodpovědný právě za obohacení dat a za uložení do příslušné databáze. Na tento jiný server dostaneme všechny zprávy právě skrze Kafku.

Základní pojmy v Kafce

Kafka má tři základní části: producer, který posílá zprávy do Kafky, broker, který tyto zprávy uchovává a consumer, který tyto zprávy čte.

V Kafce jsme vytvořili topic bidrequests. Topic je základní logická jednotka, se kterou Kafka pracuje. Pro různé typy zpráv můžeme založit různé topicy. Takže pro bid response máme zase topic bidresponses. Každý bid request, který AdSelectorProxy pošle DSPéčkům, zároveň pošle ve formátu JSON do Kafky do topicu bidrequests. Náš projekt AdSelectorProxy se v tomto případě chová jako Kafka producer – producer je aplikace, která posílá data do Kafky.

Na úplně jiném serveru nám běží Kafka broker. Ve chvíli, kdy pošleme zprávu do topicu bidrequests, broker zprávu přijme a uloží ji na konec souboru, ve kterém jsou uchované předchozí zprávy ze stejného topicu. Každá zpráva, která prochází Kafkou, se vždy uloží do souboru – je to jeden ze základních rozdílů oproti podobným programům jako je RabbitMQ nebo ZeroMq. Zpráva, která je potvrzená jako uložená, je dále poskytnuta ke zpracování.

Na třetím serveru pak běží Java aplikace, která je u brokera přihlášená jako consumer topicu bidrequests a tato aplikace čte příchozí bid requesty a dále je zpracovává. Obecně vzato může consumer dělat se zprávami cokoliv – může každou přečtenou zprávu uložit do SQL databáze, může je poslat na jiný HTTP server, může je zase poslat zpátky do Kafky do jiného topicu… My jako consumera používáme Samzu, o které budeme mluvit někdy příště.

Producer, broker a consumer mohou být na jednom stroji, ale typicky jsou to tři rozdílné servery. Producer tedy posílá zprávy na Kafka broker, Kafka broker všechny zprávy ukládá k sobě na disk, přičemž si u každé zprávy uloží informaci o tom, do jakého topicu patří a nakonec Kafka broker odešle zprávy z daného topicu nějakému Kafka consumerovi, který s těmi daty udělá co uzná za vhodné. To je celé základní flow.

Offset zprávy

Broker každou příchozí zprávu ukládá na disk a smaže ji až po uplynutí nastavené doby – vůbec nezáleží na tom, jestli tu zprávu nějaký consumer přečetl, nebo ne. To nám umožňuje, aby zprávy z jednoho topicu mohlo číst více consumerů a každý consumer může dokonce číst různě staré zprávy.

Každá zpráva v topicu má svůj číselný identifikátor, kterému se říká offset. První poslaná zpráva má offset 0, třicátá má offset 29 atd. Zprávy jsou vždy seřazené podle toho, v jakém pořadí přišly. Když se consumer přihlásí ke čtení, řekne brokeru “chci zprávy z topicu ABC od offsetu x” a broker mu je začne posílat. Máme v provozu dva rozdílné consumery:

  1. První consumer zkrátka čte aktuální bid requesty a dělá s nimi nějaké transformace a posílá je do databáze (to je ta Samza, více příště).

  2. Druhý consumer dělá jednou denně zálohy. Spustí se po půlnoci, přečte z topicu všechny dosud jím nepřečtené zprávy a uloží je do Hadoopu jako zálohu. Zároveň si uloží offset naposledy zpracované zprávy, aby další den věděl, odkud má zase začít. Kromě toho, že takto data zálohujeme, tak v budoucnu plánujeme nad těmito daty provádět nějakou analýzu.

Každý consumer se sám stará o ukládání offsetu naposledy přečtené zprávy. Je to celkem věda, ale o tom později.

Partitions

Co se stane ve chvíli, kdy náš Kafka broker přestane zvládat zpracovávat zprávy, které do něj posíláme? Kafka má vysokou propustnost, dejme tomu, že na jednom stroji zvládneme zpracovat 50 000 zpráv za sekundu. Ale co když chceme zpracovat 100 000 zpráv? Nebo 100 000 000 zpráv?

Řešením je partition. Každý topic můžeme rozdělit na několik partitionů. Každá partition pak může být zpracována na jiném brokeru/serveru. Pokud nastavíme deset partitionů, můžeme provoz rozprostřít mezi deset serverů. 100 000 zpráv bychom mohli zpracovat tak, že vytvoříme dvě partitiony a provoz proženeme přes dva Kafka brokery:

V AdSelectorProxy se sami musíme postarat o to, aby polovina zpráv šla do partition #1 a druhá polovina do partition #2. Naše architektura ale spíše vypadá tak, že máme několik producerů, kteří paralelně zapisují zprávy do Kafka brokerů, přičemž typicky každý producer zapisuje do všech partitionů.

V tomto obrázku tak máme tři webové servery, které zapisují do dvou partitions a dále máme dva consumery, přičemž každý consumer čte jen data z jedné partition. Mohli bychom mít jednoho consumera, který čte data z obou partitions, ale mohli bychom narážet na limity sítě nebo na jiné limity. Takto je architektura více distribuovaná.

Partition je dále nedělitelná jednotka – jedna partition nemůže být zpracovávána na více Kafka brokerech, vždy platí, že jedna partition je zpracovávána na právě jednom serveru/Kafka brokeru. Stejně tak nemůžeme mít dva consumery, kteří by si tu partitionu rozdělili a každý četl jenom jednu polovinu dat z dané partitiony.

Počet partition tedy určuje, jak moc může být zpracovávání zpráv distribuované. Proto je dobré nastavit jich spíše více než méně. Počet partitionů sice lze měnit i za provozu, ale není to úplně jednoduché. Lepší je na začátku “přestřelit” a nastavit velký počet partitionů než to každou chvíli v provozu měnit.

Strategie rozesílání zpráv do partition

Při poslání zprávy do topicu ručně nastavujeme, do jaké partition se má zpráva poslat. To můžeme udělat několika různými způsoby:

  • Natvrdo nastavíme, že server #1 bude posílat všechny zprávy do partition #1 atp.
  • Budeme zprávy rozhazovat zcela náhodně.
  • Budeme všechny partitiony poctivě střídat.
  • Číslo partitiony určíme na základě nějakého atributu zprávy. Pokud má naše zpráva nějaké ID, můžeme například všechny zprávy s ID začínající na malé písmeno odeslat do partition #1 a ostatní do #2.

Prvním cílem je, aby v každé partition bylo přibližně stejně zpráv. Druhým cílem může (ale nemusí) být, aby určité zprávy skončily ve stejné partition. Například bychom mohli chtít odstraňovat duplikované zprávy – pokud by dvě stejné zprávy skončily v různých partition, pak by se mohlo stát, že by je nakonec zpracovávaly konzumenti na různých serverech a duplikované zprávy by tak mohly projít. Proto se hodí postup, kdy zprávy do partition rozdělujeme na základě nějakého atributu. Například všechny zprávy, které mají stejné impressionId pošleme do stejné partition. To lze zařídit tak, že vypočteme číselnou hash impressionId a celočíselně vydělíme počtem partitions.

1
function getPartitionNumber(message, numberOfPartitions) {
    return murmurhash(message.impressionId) % numberOfPartitions;
}

(Jako hashovací funkci jsme použili známou Murmurhash.)

Uspořádání zpráv v rámci partition

Už jsme si řekli, že broker uchovává zprávy z jednoho topicu v takovém pořadí, v jakém tyto zprávy přišly. Jak do toho zapadá koncept partitionů? Jednoduše tak, že zprávy jsou uspořádané jen v rámci své partitiony. Není reálné nějakým způsobem synchronizovat pořadí zpráv, které se potenciálně nachází na fyzicky odlišných serverech.

Pro příklad: mějme deset zpráv, které očíslujeme 1, 2, …, 10. Liché zprávy pošleme do partitony #1, sudé do partitony #2. Dopadlo by to takto:

Nyní spustíme dva consumery, každý bude číst jednu partition. První consumer přečte zprávy 1, 3, 5, 7, 9 v tomto pořadí, druhý consumer přečte zprávy 2, 4, 6, 8, 10, v tomto pořadí:

Pokud ale budeme číst zprávy z obou partitionů jedním consumerem, bude čtení více méně nedeterministické. Chvíli se budou číst zprávy z jedné partition a další chvíli z druhé. Pořadí zpráv bude zachováno vždy jen v rámci jedné partitiony, takže zpráva 4 vždy přijde až po zprávě 2, ale zpráva 3 může přijít před 4. zprávou, ale může přijít i po 4. zprávě – protože je z jiné partitiony. Možné pořadí:

Pokud bychom chtěli použít jeden consumer, ale chtěli bychom zajistit, aby consumer přečetl zprávy zase v pořadí 1, 2, …, 10, museli bychom si doprogramovat logiku, která by během čtení tyto zprávy ještě zpátky seřazovala.

Příště…

Příště se podíváme na to, jak přesně takový Kafka consumer funguje, jak zajistit, abychom každou zprávu přečetli právě jednou a jak zařídit, abychom mohli mít dva consumery, kteří čtou různě staré zprávy.