From 5c75e79abc3cf377da4dbbe369a156ae9c3b890a Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Tue, 3 Jan 2023 14:50:05 -0500 Subject: [PATCH] boundedcapacity change --- src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index 46013bb..39230ad 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -39,16 +39,17 @@ namespace BirdsiteLive.Pipeline public async Task ExecuteAsync(CancellationToken ct) { + var standardBlockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }; // Create blocks var twitterUserToRefreshBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); - var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct)); + var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions ); var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 2, CancellationToken = ct }); - var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct)); + var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct }); - var sendTweetsToFollowersBlock = new TransformBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct }); + var sendTweetsToFollowersBlock = new TransformBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct, BoundedCapacity = 1 }); var sendTweetsToFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct }); - var saveProgressionBlock = new ActionBlock(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct }); + var saveProgressionBlock = new ActionBlock(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct, BoundedCapacity = 1 }); // Link pipeline twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });