Programio

Druid.io: distribuovaná immutable databáze pro analytické výpočty

Druid.io používáme pro ukládání všech statistických a analytických dat. Je to vysoce škálovatelná distribuovaná immutable databáze, která obsahuje přímou podporu pro agregaci dat a pro aproximační algoritmy jako je hyperloglog nebo histogram.

Dlouhá cesta k Druidu

Než se dostaneme k Druidu, podíváme se na to, jak řešíme ukládání statistických dat teď, co jsme plánovali zkusit a proč jsme nakonec zvolili Druidíka. Vše si ukážeme na nejjednodušším příkladu ukládání impresí (tj. zobrazení reklamního banneru). Naše vstupní data vypadají nějak takto:

datetime campaignID userId web browser
2015-15-03 14:10:54 336 40BBF8 csfd.cz Chrome
2015-15-03 14:10:55 1344 9896AA idnes.cz Chrome
2015-15-03 14:11:01 1344 40BBF8 idnes.cz Firefox
2015-15-03 14:11:05 3738 43D03E csfd.cz Safari
2015-15-03 14:11:18 1344 9896AA idnes.cz Chrome

Každý řádek reprezentuje jednu impresi. Z těchto surových dat chceme získávat tyto informace:

  • Kolik impresí dosud udělala kampaň XYZ?
  • Kolik unikátních zobrazení udělala kampaň XYZ za uplynulý týden?
  • Kolik unikátních zobrazení udělaly všechny kampaně klienta ABC?

…a mnoho dalších, toto jsou ty nejzákladnější dotazy. Hlavním problémem samozřejmě je, že těchto dat máme hodně, řekněme zhruba deset tisíc impresí za sekundu, což dělá necelou miliardu denně.

Cesta SQL

V současné době máme data v SQL databázi, konkrétně InfoBright. Běžné dotazy zvládá odpovídat rychle, nicméně pokud klient požádá o nějaký složitější report, ve kterém se musí hodně joinovat, může si na odpověď počkat i několik dlouhých desítek minut.

Celkově vzato se SQL databáze blbě replikuje a blbě sharduje. Shardováním myslím to, že databáze běží na dvou serverech, v každé databázi je polovina dat a když vy položíte do databáze dotaz, automaticky se provede na obou serverech, obě databáze tento dotaz vyhodnotí a pak nějaký driver tyto dva dílčí výsledky spojí do jednoho. Zkrátka možnost databázi horizontálně škálovat. Proto zatím škálujeme jenom vertikálně.

Jednou z hlavních nevýhod Infobright je navíc to, že je dost drahá.

Cesta NoSQL a předpočítávání výsledků

Když jsme tento problém začali řešit, rozhodli jsme se jako první zkusit data předpočítávat. Chtěli jsme vzít nějakou NoSQL Key-value databázi a ukládat do ní data typu:

  • “14. 3. vidělo reklamu XYZ na ČSFD 35 952 lidí, z toho 9744 v Chrome, 4788 ve Firefoxu, 0 v Opeře, …”
  • “15. 3. vidělo reklamu XYZ na ČSFD 28 434 lidí, z toho 4410 v Chrome, 2772 ve Firefoxu, 0 v Opeře, …”
  • atd.

