starting pipeline implementation

This commit is contained in:
Nicolas Constant 2020-07-18 23:35:19 -04:00
parent d13f60ec3c
commit d91ddd4204
No known key found for this signature in database
GPG Key ID: 1E9F677FB01A5688
9 changed files with 207 additions and 12 deletions

View File

@ -2,13 +2,16 @@
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.0.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\BirdsiteLive.Twitter\BirdsiteLive.Twitter.csproj" />
<ProjectReference Include="..\DataAccessLayers\BirdsiteLive.DAL\BirdsiteLive.DAL.csproj" />
</ItemGroup>

View File

@ -8,5 +8,6 @@ namespace BirdsiteLive.Pipeline.Contracts
public interface IRetrieveFollowersProcessor
{
Task<IEnumerable<UserWithTweetsToSync>> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct);
//IAsyncEnumerable<UserWithTweetsToSync> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct);
}
}

View File

@ -1,7 +1,12 @@
namespace BirdsiteLive.Pipeline.Contracts
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.DAL.Models;
namespace BirdsiteLive.Pipeline.Contracts
{
public interface IRetrieveTwitterAccountsProcessor
public interface IRetrieveTwitterUsersProcessor
{
Task GetTwitterUsersAsync(BufferBlock<SyncTwitterUser[]> twitterUsersBufferBlock, CancellationToken ct);
}
}

View File

@ -0,0 +1,33 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Models;
namespace BirdsiteLive.Pipeline.Processors
{
public class RetrieveFollowersProcessor : IRetrieveFollowersProcessor
{
private readonly IFollowersDal _followersDal;
#region Ctor
public RetrieveFollowersProcessor(IFollowersDal followersDal)
{
_followersDal = followersDal;
}
#endregion
public async Task<IEnumerable<UserWithTweetsToSync>> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct)
{
//TODO multithread this
foreach (var user in userWithTweetsToSyncs)
{
var followers = await _followersDal.GetFollowersAsync(user.User.Id);
user.Followers = followers;
}
return userWithTweetsToSyncs;
}
}
}

View File

@ -1,16 +1,65 @@
using System.Threading;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Models;
using BirdsiteLive.Twitter;
using Tweetinvi.Models;
namespace BirdsiteLive.Pipeline.Processors
{
public class RetrieveTweetsProcessor : IRetrieveTweetsProcessor
{
public Task<UserWithTweetsToSync[]> ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct)
private readonly ITwitterService _twitterService;
private readonly ITwitterUserDal _twitterUserDal;
#region Ctor
public RetrieveTweetsProcessor(ITwitterService twitterService, ITwitterUserDal twitterUserDal)
{
throw new System.NotImplementedException();
_twitterService = twitterService;
_twitterUserDal = twitterUserDal;
}
#endregion
public async Task<UserWithTweetsToSync[]> ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct)
{
var usersWtTweets = new List<UserWithTweetsToSync>();
//TODO multithread this
foreach (var user in syncTwitterUsers)
{
var tweets = RetrieveNewTweets(user);
if (tweets.Length > 0 && user.LastTweetPostedId != -1)
{
var userWtTweets = new UserWithTweetsToSync
{
User = user,
Tweets = tweets
};
usersWtTweets.Add(userWtTweets);
}
else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
{
var tweetId = tweets.First().Id;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId);
}
}
return usersWtTweets.ToArray();
}
private ITweet[] RetrieveNewTweets(SyncTwitterUser user)
{
ITweet[] tweets;
if (user.LastTweetPostedId == -1)
tweets = _twitterService.GetTimeline(user.Acct, 1);
else
tweets = _twitterService.GetTimeline(user.Acct, 200, user.LastTweetSynchronizedForAllFollowersId);
return tweets;
}
}
}

View File

@ -0,0 +1,43 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
namespace BirdsiteLive.Pipeline.Processors
{
public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor
{
private readonly ITwitterUserDal _twitterUserDal;
private const int SyncPeriod = 15; //in minutes
#region Ctor
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal)
{
_twitterUserDal = twitterUserDal;
}
#endregion
public async Task GetTwitterUsersAsync(BufferBlock<SyncTwitterUser[]> twitterUsersBufferBlock, CancellationToken ct)
{
for (;;)
{
ct.ThrowIfCancellationRequested();
try
{
var users = await _twitterUserDal.GetAllTwitterUsersAsync();
await twitterUsersBufferBlock.SendAsync(users, ct);
}
catch (Exception e)
{
Console.WriteLine(e);
}
await Task.Delay(SyncPeriod * 1000 * 60, ct);
}
}
}
}

