From 714e66e2842850e26dfe1663cd346ae8bdba30c5 Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Fri, 20 Jan 2023 12:53:30 -0500 Subject: [PATCH] added parameter for twitter request parallelism --- .../Settings/InstanceSettings.cs | 1 + .../Processors/RetrieveTweetsProcessor.cs | 16 ++++++++++++---- .../Processors/RetrieveTwitterUsersProcessor.cs | 2 +- .../StatusPublicationPipeline.cs | 4 ++-- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/BirdsiteLive.Common/Settings/InstanceSettings.cs b/src/BirdsiteLive.Common/Settings/InstanceSettings.cs index 89b535b..84821d4 100644 --- a/src/BirdsiteLive.Common/Settings/InstanceSettings.cs +++ b/src/BirdsiteLive.Common/Settings/InstanceSettings.cs @@ -15,5 +15,6 @@ public int FailingFollowerCleanUpThreshold { get; set; } = -1; public int UserCacheCapacity { get; set; } + public int ParallelTwitterRequests { get; set; } = 10; } } diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs index 0f85d7c..b7e4003 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -10,6 +10,7 @@ using BirdsiteLive.Pipeline.Contracts; using BirdsiteLive.Pipeline.Models; using BirdsiteLive.Twitter; using BirdsiteLive.Twitter.Models; +using BirdsiteLive.Common.Settings; using Microsoft.Extensions.Logging; using Tweetinvi.Models; @@ -21,21 +22,29 @@ namespace BirdsiteLive.Pipeline.Processors private readonly ICachedTwitterUserService _twitterUserService; private readonly ITwitterUserDal _twitterUserDal; private readonly ILogger _logger; + private readonly InstanceSettings _settings; #region Ctor - public RetrieveTweetsProcessor(ITwitterTweetsService twitterTweetsService, ITwitterUserDal twitterUserDal, ICachedTwitterUserService twitterUserService, ILogger logger) + public RetrieveTweetsProcessor(ITwitterTweetsService twitterTweetsService, ITwitterUserDal twitterUserDal, ICachedTwitterUserService twitterUserService, InstanceSettings settings, ILogger logger) { _twitterTweetsService = twitterTweetsService; _twitterUserDal = twitterUserDal; _twitterUserService = twitterUserService; _logger = logger; + _settings = settings; } #endregion public async Task ProcessAsync(UserWithDataToSync[] syncTwitterUsers, CancellationToken ct) { - var usersWtTweets = new ConcurrentBag(); + if (_settings.ParallelTwitterRequests == 0) + { + while(true) + await Task.Delay(1000); + } + + var usersWtTweets = new ConcurrentBag(); List todo = new List(); int index = 0; foreach (var userWtData in syncTwitterUsers) @@ -64,11 +73,10 @@ namespace BirdsiteLive.Pipeline.Processors } }); todo.Add(t); - if (todo.Count > 10) + if (todo.Count > _settings.ParallelTwitterRequests) { await Task.WhenAll(todo); todo.Clear(); - //await Task.Delay(250); } } diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs index 6224bb5..8734cc5 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs @@ -38,7 +38,7 @@ namespace BirdsiteLive.Pipeline.Processors { var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(500); - var userCount = users.Any() ? Math.Min(users.Length, 25) : 1; + var userCount = users.Any() ? Math.Min(users.Length, 50) : 1; var splitUsers = users.OrderBy(a => rng.Next()).ToArray().Split(userCount).ToList(); foreach (var u in splitUsers) diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index 16a40ee..ca8f047 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -46,10 +46,10 @@ namespace BirdsiteLive.Pipeline var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 } ); var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); - var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); + var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 400, CancellationToken = ct }); var sendTweetsToFollowersBlock = new TransformBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 }); var sendTweetsToFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); - var saveProgressionBlock = new ActionBlock(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct, BoundedCapacity = 1 }); + var saveProgressionBlock = new ActionBlock(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 }); // Link pipeline twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });