refactor: store migrator

This commit is contained in:
Steven
2024-08-16 08:07:30 +08:00
parent 1ae3afc0ba
commit 6e901fc940
82 changed files with 1494 additions and 402 deletions

View File

@ -1,132 +0,0 @@
-- migration_history
CREATE TABLE migration_history (
version TEXT NOT NULL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW())
);
-- system_setting
CREATE TABLE system_setting (
name TEXT NOT NULL PRIMARY KEY,
value TEXT NOT NULL,
description TEXT NOT NULL
);
-- user
CREATE TABLE "user" (
id SERIAL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
updated_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
row_status TEXT NOT NULL DEFAULT 'NORMAL',
username TEXT NOT NULL UNIQUE,
role TEXT NOT NULL DEFAULT 'USER',
email TEXT NOT NULL DEFAULT '',
nickname TEXT NOT NULL DEFAULT '',
password_hash TEXT NOT NULL,
avatar_url TEXT NOT NULL,
description TEXT NOT NULL DEFAULT ''
);
-- user_setting
CREATE TABLE user_setting (
user_id INTEGER NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
UNIQUE(user_id, key)
);
-- memo
CREATE TABLE memo (
id SERIAL PRIMARY KEY,
uid TEXT NOT NULL UNIQUE,
creator_id INTEGER NOT NULL,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
updated_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
row_status TEXT NOT NULL DEFAULT 'NORMAL',
content TEXT NOT NULL,
visibility TEXT NOT NULL DEFAULT 'PRIVATE',
tags JSONB NOT NULL DEFAULT '[]',
payload JSONB NOT NULL DEFAULT '{}'
);
-- memo_organizer
CREATE TABLE memo_organizer (
memo_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
pinned INTEGER NOT NULL DEFAULT 0,
UNIQUE(memo_id, user_id)
);
-- memo_relation
CREATE TABLE memo_relation (
memo_id INTEGER NOT NULL,
related_memo_id INTEGER NOT NULL,
type TEXT NOT NULL,
UNIQUE(memo_id, related_memo_id, type)
);
-- resource
CREATE TABLE resource (
id SERIAL PRIMARY KEY,
uid TEXT NOT NULL UNIQUE,
creator_id INTEGER NOT NULL,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
updated_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
filename TEXT NOT NULL,
blob BYTEA,
type TEXT NOT NULL DEFAULT '',
size INTEGER NOT NULL DEFAULT 0,
memo_id INTEGER DEFAULT NULL,
storage_type TEXT NOT NULL DEFAULT '',
reference TEXT NOT NULL DEFAULT '',
payload TEXT NOT NULL DEFAULT '{}'
);
-- activity
CREATE TABLE activity (
id SERIAL PRIMARY KEY,
creator_id INTEGER NOT NULL,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
type TEXT NOT NULL DEFAULT '',
level TEXT NOT NULL DEFAULT 'INFO',
payload JSONB NOT NULL DEFAULT '{}'
);
-- idp
CREATE TABLE idp (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
identifier_filter TEXT NOT NULL DEFAULT '',
config JSONB NOT NULL DEFAULT '{}'
);
-- inbox
CREATE TABLE inbox (
id SERIAL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
sender_id INTEGER NOT NULL,
receiver_id INTEGER NOT NULL,
status TEXT NOT NULL,
message TEXT NOT NULL
);
-- webhook
CREATE TABLE webhook (
id SERIAL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
updated_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
row_status TEXT NOT NULL DEFAULT 'NORMAL',
creator_id INTEGER NOT NULL,
name TEXT NOT NULL,
url TEXT NOT NULL
);
-- reaction
CREATE TABLE reaction (
id SERIAL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
creator_id INTEGER NOT NULL,
content_id TEXT NOT NULL,
reaction_type TEXT NOT NULL,
UNIQUE(creator_id, content_id, reaction_type)
);

View File

