Programio

Jak Druid.io agreguje data

Z minulého dílu víme, že Druid typicky čte zprávy z nějakého messaging systému, tvoří segmenty a tyto segmenty poté proplouvají různými částmi systému. Dnes se podíváme na to, jak vypadají data v jednotlivých segmentech.

Agregujeme

Druid je sloupcová databáze, pro ilustraci si můžeme zpracovaná data zobrazit jako tabulku. Pokud bychom Druidu poslali tyto dvě zprávy

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"timestamp": "2015-03-17T20:40:00Z",
"campaignId": "336",
"web": "csfd.cz",
"browser": "Chrome"
}


{
"timestamp": "2015-03-17T20:40:02Z",
"campaignId": "1344",
"web": "idnes.cz",
"browser": "Chrome"
}

v Druidu by se poskládala tato tabulka:

timestamp campaignID web browser
2015-03-17T20:40:00Z 336 csfd.cz Chrome
2015-03-17T20:40:02Z 1344 idnes.cz Chrome

Dosud nic světoborného. Zázraky by se začaly dít, až by Druid přečetl například tuto zprávu:

1
2
3
4
5
6
{
"timestamp": "2015-03-17T20:40:27Z",
"campaignId": "336",
"web": "csfd.cz",
"browser": "Chrome"
}

…protože ta se od úplně první zprávy liší pouze v čase (20:40:00 vs 20:40:27), ostatní vlastnosti jsou stejné. Jsou to dvě stejné události, které pouze nastaly v jiný čas. Druid by samozřejmě mohl tuto zprávu uložit jako třetí řádek do tabulky

timestamp campaignID web browser
2015-03-17T20:40:00Z 336 csfd.cz Chrome
2015-03-17T20:40:02Z 1344 idnes.cz Chrome
2015-03-17T20:40:27Z 336 csfd.cz Chrome

…ale efektivnější bude, když provede agregaci a jen si u prvního řádku uloží, že tato událost nastala dvakrát:

timestamp campaignID web browser impressions
2015-03-17T20:00:00 336 csfd.cz Chrome 2
2015-03-17T20:00:00 1344 idnes.cz Chrome 1

Tyto tabulky reprezentují stejná data, u druhé jsme pouze ztratili informaci o tom, kdy která událost přesně nastala. Všimněte si, že ve druhé tabulce už máme v timestampu pouze hodinu, bez přesného času. První řádek tabulky říká, že v čase 20:00:00 až 20:59:59 nastaly pro kampaň 336 na ČSFD dvě imprese.

Druid při agregaci postupuje tak, že porovnává hodnoty ve všech sloupcích a pokud se zprávy liší pouze v čase, spojí tyto zprávy do jedné a za každou zprávu inkrementuje sloupec impressions. Název nově vzniklého sloupce specifikujeme ve specFilu:

1
2
3
4
5
6
"aggregators": [
{
"type": "count",
"name": "impressions"
}
]

Tímto vytvoříme agregátor count, který se bude ukládat do sloupečku impressions. Druid zná tři druhy sloupců:

  • čas – reprezentuje, pro kterou hodinu jsou data platná. V příkladu jde o sloupec “timestamp”.
  • dimenze – to jsou naše „syrová“ data, jsou to sloupce campaignID, web a browser.
  • metrika – vypočtené hodnoty, sloupec impressions.

Další agregátory

Druid má podporu pro několik dalších agregátorů. Mohli bychom například přidat informaci o ceně za impresi; tabulka před agregací by vypadala takto:

timestamp campaignID web browser price
2015-03-17T20:40:00Z 336 csfd.cz Chrome 0.17
2015-03-17T20:40:02Z 1344 idnes.cz Chrome 0.19
2015-03-17T20:40:27Z 336 csfd.cz Chrome 0.43

Podle toho, co dosud víme, by se žádné řádky nespojily v jeden, protože všechny řádky se liší přinejmenším v ceně. To je ale docela nešikovné. Proto můžeme přidat doubleSum agregátor, který zařídí, aby se během agregace hodnoty ve sloupcích sečetly:

1
2
3
4
5
{
"type": "doubleSum",
"name": "price",
"fieldName": "price"
}

Vlastnost fieldName říká, z kterého sloupečku má hodnoty číst a vlastnost name do jakého sloupečku má Druid agregovaná data ukládat. Protože máme oba názvy stejné, budou se data ve sloupečku price přepisovat. Nyní už může dojít k agregaci. Druid ví, že má porovnávat pouze hodnoty ve sloupcích campaignId, web a browser, zatímco sloupec price má vždy jen sečíst. Proto spojí první a třetí řádek do jednoho:

timestamp campaignID web browser impressions price
2015-03-17T20:00:00 336 csfd.cz Chrome 2 0.6
2015-03-17T20:00:00 1344 idnes.cz Chrome 1 0.19

Nyní už ve sloupci price nemáme cenu jedné imprese, ale součet cen všech impresí v daném časovém úseku, pod danou kampaní, na daném webu a s daným prohlížečem. Z takové agregované tabulky už bychom samozřejmě nikdy nezjistili, jaká byla cena za jednu impresi.

Unikátní uživatelé

Co když do zprávy přidáme i informaci o tom, kterému uživateli se banner zobrazil? Data před agregací:

timestamp campaignID web browser userId
2015-03-17T20:40:00Z 336 csfd.cz Chrome 40BBF8
2015-03-17T20:40:27Z 336 csfd.cz Chrome 9896AA

Události nastaly ve stejnou hodinu, ale liší se v hodnotě userId. Druid by proto tyto dvě zprávy uložil do dvou řádků. To je ale dost prekérní situace, protože nyní agregace proběhne jen tehdy, týkají-li se dva řádky stejného uživatele, což nemusí nastat moc často.

V tuto chvíli se musíme zamyslet nad tím, na co data ve skutečnosti potřebujeme. Jde-li nám jen o statistické dotazy typu „kolik unikátních uživatelů vidělo reklamu XYZ na webu ZXY“, můžeme si pomoci tím, že použijeme pravděpodobností algoritmy. Ty nám z dostupných dat nespočítají přesný počet unikátních uživatelů, ale odhadnou počet unikátních uživatelů s nějakou chybou (typicky pod dvě procenta). To je většinou přijatelné. Výhodou pravděpodobnostních algoritmů pak je, že mají nižší paměťové nároky a jsou rychlejší.

Druid zná HyperLogLog, což je v současné době nejlepší algoritmus pro odhadování unikátních hodnot v multimnožině. Přidáme do našeho specFilu nový agregátor hyperUnique:

1
2
3
4
5
{
"type" : "hyperUnique",
"name" : "uniqueUsers",
"fieldName" : "userId"
}

Abyste plně pochopili, co se teď bude dít, je nutné vědět jak funguje HyperLogLog a hlavně jak funguje sjednocení hyperloglogu. Stručně řečeno, HyperLogLog je algoritmus, který převádí hodnoty ve sloupci na vektory, například na [0, 0, 3, 0], z nichž lze později odhadnout počet unikátních hodnot v daném sloupci.

Velkou výhodou je, že máme-li dva vektory [0, 3, 0, 0] a [0, 2, 0, 0], které reprezentují dvě různé hodnoty A a B, můžeme reprezentovat množinu {A, B} tak, že vektory „sjednotíme“ (vypočítáme maximum po složkách):

1
[0, 3, 0, 0]
[0, 2, 0, 0]
------------
[0, 3, 0, 0]

Druid při agregaci vypočte HLL vektor a ten uloží do tabulky do sloupce uniqueUsers a hodnotu userId zahodí. Před finální agregací bude tabulka vypadat takto:

timestamp campaignID web browser impressions uniqueUsers
2015-03-17T20:00:00 336 csfd.cz Chrome 1 [0, 3, 0, 0]
2015-03-17T20:00:00 336 csfd.cz Chrome 1 [0, 2, 0, 0]

Protože HLL vektory umíme sjednotit, můžeme tyto dva řádky spojit do jednoho. Výsledná agregovaná tabulka by vypadala takto:

timestamp campaignID web browser impressions uniqueUsers
2015-03-17T20:00:00 336 csfd.cz Chrome 2 [0, 3, 0, 0]

Z těchto dat umíme vyčíst přesný počet impresí pro danou kampaň, daný web a daný prohlížeč za daný čas. Z daných dat také umíme získat přibližný počet unikátních uživatelů, kterým se tento banner zobrazil. Důležité je, že i když poroste počet zobrazení daného banneru, stále nám pro uložení stačí jeden řádek. Pokud by kampaň vyprodukovala 15 874 impresí u 6 098 unikátních uživatelů, mohl by řádek vypadat nějak takto:

