made posting to fediverse servers parallel

This commit is contained in:
Vincent Cloutier 2023-02-22 11:54:03 -05:00
parent 3a47655671
commit 2674041a22
4 changed files with 83 additions and 44 deletions

View file

@ -16,5 +16,6 @@
public int UserCacheCapacity { get; set; }
public int ParallelTwitterRequests { get; set; } = 10;
public int ParallelFediversePosts { get; set; } = 10;
}
}

View file

@ -28,6 +28,7 @@ namespace BirdsiteLive.Pipeline.Processors
private readonly InstanceSettings _instanceSettings;
private readonly ILogger<SendTweetsToFollowersProcessor> _logger;
private readonly IRemoveFollowerAction _removeFollowerAction;
private List<Task> _todo = new List<Task>();
#region Ctor
public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox, ISaveProgressionTask saveProgressionTask, IFollowersDal followersDal, ILogger<SendTweetsToFollowersProcessor> logger, InstanceSettings instanceSettings, IRemoveFollowerAction removeFollowerAction)
@ -46,51 +47,56 @@ namespace BirdsiteLive.Pipeline.Processors
{
var user = userWithTweetsToSync.User;
// Process Shared Inbox
var followersWtSharedInbox = userWithTweetsToSync.Followers
.Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user);
_todo = _todo.Where(x => !x.IsCompleted).ToList();
var t = Task.Run( async () =>
{
// Process Shared Inbox
var followersWtSharedInbox = userWithTweetsToSync.Followers
.Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user);
// Process Inbox
var followerWtInbox = userWithTweetsToSync.Followers
.Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user);
// Process Inbox
var followerWtInbox = userWithTweetsToSync.Followers
.Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user);
await _saveProgressionTask.ProcessAsync(userWithTweetsToSync, ct);
await _saveProgressionTask.ProcessAsync(userWithTweetsToSync, ct);
});
_todo.Add(t);
if (_todo.Count >= _instanceSettings.ParallelFediversePosts)
{
await Task.WhenAny(_todo);
}
}
private async Task ProcessFollowersWithSharedInboxAsync(ExtractedTweet[] tweets, List<Follower> followers, SyncTwitterUser user)
{
var followersPerInstances = followers.GroupBy(x => x.Host);
List<Task> todo = new List<Task>();
foreach (var followersPerInstance in followersPerInstances)
{
var t = Task.Run( async () =>
try
{
try
{
_logger.LogInformation("Sending " + tweets.Length + " tweets from user " + user.Acct + " to instance " + followersPerInstance.Key);
await _sendTweetsToSharedInbox.ExecuteAsync(tweets, user, followersPerInstance.Key, followersPerInstance.ToArray());
_logger.LogInformation("Sending " + tweets.Length + " tweets from user " + user.Acct + " to instance " + followersPerInstance.Key);
await _sendTweetsToSharedInbox.ExecuteAsync(tweets, user, followersPerInstance.Key, followersPerInstance.ToArray());
foreach (var f in followersPerInstance)
await ProcessWorkingUserAsync(f);
}
catch (Exception e)
{
var follower = followersPerInstance.First();
_logger.LogError(e, "Posting to {Host}{Route} failed", follower.Host, follower.SharedInboxRoute);
foreach (var f in followersPerInstance)
await ProcessWorkingUserAsync(f);
}
catch (Exception e)
{
var follower = followersPerInstance.First();
_logger.LogError(e, "Posting to {Host}{Route} failed", follower.Host, follower.SharedInboxRoute);
foreach (var f in followersPerInstance)
await ProcessFailingUserAsync(f);
}
});
todo.Add(t);
foreach (var f in followersPerInstance)
await ProcessFailingUserAsync(f);
}
}
await Task.WhenAll(todo);
}
private async Task ProcessFollowersWithInboxAsync(ExtractedTweet[] tweets, List<Follower> followerWtInbox, SyncTwitterUser user)

View file

@ -47,7 +47,7 @@ namespace BirdsiteLive.Pipeline
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, CancellationToken = ct, BoundedCapacity = 1 });
// Link pipeline
twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });

View file

@ -79,7 +79,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<SendTweetsToFollowersProcessor>>();
var saveProgressMock = new Mock<ISaveProgressionTask>();
var settings = new InstanceSettings();
var settings = new InstanceSettings
{
ParallelFediversePosts = 1
};
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
@ -157,7 +160,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<SendTweetsToFollowersProcessor>>();
var settings = new InstanceSettings();
var settings = new InstanceSettings
{
ParallelFediversePosts = 1
};
var saveProgressMock = new Mock<ISaveProgressionTask>();
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
@ -246,7 +252,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<SendTweetsToFollowersProcessor>>();
var saveProgressMock = new Mock<ISaveProgressionTask>();
var settings = new InstanceSettings();
var settings = new InstanceSettings
{
ParallelFediversePosts = 1
};
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
@ -335,7 +344,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<SendTweetsToFollowersProcessor>>();
var saveProgressMock = new Mock<ISaveProgressionTask>();
var settings = new InstanceSettings();
var settings = new InstanceSettings
{
ParallelFediversePosts = 1
};
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
@ -429,7 +441,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<SendTweetsToFollowersProcessor>>();
var saveProgressMock = new Mock<ISaveProgressionTask>();
var settings = new InstanceSettings();
var settings = new InstanceSettings
{
ParallelFediversePosts = 1
};
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
@ -506,7 +521,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<SendTweetsToFollowersProcessor>>();
var saveProgressMock = new Mock<ISaveProgressionTask>();
var settings = new InstanceSettings();
var settings = new InstanceSettings
{
ParallelFediversePosts = 1
};
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
@ -584,7 +602,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<SendTweetsToFollowersProcessor>>();
var saveProgressMock = new Mock<ISaveProgressionTask>();
var settings = new InstanceSettings();
var settings = new InstanceSettings
{
ParallelFediversePosts = 1
};
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
@ -670,7 +691,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<SendTweetsToFollowersProcessor>>();
var saveProgressMock = new Mock<ISaveProgressionTask>();
var settings = new InstanceSettings();
var settings = new InstanceSettings
{
ParallelFediversePosts = 1
};
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
@ -755,7 +779,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var settings = new InstanceSettings
{
FailingFollowerCleanUpThreshold = 10
FailingFollowerCleanUpThreshold = 10,
ParallelFediversePosts = 1
};
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
@ -844,7 +869,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var settings = new InstanceSettings
{
FailingFollowerCleanUpThreshold = 0
FailingFollowerCleanUpThreshold = 0,
ParallelFediversePosts = 1
};
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
@ -935,7 +961,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<SendTweetsToFollowersProcessor>>();
var saveProgressMock = new Mock<ISaveProgressionTask>();
var settings = new InstanceSettings();
var settings = new InstanceSettings
{
ParallelFediversePosts = 1
};
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
@ -1027,8 +1056,11 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<SendTweetsToFollowersProcessor>>();
var saveProgressMock = new Mock<ISaveProgressionTask>();
var settings = new InstanceSettings();
var settings = new InstanceSettings
{
ParallelFediversePosts = 1
};
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion