Programio

Jak zpracováváme velké množství dat: Lambda Architecture

Rozhodl jsem se napsat sérii článků, ve kterých popíši, jaké postupy a jaký software používáme k tomu, abychom zvládali zpracovávat provoz v řádu desítek tisíc zpráv za sekundu. Pracuji ve firmě Internet Billboard, kterou nejspíš znáte, aniž o tom víte. Všimli jste si někdy reklam na iDnesu, denik.cz, ČSFD…? Tak to, že se tam ty reklamy objevily, je naše práce; respektive práce mých předků. Jednoduše řečeno, dodáváme a vyvíjíme technologie pro zobrazování internetové reklamy. V podstatě vyvíjíme něco podobného jako je Google AdWords/AdSense.

Abyste byli v obraze, kolik dat musíme přibližně zpracovávat: Za rok 2014 jsme vyservírovali 243 392 081 935 zobrazení reklamních bannerů, což je v průměru 7700 zobrazení za sekundu; ve špičkách mnohem více. Ne že bychom měli provoz jako Facebook, ale na jednom MySQL stroji to taky neutáhneme. Teď se snažíme vytvořit novou verzi statistik, které toho budou umět zásadně více než ty předchozí, budou zpracovávat daleko více dat, budou dobře škálovatelné a vůbec se budou motat kolem termínů jako stream processing, event sourcing, reactive, cep, …

Real-time bidding

Pojďme se nejdříve podívat na to, jaký konkrétní problém aktuálně řešíme. Současný trend v internetové reklamě je Real-time bidding (RTB), což je jakási real time aukce reklam. Co se všechno stane předtím, než vám AdBlock zablokuje zobrazení reklamy v prohlížeči?

  1. Přijdete například na ČSFD. Stránka pošle na naše servery požadavek, ve kterém žádá o reklamu, kterou má uživateli zobrazit.
  2. My požadavek přijmeme a spustíme aukci. Problém je, že reklamy, které se účastní aukce, nemáme na našich serverech, ale jsou na serverech někoho úplně jiného. Těmto vzdálených serverům říkáme Demand Side Platform. Všem DSPéčkům pošleme HTTP požadavek typu “Hele, mám volnou reklamní pozici 250x250 na ČSFD. Nechcete tam zobrazit nějaký svůj banner?”
  3. Každé DSPéčko nějak odpoví. Buď “Nechci.”, nebo “Chci a nabízím za zobrazení pět korun. Tady máte kód reklamy.”
  4. My si z těchto došlých odpovědí vybereme nejlepší nabídku; typicky takovou reklamu, za kterou DSPéčko nabízí nejvíce peněz.
  5. Pošleme odpověď zpět na ČSFD, kde se Javascriptem vloží reklama do stránky.

 Nové výzvy

Současné statistiky fungují dobře a obsahují data nezbytná k našemu podnikání, ale trpí především tím, že nejsou moc dobře horizontálně škálovatelné. Vrátíme-li se k RTB, tak bychom například rádi měli zaznamenané všechny požadavky a odpovědi, které během aukce nastanou. To se dříve nedělalo, protože to nebylo potřeba, ale čas ukázal, že by se tato data – a hlavně jejich analýza! – mohla hodit.

Celý seriál se bude točit o tom, jak ukládáme a zpracováváme právě požadavky poslané DSPéčkům a odpovědi od nich. Takových dat nebude málo, přičemž s časem počet těchto zpráv poroste, protože roste provoz v celém RTB. Tyto zprávy musí ze serverů nějakým způsobem odtéct a musí dotéct někam, kde ty zprávy budeme uchovávat a analyzovat.

Jaké bylo hlavní zadání nových statistik?

  • Musí být horizontálně škálovatelné. Pokud by za námi ráno přišel Jimmy Wales, že by rád zobrazoval naši reklamu na Wikipedii, my dopoledne nakoupíme nové servery, odpoledne tam nainstalujeme a nakonfigurujeme potřebný software a večer musíme být schopni servírovat na Wikipedii reklamu.

  • Musí být realtime. Současné statistiky jsou zpožděné přibližně o jednu hodinu. Pokud si uživatel teď vygeneruje report, nebudou v něm obsaženy imprese a kliky, které se staly před třemi minutami. Nyní chceme, aby tato prodleva prakticky neexistovala – aby nebyla větší než pár sekund. A co třeba grafy na dashboardu, které ukazují aktuální data a ještě se automaticky aktualizují třeba pomocí Socket.io? Pokud bychom se rozhodli, že takovou funkcionalitu chceme, musíme být schopni ji v pohodě naprogramovat a obsloužit.

  • Musí být výpadkuvzdorné. Ideálně bychom chtěli, aby byla každá součást replikovaná a výpadek jedné ze služeb by tak neměl žádný vliv na výsledné statistiky. Ruční údržba připadá v úvahu, až když toho popadá opravdu hodně. Přitom bychom měli být schopni znovu zpracovat i stará data, pokud se kvůli výpadkům nezpracovala správně.

Architektura nových statistik

To, co se snažíme vybudovat, má dokonce i svůj buzzword: Lambda Architecture (viz How to beat the CAP theorem). Principem je, že máme realtime větev, kterou zpracováváme aktuální data. Tato větev zajišťuje, že každá událost, která nastane, bude za pár sekund dotazovatelná. Nicméně tato větev je obecně nespolehlivá, protože přese všechny replikace se může stát, že se něco pokazí, něco se někam nepřesune jak má, nestihnou se zpracovat nějaké události, které dorazily později, než se očekávalo a podobně.

Proto vedle realtime větve máme ještě batch větvi. Ta se spouští jednou za čas, dejme tomu jednou za den, a přepočítává stará data tak, aby byly výsledky přesné. Takže ve dvě ráno můžeme spustit nějaký batch job, který přepočítá data za včerejšek a ta data, která včera zpracovala realtime větev zahodíme.

Zatímco realtime větev musí být optimalizovaná tak, aby průchozí data zpracovávala rychle, batch větev je optimalizována tak, aby zpracovávala data přesně.

Kappa architecture

Batch větev ale ještě nemáme úplně pořešenou. Hlavní problém Lambda architektury totiž je, že se jen těžko vyhneme zdvojenému kódu. Musíme mít nějaký kód, který bude zpracovávat data v realtime větvi a pak musíme mít nějaký kód, který bude zpracovávat data v batch větvi – tato část se píše například jako MapReduce job v Hadoopu. Výstupem by měla být stejná data a je otázkou, jak moc lze sdílet zdrojový kód v obou větvích. Psát stejný kód dvakrát pro dva různé systémy samozřejmě moc nechceme…

Proto ještě uvažujeme o jiné architektuře, ta dostala jména Kappa architecture, která předpokládá, že ohneme realtime větev tak, aby šla použít i na batch zpracování o den později. Pak bychom měli jen jeden dev-stack a batch větev by byla klasická “realtime větev”, pouze s jiným nastavením.

Jaký software používáme

K tomu všemu používáme tohle:

  • Vstupním bodem všech našich nových aplikací jsou servery psané v Node.JS (starý server byl psán v Céčku), které běží v nějakém Clusteru. Když jeden server přestane stačit, přidá se jednoduše další a na Nginxu se rozdělí provoz.

  • Většina dat, se kterými pracujeme, nakonec skončí v Hadoopu, což je distribuovaný file system, který má zároveň API pro psaní jobů nad daty v něm uloženými (MapReduce). My vlastně v současnosti Hadoop využíváme jen jako externí závislost ostatních programů, které do něj ukládají svá data. V budoucnu nejspíš sepíšeme zapojíme i nějaký ten Spark nebo Hive a zkusíme analyzovat data přímo v Hadoopu. Ale zatím to neděláme.

  • Potřebujeme-li data dostat z jednoho serveru na druhý, používáme k tomu Kafku. Kafka (opravdu je pojmenována podle spisovatele) je distribuovaný systém pro posílání zpráv, který vyvinul LinkedIn. Na jedné straně do Kafky všechny naše Node.JS servery posílají data (obvykle nějaký JSON) a na straně druhé konzumenti tato data zase čtou a něco s nimi dělají. Kafka je krásně distribuovaná, takže můžeme snadno nastavit, že se všechna data mají replikovat na x fyzicky odlišných serverů a podobně.

  • Data, která prochází Kafkou, potřebujeme nějak dále upravovat – k tomu používáme další open source projekt od LinkedInu, a to Samzu. Samza umí číst zprávy z Kafky, provést nějaký námi napsaný kód (odstranit duplikované zprávy, zvalidovat data, přidat ke zprávě nějaká data z databáze…), a zase data uložit zpět do Kafky nebo je poslat přímo do databáze.

  • Data z Kafky do Hadoopu dostáváme pomocí aplikace Camus. Je to MapReduce job, je to open source … a vyvíjí to LinkedIn. Jo a nečte se to [kejmus], hehe, ale [kamy], protože Camus byl francouzský spisovatel.

  • Srdcem našich statistik je Druid – distribuovaná (jak jinak), vysoce škálovatelná, databáze, která umožňuje zpracovávat velké množství dat a okamžitě tato data dávat k dispozici pro dotazování. Druid je optimalizovaný jak pro zpracování realtime dat, tak i pro ukládání historických (třeba pět let starých) dat. Dále obsahuje podporu pro aproximační algoritmy jako je HyperLogLog pro odhadování počtu unikátních hodnot ve sloupci nebo pro aproximaci histogramu, díky kterému můžeme například odhadnout medián nad nějakým sloupcem, který obsahuje klidně několik desítek miliard neseřazených čísel rozházených mezi deset serverů. Do Druidu dostáváme data přes Tranquility.

V čem to všechno programujeme

Vlastně toho na programování zase tak moc není. Většinu z těch aplikací “stačí” jen správně nakonfigurovat. Což bylo občas šílené peklíčko, protože když něco nefungovalo, museli jsme procházet logy na deseti různých serverech a když jsme měli štěstí, tak jsme v některých z nich opravdu nalezli nějakou chybu a když jsme měli jóó velké štěstí, tak jsme z té chyby hned pochopili, co je špatně. Nejblbější chybu, jakou jsme udělali, byla v tomto nastavení:

druid.announcer.type=batch 

Správně to má být samozřejmě takto:

druid.announcer.type=batch

Že je to stejné? A co ta mezera na konci v první ukázce, hmm?! OMFG! Ale stačily dva dny debugování a našli jsme to.

Protože všechny výše zmíněné aplikace jsou psané v Javě nebo v nějakém JVM jazyku jako Scala, tak i my jsme byli v podstatě donuceni psát všechno v Javě. Zkoušeli jsme Groovy, ale s tím byly spíš problémy a navíc bylo dynamicky zkompilované Groovy významně pomalejší než staticky zkompilované Groovy nebo Java. Groovy ale dále používáme pro testy, používáme Spock. Přešli jsme alespoň na Javu 8, která má lambda výrazy, streamy a takové hezké věci jako Optional<T>.

Současný stav

Naše statistiky jsou zatím ve velmi rané alpha verzi, kdy ani nemáme žádné pořádné GUI a v ostrém provozu běžely jen několik málo dnů – ale běžely bez problémů. Popis naší architektury v následujících článcích tak berte jako popis toho, jak postupujeme, proč jsme se rozhodli, že zkusíme to a ne tamto, na jaké problémy jsme narazili, jaké problémy jednotlivé aplikace řeší, ale prozatím nečekejte žádné zkušenosti z tříletého provozu.

A jak všechny tyto technologie řeší ony tři problémy, které jsem nastínil nahoře?

  • Každá technologie je horizontálně škálovatelná – obyčejně stačí zprovoznit nový server a přidat ho do Kafka/Hadoop/Druid clusteru a propustnost celého clusteru se rázem zvýší. Pro příklad: v LinkedInu přes Kafka proteče 500 miliard zpráv denně na bůhví kolika serverech.
  • Kafkou i Samzou protékají data velmi rychle. Pokud to uměle nezdržujeme, protečou všechna data celým kolotočem za pár set milisekund. Pak už jen zbývá Druid jako finální databáze – a ta se také chová tak, že co do něj dostaneme, to je hned dotazovatelné, bez dalšího významného zpoždění. U některých typů úloh se proto jsme schopni dostat na velmi nízké zpoždění.
  • U Kafky a u Druidu jsme schopni konfiguračně nastavit replikaci daného úkolu. Jsme schopni zařídit, aby se každá úloha vykonávala na dvou různých serverech, takže jsme odolní vůči výpadku některé ze služeb. U Samzy zase Hadoop hlídá, jestli daná aplikace běží a pokud ne, tak ji automaticky restartuje. Dokud nenastane nějaká větší porucha, měli bychom být v pohodě. Teoreticky.

Příště…

Příště si povíme něco málo o Kafce.

Stay tuned!

(A děkuji Michalovi za korekturu!)