Programio

Uložení lokálního stavu v Samze

V minulé části série jsme si představili problém: pokud restartujeme Samza job, nevyhnutelně přijdeme o lokální stav aplikace, přijdeme o všechna data, které jsme měli uloženy pouze v paměti počítače. Jak nám Samza framework pomůže tento problém vyřešit?

Ukážeme si to na jiném Samza jobu. Jednou z dalších věcí, které měříme, jsou pochopitelně kliky na reklamní bannery. Zpráva o kliku také nakonec skončí v Kafce v topicu clicks a ten se dále zpracovává v Samze. Představme si nyní, že nějaký uživatel přijde na ČSFD, koukne na reklamu a … no to není možné! Vyhrál iPhone! Rychle kliká, stránka se ale neotevírá. Stále jako zběsilý kliká na reklamní banner, až se nakonec dostane na cílovou stránku a začne skákat radostí.

Jaká je největší zrada celého příběhu? Pokud uživatel desetkrát za sebou kliknul na reklamní banner, my to započítáme jako deset různých kliků, přitom logicky by to měl být spíše jeden klik. Abychom to byli schopni takto spočítat, máme v zásadě dvě možnosti: zařídit, aby se po druhém kliku na banner už neodeslal požadavek na naše servery, nebo musíme všechny požadavky ještě dále filtrovat a odstraňovat duplicity.

Z různých důvodů nemusí být úplně jednoduché zařídit, aby se už pdoruhé neodeslal požadavek na naše servery, takže zvolíme druhou možnost. Potřebujeme napsat Samza job, který bude číst topic clicks a bude odstraňovat duplikované zprávy, které bude odesílat do topicu unique_clicks.

Naivní verze

Každá naše zpráva má unikátní ID. Pokud budou v topicu dvě shodné zprávy, budou mít také shodné ID. Nemusíme si uchovávat obsah každé zprávy, stačí nám si pamatovat IDéčka těch zpráv, které jsme už viděli.

Napíšeme Samza job tak, že bude číst zprávy z topicu clicks a IDéčko každé dosud nepřečtené zprávy si uloží do nějaké množiny. Pokud z topicu přečteme znova stejnou zprávu, budeme mít ID této zprávy uložené v množině, takže tuto zprávu zahodíme; nebo ji pošleme do topicu duplicated_clicks, abychom o tuto informaci nepřišli.

Spustíme Samza job.

Za tři hodiny spadne na nedostatku paměti.

Přidáme windowing

Problém byl v tom, že jsme do množiny neustále přidávali nová a nová IDéčka, ale neodstraňovali jsme stará. Nevyhnutelně muselo dojít k tomu, že dojde paměť. Tento problém lze vyřešit například windowingem, tj. budeme z paměti odstraňovat IDéčka, která jsme uložili před více než hodinou a budeme doufat, že to bude stačit.

Máme ale větší problém. Po restartu aplikace přijdeme a celý lokální stav aplikace a tím pádem i o seznam již přečtených zpráv. Po nastartování Samza jobu nutně začneme propouštět i ty zprávy, které jsme přečetli v minulém běhu programu.

Zatímco druhou zprávu s ID 46 vyfiltrujeme, protože ji máme uloženou v množině, druhá zpráva s ID 47 nám projde, protože to bude první zpráva, kterou přečteme po startu Samza jobu a naše množina dosud přečtených zpráv je aktuálně prázdná.

