multithreading - C# Parallel Foreach + Async


I'm processing a list of items (200k - 300k), and each item takes between 2 and 8 seconds to process. To gain time, I can process this list in parallel. Since I'm in an async context, I use something like this:

    public async Task<List<Keyword>> DoWord(List<string> keyword)
    {
        ConcurrentBag<Keyword> keywordResults = new ConcurrentBag<Keyword>();
        if (keyword.Count > 0)
        {
            try
            {
                var tasks = keyword.Select(async kw =>
                {
                    return await Work(kw).ConfigureAwait(false);
                });

                keywordResults = new ConcurrentBag<Keyword>(await Task.WhenAll(tasks).ConfigureAwait(false));
            }
            catch (AggregateException ae)
            {
                foreach (Exception innerEx in ae.InnerExceptions)
                {
                    log.ErrorFormat("Core threads exception: {0}", innerEx);
                }
            }
        }
        return keywordResults.ToList();
    }

The keyword list contains 8 elements (coming from above), so I process my list 8 by 8. But in this case, I guess that if 7 keywords are processed in 3 secs and the 8th is processed in 10 secs, the total time for the 8 keywords will be 10 secs (correct me if I'm wrong). How can I approach something like Parallel.ForEach then? I mean: launch 8 keywords, and as soon as 1 of them is done, launch 1 more. That way I'll have 8 working processes permanently. Any idea?

Here's some sample code showing how to approach this using TPL Dataflow.

Note that in order to compile this, you need to add TPL Dataflow (the System.Threading.Tasks.Dataflow package) to your project via NuGet.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace Demo
    {
        class Keyword // Dummy test class.
        {
            public string Name;
        }

        class Program
        {
            static void Main()
            {
                // Dummy test data.
                var keywords = Enumerable.Range(1, 100).Select(n => n.ToString()).ToList();

                var result = DoWork(keywords).Result;

                Console.WriteLine("---------------------------------");

                foreach (var item in result)
                    Console.WriteLine(item.Name);
            }

            public static async Task<List<Keyword>> DoWork(List<string> keywords)
            {
                var input = new TransformBlock<string, Keyword>
                (
                    async s => await Work(s),
                    // Specify the maximum number of items to process concurrently.
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 }
                );

                var result = new List<Keyword>();

                var output = new ActionBlock<Keyword>
                (
                    item => result.Add(item),   // Output only 1 item at a time, because 'result.Add()' is not threadsafe.
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }
                );

                input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });

                foreach (string s in keywords)
                    await input.SendAsync(s);

                input.Complete();
                await output.Completion;

                return result;
            }

            public static async Task<Keyword> Work(string s) // Stubbed test method.
            {
                Console.WriteLine("Processing " + s);

                int delay;
                lock (rng) { delay = rng.Next(10, 1000); }
                await Task.Delay(delay); // Simulate load.

                Console.WriteLine("Completed " + s);
                return await Task.Run(() => new Keyword { Name = s });
            }

            static Random rng = new Random();
        }
    }
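
If you would rather not take the extra dependency, the same "keep 8 running, start another as soon as one finishes" behaviour can probably be had with a SemaphoreSlim. This is a different technique from the Dataflow code above, and only a minimal sketch: ThrottledRunner, RunThrottledAsync and maxConcurrency are names made up for illustration, and it assumes your Work method has the shape Task<Keyword> Work(string).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRunner
{
    // Hypothetical helper (not from the original post): runs 'work' over every
    // input while keeping at most 'maxConcurrency' operations in flight.
    public static async Task<List<TResult>> RunThrottledAsync<TInput, TResult>(
        IEnumerable<TInput> inputs,
        Func<TInput, Task<TResult>> work,
        int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = inputs.Select(async input =>
            {
                // Wait for a free slot; one frees up as soon as any item finishes.
                await gate.WaitAsync().ConfigureAwait(false);
                try
                {
                    return await work(input).ConfigureAwait(false);
                }
                finally
                {
                    gate.Release();
                }
            }).ToList(); // Materialize so each task is created exactly once.

            // Task.WhenAll returns the results in input order.
            TResult[] results = await Task.WhenAll(tasks).ConfigureAwait(false);
            return results.ToList();
        }
    }
}
```

With the stubbed Work method above, the call would look like: var result = await ThrottledRunner.RunThrottledAsync(keywords, Work, 8); One thing to keep in mind is that this sketch creates one pending task per input up front, so for 200k - 300k items the Dataflow version (which you can also give a BoundedCapacity) may be easier on memory.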
