How to zip two IAsyncEnumerables concurrently?

I have two asynchronous sequences that I want to "zip" in pairs, and for this purpose I used the Zip operator from the System.Linq.Async package. This operator does not behave the way I need, though. Instead of enumerating the two sequences concurrently, it enumerates them sequentially, so their latencies add up. Each of my sequences emits an element every second on average, and I expected the combined sequence to emit zipped pairs every second as well, but in reality I get one pair every 2 seconds. Below is a minimal example that demonstrates this behavior:

static async IAsyncEnumerable<int> First()
{
    for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
}

static async IAsyncEnumerable<int> Second()
{
    for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
}

var stopwatch = Stopwatch.StartNew();
await foreach (var pair in First().Zip(Second()))
    Console.WriteLine(pair);
Console.WriteLine($"Duration: {stopwatch.ElapsedMilliseconds:#,0} msec");

Output:

(1, 1)
(2, 2)
(3, 3)
(4, 4)
(5, 5)
Duration: 10,155 msec

Try it on Fiddle.

Is there any way to Zip these two sequences so that the program completes in 5 seconds instead of 10? I am interested in a custom operator, or a combination of operators from the official packages, that has the desired behavior.

Solution:

Something like this appears to work:

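public static async IAsyncEnumerable<(TFirst, TSecond)> Zip<TFirst, TSecond>(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
{
    await using var e1 = first.GetAsyncEnumerator();
    await using var e2 = second.GetAsyncEnumerator();

    while (true)
    {
        // Start both MoveNextAsync calls before awaiting either one,
        // so the two sequences advance concurrently.
        // Each ValueTask is converted to a Task exactly once, because a
        // ValueTask must not be consumed more than once.
        Task<bool> t1 = e1.MoveNextAsync().AsTask();
        Task<bool> t2 = e2.MoveNextAsync().AsTask();
        bool[] moved = await Task.WhenAll(t1, t2);

        // Stop as soon as either sequence is exhausted.
        if (!moved[0] || !moved[1])
            yield break;

        yield return (e1.Current, e2.Current);
    }
}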

See it on dotnetfiddle.net.
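
To check the timing claim, here is a minimal self-contained sketch that runs the question's two sequences through a concurrent zip and measures the elapsed time. The names ZipConcurrently, AsyncEnumerableExtensions, Demo and RunAsync are hypothetical, chosen for this illustration only (ZipConcurrently avoids a clash with the Zip operator from System.Linq.Async). The loop follows the same idea as the operator above, except that each ValueTask is converted to a Task once and the results are read from Task.WhenAll.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    // Hypothetical name, chosen to avoid clashing with Zip from System.Linq.Async.
    public static async IAsyncEnumerable<(TFirst, TSecond)> ZipConcurrently<TFirst, TSecond>(
        this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
    {
        await using var e1 = first.GetAsyncEnumerator();
        await using var e2 = second.GetAsyncEnumerator();

        while (true)
        {
            // Start both moves before awaiting, so the two delays overlap.
            Task<bool> t1 = e1.MoveNextAsync().AsTask();
            Task<bool> t2 = e2.MoveNextAsync().AsTask();
            bool[] moved = await Task.WhenAll(t1, t2);

            // Stop as soon as either sequence is exhausted.
            if (!moved[0] || !moved[1])
                yield break;

            yield return (e1.Current, e2.Current);
        }
    }
}

public static class Demo
{
    // Same shape as the sequences in the question: one element per second.
    static async IAsyncEnumerable<int> First()
    {
        for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
    }

    static async IAsyncEnumerable<int> Second()
    {
        for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
    }

    // Collects the zipped pairs and measures how long the enumeration takes.
    public static async Task<(List<(int, int)> Pairs, TimeSpan Elapsed)> RunAsync()
    {
        var pairs = new List<(int, int)>();
        var stopwatch = Stopwatch.StartNew();
        await foreach (var pair in First().ZipConcurrently(Second()))
            pairs.Add(pair);
        return (pairs, stopwatch.Elapsed);
    }

    public static async Task Main()
    {
        var (pairs, elapsed) = await RunAsync();
        foreach (var pair in pairs)
            Console.WriteLine(pair);
        Console.WriteLine($"Duration: {elapsed.TotalMilliseconds:#,0} msec");
    }
}
```

Under these assumptions the program prints the pairs (1, 1) through (5, 5), and the reported duration should be close to 5 seconds rather than 10, since each pair waits for the two one-second delays at the same time.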

Of course, this misses things like null checks, so it could do with some improvements: that’s left as an exercise for the reader.
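
One possible way to add the null checks is sketched below. It assumes the common pattern of splitting the operator into a non-iterator wrapper that validates arguments and a private iterator that does the work. The names ZipConcurrently, ZipConcurrentlyCore and AsyncEnumerableExtensions are hypothetical and used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    // Non-iterator wrapper: the null checks run as soon as the method is called.
    public static IAsyncEnumerable<(TFirst, TSecond)> ZipConcurrently<TFirst, TSecond>(
        this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
    {
        if (first is null) throw new ArgumentNullException(nameof(first));
        if (second is null) throw new ArgumentNullException(nameof(second));
        return ZipConcurrentlyCore(first, second);
    }

    // Iterator part: its body does not run until enumeration begins.
    private static async IAsyncEnumerable<(TFirst, TSecond)> ZipConcurrentlyCore<TFirst, TSecond>(
        IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
    {
        await using var e1 = first.GetAsyncEnumerator();
        await using var e2 = second.GetAsyncEnumerator();

        while (true)
        {
            // Start both moves before awaiting, so the sequences advance concurrently.
            Task<bool> t1 = e1.MoveNextAsync().AsTask();
            Task<bool> t2 = e2.MoveNextAsync().AsTask();
            bool[] moved = await Task.WhenAll(t1, t2);

            if (!moved[0] || !moved[1])
                yield break;

            yield return (e1.Current, e2.Current);
        }
    }
}
```

The split matters because an async iterator body is lazy: a null check written directly inside the iterator would not throw until the caller starts enumerating, which can be far from the call that passed the bad argument.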

I’m also not convinced that the Task.WhenAll is any better than bool r1 = await t1; bool r2 = await t2; if (!r1 || !r2) yield break; here.