View File

@ -0,0 +1,15 @@
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Models;
namespace BirdsiteLive.Pipeline.Processors
{
public class SendTweetsToFollowersProcessor : ISendTweetsToFollowersProcessor
{
public Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct)
{
throw new System.NotImplementedException();
}
}
}

View File

@ -15,15 +15,18 @@ namespace BirdsiteLive.Pipeline
public class StatusPublicationPipeline : IStatusPublicationPipeline
{
private readonly IRetrieveTwitterAccountsProcessor _retrieveTwitterAccountsProcessor;
private readonly IRetrieveTwitterUsersProcessor _retrieveTwitterAccountsProcessor;
private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor;
private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor;
private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor;
#region Ctor
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor)
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor)
{
_retrieveTweetsProcessor = retrieveTweetsProcessor;
_retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor;
_retrieveFollowersProcessor = retrieveFollowersProcessor;
_sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor;
}
#endregion
@ -36,12 +39,24 @@ namespace BirdsiteLive.Pipeline
var retrieveFollowersBlock = new TransformManyBlock<UserWithTweetsToSync[], UserWithTweetsToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct));
var retrieveFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct});
// Link pipeline
twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions {PropagateCompletion = true});
retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock);
retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock);
retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock);
retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock);
// Launch twitter user retriever
var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct);
// Wait
await Task.WhenAll(retrieveTwitterAccountsTask, sendTweetsToFollowersBlock.Completion);
var foreground = Console.ForegroundColor;
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("An error occured, pipeline stopped");
Console.ForegroundColor = foreground;
}
}
}

View File

@ -1,9 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.Twitter.Models;
using Tweetinvi;
using Tweetinvi.Models;
using Tweetinvi.Parameters;
namespace BirdsiteLive.Twitter
{
@ -11,6 +14,7 @@ namespace BirdsiteLive.Twitter
{
TwitterUser GetUser(string username);
ITweet GetTweet(long statusId);
ITweet[] GetTimeline(string username, int nberTweets, long fromTweetId = -1);
}
public class TwitterService : ITwitterService
@ -21,13 +25,13 @@ namespace BirdsiteLive.Twitter
public TwitterService(TwitterSettings settings)
{
_settings = settings;
Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true);
}
#endregion
public TwitterUser GetUser(string username)
{
//Auth.SetUserCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, _settings.AccessToken, _settings.AccessTokenSecret);
Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true);
//Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true);
var user = User.GetUserFromScreenName(username);
if (user == null) return null;
@ -45,9 +49,36 @@ namespace BirdsiteLive.Twitter
public ITweet GetTweet(long statusId)
{
Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true);
//Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true);
var tweet = Tweet.GetTweet(statusId);
return tweet;
}
public ITweet[] GetTimeline(string username, int nberTweets, long fromTweetId = -1)
{
//Auth.SetApplicationOnlyCredentials(_settings.ConsumerKey, _settings.ConsumerSecret, true);
TweetinviConfig.CurrentThreadSettings.TweetMode = TweetMode.Extended;
var user = User.GetUserFromScreenName(username);
var tweets = new List<ITweet>();
if (fromTweetId == -1)
{
var timeline = Timeline.GetUserTimeline(user.Id, nberTweets);
if (timeline != null) tweets.AddRange(timeline);
}
else
{
var timelineRequestParameters = new UserTimelineParameters
{
SinceId = fromTweetId,
MaximumNumberOfTweetsToRetrieve = nberTweets
};
var timeline = Timeline.GetUserTimeline(user.Id, timelineRequestParameters);
if (timeline != null) tweets.AddRange(timeline);
}
return tweets.ToArray();
//return tweets.Where(x => returnReplies || string.IsNullOrWhiteSpace(x.InReplyToScreenName)).ToArray();
}
}
}