added limit to user retrieval

This commit is contained in:
Nicolas Constant 2021-01-22 21:23:27 -05:00
parent b4c01ad326
commit 89c041f332
No known key found for this signature in database
GPG Key ID: 1E9F677FB01A5688
9 changed files with 81 additions and 27 deletions

View File

@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -45,7 +46,8 @@ namespace BirdsiteLive.Pipeline.Processors
else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
{
var tweetId = tweets.Last().Id;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId);
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, now);
}
}

View File

@ -4,6 +4,7 @@ using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.Common.Extensions;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
@ -15,12 +16,16 @@ namespace BirdsiteLive.Pipeline.Processors
{
private readonly ITwitterUserDal _twitterUserDal;
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;
private readonly InstanceSettings _instanceSettings;
public int WaitFactor = 1000 * 60; //1 min
private int StartUpWarming = 4;
#region Ctor
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger<RetrieveTwitterUsersProcessor> logger)
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, InstanceSettings instanceSettings, ILogger<RetrieveTwitterUsersProcessor> logger)
{
_twitterUserDal = twitterUserDal;
_instanceSettings = instanceSettings;
_logger = logger;
}
#endregion
@ -33,7 +38,11 @@ namespace BirdsiteLive.Pipeline.Processors
try
{
var users = await _twitterUserDal.GetAllTwitterUsersAsync();
var maxUsers = StartUpWarming > 0
? _instanceSettings.MaxUsersCapacity / 4
: _instanceSettings.MaxUsersCapacity;
StartUpWarming--;
var users = await _twitterUserDal.GetAllTwitterUsersAsync(maxUsers);
var userCount = users.Any() ? users.Length : 1;
var splitNumber = (int) Math.Ceiling(userCount / 15d);

View File

@ -1,4 +1,5 @@
using System.Linq;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Contracts;
@ -23,7 +24,8 @@ namespace BirdsiteLive.Pipeline.Processors
var userId = userWithTweetsToSync.User.Id;
var lastPostedTweet = userWithTweetsToSync.Tweets.Select(x => x.Id).Max();
var minimumSync = userWithTweetsToSync.Followers.Select(x => x.FollowingsSyncStatus[userId]).Min();
await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync);
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync, now);
}
}
}

View File

@ -62,15 +62,15 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
}
}
public async Task<SyncTwitterUser[]> GetAllTwitterUsersAsync()
public async Task<SyncTwitterUser[]> GetAllTwitterUsersAsync(int maxNumber)
{
var query = $"SELECT * FROM {_settings.TwitterUserTableName}";
var query = $"SELECT * FROM {_settings.TwitterUserTableName} ORDER BY lastSync ASC LIMIT @maxNumber";
using (var dbConnection = Connection)
{
dbConnection.Open();
var result = await dbConnection.QueryAsync<SyncTwitterUser>(query);
var result = await dbConnection.QueryAsync<SyncTwitterUser>(query, new { maxNumber });
return result.ToArray();
}
}

View File

@ -8,7 +8,7 @@ namespace BirdsiteLive.DAL.Contracts
{
Task CreateTwitterUserAsync(string acct, long lastTweetPostedId);
Task<SyncTwitterUser> GetTwitterUserAsync(string acct);
Task<SyncTwitterUser[]> GetAllTwitterUsersAsync();
Task<SyncTwitterUser[]> GetAllTwitterUsersAsync(int maxNumber);
Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, DateTime lastSync);
Task DeleteTwitterUserAsync(string acct);
Task<int> GetTwitterUsersCountAsync();

View File

@ -111,7 +111,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers
await dal.CreateTwitterUserAsync(acct, lastTweetId);
}
var result = await dal.GetAllTwitterUsersAsync();
var result = await dal.GetAllTwitterUsersAsync(1000);
Assert.AreEqual(1000, result.Length);
Assert.IsFalse(result[0].Id == default);
Assert.IsFalse(result[0].Acct == default);

View File

@ -1,3 +1,4 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -54,7 +55,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
.Setup(x => x.UpdateTwitterUserAsync(
It.Is<int>(y => y == user1.Id),
It.Is<long>(y => y == tweets.Last().Id),
It.Is<long>(y => y == tweets.Last().Id)
It.Is<long>(y => y == tweets.Last().Id),
It.IsAny<DateTime>()
))
.Returns(Task.CompletedTask);
#endregion

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Processors;
@ -26,18 +27,23 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
new SyncTwitterUser(),
new SyncTwitterUser(),
};
var settings = new InstanceSettings
{
MaxUsersCapacity = 10
};
#endregion
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.Setup(x => x.GetAllTwitterUsersAsync())
.Setup(x => x.GetAllTwitterUsersAsync(
It.Is<int>(y => y == settings.MaxUsersCapacity)))
.ReturnsAsync(users);
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
processor.WaitFactor = 10;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -60,19 +66,25 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
for (var i = 0; i < 30; i++)
users.Add(new SyncTwitterUser());
var settings = new InstanceSettings
{
MaxUsersCapacity = 100
};
#endregion
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.SetupSequence(x => x.GetAllTwitterUsersAsync())
.SetupSequence(x => x.GetAllTwitterUsersAsync(
It.Is<int>(y => y == settings.MaxUsersCapacity)))
.ReturnsAsync(users.ToArray())
.ReturnsAsync(new SyncTwitterUser[0]);
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
processor.WaitFactor = 2;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -95,19 +107,25 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
for (var i = 0; i < 31; i++)
users.Add(new SyncTwitterUser());
var settings = new InstanceSettings
{
MaxUsersCapacity = 10
};
#endregion
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.SetupSequence(x => x.GetAllTwitterUsersAsync())
.SetupSequence(x => x.GetAllTwitterUsersAsync(
It.Is<int>(y => y == settings.MaxUsersCapacity)))
.ReturnsAsync(users.ToArray())
.ReturnsAsync(new SyncTwitterUser[0]);
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
processor.WaitFactor = 2;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -126,18 +144,24 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
{
#region Stubs
var buffer = new BufferBlock<SyncTwitterUser[]>();
var settings = new InstanceSettings
{
MaxUsersCapacity = 10
};
#endregion
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.Setup(x => x.GetAllTwitterUsersAsync())
.Setup(x => x.GetAllTwitterUsersAsync(
It.Is<int>(y => y == settings.MaxUsersCapacity)))
.ReturnsAsync(new SyncTwitterUser[0]);
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
processor.WaitFactor = 1;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -154,18 +178,24 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
{
#region Stubs
var buffer = new BufferBlock<SyncTwitterUser[]>();
var settings = new InstanceSettings
{
MaxUsersCapacity = 10
};
#endregion
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.Setup(x => x.GetAllTwitterUsersAsync())
.Setup(x => x.GetAllTwitterUsersAsync(
It.Is<int>(y => y == settings.MaxUsersCapacity)))
.Returns(async () => await DelayFaultedTask<SyncTwitterUser[]>(new Exception()));
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
processor.WaitFactor = 10;
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
@ -185,6 +215,11 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var buffer = new BufferBlock<SyncTwitterUser[]>();
var canTokenS = new CancellationTokenSource();
canTokenS.Cancel();
var settings = new InstanceSettings
{
MaxUsersCapacity = 10
};
#endregion
#region Mocks
@ -192,7 +227,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
processor.WaitFactor = 1;
await processor.GetTwitterUsersAsync(buffer, canTokenS.Token);
}

View File

@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Contracts;
@ -60,7 +61,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
.Setup(x => x.UpdateTwitterUserAsync(
It.Is<int>(y => y == user.Id),
It.Is<long>(y => y == tweet2.Id),
It.Is<long>(y => y == tweet2.Id)
It.Is<long>(y => y == tweet2.Id),
It.IsAny<DateTime>()
))
.Returns(Task.CompletedTask);
#endregion
@ -123,7 +125,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
.Setup(x => x.UpdateTwitterUserAsync(
It.Is<int>(y => y == user.Id),
It.Is<long>(y => y == tweet3.Id),
It.Is<long>(y => y == tweet2.Id)
It.Is<long>(y => y == tweet2.Id),
It.IsAny<DateTime>()
))
.Returns(Task.CompletedTask);
#endregion
@ -194,7 +197,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
.Setup(x => x.UpdateTwitterUserAsync(
It.Is<int>(y => y == user.Id),
It.Is<long>(y => y == tweet3.Id),
It.Is<long>(y => y == tweet2.Id)
It.Is<long>(y => y == tweet2.Id),
It.IsAny<DateTime>()
))
.Returns(Task.CompletedTask);
#endregion