Programio

Samza: distributed stream processing framework

V předchozích částech jsme se bavili o Kafce, což je aplikace, která slouží k přenosu zpráv z jednoho serveru na jiný. Samza je Javový framework, který nám pomáhá tyto protékající zprávy dále zpracovávat.

Co děláme se zprávami před uložením do databáze

Jaké zpracování mám na mysli? Vraťme se k úkolu, který jsem popisoval v předchozích částech. Provozujeme burzu s reklamními pozicemi. Pro každou reklamní pozici pošleme několika serverům, DSPéčkům, nabídku (bid request) a očekáváme odpověď (bid response). Tato data ukládáme do Kafka topicu. (detailněji jsem to popsal minule) Co s těmi daty dále děláme?

  • Požadavky a odpovědi jsou vcelku nehezké JSONy. My na ukládání používáme Druid.io, která nepodporuje ukládání takto složitých JSONů, takže potřebujeme něco, co nám zanořený JSON “zlinearizuje” na jeden klasický řádek o x sloupcích.

  • bidrequests a bidresponses jsou dva nezávislé topicy, v jednom jsou požadavky a v druhém odpovědi. Do databáze ale chceme ukládat jeden záznam, který bude uchovávat jak data z požadavku, tak data z odpovědi, tj. chceme ukládat něco jako merge(request, response). Každý požadavek jde s každou odpovědí spárovat podle requestID. Potřebujeme nějaký program, který bude číst současně oba topicy a pro každou odpověď nalezne původní požadavek na základě requestID, tyto JSONy nějak inteligentně spojí a pošle zase dál na zpracování.

  • V jiné části systému do Kafky ukládáme informace o tom, když někdo klikne na banner. Pokud na něj někdo klikne dvakrát rychle za sebou, vyšlou se na naše servery dva požadavky o kliku, které budou úplně stejné. V Kafka topicu budou dvě shodné zprávy, které ale nechceme ukládat do databáze. Potřebujeme program, který z topicu vyháže duplikace.

  • Když někdo klikne na reklamu, máme přímo v GET požadavku informaci o tom, na jakou reklamu klikl. Nemáme ale například informaci o tom, jakému klientovi tato reklama patří. Přitom je to informace, kterou bychom rádi měli uloženou přímo se zprávou o kliku. Musíme tak mít nějaký program, který se ještě před uložením zeptá nějaké databáze, který klient vlastní reklamu s tímto ID a tuto informaci musí přidat do zprávy.

Toto jsou asi čtyři nejčastější případy využití nějaké (pre)processing aplikace: transformace dat, join dat, filtrace dat a obohacení dat. Samza nám s tímto preprocessingem velmi pomáhá.

Proč vůbec používat nějaký framework?

Všechny zprávy nám tečou v Kafce. Co nám brání napsat si Javovou (Python, Ruby, Node.JS, …) aplikaci, která bude číst zprávy z Kafky, něco s nimi udělá a pak je zase uloží zpět do Kafky nebo jinam? Samozřejmě, že to jde, ale museli bychom řešit problémy, které už jsou v Samze vyřešené. Například:

  • Co se stane, když z nějakého důvodu naše aplikace spadne? Samza se automaticky postará o znovunastartování aplikace.

  • Jak moc škálovatelná naše aplikace bude? V Samze stačí přepsat jedno číslo v configu a rázem neběží na jednom serveru, ale na deseti.

  • Často je potřeba windowing, který využíváme u párování požadavků a odpovědí. Obecně jde o to, jak provést nějakou akci každých deset sekund nebo jak přidat našim datům nějaké TTL – třeba když přečteme z topicu nějaký bid request, tak na odpovídající bid response čekáme jen určitou dobu. Dalším příkladem může být plovoucí průměr nějakých dat za posledních x sekund atp.

    Samza nám pomáhá tím, že má vlastní event loop a poskytuje nám dvě základní metody: process a window. Metoda process se zavolá, když přijde nová zpráva, metoda window se zavolá každých x sekund (konfigurovatelné). Výhodou je, že obě metody se vykonávají ve stejném vlákně (podobně jako v Node.JS), takže nehrozí žádné kolize kvůli více vláknům a podobně. Přitom toto primitivum nám stačí na implementaci všech předchozích scénářů.

  • Vezměme si příklad s filtrováním duplikovaných zpráv. Abychom byli schopni odstraňovat duplikované zprávy, musíme si nutně někde pamatovat ty zprávy, které už jsme zpracovali (nebo alespoň nějaká IDéčka, případně jejich hashe apod.). Když si tyto zprávy budeme ukládat do paměti, přijdeme o ně ve chvíli, kdy náš program spadne. Když si zprávy budeme ukládat do nějaké vzdálené databáze, bude to nejspíš pomalé. Samza to řeší tak, že si data lze uložit lokálně do key-value databáze, ta může být i in-memory, a každá změna v databázi je zálohována mimo server, na kterém běží Samza. Při restartu Samzy lze všechna data uložená do key-value databáze zpětně získat.

