Parallel Extensions for .NET

Jakub Čermák       28.05.2009       LINQ, Optimalizace, Threading, .NET       12193 zobrazení

V tomto článku budou přiblíženy 2 nové paralelizační koncepty – Task Parallel Library a Parallel LINQ (souhrnně nazývané Parallel Extensions for .NET) obsažené v novém .NETu 4. Jejich účelem je usnadnit programátorovi paralelizaci (hlavně těch běžnějších) úloh.

V tomto článku budou přiblíženy 2 nové paralelizační koncepty – Task Parallel Library a Parallel LINQ (souhrnně nazývané Parallel Extensions for .NET) obsažené v novém .NETu verze 4.

V článku se předpokládají alespoň základní znalosti paralelního programování a LINQu.

Úvod

V dnešní době již téměř (pominu-li úsporné netbooky) nesežene počítač nebo notebook s jednojádrovým CPU. To nám vývojářům dává do ruky ostrou zbraň, se kterou můžeme psát rychlejší a výkonnější aplikace a nebo se jí “pořežeme” a zaneseme si do aplikace spousty nových a nečekaných bugů. Ale o tom tenhle článek není, úkolem tohoto článku je přiblížit nové nástroje, které nám vývojářům Microsoft dává do ruky a které nám mohou pomoci ke snadnějšímu využití dalších CPU jader.

Task Parallel Library (TPL)

TPL je základním stavebním kamenem nových Parallel Extensions. Nachází se v namespace System.Threading a System.Threading.Tasks a obsahuje několik nových tříd, z nichž ty “významnější” za chvíli popíšu.

Třída Parallel

Součástí této statické třídy jsou metody For, ForEach, které jsou (jak název napovídá), paralelní obdobou klasických for a foreach cyklů, a metodu Invoke. Nejdříve popíšu For a ForEach metody. Obě mají společné spousty věcí: každá má jako jeden z parametrů “tělo cyklu” (zde je pěkné použít lambda výrazy, s nimiž lze dosáhnout vizuální podobnosti se standardními cykly), které se bude vykonávat paralelně. Obě fungují tak, že (po počáteční inicializaci) rozdělí všechny iterace (iterací budeme rozumět 1 zavolání těla cyklu s pevně danou iterační proměnnou (to klasické “i”)) do několika skupin a následně si vezmou z thread poolu nějaký počet vláken (ten je defaultně počet jader*2, ale pomocí instance ParallelOptions předané fci For jako parametr to lze změnit) a skupiny iterací rozdělí mezi vlákna. Načež se vlákna pustí čile do práce a pilně vykonávají kód. Vždy se čeká než se dokončí všechny iterace nebo nastane výjimka, do té doby funkce For/ForEach neskončí.

 // nejjednodušší použití Parallel.For
Parallel.For(0, 10, i =>
{
    
// nějaký kód, který se provede paralelně pro i ∈ {0,1,2,3,4,5,6,7,8,9}
});       

For má mnoho přetížených variant, od té nejjednodušší, která je uvedena v příkladu o řádek výše, a která pouze přijímá parametry od (včetně), do (nevčetně) a funkci Int32 nebo Int64 -> void, která reprezentuje tělo cyklu. Tělo cyklu může být ještě reprezentováno funkcemi (Int, ParallelLoopState) -> void, kde ParalelLoopState je třída umožňující break, případně variantou využívající tzv. thread local storage, což je zjednodušeně (generický) objekt, který je společný pro všechny iterace běžící na 1 vlákně, například si tam můžeme ukládat mezivýsledky bez potřeby nějakého zamykání. Tento objekt si před a po skončení iterací můžeme nechat zavolat vlastní inicializační či finalizační funkce, které se provedou v tom vlákně, pro který je daný storage objekt určen. Tělo takového For cyklu pak je pak funkce (Int, ParallelLoopState, TLocal) –> TLocal, kde TLocal je generický typ thread local objektu. Příklad využítí při počítání π pomocí integrálu od 0 do 1 z fce 4 / ( 1 + x^2):

 const int StepCnt=(int)4e8;
double step = 1.0 / StepCnt;
double result = 0;
object resLock = new object();

Parallel.For<double>(0, StepCnt, /* inicializační funkce */() => 0.0, (i, state, thrLocalObj) =>
{
    
// tělo cyklu
    double x = (i + 0.5) * step;
    
return thrLocalObj + 4.0 / (1 + x * x) * step;
}, localResult =>
{
    
// finalizační funkce, která se provede na konci cyklu ∀ vlákna
    lock (resLock)
         result += localResult;
});
Console.WriteLine("Pi = {0}", result);

