dispatch users batch progressively, fix #55

This commit is contained in:
Nicolas Constant 2021-01-19 02:37:24 -05:00
parent 6ae5f06280
commit 7e3d7b7c4f
No known key found for this signature in database
GPG key ID: 1E9F677FB01A5688
3 changed files with 119 additions and 11 deletions

View file

@ -0,0 +1,16 @@
using System.Collections.Generic;
using System.Linq;
namespace BirdsiteLive.Common.Extensions
{
public static class EnumerableExt
{
public static IEnumerable<IEnumerable<T>> Split<T>(this T[] array, int size)
{
for (var i = 0; i < (float)array.Length / size; i++)
{
yield return array.Skip(i * size).Take(size);
}
}
}
}

View file

@ -1,7 +1,9 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.Common.Extensions;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
@ -13,7 +15,7 @@ namespace BirdsiteLive.Pipeline.Processors
{
private readonly ITwitterUserDal _twitterUserDal;
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;
private const int SyncPeriod = 15; //in minutes
public int WaitFactor = 1000 * 60; //1 min
#region Ctor
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger<RetrieveTwitterUsersProcessor> logger)
@ -25,7 +27,7 @@ namespace BirdsiteLive.Pipeline.Processors
public async Task GetTwitterUsersAsync(BufferBlock<SyncTwitterUser[]> twitterUsersBufferBlock, CancellationToken ct)
{
for (;;)
for (; ; )
{
ct.ThrowIfCancellationRequested();
@ -33,15 +35,26 @@ namespace BirdsiteLive.Pipeline.Processors
{
var users = await _twitterUserDal.GetAllTwitterUsersAsync();
if(users.Length > 0)
await twitterUsersBufferBlock.SendAsync(users, ct);
var userCount = users.Any() ? users.Length : 1;
var splitNumber = (int) Math.Ceiling(userCount / 15d);
var splitUsers = users.Split(splitNumber).ToList();
foreach (var u in splitUsers)
{
ct.ThrowIfCancellationRequested();
await twitterUsersBufferBlock.SendAsync(u.ToArray(), ct);
await Task.Delay(WaitFactor, ct);
}
var splitCount = splitUsers.Count();
if (splitCount < 15) await Task.Delay((15 - splitCount) * WaitFactor, ct);
}
catch (Exception e)
{
_logger.LogError(e, "Failing retrieving Twitter Users.");
}
await Task.Delay(SyncPeriod * 1000 * 60, ct);
}
}
}

View file

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
@ -37,13 +38,84 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
processor.WaitFactor = 10;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
await Task.Delay(50);
#region Validations
twitterUserDalMock.VerifyAll();
Assert.AreEqual(1, buffer.Count);
Assert.AreEqual(3, buffer.Count);
buffer.TryReceive(out var result);
Assert.AreEqual(1, result.Length);
#endregion
}
[TestMethod]
public async Task GetTwitterUsersAsync_Multi_Test()
{
#region Stubs
var buffer = new BufferBlock<SyncTwitterUser[]>();
var users = new List<SyncTwitterUser>();
for (var i = 0; i < 30; i++)
users.Add(new SyncTwitterUser());
#endregion
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.SetupSequence(x => x.GetAllTwitterUsersAsync())
.ReturnsAsync(users.ToArray())
.ReturnsAsync(new SyncTwitterUser[0]);
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
processor.WaitFactor = 2;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
await Task.Delay(200);
#region Validations
twitterUserDalMock.VerifyAll();
Assert.AreEqual(15, buffer.Count);
buffer.TryReceive(out var result);
Assert.AreEqual(2, result.Length);
#endregion
}
[TestMethod]
public async Task GetTwitterUsersAsync_Multi2_Test()
{
#region Stubs
var buffer = new BufferBlock<SyncTwitterUser[]>();
var users = new List<SyncTwitterUser>();
for (var i = 0; i < 31; i++)
users.Add(new SyncTwitterUser());
#endregion
#region Mocks
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.SetupSequence(x => x.GetAllTwitterUsersAsync())
.ReturnsAsync(users.ToArray())
.ReturnsAsync(new SyncTwitterUser[0]);
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
processor.WaitFactor = 2;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
await Task.Delay(200);
#region Validations
twitterUserDalMock.VerifyAll();
Assert.AreEqual(11, buffer.Count);
buffer.TryReceive(out var result);
Assert.AreEqual(3, result.Length);
#endregion
@ -66,6 +138,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
processor.WaitFactor = 1;
processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
await Task.Delay(50);
@ -75,8 +148,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
Assert.AreEqual(0, buffer.Count);
#endregion
}
[TestMethod]
public async Task GetTwitterUsersAsync_Exception_Test()
{
@ -88,12 +160,13 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
twitterUserDalMock
.Setup(x => x.GetAllTwitterUsersAsync())
.Throws(new Exception());
.Returns(async () => await DelayFaultedTask<SyncTwitterUser[]>(new Exception()));
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
processor.WaitFactor = 10;
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
await Task.WhenAny(t, Task.Delay(50));
@ -104,7 +177,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion
}
[TestMethod]
[ExpectedException(typeof(OperationCanceledException))]
public async Task GetTwitterUsersAsync_Cancellation_Test()
@ -121,7 +193,14 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
processor.WaitFactor = 1;
await processor.GetTwitterUsersAsync(buffer, canTokenS.Token);
}
private static async Task<T> DelayFaultedTask<T>(Exception e)
{
await Task.Delay(30);
throw e;
}
}
}