Toto všechno jsou problémy, na které bychom rychle narazili, kdybychom takový preprocessing zpráv psali sami od píky. Samza je řeší za nás, proto ji používáme. Samza dělá něco podobného jako Apache Spark, pokud znáte.

Jak Samza funguje

Samza je framework napsaný v Javě, který se spouští nad Hadoopem; využívá YARN. K čemu je to dobré? Hadoop cluster je hodně stručně řečeno hromada počítačů, na kterých běží Hadoop aplikace, která monitoruje, jaké procesy na daném stroji běží a kolik prostředků zabírají, případně kolik prostředků si procesy alokovaly. Součástí Hadoopu je i HDFS, distribuovaný file system, ale ten Samza téměř nevyužívá.

Pokud chceme v Hadoopu spustit nějakou novou aplikaci, řekneme to YARNu, což je část Hadoopu. Zároveň s tím řekneme, kolik prostředků budeme pro tu aplikaci potřebovat (2 GB RAM, 1 CPU, například). YARN se podívá, jestli je na nějakém počítači tolik volných prostředků a pokud ano, spustí tam danou aplikaci. My jako uživatelé komunikujeme pouze s YARNem a nestaráme se o to, na kterém počítači se nakonec naše aplikace spustí. YARN zároveň hlídá, jestli naše aplikace běží – pokud spadne, automaticky ji spustí znova.

Pro nás je důležité, že se nemusíme starat o distribuci naší Samza aplikace nebo nemusíme ručně zajišťovat, aby naše aplikace znova naběhla, pokud by náhodou spadla. Pokud navíc máte přehled o Hadoop ekosystému, můžete využít nějaké další programy na správu Samza aplikací.

Samza aplikace se může spustit na kterémkoliv počítači v Hadoop clusteru, což má i nepříjemné důsledky. Třeba nemá smysl dlouhodobě ukládat data na lokální disk – po pádu aplikace může YARN aplikaci spustit na úplně jiném počítači a Samza k těm souborům najednou nebude mít přístup.

Hello World v Samze

Samza je out-of-the-box propojena s Kafkou. To není úplně náhoda, protože Samzu vyvíjí LinkedIn, stejně jako Kafku. Nicméně existuje možnost, jak Samzu propojit i s jnými systémy. My ale využíváme Kafku, takže v dalších částech budu popisovat právě propojení s Kafkou.

V nastavení Samzy nejdříve určíme, jaké Kafka topicy chceme číst. My chceme číst dva topicy, takže to zapíšeme takto:

1
task.inputs=kafka.bidresponses,kafka.bidrequests

V kódu implementujeme rozhraní StreamTask, která má metodu process:

