mirror of
https://github.com/usememos/memos.git
synced 2025-06-05 22:09:59 +02:00
chore: add presign background service
This commit is contained in:
@ -15,6 +15,8 @@ import (
|
|||||||
storepb "github.com/usememos/memos/proto/gen/store"
|
storepb "github.com/usememos/memos/proto/gen/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// presignLifetimeSecs is the lifetime of a presigned URL in seconds.
|
||||||
|
// The presigned URL is valid for 7 days.
|
||||||
const presignLifetimeSecs = 7 * 24 * 60 * 60
|
const presignLifetimeSecs = 7 * 24 * 60 * 60
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
apiv1 "github.com/usememos/memos/server/router/api/v1"
|
apiv1 "github.com/usememos/memos/server/router/api/v1"
|
||||||
"github.com/usememos/memos/server/router/frontend"
|
"github.com/usememos/memos/server/router/frontend"
|
||||||
"github.com/usememos/memos/server/router/rss"
|
"github.com/usememos/memos/server/router/rss"
|
||||||
|
s3objectpresigner "github.com/usememos/memos/server/service/s3_object_presigner"
|
||||||
versionchecker "github.com/usememos/memos/server/service/version_checker"
|
versionchecker "github.com/usememos/memos/server/service/version_checker"
|
||||||
"github.com/usememos/memos/store"
|
"github.com/usememos/memos/store"
|
||||||
)
|
)
|
||||||
@ -136,6 +137,7 @@ func (s *Server) Shutdown(ctx context.Context) {
|
|||||||
|
|
||||||
func (s *Server) StartBackgroundRunners(ctx context.Context) {
|
func (s *Server) StartBackgroundRunners(ctx context.Context) {
|
||||||
go versionchecker.NewVersionChecker(s.Store, s.Profile).Start(ctx)
|
go versionchecker.NewVersionChecker(s.Store, s.Profile).Start(ctx)
|
||||||
|
go s3objectpresigner.NewS3ObjectPresigner(s.Store).Start(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) getOrUpsertWorkspaceBasicSetting(ctx context.Context) (*storepb.WorkspaceBasicSetting, error) {
|
func (s *Server) getOrUpsertWorkspaceBasicSetting(ctx context.Context) (*storepb.WorkspaceBasicSetting, error) {
|
||||||
|
98
server/service/s3_object_presigner/s3_object_presigner.go
Normal file
98
server/service/s3_object_presigner/s3_object_presigner.go
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
package s3objectpresigner
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
|
||||||
|
"github.com/usememos/memos/plugin/storage/s3"
|
||||||
|
storepb "github.com/usememos/memos/proto/gen/store"
|
||||||
|
"github.com/usememos/memos/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// nolint
|
||||||
|
type S3ObjectPresigner struct {
|
||||||
|
Store *store.Store
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewS3ObjectPresigner(store *store.Store) *S3ObjectPresigner {
|
||||||
|
return &S3ObjectPresigner{
|
||||||
|
Store: store,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *S3ObjectPresigner) CheckAndPresign(ctx context.Context) error {
|
||||||
|
workspaceStorageSetting, err := p.Store.GetWorkspaceStorageSetting(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "failed to get workspace storage setting")
|
||||||
|
}
|
||||||
|
|
||||||
|
s3Config := workspaceStorageSetting.GetS3Config()
|
||||||
|
if s3Config == nil {
|
||||||
|
return errors.New("no actived external storage found")
|
||||||
|
}
|
||||||
|
s3Client, err := s3.NewClient(ctx, s3Config)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "Failed to create s3 client")
|
||||||
|
}
|
||||||
|
|
||||||
|
s3StorageType := storepb.ResourceStorageType_S3
|
||||||
|
resources, err := p.Store.ListResources(ctx, &store.FindResource{
|
||||||
|
GetBlob: false,
|
||||||
|
StorageType: &s3StorageType,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "list resources")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, resource := range resources {
|
||||||
|
s3ObjectPayload := resource.Payload.GetS3Object()
|
||||||
|
if s3ObjectPayload == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if s3ObjectPayload.LastPresignedTime != nil {
|
||||||
|
// Skip if the presigned URL is still valid.
|
||||||
|
if time.Now().Before(s3ObjectPayload.LastPresignedTime.AsTime().Add(24 * time.Hour)) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
presignURL, err := s3Client.PresignGetObject(ctx, s3ObjectPayload.Key)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "Failed to presign via s3 client")
|
||||||
|
}
|
||||||
|
s3ObjectPayload.LastPresignedTime = timestamppb.New(time.Now())
|
||||||
|
if err := p.Store.UpdateResource(ctx, &store.UpdateResource{
|
||||||
|
Reference: &presignURL,
|
||||||
|
Payload: &storepb.ResourcePayload{
|
||||||
|
Payload: &storepb.ResourcePayload_S3Object_{
|
||||||
|
S3Object: s3ObjectPayload,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}); err != nil {
|
||||||
|
return errors.Wrap(err, "Failed to update resource")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *S3ObjectPresigner) Start(ctx context.Context) {
|
||||||
|
p.CheckAndPresign(ctx)
|
||||||
|
|
||||||
|
// Schedule runner every 24 hours.
|
||||||
|
ticker := time.NewTicker(24 * time.Hour)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
}
|
||||||
|
|
||||||
|
p.CheckAndPresign(ctx)
|
||||||
|
}
|
||||||
|
}
|
@ -66,6 +66,9 @@ func (d *DB) ListResources(ctx context.Context, find *store.FindResource) ([]*st
|
|||||||
if find.HasRelatedMemo {
|
if find.HasRelatedMemo {
|
||||||
where = append(where, "`memo_id` IS NOT NULL")
|
where = append(where, "`memo_id` IS NOT NULL")
|
||||||
}
|
}
|
||||||
|
if find.StorageType != nil {
|
||||||
|
where, args = append(where, "`storage_type` = ?"), append(args, find.StorageType.String())
|
||||||
|
}
|
||||||
|
|
||||||
fields := []string{"`id`", "`uid`", "`filename`", "`type`", "`size`", "`creator_id`", "`created_ts`", "`updated_ts`", "`memo_id`", "`storage_type`", "`reference`", "`payload`"}
|
fields := []string{"`id`", "`uid`", "`filename`", "`type`", "`size`", "`creator_id`", "`created_ts`", "`updated_ts`", "`memo_id`", "`storage_type`", "`reference`", "`payload`"}
|
||||||
if find.GetBlob {
|
if find.GetBlob {
|
||||||
@ -159,6 +162,16 @@ func (d *DB) UpdateResource(ctx context.Context, update *store.UpdateResource) e
|
|||||||
if v := update.MemoID; v != nil {
|
if v := update.MemoID; v != nil {
|
||||||
set, args = append(set, "`memo_id` = ?"), append(args, *v)
|
set, args = append(set, "`memo_id` = ?"), append(args, *v)
|
||||||
}
|
}
|
||||||
|
if v := update.Reference; v != nil {
|
||||||
|
set, args = append(set, "`reference` = ?"), append(args, *v)
|
||||||
|
}
|
||||||
|
if v := update.Payload; v != nil {
|
||||||
|
bytes, err := protojson.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "failed to marshal resource payload")
|
||||||
|
}
|
||||||
|
set, args = append(set, "`payload` = ?"), append(args, string(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
args = append(args, update.ID)
|
args = append(args, update.ID)
|
||||||
stmt := "UPDATE `resource` SET " + strings.Join(set, ", ") + " WHERE `id` = ?"
|
stmt := "UPDATE `resource` SET " + strings.Join(set, ", ") + " WHERE `id` = ?"
|
||||||
|
@ -57,6 +57,9 @@ func (d *DB) ListResources(ctx context.Context, find *store.FindResource) ([]*st
|
|||||||
if find.HasRelatedMemo {
|
if find.HasRelatedMemo {
|
||||||
where = append(where, "memo_id IS NOT NULL")
|
where = append(where, "memo_id IS NOT NULL")
|
||||||
}
|
}
|
||||||
|
if v := find.StorageType; v != nil {
|
||||||
|
where, args = append(where, "storage_type = "+placeholder(len(args)+1)), append(args, v.String())
|
||||||
|
}
|
||||||
|
|
||||||
fields := []string{"id", "uid", "filename", "type", "size", "creator_id", "created_ts", "updated_ts", "memo_id", "storage_type", "reference", "payload"}
|
fields := []string{"id", "uid", "filename", "type", "size", "creator_id", "created_ts", "updated_ts", "memo_id", "storage_type", "reference", "payload"}
|
||||||
if find.GetBlob {
|
if find.GetBlob {
|
||||||
@ -144,6 +147,16 @@ func (d *DB) UpdateResource(ctx context.Context, update *store.UpdateResource) e
|
|||||||
if v := update.MemoID; v != nil {
|
if v := update.MemoID; v != nil {
|
||||||
set, args = append(set, "memo_id = "+placeholder(len(args)+1)), append(args, *v)
|
set, args = append(set, "memo_id = "+placeholder(len(args)+1)), append(args, *v)
|
||||||
}
|
}
|
||||||
|
if v := update.Reference; v != nil {
|
||||||
|
set, args = append(set, "reference = "+placeholder(len(args)+1)), append(args, *v)
|
||||||
|
}
|
||||||
|
if v := update.Payload; v != nil {
|
||||||
|
bytes, err := protojson.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "failed to marshal resource payload")
|
||||||
|
}
|
||||||
|
set, args = append(set, "payload = "+placeholder(len(args)+1)), append(args, string(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
stmt := `UPDATE resource SET ` + strings.Join(set, ", ") + ` WHERE id = ` + placeholder(len(args)+1)
|
stmt := `UPDATE resource SET ` + strings.Join(set, ", ") + ` WHERE id = ` + placeholder(len(args)+1)
|
||||||
args = append(args, update.ID)
|
args = append(args, update.ID)
|
||||||
|
@ -59,6 +59,9 @@ func (d *DB) ListResources(ctx context.Context, find *store.FindResource) ([]*st
|
|||||||
if find.HasRelatedMemo {
|
if find.HasRelatedMemo {
|
||||||
where = append(where, "`memo_id` IS NOT NULL")
|
where = append(where, "`memo_id` IS NOT NULL")
|
||||||
}
|
}
|
||||||
|
if find.StorageType != nil {
|
||||||
|
where, args = append(where, "`storage_type` = ?"), append(args, find.StorageType.String())
|
||||||
|
}
|
||||||
|
|
||||||
fields := []string{"`id`", "`uid`", "`filename`", "`type`", "`size`", "`creator_id`", "`created_ts`", "`updated_ts`", "`memo_id`", "`storage_type`", "`reference`", "`payload`"}
|
fields := []string{"`id`", "`uid`", "`filename`", "`type`", "`size`", "`creator_id`", "`created_ts`", "`updated_ts`", "`memo_id`", "`storage_type`", "`reference`", "`payload`"}
|
||||||
if find.GetBlob {
|
if find.GetBlob {
|
||||||
@ -140,6 +143,16 @@ func (d *DB) UpdateResource(ctx context.Context, update *store.UpdateResource) e
|
|||||||
if v := update.MemoID; v != nil {
|
if v := update.MemoID; v != nil {
|
||||||
set, args = append(set, "`memo_id` = ?"), append(args, *v)
|
set, args = append(set, "`memo_id` = ?"), append(args, *v)
|
||||||
}
|
}
|
||||||
|
if v := update.Reference; v != nil {
|
||||||
|
set, args = append(set, "`reference` = ?"), append(args, *v)
|
||||||
|
}
|
||||||
|
if v := update.Payload; v != nil {
|
||||||
|
bytes, err := protojson.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "failed to marshal resource payload")
|
||||||
|
}
|
||||||
|
set, args = append(set, "`payload` = ?"), append(args, string(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
args = append(args, update.ID)
|
args = append(args, update.ID)
|
||||||
stmt := "UPDATE `resource` SET " + strings.Join(set, ", ") + " WHERE `id` = ?"
|
stmt := "UPDATE `resource` SET " + strings.Join(set, ", ") + " WHERE `id` = ?"
|
||||||
|
@ -45,6 +45,7 @@ type FindResource struct {
|
|||||||
Filename *string
|
Filename *string
|
||||||
MemoID *int32
|
MemoID *int32
|
||||||
HasRelatedMemo bool
|
HasRelatedMemo bool
|
||||||
|
StorageType *storepb.ResourceStorageType
|
||||||
Limit *int
|
Limit *int
|
||||||
Offset *int
|
Offset *int
|
||||||
}
|
}
|
||||||
@ -55,6 +56,8 @@ type UpdateResource struct {
|
|||||||
UpdatedTs *int64
|
UpdatedTs *int64
|
||||||
Filename *string
|
Filename *string
|
||||||
MemoID *int32
|
MemoID *int32
|
||||||
|
Reference *string
|
||||||
|
Payload *storepb.ResourcePayload
|
||||||
}
|
}
|
||||||
|
|
||||||
type DeleteResource struct {
|
type DeleteResource struct {
|
||||||
|
Reference in New Issue
Block a user