merge dev

This commit is contained in:
Vincent Cloutier 2023-03-26 12:16:10 -04:00
commit a527d3e342
21 changed files with 204 additions and 244 deletions

View File

@ -14,7 +14,8 @@
public int FailingTwitterUserCleanUpThreshold { get; set; }
public int FailingFollowerCleanUpThreshold { get; set; } = -1;
public int UserCacheCapacity { get; set; }
public int UserCacheCapacity { get; set; } = 20_000;
public int TweetCacheCapacity { get; set; } = 20_000;
public int ParallelTwitterRequests { get; set; } = 10;
public int ParallelFediversePosts { get; set; } = 10;
}

View File

@ -135,7 +135,6 @@ namespace BirdsiteLive.Domain
var response = await client.SendAsync(httpRequestMessage);
response.EnsureSuccessStatusCode();
_logger.LogInformation("Sent tweet to " + targetHost);
return response.StatusCode;
}

View File

@ -186,6 +186,7 @@ namespace BirdsiteLive.Domain
var result = await _activityPubService.PostDataAsync(acceptFollow, followerHost, activity.apObject);
return result == HttpStatusCode.Accepted ||
result == HttpStatusCode.OK; //TODO: revamp this for better error handling
}
public async Task<bool> SendRejectFollowAsync(ActivityFollow activity, string followerHost)

View File

@ -6,6 +6,6 @@ namespace BirdsiteLive.Pipeline.Contracts
{
public interface ISendTweetsToFollowersProcessor
{
Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct);
Task ProcessAsync(UserWithDataToSync[] usersWithTweetsToSync, CancellationToken ct);
}
}

View File

@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Contracts;
@ -20,17 +21,18 @@ namespace BirdsiteLive.Pipeline.Processors
public async Task<IEnumerable<UserWithDataToSync>> ProcessAsync(UserWithDataToSync[] userWithTweetsToSyncs, CancellationToken ct)
{
List<Task> todo = new List<Task>();
foreach (var user in userWithTweetsToSyncs)
{
var t = Task.Run(
async() => {
var followers = await _followersDal.GetFollowersAsync(user.User.Id);
user.Followers = followers;
});
todo.Add(t);
}
await Task.WhenAll(todo);
//List<Task> todo = new List<Task>();
//foreach (var user in userWithTweetsToSyncs)
//{
// var t = Task.Run(
// async() => {
// var followers = await _followersDal.GetFollowersAsync(user.User.Id);
// user.Followers = followers;
// });
// todo.Add(t);
//}
//
//await Task.WhenAll(todo);
return userWithTweetsToSyncs;
}

View File

@ -51,25 +51,27 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks
index++;
var t = Task.Run(async () => {
var user = userWtData.User;
var now = DateTime.UtcNow;
try
{
var user = userWtData.User;
var tweets = await RetrieveNewTweets(user);
_logger.LogInformation(index + "/" + syncTwitterUsers.Count() + " Got " + tweets.Length + " tweets from user " + user.Acct + " " );
if (tweets.Length > 0 && user.LastTweetPostedId != -1)
if (tweets.Length > 0 && user.LastTweetPostedId == -1)
{
// skip the first time to avoid sending backlog of tweet
var tweetId = tweets.Last().Id;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now);
}
else if (tweets.Length > 0 && user.LastTweetPostedId != -1)
{
userWtData.Tweets = tweets;
usersWtTweets.Add(userWtData);
}
else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
{
var tweetId = tweets.Last().Id;
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now);
}
else
{
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now);
}
@ -77,7 +79,7 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks
catch(Exception e)
{
_logger.LogError(e.Message);
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now);
}
});
todo.Add(t);

View File

