Programio

Windowing v Samze

Ukážeme si, jak implementovat úlohy, které jsou nějak závislé na čase. Například průměr hodnot za posledních třicet sekund a podobně. Minule jsme skončili u toho, že máme dva topicy: bidrequests a bidresponses. Další částí skládačky je, že chceme ke každému požadavku najít odpovídající odpověď a spojit tyto dvě zprávy do jedné, abychom ve výsledné databázi měli jeden řádek, ve kterém budeme mít všechna potřebná data – jak z požadavku, tak z odpovědi.

Zprávy budeme párovat podle requestId, odpovídající požadavky a odpovědi ho mají stejné. Ukážeme si několik různých postupů, od nejvíce naivních, po propracované. Náš Samza job bude číst oba topicy, můžeme předpokládat, že máme jen jednu partitionu, pro následující algoritmy to nebude nijak důležité. Samza čte oba topicy stejnou rychlostí. Výstupní zprávě, která obsahuje data jak z požadavku, tak z odpovědi, budeme říkat aukce. Všechny tyto aukce pošleme do Kafka topicu bidauctions.

Prostě to přečti a spoj

První naivní algoritmus vypadá takto:

  1. Máme-li na vstupu požadavek, uložíme si ho do nějaké lokální cache.
  2. Máme-li na vstupu odpověď, nalezneme v cachi odpovídající požadavek, spojíme je a odešleme aukci do Kafky.

To vypadá příliš jednoduše na to, aby to fungovalo. Problém je, že se může stát, že nám odpověď přijde do Samzy dříve než požadavek. My sice máme zaručené pořadí zpráv v topicu (resp. v partitioně), takže požadavek, který jsme odpálili dříve taky dříve přečteme, ale nemáme zaručeno pořadí zpráv napříč dvěma topicy.

Ve hře je přitom příliš mnoho faktorů, které mohou ovlivnit rychlost přenosu zpráv. Třeba se mohlo stát, že nastala nějaká porucha sítě a všechny požadavky v topicu bidrequests se kvůli tomu o pět sekund zpozdily. V takovou chvíli bychom četli z druhého topicu odpovědi, ke kterým jsme ještě nepřečetli požadavky.

Náš algoritmus nefunguje.

Dobře, budeme čekat navzájem

Upravíme náš algoritmus, aby v případě, kdy nenalezne požadavek k odpovědi tuto odpověď uložil:

  1. Máme-li na vstupu požadavek, podíváme se, jestli už jsme nepřečetli odpověď se stejným requestId. Pokud ano, spojíme je a odešleme. Pokud ne, uložíme požadavek do cache.
  2. Totéž pro odpověď. Máme-li odpovídající požadavek, odešleme, jinak odpověď uložíme do cache.

Tím jsme zaručili, že nezáleží na pořadí, v jakém přečteme odpovědi nebo požadavky. Ať už přečteme odpověď dříve než požadavek nebo naopak, náš algoritmus bude fungovat.

Co když ale odpověď nemáme? Nikde není psáno, že pro každý požadavek nám odpověď přijde; občas zkrátka nepřijde, ať už je důvod jakýkoliv. V takovém případě nám bude v paměti viset uložený požadavek, ke kterému nikdy nenajdeme odpověď. Časem náš job selže na nedostatku paměti.

Náš algoritmus nefunguje.

Přidáme expiraci

Chtělo by to nějaký mechanismus, který by nám umožnil odstranit zprávy, které už máme v cachi uložené příliš dlouho. K tomu nám může pomoci Samza a její metoda window. Do configu přidáme

1
task.window.ms=1000

a dále v našem StreamTasku implementujeme rozhraní WindowableTask, tedy přidáme metodu window:

1
2
3
public void window(MessageCollector collector, TaskCoordinator coordinator) {
...
}

Při ukládání každé zprávy do cache si zároveň poznamenáme čas, kdy jsme zprávu přečetli. Samza se nyní postará o to, aby každou sekundu (každých task.window.ms milisekund) zavolala naši metodu window a to ve stejném vlákně, jako se volá metoda process. Výhoda je jasná – nemusíme si dělat starosti s race condition a jinými srandičkami, pokud by se metoda window volala v jiném vlákně.

V metodě window projdeme cache a najdeme ty zprávy, které byly uložené před více než minutou (třeba). Tyto zprávy pak odešleme do Kafky bez svého protějšku.

Samza sice má extra podporu pro generování metrik, ale kdyby neměla, mohli bychom si v metodě window implementovat vlastní metriky – například bychom mohli každou sekundu někam posílat, kolik jsme přečetli zpráv z daného topicu atp.

To už je docela dobré řešení a bude více méně fungovat. Jisté mouchy ale stále ještě má.

Nestíhačka

Přidejme provoz. Dejme tomu, že náš Samza job zvládá číst 2000 zpráv za sekundu. Protože čte stejně rychle z obou topiců, znamená to, že přečte tisíc požadavků a tisíc odpovědí za sekundu. Je to správně? Abychom si lépe nasimulovali co se děje, předpokládejme, že nám chodí poměrně málo odpovědí, například, že pro celých 90 % požadavků nepřijde žádná odpověď.

Pro ilustraci mějme v topicu jeden milion nezpracovaných požadavků a sto tisíc nezpracovaných odpovědí. Žádné další zprávy do topiců nechodí. Protože Samza čte z obou topiců rychlostí 1000 zpráv za sekundu, přečte všechny odpovědi za 100 sekund. Za jak dlouho přečte všechny požadavky? (Běloun, strana 168, cvičení 34)

Za prvních 100 sekund přečetla Samza 100 000 zpráv. Zbylých 900 000 zpráv už bude číst rychlostí 2000 zpráv za sekundu, protože už nečte žádné odpovědi. Přečte je za 450 sekund. Celkem čte požadavky 550 sekund. Jednoduchou animací bychom mohli rychlost čtení znázornit takto:

 
bidrequests
bidresponses

V předchozí kapitole jsme si přitom řekli, že pokud máme v paměti nějakou zprávu uloženou déle než jednu minutu, tak ji zahodíme/pošleme bez zpárované zprávy. Samza job tedy po 160 sekundách (100 sekund načítání + 60 sekund doba expirace) jistě nebude mít v paměti žádnou uloženou odpověď, přitom ještě dalších 390 sekund bude číst požadavky, ke kterým už ale nenalezne žádnou odpověď – všechny jsme už zahodili.

Protože máme odpovědí desetkrát méně než požadavků, měli bychom je také číst desetkrát pomaleji, jinak nebudeme číst odpovídající požadavky a odpovědi v přibližně stejné době. V opačném případě dojde k tomu, že se nám nepodaří zpárovat velkou část požadavků a odpovědí. Zkrátka chceme topicy číst takto:

 
 
bidrequests
bidresponses

Tento problém se typicky projeví jen ve chvíli, kdy Samza nestíhá zpracovávat zprávy z Kafky a zprávy se začnou v Kafce hromadit. Pokud by v Kafce přibylo jen tisíc požadavků a sto odpovědí za sekundu, tak by se tento problém neprojevil. Případně stačí, když je Samza deset minut mimo provoz a po nastartování musí dohánět staré zprávy.

Jaké je řešení? Určitě nezvyšovat dobu expirace, tím se ten problém jen posune. Samza nám poskytuje možnost ovlivnit, z kterého topicu se bude zrovna číst další zpráva: MessageChooser. Díky němu můžeme napsat náš Job tak, abychom například vždy přečetli deset zpráv z topicu bidrequests a pak jednu zprávu z topicu bidresponses.

To ale není dobré řešení, protože když se výrazně změní podíl odpovědí, přestane takový algoritmus fungovat.

My jsme tento problém vyřešili tak, že každá zpráva obsahuje i časové razítka s dobou vzniku a náš MessageChooser si vždy prohlédne nabízené zprávy z obou topiců a vybereme si tu, která je nejstarší. Tím přirozeně docílíme vyrovnaného čtení z obou topiců. Kód v MessageChooserovi se provádí ještě před metodou process, naše logika v této metodě může zůstat nezměněna.

Restart aplikace

Teď už vše musí fungovat, že? Samozřejmě, že ne. Následující obrázek zachycuje, co se stane, když v jeden okamžik restartujeme Samza job. Ta v tu chvíli čte souběžně dva topicy, požadavky a odpovědi.

Request 5 se má párovat s Response 5, pochopitelně. Jenomže po restartu jako první přečteme odpověď 6, přitom požadavek 6 jsme přečetli už před restartem. Odpověď 6 už nikdy nespárujeme s požadavkem 6, protože ho po restartu nikdy nenačteme. Podobný problém bude mít předchozí běh Samzy – před restartem zase přečteme požadavek 7, ale odpověď 7 přečteme až po restartu.

Aby k tomuto problému nedocházelo, je nutné si přenést část lokálního stavu aplikace do dalšího běhu. Jak na si ukážeme příště.