Awaitable Long Running Producer-Consumer
- Panot Thaiuppathum
- Programming, Advanced
- 17 Jul, 2023
Normally we use the producer-consumer pattern for things like a write buffer, a cache buffer, or any operation (the producer) that needs to put data into a queue while another operation (the consumer) takes data from the queue and processes it, in order or not.
It could be a single producer and a single consumer.
Or it could be multiple producers and a single consumer.
The consumer is normally a long-running operation, such as an infinite loop with a short wait between iterations and an if check for whether there is data in the queue to process.
The main problem with a regular producer-consumer is that the producer never learns whether its entry was processed successfully, failed validation, or hit an error. It is effectively a fire-and-forget method.
    while (true)
    {
        if (_cancellationTokenSource.IsCancellationRequested)
        {
            break;
        }
        while (_writeBuffer.TryDequeue(out byte[] bytes))
        {
            // Process data (bytes in this case)
        }
        // Take some rest...
        await Task.Delay(200);
    }
But what if the producer needs to know when a particular message gets processed?
You can think of this problem as a busy restaurant with a long queue. A customer is a producer who produces a food order as a queue message. The restaurant is the consumer: it dequeues the order and cooks it. The restaurant then wants to notify customers when their order is ready for pickup, so it has to call out to them or buzz a customer pager.
So can we pass something into a function as a variable and have it signal success or failure? Events and delegates come to mind, but if we want async-await then TaskCompletionSource is the one. The Microsoft documentation defines this class as follows:
Represents the producer side of a Task unbound to a delegate, providing access to the consumer side through the Task property.
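To make that definition concrete, here is a minimal sketch in the spirit of the restaurant pager. The variable names and the message are made up for illustration and are not taken from the sample repository. One side holds the TaskCompletionSource and decides the outcome; the other side only holds its Task and awaits it.

```csharp
using System;
using System.Threading.Tasks;

// The "producer side" of the Task: whoever holds this object decides the outcome.
var orderReady = new TaskCompletionSource<string>(
    TaskCreationOptions.RunContinuationsAsynchronously);

// The "consumer side": anyone holding orderReady.Task can await the outcome.
Task<string> pager = orderReady.Task;

// Simulate the kitchen finishing the order a little later on another thread.
_ = Task.Run(async () =>
{
    await Task.Delay(100);
    orderReady.SetResult("Order #1 is ready");
    // On failure we would call orderReady.SetException(...) instead.
});

string message = await pager;
Console.WriteLine(message); // prints "Order #1 is ready"
```

RunContinuationsAsynchronously is optional here, but it keeps the awaiting code from running inline on the thread that calls SetResult, which matters once a long-running consumer is the one completing the task.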
Another sample of a Producer-Consumer
The block builder is a simple process that does the following. Producer: writes string data to the buffer queue. Consumer: dequeues from the buffer and builds a very simple data block with hashing, similar to the blockchain concept.
    using System.Collections.Concurrent;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    namespace AwaitableProducerSample
    {
        internal class SimpleBlockBuilder : BlockBuilderBase
        {
            private readonly CancellationTokenSource _cancellationTokenSource;
            private readonly ConcurrentQueue<byte[]> _writeBuffer;

            public SimpleBlockBuilder()
            {
                _cancellationTokenSource = new();
                _writeBuffer = new();
            }

            public override void Start()
            {
                // BlockBuilding returns a Task (not async void) so that an exception in the
                // loop is captured by the task instead of crashing the process.
                _ = Task.Factory.StartNew(BlockBuilding, _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
            }

            public override void Stop()
            {
                _cancellationTokenSource.Cancel();
            }

            // Producer: fire-and-forget, the caller never learns the outcome.
            public override void Write(string data)
            {
                _writeBuffer.Enqueue(Encoding.ASCII.GetBytes(data));
            }

            // Consumer: polls the queue until cancellation is requested.
            private async Task BlockBuilding()
            {
                while (true)
                {
                    if (_cancellationTokenSource.IsCancellationRequested)
                    {
                        break;
                    }
                    while (_writeBuffer.TryDequeue(out byte[] bytes))
                    {
                        if (NumberOfBlocks < 10)
                        {
                            // Chain the new block to the previous one through its hash.
                            byte[] input = ObjectToByteArray(new Block { PreviousHash = HashOutput, Data = bytes });
                            HashOutput = HashBytes(input);
                            NumberOfBlocks++;
                        }
                        else
                        {
                            Console.WriteLine("Exceeded the maximum of 10 supported blocks");
                        }
                    }
                    // Take some rest...
                    await Task.Delay(200);
                }
                Console.WriteLine("Stopped block builder...");
            }
        }
    }
SimpleBlockBuilder
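BlockBuilderBase, with its HashOutput, ObjectToByteArray and HashBytes members, is not shown in this article. As a rough sketch of the chaining idea only, assuming SHA-256 and plain byte concatenation (both are assumptions for illustration, not necessarily what the repository does), each block's hash could be built from the previous hash plus the new data:

```csharp
using System;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

byte[] hashOutput = Array.Empty<byte>(); // no previous block yet
int numberOfBlocks = 0;

foreach (string data in new[] { "alpha", "beta", "gamma" })
{
    byte[] payload = Encoding.ASCII.GetBytes(data);

    // The input of each block includes the previous hash, so changing any
    // earlier block changes every hash that comes after it.
    byte[] input = hashOutput.Concat(payload).ToArray();
    hashOutput = SHA256.HashData(input);
    numberOfBlocks++;

    Console.WriteLine($"Block {numberOfBlocks}: {Convert.ToHexString(hashOutput)}");
}
```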
To turn the SimpleBlockBuilder sample into an awaitable producer as described in the previous section, let’s apply TaskCompletionSource. The producer now gets an asynchronous counterpart to Write, named WriteAsync in the AwaitableBlockBuilder sample. Whether a particular input is processed successfully or fails in the consumer, the producer knows.
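The core of the idea can be sketched in a few lines. This is a simplified illustration, not the repository's AwaitableBlockBuilder: it deliberately keeps the ConcurrentQueue polling loop so that the only change is the TaskCompletionSource travelling with each queue entry. The names buffer, WriteAsync, ConsumeAsync and the limit of 2 blocks are assumptions chosen for the demo.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

const int MaxBlocks = 2; // hypothetical limit, kept small for the demo
var buffer = new ConcurrentQueue<(string Data, TaskCompletionSource<int> Completion)>();
using var cts = new CancellationTokenSource();
int numberOfBlocks = 0;

// Producer: enqueue the data together with a completion source and return its Task.
Task<int> WriteAsync(string data)
{
    var completion = new TaskCompletionSource<int>(
        TaskCreationOptions.RunContinuationsAsynchronously);
    buffer.Enqueue((data, completion));
    return completion.Task;
}

// Consumer: the same polling loop as before, but every entry gets an answer.
async Task ConsumeAsync()
{
    while (!cts.IsCancellationRequested)
    {
        while (buffer.TryDequeue(out var item))
        {
            if (numberOfBlocks < MaxBlocks)
            {
                numberOfBlocks++;
                item.Completion.SetResult(numberOfBlocks); // success: report the block number
            }
            else
            {
                item.Completion.SetException(
                    new InvalidOperationException($"Maximum of {MaxBlocks} blocks exceeded"));
            }
        }
        await Task.Delay(50);
    }
}

Task consumer = Task.Run(ConsumeAsync);

int first = await WriteAsync("first");
int second = await WriteAsync("second");
Console.WriteLine($"Built blocks {first} and {second}");

string rejection = "";
try
{
    await WriteAsync("third");
}
catch (InvalidOperationException ex)
{
    rejection = ex.Message; // the consumer's failure surfaces in the producer
    Console.WriteLine($"Rejected: {rejection}");
}

cts.Cancel();
await consumer;
```

The producer awaits the returned Task just like any other async call, so a failure in the consumer surfaces as an ordinary exception at the call site instead of a console message nobody reads.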
AwaitableBlockBuilder
Extra bonus: the AwaitableBlockBuilder sample also has one more improvement over SimpleBlockBuilder. It uses BlockingCollection instead of ConcurrentQueue. The documentation defines BlockingCollection as follows:
Provides blocking and bounding capabilities for thread-safe collections that implement IProducerConsumerCollection<T>.
This helps a producer-consumer because BlockingCollection removes the polling: the consumer blocks until an item is available, instead of running a while(true) loop that repeatedly checks the buffer and sleeps between checks. It neither wakes up for nothing nor sits out a delay before noticing new data (compare the two samples: the consumer of the AwaitableBlockBuilder has only one loop). BlockingCollection is also thread-safe, just like ConcurrentQueue.
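A minimal sketch of that single-loop consumer, assuming string items and using CompleteAdding to end the loop (the sample's Stop method works through a cancellation token, so treat this as an illustration of the collection rather than of the repository code):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Backed by a ConcurrentQueue by default, so items come out in FIFO order.
using var buffer = new BlockingCollection<string>();
var processed = new List<string>();

// Consumer: one loop only. GetConsumingEnumerable blocks while the collection is
// empty and ends once CompleteAdding has been called and all items are drained.
Task consumer = Task.Factory.StartNew(() =>
{
    foreach (string item in buffer.GetConsumingEnumerable())
    {
        processed.Add(item);
        Console.WriteLine($"Processed {item}");
    }
}, TaskCreationOptions.LongRunning);

// Producer side.
buffer.Add("a");
buffer.Add("b");
buffer.CompleteAdding(); // no more items: lets the consumer loop finish

await consumer;
```

Because the consumer body is synchronous and blocks for its whole lifetime, TaskCreationOptions.LongRunning is a good fit here: the loop gets its own thread rather than tying up a thread-pool worker.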
The full runnable source code of the SimpleBlockBuilder and the AwaitableBlockBuilder can be found at https://github.com/panot-hong/AwaitableProducerSample