Vytáhnout počet impresí pro daný den pro daný prohlížeč je pak otázkou vytáhnutí jednoho řádku z databáze. Jenomže tento způsob ukládání trpí několika problémy:

  • Ve skutečnosti se nikdy nebude tahat pouze jeden řádek. Když budeme chtít znát statistiky za jeden týden, musíme vytáhnout sedm řádků a výsledky sečíst. Navíc chceme podporovat různá časová pásma, což znamená, že bychom předpočítaná data neměli ukládat pod denní granularitou, ale pod hodinovou granularitou. Ale to není velký problém.

  • Imprese sice lze sčítat, ale počet unikátních uživatelů už se sčítat nedá. Pokud bychom v sobotu měli 1000 unikátů a v neděli 1500 unikátů, neznamená to, že za oba dny tam máme 2500 unikátů. Museli bychom proto předpočítávat počty unikátů pro každý možný časový interval (od 17. 3. do 23. 3 tam bylo 1500 unikátních uživatelů, od 17. 3. do 24. 3. 1750 unikátů atp.) nebo přinejmenším alespoň pro běžné intervaly jako je minulý týden, celý běh kampaně, posledních sedm dní apod. Předpočítávat pro všechny intervaly bude velice náročné, předpočítávat jen některé časy se chvíli zdálo jako docela přijatelné řešení.

  • S každou novou dimenzí exponenciálně roste počet klíčů, pro které musíme počítat výsledky. Pokud bychom zobrazovali reklamu na tisíci webech a ukládali si informaci o deseti prohlížečích, museli bychom si ukládat až 1000 · 10 = 10 000 záznamů za každou hodinu. Pokud bychom dále ukládali i tři hlavní operační systémy, dostaneme se až na 10 000 · 3 = 30 000 záznamů každou hodinu. A tak dále s každou novou dimenzí, podle které bychom chtěli filtrovat. To není dlouhodobě zvládnutelné.

Dude, I can store that in memory

Zajímavé je, že přesně touto cestou si prošli i Metamarkeťáci, autoři Druidu, a stejně jako my přišli na to, že ani SQL databáze, ani předpočítávání není ta správná cesta. Tehdy to dopadlo tak, že se nějaký Metamarkeťák podíval na data, která potřebovali prohledávat a pronesl památnou větu: “Dude, I can store that in memory”. Toho se chytli a vznikla první verze Druidu, distribuované databáze, která všechna data, nad kterými se volaly dotazy, držela v paměti. Byl to dobrý nápad? Ano, i ne.

  • Protože jsou všechna data v paměti, jsou dotazy bleskurychlé.
  • Protože je Druid distribuovaná databáze, nejste omezení velikostí RAMky na jednom počítači. “Přidat” paměť můžete tak, že přidáte nový server do clusteru. Potřebujete-li deset tera paměti, můžete vytvořit cluster ze sto počítačů, každý po sto GB paměti. Druid si s tím poradí.

Otázkou zůstává, jestli se to vyplatí. Nebude to moc drahé? Naše současné denní statistiky mají v surové podobě přibližně 50 GB. Za rok by to dalo přibližně 18 TB. Pokud bychom toto množství paměti nakoupili jako cloudovou službu u Amazonu, stálo by nás to $210. Za hodinu. Za měsíc by to byly skoro čtyři miliony Kč. (75 instancí r3.8xlarge) No, není to úplně málo.

Snižujeme velikost dat

Zkusíme snížit velikost dat.

  1. V těch 50 GB jsou i data, která ve skutečnosti pro statistiky nepotřebujeme – odstraníme je.

  2. Můžeme nějak rozumně komprimovat data. Místo toho, abychom do databáze ukládali string “chrome”, tak tam uložíme nějakou číselnou konstantu, která tento prohlížeč reprezentuje.

  3. Blbá jsou IDéčka uživatelů. Pokud chceme spočítat počet unikátních uživatelů, kteří viděli nějakou reklamu, musíme u každé imprese uložit i to, jaký uživatel ji viděl, tj. nějaké jeho unikátní ID. Unikátní ID má ale tu blbou vlastnost, že se velmi špatně komprimuje, protože … no zkrátka protože jsou to příliš unikátní hodnoty. Jak se řeší problém si řekneme v příštím článku, pro teď předpokládejme, že sloupec s IDečky odstraníme.

Agregujeme

V tuto chvíli jsme docela slušně zredukovali velikost dat, ale stále ještě nejsme u konce. Můžeme totiž data dále agregovat. Stále máme pro každou impresi jeden řádek, ale to my ani tak moc nepotřebujeme. Potřebujeme vědět, kolik impresí se událo za danou hodinu. Můžeme vzít takovouto tabulku…

datetime campaignID web browser
2015-15-03 10:52:54 1344 idnes.cz safari
2015-15-03 10:52:58 1344 idnes.cz safari
2015-15-03 10:53:14 3738 csfd.cz chrome
2015-15-03 10:53:15 1344 idnes.cz safari
2015-15-03 10:53:24 1344 idnes.cz chrome
2015-15-03 10:53:25 1344 csfd.cz chrome
2015-15-03 10:53:29 3738 csfd.cz chrome
2015-15-03 10:53:32 3738 csfd.cz chrome
2015-15-03 11:02:38 1344 csfd.cz safari
2015-15-03 11:02:43 1344 csfd.cz safari

…a můžeme ji agregovat na takovouto tabulku:

datetime campaignID web browser impressions
2015-15-03 10:00:00 1344 idnes.cz safari 3
2015-15-03 10:00:00 1344 idnes.cz chrome 1
2015-15-03 10:00:00 1344 csfd.cz chrome 1
2015-15-03 10:00:00 3738 csfd.cz chrome 3
2015-15-03 11:00:00 1344 csfd.cz safari 2

V první tabulce máme co jedno zobrazení reklamy, to jeden řádek. Ve druhé tabulce už jsme si spočítali, že na iDnes.cz byly v Safari v dané hodině a pro danou kampaň tři imprese. Druhá tabulka obsahuje stejně informací jako ta první, s výjimkou časového razítka. Z druhé tabulky už nevyčteme, kolik impresí bylo mezi 10:52:00 a 10:53:00. Ale to nás už moc nezajímá, hodinová granularita nám stačí.

Jak moc se dále data agregují záleží na počtu dimenzí, na jejich kardinalitě, tj. kolik různých hodnot v sloupci máme a na časové granularity – agregace bude větší, když se bude agregovat po dnech než po hodinách. Tato agregace může snížit velikost dat klidně na deset procent, ale také může snížit velikost jen o deset procent; záleží na datech.

A jak jsme si pomohli? Se všemi kompresemi a agregacemi bychom se mohli dostat na řádově nižší velikost dat. Pokud bychom předpokládali, že bychom dokázali data takto zredukovat z 50 GB na 2,5 GB, stál by nás cluster na Amazonu dvacetkrát méně, což znamená 200 000 Kč za měsíc. Ne že by to bylo málo, ale už se o tom dá uvažovat.

Dámy a pánové, Druid.io

Dobře, už jsme si spočítali, že bychom možná dokázali všechna data vměstnat do paměti a že by to nemuselo být úplně drahé. Co nám tedy Druid nabízí, že jsme nakonec skončili u něj?

Druid je distribuovaná databáze napsaná v Javě vhodná pro statistická a analytická data. Není například vhodná pro ukládání textů, takže redakční systém iDnes.cz bych na něm nestavěl. Jaké jsou klíčové vlastnosti?

  • Zvládá běžné databázové operace jako jsou filtry nebo grupování; nezvládá join – proto jsou typicky data uložená denormalizovaně. Nad sloupcem umí spočítat počet unikátních hodnot, průměr, medián, jiný percentil, rozložení hodnot. Obecně dokáže nad sloupcem udržovat aproximaci histogramu a z něj spočítat hromadu zajímavých údajů. Všechno z toho dělá Druid distribuovaně na několika různých serverech.

  • Je škálovatelný. Každá složka Druidu je škálovatelná, pokud už současná velikost clusteru nezvládá zpracovávat dotazy nad daty, snadno lze přidat další stroje, které si k sobě stáhnou část dat a přeberou tak i část dotazů. Každý dotaz se typicky vyhodnocuje na několika Druidích serverech současně. Můžete si to představit tak, že máme například 12 Druidích serverů, na každém jsou data za jeden měsíc roku a dotaz na statistická data za uplynulý půl rok by se paralelně vykonával na šesti serverech.

  • Každou část lze replikovat. Stejně jako třeba u Kafky lze i Druidu jednoduše nastavit replikační faktor. Nastavíme-li ho na dva, budou všechna data uložena na dvou odlišných strojích. Pokud jeden ze serverů vypadne, dotazy se přesměrují na druhý stroj.

  • Je realtime. Druid zvládá indexovat desetitisíce nově příchozích zpráv za sekundu a tyto události dává okamžitě k dispozici pro dotazování.

  • Všechna data jsou immutable. Druid v podstatě zvládá pouze append dat. Neumí žádný update. Každá zpráva, kterou appendujeme, musí mít časové razítko, aby šlo určit, do kterého časového intervalu patří. Do Druidu proto patří události, tj. data, která nastala v nějaký okamžik a tato data jsou na vždy platná. Ve chvíli, kdy Druid zpracuje včerejší data, se už tato data nemůžou nijak změnit. Potřebujete-li změnit stará data, není jiné cesty než stará data smazat a vše znova přepočítat (což se běžně dělá).

To jsou asi věci, které jsme všichni čekali, když se náš seriál zabývá budováním statistik se zaměřením na škálovatelnost, odolnost vůči výpadkům a na nízkou latenci. Co dále Druid umí? Všecho, o čem jsme se bavili v předchozí kapitole:

  • Druid používá různé kompresní mechanismy pro zmenšení velikost dat, která do něj vkládáme. Například i slovníky, takže můžeme do Druidu valit data jako je “chrome” a Druid se už sám postará o to, aby se daného sloupce uložila nějaká číselná konstanta.

  • Druid zvládá agregovat data postupem, který jsme si ukázali výše. Stačí mu nastavit, které sloupce má jak agregovat a nastavit časovou granularitu. Pokud Druid najde v dané hodině dva stejné řádky, spojí je do jednoho a uloží k němu informaci o tom, že tento řádek reprezentuje dvě imprese. O agregaci budeme více mluvit příště.

  • Druid má přímou podporu pro HyperLogLog, díky čemu se můžeme efektivně zbavit sloupce s IDečkami uživatelů. Díky Druidovi jsme schopni velmi přesně odhadnout počet unikátních uživatelů, kterým se zobrazila nějaká reklama bez toho, aniž bychom fakticky měli uloženo, komu se jaká reklama zobrazila. Díky sjednocení HyperLogLogu bude fungovat i agregace.

  • Druid je schopný udržovat již zmiňovanou aproximaci histogramu pro daný sloupec.

Dobře, je to všechno hezké, ale … fakt musí být vše v paměti?

Nemusí! I kluci z Metamarkets si asi časem uvědomili tři věci:

  • Když už spotřebovaná paměť leze do jednotek TB, tak už tak moc levná není.
  • SSD vlastně nejsou tak úplně pomalé.
  • Opravdu je nutné držet v paměti tři roky stará data jenom proto, abychom je byli schopni dotázat jednou za uherský rok?

Současná verze Druidu proto umožňuje dotazovat i ta data, která jsou uložená na disku a nejsou přímo načtená v paměti. Je to samozřejmě pomalejší, ale pořád je to dostatečně rychlé. Zároveň umožňuje definovat pravidla, kam se mají jaká data ukládat. Takže můžeme vytvořit pravidla, že data za poslední tři měsíce budou na nejrychlejších strojích s hromadou paměti, zatímco starší data budou na horších strojích s méně paměti. Idea je, že v 90 % případů stejně taháme nová data, ne data několika let stará.

Není to nijak zvlášť nový přístup, takový Twitter používá něco velmi podobného. Aktuální tweety udržuje v paměti, zatímco staré tweety má uložené na SSD (a ještě k tomu trošku vyladili kernel, ale kdo z nás to občas nedělá, že?).

Příště…

Příště se podíváme na architekturu Druidu.