From afa05a72d203ff6b44e913f90f07ee61eb3a2c0c Mon Sep 17 00:00:00 2001 From: Nicolas Constant Date: Wed, 12 Aug 2020 18:34:01 -0400 Subject: [PATCH] added shared inbox publication --- .../SendTweetsToFollowersProcessor.cs | 105 ++++++++++++++++-- 1 file changed, 94 insertions(+), 11 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs index 7a06c8a..8c5f6b9 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; +using System.Xml; using BirdsiteLive.DAL.Contracts; using BirdsiteLive.DAL.Models; using BirdsiteLive.Domain; @@ -33,13 +34,31 @@ namespace BirdsiteLive.Pipeline.Processors public async Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct) { var user = userWithTweetsToSync.User; - var userId = user.Id; - foreach (var follower in userWithTweetsToSync.Followers) + // Process Shared Inbox + var followersWtSharedInbox = userWithTweetsToSync.Followers + .Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute)) + .ToList(); + await ProcessFollowersWithSharedInbox(userWithTweetsToSync.Tweets, followersWtSharedInbox, user); + + // Process Inbox + var followerWtInbox = userWithTweetsToSync.Followers + .Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute)) + .ToList(); + await ProcessFollowersWithInbox(userWithTweetsToSync.Tweets, followerWtInbox, user); + + return userWithTweetsToSync; + } + + private async Task ProcessFollowersWithSharedInbox(ExtractedTweet[] tweets, List followers, SyncTwitterUser user) + { + var followersPerInstances = followers.GroupBy(x => x.Host); + + foreach (var followersPerInstance in followersPerInstances) { try { - await ProcessFollowerAsync(userWithTweetsToSync.Tweets, follower, userId, user); + await ProcessInstanceFollowersWithSharedInbox(tweets, user, followersPerInstance); } catch (Exception e) { @@ -47,17 +66,81 @@ namespace BirdsiteLive.Pipeline.Processors //TODO handle error } } - - return userWithTweetsToSync; } - private async Task ProcessFollowerAsync(IEnumerable tweets, Follower follower, int userId, SyncTwitterUser user) + private async Task ProcessInstanceFollowersWithSharedInbox(ExtractedTweet[] tweets, SyncTwitterUser user, + IGrouping followersPerInstance) { + var userId = user.Id; + var host = followersPerInstance.Key; + var groupedFollowers = followersPerInstance.ToList(); + var inbox = groupedFollowers.First().SharedInboxRoute; + + var fromStatusId = groupedFollowers + .Max(x => x.FollowingsSyncStatus[userId]); + + var tweetsToSend = tweets + .Where(x => x.Id > fromStatusId) + .OrderBy(x => x.Id) + .ToList(); + + var syncStatus = fromStatusId; + try + { + foreach (var tweet in tweetsToSend) + { + var note = _statusService.GetStatus(user.Acct, tweet); + var result = + await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), host, inbox); + + if (result == HttpStatusCode.Accepted) + syncStatus = tweet.Id; + else + throw new Exception("Posting new note activity failed"); + } + } + finally + { + if (syncStatus != fromStatusId) + { + foreach (var f in groupedFollowers) + { + f.FollowingsSyncStatus[userId] = syncStatus; + await _followersDal.UpdateFollowerAsync(f); + } + } + } + } + + private async Task ProcessFollowersWithInbox(ExtractedTweet[] tweets, List followerWtInbox, SyncTwitterUser user) + { + foreach (var follower in followerWtInbox) + { + try + { + await ProcessFollowerWithInboxAsync(tweets, follower, user); + } + catch (Exception e) + { + Console.WriteLine(e); + //TODO handle error + } + } + } + + private async Task ProcessFollowerWithInboxAsync(IEnumerable tweets, Follower follower, SyncTwitterUser user) + { + var userId = user.Id; var fromStatusId = follower.FollowingsSyncStatus[userId]; - var tweetsToSend = tweets.Where(x => x.Id > fromStatusId).OrderBy(x => x.Id).ToList(); - var inbox = string.IsNullOrWhiteSpace(follower.SharedInboxRoute) - ? follower.InboxRoute - : follower.SharedInboxRoute; + var tweetsToSend = tweets + .Where(x => x.Id > fromStatusId) + .OrderBy(x => x.Id) + .ToList(); + + var inbox = follower.InboxRoute; + //var inbox = string.IsNullOrWhiteSpace(follower.SharedInboxRoute) + // ? follower.InboxRoute + // : follower.SharedInboxRoute; var syncStatus = fromStatusId; try @@ -67,7 +150,7 @@ namespace BirdsiteLive.Pipeline.Processors var note = _statusService.GetStatus(user.Acct, tweet); var result = await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), follower.Host, inbox); - if (result == HttpStatusCode.Accepted) + if (result == HttpStatusCode.Accepted) syncStatus = tweet.Id; else throw new Exception("Posting new note activity failed");