created syncTweets DAL
This commit is contained in:
parent
432484eaaa
commit
584266a040
|
@ -23,7 +23,7 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
|
|||
public class DbInitializerPostgresDal : PostgresBase, IDbInitializerDal
|
||||
{
|
||||
private readonly PostgresTools _tools;
|
||||
private readonly Version _currentVersion = new Version(2, 5);
|
||||
private readonly Version _currentVersion = new Version(2, 6);
|
||||
private const string DbVersionType = "db-version";
|
||||
|
||||
#region Ctor
|
||||
|
@ -136,7 +136,8 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
|
|||
new Tuple<Version, Version>(new Version(2,1), new Version(2,2)),
|
||||
new Tuple<Version, Version>(new Version(2,2), new Version(2,3)),
|
||||
new Tuple<Version, Version>(new Version(2,3), new Version(2,4)),
|
||||
new Tuple<Version, Version>(new Version(2,4), new Version(2,5))
|
||||
new Tuple<Version, Version>(new Version(2,4), new Version(2,5)),
|
||||
new Tuple<Version, Version>(new Version(2,5), new Version(2,6))
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -184,6 +185,22 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
|
|||
var addDeletedToAcct = $@"ALTER TABLE {_settings.TwitterUserTableName} ADD deleted BOOLEAN";
|
||||
await _tools.ExecuteRequestAsync(addDeletedToAcct);
|
||||
}
|
||||
else if (from == new Version(2, 5) && to == new Version(2, 6))
|
||||
{
|
||||
// Create Synchronized Tweets table
|
||||
var createFollowers = $@"CREATE TABLE {_settings.SynchronizedTweetsTableName}
|
||||
(
|
||||
id SERIAL PRIMARY KEY,
|
||||
|
||||
acct VARCHAR(50) NOT NULL,
|
||||
tweetId BIGINT NOT NULL,
|
||||
inbox VARCHAR(2048) NOT NULL,
|
||||
publishedAt TIMESTAMP (2) WITHOUT TIME ZONE NOT NULL,
|
||||
|
||||
UNIQUE (acct, tweetId, inbox)
|
||||
);";
|
||||
await _tools.ExecuteRequestAsync(createFollowers);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
|
@ -212,7 +229,8 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
|
|||
$@"DROP TABLE {_settings.DbVersionTableName};",
|
||||
$@"DROP TABLE {_settings.TwitterUserTableName};",
|
||||
$@"DROP TABLE {_settings.FollowersTableName};",
|
||||
$@"DROP TABLE {_settings.CachedTweetsTableName};"
|
||||
$@"DROP TABLE {_settings.CachedTweetsTableName};",
|
||||
$@"DROP TABLE {_settings.SynchronizedTweetsTableName};"
|
||||
};
|
||||
|
||||
foreach (var r in dropsRequests)
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using BirdsiteLive.DAL.Contracts;
|
||||
using BirdsiteLive.DAL.Models;
|
||||
using BirdsiteLive.DAL.Postgres.DataAccessLayers.Base;
|
||||
using BirdsiteLive.DAL.Postgres.Settings;
|
||||
using Dapper;
|
||||
|
||||
namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
|
||||
{
|
||||
public class SyncTweetsPostgresDal : PostgresBase, ISyncTweetsPostgresDal
|
||||
{
|
||||
#region Ctor
|
||||
public SyncTweetsPostgresDal(PostgresSettings settings) : base(settings)
|
||||
{
|
||||
}
|
||||
#endregion
|
||||
|
||||
public async Task<long> SaveTweetAsync(SyncTweet tweet)
|
||||
{
|
||||
if (tweet.PublishedAt == default) throw new ArgumentException("publishedAt");
|
||||
if (tweet.TweetId == default) throw new ArgumentException("tweetId");
|
||||
|
||||
var acct = tweet.Acct.ToLowerInvariant().Trim();
|
||||
var inbox = tweet.Inbox.ToLowerInvariant().Trim();
|
||||
|
||||
using (var dbConnection = Connection)
|
||||
{
|
||||
dbConnection.Open();
|
||||
|
||||
return (await dbConnection.QueryAsync<long>(
|
||||
$"INSERT INTO {_settings.SynchronizedTweetsTableName} (acct,tweetId,inbox,publishedAt) VALUES(@acct,@tweetId,@inbox,@publishedAt) RETURNING id",
|
||||
new
|
||||
{
|
||||
acct,
|
||||
tweetId = tweet.TweetId,
|
||||
inbox,
|
||||
publishedAt = tweet.PublishedAt.ToUniversalTime()
|
||||
})).First();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<SyncTweet> GetTweetAsync(long id)
|
||||
{
|
||||
if (id == default) throw new ArgumentException("id");
|
||||
|
||||
var query = $"SELECT * FROM {_settings.SynchronizedTweetsTableName} WHERE id = @id";
|
||||
|
||||
using (var dbConnection = Connection)
|
||||
{
|
||||
dbConnection.Open();
|
||||
|
||||
var result = (await dbConnection.QueryAsync<SyncTweet>(query, new { id })).FirstOrDefault();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task DeleteTweetAsync(long id)
|
||||
{
|
||||
if (id == default) throw new ArgumentException("id");
|
||||
|
||||
var query = $"DELETE FROM {_settings.SynchronizedTweetsTableName} WHERE id = @id";
|
||||
|
||||
using (var dbConnection = Connection)
|
||||
{
|
||||
dbConnection.Open();
|
||||
|
||||
await dbConnection.QueryAsync(query, new { id });
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<List<SyncTweet>> GetTweetsOlderThanAsync(DateTime date, long from = -1, int size = 100)
|
||||
{
|
||||
if (date == default) throw new ArgumentException("date");
|
||||
|
||||
var query = $"SELECT * FROM {_settings.SynchronizedTweetsTableName} WHERE id > @from AND publishedAt < @date ORDER BY id ASC LIMIT @size";
|
||||
|
||||
using (var dbConnection = Connection)
|
||||
{
|
||||
dbConnection.Open();
|
||||
|
||||
var result = await dbConnection.QueryAsync<SyncTweet>(query, new
|
||||
{
|
||||
from,
|
||||
date,
|
||||
size
|
||||
});
|
||||
return result.ToList();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -8,5 +8,6 @@
|
|||
public string TwitterUserTableName { get; set; } = "twitter_users";
|
||||
public string FollowersTableName { get; set; } = "followers";
|
||||
public string CachedTweetsTableName { get; set; } = "cached_tweets";
|
||||
public string SynchronizedTweetsTableName { get; set; } = "sync_tweets";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using BirdsiteLive.DAL.Models;
|
||||
|
||||
namespace BirdsiteLive.DAL.Contracts
|
||||
{
|
||||
public interface ISyncTweetsPostgresDal
|
||||
{
|
||||
Task<long> SaveTweetAsync(SyncTweet tweet);
|
||||
Task<SyncTweet> GetTweetAsync(long id);
|
||||
Task DeleteTweetAsync(long id);
|
||||
Task<List<SyncTweet>> GetTweetsOlderThanAsync(DateTime date, long from = -1, int size = 100);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
using System;
|
||||
|
||||
namespace BirdsiteLive.DAL.Models
|
||||
{
|
||||
public class SyncTweet
|
||||
{
|
||||
public long Id { get; set; }
|
||||
|
||||
public string Acct { get; set; }
|
||||
public string Inbox { get; set; }
|
||||
public long TweetId { get; set; }
|
||||
public DateTime PublishedAt { get; set; }
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers.Base
|
|||
CachedTweetsTableName = "CachedTweetsTableName" + RandomGenerator.GetString(4),
|
||||
FollowersTableName = "FollowersTableName" + RandomGenerator.GetString(4),
|
||||
TwitterUserTableName = "TwitterUserTableName" + RandomGenerator.GetString(4),
|
||||
SynchronizedTweetsTableName = "SynchronizedTweetsTableName" + RandomGenerator.GetString(4),
|
||||
};
|
||||
_tools = new PostgresTools(_settings);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
using System;
|
||||
using System.Linq;
|
||||
using BirdsiteLive.DAL.Postgres.DataAccessLayers;
|
||||
using BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers.Base;
|
||||
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||
using System.Threading.Tasks;
|
||||
using BirdsiteLive.DAL.Models;
|
||||
|
||||
namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers
|
||||
{
|
||||
[TestClass]
|
||||
public class SyncTweetsPostgresDalTests : PostgresTestingBase
|
||||
{
|
||||
[TestInitialize]
|
||||
public async Task TestInit()
|
||||
{
|
||||
var dal = new DbInitializerPostgresDal(_settings, _tools);
|
||||
var init = new DatabaseInitializer(dal);
|
||||
await init.InitAndMigrateDbAsync();
|
||||
}
|
||||
|
||||
[TestCleanup]
|
||||
public async Task CleanUp()
|
||||
{
|
||||
var dal = new DbInitializerPostgresDal(_settings, _tools);
|
||||
await dal.DeleteAllAsync();
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public async Task CreateAndGetTweets()
|
||||
{
|
||||
var tweet = new SyncTweet
|
||||
{
|
||||
Acct = "test",
|
||||
PublishedAt = DateTime.UtcNow,
|
||||
Inbox = "https://instance.ext/inbox",
|
||||
TweetId = 4567889
|
||||
};
|
||||
|
||||
var dal = new SyncTweetsPostgresDal(_settings);
|
||||
|
||||
var id = await dal.SaveTweetAsync(tweet);
|
||||
var result = await dal.GetTweetAsync(id);
|
||||
|
||||
Assert.IsNotNull(result);
|
||||
|
||||
Assert.IsTrue(result.Id > 0);
|
||||
Assert.AreEqual(tweet.Acct, result.Acct);
|
||||
Assert.AreEqual(tweet.Inbox, result.Inbox);
|
||||
Assert.AreEqual(tweet.TweetId, result.TweetId);
|
||||
Assert.IsTrue(Math.Abs((tweet.PublishedAt - result.PublishedAt).Seconds) < 5);
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public async Task CreateDeleteAndGetTweets()
|
||||
{
|
||||
var tweet = new SyncTweet
|
||||
{
|
||||
Acct = "test",
|
||||
PublishedAt = DateTime.UtcNow,
|
||||
Inbox = "https://instance.ext/inbox",
|
||||
TweetId = 4567889
|
||||
};
|
||||
|
||||
var dal = new SyncTweetsPostgresDal(_settings);
|
||||
|
||||
var id = await dal.SaveTweetAsync(tweet);
|
||||
await dal.DeleteTweetAsync(id);
|
||||
var result = await dal.GetTweetAsync(id);
|
||||
|
||||
Assert.IsNull(result);
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public async Task CreateAndGetTweetsByDate()
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
var dal = new SyncTweetsPostgresDal(_settings);
|
||||
|
||||
for (var i = 0; i < 100; i++)
|
||||
{
|
||||
var tweet = new SyncTweet
|
||||
{
|
||||
Acct = "test",
|
||||
PublishedAt = now.AddDays(-10 - i),
|
||||
Inbox = "https://instance.ext/inbox",
|
||||
TweetId = 4567889 + i
|
||||
};
|
||||
await dal.SaveTweetAsync(tweet);
|
||||
}
|
||||
|
||||
var date = now.AddDays(-20);
|
||||
var result = await dal.GetTweetsOlderThanAsync(date, -1, 10);
|
||||
|
||||
Assert.IsNotNull(result);
|
||||
Assert.IsTrue(result.Count == 10);
|
||||
|
||||
foreach (var res in result)
|
||||
{
|
||||
Assert.IsTrue(res.PublishedAt < date);
|
||||
Assert.IsTrue(res.Id > 10);
|
||||
Assert.IsTrue(res.Id < 25);
|
||||
}
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public async Task CreateAndGetTweetsByDate_Offset()
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
var dal = new SyncTweetsPostgresDal(_settings);
|
||||
|
||||
for (var i = 0; i < 100; i++)
|
||||
{
|
||||
var tweet = new SyncTweet
|
||||
{
|
||||
Acct = "test",
|
||||
PublishedAt = now.AddDays(-10 - i),
|
||||
Inbox = "https://instance.ext/inbox",
|
||||
TweetId = 4567889 + i
|
||||
};
|
||||
await dal.SaveTweetAsync(tweet);
|
||||
}
|
||||
|
||||
var date = now.AddDays(-20);
|
||||
var result = await dal.GetTweetsOlderThanAsync(date, 1000, 10);
|
||||
|
||||
Assert.IsNotNull(result);
|
||||
Assert.IsTrue(result.Count == 0);
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public async Task CreateAndGetTweetsByDate_Iteration()
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
var dal = new SyncTweetsPostgresDal(_settings);
|
||||
|
||||
for (var i = 0; i < 100; i++)
|
||||
{
|
||||
var tweet = new SyncTweet
|
||||
{
|
||||
Acct = "test",
|
||||
PublishedAt = now.AddDays(-10 - i),
|
||||
Inbox = "https://instance.ext/inbox",
|
||||
TweetId = 4567889 + i
|
||||
};
|
||||
await dal.SaveTweetAsync(tweet);
|
||||
}
|
||||
|
||||
var date = now.AddDays(-20);
|
||||
var result = await dal.GetTweetsOlderThanAsync(date, -1, 10);
|
||||
var result2 = await dal.GetTweetsOlderThanAsync(date, result.Last().Id, 10);
|
||||
|
||||
var global = result.ToList();
|
||||
global.AddRange(result2);
|
||||
var d = global.GroupBy(x => x.Id).Count();
|
||||
Assert.AreEqual(20, d);
|
||||
|
||||
foreach (var res in global)
|
||||
{
|
||||
Assert.IsTrue(res.PublishedAt < date);
|
||||
Assert.IsTrue(res.Id > 10);
|
||||
Assert.IsTrue(res.Id < 35);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue