From d1018881ec31154cb5a7d6725a104b58ef7cff0b Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Tue, 10 Jan 2023 20:30:07 -0500 Subject: [PATCH] now fetching twitter feed in parallel --- .../Processors/RetrieveTweetsProcessor.cs | 2 +- .../Processors/RetrieveTwitterUsersProcessor.cs | 3 ++- src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs | 6 +++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs index 232e812..681ee66 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -59,7 +59,7 @@ namespace BirdsiteLive.Pipeline.Processors await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now); } - //await Task.Delay(150); + await Task.Delay(250); } return usersWtTweets.ToArray(); diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs index 8c6eb39..8950c82 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs @@ -18,6 +18,7 @@ namespace BirdsiteLive.Pipeline.Processors private readonly ITwitterUserDal _twitterUserDal; private readonly IMaxUsersNumberProvider _maxUsersNumberProvider; private readonly ILogger _logger; + private static Random rng = new Random(); public int WaitFactor = 1000 * 60; //1 min @@ -41,7 +42,7 @@ namespace BirdsiteLive.Pipeline.Processors var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(500); var userCount = users.Any() ? Math.Min(users.Length, 25) : 1; - var splitUsers = users.Split(userCount).ToList(); + var splitUsers = users.OrderBy(a => rng.Next()).ToArray().Split(userCount).ToList(); foreach (var u in splitUsers) { diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index 43437de..b629b7e 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -43,11 +43,11 @@ namespace BirdsiteLive.Pipeline // Create blocks var twitterUserToRefreshBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); - var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions ); - var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct }); + var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 2 } ); + var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); - var sendTweetsToFollowersBlock = new TransformBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct, BoundedCapacity = 1 }); + var sendTweetsToFollowersBlock = new TransformBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 }); var sendTweetsToFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var saveProgressionBlock = new ActionBlock(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct, BoundedCapacity = 1 });