@ -15,15 +15,17 @@ namespace BirdsiteLive.Pipeline.Processors
public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor
{
private readonly ITwitterUserDal _twitterUserDal;
private readonly IFollowersDal _followersDal;
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;
private static Random rng = new Random();
public int WaitFactor = 1000 * 60; //1 min
#region Ctor
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger<RetrieveTwitterUsersProcessor> logger)
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IFollowersDal followersDal, ILogger<RetrieveTwitterUsersProcessor> logger)
{
_twitterUserDal = twitterUserDal;
_followersDal = followersDal;
_logger = logger;
}
#endregion
@ -38,13 +40,17 @@ namespace BirdsiteLive.Pipeline.Processors
{
var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(2000);
var userCount = users.Any() ? Math.Min(users.Length, 100) : 1;
var userCount = users.Any() ? Math.Min(users.Length, 200) : 1;
var splitUsers = users.OrderBy(a => rng.Next()).ToArray().Split(userCount).ToList();
foreach (var u in splitUsers)
{
ct.ThrowIfCancellationRequested();
UserWithDataToSync[] toSync = u.Select(x => new UserWithDataToSync { User = x }).ToArray();
UserWithDataToSync[] toSync = await Task.WhenAll(
u.Select(async x => new UserWithDataToSync
{ User = x, Followers = await _followersDal.GetFollowersAsync(x.Id) }
)
);
await twitterUsersBufferBlock.SendAsync(toSync, ct);
}

View File

@ -22,7 +22,6 @@ namespace BirdsiteLive.Pipeline.Processors
public class SendTweetsToFollowersProcessor : ISendTweetsToFollowersProcessor
{
private readonly ISendTweetsToInboxTask _sendTweetsToInboxTask;
private readonly ISaveProgressionTask _saveProgressionTask;
private readonly ISendTweetsToSharedInboxTask _sendTweetsToSharedInbox;
private readonly IFollowersDal _followersDal;
private readonly InstanceSettings _instanceSettings;
@ -31,45 +30,50 @@ namespace BirdsiteLive.Pipeline.Processors
private List<Task> _todo = new List<Task>();
#region Ctor
public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox, ISaveProgressionTask saveProgressionTask, IFollowersDal followersDal, ILogger<SendTweetsToFollowersProcessor> logger, InstanceSettings instanceSettings, IRemoveFollowerAction removeFollowerAction)
public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox, IFollowersDal followersDal, ILogger<SendTweetsToFollowersProcessor> logger, InstanceSettings instanceSettings, IRemoveFollowerAction removeFollowerAction)
{
_sendTweetsToInboxTask = sendTweetsToInboxTask;
_sendTweetsToSharedInbox = sendTweetsToSharedInbox;
_logger = logger;
_instanceSettings = instanceSettings;
_removeFollowerAction = removeFollowerAction;
_saveProgressionTask = saveProgressionTask;
_followersDal = followersDal;
}
#endregion
public async Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct)
public async Task ProcessAsync(UserWithDataToSync[] usersWithTweetsToSync, CancellationToken ct)
{
var user = userWithTweetsToSync.User;
_todo = _todo.Where(x => !x.IsCompleted).ToList();
var t = Task.Run( async () =>
foreach (var userWithTweetsToSync in usersWithTweetsToSync)
{
// Process Shared Inbox
var followersWtSharedInbox = userWithTweetsToSync.Followers
.Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user);
var user = userWithTweetsToSync.User;
// Process Inbox
var followerWtInbox = userWithTweetsToSync.Followers
.Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user);
_todo = _todo.Where(x => !x.IsCompleted).ToList();
var t = Task.Run( async () =>
{
// Process Shared Inbox
var followersWtSharedInbox = userWithTweetsToSync.Followers
.Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user);
await _saveProgressionTask.ProcessAsync(userWithTweetsToSync, ct);
});
_todo.Add(t);
// Process Inbox
var followerWtInbox = userWithTweetsToSync.Followers
.Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user);
_logger.LogInformation("Done sending " + userWithTweetsToSync.Tweets.Length + " tweets for "
+ userWithTweetsToSync.Followers.Length + "followers for user " + userWithTweetsToSync.User.Acct);
}, ct);
_todo.Add(t);
if (_todo.Count >= _instanceSettings.ParallelFediversePosts)
{
await Task.WhenAny(_todo);
if (_todo.Count >= _instanceSettings.ParallelFediversePosts)
{
await Task.WhenAny(_todo);
}
}
}

View File

