diff --git a/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj b/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj
index 5d93cb1..884af18 100644
--- a/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj
+++ b/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj
@@ -17,4 +17,8 @@
+
+
+
+
diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs
index eba1868..ebb87fc 100644
--- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs
+++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs
@@ -8,6 +8,7 @@ using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
+using BirdsiteLive.Pipeline.Tools;
using Microsoft.Extensions.Logging;
namespace BirdsiteLive.Pipeline.Processors
@@ -15,37 +16,30 @@ namespace BirdsiteLive.Pipeline.Processors
public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor
{
private readonly ITwitterUserDal _twitterUserDal;
+ private readonly IMaxUsersNumberProvider _maxUsersNumberProvider;
private readonly ILogger _logger;
- private readonly InstanceSettings _instanceSettings;
public int WaitFactor = 1000 * 60; //1 min
#region Ctor
- public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, InstanceSettings instanceSettings, ILogger logger)
+ public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IMaxUsersNumberProvider maxUsersNumberProvider, ILogger logger)
{
_twitterUserDal = twitterUserDal;
- _instanceSettings = instanceSettings;
+ _maxUsersNumberProvider = maxUsersNumberProvider;
_logger = logger;
}
#endregion
public async Task GetTwitterUsersAsync(BufferBlock twitterUsersBufferBlock, CancellationToken ct)
{
- var totalUsers = await _twitterUserDal.GetTwitterUsersCountAsync();
- var warmUpMaxCapacity = _instanceSettings.MaxUsersCapacity / 4;
- var warmUpIterations = warmUpMaxCapacity == 0 ? 0 : (int) (totalUsers / (float) warmUpMaxCapacity);
-
for (; ; )
{
ct.ThrowIfCancellationRequested();
try
{
- var maxUsers = warmUpIterations > 0
- ? _instanceSettings.MaxUsersCapacity / 4
- : _instanceSettings.MaxUsersCapacity;
- warmUpIterations--;
- var users = await _twitterUserDal.GetAllTwitterUsersAsync(maxUsers);
+ var maxUsersNumber = await _maxUsersNumberProvider.GetMaxUsersNumberAsync();
+ var users = await _twitterUserDal.GetAllTwitterUsersAsync(maxUsersNumber);
var userCount = users.Any() ? users.Length : 1;
var splitNumber = (int) Math.Ceiling(userCount / 15d);
diff --git a/src/BirdsiteLive.Pipeline/Tools/MaxUsersNumberProvider.cs b/src/BirdsiteLive.Pipeline/Tools/MaxUsersNumberProvider.cs
new file mode 100644
index 0000000..c84b7b1
--- /dev/null
+++ b/src/BirdsiteLive.Pipeline/Tools/MaxUsersNumberProvider.cs
@@ -0,0 +1,49 @@
+using System.Threading.Tasks;
+using BirdsiteLive.Common.Settings;
+using BirdsiteLive.DAL.Contracts;
+
+namespace BirdsiteLive.Pipeline.Tools
+{
+ public interface IMaxUsersNumberProvider
+ {
+ Task GetMaxUsersNumberAsync();
+ }
+
+ public class MaxUsersNumberProvider : IMaxUsersNumberProvider
+ {
+ private readonly InstanceSettings _instanceSettings;
+ private readonly ITwitterUserDal _twitterUserDal;
+
+ private int _totalUsersCount = -1;
+ private int _warmUpIterations;
+
+ #region Ctor
+ public MaxUsersNumberProvider(InstanceSettings instanceSettings, ITwitterUserDal twitterUserDal)
+ {
+ _instanceSettings = instanceSettings;
+ _twitterUserDal = twitterUserDal;
+ }
+ #endregion
+
+ public async Task GetMaxUsersNumberAsync()
+ {
+ // Init data
+ if (_totalUsersCount == -1)
+ {
+ _totalUsersCount = await _twitterUserDal.GetTwitterUsersCountAsync();
+ var warmUpMaxCapacity = _instanceSettings.MaxUsersCapacity / 4;
+ _warmUpIterations = warmUpMaxCapacity == 0 ? 0 : (int)(_totalUsersCount / (float)warmUpMaxCapacity);
+ }
+
+ // Return if warm up ended
+ if (_warmUpIterations <= 0) return _instanceSettings.MaxUsersCapacity;
+
+ // Calculate warm up value
+ var maxUsers = _warmUpIterations > 0
+ ? _instanceSettings.MaxUsersCapacity / 4
+ : _instanceSettings.MaxUsersCapacity;
+ _warmUpIterations--;
+ return maxUsers;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/BirdsiteLive.Pipeline.Tests.csproj b/src/Tests/BirdsiteLive.Pipeline.Tests/BirdsiteLive.Pipeline.Tests.csproj
index 3dd6984..aa7750b 100644
--- a/src/Tests/BirdsiteLive.Pipeline.Tests/BirdsiteLive.Pipeline.Tests.csproj
+++ b/src/Tests/BirdsiteLive.Pipeline.Tests/BirdsiteLive.Pipeline.Tests.csproj
@@ -18,4 +18,8 @@
+
+
+
+
diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs
index ef33e78..5600e1c 100644
--- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs
+++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs
@@ -7,6 +7,7 @@ using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Processors;
+using BirdsiteLive.Pipeline.Tools;
using Microsoft.Extensions.Logging;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
@@ -27,33 +28,32 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
new SyncTwitterUser(),
new SyncTwitterUser(),
};
- var settings = new InstanceSettings
- {
- MaxUsersCapacity = 40
- };
+ var maxUsers = 1000;
#endregion
#region Mocks
+ var maxUsersNumberProviderMock = new Mock(MockBehavior.Strict);
+ maxUsersNumberProviderMock
+ .Setup(x => x.GetMaxUsersNumberAsync())
+ .ReturnsAsync(maxUsers);
+
var twitterUserDalMock = new Mock(MockBehavior.Strict);
twitterUserDalMock
.Setup(x => x.GetAllTwitterUsersAsync(
- It.Is(y => y == settings.MaxUsersCapacity/4)))
+ It.Is(y => y == maxUsers)))
.ReturnsAsync(users);
-
- twitterUserDalMock
- .Setup(x => x.GetTwitterUsersCountAsync())
- .ReturnsAsync(10);
-
+
var loggerMock = new Mock>();
#endregion
- var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
+ var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, maxUsersNumberProviderMock.Object, loggerMock.Object);
processor.WaitFactor = 10;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
await Task.Delay(50);
#region Validations
+ maxUsersNumberProviderMock.VerifyAll();
twitterUserDalMock.VerifyAll();
Assert.AreEqual(3, buffer.Count);
buffer.TryReceive(out var result);
@@ -71,34 +71,36 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
for (var i = 0; i < 30; i++)
users.Add(new SyncTwitterUser());
- var settings = new InstanceSettings
- {
- MaxUsersCapacity = 400
- };
+ var maxUsers = 1000;
#endregion
#region Mocks
+ var maxUsersNumberProviderMock = new Mock(MockBehavior.Strict);
+ maxUsersNumberProviderMock
+ .Setup(x => x.GetMaxUsersNumberAsync())
+ .ReturnsAsync(maxUsers);
+
var twitterUserDalMock = new Mock(MockBehavior.Strict);
twitterUserDalMock
.SetupSequence(x => x.GetAllTwitterUsersAsync(
- It.Is(y => y == settings.MaxUsersCapacity)))
+ It.Is(y => y == maxUsers)))
.ReturnsAsync(users.ToArray())
+ .ReturnsAsync(new SyncTwitterUser[0])
+ .ReturnsAsync(new SyncTwitterUser[0])
+ .ReturnsAsync(new SyncTwitterUser[0])
.ReturnsAsync(new SyncTwitterUser[0]);
- twitterUserDalMock
- .Setup(x => x.GetTwitterUsersCountAsync())
- .ReturnsAsync(30);
-
var loggerMock = new Mock>();
#endregion
- var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
+ var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, maxUsersNumberProviderMock.Object, loggerMock.Object);
processor.WaitFactor = 2;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
await Task.Delay(300);
#region Validations
+ maxUsersNumberProviderMock.VerifyAll();
twitterUserDalMock.VerifyAll();
Assert.AreEqual(15, buffer.Count);
buffer.TryReceive(out var result);
@@ -116,34 +118,36 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
for (var i = 0; i < 31; i++)
users.Add(new SyncTwitterUser());
- var settings = new InstanceSettings
- {
- MaxUsersCapacity = 400
- };
+ var maxUsers = 1000;
#endregion
#region Mocks
+ var maxUsersNumberProviderMock = new Mock(MockBehavior.Strict);
+ maxUsersNumberProviderMock
+ .Setup(x => x.GetMaxUsersNumberAsync())
+ .ReturnsAsync(maxUsers);
+
var twitterUserDalMock = new Mock(MockBehavior.Strict);
twitterUserDalMock
.SetupSequence(x => x.GetAllTwitterUsersAsync(
- It.Is(y => y == settings.MaxUsersCapacity/4)))
+ It.Is(y => y == maxUsers)))
.ReturnsAsync(users.ToArray())
+ .ReturnsAsync(new SyncTwitterUser[0])
+ .ReturnsAsync(new SyncTwitterUser[0])
+ .ReturnsAsync(new SyncTwitterUser[0])
.ReturnsAsync(new SyncTwitterUser[0]);
-
- twitterUserDalMock
- .Setup(x => x.GetTwitterUsersCountAsync())
- .ReturnsAsync(31);
-
+
var loggerMock = new Mock>();
#endregion
- var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
+ var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, maxUsersNumberProviderMock.Object, loggerMock.Object);
processor.WaitFactor = 2;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
await Task.Delay(200);
#region Validations
+ maxUsersNumberProviderMock.VerifyAll();
twitterUserDalMock.VerifyAll();
Assert.AreEqual(11, buffer.Count);
buffer.TryReceive(out var result);
@@ -157,33 +161,32 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#region Stubs
var buffer = new BufferBlock();
- var settings = new InstanceSettings
- {
- MaxUsersCapacity = 10
- };
+ var maxUsers = 1000;
#endregion
#region Mocks
+ var maxUsersNumberProviderMock = new Mock(MockBehavior.Strict);
+ maxUsersNumberProviderMock
+ .Setup(x => x.GetMaxUsersNumberAsync())
+ .ReturnsAsync(maxUsers);
+
var twitterUserDalMock = new Mock(MockBehavior.Strict);
twitterUserDalMock
.Setup(x => x.GetAllTwitterUsersAsync(
- It.Is(y => y == settings.MaxUsersCapacity)))
+ It.Is(y => y == maxUsers)))
.ReturnsAsync(new SyncTwitterUser[0]);
- twitterUserDalMock
- .Setup(x => x.GetTwitterUsersCountAsync())
- .ReturnsAsync(1000);
-
var loggerMock = new Mock>();
#endregion
- var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
+ var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, maxUsersNumberProviderMock.Object, loggerMock.Object);
processor.WaitFactor = 1;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
await Task.Delay(50);
#region Validations
+ maxUsersNumberProviderMock.VerifyAll();
twitterUserDalMock.VerifyAll();
Assert.AreEqual(0, buffer.Count);
#endregion
@@ -195,33 +198,32 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#region Stubs
var buffer = new BufferBlock();
- var settings = new InstanceSettings
- {
- MaxUsersCapacity = 10
- };
+ var maxUsers = 1000;
#endregion
#region Mocks
+ var maxUsersNumberProviderMock = new Mock(MockBehavior.Strict);
+ maxUsersNumberProviderMock
+ .Setup(x => x.GetMaxUsersNumberAsync())
+ .ReturnsAsync(maxUsers);
+
var twitterUserDalMock = new Mock(MockBehavior.Strict);
twitterUserDalMock
.Setup(x => x.GetAllTwitterUsersAsync(
- It.Is(y => y == settings.MaxUsersCapacity)))
+ It.Is(y => y == maxUsers)))
.Returns(async () => await DelayFaultedTask(new Exception()));
- twitterUserDalMock
- .Setup(x => x.GetTwitterUsersCountAsync())
- .ReturnsAsync(1000);
-
var loggerMock = new Mock>();
#endregion
- var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
+ var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, maxUsersNumberProviderMock.Object, loggerMock.Object);
processor.WaitFactor = 10;
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
await Task.WhenAny(t, Task.Delay(50));
#region Validations
+ maxUsersNumberProviderMock.VerifyAll();
twitterUserDalMock.VerifyAll();
Assert.AreEqual(0, buffer.Count);
#endregion
@@ -236,23 +238,21 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var canTokenS = new CancellationTokenSource();
canTokenS.Cancel();
- var settings = new InstanceSettings
- {
- MaxUsersCapacity = 10
- };
+ var maxUsers = 1000;
#endregion
#region Mocks
+ var maxUsersNumberProviderMock = new Mock(MockBehavior.Strict);
+ maxUsersNumberProviderMock
+ .Setup(x => x.GetMaxUsersNumberAsync())
+ .ReturnsAsync(maxUsers);
+
var twitterUserDalMock = new Mock(MockBehavior.Strict);
-
- twitterUserDalMock
- .Setup(x => x.GetTwitterUsersCountAsync())
- .ReturnsAsync(1000);
-
+
var loggerMock = new Mock>();
#endregion
- var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object);
+ var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, maxUsersNumberProviderMock.Object, loggerMock.Object);
processor.WaitFactor = 1;
await processor.GetTwitterUsersAsync(buffer, canTokenS.Token);
}
diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Tools/MaxUsersNumberProviderTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Tools/MaxUsersNumberProviderTests.cs
new file mode 100644
index 0000000..d48beb8
--- /dev/null
+++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Tools/MaxUsersNumberProviderTests.cs
@@ -0,0 +1,79 @@
+using System.Threading.Tasks;
+using BirdsiteLive.Common.Settings;
+using BirdsiteLive.DAL.Contracts;
+using BirdsiteLive.Pipeline.Tools;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+
+namespace BirdsiteLive.Pipeline.Tests.Tools
+{
+ [TestClass]
+ public class MaxUsersNumberProviderTests
+ {
+ [TestMethod]
+ public async Task GetMaxUsersNumberAsync_WarmUp_Test()
+ {
+ #region Stubs
+ var settings = new InstanceSettings
+ {
+ MaxUsersCapacity = 1000
+ };
+ #endregion
+
+ #region Mocks
+ var twitterUserDalMock = new Mock(MockBehavior.Strict);
+ twitterUserDalMock
+ .Setup(x => x.GetTwitterUsersCountAsync())
+ .ReturnsAsync(1000);
+ #endregion
+
+ var provider = new MaxUsersNumberProvider(settings, twitterUserDalMock.Object);
+
+ var result = await provider.GetMaxUsersNumberAsync();
+ Assert.AreEqual(250, result);
+
+ result = await provider.GetMaxUsersNumberAsync();
+ Assert.AreEqual(250, result);
+
+ result = await provider.GetMaxUsersNumberAsync();
+ Assert.AreEqual(250, result);
+
+ result = await provider.GetMaxUsersNumberAsync();
+ Assert.AreEqual(250, result);
+
+ result = await provider.GetMaxUsersNumberAsync();
+ Assert.AreEqual(1000, result);
+
+ #region Validations
+ twitterUserDalMock.VerifyAll();
+ #endregion
+ }
+
+ [TestMethod]
+ public async Task GetMaxUsersNumberAsync_NoWarmUp_Test()
+ {
+ #region Stubs
+ var settings = new InstanceSettings
+ {
+ MaxUsersCapacity = 1000
+ };
+ #endregion
+
+ #region Mocks
+ var twitterUserDalMock = new Mock(MockBehavior.Strict);
+ twitterUserDalMock
+ .Setup(x => x.GetTwitterUsersCountAsync())
+ .ReturnsAsync(249);
+ #endregion
+
+ var provider = new MaxUsersNumberProvider(settings, twitterUserDalMock.Object);
+
+ var result = await provider.GetMaxUsersNumberAsync();
+ Assert.AreEqual(1000, result);
+
+ #region Validations
+ twitterUserDalMock.VerifyAll();
+ #endregion
+ }
+ }
+}
\ No newline at end of file