Awaitable Long Running Producer-Consumer

Awaitable Long Running Producer-Consumer

Awaitable Long Running Producer-Consumer

Normally we use producer-consumer problem to solve certain problems like write buffer, cache buffer or any operation (producer) that needs to entry data into a queue and having another operation (consumer) to consume data from the queue to process by order (or non order).

It could be a single producer and single consumer. Single producer to sigle consumer single producer and single consumer

Or it could be multiple producers and single consumer. Multiple producers to sigle consumer multiple producers and single consumer

The consumer normally be a long operation like an infinite loop with some wait in between together with an IF check for the availability of a data in the queue to process.

The main problem of regular producer-consumer is the producer wouldn’t know if its data entry is successfully processed or any validation failure or error. It is considered as a fire-and-forget method.

while (true)
{
  if (_cancellationTokenSource.IsCancellationRequested)
  {
      break;
  }

  while (_writeBuffer.TryDequeue(out byte[] bytes))
  {
      // Process data (bytes in this case)
  }

  // Take some rest...
  await Task.Delay(200);
}

But what if the producer needs to know when a particular message gets processed?

You can think of this problem as a situation when a busy restaurant that having a long queue. A customer is a producer who produces food order as a queue message. While the restaurant as a consumer has to dequeue the order and cook. Then the restaurant wants to notify the customers when their order is ready to pickup. They will have to yell or using the customer pager to notify.

Slot ordering machine

So wondering if we can passing around something as a variable to a function and signal when it is success or fail? We can think of Event and Delegate function but if we want to make it async-await then TaskCompletionSource is the one. The definition of this class in Microsoft document said…

Represents the producer side of a Task unbound to a delegate, providing access to the consumer side through the Task property.

Another sample of a Producer-Consumer

The block builder, a simple process that does following. Producer: Input string data to the buffer queue. Consumer: Dequeue the buffer and build a very simple data block with hashing similar to the blockchain concept.

using System.Collections.Concurrent;
using System.Text;

namespace AwaitableProducerSample
{
    internal class SimpleBlockBuilder : BlockBuilderBase
    {
        private readonly CancellationTokenSource _cancellationTokenSource;
        private readonly ConcurrentQueue<byte[]> _writeBuffer;

        public SimpleBlockBuilder()
        {
            _cancellationTokenSource = new();
            _writeBuffer = new();
        }

        public override void Start()
        {
            Task.Factory.StartNew(BlockBuilding, _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
        }

        public override void Stop()
        {
            _cancellationTokenSource.Cancel();
        }

        public override void Write(string data)
        {
            _writeBuffer.Enqueue(Encoding.ASCII.GetBytes(data));
        }

        private async void BlockBuilding()
        {
            while (true)
            {
                if (_cancellationTokenSource.IsCancellationRequested)
                {
                    break;
                }

                while (_writeBuffer.TryDequeue(out byte[] bytes))
                {
                    if (NumberOfBlocks < 10)
                    {
                        byte[] input = ObjectToByteArray(new Block { PreviousHash = HashOutput, Data = bytes });
                        HashOutput = HashBytes(input);
                        NumberOfBlocks++;
                    }
                    else
                    {
                        Console.WriteLine("Exceed maximum supported blocks of 10");
                    }
                }

                // Take some rest...
                await Task.Delay(200);
            }

            Console.WriteLine("Stopped block builder...");
        }
    }
}

SimpleBlockBuilder

To improve above sample to be awaitable producer as described in above section, let’s apply usage of TaskCompletionSource. Now we can have an overload method for the producer to Write with async so it is named WriteAsync in the sample below. Then whether a particular input gets successfully processed or failed by the consumer, the producer knows.

using System.Collections.Concurrent;
using System.Text;

namespace AwaitableProducerSample
{
    internal class SimpleBlockBuilder : BlockBuilderBase
    {
        private readonly CancellationTokenSource _cancellationTokenSource;
        private readonly ConcurrentQueue<byte[]> _writeBuffer;

        public SimpleBlockBuilder()
        {
            _cancellationTokenSource = new();
            _writeBuffer = new();
        }

        public override void Start()
        {
            Task.Factory.StartNew(BlockBuilding, _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
        }

        public override void Stop()
        {
            _cancellationTokenSource.Cancel();
        }

        public override void Write(string data)
        {
            _writeBuffer.Enqueue(Encoding.ASCII.GetBytes(data));
        }

        private async void BlockBuilding()
        {
            while (true)
            {
                if (_cancellationTokenSource.IsCancellationRequested)
                {
                    break;
                }

                while (_writeBuffer.TryDequeue(out byte[] bytes))
                {
                    if (NumberOfBlocks < 10)
                    {
                        byte[] input = ObjectToByteArray(new Block { PreviousHash = HashOutput, Data = bytes });
                        HashOutput = HashBytes(input);
                        NumberOfBlocks++;
                    }
                    else
                    {
                        Console.WriteLine("Exceed maximum supported blocks of 10");
                    }
                }

                // Take some rest...
                await Task.Delay(200);
            }

            Console.WriteLine("Stopped block builder...");
        }
    }
}

AwaitableBlockBuilder


Extra Bonus In the sample above also has an extra improvement compare to the SimpleBlockBuilder above. It uses BlockingCollection instead of ConcurrentQueue. The definition of the BlockingCollection says.

Provides blocking and bounding capabilities for thread-safe collections that implement

It gives a significant performance improvement for producer-consumer as the BlockingCollection does not require the while(true) loop to iteratively check the availability of the new data in the buffer (you can compare between two samples, the consumer of the AwaitableBlockBuilder has only one loop). Yet the BlockingCollection also thread-safe just like the ConcurrentQueue.


The full runnable source code of the SimpleBlockBuilder and the AwaitableBlockBuilder can be found at https://github.com/panot-hong/AwaitableProducerSample

Related Posts

Let's create our own Crypto coin, easy and in just a few minutes (no coding knowledge).

Let's create our own Crypto coin, easy and in just a few minutes (no coding knowledge).

Hello everyone who stumbled upon and came to read. I've been away from writing a blog for a while. Caught up with work, trying this and that, blah blah blah. But now it's time to come back and write

read more
Customize the website to display using Tampermonkey

Customize the website to display using Tampermonkey

Many people may feel dissatisfied with certain websites when they browse them, for example:* Disliking intrusive banner advertisements that strain the eyes. * Wishing a website had specific feature

read more
Pass Through Data Over IServiceProvider.CreateScope()

Pass Through Data Over IServiceProvider.CreateScope()

[ASP.NET] In some cases you may encounter the situation that you need to pass through some particular data over a new scope of Service Provider.For instance, when you implement a solution that inte

read more
Write Unit Tests for React Hooks using react-hooks-testing-library

Write Unit Tests for React Hooks using react-hooks-testing-library

Hooks in React are a feature that has drastically changed the way we write React. It's like undergoing plastic surgery in Korea, where some developers love the new look, while others prefer the old o

read more
Scan code with Credential Scanner on Azure DevOps

Scan code with Credential Scanner on Azure DevOps

🥳 Happy New Year 2023! 🥳Wishing everyone a great year ahead!Now, let's get down to business. Today, I'm going to introduce you to a handy tool that checks for leaked passwords, secrets, certifi

read more
Convert interface to enum (for those too lazy to type 'name' in the input form) in TypeScript

Convert interface to enum (for those too lazy to type 'name' in the input form) in TypeScript

![Convert interface to enum cover](/images/posts/transform-interface-as-enum-typescript/transform_interface-as-enum-cover.png)It's a small trick to convert an Interface to an Enum, helping to solve

read more
Utilize WebAssembly in .NET

Utilize WebAssembly in .NET

We heard the WebAssembly quite a while ago but the use case, especially for .NET developers, was still limited. As of the time writing this post, in the last quarter of 2022, there are many new thing

read more