added parameter for twitter request parallelism

This commit is contained in:
Vincent Cloutier 2023-01-20 12:53:30 -05:00
parent 4a7373ec07
commit 714e66e284
4 changed files with 16 additions and 7 deletions

View file

@ -15,5 +15,6 @@
public int FailingFollowerCleanUpThreshold { get; set; } = -1;
public int UserCacheCapacity { get; set; }
public int ParallelTwitterRequests { get; set; } = 10;
}
}

View file

@ -10,6 +10,7 @@ using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Models;
using BirdsiteLive.Twitter;
using BirdsiteLive.Twitter.Models;
using BirdsiteLive.Common.Settings;
using Microsoft.Extensions.Logging;
using Tweetinvi.Models;
@ -21,21 +22,29 @@ namespace BirdsiteLive.Pipeline.Processors
private readonly ICachedTwitterUserService _twitterUserService;
private readonly ITwitterUserDal _twitterUserDal;
private readonly ILogger<RetrieveTweetsProcessor> _logger;
private readonly InstanceSettings _settings;
#region Ctor
public RetrieveTweetsProcessor(ITwitterTweetsService twitterTweetsService, ITwitterUserDal twitterUserDal, ICachedTwitterUserService twitterUserService, ILogger<RetrieveTweetsProcessor> logger)
public RetrieveTweetsProcessor(ITwitterTweetsService twitterTweetsService, ITwitterUserDal twitterUserDal, ICachedTwitterUserService twitterUserService, InstanceSettings settings, ILogger<RetrieveTweetsProcessor> logger)
{
_twitterTweetsService = twitterTweetsService;
_twitterUserDal = twitterUserDal;
_twitterUserService = twitterUserService;
_logger = logger;
_settings = settings;
}
#endregion
public async Task<UserWithDataToSync[]> ProcessAsync(UserWithDataToSync[] syncTwitterUsers, CancellationToken ct)
{
var usersWtTweets = new ConcurrentBag<UserWithDataToSync>();
if (_settings.ParallelTwitterRequests == 0)
{
while(true)
await Task.Delay(1000);
}
var usersWtTweets = new ConcurrentBag<UserWithDataToSync>();
List<Task> todo = new List<Task>();
int index = 0;
foreach (var userWtData in syncTwitterUsers)
@ -64,11 +73,10 @@ namespace BirdsiteLive.Pipeline.Processors
}
});
todo.Add(t);
if (todo.Count > 10)
if (todo.Count > _settings.ParallelTwitterRequests)
{
await Task.WhenAll(todo);
todo.Clear();
//await Task.Delay(250);
}
}

View file

@ -38,7 +38,7 @@ namespace BirdsiteLive.Pipeline.Processors
{
var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(500);
var userCount = users.Any() ? Math.Min(users.Length, 25) : 1;
var userCount = users.Any() ? Math.Min(users.Length, 50) : 1;
var splitUsers = users.OrderBy(a => rng.Next()).ToArray().Split(userCount).ToList();
foreach (var u in splitUsers)

View file

@ -46,10 +46,10 @@ namespace BirdsiteLive.Pipeline
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 } );
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 400, CancellationToken = ct });
var sendTweetsToFollowersBlock = new TransformBlock<UserWithDataToSync, UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 });
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var saveProgressionBlock = new ActionBlock<UserWithDataToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct, BoundedCapacity = 1 });
var saveProgressionBlock = new ActionBlock<UserWithDataToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 });
// Link pipeline
twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });