sharding support

This commit is contained in:
Vincent Cloutier 2023-04-01 19:55:20 -04:00
parent bc90bc293e
commit 3346b7b5e8
5 changed files with 62 additions and 12 deletions

View file

@ -18,6 +18,9 @@
public int TweetCacheCapacity { get; set; } = 20_000;
// "AAAAAAAAAAAAAAAAAAAAAPYXBAAAAAAACLXUNDekMxqa8h%2F40K4moUkGsoc%3DTYfbDKbT3jJPCEVnMYqilB28NHfOPqkca3qaAxGfsyKCs0wRbw"
public string TwitterBearerToken { get; set; } = "AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA";
public int m { get; set; } = 1;
public int n_start { get; set; } = 0;
public int n_end { get; set; } = 1;
public int ParallelTwitterRequests { get; set; } = 10;
public int ParallelFediversePosts { get; set; } = 10;
}

View file

@ -16,16 +16,18 @@ namespace BirdsiteLive.Pipeline.Processors
{
private readonly ITwitterUserDal _twitterUserDal;
private readonly IFollowersDal _followersDal;
private readonly InstanceSettings _instanceSettings;
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;
private static Random rng = new Random();
public int WaitFactor = 1000 * 60; //1 min
#region Ctor
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IFollowersDal followersDal, ILogger<RetrieveTwitterUsersProcessor> logger)
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IFollowersDal followersDal, InstanceSettings instanceSettings, ILogger<RetrieveTwitterUsersProcessor> logger)
{
_twitterUserDal = twitterUserDal;
_followersDal = followersDal;
_instanceSettings = instanceSettings;
_logger = logger;
}
#endregion
@ -38,7 +40,7 @@ namespace BirdsiteLive.Pipeline.Processors
try
{
var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(2000);
var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(2000, _instanceSettings.n_start, _instanceSettings.n_end, _instanceSettings.m);
var userCount = users.Any() ? Math.Min(users.Length, 200) : 1;
var splitUsers = users.OrderBy(a => rng.Next()).ToArray().Split(userCount).ToList();

View file

@ -118,14 +118,20 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
}
}
public async Task<SyncTwitterUser[]> GetAllTwitterUsersWithFollowersAsync(int maxNumber)
public async Task<SyncTwitterUser[]> GetAllTwitterUsersWithFollowersAsync(int maxNumber, int nStart, int nEnd, int m)
{
var query = "SELECT * FROM (SELECT unnest(followings) as follow FROM followers GROUP BY follow) AS f INNER JOIN twitter_users ON f.follow=twitter_users.id ORDER BY lastSync ASC NULLS FIRST LIMIT $1";
var query = "SELECT * FROM (SELECT unnest(followings) as follow FROM followers GROUP BY follow) AS f INNER JOIN twitter_users ON f.follow=twitter_users.id WHERE mod(id, $2) >= $3 AND mod(id, $2) <= $4 ORDER BY lastSync ASC NULLS FIRST LIMIT $1";
await using var connection = DataSource.CreateConnection();
await connection.OpenAsync();
await using var command = new NpgsqlCommand(query, connection) {
Parameters = { new() { Value = maxNumber}}
Parameters =
{
new() { Value = maxNumber},
new() { Value = m},
new() { Value = nStart},
new() { Value = nEnd}
}
};
var reader = await command.ExecuteReaderAsync();
var results = new List<SyncTwitterUser>();

View file

@ -9,7 +9,7 @@ namespace BirdsiteLive.DAL.Contracts
Task CreateTwitterUserAsync(string acct, long lastTweetPostedId);
Task<SyncTwitterUser> GetTwitterUserAsync(string acct);
Task<SyncTwitterUser> GetTwitterUserAsync(int id);
Task<SyncTwitterUser[]> GetAllTwitterUsersWithFollowersAsync(int maxNumber);
Task<SyncTwitterUser[]> GetAllTwitterUsersWithFollowersAsync(int maxNumber, int nStart, int nEnd, int m);
Task<SyncTwitterUser[]> GetAllTwitterUsersAsync(int maxNumber);
Task<SyncTwitterUser[]> GetAllTwitterUsersAsync();
Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, int fetchingErrorCount, DateTime lastSync);

View file

@ -29,12 +29,19 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
new SyncTwitterUser(),
};
var maxUsers = 1000;
var instanceSettings = new InstanceSettings()
{
n_start = 1,
};
#endregion
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.Setup(x => x.GetAllTwitterUsersWithFollowersAsync(
It.Is<int>(y => true),
It.Is<int>(y => true),
It.Is<int>(y => true),
It.Is<int>(y => true)))
.ReturnsAsync(users);
@ -47,7 +54,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object);
processor.WaitFactor = 10;
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -72,12 +79,19 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
users.Add(new SyncTwitterUser());
var maxUsers = 1000;
var instanceSettings = new InstanceSettings()
{
n_start = 1,
};
#endregion
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.SetupSequence(x => x.GetAllTwitterUsersWithFollowersAsync(
It.Is<int>(y => true),
It.Is<int>(y => true),
It.Is<int>(y => true),
It.Is<int>(y => true)))
.ReturnsAsync(users.ToArray())
.ReturnsAsync(new SyncTwitterUser[0])
@ -93,7 +107,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object);
processor.WaitFactor = 2;
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -118,12 +132,19 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
users.Add(new SyncTwitterUser());
var maxUsers = 1000;
var instanceSettings = new InstanceSettings()
{
n_start = 1,
};
#endregion
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.SetupSequence(x => x.GetAllTwitterUsersWithFollowersAsync(
It.Is<int>(y => true),
It.Is<int>(y => true),
It.Is<int>(y => true),
It.Is<int>(y => true)))
.ReturnsAsync(users.ToArray())
.ReturnsAsync(new SyncTwitterUser[0])
@ -139,7 +160,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object);
processor.WaitFactor = 2;
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -160,6 +181,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var buffer = new BufferBlock<UserWithDataToSync[]>();
var maxUsers = 1000;
var instanceSettings = new InstanceSettings()
{
n_start = 1,
};
#endregion
#region Mocks
@ -167,6 +192,9 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.Setup(x => x.GetAllTwitterUsersWithFollowersAsync(
It.Is<int>(y => true),
It.Is<int>(y => true),
It.Is<int>(y => true),
It.Is<int>(y => true)))
.ReturnsAsync(new SyncTwitterUser[0]);
@ -178,7 +206,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object);
processor.WaitFactor = 1;
var t =processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -197,12 +225,19 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var buffer = new BufferBlock<UserWithDataToSync[]>();
var maxUsers = 1000;
var instanceSettings = new InstanceSettings()
{
n_start = 1,
};
#endregion
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.Setup(x => x.GetAllTwitterUsersWithFollowersAsync(
It.Is<int>(y => true),
It.Is<int>(y => true),
It.Is<int>(y => true),
It.Is<int>(y => true)))
.Returns(async () => await DelayFaultedTask<SyncTwitterUser[]>(new Exception()));
@ -214,7 +249,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object);
processor.WaitFactor = 10;
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -236,6 +271,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
canTokenS.Cancel();
var maxUsers = 1000;
var instanceSettings = new InstanceSettings()
{
n_start = 1,
};
#endregion
#region Mocks
@ -249,7 +288,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object);
processor.WaitFactor = 1;
await processor.GetTwitterUsersAsync(buffer, canTokenS.Token);
}