Potřebovali bychom zkrátka zařídit, aby data, která máme v paměti, přežila pád aplikace. Podobný problém jsme už řešili, když jsme se bavili o ukládání offsetů v Kafce. Hlavním rozdílem oproti offsetům je ten, že lokální stav aplikace může být mnohem větší. Potřebujeme nějaký prostor na uložení potenciálně velkého množství dat, který přežije pád našeho Samza jobu. Jaké máme možnosti?

  • Můžeme data ukládat do souboru. No. Tam by se asi blbě prohledávala, že?
  • Dobře, můžeme data držet v paměti, kde je budeme prohledávat, a do souboru je budeme pouze zálohovat. Problémem nastane, když velikost dat přeroste dostupnou paměť.
  • Můžeme data ukládat do nějaké lokální databáze. Jenomže po restartu může Samza job běžet na úplně jiném počítači a k lokálně uloženým datům vůbec nebude mít přístup.
  • Můžeme data ukládat do nějaké vzdálené databáze na jiném stroji. To už je lepší cesta, ale asi tím slušně snížíme rychlost celého Samza jobu.

KeyValue store

Samza nám nabízí něco ještě jiného. Ideálně bychom chtěli mít všechny data v paměti, protože to je nejrychlejší. Druhý nejlepší způsob je mít data ne v paměti, ale alespoň na stejném počítači.

V Samze můžeme použít vestavěnou KeyValue store, která je uložena lokálně na disku, přičemž každá změna, kterou v KeyValue store provedeme, se automaticky zálohuje v Kafce. Samza si sama vytvoří v Kafce nový topic, který pojmenuje podle jobu, který je zrovna spuštěn a do toho topicu bude ukládat všechny změny, které provedeme v KeyValue store.

Když chceme změnit hodnotu v KeyValue storu, uděláme něco jako

1
store.put("Star trek", 47);

Samza zařídí, aby se v KeyValue store přepsala hodnota pod klíčem “Star trek” hodnotou 47 a zároveň vypálí do správného kafka topicu zprávu v přibližném tvaru

1
{"key": "Star trek", value: 47}

Tím získáme historii všech změn, které byly ve storu provedeny.

Po restartu Samza jobu se jako první vytvoří nová prázdná KeyValue store, přečte se celý obsah záložního topicu a aplikují se na KeyValue store všechny přečtené změny. Tím dostaneme KeyValue store do stavu, v jakém byl, když jsme restartovali Samzu.

Kafka nám přitom přímo poskytuje podporu pro takovéto ukládání zpráv – každé zprávě totiž lze přiřadit nějaký klíč. V Kafce můžeme dále zapnout něco, co se jmenuje Log Compaction. Tato feature bude v pravidelných intervalech hledat zprávy v topicu se stejným klíčem. Protože nemá smysl mít v topicu více zpráv se stejným klíčem – zajímá nás vždy jen ta nejnovější hodnota – Kafka automaticky smaže všechny starší zprávy.

Pro každý klíč zůstala v topicu (resp. partitioně) pouze jedna zpráva a to ta nejnovější. Offsety se nezmění. Z tohoto důvodu můžou v jedné partitioně v Kafce vzniknout “díry”, ve kterých chybí zprávy s určitým offsetem. Bohužel to také znamená, že každá zpráva musí v sobě obsahovat svůj offset, což mírně komplikuje kód celé Kafky, ale to už není náš problém.

Díky Log Compaction nebude záložní topic růst do nekonečna, ale bude vždy přibližně stejně velký jako KeyValue store samotná. Budeme-li měnit jen sto hodnot v KeyValue store, v topicu po promazání bude také jen sto zpráv. Přečteme-li nyní celý záložní topic, získáme stav KeyValue storu.

Náš Samza job, který má deduplikovat topic s kliky bychom mohli napsat tak, že by všechna IDéčka ukládal do KeyValue storu a při kontrole by se zase do KeyValue díval. Po restartu aplikace by se KeyValue obnovil do původního stavu, ať spustíme Samza job na kterémkoliv počítači.

Pro přesné použití se podívejte do dokumentace State Managmentu. Samza aktuálně jako KeyValue store využívá levelDB. V budoucích verzích by měla přibýt i čistě in-memory KeyValue store – to pokud víte, že potřebujete ukládat relativně málo dat, které se vám jistě vlezou do paměti, ale stále byste je chtěli mít zálohované v Kafce.