ForEach se pak chová podobně, akorát že nepočítá od a do b, ale projíždí nějakou kolekci. Následující příklad ukáže, jak jde využít TPL pro stahování více položek najednou z Internetu – v poli Addresses má seznam adres stránek, které chce stáhnout, a aby s nimi aspoň něco dělal, tak vypíše jejich velikost na konzoli.

 object consoleLock = new object();
Parallel.ForEach(Addresses, uri =>
{
    
WebRequest req = WebRequest.Create(uri);
    
WebResponse resp = req.GetResponse();
    
Thread.Sleep(100); // simulujeme pomalejší síť
    int len = 0;
    
using (Stream rs = resp.GetResponseStream())
     {
        
byte[] buffer = new byte[1024];
        
int newread;
        
while ((newread = rs.Read(buffer, 0, 1024)) > 0)
             len += newread;
     }
    
lock (consoleLock)
        
Console.WriteLine("[{2}] {0} : {1}KiB", uri, len / 1024, Thread.CurrentThread.ManagedThreadId);
        
// při psaní konzole je třeba ji zamknout, aby se nám nestalo, že vypisujeme 2 řádky najednou
});

Poslední metoda, Invoke (neplést s Control.Invoke), pak bere pole funkcí typu Action (void->void) a provede s nimi to samé jako For s jednotlivými iteracemi – rozhází funkce do vláken a vlákna spustí.

Ale co se stane, když uprostřed nějaké iterace (či funkce z Invoke) kód vyhodí výjimku? Odpověď je jednoduchá – framework ji odchytí, pokusí se zastavit všechna spuštěná vlákna toho cyklu (což znamená, že není definováno, které iterace proběhnou a které už ne) a vyhodí výjímku AggregateException, která v property InnerException má první vyhozenou výjímku a v InnerExceptions jsou všechny zachycené (neboť než framework ukončí ostatní vlákna, tak můžou ještě nějaké výjimky přibýt).

Task

Třída Task je základní “stavební kámen” TPL knihovny – reprezentuje asynchronně vykonávanou úlohu, což je buď nějaká Action<….> nebo Func<….>. Trošku se tím podobá klasickému vláknu, ale hodí se více na trošku jiné věci (např. se pomocí toho pěkně dají dělat různé počítací a zpracovávací operace atd), takže se pěkně doplňují. Oproti klasickému vlánku má několik podstatných vylepšení (která jsou samozřejmě dodělat i do Threadu, ale tady už to máme předpřipravené:) ) :

  • Může vracet nějakou hodnotu. Win32 vlákno umí vracet DWORD (který by ale spíš měl vyjadřovat úspěch/chybu vlákna než jako výsledek nějakého počítání), .NETí vlákno nic. Kdežto Task existuje ve 2 variantách – bez návratové hodnoty a s ní. 2. varianta tedy bere Func<volitelně_něco, TResult> (tedy void->TResult nebo TParam->TResult) místo Action<…> a návratová hodnota této funkce je pak uložena v property Result. Kouzelné je na tom to, že pokud se podíváte do vlastnosti Result před dokončením té asynchronní fce, tak se volající vlákno pozastaví do doby, než se vypočítá hodnota Result, čehož se dá využít např. pro úlohy typu “za chvíli budu potřebovat spočítané X, tak to pustím v Tasku, a buď se to stihne spočítat nebo ne, a když ne, tak se jen chvilku počká”.
  •  int UltimateNum()
    {
    Thread.Sleep(10000); // déletrvající operace
    return 42;
    }

    // a Task pak vytvořím, spustím a hodnotu z něj použiju:
    Task<int> t = new Task<int>(UltimateNum);
    t.Start();
    Thread.Sleep(1000); //něco kutím
    Console.WriteLine("Value: "+t.Result)
  • Podporuje tzv. Cancellation, což je nový koncept, který umožňuje “nenásilně” rušit úlohy. Zjednodušeně to funguje tak, že v těle Tasku periodicky kontroluji IsCancellationRequested jeho CancellationTokenu a pokud je tato property true, tak v klidu dokončím, co je nutné, zavolám AcknowledgeCancellation, čímž potvrdím zrušení a skončím. Další možnost je, že si zaregistruju svoji metodu, která se zavolá při zrušení daného Tasku. Z pohledu jiného vlákna se to chová tak, že na Task t zavolám t.Cancel(), čímž ho uvedu do stavu CancellationRequested a buď počkám na zrušení nebo jedu dál. Když t pak zavolá AcknowledgeCancellation, tak se tím převede do stavu Cancelled.
  • Je stromově uspořádáný – když v nějakém Tasku t1 vytvořím a spustím jiný Task t2, tak t2 bude mít t1 za otce. Toho se dá různě využít, například to lze zkombinovat s Cancellation tak, že se nastaví, že zrušení otce zruší i jeho synovské/dceřinné Tasky.
  • Různé podpůrné metody např. pro vytváření Tasků z Async volání u I/O operací.
  • A další