@ -31,7 +31,6 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks
{
_activityPubService = activityPubService;
_statusService = statusService;
_followersDal = followersDal;
_settings = settings;
_logger = logger;
}
@ -40,46 +39,32 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks
public async Task ExecuteAsync(IEnumerable<ExtractedTweet> tweets, Follower follower, SyncTwitterUser user)
{
var userId = user.Id;
var fromStatusId = follower.FollowingsSyncStatus[userId];
//var fromStatusId = follower.FollowingsSyncStatus[userId];
var tweetsToSend = tweets
.Where(x => x.Id > fromStatusId)
.OrderBy(x => x.Id)
.ToList();
var inbox = follower.InboxRoute;
var syncStatus = fromStatusId;
try
foreach (var tweet in tweetsToSend)
{
foreach (var tweet in tweetsToSend)
try
{
try
var activity = _statusService.GetActivity(user.Acct, tweet);
await _activityPubService.PostNewActivity(activity, user.Acct, tweet.Id.ToString(), follower.Host, inbox);
}
catch (ArgumentException e)
{
if (e.Message.Contains("Invalid pattern") && e.Message.Contains("at offset")) //Regex exception
{
var activity = _statusService.GetActivity(user.Acct, tweet);
await _activityPubService.PostNewActivity(activity, user.Acct, tweet.Id.ToString(), follower.Host, inbox);
_logger.LogError(e, "Can't parse {MessageContent} from Tweet {Id}", tweet.MessageContent, tweet.Id);
}
catch (ArgumentException e)
else
{
if (e.Message.Contains("Invalid pattern") && e.Message.Contains("at offset")) //Regex exception
{
_logger.LogError(e, "Can't parse {MessageContent} from Tweet {Id}", tweet.MessageContent, tweet.Id);
}
else
{
throw;
}
throw;
}
}
syncStatus = tweet.Id;
}
}
finally
{
if (syncStatus != fromStatusId)
{
follower.FollowingsSyncStatus[userId] = syncStatus;
await _followersDal.UpdateFollowerAsync(follower);
}
}
}
}

View File

@ -40,50 +40,29 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks
var userId = user.Id;
var inbox = followersPerInstance.First().SharedInboxRoute;
var fromStatusId = followersPerInstance
.Max(x => x.FollowingsSyncStatus[userId]);
var tweetsToSend = tweets
.Where(x => x.Id > fromStatusId)
.OrderBy(x => x.Id)
.ToList();
_logger.LogInformation("After filtering, there were " + tweetsToSend.Count() + " tweets left to send");
var syncStatus = fromStatusId;
try
foreach (var tweet in tweetsToSend)
{
foreach (var tweet in tweetsToSend)
try
{
try
{
var activity = _statusService.GetActivity(user.Acct, tweet);
await _activityPubService.PostNewActivity(activity, user.Acct, tweet.Id.ToString(), host, inbox);
}
catch (ArgumentException e)
{
if (e.Message.Contains("Invalid pattern") && e.Message.Contains("at offset")) //Regex exception
{
_logger.LogError(e, "Can't parse {MessageContent} from Tweet {Id}", tweet.MessageContent, tweet.Id);
}
else
{
throw;
}
}
syncStatus = tweet.Id;
var activity = _statusService.GetActivity(user.Acct, tweet);
await _activityPubService.PostNewActivity(activity, user.Acct, tweet.Id.ToString(), host, inbox);
}
}
finally
{
if (syncStatus != fromStatusId)
catch (ArgumentException e)
{
foreach (var f in followersPerInstance)
if (e.Message.Contains("Invalid pattern") && e.Message.Contains("at offset")) //Regex exception
{
f.FollowingsSyncStatus[userId] = syncStatus;
await _followersDal.UpdateFollowerAsync(f);
_logger.LogError(e, "Can't parse {MessageContent} from Tweet {Id}", tweet.MessageContent, tweet.Id);
}
else
{
throw;
}
}
}
}
}

View File

@ -39,24 +39,24 @@ namespace BirdsiteLive.Pipeline
public async Task ExecuteAsync(CancellationToken ct)
{
var standardBlockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
var standardBlockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1, CancellationToken = ct};
// Create blocks
var twitterUserToRefreshBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions
{ BoundedCapacity = 1, CancellationToken = ct });
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 } );
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions );
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, CancellationToken = ct, BoundedCapacity = 1 });
// var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1 } );
// var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithDataToSync[]>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
// Link pipeline
twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
// Launch twitter user retriever
// Launch twitter user retriever after a little delay
// to give time for the Tweet cache to fill
await Task.Delay(30 * 1000, ct);
var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUserToRefreshBufferBlock, ct);
// Wait

View File

@ -19,7 +19,7 @@ namespace BirdsiteLive.Twitter
private readonly MemoryCache _userCache;
private readonly MemoryCacheEntryOptions _cacheEntryOptions = new MemoryCacheEntryOptions()
.SetSize(10000)//Size amount
.SetSize(1)//Size amount
//Priority on removing when reaching size limit (memory pressure)
.SetPriority(CacheItemPriority.Low)
// Keep in cache for this time, reset time if accessed.
@ -34,7 +34,7 @@ namespace BirdsiteLive.Twitter
_userCache = new MemoryCache(new MemoryCacheOptions()
{
SizeLimit = 3000 //TODO make this use number of entries in db
SizeLimit = settings.UserCacheCapacity
});
}
#endregion

View File

@ -17,14 +17,7 @@ namespace BirdsiteLive.Twitter
private readonly ITwitterTweetsService _twitterService;
private readonly MemoryCache _tweetCache;
private readonly MemoryCacheEntryOptions _cacheEntryOptions = new MemoryCacheEntryOptions()
.SetSize(10000)//Size amount
//Priority on removing when reaching size limit (memory pressure)
.SetPriority(CacheItemPriority.Low)
// Keep in cache for this time, reset time if accessed.
.SetSlidingExpiration(TimeSpan.FromMinutes(60))
// Remove from cache after this time, regardless of sliding expiration
.SetAbsoluteExpiration(TimeSpan.FromDays(1));
private readonly MemoryCacheEntryOptions _cacheEntryOptions;
#region Ctor
public CachedTwitterTweetsService(ITwitterTweetsService twitterService, InstanceSettings settings)
@ -33,8 +26,16 @@ namespace BirdsiteLive.Twitter
_tweetCache = new MemoryCache(new MemoryCacheOptions()
{
SizeLimit = 10000 //TODO make this use number of entries in db
SizeLimit = settings.TweetCacheCapacity,
});
_cacheEntryOptions = new MemoryCacheEntryOptions()
.SetSize(1)
//Priority on removing when reaching size limit (memory pressure)
.SetPriority(CacheItemPriority.Low)
// Keep in cache for this time, reset time if accessed.
.SetSlidingExpiration(TimeSpan.FromMinutes(60))
// Remove from cache after this time, regardless of sliding expiration
.SetAbsoluteExpiration(TimeSpan.FromDays(1));
}
#endregion

View File

@ -6,7 +6,7 @@
<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.123" />
<PackageReference Include="Npgsql" Version="7.0.1" />
<PackageReference Include="Npgsql" Version="7.0.2" />
</ItemGroup>
<ItemGroup>

View File

@ -99,7 +99,7 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
{
if (followedUserId == default) throw new ArgumentException("followedUserId");
var query = $"SELECT * FROM {_settings.FollowersTableName} WHERE $1=ANY(followings)";
var query = $"SELECT * FROM {_settings.FollowersTableName} WHERE followings @> ARRAY[$1]";
await using var connection = DataSource.CreateConnection();
await connection.OpenAsync();

View File

@ -14,6 +14,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
[TestClass]
public class RetrieveFollowersProcessorTests
{
[Ignore]
[TestMethod]
public async Task ProcessAsync_Test()
{

View File

@ -38,10 +38,16 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
It.Is<int>(y => true)))
.ReturnsAsync(users);
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
.ReturnsAsync(new Follower[] {});
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
processor.WaitFactor = 10;
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -79,10 +85,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
.ReturnsAsync(new SyncTwitterUser[0])
.ReturnsAsync(new SyncTwitterUser[0]);
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
.ReturnsAsync(new Follower[] {});
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
processor.WaitFactor = 2;
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -120,10 +131,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
.ReturnsAsync(new SyncTwitterUser[0])
.ReturnsAsync(new SyncTwitterUser[0]);
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
.ReturnsAsync(new Follower[] {});
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
processor.WaitFactor = 2;
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -154,10 +170,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
It.Is<int>(y => true)))
.ReturnsAsync(new SyncTwitterUser[0]);
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
.ReturnsAsync(new Follower[] {});
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
processor.WaitFactor = 1;
var t =processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -185,10 +206,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
It.Is<int>(y => true)))
.Returns(async () => await DelayFaultedTask<SyncTwitterUser[]>(new Exception()));
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
.ReturnsAsync(new Follower[] {});
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
processor.WaitFactor = 10;
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -215,10 +241,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
.ReturnsAsync(new Follower[] {});
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
processor.WaitFactor = 1;
await processor.GetTwitterUsersAsync(buffer, canTokenS.Token);
}

View File

@ -87,8 +87,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();
@ -169,8 +169,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();
@ -260,8 +260,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();
@ -352,8 +352,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();
@ -449,8 +449,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();
@ -529,8 +529,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();
@ -610,8 +610,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();
@ -699,8 +699,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();
@ -789,8 +789,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
.Returns(Task.CompletedTask);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();
@ -879,8 +879,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
.Returns(Task.CompletedTask);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();
@ -969,8 +969,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();
@ -1064,8 +1064,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var removeFollowerMock = new Mock<IRemoveFollowerAction>(MockBehavior.Strict);
#endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, saveProgressMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None);
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(new [] { userWithTweets }, CancellationToken.None);
#region Validations
sendTweetsToInboxTaskMock.VerifyAll();

View File

@ -85,10 +85,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
.Returns(activity);
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId)))
.Returns(Task.CompletedTask);
var loggerMock = new Mock<ILogger<SendTweetsToInboxTask>>();
#endregion
@ -168,10 +164,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
.Returns(activity);
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId)))
.Returns(Task.CompletedTask);
var loggerMock = new Mock<ILogger<SendTweetsToInboxTask>>();
#endregion
@ -254,10 +246,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
.Returns(activity);
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId)))
.Returns(Task.CompletedTask);
var loggerMock = new Mock<ILogger<SendTweetsToInboxTask>>();
#endregion
@ -340,10 +329,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
.Returns(activity);
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId)))
.Returns(Task.CompletedTask);
var loggerMock = new Mock<ILogger<SendTweetsToInboxTask>>();
#endregion
@ -400,7 +386,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
#region Mocks
var activityPubService = new Mock<IActivityPubService>(MockBehavior.Strict);
foreach (var tweetId in new[] { tweetId2, tweetId3 })
foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 })
{
activityPubService
.Setup(x => x.PostNewActivity(
@ -413,7 +399,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
}
var statusServiceMock = new Mock<IStatusService>(MockBehavior.Strict);
foreach (var tweetId in new[] { tweetId2, tweetId3 })
foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 })
{
statusServiceMock
.Setup(x => x.GetActivity(
@ -423,10 +409,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
}
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId3)))
.Returns(Task.CompletedTask);
var loggerMock = new Mock<ILogger<SendTweetsToInboxTask>>();
#endregion
@ -485,6 +468,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
#region Mocks
var activityPubService = new Mock<IActivityPubService>(MockBehavior.Strict);
activityPubService
.Setup(x => x.PostNewActivity(
It.Is<ActivityCreateNote>(y => y.apObject.id == tweetId1.ToString()),
It.Is<string>(y => y == twitterHandle),
It.Is<string>(y => y == tweetId1.ToString()),
It.Is<string>(y => y == host),
It.Is<string>(y => y == inbox)))
.Returns(Task.CompletedTask);
activityPubService
.Setup(x => x.PostNewActivity(
It.Is<ActivityCreateNote>(y => y.apObject.id == tweetId2.ToString()),
@ -504,7 +496,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
.Throws(new HttpRequestException());
var statusServiceMock = new Mock<IStatusService>(MockBehavior.Strict);
foreach (var tweetId in new[] { tweetId2, tweetId3 })
foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 })
{
statusServiceMock
.Setup(x => x.GetActivity(
@ -514,10 +506,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
}
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId2)))
.Returns(Task.CompletedTask);
var loggerMock = new Mock<ILogger<SendTweetsToInboxTask>>();
#endregion
@ -592,10 +580,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
.Throws(new ArgumentException("Invalid pattern blabla at offset 9"));
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId)))
.Returns(Task.CompletedTask);
var loggerMock = new Mock<ILogger<SendTweetsToInboxTask>>();
#endregion

