diff --git a/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj b/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj new file mode 100644 index 0000000..5f24b03 --- /dev/null +++ b/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj @@ -0,0 +1,15 @@ + + + + netstandard2.0 + + + + + + + + + + + diff --git a/src/BirdsiteLive.Pipeline/Contracts/IRetrieveFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveFollowersProcessor.cs new file mode 100644 index 0000000..557362f --- /dev/null +++ b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveFollowersProcessor.cs @@ -0,0 +1,12 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using BirdsiteLive.Pipeline.Models; + +namespace BirdsiteLive.Pipeline.Contracts +{ + public interface IRetrieveFollowersProcessor + { + Task> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct); + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTweetsProcessor.cs new file mode 100644 index 0000000..451f1d1 --- /dev/null +++ b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTweetsProcessor.cs @@ -0,0 +1,12 @@ +using System.Threading; +using System.Threading.Tasks; +using BirdsiteLive.DAL.Models; +using BirdsiteLive.Pipeline.Models; + +namespace BirdsiteLive.Pipeline.Contracts +{ + public interface IRetrieveTweetsProcessor + { + Task ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct); + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTwitterAccountsProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTwitterAccountsProcessor.cs new file mode 100644 index 0000000..219f74d --- /dev/null +++ b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTwitterAccountsProcessor.cs @@ -0,0 +1,7 @@ +namespace BirdsiteLive.Pipeline.Contracts +{ + public interface IRetrieveTwitterAccountsProcessor + { + + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs new file mode 100644 index 0000000..df18fa9 --- /dev/null +++ b/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs @@ -0,0 +1,11 @@ +using System.Threading; +using System.Threading.Tasks; +using BirdsiteLive.Pipeline.Models; + +namespace BirdsiteLive.Pipeline.Contracts +{ + public interface ISendTweetsToFollowersProcessor + { + Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct); + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Models/UserWithTweetsToSync.cs b/src/BirdsiteLive.Pipeline/Models/UserWithTweetsToSync.cs new file mode 100644 index 0000000..133e2a5 --- /dev/null +++ b/src/BirdsiteLive.Pipeline/Models/UserWithTweetsToSync.cs @@ -0,0 +1,12 @@ +using BirdsiteLive.DAL.Models; +using Tweetinvi.Models; + +namespace BirdsiteLive.Pipeline.Models +{ + public class UserWithTweetsToSync + { + public SyncTwitterUser User { get; set; } + public ITweet[] Tweets { get; set; } + public Follower[] Followers { get; set; } + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs new file mode 100644 index 0000000..46c658e --- /dev/null +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -0,0 +1,16 @@ +using System.Threading; +using System.Threading.Tasks; +using BirdsiteLive.DAL.Models; +using BirdsiteLive.Pipeline.Contracts; +using BirdsiteLive.Pipeline.Models; + +namespace BirdsiteLive.Pipeline.Processors +{ + public class RetrieveTweetsProcessor : IRetrieveTweetsProcessor + { + public Task ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct) + { + throw new System.NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs new file mode 100644 index 0000000..d1ddf17 --- /dev/null +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -0,0 +1,47 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using BirdsiteLive.DAL.Models; +using BirdsiteLive.Pipeline.Contracts; +using BirdsiteLive.Pipeline.Models; + +namespace BirdsiteLive.Pipeline +{ + public interface IStatusPublicationPipeline + { + Task ExecuteAsync(CancellationToken ct); + } + + public class StatusPublicationPipeline : IStatusPublicationPipeline + { + private readonly IRetrieveTwitterAccountsProcessor _retrieveTwitterAccountsProcessor; + private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor; + private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor; + private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor; + + #region Ctor + public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor) + { + _retrieveTweetsProcessor = retrieveTweetsProcessor; + } + #endregion + + public async Task ExecuteAsync(CancellationToken ct) + { + // Create blocks + var twitterUsersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct}); + var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct)); + var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); + var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct)); + var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); + var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct}); + + // Link pipeline + + // Launch twitter user retriever + + // Wait + } + } +} diff --git a/src/BirdsiteLive.sln b/src/BirdsiteLive.sln index d600aa6..5491d81 100644 --- a/src/BirdsiteLive.sln +++ b/src/BirdsiteLive.sln @@ -25,11 +25,15 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BirdsiteLive.ActivityPub.Te EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "DataAccessLayers", "DataAccessLayers", "{CFAB3509-3931-42DB-AC97-4F91FC2D849C}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BirdsiteLive.DAL", "DataAccessLayers\BirdsiteLive.DAL\BirdsiteLive.DAL.csproj", "{47058CAB-DC43-4DD1-8F68-D3D625332905}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BirdsiteLive.DAL", "DataAccessLayers\BirdsiteLive.DAL\BirdsiteLive.DAL.csproj", "{47058CAB-DC43-4DD1-8F68-D3D625332905}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BirdsiteLive.DAL.Postgres", "DataAccessLayers\BirdsiteLive.DAL.Postgres\BirdsiteLive.DAL.Postgres.csproj", "{87E46519-BBF2-437C-8A5B-CF6CDE7CDAA6}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BirdsiteLive.DAL.Postgres", "DataAccessLayers\BirdsiteLive.DAL.Postgres\BirdsiteLive.DAL.Postgres.csproj", "{87E46519-BBF2-437C-8A5B-CF6CDE7CDAA6}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BirdsiteLive.DAL.Postgres.Tests", "Tests\BirdsiteLive.DAL.Postgres.Tests\BirdsiteLive.DAL.Postgres.Tests.csproj", "{CD9489BF-69C8-4705-8774-81C45F4F8FE1}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BirdsiteLive.DAL.Postgres.Tests", "Tests\BirdsiteLive.DAL.Postgres.Tests\BirdsiteLive.DAL.Postgres.Tests.csproj", "{CD9489BF-69C8-4705-8774-81C45F4F8FE1}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Pipeline", "Pipeline", "{DA3C160C-4811-4E26-A5AD-42B81FAF2D7C}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BirdsiteLive.Pipeline", "BirdsiteLive.Pipeline\BirdsiteLive.Pipeline.csproj", "{2A8CC30D-D775-47D1-9388-F72A5C32DE2A}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -81,6 +85,10 @@ Global {CD9489BF-69C8-4705-8774-81C45F4F8FE1}.Debug|Any CPU.Build.0 = Debug|Any CPU {CD9489BF-69C8-4705-8774-81C45F4F8FE1}.Release|Any CPU.ActiveCfg = Release|Any CPU {CD9489BF-69C8-4705-8774-81C45F4F8FE1}.Release|Any CPU.Build.0 = Release|Any CPU + {2A8CC30D-D775-47D1-9388-F72A5C32DE2A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2A8CC30D-D775-47D1-9388-F72A5C32DE2A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2A8CC30D-D775-47D1-9388-F72A5C32DE2A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2A8CC30D-D775-47D1-9388-F72A5C32DE2A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -96,6 +104,7 @@ Global {47058CAB-DC43-4DD1-8F68-D3D625332905} = {CFAB3509-3931-42DB-AC97-4F91FC2D849C} {87E46519-BBF2-437C-8A5B-CF6CDE7CDAA6} = {CFAB3509-3931-42DB-AC97-4F91FC2D849C} {CD9489BF-69C8-4705-8774-81C45F4F8FE1} = {A32D3458-09D0-4E0A-BA4B-8C411B816B94} + {2A8CC30D-D775-47D1-9388-F72A5C32DE2A} = {DA3C160C-4811-4E26-A5AD-42B81FAF2D7C} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {69E8DCAD-4C37-4010-858F-5F94E6FBABCE} diff --git a/src/BirdsiteLive/BirdsiteLive.csproj b/src/BirdsiteLive/BirdsiteLive.csproj index 5757c99..2195422 100644 --- a/src/BirdsiteLive/BirdsiteLive.csproj +++ b/src/BirdsiteLive/BirdsiteLive.csproj @@ -16,6 +16,7 @@ + diff --git a/src/BirdsiteLive/Services/FederationService.cs b/src/BirdsiteLive/Services/FederationService.cs index 62f862b..f2c2e94 100644 --- a/src/BirdsiteLive/Services/FederationService.cs +++ b/src/BirdsiteLive/Services/FederationService.cs @@ -2,7 +2,7 @@ using System.Threading; using System.Threading.Tasks; using BirdsiteLive.DAL.Contracts; -using BirdsiteLive.Domain; +using BirdsiteLive.Pipeline; using Microsoft.Extensions.Hosting; namespace BirdsiteLive.Services @@ -10,25 +10,20 @@ namespace BirdsiteLive.Services public class FederationService : BackgroundService { private readonly IDbInitializerDal _dbInitializerDal; - private readonly IUserService _userService; + private readonly IStatusPublicationPipeline _statusPublicationPipeline; #region Ctor - public FederationService(IDbInitializerDal dbInitializerDal, IUserService userService) + public FederationService(IDbInitializerDal dbInitializerDal, IStatusPublicationPipeline statusPublicationPipeline) { _dbInitializerDal = dbInitializerDal; - _userService = userService; + _statusPublicationPipeline = statusPublicationPipeline; } #endregion protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await DbInitAsync(); - - for (;;) - { - Console.WriteLine("RUNNING SERVICE"); - await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); - } + await _statusPublicationPipeline.ExecuteAsync(stoppingToken); } private async Task DbInitAsync() diff --git a/src/BirdsiteLive/Startup.cs b/src/BirdsiteLive/Startup.cs index 3eaecd8..e31945f 100644 --- a/src/BirdsiteLive/Startup.cs +++ b/src/BirdsiteLive/Startup.cs @@ -67,6 +67,7 @@ namespace BirdsiteLive _.Assembly("BirdsiteLive.Domain"); _.Assembly("BirdsiteLive.DAL"); _.Assembly("BirdsiteLive.DAL.Postgres"); + _.Assembly("BirdsiteLive.Pipeline"); _.TheCallingAssembly(); //_.AssemblyContainingType();