multithreading - C# Parallel.ForEach + async
I'm processing a list of items (200k - 300k), and each item takes between 2 and 8 seconds to process. To gain time, I can process the list in parallel. As I'm in an async context, I use something like this:
    public async Task<List<Keyword>> DoWord(List<string> keyword)
    {
        ConcurrentBag<Keyword> keywordResults = new ConcurrentBag<Keyword>();
        if (keyword.Count > 0)
        {
            try
            {
                var tasks = keyword.Select(async kw =>
                {
                    return await Work(kw).ConfigureAwait(false);
                });
                keywordResults = new ConcurrentBag<Keyword>(await Task.WhenAll(tasks).ConfigureAwait(false));
            }
            catch (AggregateException ae)
            {
                foreach (Exception innerEx in ae.InnerExceptions)
                {
                    log.ErrorFormat("core threads exception: {0}", innerEx);
                }
            }
        }
        return keywordResults.ToList();
    }
The keyword list contains 8 elements (coming from above), so I process my list 8 by 8. But in this case, I guess that if 7 keywords are processed in 3 seconds and the 8th is processed in 10 seconds, the total time for the 8 keywords will be 10 seconds (correct me if I'm wrong).

How can I approach something like Parallel.ForEach then? I mean: launch 8 keywords, and as soon as 1 of them is done, launch 1 more. That way I'll have 8 working processes permanently. Any idea?
Here's some sample code showing how you could approach this using TPL Dataflow.

Note that in order to compile this, you will need to add TPL Dataflow to the project via NuGet.
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace Demo
    {
        class Keyword // Dummy test class.
        {
            public string Name;
        }

        class Program
        {
            static void Main()
            {
                // Dummy test data.
                var keywords = Enumerable.Range(1, 100).Select(n => n.ToString()).ToList();

                var result = DoWork(keywords).Result;

                Console.WriteLine("---------------------------------");

                foreach (var item in result)
                    Console.WriteLine(item.Name);
            }

            public static async Task<List<Keyword>> DoWork(List<string> keywords)
            {
                var input = new TransformBlock<string, Keyword>
                (
                    async s => await Work(s),
                    // Specify the max number of threads to use.
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 }
                );

                var result = new List<Keyword>();

                var output = new ActionBlock<Keyword>
                (
                    item => result.Add(item),
                    // Only output 1 item at a time, because 'result.Add()' is not threadsafe.
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }
                );

                input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });

                foreach (string s in keywords)
                    await input.SendAsync(s);

                input.Complete();
                await output.Completion;

                return result;
            }

            public static async Task<Keyword> Work(string s) // Stubbed test method.
            {
                Console.WriteLine("Processing " + s);

                int delay;
                lock (rng) { delay = rng.Next(10, 1000); }
                await Task.Delay(delay); // Simulate load.

                Console.WriteLine("Completed " + s);
                return await Task.Run(() => new Keyword { Name = s });
            }

            static Random rng = new Random();
        }
    }
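If adding the Dataflow package is not an option, the same "keep 8 running, start another as soon as one finishes" behaviour can probably be approximated with a `SemaphoreSlim`. This is a different technique from the Dataflow code above, shown only as a minimal sketch: `ThrottledRunner` and `RunAsync` are hypothetical names that do not appear in the original post, and `work` stands in for the question's `Work` method.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRunner
{
    // Hypothetical helper (not from the original post): runs 'work' over every
    // input with at most 'maxConcurrency' operations in flight at any time.
    public static async Task<List<TOut>> RunAsync<TIn, TOut>(
        IEnumerable<TIn> inputs, Func<TIn, Task<TOut>> work, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = inputs.Select(async input =>
            {
                // Wait for a free slot; a slot opens as soon as any item finishes.
                await gate.WaitAsync().ConfigureAwait(false);
                try
                {
                    return await work(input).ConfigureAwait(false);
                }
                finally
                {
                    gate.Release();
                }
            }).ToList(); // ToList() starts every task; most immediately wait on the gate.

            // Task.WhenAll returns the results in the same order as the inputs.
            var results = await Task.WhenAll(tasks).ConfigureAwait(false);
            return results.ToList();
        }
    }
}
```

Assuming the question's `Work` method, a call would look like `var result = await ThrottledRunner.RunAsync(keywords, Work, 8);`. Note that this sketch creates one task per item up front, so for 200k - 300k items the Dataflow version, which feeds items in with `SendAsync`, may still be the better fit.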