timestamp campaignID web browser impressions uniqueUsers
2015-03-17T20:00:00 336 csfd.cz Chrome 15 874 [15, 17, 10, 14]

Počet impresí máme v tabulce přesně, počet unikátů by nám Druid odhadl z vektoru [15, 17, 10, 14]. Přitom nikde nemáme uložená IDéčka uživatelů. Nepotřebujeme je.

Sloupec uniqueUsers představuje další metriku v naší tabulce.

Časová granularita

Dosud jsme stále pracovali s tím, že se data agregují v rámci jedné hodiny. Jako většina věcí, i toto je v Druidu nastavitelné:

"granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "day",
    "queryGranularity" : "hour"
}

Teď se budeme zabývat vlastností queryGranularity, která nám udává, za jaký časový úsek se budou agregovat zprávy. Nastavíme-li hodinovou queryGranularity, pak se nám zprávy budou agregovat vždy v rámci jedné hodiny (14:00 až 14:59:59…), tzn., že za jeden den budeme mít pro každou kombinaci dat 24 různých řádků pro 24 hodin. Jeden příklad lepší než tisíc slov (A pro jednoduchost vynecháme sloupec userId, protože Hyperloglogu stejně nikdo nerozumíte):

timestamp campaignID web browser
2015-03-20T18:50:00Z 36708 csfd.cz Chrome
2015-03-20T18:51:09Z 36708 csfd.cz Chrome
2015-03-20T18:51:12Z 36708 csfd.cz Chrome
2015-03-20T18:51:35Z 36708 csfd.cz Chrome
2015-03-20T18:51:35Z 13650 csfd.cz Chrome
2015-03-20T18:51:35Z 13650 csfd.cz Chrome
2015-03-20T19:14:27Z 36708 csfd.cz Chrome
2015-03-20T19:15:38Z 36708 csfd.cz Chrome
2015-03-20T19:16:01Z 36708 csfd.cz Chrome

tuto tabulku by Druid agregoval na tuto výslednou tabulku:

timestamp campaignID web browser Impressions
2015-03-20T18:00:00Z 36708 csfd.cz Chrome 4
2015-03-20T18:00:00Z 13650 csfd.cz Chrome 2
2015-03-20T19:00:00Z 36708 csfd.cz Chrome 2

Pokud bychom nastavili queryGranularity na day, vypadala by výsledná agregovaná tabulka takto:

timestamp campaignID web browser Impressions
2015-03-20T00:00:00Z 36708 csfd.cz Chrome 6
2015-03-20T00:00:00Z 13650 csfd.cz Chrome 2

Hodnota queryGranularity nám určuje největší možnou granularitu. Nastavíme-li queryGranularity na hodinu, nebudeme už z Druidu schopni vytáhnout data po minutách. Ale můžeme vytáhnout data po dnech, protože to je menší granularita.

Vlastnost segmentGranularity udává velikost jednoho segmentu, což jsme řešili v předchozím článku. V jednom denním segmentu protože můžeme mít například 24 hodinových bloků dat.

Co Druid ukládá

Důležité je, že Druid ukládá pouze agregovaná data. Původní zprávy, ze kterých byla data vybudovaná, neukládá a není možné se k nim přes Druida nějak dostat. Pokud jednou uložíme data s hodinovou granularitou, nedokážeme z Druidu žádným způsobem vytáhnout počet impresí za čas 13:15 až 13:30, budeme moci vytáhnout pouze data za 13:00 až 14:00. Nemůžeme ani říci Druidu, aby přepočítal data z minulého týdne, protože jsme tam našli chybu – Druid surová data nemá a proto nemůže vše znova přepočítat.

Proto se často původní surová data nezahazují, ale ukládají se někam jinam, například do HDFS nebo do Amazon S3. Tím máme zaručeno, že když se něco v Druidu pokazí, že můžeme vzít data z HDFS a poslat je znovu do Druidu. Třeba když zjistíme, že by se nám vlastně hodila minutová granularita. Na přesun dat z Kafky do HDFS používáme Camus.