diff --git a/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj b/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj index 5f24b03..d1da203 100644 --- a/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj +++ b/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj @@ -2,13 +2,16 @@ netstandard2.0 + latest + + diff --git a/src/BirdsiteLive.Pipeline/Contracts/IRetrieveFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveFollowersProcessor.cs index 557362f..e0d45dc 100644 --- a/src/BirdsiteLive.Pipeline/Contracts/IRetrieveFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveFollowersProcessor.cs @@ -8,5 +8,6 @@ namespace BirdsiteLive.Pipeline.Contracts public interface IRetrieveFollowersProcessor { Task> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct); + //IAsyncEnumerable ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct); } } \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTwitterAccountsProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTwitterAccountsProcessor.cs index 219f74d..b71ae93 100644 --- a/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTwitterAccountsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTwitterAccountsProcessor.cs @@ -1,7 +1,12 @@ -namespace BirdsiteLive.Pipeline.Contracts +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using BirdsiteLive.DAL.Models; + +namespace BirdsiteLive.Pipeline.Contracts { - public interface IRetrieveTwitterAccountsProcessor + public interface IRetrieveTwitterUsersProcessor { - + Task GetTwitterUsersAsync(BufferBlock twitterUsersBufferBlock, CancellationToken ct); } } \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs new file mode 100644 index 0000000..4b2f150 --- /dev/null +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs @@ -0,0 +1,33 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using BirdsiteLive.DAL.Contracts; +using BirdsiteLive.Pipeline.Contracts; +using BirdsiteLive.Pipeline.Models; + +namespace BirdsiteLive.Pipeline.Processors +{ + public class RetrieveFollowersProcessor : IRetrieveFollowersProcessor + { + private readonly IFollowersDal _followersDal; + + #region Ctor + public RetrieveFollowersProcessor(IFollowersDal followersDal) + { + _followersDal = followersDal; + } + #endregion + + public async Task> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct) + { + //TODO multithread this + foreach (var user in userWithTweetsToSyncs) + { + var followers = await _followersDal.GetFollowersAsync(user.User.Id); + user.Followers = followers; + } + + return userWithTweetsToSyncs; + } + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs index 46c658e..22416be 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -1,16 +1,65 @@ -using System.Threading; +using System.Collections.Generic; +using System.Linq; +using System.Threading; using System.Threading.Tasks; +using BirdsiteLive.DAL.Contracts; using BirdsiteLive.DAL.Models; using BirdsiteLive.Pipeline.Contracts; using BirdsiteLive.Pipeline.Models; +using BirdsiteLive.Twitter; +using Tweetinvi.Models; namespace BirdsiteLive.Pipeline.Processors { public class RetrieveTweetsProcessor : IRetrieveTweetsProcessor { - public Task ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct) + private readonly ITwitterService _twitterService; + private readonly ITwitterUserDal _twitterUserDal; + + #region Ctor + public RetrieveTweetsProcessor(ITwitterService twitterService, ITwitterUserDal twitterUserDal) { - throw new System.NotImplementedException(); + _twitterService = twitterService; + _twitterUserDal = twitterUserDal; + } + #endregion + + public async Task ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct) + { + var usersWtTweets = new List(); + + //TODO multithread this + foreach (var user in syncTwitterUsers) + { + var tweets = RetrieveNewTweets(user); + if (tweets.Length > 0 && user.LastTweetPostedId != -1) + { + var userWtTweets = new UserWithTweetsToSync + { + User = user, + Tweets = tweets + }; + usersWtTweets.Add(userWtTweets); + } + else if (tweets.Length > 0 && user.LastTweetPostedId == -1) + { + var tweetId = tweets.First().Id; + await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId); + } + } + + return usersWtTweets.ToArray(); + } + + private ITweet[] RetrieveNewTweets(SyncTwitterUser user) + { + ITweet[] tweets; + if (user.LastTweetPostedId == -1) + tweets = _twitterService.GetTimeline(user.Acct, 1); + else + tweets = _twitterService.GetTimeline(user.Acct, 200, user.LastTweetSynchronizedForAllFollowersId); + + return tweets; } } } \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterAccountsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterAccountsProcessor.cs new file mode 100644 index 0000000..dcc9d6b --- /dev/null +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterAccountsProcessor.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using BirdsiteLive.DAL.Contracts; +using BirdsiteLive.DAL.Models; +using BirdsiteLive.Pipeline.Contracts; + +namespace BirdsiteLive.Pipeline.Processors +{ + public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor + { + private readonly ITwitterUserDal _twitterUserDal; + private const int SyncPeriod = 15; //in minutes + + #region Ctor + public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal) + { + _twitterUserDal = twitterUserDal; + } + #endregion + + public async Task GetTwitterUsersAsync(BufferBlock twitterUsersBufferBlock, CancellationToken ct) + { + for (;;) + { + ct.ThrowIfCancellationRequested(); + + try + { + var users = await _twitterUserDal.GetAllTwitterUsersAsync(); + await twitterUsersBufferBlock.SendAsync(users, ct); + } + catch (Exception e) + { + Console.WriteLine(e); + } + + await Task.Delay(SyncPeriod * 1000 * 60, ct); + } + } + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs new file mode 100644 index 0000000..51bcb39 --- /dev/null +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -0,0 +1,15 @@ +using System.Threading; +using System.Threading.Tasks; +using BirdsiteLive.Pipeline.Contracts; +using BirdsiteLive.Pipeline.Models; + +namespace BirdsiteLive.Pipeline.Processors +{ + public class SendTweetsToFollowersProcessor : ISendTweetsToFollowersProcessor + { + public Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct) + { + throw new System.NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index d1ddf17..0e0da40 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -15,15 +15,18 @@ namespace BirdsiteLive.Pipeline public class StatusPublicationPipeline : IStatusPublicationPipeline { - private readonly IRetrieveTwitterAccountsProcessor _retrieveTwitterAccountsProcessor; + private readonly IRetrieveTwitterUsersProcessor _retrieveTwitterAccountsProcessor; private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor; private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor; private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor; #region Ctor - public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor) + public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor) { _retrieveTweetsProcessor = retrieveTweetsProcessor; + _retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor; + _retrieveFollowersProcessor = retrieveFollowersProcessor; + _sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor; } #endregion @@ -36,12 +39,24 @@ namespace BirdsiteLive.Pipeline var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct)); var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct}); - + // Link pipeline + twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions {PropagateCompletion = true}); + retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock); + retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock); + retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock); + retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock); // Launch twitter user retriever + var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct); // Wait + await Task.WhenAll(retrieveTwitterAccountsTask, sendTweetsToFollowersBlock.Completion); + + var foreground = Console.ForegroundColor; + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine("An error occured, pipeline stopped"); + Console.ForegroundColor = foreground; } } } diff --git a/src/BirdsiteLive.Twitter/TwitterService.cs b/src/BirdsiteLive.Twitter/TwitterService.cs index 3bec34a..4c8878d 100644 --- a/src/BirdsiteLive.Twitter/TwitterService.cs +++ b/src/BirdsiteLive.Twitter/TwitterService.cs @@ -1,9 +1,12 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using BirdsiteLive.Common.Settings; using BirdsiteLive.Twitter.Models; using Tweetinvi; using Tweetinvi.Models; +using Tweetinvi.Parameters; namespace BirdsiteLive.Twitter { @@ -11,6 +14,7 @@ namespace BirdsiteLive.Twitter { TwitterUser GetUser(string username); ITweet GetTweet(long statusId); + ITweet[] GetTimeline(string username, int nberTweets, long fromTweetId = -1); } public class TwitterService : ITwitterService @@ -21,13 +25,13 @@ namespace BirdsiteLive.Twitter public TwitterService(TwitterSettings settings) { _settings = settings; + Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true); } #endregion public TwitterUser GetUser(string username) { - //Auth.SetUserCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, _settings.AccessToken, _settings.AccessTokenSecret); - Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true); + //Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true); var user = User.GetUserFromScreenName(username); if (user == null) return null; @@ -45,9 +49,36 @@ namespace BirdsiteLive.Twitter public ITweet GetTweet(long statusId) { - Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true); + //Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true); var tweet = Tweet.GetTweet(statusId); return tweet; } + + public ITweet[] GetTimeline(string username, int nberTweets, long fromTweetId = -1) + { + //Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true); + TweetinviConfig.CurrentThreadSettings.TweetMode = TweetMode.Extended; + + var user = User.GetUserFromScreenName(username); + var tweets = new List(); + if (fromTweetId == -1) + { + var timeline = Timeline.GetUserTimeline(user.Id, nberTweets); + if (timeline != null) tweets.AddRange(timeline); + } + else + { + var timelineRequestParameters = new UserTimelineParameters + { + SinceId = fromTweetId, + MaximumNumberOfTweetsToRetrieve = nberTweets + }; + var timeline = Timeline.GetUserTimeline(user.Id, timelineRequestParameters); + if (timeline != null) tweets.AddRange(timeline); + } + + return tweets.ToArray(); + //return tweets.Where(x => returnReplies || string.IsNullOrWhiteSpace(x.InReplyToScreenName)).ToArray(); + } } }