switched to vanilla npgsql for more queries 4

This commit is contained in:
Vincent Cloutier 2023-01-01 11:58:36 -05:00
parent 8551763f77
commit 97d40b21fb
6 changed files with 27 additions and 15 deletions

View file

@ -7,7 +7,6 @@
<ItemGroup>
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="System.Text.Json" Version="4.7.2" />
</ItemGroup>
</Project>

View file

@ -35,7 +35,6 @@ namespace BirdsiteLive.Pipeline.Processors
{
var usersWtTweets = new List<UserWithDataToSync>();
//TODO multithread this
int index = 0;
foreach (var userWtData in syncTwitterUsers)
{
@ -60,7 +59,7 @@ namespace BirdsiteLive.Pipeline.Processors
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now);
}
await Task.Delay(150);
//await Task.Delay(150);
}
return usersWtTweets.ToArray();

View file

@ -41,8 +41,8 @@ namespace BirdsiteLive.Pipeline.Processors
//var users = await _twitterUserDal.GetAllTwitterUsersAsync(50);
//var splitUsers = users.Split(25).ToList();
var maxUsersNumber = await _maxUsersNumberProvider.GetMaxUsersNumberAsync();
var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(maxUsersNumber);
//var maxUsersNumber = await _maxUsersNumberProvider.GetMaxUsersNumberAsync();
var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(1000);
var userCount = users.Any() ? Math.Min(users.Length, 25) : 1;
//var splitNumber = (int) Math.Ceiling(userCount / 15d);

View file

@ -49,9 +49,9 @@ namespace BirdsiteLive.Pipeline
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct));
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 2, CancellationToken = ct });
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct));
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 10, CancellationToken = ct });
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct });
var sendTweetsToFollowersBlock = new TransformBlock<UserWithDataToSync, UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 10, CancellationToken = ct });
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct });
var saveProgressionBlock = new ActionBlock<UserWithDataToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
// Link pipeline

View file

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Contracts;
@ -110,11 +111,29 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
{
var query = "SELECT * FROM (SELECT unnest(followings) as follow FROM followers GROUP BY follow) AS f INNER JOIN twitter_users ON f.follow=twitter_users.id ORDER BY lastSync ASC NULLS FIRST LIMIT @maxNumber";
using (var dbConnection = Connection)
await using var connection = DataSource.CreateConnection();
await connection.OpenAsync();
await using var command = new NpgsqlCommand(query, connection) {
Parameters = { new() { Value = maxNumber}}
};
var reader = await command.ExecuteReaderAsync();
var results = new List<SyncTwitterUser>();
while (await reader.ReadAsync())
{
var result = await dbConnection.QueryAsync<SyncTwitterUser>(query, new { maxNumber });
return result.ToArray();
results.Add(new SyncTwitterUser
{
Id = reader["id"] as int? ?? default,
Acct = reader["acct"] as string,
TwitterUserId = reader["twitterUserId"] as long? ?? default,
LastTweetPostedId = reader["lastTweetPostedId"] as long? ?? default,
LastTweetSynchronizedForAllFollowersId = reader["lastTweetSynchronizedForAllFollowersId"] as long? ?? default,
LastSync = reader["lastSync"] as DateTime? ?? default,
FetchingErrorCount = reader["fetchingErrorCount"] as int? ?? default,
}
);
}
return results.ToArray();
}
public async Task<SyncTwitterUser[]> GetAllTwitterUsersAsync(int maxNumber)

View file

@ -53,7 +53,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
await Task.WhenAny(t, Task.Delay(50));
#region Validations
maxUsersNumberProviderMock.VerifyAll();
twitterUserDalMock.VerifyAll();
Assert.IsTrue(0 < buffer.Count);
buffer.TryReceive(out var result);
@ -100,7 +99,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
await Task.WhenAny(t, Task.Delay(300));
#region Validations
maxUsersNumberProviderMock.VerifyAll();
twitterUserDalMock.VerifyAll();
Assert.IsTrue(0 < buffer.Count);
buffer.TryReceive(out var result);
@ -147,7 +145,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
await Task.WhenAny(t, Task.Delay(5000));
#region Validations
maxUsersNumberProviderMock.VerifyAll();
twitterUserDalMock.VerifyAll();
Assert.IsTrue(0 < buffer.Count);
buffer.TryReceive(out var result);
@ -186,7 +183,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
await Task.WhenAny(t, Task.Delay(50));
#region Validations
maxUsersNumberProviderMock.VerifyAll();
twitterUserDalMock.VerifyAll();
Assert.AreEqual(0, buffer.Count);
#endregion
@ -223,7 +219,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
await Task.WhenAny(t, Task.Delay(50));
#region Validations
maxUsersNumberProviderMock.VerifyAll();
twitterUserDalMock.VerifyAll();
Assert.AreEqual(0, buffer.Count);
#endregion