1
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    System.out.println(envelope.getMessage());
}
``` 

Pokaždé, když Samza přečte z topicu nějakou zprávu, zavolá metodu `process` a zprávu uloží do argumentu `envelope` spolu s dalšími metadaty. Ukázka nedělá nic jiného, než že danou zprávu vypíše na standardní výstup. V nastavení si můžeme zvolit, jaký (de)serializér bude pro zprávy použit:

systems.kafka.samza.msg.serde=string

1

Toto nastavení znamená, že každá zpráva, která samozřejmě přijde jen jako pole bajtů, bude serializována na Javový String. Můžete si napsat vlastní (de)serializér. 

Tohle jsou asi nejdůležitější nastavení a nejdůležitější funkce, které v Samze jsou. Pokud si chcete Samzu opravdu vyzkoušet, projděte si [Hello Samza](http://samza.apache.org/startup/hello-samza/0.8/). 


## Propojení s Kafkou

Základním stavebním prvkem Samzy je nějaký *StreamTask*, tedy třída, která má metodu `process` a která se stará o zpracovávání zpráv, které tečou do Samzy. Samza framework vytváří instanci nějakého StreamTasku na začátku běhu programu a tato instance přežívá až do konce aplikace. 

Samza dále vytváří instanci StreamTasku pro každou partition v Kafka topicu. Má-li náš topic 8 partitionů, vytvoří se 8 instancí našeho konkrétního StreamTasku. Každý StreamTask pak čte svou vlastní partition. Všechny instance poběží v jednom vlákně, nemusíme se tak bát nějakých synchronizačních problémů. 

### YARN kontejnery

Co ale dělat, když náš Samza job nestíhá zpracovávat zprávy, které do Kafky posíláme? Protože všechny instance StreamTasku běží by design v jednom vlákně, nestačí nám přidat CPU nebo něco podobného. Musíme změnit počet kontejnerů, ve kterých se Samza job spouští:

yarn.container.count=4

1

Kontejner je pojem z YARNu/Hadoopu, pro nás aktuálně stačí, když si představíme, že jeden kontejner = jeden Java proces = 1 využité CPU. V ukázce jsme nastavili čtyři kontejnery, což znamená, že když spustíme Samza job, vytvoří se celkem čtyři kontejnery / čtyři Java procesy, přičemž každý může zabrat až jedno celé CPU. V každém kontejneru se budou číst právě dvě Kafka partitiony. Nemá smysl nastavovat více než osm kontejnerů, protože jedna Kafka partition nemůže být čtena ve dvou kontejnerech. 

To ale vyvolává jeden problém: v jedné partition Kafky by mělo téct maximálně tolik zpráv, kolik jsme schopni zpracovávat na jednom CPU v Samze. Protože pokud by v jedné partition teklo více zpráv, nebudeme nikdy schopni v Samze zpracovávat aktuální data, budeme se neustále zpožďovat. Pokud nestíháme zpracovávat topic *bidrequests*, máme následující možnosti: 

- Napsat jiný, jednodušší, Samza job, který bude číst zprávy z jedné partiton topicu *bidrequests* a bude je ukládat do jiného topicu, například *bidrequests_splitted*, kterému zvětšíme počet partiton. Budeme číst zprávy z partiton 0 topicu *bidrequests* a budeme je posílat do partiton 0, 1, 2, 3 topic *bidrequests_splitted*. Zprávy z partiton 1 budeme posílat do 4, 5, 6, 7 atp. Tím docílíme toho, že v jedné partiton topicu *bidrequests_splitted* bude čtvrtina zpráv oproti jedné partiton topicu *bidrequests*. Pokud je v jedné partition tolik zpráv, že ani tato možnost nepřipadá v úvahu, zbývá prakticky jen následující možnost: 

- Zvýšit počet partiton topicu *bidrequests* a upravit producery. 

Takže ve stručnosti: volba `yarn.container.count` více méně nastavuje počet CPU, které může job sežrat a toto číslo nemůže být nikdy vyšší než počet partiton vstupního Kafka kanálu. 


### Více vstupních Kafka topiců

Jak se budou StreamTasky chovat, když máme na vstupu více Kafka topiců? Jednoduše tak, že se vytvoří tolik instancí StreamTasků, jaký je maximální počet partitionů ze všech topiců. Čteme-li čtyři topicy, které mají počet partitionů rovny 2, 5, 7, 6, tak se vytvoří celkem 7 instancí StreamTasků. Tohle moc smysluplné není, obvyklé je, že počet partitionů všech topiců je identický. 

Naše topicy *bidrequests* a *bidresponses* mají zatím 32 partitionů a spouštíme je v osmi kontejnerech; každý kontejner čte čtyři partitiony. Celkem se vytvoří 32 instancí StreamTasků. Každý StreamTask čte vždy stejnou partition napříč všemi topicy. Nultý StreamTask čte data z partition 0 topicu *bidrequests* a z partition 0 topicu *bidresponses*. Třetí StreamTask čte třetí partition *bidrequests* i třetí partition *bidresponses* atp. 

Jedním z úkolů, které náš Samza job dělá je, že páruje požadavky a odpovědi. Všechny požadavky a odpovědi, které se týkají jedné konkrétní imprese mají shodné *requestId*. Musíme být schopni zajistit, aby požadavek s *requestId* 4742 šel do stejné partition jako odpověď s *requestId* 4742. Jinak je nikdy nespárujeme, protože pokud bude požadavek v partition 2 a odpověď v partition 7, můžou je přečíst dva různé Samza joby, které potenciálně ani neběží na stejné mašině -- a i kdyby běžely, tak by se je přečetly dvě různé instance StreamTasku. 

Toho jsme docílili tak, že číslo partition, do které se mají požadavky a odpovědi posílat, jsme vypočítali na základě *requestId*, tj. asi takto:

function getPartitionNumber(message, numberOfPartitions) { return murmurhash(message.requestId) % numberOfPartitions; } ```

Napsal jsem, že kód Samza jobu běží v jednom vlákně, takže by se zdálo, že nikdy nemůže zabírat více než jedno CPU. V realitě ale pozorujeme, že náš Samza job zabírá třeba 130 % CPU. Přičítáme to tomu, že náš kód sice v jednom vlákně běží, ale kromě našeho kódu je tam ještě kód frameworku, který už nejspíš běží v jiných vláknech. V těch pak probíhá čtení zpráv z Kafky nebo zápis do Kafky, takže v součtu může jeden Samza job zabrat více než 1 CPU.

Příště…

Příště se podíváme na to, co je to ten windowing, jak lze využít metodu window a jaké jsou s tím spojeny problémy.