System.Collections.Concurrent

Poslední věcí, o které bych se chtěl v souvislosti s TPL zmínit je nový namespace System.Collections.Concurrent, který, jak název napovídá, obsahuje nejrůznější kolekce uzpůsobené pro paralelní přístup. Zatím mají hotový ConcurrentBag (neuspořádaná kolekce), ConcurrentDictonary, ConcurrentQueue a ConcurrentStack. Je pravděpodobné, že ještě nějaké přibudou.

Parallel LINQ (PLINQ)

PLINQ je na první pohled velmi nenápadné rozšíření a k jeho ovládnutí stačí znát jen extension metodu AsParallel(this IEnumerable) (a volitelně With* metody, které slouží k nastavení paralelizace), přesto (nebo možná právě proto) si myslím, že bude užitečnější než celá TPL. Jedna z výhod funkcionálního programování (a tedy i LINQu, neboť ten je jím do značné míry inspirován) je totiž to, že se velmi snadno díky jen částečně definovaném pořadí provádění jednotlivých funkcí a neexistenci měnitelných globálních proměnných paralelizuje. Vysvětlím na konkrétním příkladě: mám List<string> a chci z něj vybrat všechny prvky obsahující slovo liška. Vzhledem k tomu, že zjištění, jestli string obsahuje nebo neobsahuje lišku, je závislé jen a pouze na tom 1 stringu, tak je mi celkem jedno, jestli ono porovnání běží sériově (tj že se porovnává postupně) nebo jestli paralelně (několik stringů se nezávisle na sobě porovnává najednou). Z toho vyplývá snadnost paralelizace linqovských dotazů (z hlediska programátora-uživatele) za dodržení podmínky nezávislosti.

Co tedy ona “magická” AsParallel dělá? Pokud ji zavoláme na nějaký IEnumerable, tak vrátí objekt typu ParallelQuery, který se díky existenci třídy ParallelEnumerable sloužící podobným účelům jako Enumerable pro klasický LINQ (definuje všechny LINQovské extension metody jako Contains, Where a další) - metody třídy ParallelEnumerable jsou paralelizované protějšky svých sester v Enumerable, neboli stejně vypadají, výsledek je obdobný, jen vytíží všechna dostupná jádra. Na příkladě si vysvětlíme použítí: z jednovláknové verze kódu počítajícího π (na stejném principu jako příklad výše):

 var query = from i in Enumerable.Range(0, steps)
                        
let x = (i + 0.5) * step
                        
select 4.0 / (1 + x * x) * step;
var result = query.Sum();

uděláme snadno paralelní:

 var query = from i in Enumerable.Range(0, steps).AsParallel()
                        
let x = (i + 0.5) * step
                        
select 4.0 / (1 + x * x) * step;
var result = query.Sum();

a pokud by byl paralelní až moc, tak ho můžeme zase trošku “zkrouhnout”:

 var query = from i in Enumerable.Range(0, steps).AsParallel().WithDegreeOfParallelism(1)

případně můžeme paralelizaci ovlivnit ještě dalšími metodami podobně se používajícími (viz dokumentace), např. AsOrdered() snažící se zachovat pořadí vyhodnocování na úkor výkonu.

Bohužel LINQ obecně na tom není úplně výkonově nejlépe (dost záleží na typu úlohy), takže ne vždy je používání LINQu to pravé ořechové. Paralelizace samozřejmě výpočetní čas zkracuje (za předpokladu dalších volných jader), je třeba ale pamatovat na ti, že sama paralelizace s sebou nese nějaký overhead, u mě na Core 2 Duo se výpočetní čas zkrátil asi 1.8x. Ale hodně závisí na typu úlohy (a lenosti programátora, s LINQem většinou píšu úlohy rychleji než konvenčně), u některých úloh je zase overhead malý.

Závěr