@ -1,15 +0,0 @@
ALTER TABLE memo ADD COLUMN resource_name TEXT;
UPDATE memo SET resource_name = uuid_in(md5(random()::text || random()::text)::cstring);
ALTER TABLE memo ALTER COLUMN resource_name SET NOT NULL;
CREATE UNIQUE INDEX idx_memo_resource_name ON memo (resource_name);
ALTER TABLE resource ADD COLUMN resource_name TEXT;
UPDATE resource SET resource_name = uuid_in(md5(random()::text || random()::text)::cstring);
ALTER TABLE resource ALTER COLUMN resource_name SET NOT NULL;
CREATE UNIQUE INDEX idx_resource_resource_name ON resource (resource_name);

View File

@ -1,9 +0,0 @@
-- reaction
CREATE TABLE reaction (
id SERIAL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
creator_id INTEGER NOT NULL,
content_id TEXT NOT NULL,
reaction_type TEXT NOT NULL,
UNIQUE(creator_id, content_id, reaction_type)
);

View File

@ -1 +0,0 @@
ALTER TABLE "user" ADD COLUMN description TEXT NOT NULL DEFAULT '';

View File

@ -1,3 +0,0 @@
ALTER TABLE memo RENAME COLUMN resource_name TO uid;
ALTER TABLE resource RENAME COLUMN resource_name TO uid;

View File

@ -1,11 +0,0 @@
ALTER TABLE resource ADD COLUMN storage_type TEXT NOT NULL DEFAULT '';
ALTER TABLE resource ADD COLUMN reference TEXT NOT NULL DEFAULT '';
ALTER TABLE resource ADD COLUMN payload TEXT NOT NULL DEFAULT '{}';
UPDATE resource SET storage_type = 'LOCAL', reference = internal_path WHERE internal_path IS NOT NULL AND internal_path != '';
UPDATE resource SET storage_type = 'EXTERNAL', reference = external_link WHERE external_link IS NOT NULL AND external_link != '';
ALTER TABLE resource DROP COLUMN internal_path;
ALTER TABLE resource DROP COLUMN external_link;

View File

@ -1 +0,0 @@
ALTER TABLE memo ADD COLUMN tags JSONB NOT NULL DEFAULT '[]';

View File

@ -1 +0,0 @@
ALTER TABLE memo ADD COLUMN payload JSONB NOT NULL DEFAULT '{}';

View File

@ -1 +0,0 @@
DROP TABLE IF EXISTS tag;

View File

@ -1,132 +0,0 @@
-- migration_history
CREATE TABLE migration_history (
version TEXT NOT NULL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW())
);
-- system_setting
CREATE TABLE system_setting (
name TEXT NOT NULL PRIMARY KEY,
value TEXT NOT NULL,
description TEXT NOT NULL
);
-- user
CREATE TABLE "user" (
id SERIAL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
updated_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
row_status TEXT NOT NULL DEFAULT 'NORMAL',
username TEXT NOT NULL UNIQUE,
role TEXT NOT NULL DEFAULT 'USER',
email TEXT NOT NULL DEFAULT '',
nickname TEXT NOT NULL DEFAULT '',
password_hash TEXT NOT NULL,
avatar_url TEXT NOT NULL,
description TEXT NOT NULL DEFAULT ''
);
-- user_setting
CREATE TABLE user_setting (
user_id INTEGER NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
UNIQUE(user_id, key)
);
-- memo
CREATE TABLE memo (
id SERIAL PRIMARY KEY,
uid TEXT NOT NULL UNIQUE,
creator_id INTEGER NOT NULL,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
updated_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
row_status TEXT NOT NULL DEFAULT 'NORMAL',
content TEXT NOT NULL,
visibility TEXT NOT NULL DEFAULT 'PRIVATE',
tags JSONB NOT NULL DEFAULT '[]',
payload JSONB NOT NULL DEFAULT '{}'
);
-- memo_organizer
CREATE TABLE memo_organizer (
memo_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
pinned INTEGER NOT NULL DEFAULT 0,
UNIQUE(memo_id, user_id)
);
-- memo_relation
CREATE TABLE memo_relation (
memo_id INTEGER NOT NULL,
related_memo_id INTEGER NOT NULL,
type TEXT NOT NULL,
UNIQUE(memo_id, related_memo_id, type)
);
-- resource
CREATE TABLE resource (
id SERIAL PRIMARY KEY,
uid TEXT NOT NULL UNIQUE,
creator_id INTEGER NOT NULL,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
updated_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
filename TEXT NOT NULL,
blob BYTEA,
type TEXT NOT NULL DEFAULT '',
size INTEGER NOT NULL DEFAULT 0,
memo_id INTEGER DEFAULT NULL,
storage_type TEXT NOT NULL DEFAULT '',
reference TEXT NOT NULL DEFAULT '',
payload TEXT NOT NULL DEFAULT '{}'
);
-- activity
CREATE TABLE activity (
id SERIAL PRIMARY KEY,
creator_id INTEGER NOT NULL,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
type TEXT NOT NULL DEFAULT '',
level TEXT NOT NULL DEFAULT 'INFO',
payload JSONB NOT NULL DEFAULT '{}'
);
-- idp
CREATE TABLE idp (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
identifier_filter TEXT NOT NULL DEFAULT '',
config JSONB NOT NULL DEFAULT '{}'
);
-- inbox
CREATE TABLE inbox (
id SERIAL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
sender_id INTEGER NOT NULL,
receiver_id INTEGER NOT NULL,
status TEXT NOT NULL,
message TEXT NOT NULL
);
-- webhook
CREATE TABLE webhook (
id SERIAL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
updated_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
row_status TEXT NOT NULL DEFAULT 'NORMAL',
creator_id INTEGER NOT NULL,
name TEXT NOT NULL,
url TEXT NOT NULL
);
-- reaction
CREATE TABLE reaction (
id SERIAL PRIMARY KEY,
created_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()),
creator_id INTEGER NOT NULL,
content_id TEXT NOT NULL,
reaction_type TEXT NOT NULL,
UNIQUE(creator_id, content_id, reaction_type)
);

View File

