diff --git a/src/BirdsiteLive.Common/Extensions/EnumerableExt.cs b/src/BirdsiteLive.Common/Extensions/EnumerableExt.cs new file mode 100644 index 0000000..51b117b --- /dev/null +++ b/src/BirdsiteLive.Common/Extensions/EnumerableExt.cs @@ -0,0 +1,16 @@ +using System.Collections.Generic; +using System.Linq; + +namespace BirdsiteLive.Common.Extensions +{ + public static class EnumerableExt + { + public static IEnumerable> Split(this T[] array, int size) + { + for (var i = 0; i < (float)array.Length / size; i++) + { + yield return array.Skip(i * size).Take(size); + } + } + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs index f11b277..f556831 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs @@ -1,7 +1,9 @@ using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; +using BirdsiteLive.Common.Extensions; using BirdsiteLive.DAL.Contracts; using BirdsiteLive.DAL.Models; using BirdsiteLive.Pipeline.Contracts; @@ -13,7 +15,7 @@ namespace BirdsiteLive.Pipeline.Processors { private readonly ITwitterUserDal _twitterUserDal; private readonly ILogger _logger; - private const int SyncPeriod = 15; //in minutes + public int WaitFactor = 1000 * 60; //1 min #region Ctor public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger logger) @@ -25,7 +27,7 @@ namespace BirdsiteLive.Pipeline.Processors public async Task GetTwitterUsersAsync(BufferBlock twitterUsersBufferBlock, CancellationToken ct) { - for (;;) + for (; ; ) { ct.ThrowIfCancellationRequested(); @@ -33,15 +35,26 @@ namespace BirdsiteLive.Pipeline.Processors { var users = await _twitterUserDal.GetAllTwitterUsersAsync(); - if(users.Length > 0) - await twitterUsersBufferBlock.SendAsync(users, ct); + var userCount = users.Any() ? users.Length : 1; + var splitNumber = (int) Math.Ceiling(userCount / 15d); + var splitUsers = users.Split(splitNumber).ToList(); + + foreach (var u in splitUsers) + { + ct.ThrowIfCancellationRequested(); + + await twitterUsersBufferBlock.SendAsync(u.ToArray(), ct); + + await Task.Delay(WaitFactor, ct); + } + + var splitCount = splitUsers.Count(); + if (splitCount < 15) await Task.Delay((15 - splitCount) * WaitFactor, ct); } catch (Exception e) { _logger.LogError(e, "Failing retrieving Twitter Users."); } - - await Task.Delay(SyncPeriod * 1000 * 60, ct); } } } diff --git a/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToInboxTask.cs b/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToInboxTask.cs index 7e0835d..2fc93fe 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToInboxTask.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToInboxTask.cs @@ -7,6 +7,7 @@ using BirdsiteLive.DAL.Contracts; using BirdsiteLive.DAL.Models; using BirdsiteLive.Domain; using BirdsiteLive.Twitter.Models; +using Microsoft.Extensions.Logging; namespace BirdsiteLive.Pipeline.Processors.SubTasks { @@ -20,13 +21,16 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks private readonly IActivityPubService _activityPubService; private readonly IStatusService _statusService; private readonly IFollowersDal _followersDal; + private readonly ILogger _logger; + #region Ctor - public SendTweetsToInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal) + public SendTweetsToInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal, ILogger logger) { _activityPubService = activityPubService; _statusService = statusService; _followersDal = followersDal; + _logger = logger; } #endregion @@ -46,8 +50,23 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks { foreach (var tweet in tweetsToSend) { - var note = _statusService.GetStatus(user.Acct, tweet); - await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), follower.Host, inbox); + try + { + var note = _statusService.GetStatus(user.Acct, tweet); + await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), follower.Host, inbox); + } + catch (ArgumentException e) + { + if (e.Message.Contains("Invalid pattern") && e.Message.Contains("at offset")) //Regex exception + { + _logger.LogError(e, "Can't parse {MessageContent} from Tweet {Id}", tweet.MessageContent, tweet.Id); + } + else + { + throw; + } + } + syncStatus = tweet.Id; } } diff --git a/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs b/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs index c620910..dcc6aca 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SubTasks/SendTweetsToSharedInboxTask.cs @@ -6,6 +6,7 @@ using BirdsiteLive.DAL.Contracts; using BirdsiteLive.DAL.Models; using BirdsiteLive.Domain; using BirdsiteLive.Twitter.Models; +using Microsoft.Extensions.Logging; namespace BirdsiteLive.Pipeline.Processors.SubTasks { @@ -19,13 +20,15 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks private readonly IStatusService _statusService; private readonly IActivityPubService _activityPubService; private readonly IFollowersDal _followersDal; + private readonly ILogger _logger; #region Ctor - public SendTweetsToSharedInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal) + public SendTweetsToSharedInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal, ILogger logger) { _activityPubService = activityPubService; _statusService = statusService; _followersDal = followersDal; + _logger = logger; } #endregion @@ -47,8 +50,23 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks { foreach (var tweet in tweetsToSend) { - var note = _statusService.GetStatus(user.Acct, tweet); - await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), host, inbox); + try + { + var note = _statusService.GetStatus(user.Acct, tweet); + await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), host, inbox); + } + catch (ArgumentException e) + { + if (e.Message.Contains("Invalid pattern") && e.Message.Contains("at offset")) //Regex exception + { + _logger.LogError(e, "Can't parse {MessageContent} from Tweet {Id}", tweet.MessageContent, tweet.Id); + } + else + { + throw; + } + } + syncStatus = tweet.Id; } } diff --git a/src/BirdsiteLive/BirdsiteLive.csproj b/src/BirdsiteLive/BirdsiteLive.csproj index 34a2392..01e0094 100644 --- a/src/BirdsiteLive/BirdsiteLive.csproj +++ b/src/BirdsiteLive/BirdsiteLive.csproj @@ -4,7 +4,7 @@ netcoreapp3.1 d21486de-a812-47eb-a419-05682bb68856 Linux - 0.9.1 + 0.10.0 diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs index 356a479..6cbd008 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; @@ -37,13 +38,84 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + processor.WaitFactor = 10; processor.GetTwitterUsersAsync(buffer, CancellationToken.None); await Task.Delay(50); #region Validations twitterUserDalMock.VerifyAll(); - Assert.AreEqual(1, buffer.Count); + Assert.AreEqual(3, buffer.Count); + buffer.TryReceive(out var result); + Assert.AreEqual(1, result.Length); + #endregion + } + + [TestMethod] + public async Task GetTwitterUsersAsync_Multi_Test() + { + #region Stubs + var buffer = new BufferBlock(); + var users = new List(); + + for (var i = 0; i < 30; i++) + users.Add(new SyncTwitterUser()); + #endregion + + #region Mocks + var twitterUserDalMock = new Mock(MockBehavior.Strict); + twitterUserDalMock + .SetupSequence(x => x.GetAllTwitterUsersAsync()) + .ReturnsAsync(users.ToArray()) + .ReturnsAsync(new SyncTwitterUser[0]); + + var loggerMock = new Mock>(); + #endregion + + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + processor.WaitFactor = 2; + processor.GetTwitterUsersAsync(buffer, CancellationToken.None); + + await Task.Delay(200); + + #region Validations + twitterUserDalMock.VerifyAll(); + Assert.AreEqual(15, buffer.Count); + buffer.TryReceive(out var result); + Assert.AreEqual(2, result.Length); + #endregion + } + + [TestMethod] + public async Task GetTwitterUsersAsync_Multi2_Test() + { + #region Stubs + var buffer = new BufferBlock(); + var users = new List(); + + for (var i = 0; i < 31; i++) + users.Add(new SyncTwitterUser()); + #endregion + + #region Mocks + var twitterUserDalMock = new Mock(MockBehavior.Strict); + twitterUserDalMock + .SetupSequence(x => x.GetAllTwitterUsersAsync()) + .ReturnsAsync(users.ToArray()) + .ReturnsAsync(new SyncTwitterUser[0]); + + var loggerMock = new Mock>(); + #endregion + + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + processor.WaitFactor = 2; + processor.GetTwitterUsersAsync(buffer, CancellationToken.None); + + await Task.Delay(200); + + #region Validations + twitterUserDalMock.VerifyAll(); + Assert.AreEqual(11, buffer.Count); buffer.TryReceive(out var result); Assert.AreEqual(3, result.Length); #endregion @@ -66,6 +138,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + processor.WaitFactor = 1; processor.GetTwitterUsersAsync(buffer, CancellationToken.None); await Task.Delay(50); @@ -75,8 +148,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors Assert.AreEqual(0, buffer.Count); #endregion } - - + [TestMethod] public async Task GetTwitterUsersAsync_Exception_Test() { @@ -88,12 +160,13 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var twitterUserDalMock = new Mock(MockBehavior.Strict); twitterUserDalMock .Setup(x => x.GetAllTwitterUsersAsync()) - .Throws(new Exception()); + .Returns(async () => await DelayFaultedTask(new Exception())); var loggerMock = new Mock>(); #endregion var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + processor.WaitFactor = 10; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); await Task.WhenAny(t, Task.Delay(50)); @@ -104,7 +177,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion } - [TestMethod] [ExpectedException(typeof(OperationCanceledException))] public async Task GetTwitterUsersAsync_Cancellation_Test() @@ -121,7 +193,14 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + processor.WaitFactor = 1; await processor.GetTwitterUsersAsync(buffer, canTokenS.Token); } + + private static async Task DelayFaultedTask(Exception e) + { + await Task.Delay(30); + throw e; + } } } \ No newline at end of file diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToInboxTaskTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToInboxTaskTests.cs index 36688f6..23c9fe3 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToInboxTaskTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToInboxTaskTests.cs @@ -9,6 +9,7 @@ using BirdsiteLive.DAL.Models; using BirdsiteLive.Domain; using BirdsiteLive.Pipeline.Processors.SubTasks; using BirdsiteLive.Twitter.Models; +using Microsoft.Extensions.Logging; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -79,9 +80,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId))) .Returns(Task.CompletedTask); + var loggerMock = new Mock>(); #endregion - var task = new SendTweetsToInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object); + var task = new SendTweetsToInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object, loggerMock.Object); await task.ExecuteAsync(tweets.ToArray(), follower, twitterUser); #region Validations @@ -156,9 +158,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId3))) .Returns(Task.CompletedTask); + var loggerMock = new Mock>(); #endregion - var task = new SendTweetsToInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object); + var task = new SendTweetsToInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object, loggerMock.Object); await task.ExecuteAsync(tweets.ToArray(), follower, twitterUser); #region Validations @@ -241,9 +244,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId2))) .Returns(Task.CompletedTask); + var loggerMock = new Mock>(); #endregion - var task = new SendTweetsToInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object); + var task = new SendTweetsToInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object, loggerMock.Object); try { diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToSharedInboxTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToSharedInboxTests.cs index 1909108..5ae9c7f 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToSharedInboxTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SubTasks/SendTweetsToSharedInboxTests.cs @@ -10,6 +10,7 @@ using BirdsiteLive.DAL.Models; using BirdsiteLive.Domain; using BirdsiteLive.Pipeline.Processors.SubTasks; using BirdsiteLive.Twitter.Models; +using Microsoft.Extensions.Logging; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -100,9 +101,11 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId))) .Returns(Task.CompletedTask); } + + var loggerMock = new Mock>(); #endregion - var task = new SendTweetsToSharedInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object); + var task = new SendTweetsToSharedInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object, loggerMock.Object); await task.ExecuteAsync(tweets.ToArray(), twitterUser, host, followers.ToArray()); #region Validations @@ -197,9 +200,11 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId3))) .Returns(Task.CompletedTask); } + + var loggerMock = new Mock>(); #endregion - var task = new SendTweetsToSharedInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object); + var task = new SendTweetsToSharedInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object, loggerMock.Object); await task.ExecuteAsync(tweets.ToArray(), twitterUser, host, followers.ToArray()); #region Validations @@ -302,9 +307,11 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks It.Is(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId2))) .Returns(Task.CompletedTask); } + + var loggerMock = new Mock>(); #endregion - var task = new SendTweetsToSharedInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object); + var task = new SendTweetsToSharedInboxTask(activityPubService.Object, statusServiceMock.Object, followersDalMock.Object, loggerMock.Object); try {