Jak Druid.io agreguje data
Z minulého dílu víme, že Druid typicky čte zprávy z nějakého messaging systému, tvoří segmenty a tyto segmenty poté proplouvají různými částmi systému. Dnes se podíváme na to, jak vypadají data v jednotlivých segmentech.
Agregujeme
Druid je sloupcová databáze, pro ilustraci si můžeme zpracovaná data zobrazit jako tabulku. Pokud bychom Druidu poslali tyto dvě zprávy
1 | { |
v Druidu by se poskládala tato tabulka:
timestamp | campaignID | web | browser |
---|---|---|---|
2015-03-17T20:40:00Z | 336 | csfd.cz | Chrome |
2015-03-17T20:40:02Z | 1344 | idnes.cz | Chrome |
Dosud nic světoborného. Zázraky by se začaly dít, až by Druid přečetl například tuto zprávu:
1 | { |
…protože ta se od úplně první zprávy liší pouze v čase (20:40:00 vs 20:40:27), ostatní vlastnosti jsou stejné. Jsou to dvě stejné události, které pouze nastaly v jiný čas. Druid by samozřejmě mohl tuto zprávu uložit jako třetí řádek do tabulky
timestamp | campaignID | web | browser |
---|---|---|---|
2015-03-17T20:40:00Z | 336 | csfd.cz | Chrome |
2015-03-17T20:40:02Z | 1344 | idnes.cz | Chrome |
2015-03-17T20:40:27Z | 336 | csfd.cz | Chrome |
…ale efektivnější bude, když provede agregaci a jen si u prvního řádku uloží, že tato událost nastala dvakrát:
timestamp | campaignID | web | browser | impressions |
---|---|---|---|---|
2015-03-17T20:00:00 | 336 | csfd.cz | Chrome | 2 |
2015-03-17T20:00:00 | 1344 | idnes.cz | Chrome | 1 |
Tyto tabulky reprezentují stejná data, u druhé jsme pouze ztratili informaci o tom, kdy která událost přesně nastala. Všimněte si, že ve druhé tabulce už máme v timestampu pouze hodinu, bez přesného času. První řádek tabulky říká, že v čase 20:00:00 až 20:59:59 nastaly pro kampaň 336 na ČSFD dvě imprese.
Druid při agregaci postupuje tak, že porovnává hodnoty ve všech sloupcích a pokud se zprávy liší pouze v čase, spojí tyto zprávy do jedné a za každou zprávu inkrementuje sloupec impressions. Název nově vzniklého sloupce specifikujeme ve specFilu:
1 | "aggregators": [ |
Tímto vytvoříme agregátor count
, který se bude ukládat do sloupečku impressions. Druid zná tři druhy sloupců:
- čas – reprezentuje, pro kterou hodinu jsou data platná. V příkladu jde o sloupec “timestamp”.
- dimenze – to jsou naše „syrová“ data, jsou to sloupce
campaignID
,web
abrowser
. - metrika – vypočtené hodnoty, sloupec
impressions
.
Další agregátory
Druid má podporu pro několik dalších agregátorů. Mohli bychom například přidat informaci o ceně za impresi; tabulka před agregací by vypadala takto:
timestamp | campaignID | web | browser | price |
---|---|---|---|---|
2015-03-17T20:40:00Z | 336 | csfd.cz | Chrome | 0.17 |
2015-03-17T20:40:02Z | 1344 | idnes.cz | Chrome | 0.19 |
2015-03-17T20:40:27Z | 336 | csfd.cz | Chrome | 0.43 |
Podle toho, co dosud víme, by se žádné řádky nespojily v jeden, protože všechny řádky se liší přinejmenším v ceně. To je ale docela nešikovné. Proto můžeme přidat doubleSum
agregátor, který zařídí, aby se během agregace hodnoty ve sloupcích sečetly:
1 | { |
Vlastnost fieldName
říká, z kterého sloupečku má hodnoty číst a vlastnost name
do jakého sloupečku má Druid agregovaná data ukládat. Protože máme oba názvy stejné, budou se data ve sloupečku price přepisovat. Nyní už může dojít k agregaci. Druid ví, že má porovnávat pouze hodnoty ve sloupcích campaignId
, web
a browser
, zatímco sloupec price
má vždy jen sečíst. Proto spojí první a třetí řádek do jednoho:
timestamp | campaignID | web | browser | impressions | price |
---|---|---|---|---|---|
2015-03-17T20:00:00 | 336 | csfd.cz | Chrome | 2 | 0.6 |
2015-03-17T20:00:00 | 1344 | idnes.cz | Chrome | 1 | 0.19 |
Nyní už ve sloupci price
nemáme cenu jedné imprese, ale součet cen všech impresí v daném časovém úseku, pod danou kampaní, na daném webu a s daným prohlížečem. Z takové agregované tabulky už bychom samozřejmě nikdy nezjistili, jaká byla cena za jednu impresi.
Unikátní uživatelé
Co když do zprávy přidáme i informaci o tom, kterému uživateli se banner zobrazil? Data před agregací:
timestamp | campaignID | web | browser | userId |
---|---|---|---|---|
2015-03-17T20:40:00Z | 336 | csfd.cz | Chrome | 40BBF8 |
2015-03-17T20:40:27Z | 336 | csfd.cz | Chrome | 9896AA |
Události nastaly ve stejnou hodinu, ale liší se v hodnotě userId
. Druid by proto tyto dvě zprávy uložil do dvou řádků. To je ale dost prekérní situace, protože nyní agregace proběhne jen tehdy, týkají-li se dva řádky stejného uživatele, což nemusí nastat moc často.
V tuto chvíli se musíme zamyslet nad tím, na co data ve skutečnosti potřebujeme. Jde-li nám jen o statistické dotazy typu „kolik unikátních uživatelů vidělo reklamu XYZ na webu ZXY“, můžeme si pomoci tím, že použijeme pravděpodobností algoritmy. Ty nám z dostupných dat nespočítají přesný počet unikátních uživatelů, ale odhadnou počet unikátních uživatelů s nějakou chybou (typicky pod dvě procenta). To je většinou přijatelné. Výhodou pravděpodobnostních algoritmů pak je, že mají nižší paměťové nároky a jsou rychlejší.
Druid zná HyperLogLog, což je v současné době nejlepší algoritmus pro odhadování unikátních hodnot v multimnožině. Přidáme do našeho specFilu nový agregátor hyperUnique
:
1 | { |
Abyste plně pochopili, co se teď bude dít, je nutné vědět jak funguje HyperLogLog a hlavně jak funguje sjednocení hyperloglogu. Stručně řečeno, HyperLogLog je algoritmus, který převádí hodnoty ve sloupci na vektory, například na [0, 0, 3, 0], z nichž lze později odhadnout počet unikátních hodnot v daném sloupci.
Velkou výhodou je, že máme-li dva vektory [0, 3, 0, 0] a [0, 2, 0, 0], které reprezentují dvě různé hodnoty A
a B
, můžeme reprezentovat množinu {A, B}
tak, že vektory „sjednotíme“ (vypočítáme maximum po složkách):
1 | [0, 3, 0, 0] [0, 2, 0, 0] ------------ [0, 3, 0, 0] |
Druid při agregaci vypočte HLL vektor a ten uloží do tabulky do sloupce uniqueUsers
a hodnotu userId
zahodí. Před finální agregací bude tabulka vypadat takto:
timestamp | campaignID | web | browser | impressions | uniqueUsers |
---|---|---|---|---|---|
2015-03-17T20:00:00 | 336 | csfd.cz | Chrome | 1 | [0, 3, 0, 0] |
2015-03-17T20:00:00 | 336 | csfd.cz | Chrome | 1 | [0, 2, 0, 0] |
Protože HLL vektory umíme sjednotit, můžeme tyto dva řádky spojit do jednoho. Výsledná agregovaná tabulka by vypadala takto:
timestamp | campaignID | web | browser | impressions | uniqueUsers |
---|---|---|---|---|---|
2015-03-17T20:00:00 | 336 | csfd.cz | Chrome | 2 | [0, 3, 0, 0] |
Z těchto dat umíme vyčíst přesný počet impresí pro danou kampaň, daný web a daný prohlížeč za daný čas. Z daných dat také umíme získat přibližný počet unikátních uživatelů, kterým se tento banner zobrazil. Důležité je, že i když poroste počet zobrazení daného banneru, stále nám pro uložení stačí jeden řádek. Pokud by kampaň vyprodukovala 15 874 impresí u 6 098 unikátních uživatelů, mohl by řádek vypadat nějak takto:
timestamp | campaignID | web | browser | impressions | uniqueUsers |
---|---|---|---|---|---|
2015-03-17T20:00:00 | 336 | csfd.cz | Chrome | 15 874 | [15, 17, 10, 14] |
Počet impresí máme v tabulce přesně, počet unikátů by nám Druid odhadl z vektoru [15, 17, 10, 14]
. Přitom nikde nemáme uložená IDéčka uživatelů. Nepotřebujeme je.
Sloupec uniqueUsers
představuje další metriku v naší tabulce.
Časová granularita
Dosud jsme stále pracovali s tím, že se data agregují v rámci jedné hodiny. Jako většina věcí, i toto je v Druidu nastavitelné:
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "day",
"queryGranularity" : "hour"
}
Teď se budeme zabývat vlastností queryGranularity
, která nám udává, za jaký časový úsek se budou agregovat zprávy. Nastavíme-li hodinovou queryGranularity, pak se nám zprávy budou agregovat vždy v rámci jedné hodiny (14:00 až 14:59:59…), tzn., že za jeden den budeme mít pro každou kombinaci dat 24 různých řádků pro 24 hodin. Jeden příklad lepší než tisíc slov (A pro jednoduchost vynecháme sloupec userId, protože Hyperloglogu stejně nikdo nerozumíte):
timestamp | campaignID | web | browser |
---|---|---|---|
2015-03-20T18:50:00Z | 36708 | csfd.cz | Chrome |
2015-03-20T18:51:09Z | 36708 | csfd.cz | Chrome |
2015-03-20T18:51:12Z | 36708 | csfd.cz | Chrome |
2015-03-20T18:51:35Z | 36708 | csfd.cz | Chrome |
2015-03-20T18:51:35Z | 13650 | csfd.cz | Chrome |
2015-03-20T18:51:35Z | 13650 | csfd.cz | Chrome |
… | … | … | … |
2015-03-20T19:14:27Z | 36708 | csfd.cz | Chrome |
2015-03-20T19:15:38Z | 36708 | csfd.cz | Chrome |
2015-03-20T19:16:01Z | 36708 | csfd.cz | Chrome |
tuto tabulku by Druid agregoval na tuto výslednou tabulku:
timestamp | campaignID | web | browser | Impressions |
---|---|---|---|---|
2015-03-20T18:00:00Z | 36708 | csfd.cz | Chrome | 4 |
2015-03-20T18:00:00Z | 13650 | csfd.cz | Chrome | 2 |
… | … | … | … | … |
2015-03-20T19:00:00Z | 36708 | csfd.cz | Chrome | 2 |
Pokud bychom nastavili queryGranularity
na day
, vypadala by výsledná agregovaná tabulka takto:
timestamp | campaignID | web | browser | Impressions |
---|---|---|---|---|
2015-03-20T00:00:00Z | 36708 | csfd.cz | Chrome | 6 |
2015-03-20T00:00:00Z | 13650 | csfd.cz | Chrome | 2 |
… | … | … | … | … |
Hodnota queryGranularity nám určuje největší možnou granularitu. Nastavíme-li queryGranularity na hodinu, nebudeme už z Druidu schopni vytáhnout data po minutách. Ale můžeme vytáhnout data po dnech, protože to je menší granularita.
Vlastnost segmentGranularity udává velikost jednoho segmentu, což jsme řešili v předchozím článku. V jednom denním segmentu protože můžeme mít například 24 hodinových bloků dat.
Co Druid ukládá
Důležité je, že Druid ukládá pouze agregovaná data. Původní zprávy, ze kterých byla data vybudovaná, neukládá a není možné se k nim přes Druida nějak dostat. Pokud jednou uložíme data s hodinovou granularitou, nedokážeme z Druidu žádným způsobem vytáhnout počet impresí za čas 13:15 až 13:30, budeme moci vytáhnout pouze data za 13:00 až 14:00. Nemůžeme ani říci Druidu, aby přepočítal data z minulého týdne, protože jsme tam našli chybu – Druid surová data nemá a proto nemůže vše znova přepočítat.
Proto se často původní surová data nezahazují, ale ukládají se někam jinam, například do HDFS nebo do Amazon S3. Tím máme zaručeno, že když se něco v Druidu pokazí, že můžeme vzít data z HDFS a poslat je znovu do Druidu. Třeba když zjistíme, že by se nám vlastně hodila minutová granularita. Na přesun dat z Kafky do HDFS používáme Camus.