View File

@ -105,13 +105,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
foreach (var follower in followers)
{
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId)))
.Returns(Task.CompletedTask);
}
var loggerMock = new Mock<ILogger<SendTweetsToSharedInboxTask>>();
#endregion
@ -212,14 +205,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
foreach (var follower in followers)
{
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId)))
.Returns(Task.CompletedTask);
}
var loggerMock = new Mock<ILogger<SendTweetsToSharedInboxTask>>();
#endregion
@ -318,13 +303,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
foreach (var follower in followers)
{
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId)))
.Returns(Task.CompletedTask);
}
var loggerMock = new Mock<ILogger<SendTweetsToSharedInboxTask>>();
#endregion
@ -398,7 +376,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
#region Mocks
var activityPubService = new Mock<IActivityPubService>(MockBehavior.Strict);
foreach (var tweetId in new[] { tweetId2, tweetId3 })
foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 })
{
activityPubService
.Setup(x => x.PostNewActivity(
@ -411,7 +389,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
}
var statusServiceMock = new Mock<IStatusService>(MockBehavior.Strict);
foreach (var tweetId in new[] { tweetId2, tweetId3 })
foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 })
{
statusServiceMock
.Setup(x => x.GetActivity(
@ -422,14 +400,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
foreach (var follower in followers)
{
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId3)))
.Returns(Task.CompletedTask);
}
var loggerMock = new Mock<ILogger<SendTweetsToSharedInboxTask>>();
#endregion
@ -504,6 +474,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
#region Mocks
var activityPubService = new Mock<IActivityPubService>(MockBehavior.Strict);
activityPubService
.Setup(x => x.PostNewActivity(
It.Is<ActivityCreateNote>(y => y.apObject.id == tweetId1.ToString()),
It.Is<string>(y => y == twitterHandle),
It.Is<string>(y => y == tweetId1.ToString()),
It.Is<string>(y => y == host),
It.Is<string>(y => y == inbox)))
.Returns(Task.CompletedTask);
activityPubService
.Setup(x => x.PostNewActivity(
It.Is<ActivityCreateNote>(y => y.apObject.id == tweetId2.ToString()),
@ -523,7 +502,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
.Throws(new HttpRequestException());
var statusServiceMock = new Mock<IStatusService>(MockBehavior.Strict);
foreach (var tweetId in new[] { tweetId2, tweetId3 })
foreach (var tweetId in new[] { tweetId1, tweetId2, tweetId3 })
{
statusServiceMock
.Setup(x => x.GetActivity(
@ -534,14 +513,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
foreach (var follower in followers)
{
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId2)))
.Returns(Task.CompletedTask);
}
var loggerMock = new Mock<ILogger<SendTweetsToSharedInboxTask>>();
#endregion
@ -633,14 +604,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors.SubTasks
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
foreach (var follower in followers)
{
followersDalMock
.Setup(x => x.UpdateFollowerAsync(
It.Is<Follower>(y => y.Id == follower.Id && y.FollowingsSyncStatus[twitterUserId] == tweetId)))
.Returns(Task.CompletedTask);
}
var loggerMock = new Mock<ILogger<SendTweetsToSharedInboxTask>>();
#endregion

View File

@ -16,7 +16,7 @@ namespace BirdsiteLive.Pipeline.Tests
public async Task ExecuteAsync_Test()
{
#region Stubs
var ct = new CancellationTokenSource(10);
var ct = new CancellationTokenSource(100 * 1000);
#endregion
#region Mocks