first implementation of tweet clean up's pipeline

This commit is contained in:
Nicolas Constant 2023-01-06 01:50:45 -05:00
parent b223bb0216
commit e579e1b11c
No known key found for this signature in database
GPG Key ID: 1E9F677FB01A5688
10 changed files with 250 additions and 10 deletions

View File

@ -9,6 +9,7 @@ using BirdsiteLive.ActivityPub;
using BirdsiteLive.ActivityPub.Converters;
using BirdsiteLive.ActivityPub.Models;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Models;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
@ -22,6 +23,7 @@ namespace BirdsiteLive.Domain
Task PostNewNoteActivity(Note note, string username, string noteId, string targetHost,
string targetInbox);
Task DeleteUserAsync(string username, string targetHost, string targetInbox);
Task DeleteNoteAsync(SyncTweet tweet);
}
public class WebFinger
@ -108,6 +110,30 @@ namespace BirdsiteLive.Domain
}
}
public async Task DeleteNoteAsync(SyncTweet tweet)
{
var acct = tweet.Acct.ToLowerInvariant().Trim();
var actor = $"https://{_instanceSettings.Domain}/users/{acct}";
var noteId = $"https://{_instanceSettings.Domain}/users/{acct}/statuses/{tweet.TweetId}";
var delete = new ActivityDelete
{
context = "https://www.w3.org/ns/activitystreams",
id = $"{noteId}#delete",
type = "Delete",
actor = actor,
to = new[] { "https://www.w3.org/ns/activitystreams#Public" },
apObject = new Tombstone
{
id = noteId,
atomUrl = noteId
}
};
await PostDataAsync(delete, tweet.Host, actor, tweet.Inbox);
}
public async Task PostNewNoteActivity(Note note, string username, string noteId, string targetHost, string targetInbox)
{
try

View File

@ -0,0 +1,11 @@
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.Pipeline.Models;
namespace BirdsiteLive.Pipeline.Contracts.TweetsCleanUp
{
public interface IDeleteTweetsProcessor
{
Task<TweetToDelete> ProcessAsync(TweetToDelete tweet, CancellationToken ct);
}
}

View File

@ -0,0 +1,12 @@
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.Pipeline.Models;
namespace BirdsiteLive.Pipeline.Contracts.TweetsCleanUp
{
public interface IRetrieveTweetsToDeleteProcessor
{
Task GetTweetsAsync(BufferBlock<TweetToDelete> tweetsBufferBlock, CancellationToken ct);
}
}

View File

@ -0,0 +1,11 @@
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.Pipeline.Models;
namespace BirdsiteLive.Pipeline.Contracts.TweetsCleanUp
{
public interface ISaveDeletedTweetStatusProcessor
{
Task ProcessAsync(TweetToDelete tweetToDelete, CancellationToken ct);
}
}

View File

@ -0,0 +1,10 @@
using BirdsiteLive.DAL.Models;
namespace BirdsiteLive.Pipeline.Models
{
public class TweetToDelete
{
public SyncTweet Tweet { get; set; }
public bool DeleteSuccessful { get; set; }
}
}

View File

@ -1,7 +1,44 @@
namespace BirdsiteLive.Pipeline.Processors.TweetsCleanUp
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.Domain;
using BirdsiteLive.Pipeline.Contracts.TweetsCleanUp;
using BirdsiteLive.Pipeline.Models;
using Microsoft.Extensions.Logging;
namespace BirdsiteLive.Pipeline.Processors.TweetsCleanUp
{
public class DeleteTweetsProcessor
public class DeleteTweetsProcessor : IDeleteTweetsProcessor
{
private readonly IActivityPubService _activityPubService;
private readonly ILogger<DeleteTweetsProcessor> _logger;
#region Ctor
public DeleteTweetsProcessor(IActivityPubService activityPubService, ILogger<DeleteTweetsProcessor> logger)
{
_activityPubService = activityPubService;
_logger = logger;
}
#endregion
public async Task<TweetToDelete> ProcessAsync(TweetToDelete tweet, CancellationToken ct)
{
try
{
await _activityPubService.DeleteNoteAsync(tweet.Tweet);
tweet.DeleteSuccessful = true;
}
catch (HttpRequestException e)
{
//TODO check code under .NET 5
}
catch (Exception e)
{
_logger.LogError(e.Message, e);
}
return tweet;
}
}
}

View File

@ -1,7 +1,58 @@
namespace BirdsiteLive.Pipeline.Processors.TweetsCleanUp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.Pipeline.Contracts.TweetsCleanUp;
using BirdsiteLive.Pipeline.Models;
namespace BirdsiteLive.Pipeline.Processors.TweetsCleanUp
{
public class RetrieveTweetsToDeleteProcessor
public class RetrieveTweetsToDeleteProcessor : IRetrieveTweetsToDeleteProcessor
{
private readonly ISyncTweetsPostgresDal _syncTweetsPostgresDal;
#region Ctor
public RetrieveTweetsToDeleteProcessor(ISyncTweetsPostgresDal syncTweetsPostgresDal)
{
_syncTweetsPostgresDal = syncTweetsPostgresDal;
}
#endregion
public async Task GetTweetsAsync(BufferBlock<TweetToDelete> tweetsBufferBlock, CancellationToken ct)
{
var batchSize = 100;
for (;;)
{
ct.ThrowIfCancellationRequested();
var now = DateTime.UtcNow;
var from = now.AddDays(-20);
var dbBrowsingEnded = false;
var lastId = -1L;
do
{
var tweets = await _syncTweetsPostgresDal.GetTweetsOlderThanAsync(from, lastId, batchSize);
foreach (var syncTweet in tweets)
{
var tweet = new TweetToDelete
{
Tweet = syncTweet
};
await tweetsBufferBlock.SendAsync(tweet, ct);
}
if (tweets.Any()) lastId = tweets.Last().Id;
if (tweets.Count < batchSize) dbBrowsingEnded = true;
} while (!dbBrowsingEnded);
await Task.Delay(TimeSpan.FromHours(3), ct);
}
}
}
}

View File

@ -1,7 +1,30 @@
namespace BirdsiteLive.Pipeline.Processors.TweetsCleanUp
using System;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.Pipeline.Contracts.TweetsCleanUp;
using BirdsiteLive.Pipeline.Models;
namespace BirdsiteLive.Pipeline.Processors.TweetsCleanUp
{
public class SaveDeletedTweetStatusProcessor
public class SaveDeletedTweetStatusProcessor : ISaveDeletedTweetStatusProcessor
{
private readonly ISyncTweetsPostgresDal _syncTweetsPostgresDal;
#region Ctor
public SaveDeletedTweetStatusProcessor(ISyncTweetsPostgresDal syncTweetsPostgresDal)
{
_syncTweetsPostgresDal = syncTweetsPostgresDal;
}
#endregion
public async Task ProcessAsync(TweetToDelete tweetToDelete, CancellationToken ct)
{
var highLimitDate = DateTime.UtcNow.AddDays(-40); //TODO get settings value
if (tweetToDelete.DeleteSuccessful || tweetToDelete.Tweet.PublishedAt < highLimitDate)
{
await _syncTweetsPostgresDal.DeleteTweetAsync(tweetToDelete.Tweet.Id);
}
}
}
}

View File

@ -1,5 +1,9 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.Pipeline.Contracts.TweetsCleanUp;
using BirdsiteLive.Pipeline.Models;
using Microsoft.Extensions.Logging;
namespace BirdsiteLive.Pipeline
{
@ -10,9 +14,63 @@ namespace BirdsiteLive.Pipeline
public class TweetCleanUpPipeline : ITweetCleanUpPipeline
{
private readonly IRetrieveTweetsToDeleteProcessor _retrieveTweetsToDeleteProcessor;
private readonly IDeleteTweetsProcessor _deleteTweetsProcessor;
private readonly ISaveDeletedTweetStatusProcessor _saveDeletedTweetStatusProcessor;
private readonly ILogger<TweetCleanUpPipeline> _logger;
#region Ctor
public TweetCleanUpPipeline(IRetrieveTweetsToDeleteProcessor retrieveTweetsToDeleteProcessor, IDeleteTweetsProcessor deleteTweetsProcessor, ISaveDeletedTweetStatusProcessor saveDeletedTweetStatusProcessor, ILogger<TweetCleanUpPipeline> logger)
{
_retrieveTweetsToDeleteProcessor = retrieveTweetsToDeleteProcessor;
_deleteTweetsProcessor = deleteTweetsProcessor;
_saveDeletedTweetStatusProcessor = saveDeletedTweetStatusProcessor;
_logger = logger;
}
#endregion
public async Task ExecuteAsync(CancellationToken ct)
{
throw new System.NotImplementedException();
// Create blocks
var tweetsToDeleteBufferBlock = new BufferBlock<TweetToDelete>(new DataflowBlockOptions
{
BoundedCapacity = 200,
CancellationToken = ct
});
var deleteTweetsBlock = new TransformBlock<TweetToDelete, TweetToDelete>(
async x => await _deleteTweetsProcessor.ProcessAsync(x, ct),
new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 5,
CancellationToken = ct
});
var deletedTweetsBufferBlock = new BufferBlock<TweetToDelete>(new DataflowBlockOptions
{
BoundedCapacity = 200,
CancellationToken = ct
});
var saveProgressionBlock = new ActionBlock<TweetToDelete>(
async x => await _saveDeletedTweetStatusProcessor.ProcessAsync(x, ct),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5,
CancellationToken = ct
});
// Link pipeline
var dataflowLinkOptions = new DataflowLinkOptions { PropagateCompletion = true };
tweetsToDeleteBufferBlock.LinkTo(deleteTweetsBlock, dataflowLinkOptions);
deleteTweetsBlock.LinkTo(deletedTweetsBufferBlock, dataflowLinkOptions);
deletedTweetsBufferBlock.LinkTo(saveProgressionBlock, dataflowLinkOptions);
// Launch tweet retriever
var retrieveTweetsToDeleteTask = _retrieveTweetsToDeleteProcessor.GetTweetsAsync(tweetsToDeleteBufferBlock, ct);
// Wait
await Task.WhenAny(new[] { retrieveTweetsToDeleteTask, saveProgressionBlock.Completion });
var ex = retrieveTweetsToDeleteTask.IsFaulted ? retrieveTweetsToDeleteTask.Exception : saveProgressionBlock.Completion.Exception;
_logger.LogCritical(ex, "An error occurred, pipeline stopped");
}
}
}

View File

@ -7,6 +7,7 @@ namespace BirdsiteLive.DAL.Models
public long Id { get; set; }
public string Acct { get; set; }
public string Host { get; set; } //TODO
public string Inbox { get; set; }
public long TweetId { get; set; }
public DateTime PublishedAt { get; set; }