From 2f57eb5ece9a5cb25b78284d01bd55b14d2e4580 Mon Sep 17 00:00:00 2001 From: tsmethurst Date: Sun, 2 Jan 2022 15:00:53 +0100 Subject: [PATCH] fiddle around with workers --- internal/media/image.go | 88 ++++++------- internal/media/manager.go | 224 +++------------------------------ internal/media/media.go | 7 ++ internal/media/pool.go | 65 ++++++++++ internal/media/process.go | 5 + internal/media/processvideo.go | 23 ---- internal/media/util_test.go | 150 ---------------------- 7 files changed, 135 insertions(+), 427 deletions(-) create mode 100644 internal/media/media.go create mode 100644 internal/media/pool.go create mode 100644 internal/media/process.go delete mode 100644 internal/media/processvideo.go delete mode 100644 internal/media/util_test.go diff --git a/internal/media/image.go b/internal/media/image.go index 87b5d70b7..26bd5e8b6 100644 --- a/internal/media/image.go +++ b/internal/media/image.go @@ -20,6 +20,7 @@ package media import ( "bytes" + "context" "errors" "fmt" "image" @@ -51,7 +52,8 @@ type imageAndMeta struct { blurhash string } -func (m *manager) processImage(data []byte, contentType string) (*gtsmodel.MediaAttachment, error) { +func (m *manager) processImage(ctx context.Context, data []byte, contentType string, accountID string) { + var clean []byte var err error var original *imageAndMeta @@ -68,9 +70,9 @@ func (m *manager) processImage(data []byte, contentType string) (*gtsmodel.Media case mimeImageGif: // gifs are already clean - no exif data to remove clean = data - original, err = decodeGif(clean, contentType) + original, err = decodeGif(clean) default: - err = fmt.Errorf("content type %s not a recognized image type", contentType) + err = fmt.Errorf("content type %s not a processible image type", contentType) } if err != nil { @@ -89,47 +91,46 @@ func (m *manager) processImage(data []byte, contentType string) (*gtsmodel.Media return nil, err } - originalURL := uris.GenerateURIForAttachment(minAttachment.AccountID, string(TypeAttachment), string(SizeOriginal), attachmentID, extension) - smallURL := uris.GenerateURIForAttachment(minAttachment.AccountID, string(TypeAttachment), string(SizeSmall), attachmentID, "jpeg") // all thumbnails/smalls are encoded as jpeg + originalURL := uris.GenerateURIForAttachment(accountID, string(TypeAttachment), string(SizeOriginal), attachmentID, extension) + smallURL := uris.GenerateURIForAttachment(accountID, string(TypeAttachment), string(SizeSmall), attachmentID, "jpeg") // all thumbnails/smalls are encoded as jpeg // we store the original... - originalPath := fmt.Sprintf("%s/%s/%s/%s.%s", minAttachment.AccountID, TypeAttachment, SizeOriginal, attachmentID, extension) + originalPath := fmt.Sprintf("%s/%s/%s/%s.%s", accountID, TypeAttachment, SizeOriginal, attachmentID, extension) if err := m.storage.Put(originalPath, original.image); err != nil { return nil, fmt.Errorf("storage error: %s", err) } // and a thumbnail... - smallPath := fmt.Sprintf("%s/%s/%s/%s.jpeg", minAttachment.AccountID, TypeAttachment, SizeSmall, attachmentID) // all thumbnails/smalls are encoded as jpeg + smallPath := fmt.Sprintf("%s/%s/%s/%s.jpeg", accountID, TypeAttachment, SizeSmall, attachmentID) // all thumbnails/smalls are encoded as jpeg if err := m.storage.Put(smallPath, small.image); err != nil { return nil, fmt.Errorf("storage error: %s", err) } - minAttachment.FileMeta.Original = gtsmodel.Original{ - Width: original.width, - Height: original.height, - Size: original.size, - Aspect: original.aspect, - } - - minAttachment.FileMeta.Small = gtsmodel.Small{ - Width: small.width, - Height: small.height, - Size: small.size, - Aspect: small.aspect, - } - attachment := >smodel.MediaAttachment{ - ID: attachmentID, - StatusID: minAttachment.StatusID, - URL: originalURL, - RemoteURL: minAttachment.RemoteURL, - CreatedAt: minAttachment.CreatedAt, - UpdatedAt: minAttachment.UpdatedAt, - Type: gtsmodel.FileTypeImage, - FileMeta: minAttachment.FileMeta, - AccountID: minAttachment.AccountID, - Description: minAttachment.Description, - ScheduledStatusID: minAttachment.ScheduledStatusID, + ID: attachmentID, + StatusID: "", + URL: originalURL, + RemoteURL: "", + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, + Type: gtsmodel.FileTypeImage, + FileMeta: gtsmodel.FileMeta{ + Original: gtsmodel.Original{ + Width: original.width, + Height: original.height, + Size: original.size, + Aspect: original.aspect, + }, + Small: gtsmodel.Small{ + Width: small.width, + Height: small.height, + Size: small.size, + Aspect: small.aspect, + }, + }, + AccountID: accountID, + Description: "", + ScheduledStatusID: "", Blurhash: small.blurhash, Processing: 2, File: gtsmodel.File{ @@ -144,33 +145,24 @@ func (m *manager) processImage(data []byte, contentType string) (*gtsmodel.Media FileSize: len(small.image), UpdatedAt: time.Now(), URL: smallURL, - RemoteURL: minAttachment.Thumbnail.RemoteURL, + RemoteURL: "", }, - Avatar: minAttachment.Avatar, - Header: minAttachment.Header, + Avatar: false, + Header: false, } return attachment, nil } -func decodeGif(b []byte, extension string) (*imageAndMeta, error) { - var g *gif.GIF - var err error - - switch extension { - case mimeGif: - g, err = gif.DecodeAll(bytes.NewReader(b)) - default: - err = fmt.Errorf("extension %s not recognised", extension) - } - +func decodeGif(b []byte) (*imageAndMeta, error) { + gif, err := gif.DecodeAll(bytes.NewReader(b)) if err != nil { return nil, err } // use the first frame to get the static characteristics - width := g.Config.Width - height := g.Config.Height + width := gif.Config.Width + height := gif.Config.Height size := width * height aspect := float64(width) / float64(height) diff --git a/internal/media/manager.go b/internal/media/manager.go index 782542ca9..16465bb67 100644 --- a/internal/media/manager.go +++ b/internal/media/manager.go @@ -22,45 +22,32 @@ import ( "context" "errors" "fmt" - "net/url" + "runtime" "strings" - "time" "codeberg.org/gruf/go-store/kv" "github.com/superseriousbusiness/gotosocial/internal/db" - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" - "github.com/superseriousbusiness/gotosocial/internal/transport" - "github.com/superseriousbusiness/gotosocial/internal/uris" ) -// ProcessCallback is triggered by the media manager when an attachment has finished undergoing -// image processing (generation of a blurhash, thumbnail etc) but hasn't yet been inserted into -// the database. It is provided to allow callers to a) access the processed media attachment and b) -// make any last-minute changes to the media attachment before it enters the database. -type ProcessCallback func(*gtsmodel.MediaAttachment) *gtsmodel.MediaAttachment - -// defaultCB will be used when a nil ProcessCallback is passed to one of the manager's interface functions. -// It just returns the processed media attachment with no additional changes. -var defaultCB ProcessCallback = func(a *gtsmodel.MediaAttachment) *gtsmodel.MediaAttachment { - return a -} - // Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs. type Manager interface { - ProcessAttachment(ctx context.Context, data []byte, accountID string, cb ProcessCallback) (*gtsmodel.MediaAttachment, error) + ProcessMedia(ctx context.Context, data []byte, accountID string) (*Media, error) } type manager struct { db db.DB storage *kv.KVStore + pool *workerPool } // New returns a media manager with the given db and underlying storage. func New(database db.DB, storage *kv.KVStore) Manager { + workers := runtime.NumCPU() / 2 + return &manager{ db: database, storage: storage, + pool: newWorkerPool(workers), } } @@ -68,13 +55,19 @@ func New(database db.DB, storage *kv.KVStore) Manager { INTERFACE FUNCTIONS */ -func (m *manager) ProcessAttachment(ctx context.Context, data []byte, accountID string, cb ProcessCallback) (*gtsmodel.MediaAttachment, error) { +func (m *manager) ProcessMedia(ctx context.Context, data []byte, accountID string) (*Media, error) { contentType, err := parseContentType(data) if err != nil { return nil, err } - mainType := strings.Split(contentType, "/")[0] + split := strings.Split(contentType, "/") + if len(split) != 2 { + return nil, fmt.Errorf("content type %s malformed", contentType) + } + + mainType := split[0] + switch mainType { case mimeImage: if !supportedImage(contentType) { @@ -83,192 +76,11 @@ func (m *manager) ProcessAttachment(ctx context.Context, data []byte, accountID if len(data) == 0 { return nil, errors.New("image was of size 0") } - return m.processImage(attachmentBytes, minAttachment) + + return m.pool.run(func(ctx context.Context, data []byte, contentType string, accountID string) { + m.processImage(ctx, data, contentType, accountID) + }) default: return nil, fmt.Errorf("content type %s not (yet) supported", contentType) } } - -// ProcessHeaderOrAvatar takes a new header image for an account, checks it out, removes exif data from it, -// puts it in whatever storage backend we're using, sets the relevant fields in the database for the new image, -// and then returns information to the caller about the new header. -func (m *manager) ProcessHeader(ctx context.Context, data []byte, accountID string, cb ProcessCallback) (*gtsmodel.MediaAttachment, error) { - - // make sure we have a type we can handle - contentType, err := parseContentType(data) - if err != nil { - return nil, err - } - - if !supportedImage(contentType) { - return nil, fmt.Errorf("%s is not an accepted image type", contentType) - } - - if len(data) == 0 { - return nil, fmt.Errorf("passed reader was of size 0") - } - - // process it - ma, err := m.processHeaderOrAvi(attachment, contentType, mediaType, accountID, remoteURL) - if err != nil { - return nil, fmt.Errorf("error processing %s: %s", mediaType, err) - } - - // set it in the database - if err := m.db.SetAccountHeaderOrAvatar(ctx, ma, accountID); err != nil { - return nil, fmt.Errorf("error putting %s in database: %s", mediaType, err) - } - - return ma, nil -} - -// ProcessLocalEmoji takes a new emoji and a shortcode, cleans it up, puts it in storage, and creates a new -// *gts.Emoji for it, then returns it to the caller. It's the caller's responsibility to put the returned struct -// in the database. -func (m *manager) ProcessLocalEmoji(ctx context.Context, emojiBytes []byte, shortcode string) (*gtsmodel.Emoji, error) { - var clean []byte - var err error - var original *imageAndMeta - var static *imageAndMeta - - // check content type of the submitted emoji and make sure it's supported by us - contentType, err := parseContentType(emojiBytes) - if err != nil { - return nil, err - } - if !supportedEmoji(contentType) { - return nil, fmt.Errorf("content type %s not supported for emojis", contentType) - } - - if len(emojiBytes) == 0 { - return nil, errors.New("emoji was of size 0") - } - if len(emojiBytes) > EmojiMaxBytes { - return nil, fmt.Errorf("emoji size %d bytes exceeded max emoji size of %d bytes", len(emojiBytes), EmojiMaxBytes) - } - - // clean any exif data from png but leave gifs alone - switch contentType { - case mimePng: - if clean, err = purgeExif(emojiBytes); err != nil { - return nil, fmt.Errorf("error cleaning exif data: %s", err) - } - case mimeGif: - clean = emojiBytes - default: - return nil, errors.New("media type unrecognized") - } - - // unlike with other attachments we don't need to derive anything here because we don't care about the width/height etc - original = &imageAndMeta{ - image: clean, - } - - static, err = deriveStaticEmoji(clean, contentType) - if err != nil { - return nil, fmt.Errorf("error deriving static emoji: %s", err) - } - - // since emoji aren't 'owned' by an account, but we still want to use the same pattern for serving them through the filserver, - // (ie., fileserver/ACCOUNT_ID/etc etc) we need to fetch the INSTANCE ACCOUNT from the database. That is, the account that's created - // with the same username as the instance hostname, which doesn't belong to any particular user. - instanceAccount, err := m.db.GetInstanceAccount(ctx, "") - if err != nil { - return nil, fmt.Errorf("error fetching instance account: %s", err) - } - - // the file extension (either png or gif) - extension := strings.Split(contentType, "/")[1] - - // generate a ulid for the new emoji - newEmojiID, err := id.NewRandomULID() - if err != nil { - return nil, err - } - - // activitypub uri for the emoji -- unrelated to actually serving the image - // will be something like https://example.org/emoji/01FPSVBK3H8N7V8XK6KGSQ86EC - emojiURI := uris.GenerateURIForEmoji(newEmojiID) - - // serve url and storage path for the original emoji -- can be png or gif - emojiURL := uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeOriginal), newEmojiID, extension) - emojiPath := fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeOriginal, newEmojiID, extension) - - // serve url and storage path for the static version -- will always be png - emojiStaticURL := uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), newEmojiID, "png") - emojiStaticPath := fmt.Sprintf("%s/%s/%s/%s.png", instanceAccount.ID, TypeEmoji, SizeStatic, newEmojiID) - - // Store the original emoji - if err := m.storage.Put(emojiPath, original.image); err != nil { - return nil, fmt.Errorf("storage error: %s", err) - } - - // Store the static emoji - if err := m.storage.Put(emojiStaticPath, static.image); err != nil { - return nil, fmt.Errorf("storage error: %s", err) - } - - // and finally return the new emoji data to the caller -- it's up to them what to do with it - e := >smodel.Emoji{ - ID: newEmojiID, - Shortcode: shortcode, - Domain: "", // empty because this is a local emoji - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - ImageRemoteURL: "", // empty because this is a local emoji - ImageStaticRemoteURL: "", // empty because this is a local emoji - ImageURL: emojiURL, - ImageStaticURL: emojiStaticURL, - ImagePath: emojiPath, - ImageStaticPath: emojiStaticPath, - ImageContentType: contentType, - ImageStaticContentType: mimePng, // static version will always be a png - ImageFileSize: len(original.image), - ImageStaticFileSize: len(static.image), - ImageUpdatedAt: time.Now(), - Disabled: false, - URI: emojiURI, - VisibleInPicker: true, - CategoryID: "", // empty because this is a new emoji -- no category yet - } - return e, nil -} - -func (m *manager) ProcessRemoteHeaderOrAvatar(ctx context.Context, t transport.Transport, currentAttachment *gtsmodel.MediaAttachment, accountID string) (*gtsmodel.MediaAttachment, error) { - if !currentAttachment.Header && !currentAttachment.Avatar { - return nil, errors.New("provided attachment was set to neither header nor avatar") - } - - if currentAttachment.Header && currentAttachment.Avatar { - return nil, errors.New("provided attachment was set to both header and avatar") - } - - var headerOrAvi Type - if currentAttachment.Header { - headerOrAvi = TypeHeader - } else if currentAttachment.Avatar { - headerOrAvi = TypeAvatar - } - - if currentAttachment.RemoteURL == "" { - return nil, errors.New("no remote URL on media attachment to dereference") - } - remoteIRI, err := url.Parse(currentAttachment.RemoteURL) - if err != nil { - return nil, fmt.Errorf("error parsing attachment url %s: %s", currentAttachment.RemoteURL, err) - } - - // for content type, we assume we don't know what to expect... - expectedContentType := "*/*" - if currentAttachment.File.ContentType != "" { - // ... and then narrow it down if we do - expectedContentType = currentAttachment.File.ContentType - } - - attachmentBytes, err := t.DereferenceMedia(ctx, remoteIRI, expectedContentType) - if err != nil { - return nil, fmt.Errorf("dereferencing remote media with url %s: %s", remoteIRI.String(), err) - } - - return m.ProcessHeaderOrAvatar(ctx, attachmentBytes, accountID, headerOrAvi, currentAttachment.RemoteURL) -} diff --git a/internal/media/media.go b/internal/media/media.go new file mode 100644 index 000000000..e96c37020 --- /dev/null +++ b/internal/media/media.go @@ -0,0 +1,7 @@ +package media + +import gtsmodel "github.com/superseriousbusiness/gotosocial/internal/db/bundb/migrations/20211113114307_init" + +type Media struct { + Attachment *gtsmodel.MediaAttachment +} diff --git a/internal/media/pool.go b/internal/media/pool.go new file mode 100644 index 000000000..19b31cde3 --- /dev/null +++ b/internal/media/pool.go @@ -0,0 +1,65 @@ +package media + +import "context" + +func newWorkerPool(workers int) *workerPool { + // make a pool with the given worker capacity + pool := &workerPool{ + workerQueue: make(chan *worker, workers), + } + + // fill the pool with workers + for i := 0; i < workers; i++ { + pool.workerQueue <- &worker{ + // give each worker a reference to the pool so it + // can put itself back in when it's finished + workerQueue: pool.workerQueue, + data: []byte{}, + contentType: "", + accountID: "", + } + } + + return pool +} + +type workerPool struct { + workerQueue chan *worker +} + +func (p *workerPool) run(fn func(ctx context.Context, data []byte, contentType string, accountID string)) (*Media, error) { + + m := &Media{} + + go func() { + // take a worker from the worker pool + worker := <-p.workerQueue + // tell it to work + worker.work(fn) + }() + + return m, nil +} + +type worker struct { + workerQueue chan *worker + data []byte + contentType string + accountID string +} + +func (w *worker) work(fn func(ctx context.Context, data []byte, contentType string, accountID string)) { + // return self to pool when finished + defer w.finish() + // do the work + fn(context.Background(), w.data, w.contentType, w.accountID) +} + +func (w *worker) finish() { + // clear self + w.data = []byte{} + w.contentType = "" + w.accountID = "" + // put self back in the worker pool + w.workerQueue <- w +} diff --git a/internal/media/process.go b/internal/media/process.go new file mode 100644 index 000000000..e921be6bc --- /dev/null +++ b/internal/media/process.go @@ -0,0 +1,5 @@ +package media + +import "context" + +type mediaProcessingFunction func(ctx context.Context, data []byte, contentType string, accountID string) diff --git a/internal/media/processvideo.go b/internal/media/processvideo.go deleted file mode 100644 index e829c68c0..000000000 --- a/internal/media/processvideo.go +++ /dev/null @@ -1,23 +0,0 @@ -/* - GoToSocial - Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . -*/ - -package media - -// func (mh *mediaManager) processVideoAttachment(data []byte, accountID string, contentType string, remoteURL string) (*gtsmodel.MediaAttachment, error) { -// return nil, nil -// } diff --git a/internal/media/util_test.go b/internal/media/util_test.go deleted file mode 100644 index 1180bf2aa..000000000 --- a/internal/media/util_test.go +++ /dev/null @@ -1,150 +0,0 @@ -/* - GoToSocial - Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . -*/ - -package media - -import ( - "io/ioutil" - "testing" - - "github.com/spf13/viper" - "github.com/superseriousbusiness/gotosocial/internal/config" - "github.com/superseriousbusiness/gotosocial/internal/log" - - "github.com/stretchr/testify/suite" -) - -type MediaUtilTestSuite struct { - suite.Suite -} - -/* - TEST INFRASTRUCTURE -*/ - -// SetupSuite sets some variables on the suite that we can use as consts (more or less) throughout -func (suite *MediaUtilTestSuite) SetupSuite() { - // doesn't use testrig.InitTestLog() helper to prevent import cycle - viper.Set(config.Keys.LogLevel, "trace") - err := log.Initialize() - if err != nil { - panic(err) - } -} - -func (suite *MediaUtilTestSuite) TearDownSuite() { - -} - -// SetupTest creates a db connection and creates necessary tables before each test -func (suite *MediaUtilTestSuite) SetupTest() { - -} - -// TearDownTest drops tables to make sure there's no data in the db -func (suite *MediaUtilTestSuite) TearDownTest() { - -} - -/* - ACTUAL TESTS -*/ - -func (suite *MediaUtilTestSuite) TestParseContentTypeOK() { - f, err := ioutil.ReadFile("./test/test-jpeg.jpg") - suite.NoError(err) - ct, err := parseContentType(f) - suite.NoError(err) - suite.Equal("image/jpeg", ct) -} - -func (suite *MediaUtilTestSuite) TestParseContentTypeNotOK() { - f, err := ioutil.ReadFile("./test/test-corrupted.jpg") - suite.NoError(err) - ct, err := parseContentType(f) - suite.NotNil(err) - suite.Equal("", ct) - suite.Equal("filetype unknown", err.Error()) -} - -func (suite *MediaUtilTestSuite) TestRemoveEXIF() { - // load and validate image - b, err := ioutil.ReadFile("./test/test-with-exif.jpg") - suite.NoError(err) - - // clean it up and validate the clean version - clean, err := purgeExif(b) - suite.NoError(err) - - // compare it to our stored sample - sampleBytes, err := ioutil.ReadFile("./test/test-without-exif.jpg") - suite.NoError(err) - suite.EqualValues(sampleBytes, clean) -} - -func (suite *MediaUtilTestSuite) TestDeriveImageFromJPEG() { - // load image - b, err := ioutil.ReadFile("./test/test-jpeg.jpg") - suite.NoError(err) - - // clean it up and validate the clean version - imageAndMeta, err := decodeImage(b, "image/jpeg") - suite.NoError(err) - - suite.Equal(1920, imageAndMeta.width) - suite.Equal(1080, imageAndMeta.height) - suite.Equal(1.7777777777777777, imageAndMeta.aspect) - suite.Equal(2073600, imageAndMeta.size) - - // assert that the final image is what we would expect - sampleBytes, err := ioutil.ReadFile("./test/test-jpeg-processed.jpg") - suite.NoError(err) - suite.EqualValues(sampleBytes, imageAndMeta.image) -} - -func (suite *MediaUtilTestSuite) TestDeriveThumbnailFromJPEG() { - // load image - b, err := ioutil.ReadFile("./test/test-jpeg.jpg") - suite.NoError(err) - - // clean it up and validate the clean version - imageAndMeta, err := deriveThumbnail(b, "image/jpeg", 512, 512) - suite.NoError(err) - - suite.Equal(512, imageAndMeta.width) - suite.Equal(288, imageAndMeta.height) - suite.Equal(1.7777777777777777, imageAndMeta.aspect) - suite.Equal(147456, imageAndMeta.size) - suite.Equal("LjBzUo#6RQR._NvzRjWF?urqV@a$", imageAndMeta.blurhash) - - sampleBytes, err := ioutil.ReadFile("./test/test-jpeg-thumbnail.jpg") - suite.NoError(err) - suite.EqualValues(sampleBytes, imageAndMeta.image) -} - -func (suite *MediaUtilTestSuite) TestSupportedImageTypes() { - ok := supportedImage("image/jpeg") - suite.True(ok) - - ok = supportedImage("image/bmp") - suite.False(ok) -} - -func TestMediaUtilTestSuite(t *testing.T) { - suite.Run(t, new(MediaUtilTestSuite)) -}