made tweet fetching concurrent instead
This commit is contained in:
parent
d1018881ec
commit
9026273f45
2 changed files with 33 additions and 20 deletions
|
@ -1,4 +1,5 @@
|
|||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
|
@ -33,35 +34,47 @@ namespace BirdsiteLive.Pipeline.Processors
|
|||
|
||||
public async Task<UserWithDataToSync[]> ProcessAsync(UserWithDataToSync[] syncTwitterUsers, CancellationToken ct)
|
||||
{
|
||||
var usersWtTweets = new List<UserWithDataToSync>();
|
||||
var usersWtTweets = new ConcurrentBag<UserWithDataToSync>();
|
||||
|
||||
List<Task> todo = new List<Task>();
|
||||
int index = 0;
|
||||
foreach (var userWtData in syncTwitterUsers)
|
||||
{
|
||||
index++;
|
||||
var user = userWtData.User;
|
||||
var tweets = await RetrieveNewTweets(user);
|
||||
_logger.LogInformation(index + "/" + syncTwitterUsers.Count() + " Got " + tweets.Length + " tweets from user " + user.Acct + " " );
|
||||
if (tweets.Length > 0 && user.LastTweetPostedId != -1)
|
||||
|
||||
var t = Task.Run(async () => {
|
||||
var user = userWtData.User;
|
||||
var tweets = await RetrieveNewTweets(user);
|
||||
_logger.LogInformation(index + "/" + syncTwitterUsers.Count() + " Got " + tweets.Length + " tweets from user " + user.Acct + " " );
|
||||
if (tweets.Length > 0 && user.LastTweetPostedId != -1)
|
||||
{
|
||||
userWtData.Tweets = tweets;
|
||||
usersWtTweets.Add(userWtData);
|
||||
}
|
||||
else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
|
||||
{
|
||||
var tweetId = tweets.Last().Id;
|
||||
var now = DateTime.UtcNow;
|
||||
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now);
|
||||
}
|
||||
else
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now);
|
||||
}
|
||||
});
|
||||
todo.Add(t);
|
||||
if (todo.Count > 3)
|
||||
{
|
||||
userWtData.Tweets = tweets;
|
||||
usersWtTweets.Add(userWtData);
|
||||
}
|
||||
else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
|
||||
{
|
||||
var tweetId = tweets.Last().Id;
|
||||
var now = DateTime.UtcNow;
|
||||
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now);
|
||||
}
|
||||
else
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now);
|
||||
await Task.WhenAll(todo);
|
||||
_logger.LogInformation(index + "/" + syncTwitterUsers.Count() );
|
||||
todo.Clear();
|
||||
//await Task.Delay(250);
|
||||
}
|
||||
|
||||
await Task.Delay(250);
|
||||
}
|
||||
|
||||
await Task.WhenAll(todo);
|
||||
return usersWtTweets.ToArray();
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ namespace BirdsiteLive.Pipeline
|
|||
// Create blocks
|
||||
var twitterUserToRefreshBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions
|
||||
{ BoundedCapacity = 1, CancellationToken = ct });
|
||||
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 2 } );
|
||||
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 } );
|
||||
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
||||
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
|
||||
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
||||
|
|
Reference in a new issue