From 4dd071abe24217b8c0f6ec06e279b7ddbcec4889 Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Fri, 17 Mar 2023 15:10:53 -0400 Subject: [PATCH 01/16] cache tweaks --- src/BirdsiteLive.Common/Settings/InstanceSettings.cs | 1 + src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs | 4 +++- src/BirdsiteLive.Twitter/CachedTwitterTweetsService.cs | 3 +-- .../StatusPublicationPipelineTests.cs | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/BirdsiteLive.Common/Settings/InstanceSettings.cs b/src/BirdsiteLive.Common/Settings/InstanceSettings.cs index db82611..c57fb27 100644 --- a/src/BirdsiteLive.Common/Settings/InstanceSettings.cs +++ b/src/BirdsiteLive.Common/Settings/InstanceSettings.cs @@ -15,6 +15,7 @@ public int FailingFollowerCleanUpThreshold { get; set; } = -1; public int UserCacheCapacity { get; set; } + public int TweetCacheCapacity { get; set; } = 20_000; public int ParallelTwitterRequests { get; set; } = 10; public int ParallelFediversePosts { get; set; } = 10; } diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index 88fbb28..045c673 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -56,7 +56,9 @@ namespace BirdsiteLive.Pipeline retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); - // Launch twitter user retriever + // Launch twitter user retriever after a little delay + // to give time for the Tweet cache to fill + await Task.Delay(30 * 1000, ct); var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUserToRefreshBufferBlock, ct); // Wait diff --git a/src/BirdsiteLive.Twitter/CachedTwitterTweetsService.cs b/src/BirdsiteLive.Twitter/CachedTwitterTweetsService.cs index 83da0ac..4752a19 100644 --- a/src/BirdsiteLive.Twitter/CachedTwitterTweetsService.cs +++ b/src/BirdsiteLive.Twitter/CachedTwitterTweetsService.cs @@ -18,7 +18,6 @@ namespace BirdsiteLive.Twitter private readonly MemoryCache _tweetCache; private readonly MemoryCacheEntryOptions _cacheEntryOptions = new MemoryCacheEntryOptions() - .SetSize(10000)//Size amount //Priority on removing when reaching size limit (memory pressure) .SetPriority(CacheItemPriority.Low) // Keep in cache for this time, reset time if accessed. @@ -33,7 +32,7 @@ namespace BirdsiteLive.Twitter _tweetCache = new MemoryCache(new MemoryCacheOptions() { - SizeLimit = 10000 //TODO make this use number of entries in db + SizeLimit = settings.TweetCacheCapacity, }); } #endregion diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/StatusPublicationPipelineTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/StatusPublicationPipelineTests.cs index c439b72..31f52c5 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/StatusPublicationPipelineTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/StatusPublicationPipelineTests.cs @@ -16,7 +16,7 @@ namespace BirdsiteLive.Pipeline.Tests public async Task ExecuteAsync_Test() { #region Stubs - var ct = new CancellationTokenSource(10); + var ct = new CancellationTokenSource(100 * 1000); #endregion #region Mocks From 160ef9762688573190fde62de8bfe678f58bb0f3 Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Fri, 17 Mar 2023 15:51:11 -0400 Subject: [PATCH 02/16] pipeline simplifications --- .../SendTweetsToFollowersProcessor.cs | 6 +- .../SubTasks/SendTweetsToInboxTask.cs | 41 ++++--------- .../SubTasks/SendTweetsToSharedInboxTask.cs | 44 ++++--------- .../CachedTwitterTweetsService.cs | 18 +++--- .../SendTweetsToFollowersProcessorTests.cs | 24 ++++---- .../SubTasks/SendTweetsToInboxTaskTests.cs | 47 +++++--------- .../SubTasks/SendTweetsToSharedInboxTests.cs | 61 ++++--------------- 7 files changed, 76 insertions(+), 165 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs index 796e472..c04a8cd 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -22,7 +22,6 @@ namespace BirdsiteLive.Pipeline.Processors public class SendTweetsToFollowersProcessor : ISendTweetsToFollowersProcessor { private readonly ISendTweetsToInboxTask _sendTweetsToInboxTask; - private readonly ISaveProgressionTask _saveProgressionTask; private readonly ISendTweetsToSharedInboxTask _sendTweetsToSharedInbox; private readonly IFollowersDal _followersDal; private readonly InstanceSettings _instanceSettings; @@ -31,14 +30,13 @@ namespace BirdsiteLive.Pipeline.Processors private List _todo = new List(); #region Ctor - public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox, ISaveProgressionTask saveProgressionTask, IFollowersDal followersDal, ILogger logger, InstanceSettings instanceSettings, IRemoveFollowerAction removeFollowerAction) + public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox, IFollowersDal followersDal, ILogger logger, InstanceSettings instanceSettings, IRemoveFollowerAction removeFollowerAction) { _sendTweetsToInboxTask = sendTweetsToInboxTask; _sendTweetsToSharedInbox = sendTweetsToSharedInbox; _logger = logger; _instanceSettings = instanceSettings; _removeFollowerAction = removeFollowerAction; - _saveProgressionTask = saveProgressionTask; _followersDal = followersDal; } #endregion @@ -62,8 +60,6 @@ namespace BirdsiteLive.Pipeline.Processors .Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute)) .ToList(); await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user); - - await _saveProgressionTask.ProcessAsync(userWithTweetsToSync, ct); }); _todo.Add(t); diff --git a/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToInboxTask.cs b/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToInboxTask.cs index ec98bd8..7b0c533 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToInboxTask.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToInboxTask.cs @@ -31,7 +31,6 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks { _activityPubService = activityPubService; _statusService = statusService; - _followersDal = followersDal; _settings = settings; _logger = logger; } @@ -40,46 +39,32 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks public async Task ExecuteAsync(IEnumerable tweets, Follower follower, SyncTwitterUser user) { var userId = user.Id; - var fromStatusId = follower.FollowingsSyncStatus[userId]; + //var fromStatusId = follower.FollowingsSyncStatus[userId]; var tweetsToSend = tweets - .Where(x => x.Id > fromStatusId) .OrderBy(x => x.Id) .ToList(); var inbox = follower.InboxRoute; - var syncStatus = fromStatusId; - try + foreach (var tweet in tweetsToSend) { - foreach (var tweet in tweetsToSend) + try { - try + var activity = _statusService.GetActivity(user.Acct, tweet); + await _activityPubService.PostNewActivity(activity, user.Acct, tweet.Id.ToString(), follower.Host, inbox); + } + catch (ArgumentException e) + { + if (e.Message.Contains("Invalid pattern") && e.Message.Contains("at offset")) //Regex exception { - var activity = _statusService.GetActivity(user.Acct, tweet); - await _activityPubService.PostNewActivity(activity, user.Acct, tweet.Id.ToString(), follower.Host, inbox); + _logger.LogError(e, "Can't parse {MessageContent} from Tweet {Id}", tweet.MessageContent, tweet.Id); } - catch (ArgumentException e) + else { - if (e.Message.Contains("Invalid pattern") && e.Message.Contains("at offset")) //Regex exception - { - _logger.LogError(e, "Can't parse {MessageContent} from Tweet {Id}", tweet.MessageContent, tweet.Id); - } - else - { - throw; - } + throw; } + } - syncStatus = tweet.Id; - } - } - finally - { - if (syncStatus != fromStatusId) - { - follower.FollowingsSyncStatus[userId] = syncStatus; - await _followersDal.UpdateFollowerAsync(follower); - } } } } diff --git a/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs b/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs index 72a7c41..f66eeab 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs @@ -40,50 +40,30 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks var userId = user.Id; var inbox = followersPerInstance.First().SharedInboxRoute; - var fromStatusId = followersPerInstance - .Max(x => x.FollowingsSyncStatus[userId]); - var tweetsToSend = tweets - .Where(x => x.Id > fromStatusId) .OrderBy(x => x.Id) .ToList(); _logger.LogInformation("After filtering, there were " + tweetsToSend.Count() + " tweets left to send"); - var syncStatus = fromStatusId; - try + foreach (var tweet in tweetsToSend) { - foreach (var tweet in tweetsToSend) + try { - try + var activity = _statusService.GetActivity(user.Acct, tweet); + await _activityPubService.PostNewActivity(activity, user.Acct, tweet.Id.ToString(), host, inbox); + } + catch (ArgumentException e) + { + if (e.Message.Contains("Invalid pattern") && e.Message.Contains("at offset")) //Regex exception { - var activity = _statusService.GetActivity(user.Acct, tweet); - await _activityPubService.PostNewActivity(activity, user.Acct, tweet.Id.ToString(), host, inbox); + _logger.LogError(e, "Can't parse {MessageContent} from Tweet {Id}", tweet.MessageContent, tweet.Id); } - catch (ArgumentException e) + else { - if (e.Message.Contains("Invalid pattern") && e.Message.Contains("at offset")) //Regex exception - { - _logger.LogError(e, "Can't parse {MessageContent} from Tweet {Id}", tweet.MessageContent, tweet.Id); - } - else - { - throw; - } + throw; } + } - syncStatus = tweet.Id; - } - } - finally - { - if (syncStatus != fromStatusId) - { - foreach (var f in followersPerInstance) - { - f.FollowingsSyncStatus[userId] = syncStatus; - await _followersDal.UpdateFollowerAsync(f); - } - } } } } diff --git a/src/BirdsiteLive.Twitter/CachedTwitterTweetsService.cs b/src/BirdsiteLive.Twitter/CachedTwitterTweetsService.cs index 4752a19..a2a85fc 100644 --- a/src/BirdsiteLive.Twitter/CachedTwitterTweetsService.cs +++ b/src/BirdsiteLive.Twitter/CachedTwitterTweetsService.cs @@ -17,13 +17,7 @@ namespace BirdsiteLive.Twitter private readonly ITwitterTweetsService _twitterService; private readonly MemoryCache _tweetCache; - private readonly MemoryCacheEntryOptions _cacheEntryOptions = new MemoryCacheEntryOptions() - //Priority on removing when reaching size limit (memory pressure) - .SetPriority(CacheItemPriority.Low) - // Keep in cache for this time, reset time if accessed. - .SetSlidingExpiration(TimeSpan.FromMinutes(60)) - // Remove from cache after this time, regardless of sliding expiration - .SetAbsoluteExpiration(TimeSpan.FromDays(1)); + private readonly MemoryCacheEntryOptions _cacheEntryOptions; #region Ctor public CachedTwitterTweetsService(ITwitterTweetsService twitterService, InstanceSettings settings) @@ -32,8 +26,16 @@ namespace BirdsiteLive.Twitter _tweetCache = new MemoryCache(new MemoryCacheOptions() { - SizeLimit = settings.TweetCacheCapacity, + SizeLimit = settings.TweetCacheCapacity, }); + _cacheEntryOptions = new MemoryCacheEntryOptions() + .SetSize(1) + //Priority on removing when reaching size limit (memory pressure) + .SetPriority(CacheItemPriority.Low) + // Keep in cache for this time, reset time if accessed. + .SetSlidingExpiration(TimeSpan.FromMinutes(60)) + // Remove from cache after this time, regardless of sliding expiration + .SetAbsoluteExpiration(TimeSpan.FromDays(1)); } #endregion diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs index 03e90ad..06d8b67 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs @@ -87,7 +87,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations @@ -169,7 +169,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations @@ -260,7 +260,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations @@ -352,7 +352,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations @@ -449,7 +449,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations @@ -529,7 +529,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations @@ -610,7 +610,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations @@ -699,7 +699,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations @@ -789,7 +789,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors .Returns(Task.CompletedTask); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations @@ -879,7 +879,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors .Returns(Task.CompletedTask); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations @@ -969,7 +969,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations @@ -1064,7 +1064,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion - var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); + var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); await processor.ProcessAsync(userWithTweets, CancellationToken.None); #region Validations diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToInboxTaskTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToInboxTaskTests.cs index e8a6fbf..da1cfb7 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToInboxTaskTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToInboxTaskTests.cs @@ -85,10 +85,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks .Returns(activity); var followersDalMock = new Mock(MockBehavior.Strict); - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId))) - .Returns(Task.CompletedTask); var loggerMock = new Mock>(); #endregion @@ -168,10 +164,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks .Returns(activity); var followersDalMock = new Mock(MockBehavior.Strict); - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId))) - .Returns(Task.CompletedTask); var loggerMock = new Mock>(); #endregion @@ -254,10 +246,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks .Returns(activity); var followersDalMock = new Mock(MockBehavior.Strict); - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId))) - .Returns(Task.CompletedTask); + var loggerMock = new Mock>(); #endregion @@ -340,10 +329,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks .Returns(activity); var followersDalMock = new Mock(MockBehavior.Strict); - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId))) - .Returns(Task.CompletedTask); + var loggerMock = new Mock>(); #endregion @@ -400,7 +386,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks #region Mocks var activityPubService = new Mock(MockBehavior.Strict); - foreach (var tweetId in new[] { tweetId2, tweetId3 }) + foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 }) { activityPubService .Setup(x => x.PostNewActivity( @@ -413,7 +399,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks } var statusServiceMock = new Mock(MockBehavior.Strict); - foreach (var tweetId in new[] { tweetId2, tweetId3 }) + foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 }) { statusServiceMock .Setup(x => x.GetActivity( @@ -423,10 +409,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks } var followersDalMock = new Mock(MockBehavior.Strict); - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId3))) - .Returns(Task.CompletedTask); + var loggerMock = new Mock>(); #endregion @@ -485,6 +468,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks #region Mocks var activityPubService = new Mock(MockBehavior.Strict); + activityPubService + .Setup(x => x.PostNewActivity( + It.Is(y => y.apObject.id == tweetId1.ToString()), + It.Is(y => y == twitterHandle), + It.Is(y => y == tweetId1.ToString()), + It.Is(y => y == host), + It.Is(y => y == inbox))) + .Returns(Task.CompletedTask); + activityPubService .Setup(x => x.PostNewActivity( It.Is(y => y.apObject.id == tweetId2.ToString()), @@ -504,7 +496,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks .Throws(new HttpRequestException()); var statusServiceMock = new Mock(MockBehavior.Strict); - foreach (var tweetId in new[] { tweetId2, tweetId3 }) + foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 }) { statusServiceMock .Setup(x => x.GetActivity( @@ -514,10 +506,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks } var followersDalMock = new Mock(MockBehavior.Strict); - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId2))) - .Returns(Task.CompletedTask); var loggerMock = new Mock>(); #endregion @@ -592,10 +580,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks .Throws(new ArgumentException("Invalid pattern blabla at offset 9")); var followersDalMock = new Mock(MockBehavior.Strict); - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId))) - .Returns(Task.CompletedTask); + var loggerMock = new Mock>(); #endregion diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToSharedInboxTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToSharedInboxTests.cs index ef26beb..4846de1 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToSharedInboxTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToSharedInboxTests.cs @@ -105,13 +105,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks var followersDalMock = new Mock(MockBehavior.Strict); - foreach (var follower in followers) - { - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId))) - .Returns(Task.CompletedTask); - } var loggerMock = new Mock>(); #endregion @@ -212,14 +205,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks var followersDalMock = new Mock(MockBehavior.Strict); - foreach (var follower in followers) - { - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId))) - .Returns(Task.CompletedTask); - } - var loggerMock = new Mock>(); #endregion @@ -318,13 +303,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks var followersDalMock = new Mock(MockBehavior.Strict); - foreach (var follower in followers) - { - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId))) - .Returns(Task.CompletedTask); - } var loggerMock = new Mock>(); #endregion @@ -398,7 +376,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks #region Mocks var activityPubService = new Mock(MockBehavior.Strict); - foreach (var tweetId in new[] { tweetId2, tweetId3 }) + foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 }) { activityPubService .Setup(x => x.PostNewActivity( @@ -411,7 +389,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks } var statusServiceMock = new Mock(MockBehavior.Strict); - foreach (var tweetId in new[] { tweetId2, tweetId3 }) + foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 }) { statusServiceMock .Setup(x => x.GetActivity( @@ -422,14 +400,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks var followersDalMock = new Mock(MockBehavior.Strict); - foreach (var follower in followers) - { - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId3))) - .Returns(Task.CompletedTask); - } - var loggerMock = new Mock>(); #endregion @@ -504,6 +474,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks #region Mocks var activityPubService = new Mock(MockBehavior.Strict); + activityPubService + .Setup(x => x.PostNewActivity( + It.Is(y => y.apObject.id == tweetId1.ToString()), + It.Is(y => y == twitterHandle), + It.Is(y => y == tweetId1.ToString()), + It.Is(y => y == host), + It.Is(y => y == inbox))) + .Returns(Task.CompletedTask); + activityPubService .Setup(x => x.PostNewActivity( It.Is(y => y.apObject.id == tweetId2.ToString()), @@ -523,7 +502,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks .Throws(new HttpRequestException()); var statusServiceMock = new Mock(MockBehavior.Strict); - foreach (var tweetId in new[] { tweetId2, tweetId3 }) + foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 }) { statusServiceMock .Setup(x => x.GetActivity( @@ -534,14 +513,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks var followersDalMock = new Mock(MockBehavior.Strict); - foreach (var follower in followers) - { - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId2))) - .Returns(Task.CompletedTask); - } - var loggerMock = new Mock>(); #endregion @@ -633,14 +604,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks var followersDalMock = new Mock(MockBehavior.Strict); - foreach (var follower in followers) - { - followersDalMock - .Setup(x => x.UpdateFollowerAsync( - It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId))) - .Returns(Task.CompletedTask); - } - var loggerMock = new Mock>(); #endregion From 37725dfd9cd9b1a67d81557583f7c89321abe387 Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Fri, 17 Mar 2023 16:03:44 -0400 Subject: [PATCH 03/16] catch an exception --- src/BirdsiteLive.Domain/UserService.cs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/BirdsiteLive.Domain/UserService.cs b/src/BirdsiteLive.Domain/UserService.cs index 8131c7e..62a72b6 100644 --- a/src/BirdsiteLive.Domain/UserService.cs +++ b/src/BirdsiteLive.Domain/UserService.cs @@ -196,9 +196,16 @@ namespace BirdsiteLive.Domain apObject = activity.apObject } }; - var result = await _activityPubService.PostDataAsync(acceptFollow, followerHost, activity.apObject); - return result == HttpStatusCode.Accepted || - result == HttpStatusCode.OK; //TODO: revamp this for better error handling + try + { + var result = await _activityPubService.PostDataAsync(acceptFollow, followerHost, activity.apObject); + return result == HttpStatusCode.Accepted || + result == HttpStatusCode.OK; + } + catch (Exception e) + { + return false; + } } public async Task SendRejectFollowAsync(ActivityFollow activity, string followerHost) From 240dfd19025f183c7db11af561826db2f4510f8b Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Fri, 17 Mar 2023 16:14:30 -0400 Subject: [PATCH 04/16] pipeline tweaks --- src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index 045c673..5638a96 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -39,15 +39,15 @@ namespace BirdsiteLive.Pipeline public async Task ExecuteAsync(CancellationToken ct) { - var standardBlockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }; + var standardBlockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1, CancellationToken = ct}; // Create blocks var twitterUserToRefreshBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); - var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 } ); + var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions ); var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct }); - var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, CancellationToken = ct, BoundedCapacity = 1 }); + var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); // Link pipeline twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true }); From 5dcb1199c71aff4a722bdc3e40ed360c3b1d0a5b Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Fri, 17 Mar 2023 16:31:52 -0400 Subject: [PATCH 05/16] limit parallel postgres requests --- .../Processors/RetrieveFollowersProcessor.cs | 7 +++++++ .../BirdsiteLive.DAL.Postgres.csproj | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs index a34adfb..88a56a8 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using BirdsiteLive.DAL.Contracts; @@ -29,7 +30,13 @@ namespace BirdsiteLive.Pipeline.Processors user.Followers = followers; }); todo.Add(t); + + if (todo.Count(x => !x.IsCompleted) >= 25) + { + await Task.WhenAny(todo); + } } + await Task.WhenAll(todo); return userWithTweetsToSyncs; diff --git a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/BirdsiteLive.DAL.Postgres.csproj b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/BirdsiteLive.DAL.Postgres.csproj index 84a3210..c4c49db 100644 --- a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/BirdsiteLive.DAL.Postgres.csproj +++ b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/BirdsiteLive.DAL.Postgres.csproj @@ -6,7 +6,7 @@ - + From 66e2ba9b060921d1660ea1bc3169e12538f9ea80 Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Fri, 17 Mar 2023 19:45:51 -0400 Subject: [PATCH 06/16] tweak checkpointing of twitter fetches --- .../Processors/RetrieveTweetsProcessor.cs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs index 119e1b4..4678bd3 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -51,9 +51,10 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks index++; var t = Task.Run(async () => { + var user = userWtData.User; + var now = DateTime.UtcNow; try { - var user = userWtData.User; var tweets = await RetrieveNewTweets(user); _logger.LogInformation(index + "/" + syncTwitterUsers.Count() + " Got " + tweets.Length + " tweets from user " + user.Acct + " " ); if (tweets.Length > 0 && user.LastTweetPostedId != -1) @@ -61,15 +62,13 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks userWtData.Tweets = tweets; usersWtTweets.Add(userWtData); } - else if (tweets.Length > 0 && user.LastTweetPostedId == -1) + else if (tweets.Length > 0) { var tweetId = tweets.Last().Id; - var now = DateTime.UtcNow; await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now); } else { - var now = DateTime.UtcNow; await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now); } @@ -77,7 +76,7 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks catch(Exception e) { _logger.LogError(e.Message); - + await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now); } }); todo.Add(t); From 71dfe4b0197fcd92867729b3367a8c5ab84d576c Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Fri, 17 Mar 2023 22:02:34 -0400 Subject: [PATCH 07/16] tweak checkpointing of twitter fetches 2 --- .../Processors/RetrieveTweetsProcessor.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs index 4678bd3..cfdb8a5 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -61,9 +61,6 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks { userWtData.Tweets = tweets; usersWtTweets.Add(userWtData); - } - else if (tweets.Length > 0) - { var tweetId = tweets.Last().Id; await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now); } From dd7786ce385b6519b138545837d153d257dee476 Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Sat, 18 Mar 2023 16:16:03 -0400 Subject: [PATCH 08/16] speed tweaks --- src/BirdsiteLive.Common/Settings/InstanceSettings.cs | 2 +- src/BirdsiteLive.Domain/ActivityPubService.cs | 1 - src/BirdsiteLive.Twitter/CachedTwitterService.cs | 4 ++-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/BirdsiteLive.Common/Settings/InstanceSettings.cs b/src/BirdsiteLive.Common/Settings/InstanceSettings.cs index c57fb27..4cec952 100644 --- a/src/BirdsiteLive.Common/Settings/InstanceSettings.cs +++ b/src/BirdsiteLive.Common/Settings/InstanceSettings.cs @@ -14,7 +14,7 @@ public int FailingTwitterUserCleanUpThreshold { get; set; } public int FailingFollowerCleanUpThreshold { get; set; } = -1; - public int UserCacheCapacity { get; set; } + public int UserCacheCapacity { get; set; } = 20_000; public int TweetCacheCapacity { get; set; } = 20_000; public int ParallelTwitterRequests { get; set; } = 10; public int ParallelFediversePosts { get; set; } = 10; diff --git a/src/BirdsiteLive.Domain/ActivityPubService.cs b/src/BirdsiteLive.Domain/ActivityPubService.cs index e18193e..cc9e2b2 100644 --- a/src/BirdsiteLive.Domain/ActivityPubService.cs +++ b/src/BirdsiteLive.Domain/ActivityPubService.cs @@ -106,7 +106,6 @@ namespace BirdsiteLive.Domain var response = await client.SendAsync(httpRequestMessage); response.EnsureSuccessStatusCode(); _logger.LogInformation("Sent tweet to " + targetHost); - _logger.LogInformation("Tweet content is " + json); return response.StatusCode; } diff --git a/src/BirdsiteLive.Twitter/CachedTwitterService.cs b/src/BirdsiteLive.Twitter/CachedTwitterService.cs index a74c3de..92ccf52 100644 --- a/src/BirdsiteLive.Twitter/CachedTwitterService.cs +++ b/src/BirdsiteLive.Twitter/CachedTwitterService.cs @@ -19,7 +19,7 @@ namespace BirdsiteLive.Twitter private readonly MemoryCache _userCache; private readonly MemoryCacheEntryOptions _cacheEntryOptions = new MemoryCacheEntryOptions() - .SetSize(10000)//Size amount + .SetSize(1)//Size amount //Priority on removing when reaching size limit (memory pressure) .SetPriority(CacheItemPriority.Low) // Keep in cache for this time, reset time if accessed. @@ -34,7 +34,7 @@ namespace BirdsiteLive.Twitter _userCache = new MemoryCache(new MemoryCacheOptions() { - SizeLimit = 3000 //TODO make this use number of entries in db + SizeLimit = settings.UserCacheCapacity }); } #endregion From 0bc8b96ea5a4bac928ef6e153dda162bfed1b452 Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Sat, 18 Mar 2023 20:48:23 -0400 Subject: [PATCH 09/16] made retwrieve followers more parallel --- .../Processors/RetrieveFollowersProcessor.cs | 5 ----- src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs index 88a56a8..65357a9 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs @@ -30,11 +30,6 @@ namespace BirdsiteLive.Pipeline.Processors user.Followers = followers; }); todo.Add(t); - - if (todo.Count(x => !x.IsCompleted) >= 25) - { - await Task.WhenAny(todo); - } } await Task.WhenAll(todo); diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index 5638a96..97c4bf6 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -45,7 +45,7 @@ namespace BirdsiteLive.Pipeline { BoundedCapacity = 1, CancellationToken = ct }); var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions ); var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); - var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); + var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1 } ); var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct }); var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); From 75cc1dcc2730cd06c43e36adb7e2023f7e8bd4c9 Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Sun, 19 Mar 2023 10:23:08 -0400 Subject: [PATCH 10/16] sql query tweak --- .../DataAccessLayers/FollowersPostgresDal.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/FollowersPostgresDal.cs b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/FollowersPostgresDal.cs index eff4c12..9bb99db 100644 --- a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/FollowersPostgresDal.cs +++ b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/FollowersPostgresDal.cs @@ -99,7 +99,7 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers { if (followedUserId == default) throw new ArgumentException("followedUserId"); - var query = $"SELECT * FROM {_settings.FollowersTableName} WHERE $1=ANY(followings)"; + var query = $"SELECT * FROM {_settings.FollowersTableName} WHERE followings @> ARRAY[$1]"; await using var connection = DataSource.CreateConnection(); await connection.OpenAsync(); From 1daec5577d07373651f8a106bb152a458536b0e3 Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Sun, 19 Mar 2023 11:04:03 -0400 Subject: [PATCH 11/16] tweak logging --- src/BirdsiteLive.Domain/ActivityPubService.cs | 1 - .../Processors/RetrieveTwitterUsersProcessor.cs | 2 +- .../Processors/SendTweetsToFollowersProcessor.cs | 2 ++ 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/BirdsiteLive.Domain/ActivityPubService.cs b/src/BirdsiteLive.Domain/ActivityPubService.cs index cc9e2b2..4dd0802 100644 --- a/src/BirdsiteLive.Domain/ActivityPubService.cs +++ b/src/BirdsiteLive.Domain/ActivityPubService.cs @@ -105,7 +105,6 @@ namespace BirdsiteLive.Domain var response = await client.SendAsync(httpRequestMessage); response.EnsureSuccessStatusCode(); - _logger.LogInformation("Sent tweet to " + targetHost); return response.StatusCode; } diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs index 3b78954..af721d6 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs @@ -38,7 +38,7 @@ namespace BirdsiteLive.Pipeline.Processors { var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(2000); - var userCount = users.Any() ? Math.Min(users.Length, 100) : 1; + var userCount = users.Any() ? Math.Min(users.Length, 200) : 1; var splitUsers = users.OrderBy(a => rng.Next()).ToArray().Split(userCount).ToList(); foreach (var u in splitUsers) diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs index c04a8cd..a4fc7b9 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -67,6 +67,8 @@ namespace BirdsiteLive.Pipeline.Processors { await Task.WhenAny(_todo); } + + _logger.LogInformation("Done sending " + userWithTweetsToSync.Followers.Length + "tweets for user " + userWithTweetsToSync.User.Acct); } From 62caf7e956b61bf161cb971cc6568decc3edcaab Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Sun, 19 Mar 2023 11:35:30 -0400 Subject: [PATCH 12/16] don't send full backlog of tweets on first sync --- .../Processors/RetrieveTweetsProcessor.cs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs index cfdb8a5..15c8d12 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -57,7 +57,13 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks { var tweets = await RetrieveNewTweets(user); _logger.LogInformation(index + "/" + syncTwitterUsers.Count() + " Got " + tweets.Length + " tweets from user " + user.Acct + " " ); - if (tweets.Length > 0 && user.LastTweetPostedId != -1) + if (tweets.Length > 0 && user.LastTweetPostedId == -1) + { + // skip the first time to avoid sending backlog of tweet + var tweetId = tweets.Last().Id; + await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now); + } + else if (tweets.Length > 0 && user.LastTweetPostedId != -1) { userWtData.Tweets = tweets; usersWtTweets.Add(userWtData); From 8d6851c6391d420ef7bb1c88887116b45513a0ca Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Sat, 25 Mar 2023 13:24:11 -0400 Subject: [PATCH 13/16] moved followers retrieval --- .../Processors/RetrieveFollowersProcessor.cs | 24 +++++++++---------- .../SendTweetsToFollowersProcessor.cs | 6 ++++- .../RetrieveFollowersProcessorTests.cs | 1 + 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs index 65357a9..33dc353 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs @@ -21,18 +21,18 @@ namespace BirdsiteLive.Pipeline.Processors public async Task> ProcessAsync(UserWithDataToSync[] userWithTweetsToSyncs, CancellationToken ct) { - List todo = new List(); - foreach (var user in userWithTweetsToSyncs) - { - var t = Task.Run( - async() => { - var followers = await _followersDal.GetFollowersAsync(user.User.Id); - user.Followers = followers; - }); - todo.Add(t); - } - - await Task.WhenAll(todo); + //List todo = new List(); + //foreach (var user in userWithTweetsToSyncs) + //{ + // var t = Task.Run( + // async() => { + // var followers = await _followersDal.GetFollowersAsync(user.User.Id); + // user.Followers = followers; + // }); + // todo.Add(t); + //} + // + //await Task.WhenAll(todo); return userWithTweetsToSyncs; } diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs index a4fc7b9..630d158 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -49,6 +49,10 @@ namespace BirdsiteLive.Pipeline.Processors var t = Task.Run( async () => { + if (userWithTweetsToSync.Followers is null || userWithTweetsToSync.Followers.Length == 0) + { + userWithTweetsToSync.Followers = await _followersDal.GetFollowersAsync(user.Id); + } // Process Shared Inbox var followersWtSharedInbox = userWithTweetsToSync.Followers .Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute)) @@ -60,7 +64,7 @@ namespace BirdsiteLive.Pipeline.Processors .Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute)) .ToList(); await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user); - }); + }, ct); _todo.Add(t); if (_todo.Count >= _instanceSettings.ParallelFediversePosts) diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveFollowersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveFollowersProcessorTests.cs index 4679259..8188540 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveFollowersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveFollowersProcessorTests.cs @@ -14,6 +14,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors [TestClass] public class RetrieveFollowersProcessorTests { + [Ignore] [TestMethod] public async Task ProcessAsync_Test() { From 9551c735ea5db6a01e5bc00d4e3a1c0214f69787 Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Sat, 25 Mar 2023 13:53:07 -0400 Subject: [PATCH 14/16] moved followers retrieval 2 --- .../RetrieveTwitterUsersProcessor.cs | 10 ++++- .../SendTweetsToFollowersProcessor.cs | 4 -- .../RetrieveTwitterUsersProcessorTests.cs | 43 ++++++++++++++++--- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs index af721d6..31b4ea4 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs @@ -15,15 +15,17 @@ namespace BirdsiteLive.Pipeline.Processors public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor { private readonly ITwitterUserDal _twitterUserDal; + private readonly IFollowersDal _followersDal; private readonly ILogger _logger; private static Random rng = new Random(); public int WaitFactor = 1000 * 60; //1 min #region Ctor - public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger logger) + public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IFollowersDal followersDal, ILogger logger) { _twitterUserDal = twitterUserDal; + _followersDal = followersDal; _logger = logger; } #endregion @@ -44,7 +46,11 @@ namespace BirdsiteLive.Pipeline.Processors foreach (var u in splitUsers) { ct.ThrowIfCancellationRequested(); - UserWithDataToSync[] toSync = u.Select(x => new UserWithDataToSync { User = x }).ToArray(); + UserWithDataToSync[] toSync = await Task.WhenAll( + u.Select(async x => new UserWithDataToSync + { User = x, Followers = await _followersDal.GetFollowersAsync(x.Id) } + ) + ); await twitterUsersBufferBlock.SendAsync(toSync, ct); } diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs index 630d158..80bb25a 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -49,10 +49,6 @@ namespace BirdsiteLive.Pipeline.Processors var t = Task.Run( async () => { - if (userWithTweetsToSync.Followers is null || userWithTweetsToSync.Followers.Length == 0) - { - userWithTweetsToSync.Followers = await _followersDal.GetFollowersAsync(user.Id); - } // Process Shared Inbox var followersWtSharedInbox = userWithTweetsToSync.Followers .Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute)) diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs index 244c85f..a9cddd3 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs @@ -38,10 +38,16 @@ namespace BirdsiteLive.Pipeline.Tests.Processors It.Is(y => true))) .ReturnsAsync(users); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); + #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 10; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -79,10 +85,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors .ReturnsAsync(new SyncTwitterUser[0]) .ReturnsAsync(new SyncTwitterUser[0]); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 2; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -120,10 +131,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors .ReturnsAsync(new SyncTwitterUser[0]) .ReturnsAsync(new SyncTwitterUser[0]); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 2; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -154,10 +170,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors It.Is(y => true))) .ReturnsAsync(new SyncTwitterUser[0]); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 1; var t =processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -185,10 +206,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors It.Is(y => true))) .Returns(async () => await DelayFaultedTask(new Exception())); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 10; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -215,10 +241,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #region Mocks var twitterUserDalMock = new Mock(MockBehavior.Strict); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 1; await processor.GetTwitterUsersAsync(buffer, canTokenS.Token); } From 46be9552e99d002c80205bbc04f8d3f3e662c54a Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Sat, 25 Mar 2023 15:26:11 -0400 Subject: [PATCH 15/16] pipeline refactoring --- .../ISendTweetsToFollowersProcessor.cs | 2 +- .../SendTweetsToFollowersProcessor.cs | 51 ++++++++++--------- .../StatusPublicationPipeline.cs | 10 ++-- .../SendTweetsToFollowersProcessorTests.cs | 24 ++++----- 4 files changed, 45 insertions(+), 42 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs index c188f55..eddadf5 100644 --- a/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs @@ -6,6 +6,6 @@ namespace BirdsiteLive.Pipeline.Contracts { public interface ISendTweetsToFollowersProcessor { - Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct); + Task ProcessAsync(UserWithDataToSync[] usersWithTweetsToSync, CancellationToken ct); } } \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs index 80bb25a..49e3bdb 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -41,34 +41,39 @@ namespace BirdsiteLive.Pipeline.Processors } #endregion - public async Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct) + public async Task ProcessAsync(UserWithDataToSync[] usersWithTweetsToSync, CancellationToken ct) { - var user = userWithTweetsToSync.User; - - _todo = _todo.Where(x => !x.IsCompleted).ToList(); - - var t = Task.Run( async () => + foreach (var userWithTweetsToSync in usersWithTweetsToSync) { - // Process Shared Inbox - var followersWtSharedInbox = userWithTweetsToSync.Followers - .Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute)) - .ToList(); - await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user); + var user = userWithTweetsToSync.User; - // Process Inbox - var followerWtInbox = userWithTweetsToSync.Followers - .Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute)) - .ToList(); - await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user); - }, ct); - _todo.Add(t); + _todo = _todo.Where(x => !x.IsCompleted).ToList(); + + var t = Task.Run( async () => + { + // Process Shared Inbox + var followersWtSharedInbox = userWithTweetsToSync.Followers + .Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute)) + .ToList(); + await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user); - if (_todo.Count >= _instanceSettings.ParallelFediversePosts) - { - await Task.WhenAny(_todo); + // Process Inbox + var followerWtInbox = userWithTweetsToSync.Followers + .Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute)) + .ToList(); + await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user); + + _logger.LogInformation("Done sending " + userWithTweetsToSync.Tweets.Length + "tweets for user " + userWithTweetsToSync.User.Acct); + }, ct); + _todo.Add(t); + + if (_todo.Count >= _instanceSettings.ParallelFediversePosts) + { + await Task.WhenAny(_todo); + } + + } - - _logger.LogInformation("Done sending " + userWithTweetsToSync.Followers.Length + "tweets for user " + userWithTweetsToSync.User.Acct); } diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index 97c4bf6..47ee70d 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -45,16 +45,14 @@ namespace BirdsiteLive.Pipeline { BoundedCapacity = 1, CancellationToken = ct }); var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions ); var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); - var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1 } ); - var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct }); - var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); + // var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1 } ); + // var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct }); + var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); // Link pipeline twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); - retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); - retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); - retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); + retrieveTweetsBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); // Launch twitter user retriever after a little delay // to give time for the Tweet cache to fill diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs index 06d8b67..2121831 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs @@ -88,7 +88,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -170,7 +170,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -261,7 +261,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -353,7 +353,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -450,7 +450,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -530,7 +530,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -611,7 +611,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -700,7 +700,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -790,7 +790,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -880,7 +880,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -970,7 +970,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -1065,7 +1065,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] { userWithTweets }, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); From f631e922bc271f3f674fc90af16f8a33bf9f6b1a Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Sat, 25 Mar 2023 15:44:42 -0400 Subject: [PATCH 16/16] tweak logging --- .../Processors/SendTweetsToFollowersProcessor.cs | 3 ++- .../Processors/SubTasks/SendTweetsToSharedInboxTask.cs | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs index 49e3bdb..9cfac79 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -63,7 +63,8 @@ namespace BirdsiteLive.Pipeline.Processors .ToList(); await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user); - _logger.LogInformation("Done sending " + userWithTweetsToSync.Tweets.Length + "tweets for user " + userWithTweetsToSync.User.Acct); + _logger.LogInformation("Done sending " + userWithTweetsToSync.Tweets.Length + " tweets for " + + userWithTweetsToSync.Followers.Length + "followers for user " + userWithTweetsToSync.User.Acct); }, ct); _todo.Add(t); diff --git a/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs b/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs index f66eeab..87966c4 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs @@ -43,7 +43,6 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks var tweetsToSend = tweets .OrderBy(x => x.Id) .ToList(); - _logger.LogInformation("After filtering, there were " + tweetsToSend.Count() + " tweets left to send"); foreach (var tweet in tweetsToSend) {