Budoucnost je zcela jistě v paralelizaci (už se těším na 8jádrové CPU v notebooku:) ), což má za následek vznik různých paralelizačních frameworků, jako třeba OpenMP, MPI (i když to je spíše clusterová záležitost pro náročné výpočty) nebo právě TPL s PLINQem, které umožňují snadněji a přijemněji paralelizovat kód, často jen tím, že se někde něco málo připíše. Já osobně podobné knihovny vítám, neboť mám rád cokoli, co mi ušetří trocha psaní. Jsem zvědavý, jak se ještě framework vyvine, snad už by neměli dělat nějaké větší změny, jako teď, kdy z CTP verze zmizely některé věci, některé se přejmenovaly a trochu změnily (Futures, z kterých je teď Task<TResult>, TaskManager atd) a hodně toho přibylo. Před pár měsíci jsem měl přehledovou přednášku o CTP verzi (neboť v té době bylo do Beta 1 verze daleko), pro zájemce je ke stažení na http://dl.jcermak.cz/clanky/prednaska_pfx_ctp.pdf, je vidět kolik se toho změnilo. Pokud Vás tohle téma zaujalo, tak výborný zdroj informací je blog vývojářského týmu http://blogs.msdn.com/pfxteam/default.aspx, případně vlastní experimenty. Nebo se můžete zeptat na různé věci v diskuzi pod článkem případně projet MSDN dokumentaci, ale ta je zatím dost strohá.

 

hodnocení článku

1 bodů / 1 hlasů       Hodnotit mohou jen registrované uživatelé.

 

 

 

Nový příspěvek

 

Diskuse: Parallel Extensions for .NET

Velmi by mě zajímalo, jak se bude řešit synchronizace vláken při použití paralelního For cyklu. Pochybuji že je to řešeno nějak automaticky. Příklad (předvedeno na normálním cyklu, považujte ho však za paralelní):

int sizeTotal = 0;
foreach (var file in new DirectoryInfo("C:\WINDOWS").GetFiles())
{
  /* Jak řešit synchronizaci proměnné sizeTotal, má-li
  k ní přístup x vláken současně? */
  sizeTotal += file.Length;
}

Pozn.: Samozřejmě takto primitivní sčítání by se dalo řešit atomovým Interlocked.Add, ale představte si tam něco složitějšího, toto je jen pro ilustraci.

nahlásit spamnahlásit spam 1 / 3 odpovědětodpovědět

Automaticky to řešeno není, obecné řešení je lockování, které ale zas je pomalé (v tomto případě). Asi nejjednodušší řešení pro podobné agregační funkce, co mě napadlo, je to, které jsem použil v prvním případě, tj. použít ten thread-local objekt, který je pro každé vlákno jiný, a do něho dávat mezivýsledky. A až v té final funkci to sečíst (nebo jinak zagregovat) dohromady. Výhoda je, že pro přístup k tomu thread-local objektu není třeba nic lockovat, nevýhoda, že to není obecné řešení.

Holt bude muset programátor trochu přemýšlet, jaké zvolí řešení, tak, aby to fungovalo, aby to bylo dostatečně rychlé (netrávilo většinu času lockováním), a aby se programátor co nejmíň nadřel :) Nenapadá mě nic lepšího, bohužel.

nahlásit spamnahlásit spam 1 / 3 odpovědětodpovědět
                       
Nadpis:
Antispam: Komu se občas házejí perly?
Příspěvek bude publikován pod identitou   anonym.

Nyní zakládáte pod článkem nové diskusní vlákno.
Pokud chcete reagovat na jiný příspěvek, klikněte na tlačítko "Odpovědět" u některého diskusního příspěvku.

Nyní odpovídáte na příspěvek pod článkem. Nebo chcete raději založit nové vlákno?

 

  • Administrátoři si vyhrazují právo komentáře upravovat či mazat bez udání důvodu.
    Mazány budou zejména komentáře obsahující vulgarity nebo porušující pravidla publikování.
  • Pokud nejste zaregistrováni, Vaše IP adresa bude zveřejněna. Pokud s tímto nesouhlasíte, příspěvek neodesílejte.

přihlásit pomocí externího účtu

přihlásit pomocí jména a hesla

Uživatel:  
Heslo:  

zapomenuté heslo

 

založit nový uživatelský účet

zaregistrujte se

 
zavřít

Nahlásit spam

Opravdu chcete tento příspěvek nahlásit pro porušování pravidel fóra?

Nahlásit Zrušit

Chyba

zavřít

feedback