Play! Iteratees pro realtime streamování dat - 1. část

Iteratee je immutable abstrakcí pro konzumenta dat, který iteruje přes skupiny dat za použití neblokujícího asynchronního přístupu. V tomto článku se podíváme blíže na zoubek konzumentům a producentům dat v Play! frameworku, v druhém navazujícím článku si potom představíme možnosti adaptování a filtrování dat a iteratees si demonstrujeme prakticky na jednoduchém příkladu streamování textu.

Play! framework nabízí ve své druhé verzi podporu pro snadné streamování, upload a download dat v dynamických webových aplikacích. Data mohou být do prohlížeče přenášena přes různé technologie, včetně Cometu a WebSockets. S různými datovými zdroji lze jednotně manipulovat pomocí stejného API pro produkování a konzumování dat.

Programovací model pro práci se streamy

Play! nabízí programový model pro práci se streamy: Vytváření, adaptování, manipulaci, filtrování a slučování streamů dat a související synchronizaci. Takto bohaté API usnadňuje programování realtime webových aplikací. Surové streamy bajtů (InputStream/OutputStream) takové možnosti a pohodlné zpracování dat jako iteratees v Play! frameworku nenabízí.

Play! 2 používá pro práci se streamy Iteratees a Futures a poskytuje velmi bohatý programovací model pro realtime webové aplikace, pomocí kterého můžete streamovat na vyšší úrovni přímo své vlastní strukturované objekty.

Iteratees – konzumenti dat

Iteratees použijeme, pokud chceme na serveru konzumovat nějaký proud dat (klidně naše vlastní specifické objekty).

Iteratee[E, A] je immutable rozhraní, které reprezentuje konzumenta dat, jenž eventuálně produkuje vypočítaný výsledek typu A (například celkový počet zkonzumovaných položek). Konzumuje se vždy skupina položek typu E (buffer dat). Iteratee se může rozhodnout přestat konzumovat data dříve než dorazí speciální EOF signál z konzumovaného streamu, nebo si může na EOF počkat. Iteratees mohou být skládané dohromady, a logika konzumování dat tak může být rozdělena do více samostatných částí.

Iteratee se může nacházet v jednom ze tří možných stavů:

  • Cont (akceptuje další vstup),
  • Error (chybový stav),
  • Done (ukončení konzumování, je dostupný vypočítaný výsledek).

Na Iteratee lze volat metodu fold, které se předávají funkce/callbacky umožňující reagovat na přechod Iteratee do jednoho ze zmíněných stavů (vnitřní implementace zavolá jednu z předaných funkcí cont, done nebo error podle aktuálních vstupních dat).

Skupina konzumovaných dat je reprezentována jako Input[E], které má potomky:

  • Input.El[E] – konkrétní data,
  • Input.Empty (prázdná data),
  • Input.EOF (konec streamu).

Při vytváření vlastních Iteratee můžeme použít jednoduché předdefinované Iteratees: Done, Cont, Error, která jsou vždy v odpovídajících stejnojmenných stavech. Složitější Iteratee můžeme implementovat voláním metody Iteratee.fold, kterou lze snadno vytvořit Iteratee aktualizující nějaký stav na základě příchozích dat (fold vytváří výsledný stav na základě nějakého iniciálního stavu a obecné logiky přechodu ze vstupního stavu do dalšího stavu na základě příchozích dat). Např.:

Pomocí dalších metod mohou být příchozí skupiny dat zpracovávány také asynchronně. Iteratee.foreach vytvoří iteratee, které vždy při příchodu dat zavolá předaný callback (vhodné pro imperativní styl zpracování vstupních dat).

Enumerators – producenti dat

Enumerator je zdroj dat, který předává data konzumentům (iteratees). Enumerator[E] reprezentuje stream, který produkuje data typu E. Enumerátory mohou být skládány za sebe (pokud jeden stream skončí s produkcí dat, další data začne produkovat druhý, ...), nebo se data produkovaná enumerátory mohou různě prolínat (lze implementovat vlastní management streamů).

Trait Enumerator[E]:

napovídá, že jej lze aplikovat na Iteratee – voláním metody apply (kulatých závorek) na enumerátoru: enumerator(iteratee), alternativně pomocí metody/aliasu |>>. Aplikování enumerátoru na Iteratee předává Iteratee postupně skupiny vstupních dat (Input[E]), které Iteratee konzumuje, a případně vrátí další stav tohoto Iteratee (další reprezentaci Iteratee ve stavu cont, done nebo error).

Interní implementace enumerátoru, tj. metody apply traitu Enumerator, by volala sekvenčně metodu fold na daném Iteratee (a plnila konzumenta jednotlivými skupinami dat), ale existují různé předpřipravené factory metody pro snazší vytvoření instance Enumerátoru:

  • Enumerátor s přímo specifikovanými produkovanými daty: Enumeratee("one", "two", "three").
  • Enumerátor, který produkuje data pomocí zadané Future: Enumerator.generateM(=> scala.concurrent.Future[Option[E]]) (jednou z implementací Future je např. Promise.timeout produkující data v daných časových prodlevách). Obdobou je ještě obecnější metoda Enumerator.fromCallback.
  • Enumerátor, který produkuje pole bytů ze souboru nebo ze streamu: Enumerator.fromFile(File), Enumerator.fromStream(InputStream).
  • Enumerátor, který produkuje data pomocí zadané funkce, jejímž úkolem je naplnit data do objektu Pushee[E] za pomoci metod push a close: Enumerator.pushee.
  • aj.

Enumerátor začne číst data, která produkuje a předává iteratee, teprve tehdy, když je iteratee, na který je aplikován, připraveno data číst. Příklad vytvoření enumerátoru a jeho aplikování na iteratee, které zkonzumuje výsledky a vrátí svůj výstup:

Díky tomu, že enumerátor vrací po plnění konzumenta metodou apply na výstupu výsledné Iteratee (konzumenta v jeho aktuálním posledním stavu), jde na vrácené Iteratee aplikovat další enumerátor (a provést další plnění daty), tj. aplikování enumerátorů na iteratee lze řetězit. Instance enumerátorů lze zkombinovat do jednoho složeného enumerátoru, který lze pak aplikovat na  iteratee jako jeden celek. Skládání enumerátorů se provádí pomocí metody andThen (nebo symbolickým zápisem/aliasem >>>), nebo interleave (>-).

Enumerátory lze na serveru řetězit dynamicky podle požadavků klienta – browseru. Užitečným enumerátorem je enumerátor produkující strukturovaná data typu play.api.libs.json.JsValue (v JSON formátu) snadno zpracovatelné přímo JavaScriptem v prohlížeči (např. při přijímání dat ze serveru přes AJAX a Comet).

Play! Iteratees pro realtime streamování dat - 2. část

Článek obsahuje 0 komentářů