@ -1,170 +0,0 @@
package postgres
import (
"context"
"embed"
"fmt"
"io/fs"
"regexp"
"sort"
"strings"
"github.com/pkg/errors"
"github.com/usememos/memos/server/version"
"github.com/usememos/memos/store"
)
//go:embed migration
var migrationFS embed.FS
const (
latestSchemaFileName = "LATEST__SCHEMA.sql"
)
func (d *DB) Migrate(ctx context.Context) error {
if d.profile.IsDev() {
return d.nonProdMigrate(ctx)
}
return d.prodMigrate(ctx)
}
func (d *DB) nonProdMigrate(ctx context.Context) error {
rows, err := d.db.QueryContext(ctx, "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")
if err != nil {
return errors.Errorf("failed to query database tables: %s", err)
}
if rows.Err() != nil {
return errors.Errorf("failed to query database tables: %s", err)
}
defer rows.Close()
var tables []string
for rows.Next() {
var table string
err := rows.Scan(&table)
if err != nil {
return errors.Errorf("failed to scan table name: %s", err)
}
tables = append(tables, table)
}
if len(tables) != 0 {
return nil
}
buf, err := migrationFS.ReadFile("migration/dev/" + latestSchemaFileName)
if err != nil {
return errors.Errorf("failed to read latest schema file: %s", err)
}
stmt := string(buf)
if _, err := d.db.ExecContext(ctx, stmt); err != nil {
return errors.Errorf("failed to exec SQL %s: %s", stmt, err)
}
return nil
}
func (d *DB) prodMigrate(ctx context.Context) error {
currentVersion := version.GetCurrentVersion(d.profile.Mode)
migrationHistoryList, err := d.FindMigrationHistoryList(ctx, &store.FindMigrationHistory{})
// If there is no migration history, we should apply the latest schema.
if err != nil || len(migrationHistoryList) == 0 {
buf, err := migrationFS.ReadFile("migration/prod/" + latestSchemaFileName)
if err != nil {
return errors.Errorf("failed to read latest schema file: %s", err)
}
stmt := string(buf)
if _, err := d.db.ExecContext(ctx, stmt); err != nil {
return errors.Errorf("failed to exec SQL %s: %s", stmt, err)
}
if _, err := d.UpsertMigrationHistory(ctx, &store.UpsertMigrationHistory{
Version: currentVersion,
}); err != nil {
return errors.Wrap(err, "failed to upsert migration history")
}
return nil
}
migrationHistoryVersionList := []string{}
for _, migrationHistory := range migrationHistoryList {
migrationHistoryVersionList = append(migrationHistoryVersionList, migrationHistory.Version)
}
sort.Sort(version.SortVersion(migrationHistoryVersionList))
latestMigrationHistoryVersion := migrationHistoryVersionList[len(migrationHistoryVersionList)-1]
if !version.IsVersionGreaterThan(version.GetSchemaVersion(currentVersion), latestMigrationHistoryVersion) {
return nil
}
fmt.Println("start migrate")
for _, minorVersion := range getMinorVersionList() {
normalizedVersion := minorVersion + ".0"
if version.IsVersionGreaterThan(normalizedVersion, latestMigrationHistoryVersion) && version.IsVersionGreaterOrEqualThan(currentVersion, normalizedVersion) {
fmt.Println("applying migration for", normalizedVersion)
if err := d.applyMigrationForMinorVersion(ctx, minorVersion); err != nil {
return errors.Wrap(err, "failed to apply minor version migration")
}
}
}
fmt.Println("end migrate")
return nil
}
func (d *DB) applyMigrationForMinorVersion(ctx context.Context, minorVersion string) error {
filenames, err := fs.Glob(migrationFS, fmt.Sprintf("migration/prod/%s/*.sql", minorVersion))
if err != nil {
return errors.Wrap(err, "failed to read ddl files")
}
sort.Strings(filenames)
// Loop over all migration files and execute them in order.
for _, filename := range filenames {
buf, err := migrationFS.ReadFile(filename)
if err != nil {
return errors.Wrapf(err, "failed to read minor version migration file, filename=%s", filename)
}
for _, stmt := range strings.Split(string(buf), ";") {
if strings.TrimSpace(stmt) == "" {
continue
}
if _, err := d.db.ExecContext(ctx, stmt); err != nil {
return errors.Wrapf(err, "migrate error: %s", stmt)
}
}
}
// Upsert the newest version to migration_history.
version := minorVersion + ".0"
if _, err = d.UpsertMigrationHistory(ctx, &store.UpsertMigrationHistory{Version: version}); err != nil {
return errors.Wrapf(err, "failed to upsert migration history with version: %s", version)
}
return nil
}
// minorDirRegexp is a regular expression for minor version directory.
var minorDirRegexp = regexp.MustCompile(`^migration/prod/[0-9]+\.[0-9]+$`)
func getMinorVersionList() []string {
minorVersionList := []string{}
if err := fs.WalkDir(migrationFS, "migration", func(path string, file fs.DirEntry, err error) error {
if err != nil {
return err
}
if file.IsDir() && minorDirRegexp.MatchString(path) {
minorVersionList = append(minorVersionList, file.Name())
}
return nil
}); err != nil {
panic(err)
}
sort.Sort(version.SortVersion(minorVersionList))
return minorVersionList
}

View File

@ -1,7 +1,6 @@
package postgres
import (
"context"
"database/sql"
"log"
@ -44,8 +43,8 @@ func (d *DB) GetDB() *sql.DB {
return d.db
}
func (*DB) GetCurrentDBSize(context.Context) (int64, error) {
return 0, errors.New("unimplemented")
func (d *DB) Type() string {
return "postgres"
}
func (d *DB) Close() error {