From 8fdd358f4b4428b33df4afd672ed070032d46e48 Mon Sep 17 00:00:00 2001 From: Vyr Cossont Date: Tue, 23 Jul 2024 12:44:31 -0700 Subject: [PATCH] [feature] Conversations API (#3013) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Implement conversations API * Sort and page conversations by last status ID * Appease linter * Fix deleting conversations and statuses * Refactor to make migrations automatic * Lint * Update tests post-merge * Fixes from live-fire testing * Linter caught a format problem * Refactor tests, fix cache * Negative test for non-DMs * Run conversations advanced migration on testrig startup as well as regular server startup * Document (lack of) side effects of API method for deleting a conversation * Make not-found check less nested for readability * Rename PutConversation to UpsertConversation * Use util.Ptr instead of IIFE * Reduce cache used by conversations * Remove unnecessary TableExpr/ColumnExpr * Use struct tags for both unique constraints on Conversation * Make it clear how paging with GetDirectStatusIDsBatch should be used * Let conversation paging skip conversations it can't render * Use Bun NewDropTable * Convert delete raw query to Bun * Convert update raw query to Bun * Convert latestConversationStatusesTempTable raw query partially to Bun * Convert conversationStatusesTempTable raw query partially to Bun * Rename field used to store result of MaxDirectStatusID * Move advanced migrations to their own tiny processor * Catch up util function name with main * Remove json.… wrappers * Remove redundant check * Combine error checks * Replace map with slice of structs * Address processor/type converter comments - Add context info for errors - Extract some common processor code into shared methods - Move conversation eligibility check ahead of populating conversation * Add error context when dropping temp tables --- cmd/gotosocial/action/server/server.go | 5 + cmd/gotosocial/action/testrig/testrig.go | 5 + docs/api/swagger.yaml | 75 ++- .../conversations/conversationdelete.go | 93 ++++ .../client/conversations/conversationread.go | 95 ++++ .../api/client/conversations/conversations.go | 10 +- .../client/conversations/conversationsget.go | 44 +- internal/cache/cache.go | 2 + internal/cache/db.go | 52 ++ internal/cache/invalidate.go | 5 + internal/cache/size.go | 15 + internal/cache/wrappers.go | 28 + internal/config/config.go | 96 ++-- internal/config/defaults.go | 94 ++-- internal/config/helpers.gen.go | 56 ++ internal/db/advancedmigration.go | 29 + internal/db/bundb/advancedmigration.go | 52 ++ internal/db/bundb/bundb.go | 11 + internal/db/bundb/conversation.go | 494 ++++++++++++++++++ internal/db/bundb/conversation_test.go | 115 ++++ .../20240611190733_add_conversations.go | 78 +++ .../20240712005536_add_advanced_migrations.go | 49 ++ internal/db/bundb/status.go | 32 ++ internal/db/conversation.go | 52 ++ internal/db/db.go | 2 + internal/db/status.go | 12 + internal/db/test/conversation.go | 122 +++++ internal/gtsmodel/advancedmigration.go | 32 ++ internal/gtsmodel/conversation.go | 77 +++ internal/processing/account/delete.go | 8 + .../advancedmigrations/advancedmigrations.go | 48 ++ .../processing/conversations/conversations.go | 126 +++++ .../conversations/conversations_test.go | 151 ++++++ internal/processing/conversations/delete.go | 45 ++ .../processing/conversations/delete_test.go | 27 + internal/processing/conversations/get.go | 101 ++++ internal/processing/conversations/get_test.go | 65 +++ internal/processing/conversations/migrate.go | 131 +++++ .../processing/conversations/migrate_test.go | 85 +++ internal/processing/conversations/read.go | 65 +++ .../processing/conversations/read_test.go | 34 ++ internal/processing/conversations/update.go | 242 +++++++++ .../processing/conversations/update_test.go | 54 ++ internal/processing/processor.go | 49 +- internal/processing/stream/conversation.go | 44 ++ .../processing/workers/fromclientapi_test.go | 231 ++++++++ internal/processing/workers/surface.go | 12 +- .../processing/workers/surfacenotify_test.go | 11 +- .../processing/workers/surfacetimeline.go | 13 +- internal/processing/workers/util.go | 5 + internal/processing/workers/workers.go | 13 +- internal/processing/workers/workers_test.go | 1 + internal/stream/stream.go | 4 + internal/typeutils/internaltofrontend.go | 61 +++ test/envparsing.sh | 2 + 55 files changed, 3317 insertions(+), 143 deletions(-) create mode 100644 internal/api/client/conversations/conversationdelete.go create mode 100644 internal/api/client/conversations/conversationread.go create mode 100644 internal/db/advancedmigration.go create mode 100644 internal/db/bundb/advancedmigration.go create mode 100644 internal/db/bundb/conversation.go create mode 100644 internal/db/bundb/conversation_test.go create mode 100644 internal/db/bundb/migrations/20240611190733_add_conversations.go create mode 100644 internal/db/bundb/migrations/20240712005536_add_advanced_migrations.go create mode 100644 internal/db/conversation.go create mode 100644 internal/db/test/conversation.go create mode 100644 internal/gtsmodel/advancedmigration.go create mode 100644 internal/gtsmodel/conversation.go create mode 100644 internal/processing/advancedmigrations/advancedmigrations.go create mode 100644 internal/processing/conversations/conversations.go create mode 100644 internal/processing/conversations/conversations_test.go create mode 100644 internal/processing/conversations/delete.go create mode 100644 internal/processing/conversations/delete_test.go create mode 100644 internal/processing/conversations/get.go create mode 100644 internal/processing/conversations/get_test.go create mode 100644 internal/processing/conversations/migrate.go create mode 100644 internal/processing/conversations/migrate_test.go create mode 100644 internal/processing/conversations/read.go create mode 100644 internal/processing/conversations/read_test.go create mode 100644 internal/processing/conversations/update.go create mode 100644 internal/processing/conversations/update_test.go create mode 100644 internal/processing/stream/conversation.go diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index 828b9c875..475804218 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -290,6 +290,11 @@ var Start action.GTSAction = func(ctx context.Context) error { return fmt.Errorf("error initializing metrics: %w", err) } + // Run advanced migrations. + if err := processor.AdvancedMigrations().Migrate(ctx); err != nil { + return err + } + /* HTTP router initialization */ diff --git a/cmd/gotosocial/action/testrig/testrig.go b/cmd/gotosocial/action/testrig/testrig.go index 99f366fbe..95982fc72 100644 --- a/cmd/gotosocial/action/testrig/testrig.go +++ b/cmd/gotosocial/action/testrig/testrig.go @@ -204,6 +204,11 @@ var Start action.GTSAction = func(ctx context.Context) error { return fmt.Errorf("error initializing metrics: %w", err) } + // Run advanced migrations. + if err := processor.AdvancedMigrations().Migrate(ctx); err != nil { + return err + } + /* HTTP router initialization */ diff --git a/docs/api/swagger.yaml b/docs/api/swagger.yaml index 9fcc8bab3..22ce536fd 100644 --- a/docs/api/swagger.yaml +++ b/docs/api/swagger.yaml @@ -6208,11 +6208,43 @@ paths: - read:bookmarks tags: - bookmarks + /api/v1/conversation/{id}/read: + post: + operationId: conversationRead + parameters: + - description: ID of the conversation. + in: path + name: id + required: true + type: string + produces: + - application/json + responses: + "200": + description: Updated conversation. + schema: + $ref: '#/definitions/conversation' + "400": + description: bad request + "401": + description: unauthorized + "404": + description: not found + "406": + description: not acceptable + "422": + description: unprocessable content + "500": + description: internal server error + security: + - OAuth2 Bearer: + - write:conversations + summary: Mark a conversation with the given ID as read. + tags: + - conversations /api/v1/conversations: get: description: |- - NOT IMPLEMENTED YET: Will currently always return an array of length 0. - The next and previous queries can be parsed from the returned Link header. Example: @@ -6221,15 +6253,15 @@ paths: ```` operationId: conversationsGet parameters: - - description: 'Return only conversations *OLDER* than the given max ID. The conversation with the specified ID will not be included in the response. NOTE: the ID is of the internal conversation, use the Link header for pagination.' + - description: 'Return only conversations with last statuses *OLDER* than the given max ID. The conversation with the specified ID will not be included in the response. NOTE: The ID is a status ID. Use the Link header for pagination.' in: query name: max_id type: string - - description: 'Return only conversations *NEWER* than the given since ID. The conversation with the specified ID will not be included in the response. NOTE: the ID is of the internal conversation, use the Link header for pagination.' + - description: 'Return only conversations with last statuses *NEWER* than the given since ID. The conversation with the specified ID will not be included in the response. NOTE: The ID is a status ID. Use the Link header for pagination.' in: query name: since_id type: string - - description: 'Return only conversations *IMMEDIATELY NEWER* than the given min ID. The conversation with the specified ID will not be included in the response. NOTE: the ID is of the internal conversation, use the Link header for pagination.' + - description: 'Return only conversations with last statuses *IMMEDIATELY NEWER* than the given min ID. The conversation with the specified ID will not be included in the response. NOTE: The ID is a status ID. Use the Link header for pagination.' in: query name: min_id type: string @@ -6269,6 +6301,39 @@ paths: summary: Get an array of (direct message) conversations that requesting account is involved in. tags: - conversations + /api/v1/conversations/{id}: + delete: + description: |- + This doesn't delete the actual statuses in the conversation, + nor does it prevent a new conversation from being created later from the same thread and participants. + operationId: conversationDelete + parameters: + - description: ID of the conversation + in: path + name: id + required: true + type: string + produces: + - application/json + responses: + "200": + description: conversation deleted + "400": + description: bad request + "401": + description: unauthorized + "404": + description: not found + "406": + description: not acceptable + "500": + description: internal server error + security: + - OAuth2 Bearer: + - write:conversations + summary: Delete a single conversation with the given ID. + tags: + - conversations /api/v1/custom_emojis: get: operationId: customEmojisGet diff --git a/internal/api/client/conversations/conversationdelete.go b/internal/api/client/conversations/conversationdelete.go new file mode 100644 index 000000000..6f8f43a94 --- /dev/null +++ b/internal/api/client/conversations/conversationdelete.go @@ -0,0 +1,93 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations + +import ( + "net/http" + + "github.com/gin-gonic/gin" + apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/oauth" +) + +// ConversationDELETEHandler swagger:operation DELETE /api/v1/conversations/{id} conversationDelete +// +// Delete a single conversation with the given ID. +// +// This doesn't delete the actual statuses in the conversation, +// nor does it prevent a new conversation from being created later from the same thread and participants. +// +// --- +// tags: +// - conversations +// +// produces: +// - application/json +// +// parameters: +// - +// name: id +// type: string +// description: ID of the conversation +// in: path +// required: true +// +// security: +// - OAuth2 Bearer: +// - write:conversations +// +// responses: +// '200': +// description: conversation deleted +// '400': +// description: bad request +// '401': +// description: unauthorized +// '404': +// description: not found +// '406': +// description: not acceptable +// '500': +// description: internal server error +func (m *Module) ConversationDELETEHandler(c *gin.Context) { + authed, err := oauth.Authed(c, true, true, true, true) + if err != nil { + apiutil.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGetV1) + return + } + + if _, err := apiutil.NegotiateAccept(c, apiutil.JSONAcceptHeaders...); err != nil { + apiutil.ErrorHandler(c, gtserror.NewErrorNotAcceptable(err, err.Error()), m.processor.InstanceGetV1) + return + } + + id, errWithCode := apiutil.ParseID(c.Param(apiutil.IDKey)) + if errWithCode != nil { + apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) + return + } + + errWithCode = m.processor.Conversations().Delete(c.Request.Context(), authed.Account, id) + if errWithCode != nil { + apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) + return + } + + c.JSON(http.StatusOK, apiutil.EmptyJSONObject) +} diff --git a/internal/api/client/conversations/conversationread.go b/internal/api/client/conversations/conversationread.go new file mode 100644 index 000000000..7f68a2a33 --- /dev/null +++ b/internal/api/client/conversations/conversationread.go @@ -0,0 +1,95 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations + +import ( + "net/http" + + "github.com/gin-gonic/gin" + apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/oauth" +) + +// ConversationReadPOSTHandler swagger:operation POST /api/v1/conversation/{id}/read conversationRead +// +// Mark a conversation with the given ID as read. +// +// --- +// tags: +// - conversations +// +// produces: +// - application/json +// +// parameters: +// - +// name: id +// in: path +// type: string +// required: true +// description: ID of the conversation. +// +// security: +// - OAuth2 Bearer: +// - write:conversations +// +// responses: +// '200': +// name: conversation +// description: Updated conversation. +// schema: +// "$ref": "#/definitions/conversation" +// '400': +// description: bad request +// '401': +// description: unauthorized +// '404': +// description: not found +// '406': +// description: not acceptable +// '422': +// description: unprocessable content +// '500': +// description: internal server error +func (m *Module) ConversationReadPOSTHandler(c *gin.Context) { + authed, err := oauth.Authed(c, true, true, true, true) + if err != nil { + apiutil.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGetV1) + return + } + + if _, err := apiutil.NegotiateAccept(c, apiutil.JSONAcceptHeaders...); err != nil { + apiutil.ErrorHandler(c, gtserror.NewErrorNotAcceptable(err, err.Error()), m.processor.InstanceGetV1) + return + } + + id, errWithCode := apiutil.ParseID(c.Param(apiutil.IDKey)) + if errWithCode != nil { + apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) + return + } + + apiConversation, errWithCode := m.processor.Conversations().Read(c.Request.Context(), authed.Account, id) + if errWithCode != nil { + apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) + return + } + + apiutil.JSON(c, http.StatusOK, apiConversation) +} diff --git a/internal/api/client/conversations/conversations.go b/internal/api/client/conversations/conversations.go index be19a9cdc..e742c8d3d 100644 --- a/internal/api/client/conversations/conversations.go +++ b/internal/api/client/conversations/conversations.go @@ -21,13 +21,17 @@ import ( "net/http" "github.com/gin-gonic/gin" + apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" "github.com/superseriousbusiness/gotosocial/internal/processing" ) const ( - // BasePath is the base URI path for serving - // conversations, minus the api prefix. + // BasePath is the base path for serving the conversations API, minus the 'api' prefix. BasePath = "/v1/conversations" + // BasePathWithID is the base path with the ID key in it, for operations on an existing conversation. + BasePathWithID = BasePath + "/:" + apiutil.IDKey + // ReadPathWithID is the path for marking an existing conversation as read. + ReadPathWithID = BasePathWithID + "/read" ) type Module struct { @@ -42,4 +46,6 @@ func New(processor *processing.Processor) *Module { func (m *Module) Route(attachHandler func(method string, path string, f ...gin.HandlerFunc) gin.IRoutes) { attachHandler(http.MethodGet, BasePath, m.ConversationsGETHandler) + attachHandler(http.MethodDelete, BasePathWithID, m.ConversationDELETEHandler) + attachHandler(http.MethodPost, ReadPathWithID, m.ConversationReadPOSTHandler) } diff --git a/internal/api/client/conversations/conversationsget.go b/internal/api/client/conversations/conversationsget.go index 11bddb1ce..663b9a707 100644 --- a/internal/api/client/conversations/conversationsget.go +++ b/internal/api/client/conversations/conversationsget.go @@ -24,14 +24,13 @@ import ( apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) // ConversationsGETHandler swagger:operation GET /api/v1/conversations conversationsGet // // Get an array of (direct message) conversations that requesting account is involved in. // -// NOT IMPLEMENTED YET: Will currently always return an array of length 0. -// // The next and previous queries can be parsed from the returned Link header. // Example: // @@ -51,26 +50,26 @@ import ( // name: max_id // type: string // description: >- -// Return only conversations *OLDER* than the given max ID. +// Return only conversations with last statuses *OLDER* than the given max ID. // The conversation with the specified ID will not be included in the response. -// NOTE: the ID is of the internal conversation, use the Link header for pagination. +// NOTE: The ID is a status ID. Use the Link header for pagination. // in: query // required: false // - // name: since_id // type: string // description: >- -// Return only conversations *NEWER* than the given since ID. +// Return only conversations with last statuses *NEWER* than the given since ID. // The conversation with the specified ID will not be included in the response. -// NOTE: the ID is of the internal conversation, use the Link header for pagination. +// NOTE: The ID is a status ID. Use the Link header for pagination. // in: query // - // name: min_id // type: string // description: >- -// Return only conversations *IMMEDIATELY NEWER* than the given min ID. +// Return only conversations with last statuses *IMMEDIATELY NEWER* than the given min ID. // The conversation with the specified ID will not be included in the response. -// NOTE: the ID is of the internal conversation, use the Link header for pagination. +// NOTE: The ID is a status ID. Use the Link header for pagination. // in: query // required: false // - @@ -108,7 +107,8 @@ import ( // '500': // description: internal server error func (m *Module) ConversationsGETHandler(c *gin.Context) { - if _, err := oauth.Authed(c, true, true, true, true); err != nil { + authed, err := oauth.Authed(c, true, true, true, true) + if err != nil { apiutil.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGetV1) return } @@ -118,5 +118,29 @@ func (m *Module) ConversationsGETHandler(c *gin.Context) { return } - apiutil.Data(c, http.StatusOK, apiutil.AppJSON, apiutil.EmptyJSONArray) + page, errWithCode := paging.ParseIDPage(c, + 1, // min limit + 80, // max limit + 40, // default limit + ) + if errWithCode != nil { + apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) + return + } + + resp, errWithCode := m.processor.Conversations().GetAll( + c.Request.Context(), + authed.Account, + page, + ) + if errWithCode != nil { + apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) + return + } + + if resp.LinkHeader != "" { + c.Header("Link", resp.LinkHeader) + } + + apiutil.JSON(c, http.StatusOK, resp.Items) } diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 5a8a92ca3..8b0c04ea4 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -60,6 +60,8 @@ func (c *Caches) Init() { c.initBlockIDs() c.initBoostOfIDs() c.initClient() + c.initConversation() + c.initConversationLastStatusIDs() c.initDomainAllow() c.initDomainBlock() c.initEmoji() diff --git a/internal/cache/db.go b/internal/cache/db.go index 50acf00d1..4c063b06d 100644 --- a/internal/cache/db.go +++ b/internal/cache/db.go @@ -56,6 +56,12 @@ type GTSCaches struct { // Client provides access to the gtsmodel Client database cache. Client StructCache[*gtsmodel.Client] + // Conversation provides access to the gtsmodel Conversation database cache. + Conversation StructCache[*gtsmodel.Conversation] + + // ConversationLastStatusIDs provides access to the conversation last status IDs database cache. + ConversationLastStatusIDs SliceCache[string] + // DomainAllow provides access to the domain allow database cache. DomainAllow *domain.Cache @@ -426,6 +432,52 @@ func (c *Caches) initClient() { }) } +func (c *Caches) initConversation() { + cap := calculateResultCacheMax( + sizeofConversation(), // model in-mem size. + config.GetCacheConversationMemRatio(), + ) + + log.Infof(nil, "cache size = %d", cap) + + copyF := func(c1 *gtsmodel.Conversation) *gtsmodel.Conversation { + c2 := new(gtsmodel.Conversation) + *c2 = *c1 + + // Don't include ptr fields that + // will be populated separately. + // See internal/db/bundb/conversation.go. + c2.Account = nil + c2.OtherAccounts = nil + c2.LastStatus = nil + + return c2 + } + + c.GTS.Conversation.Init(structr.CacheConfig[*gtsmodel.Conversation]{ + Indices: []structr.IndexConfig{ + {Fields: "ID"}, + {Fields: "ThreadID,AccountID,OtherAccountsKey"}, + {Fields: "AccountID,LastStatusID"}, + {Fields: "AccountID", Multiple: true}, + }, + MaxSize: cap, + IgnoreErr: ignoreErrors, + Copy: copyF, + Invalidate: c.OnInvalidateConversation, + }) +} + +func (c *Caches) initConversationLastStatusIDs() { + cap := calculateSliceCacheMax( + config.GetCacheConversationLastStatusIDsMemRatio(), + ) + + log.Infof(nil, "cache size = %d", cap) + + c.GTS.ConversationLastStatusIDs.Init(0, cap) +} + func (c *Caches) initDomainAllow() { c.GTS.DomainAllow = new(domain.Cache) } diff --git a/internal/cache/invalidate.go b/internal/cache/invalidate.go index 088e7f91f..987a6eb64 100644 --- a/internal/cache/invalidate.go +++ b/internal/cache/invalidate.go @@ -83,6 +83,11 @@ func (c *Caches) OnInvalidateClient(client *gtsmodel.Client) { c.GTS.Token.Invalidate("ClientID", client.ID) } +func (c *Caches) OnInvalidateConversation(conversation *gtsmodel.Conversation) { + // Invalidate owning account's conversation list. + c.GTS.ConversationLastStatusIDs.Invalidate(conversation.AccountID) +} + func (c *Caches) OnInvalidateEmojiCategory(category *gtsmodel.EmojiCategory) { // Invalidate any emoji in this category. c.GTS.Emoji.Invalidate("CategoryID", category.ID) diff --git a/internal/cache/size.go b/internal/cache/size.go index 4ec30fbb7..4c474fa28 100644 --- a/internal/cache/size.go +++ b/internal/cache/size.go @@ -19,6 +19,7 @@ package cache import ( "crypto/rsa" + "strings" "time" "unsafe" @@ -320,6 +321,20 @@ func sizeofClient() uintptr { })) } +func sizeofConversation() uintptr { + return uintptr(size.Of(>smodel.Conversation{ + ID: exampleID, + CreatedAt: exampleTime, + UpdatedAt: exampleTime, + AccountID: exampleID, + OtherAccountIDs: []string{exampleID, exampleID, exampleID}, + OtherAccountsKey: strings.Join([]string{exampleID, exampleID, exampleID}, ","), + ThreadID: exampleID, + LastStatusID: exampleID, + Read: util.Ptr(true), + })) +} + func sizeofEmoji() uintptr { return uintptr(size.Of(>smodel.Emoji{ ID: exampleID, diff --git a/internal/cache/wrappers.go b/internal/cache/wrappers.go index edeea9bcd..9cb4fca98 100644 --- a/internal/cache/wrappers.go +++ b/internal/cache/wrappers.go @@ -158,6 +158,34 @@ func (c *StructCache[T]) LoadIDs(index string, ids []string, load func([]string) }) } +// LoadIDs2Part works as LoadIDs, except using a two-part key, +// where the first part is an ID shared by all the objects, +// and the second part is a list of per-object IDs. +func (c *StructCache[T]) LoadIDs2Part(index string, id1 string, id2s []string, load func(string, []string) ([]T, error)) ([]T, error) { + i := c.index[index] + if i == nil { + // we only perform this check here as + // we're going to use the index before + // passing it to cache in main .Load(). + panic("missing index for cache type") + } + + // Generate cache keys for two-part IDs. + keys := make([]structr.Key, len(id2s)) + for x, id2 := range id2s { + keys[x] = i.Key(id1, id2) + } + + // Pass loader callback with wrapper onto main cache load function. + return c.cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) { + uncachedIDs := make([]string, len(uncached)) + for i := range uncached { + uncachedIDs[i] = uncached[i].Values()[1].(string) + } + return load(id1, uncachedIDs) + }) +} + // Store: see structr.Cache{}.Store(). func (c *StructCache[T]) Store(value T, store func() error) error { return c.cache.Store(value, store) diff --git a/internal/config/config.go b/internal/config/config.go index bffa5b455..1b8cf2759 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -191,53 +191,55 @@ type HTTPClientConfiguration struct { } type CacheConfiguration struct { - MemoryTarget bytesize.Size `name:"memory-target"` - AccountMemRatio float64 `name:"account-mem-ratio"` - AccountNoteMemRatio float64 `name:"account-note-mem-ratio"` - AccountSettingsMemRatio float64 `name:"account-settings-mem-ratio"` - AccountStatsMemRatio float64 `name:"account-stats-mem-ratio"` - ApplicationMemRatio float64 `name:"application-mem-ratio"` - BlockMemRatio float64 `name:"block-mem-ratio"` - BlockIDsMemRatio float64 `name:"block-ids-mem-ratio"` - BoostOfIDsMemRatio float64 `name:"boost-of-ids-mem-ratio"` - ClientMemRatio float64 `name:"client-mem-ratio"` - EmojiMemRatio float64 `name:"emoji-mem-ratio"` - EmojiCategoryMemRatio float64 `name:"emoji-category-mem-ratio"` - FilterMemRatio float64 `name:"filter-mem-ratio"` - FilterKeywordMemRatio float64 `name:"filter-keyword-mem-ratio"` - FilterStatusMemRatio float64 `name:"filter-status-mem-ratio"` - FollowMemRatio float64 `name:"follow-mem-ratio"` - FollowIDsMemRatio float64 `name:"follow-ids-mem-ratio"` - FollowRequestMemRatio float64 `name:"follow-request-mem-ratio"` - FollowRequestIDsMemRatio float64 `name:"follow-request-ids-mem-ratio"` - InReplyToIDsMemRatio float64 `name:"in-reply-to-ids-mem-ratio"` - InstanceMemRatio float64 `name:"instance-mem-ratio"` - InteractionApprovalMemRatio float64 `name:"interaction-approval-mem-ratio"` - ListMemRatio float64 `name:"list-mem-ratio"` - ListEntryMemRatio float64 `name:"list-entry-mem-ratio"` - MarkerMemRatio float64 `name:"marker-mem-ratio"` - MediaMemRatio float64 `name:"media-mem-ratio"` - MentionMemRatio float64 `name:"mention-mem-ratio"` - MoveMemRatio float64 `name:"move-mem-ratio"` - NotificationMemRatio float64 `name:"notification-mem-ratio"` - PollMemRatio float64 `name:"poll-mem-ratio"` - PollVoteMemRatio float64 `name:"poll-vote-mem-ratio"` - PollVoteIDsMemRatio float64 `name:"poll-vote-ids-mem-ratio"` - ReportMemRatio float64 `name:"report-mem-ratio"` - StatusMemRatio float64 `name:"status-mem-ratio"` - StatusBookmarkMemRatio float64 `name:"status-bookmark-mem-ratio"` - StatusBookmarkIDsMemRatio float64 `name:"status-bookmark-ids-mem-ratio"` - StatusFaveMemRatio float64 `name:"status-fave-mem-ratio"` - StatusFaveIDsMemRatio float64 `name:"status-fave-ids-mem-ratio"` - TagMemRatio float64 `name:"tag-mem-ratio"` - ThreadMuteMemRatio float64 `name:"thread-mute-mem-ratio"` - TokenMemRatio float64 `name:"token-mem-ratio"` - TombstoneMemRatio float64 `name:"tombstone-mem-ratio"` - UserMemRatio float64 `name:"user-mem-ratio"` - UserMuteMemRatio float64 `name:"user-mute-mem-ratio"` - UserMuteIDsMemRatio float64 `name:"user-mute-ids-mem-ratio"` - WebfingerMemRatio float64 `name:"webfinger-mem-ratio"` - VisibilityMemRatio float64 `name:"visibility-mem-ratio"` + MemoryTarget bytesize.Size `name:"memory-target"` + AccountMemRatio float64 `name:"account-mem-ratio"` + AccountNoteMemRatio float64 `name:"account-note-mem-ratio"` + AccountSettingsMemRatio float64 `name:"account-settings-mem-ratio"` + AccountStatsMemRatio float64 `name:"account-stats-mem-ratio"` + ApplicationMemRatio float64 `name:"application-mem-ratio"` + BlockMemRatio float64 `name:"block-mem-ratio"` + BlockIDsMemRatio float64 `name:"block-ids-mem-ratio"` + BoostOfIDsMemRatio float64 `name:"boost-of-ids-mem-ratio"` + ClientMemRatio float64 `name:"client-mem-ratio"` + ConversationMemRatio float64 `name:"conversation-mem-ratio"` + ConversationLastStatusIDsMemRatio float64 `name:"conversation-last-status-ids-mem-ratio"` + EmojiMemRatio float64 `name:"emoji-mem-ratio"` + EmojiCategoryMemRatio float64 `name:"emoji-category-mem-ratio"` + FilterMemRatio float64 `name:"filter-mem-ratio"` + FilterKeywordMemRatio float64 `name:"filter-keyword-mem-ratio"` + FilterStatusMemRatio float64 `name:"filter-status-mem-ratio"` + FollowMemRatio float64 `name:"follow-mem-ratio"` + FollowIDsMemRatio float64 `name:"follow-ids-mem-ratio"` + FollowRequestMemRatio float64 `name:"follow-request-mem-ratio"` + FollowRequestIDsMemRatio float64 `name:"follow-request-ids-mem-ratio"` + InReplyToIDsMemRatio float64 `name:"in-reply-to-ids-mem-ratio"` + InstanceMemRatio float64 `name:"instance-mem-ratio"` + InteractionApprovalMemRatio float64 `name:"interaction-approval-mem-ratio"` + ListMemRatio float64 `name:"list-mem-ratio"` + ListEntryMemRatio float64 `name:"list-entry-mem-ratio"` + MarkerMemRatio float64 `name:"marker-mem-ratio"` + MediaMemRatio float64 `name:"media-mem-ratio"` + MentionMemRatio float64 `name:"mention-mem-ratio"` + MoveMemRatio float64 `name:"move-mem-ratio"` + NotificationMemRatio float64 `name:"notification-mem-ratio"` + PollMemRatio float64 `name:"poll-mem-ratio"` + PollVoteMemRatio float64 `name:"poll-vote-mem-ratio"` + PollVoteIDsMemRatio float64 `name:"poll-vote-ids-mem-ratio"` + ReportMemRatio float64 `name:"report-mem-ratio"` + StatusMemRatio float64 `name:"status-mem-ratio"` + StatusBookmarkMemRatio float64 `name:"status-bookmark-mem-ratio"` + StatusBookmarkIDsMemRatio float64 `name:"status-bookmark-ids-mem-ratio"` + StatusFaveMemRatio float64 `name:"status-fave-mem-ratio"` + StatusFaveIDsMemRatio float64 `name:"status-fave-ids-mem-ratio"` + TagMemRatio float64 `name:"tag-mem-ratio"` + ThreadMuteMemRatio float64 `name:"thread-mute-mem-ratio"` + TokenMemRatio float64 `name:"token-mem-ratio"` + TombstoneMemRatio float64 `name:"tombstone-mem-ratio"` + UserMemRatio float64 `name:"user-mem-ratio"` + UserMuteMemRatio float64 `name:"user-mute-mem-ratio"` + UserMuteIDsMemRatio float64 `name:"user-mute-ids-mem-ratio"` + WebfingerMemRatio float64 `name:"webfinger-mem-ratio"` + VisibilityMemRatio float64 `name:"visibility-mem-ratio"` } // MarshalMap will marshal current Configuration into a map structure (useful for JSON/TOML/YAML). diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 267e7b4bc..82ea07e10 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -156,52 +156,54 @@ var Defaults = Configuration{ // when TODO items in the size.go source // file have been addressed, these should // be able to make some more sense :D - AccountMemRatio: 5, - AccountNoteMemRatio: 1, - AccountSettingsMemRatio: 0.1, - AccountStatsMemRatio: 2, - ApplicationMemRatio: 0.1, - BlockMemRatio: 2, - BlockIDsMemRatio: 3, - BoostOfIDsMemRatio: 3, - ClientMemRatio: 0.1, - EmojiMemRatio: 3, - EmojiCategoryMemRatio: 0.1, - FilterMemRatio: 0.5, - FilterKeywordMemRatio: 0.5, - FilterStatusMemRatio: 0.5, - FollowMemRatio: 2, - FollowIDsMemRatio: 4, - FollowRequestMemRatio: 2, - FollowRequestIDsMemRatio: 2, - InReplyToIDsMemRatio: 3, - InstanceMemRatio: 1, - InteractionApprovalMemRatio: 1, - ListMemRatio: 1, - ListEntryMemRatio: 2, - MarkerMemRatio: 0.5, - MediaMemRatio: 4, - MentionMemRatio: 2, - MoveMemRatio: 0.1, - NotificationMemRatio: 2, - PollMemRatio: 1, - PollVoteMemRatio: 2, - PollVoteIDsMemRatio: 2, - ReportMemRatio: 1, - StatusMemRatio: 5, - StatusBookmarkMemRatio: 0.5, - StatusBookmarkIDsMemRatio: 2, - StatusFaveMemRatio: 2, - StatusFaveIDsMemRatio: 3, - TagMemRatio: 2, - ThreadMuteMemRatio: 0.2, - TokenMemRatio: 0.75, - TombstoneMemRatio: 0.5, - UserMemRatio: 0.25, - UserMuteMemRatio: 2, - UserMuteIDsMemRatio: 3, - WebfingerMemRatio: 0.1, - VisibilityMemRatio: 2, + AccountMemRatio: 5, + AccountNoteMemRatio: 1, + AccountSettingsMemRatio: 0.1, + AccountStatsMemRatio: 2, + ApplicationMemRatio: 0.1, + BlockMemRatio: 2, + BlockIDsMemRatio: 3, + BoostOfIDsMemRatio: 3, + ClientMemRatio: 0.1, + ConversationMemRatio: 1, + ConversationLastStatusIDsMemRatio: 2, + EmojiMemRatio: 3, + EmojiCategoryMemRatio: 0.1, + FilterMemRatio: 0.5, + FilterKeywordMemRatio: 0.5, + FilterStatusMemRatio: 0.5, + FollowMemRatio: 2, + FollowIDsMemRatio: 4, + FollowRequestMemRatio: 2, + FollowRequestIDsMemRatio: 2, + InReplyToIDsMemRatio: 3, + InstanceMemRatio: 1, + InteractionApprovalMemRatio: 1, + ListMemRatio: 1, + ListEntryMemRatio: 2, + MarkerMemRatio: 0.5, + MediaMemRatio: 4, + MentionMemRatio: 2, + MoveMemRatio: 0.1, + NotificationMemRatio: 2, + PollMemRatio: 1, + PollVoteMemRatio: 2, + PollVoteIDsMemRatio: 2, + ReportMemRatio: 1, + StatusMemRatio: 5, + StatusBookmarkMemRatio: 0.5, + StatusBookmarkIDsMemRatio: 2, + StatusFaveMemRatio: 2, + StatusFaveIDsMemRatio: 3, + TagMemRatio: 2, + ThreadMuteMemRatio: 0.2, + TokenMemRatio: 0.75, + TombstoneMemRatio: 0.5, + UserMemRatio: 0.25, + UserMuteMemRatio: 2, + UserMuteIDsMemRatio: 3, + WebfingerMemRatio: 0.1, + VisibilityMemRatio: 2, }, HTTPClient: HTTPClientConfiguration{ diff --git a/internal/config/helpers.gen.go b/internal/config/helpers.gen.go index 8c27da439..932cb802d 100644 --- a/internal/config/helpers.gen.go +++ b/internal/config/helpers.gen.go @@ -2975,6 +2975,62 @@ func GetCacheClientMemRatio() float64 { return global.GetCacheClientMemRatio() } // SetCacheClientMemRatio safely sets the value for global configuration 'Cache.ClientMemRatio' field func SetCacheClientMemRatio(v float64) { global.SetCacheClientMemRatio(v) } +// GetCacheConversationMemRatio safely fetches the Configuration value for state's 'Cache.ConversationMemRatio' field +func (st *ConfigState) GetCacheConversationMemRatio() (v float64) { + st.mutex.RLock() + v = st.config.Cache.ConversationMemRatio + st.mutex.RUnlock() + return +} + +// SetCacheConversationMemRatio safely sets the Configuration value for state's 'Cache.ConversationMemRatio' field +func (st *ConfigState) SetCacheConversationMemRatio(v float64) { + st.mutex.Lock() + defer st.mutex.Unlock() + st.config.Cache.ConversationMemRatio = v + st.reloadToViper() +} + +// CacheConversationMemRatioFlag returns the flag name for the 'Cache.ConversationMemRatio' field +func CacheConversationMemRatioFlag() string { return "cache-conversation-mem-ratio" } + +// GetCacheConversationMemRatio safely fetches the value for global configuration 'Cache.ConversationMemRatio' field +func GetCacheConversationMemRatio() float64 { return global.GetCacheConversationMemRatio() } + +// SetCacheConversationMemRatio safely sets the value for global configuration 'Cache.ConversationMemRatio' field +func SetCacheConversationMemRatio(v float64) { global.SetCacheConversationMemRatio(v) } + +// GetCacheConversationLastStatusIDsMemRatio safely fetches the Configuration value for state's 'Cache.ConversationLastStatusIDsMemRatio' field +func (st *ConfigState) GetCacheConversationLastStatusIDsMemRatio() (v float64) { + st.mutex.RLock() + v = st.config.Cache.ConversationLastStatusIDsMemRatio + st.mutex.RUnlock() + return +} + +// SetCacheConversationLastStatusIDsMemRatio safely sets the Configuration value for state's 'Cache.ConversationLastStatusIDsMemRatio' field +func (st *ConfigState) SetCacheConversationLastStatusIDsMemRatio(v float64) { + st.mutex.Lock() + defer st.mutex.Unlock() + st.config.Cache.ConversationLastStatusIDsMemRatio = v + st.reloadToViper() +} + +// CacheConversationLastStatusIDsMemRatioFlag returns the flag name for the 'Cache.ConversationLastStatusIDsMemRatio' field +func CacheConversationLastStatusIDsMemRatioFlag() string { + return "cache-conversation-last-status-ids-mem-ratio" +} + +// GetCacheConversationLastStatusIDsMemRatio safely fetches the value for global configuration 'Cache.ConversationLastStatusIDsMemRatio' field +func GetCacheConversationLastStatusIDsMemRatio() float64 { + return global.GetCacheConversationLastStatusIDsMemRatio() +} + +// SetCacheConversationLastStatusIDsMemRatio safely sets the value for global configuration 'Cache.ConversationLastStatusIDsMemRatio' field +func SetCacheConversationLastStatusIDsMemRatio(v float64) { + global.SetCacheConversationLastStatusIDsMemRatio(v) +} + // GetCacheEmojiMemRatio safely fetches the Configuration value for state's 'Cache.EmojiMemRatio' field func (st *ConfigState) GetCacheEmojiMemRatio() (v float64) { st.mutex.RLock() diff --git a/internal/db/advancedmigration.go b/internal/db/advancedmigration.go new file mode 100644 index 000000000..2b4601bdb --- /dev/null +++ b/internal/db/advancedmigration.go @@ -0,0 +1,29 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package db + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +type AdvancedMigration interface { + GetAdvancedMigration(ctx context.Context, id string) (*gtsmodel.AdvancedMigration, error) + PutAdvancedMigration(ctx context.Context, advancedMigration *gtsmodel.AdvancedMigration) error +} diff --git a/internal/db/bundb/advancedmigration.go b/internal/db/bundb/advancedmigration.go new file mode 100644 index 000000000..2a0ec93e6 --- /dev/null +++ b/internal/db/bundb/advancedmigration.go @@ -0,0 +1,52 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package bundb + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/uptrace/bun" +) + +type advancedMigrationDB struct { + db *bun.DB + state *state.State +} + +func (a *advancedMigrationDB) GetAdvancedMigration(ctx context.Context, id string) (*gtsmodel.AdvancedMigration, error) { + var advancedMigration gtsmodel.AdvancedMigration + err := a.db.NewSelect(). + Model(&advancedMigration). + Where("? = ?", bun.Ident("id"), id). + Limit(1). + Scan(ctx) + if err != nil { + return nil, err + } + return &advancedMigration, nil +} + +func (a *advancedMigrationDB) PutAdvancedMigration(ctx context.Context, advancedMigration *gtsmodel.AdvancedMigration) error { + _, err := NewUpsert(a.db). + Model(advancedMigration). + Constraint("id"). + Exec(ctx) + return err +} diff --git a/internal/db/bundb/bundb.go b/internal/db/bundb/bundb.go index 57fb661df..070d4eb91 100644 --- a/internal/db/bundb/bundb.go +++ b/internal/db/bundb/bundb.go @@ -54,8 +54,10 @@ import ( type DBService struct { db.Account db.Admin + db.AdvancedMigration db.Application db.Basic + db.Conversation db.Domain db.Emoji db.HeaderFilter @@ -158,6 +160,7 @@ func NewBunDBService(ctx context.Context, state *state.State) (db.DB, error) { // https://bun.uptrace.dev/orm/many-to-many-relation/ for _, t := range []interface{}{ >smodel.AccountToEmoji{}, + >smodel.ConversationToStatus{}, >smodel.StatusToEmoji{}, >smodel.StatusToTag{}, >smodel.ThreadToStatus{}, @@ -181,6 +184,10 @@ func NewBunDBService(ctx context.Context, state *state.State) (db.DB, error) { db: db, state: state, }, + AdvancedMigration: &advancedMigrationDB{ + db: db, + state: state, + }, Application: &applicationDB{ db: db, state: state, @@ -188,6 +195,10 @@ func NewBunDBService(ctx context.Context, state *state.State) (db.DB, error) { Basic: &basicDB{ db: db, }, + Conversation: &conversationDB{ + db: db, + state: state, + }, Domain: &domainDB{ db: db, state: state, diff --git a/internal/db/bundb/conversation.go b/internal/db/bundb/conversation.go new file mode 100644 index 000000000..1a3958a79 --- /dev/null +++ b/internal/db/bundb/conversation.go @@ -0,0 +1,494 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package bundb + +import ( + "context" + "errors" + "slices" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/uptrace/bun" + "github.com/uptrace/bun/dialect" +) + +type conversationDB struct { + db *bun.DB + state *state.State +} + +func (c *conversationDB) GetConversationByID(ctx context.Context, id string) (*gtsmodel.Conversation, error) { + return c.getConversation( + ctx, + "ID", + func(conversation *gtsmodel.Conversation) error { + return c.db. + NewSelect(). + Model(conversation). + Where("? = ?", bun.Ident("id"), id). + Scan(ctx) + }, + id, + ) +} + +func (c *conversationDB) GetConversationByThreadAndAccountIDs(ctx context.Context, threadID string, accountID string, otherAccountIDs []string) (*gtsmodel.Conversation, error) { + otherAccountsKey := gtsmodel.ConversationOtherAccountsKey(otherAccountIDs) + return c.getConversation( + ctx, + "ThreadID,AccountID,OtherAccountsKey", + func(conversation *gtsmodel.Conversation) error { + return c.db. + NewSelect(). + Model(conversation). + Where("? = ?", bun.Ident("thread_id"), threadID). + Where("? = ?", bun.Ident("account_id"), accountID). + Where("? = ?", bun.Ident("other_accounts_key"), otherAccountsKey). + Scan(ctx) + }, + threadID, + accountID, + otherAccountsKey, + ) +} + +func (c *conversationDB) getConversation( + ctx context.Context, + lookup string, + dbQuery func(conversation *gtsmodel.Conversation) error, + keyParts ...any, +) (*gtsmodel.Conversation, error) { + // Fetch conversation from cache with loader callback + conversation, err := c.state.Caches.GTS.Conversation.LoadOne(lookup, func() (*gtsmodel.Conversation, error) { + var conversation gtsmodel.Conversation + + // Not cached! Perform database query + if err := dbQuery(&conversation); err != nil { + return nil, err + } + + return &conversation, nil + }, keyParts...) + if err != nil { + // already processe + return nil, err + } + + if gtscontext.Barebones(ctx) { + // Only a barebones model was requested. + return conversation, nil + } + + if err := c.populateConversation(ctx, conversation); err != nil { + return nil, err + } + + return conversation, nil +} + +func (c *conversationDB) populateConversation(ctx context.Context, conversation *gtsmodel.Conversation) error { + var ( + errs gtserror.MultiError + err error + ) + + if conversation.Account == nil { + conversation.Account, err = c.state.DB.GetAccountByID( + gtscontext.SetBarebones(ctx), + conversation.AccountID, + ) + if err != nil { + errs.Appendf("error populating conversation owner account: %w", err) + } + } + + if conversation.OtherAccounts == nil { + conversation.OtherAccounts, err = c.state.DB.GetAccountsByIDs( + gtscontext.SetBarebones(ctx), + conversation.OtherAccountIDs, + ) + if err != nil { + errs.Appendf("error populating other conversation accounts: %w", err) + } + } + + if conversation.LastStatus == nil && conversation.LastStatusID != "" { + conversation.LastStatus, err = c.state.DB.GetStatusByID( + gtscontext.SetBarebones(ctx), + conversation.LastStatusID, + ) + if err != nil { + errs.Appendf("error populating conversation last status: %w", err) + } + } + + return errs.Combine() +} + +func (c *conversationDB) GetConversationsByOwnerAccountID(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Conversation, error) { + conversationLastStatusIDs, err := c.getAccountConversationLastStatusIDs(ctx, accountID, page) + if err != nil { + return nil, err + } + return c.getConversationsByLastStatusIDs(ctx, accountID, conversationLastStatusIDs) +} + +func (c *conversationDB) getAccountConversationLastStatusIDs(ctx context.Context, accountID string, page *paging.Page) ([]string, error) { + return loadPagedIDs(&c.state.Caches.GTS.ConversationLastStatusIDs, accountID, page, func() ([]string, error) { + var conversationLastStatusIDs []string + + // Conversation last status IDs not in cache. Perform DB query. + if _, err := c.db. + NewSelect(). + Model((*gtsmodel.Conversation)(nil)). + Column("last_status_id"). + Where("? = ?", bun.Ident("account_id"), accountID). + OrderExpr("? DESC", bun.Ident("last_status_id")). + Exec(ctx, &conversationLastStatusIDs); // nocollapse + err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, err + } + + return conversationLastStatusIDs, nil + }) +} + +func (c *conversationDB) getConversationsByLastStatusIDs( + ctx context.Context, + accountID string, + conversationLastStatusIDs []string, +) ([]*gtsmodel.Conversation, error) { + // Load all conversation IDs via cache loader callbacks. + conversations, err := c.state.Caches.GTS.Conversation.LoadIDs2Part( + "AccountID,LastStatusID", + accountID, + conversationLastStatusIDs, + func(accountID string, uncached []string) ([]*gtsmodel.Conversation, error) { + // Preallocate expected length of uncached conversations. + conversations := make([]*gtsmodel.Conversation, 0, len(uncached)) + + // Perform database query scanning the remaining (uncached) IDs. + if err := c.db.NewSelect(). + Model(&conversations). + Where("? = ?", bun.Ident("account_id"), accountID). + Where("? IN (?)", bun.Ident("last_status_id"), bun.In(uncached)). + Scan(ctx); err != nil { + return nil, err + } + + return conversations, nil + }, + ) + if err != nil { + return nil, err + } + + // Reorder the conversations by their last status IDs to ensure correct order. + getID := func(b *gtsmodel.Conversation) string { return b.ID } + util.OrderBy(conversations, conversationLastStatusIDs, getID) + + if gtscontext.Barebones(ctx) { + // no need to fully populate. + return conversations, nil + } + + // Populate all loaded conversations, removing those we fail to populate. + conversations = slices.DeleteFunc(conversations, func(conversation *gtsmodel.Conversation) bool { + if err := c.populateConversation(ctx, conversation); err != nil { + log.Errorf(ctx, "error populating conversation %s: %v", conversation.ID, err) + return true + } + return false + }) + + return conversations, nil +} + +func (c *conversationDB) UpsertConversation(ctx context.Context, conversation *gtsmodel.Conversation, columns ...string) error { + // If we're updating by column, ensure "updated_at" is included. + if len(columns) > 0 { + columns = append(columns, "updated_at") + } + + return c.state.Caches.GTS.Conversation.Store(conversation, func() error { + _, err := NewUpsert(c.db). + Model(conversation). + Constraint("id"). + Column(columns...). + Exec(ctx) + return err + }) +} + +func (c *conversationDB) LinkConversationToStatus(ctx context.Context, conversationID string, statusID string) error { + conversationToStatus := >smodel.ConversationToStatus{ + ConversationID: conversationID, + StatusID: statusID, + } + + if _, err := c.db.NewInsert(). + Model(conversationToStatus). + Exec(ctx); // nocollapse + err != nil { + return err + } + return nil +} + +func (c *conversationDB) DeleteConversationByID(ctx context.Context, id string) error { + // Load conversation into cache before attempting a delete, + // as we need it cached in order to trigger the invalidate + // callback. This in turn invalidates others. + _, err := c.GetConversationByID(gtscontext.SetBarebones(ctx), id) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + // not an issue. + err = nil + } + return err + } + + // Drop this now-cached conversation on return after delete. + defer c.state.Caches.GTS.Conversation.Invalidate("ID", id) + + // Finally delete conversation from DB. + _, err = c.db.NewDelete(). + Model((*gtsmodel.Conversation)(nil)). + Where("? = ?", bun.Ident("id"), id). + Exec(ctx) + return err +} + +func (c *conversationDB) DeleteConversationsByOwnerAccountID(ctx context.Context, accountID string) error { + defer func() { + // Invalidate any cached conversations and conversation IDs owned by this account on return. + // Conversation invalidate hooks only invalidate the conversation ID cache, + // so we don't need to load all conversations into the cache to run invalidation hooks, + // as with some other object types (blocks, for example). + c.state.Caches.GTS.Conversation.Invalidate("AccountID", accountID) + // In case there were no cached conversations, + // explicitly invalidate the user's conversation last status ID cache. + c.state.Caches.GTS.ConversationLastStatusIDs.Invalidate(accountID) + }() + + return c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + // Delete conversations matching the account ID. + deletedConversationIDs := []string{} + if err := tx.NewDelete(). + Model((*gtsmodel.Conversation)(nil)). + Where("? = ?", bun.Ident("account_id"), accountID). + Returning("?", bun.Ident("id")). + Scan(ctx, &deletedConversationIDs); // nocollapse + err != nil { + return gtserror.Newf("error deleting conversations for account %s: %w", accountID, err) + } + + // Delete any conversation-to-status links matching the deleted conversation IDs. + if _, err := tx.NewDelete(). + Model((*gtsmodel.ConversationToStatus)(nil)). + Where("? IN (?)", bun.Ident("conversation_id"), bun.In(deletedConversationIDs)). + Exec(ctx); // nocollapse + err != nil { + return gtserror.Newf("error deleting conversation-to-status links for account %s: %w", accountID, err) + } + + return nil + }) +} + +func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, statusID string) error { + // SQL returning the current time. + var nowSQL string + switch c.db.Dialect().Name() { + case dialect.SQLite: + nowSQL = "DATE('now')" + case dialect.PG: + nowSQL = "NOW()" + default: + log.Panicf(nil, "db conn %s was neither pg nor sqlite", c.db) + } + + updatedConversationIDs := []string{} + deletedConversationIDs := []string{} + + if err := c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + // Delete this status from conversation-to-status links. + if _, err := tx.NewDelete(). + Model((*gtsmodel.ConversationToStatus)(nil)). + Where("? = ?", bun.Ident("status_id"), statusID). + Exec(ctx); // nocollapse + err != nil { + return gtserror.Newf("error deleting conversation-to-status links while deleting status %s: %w", statusID, err) + } + + // Note: Bun doesn't currently support CREATE TABLE … AS SELECT … so we need to use raw queries here. + + // Create a temporary table with all statuses other than the deleted status + // in each conversation for which the deleted status is the last status + // (if there are such statuses). + conversationStatusesTempTable := "conversation_statuses_" + id.NewULID() + if _, err := tx.NewRaw( + "CREATE TEMPORARY TABLE ? AS ?", + bun.Ident(conversationStatusesTempTable), + tx.NewSelect(). + ColumnExpr( + "? AS ?", + bun.Ident("conversations.id"), + bun.Ident("conversation_id"), + ). + ColumnExpr( + "? AS ?", + bun.Ident("conversation_to_statuses.status_id"), + bun.Ident("id"), + ). + Column("statuses.created_at"). + Table("conversations"). + Join("LEFT JOIN ?", bun.Ident("conversation_to_statuses")). + JoinOn( + "? = ?", + bun.Ident("conversations.id"), + bun.Ident("conversation_to_statuses.conversation_id"), + ). + JoinOn( + "? != ?", + bun.Ident("conversation_to_statuses.status_id"), + statusID, + ). + Join("LEFT JOIN ?", bun.Ident("statuses")). + JoinOn( + "? = ?", + bun.Ident("conversation_to_statuses.status_id"), + bun.Ident("statuses.id"), + ). + Where( + "? = ?", + bun.Ident("conversations.last_status_id"), + statusID, + ), + ). + Exec(ctx); // nocollapse + err != nil { + return gtserror.Newf("error creating conversationStatusesTempTable while deleting status %s: %w", statusID, err) + } + + // Create a temporary table with the most recently created status in each conversation + // for which the deleted status is the last status (if there is such a status). + latestConversationStatusesTempTable := "latest_conversation_statuses_" + id.NewULID() + if _, err := tx.NewRaw( + "CREATE TEMPORARY TABLE ? AS ?", + bun.Ident(latestConversationStatusesTempTable), + tx.NewSelect(). + Column( + "conversation_statuses.conversation_id", + "conversation_statuses.id", + ). + TableExpr( + "? AS ?", + bun.Ident(conversationStatusesTempTable), + bun.Ident("conversation_statuses"), + ). + Join( + "LEFT JOIN ? AS ?", + bun.Ident(conversationStatusesTempTable), + bun.Ident("later_statuses"), + ). + JoinOn( + "? = ?", + bun.Ident("conversation_statuses.conversation_id"), + bun.Ident("later_statuses.conversation_id"), + ). + JoinOn( + "? > ?", + bun.Ident("later_statuses.created_at"), + bun.Ident("conversation_statuses.created_at"), + ). + Where("? IS NULL", bun.Ident("later_statuses.id")), + ). + Exec(ctx); // nocollapse + err != nil { + return gtserror.Newf("error creating latestConversationStatusesTempTable while deleting status %s: %w", statusID, err) + } + + // For every conversation where the given status was the last one, + // reset its last status to the most recently created in the conversation other than that one, + // if there is such a status. + // Return conversation IDs for invalidation. + if err := tx.NewUpdate(). + Model((*gtsmodel.Conversation)(nil)). + SetColumn("last_status_id", "?", bun.Ident("latest_conversation_statuses.id")). + SetColumn("updated_at", "?", bun.Safe(nowSQL)). + TableExpr("? AS ?", bun.Ident(latestConversationStatusesTempTable), bun.Ident("latest_conversation_statuses")). + Where("?TableAlias.? = ?", bun.Ident("id"), bun.Ident("latest_conversation_statuses.conversation_id")). + Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")). + Returning("?TableName.?", bun.Ident("id")). + Scan(ctx, &updatedConversationIDs); // nocollapse + err != nil { + return gtserror.Newf("error rolling back last status for conversation while deleting status %s: %w", statusID, err) + } + + // If there is no such status, delete the conversation. + // Return conversation IDs for invalidation. + if err := tx.NewDelete(). + Model((*gtsmodel.Conversation)(nil)). + Where( + "? IN (?)", + bun.Ident("id"), + tx.NewSelect(). + Table(latestConversationStatusesTempTable). + Column("conversation_id"). + Where("? IS NULL", bun.Ident("id")), + ). + Returning("?", bun.Ident("id")). + Scan(ctx, &deletedConversationIDs); // nocollapse + err != nil { + return gtserror.Newf("error deleting conversation while deleting status %s: %w", statusID, err) + } + + // Clean up. + for _, tempTable := range []string{ + conversationStatusesTempTable, + latestConversationStatusesTempTable, + } { + if _, err := tx.NewDropTable().Table(tempTable).Exec(ctx); err != nil { + return gtserror.Newf( + "error dropping temporary table %s after deleting status %s: %w", + tempTable, + statusID, + err, + ) + } + } + + return nil + }); err != nil { + return err + } + + updatedConversationIDs = append(updatedConversationIDs, deletedConversationIDs...) + c.state.Caches.GTS.Conversation.InvalidateIDs("ID", updatedConversationIDs) + + return nil +} diff --git a/internal/db/bundb/conversation_test.go b/internal/db/bundb/conversation_test.go new file mode 100644 index 000000000..24d35d482 --- /dev/null +++ b/internal/db/bundb/conversation_test.go @@ -0,0 +1,115 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package bundb_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/db/test" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +type ConversationTestSuite struct { + BunDBStandardTestSuite + + cf test.ConversationFactory + + // testAccount is the owner of statuses and conversations in these tests (must be local). + testAccount *gtsmodel.Account + // threadID is the thread used for statuses in any given test. + threadID string +} + +func (suite *ConversationTestSuite) SetupSuite() { + suite.BunDBStandardTestSuite.SetupSuite() + + suite.cf.SetupSuite(suite) + + suite.testAccount = suite.testAccounts["local_account_1"] +} + +func (suite *ConversationTestSuite) SetupTest() { + suite.BunDBStandardTestSuite.SetupTest() + + suite.cf.SetupTest(suite.db) + + suite.threadID = suite.cf.NewULID(0) +} + +// deleteStatus deletes a status from conversations and ends the test if that fails. +func (suite *ConversationTestSuite) deleteStatus(statusID string) { + err := suite.db.DeleteStatusFromConversations(context.Background(), statusID) + if err != nil { + suite.FailNow(err.Error()) + } +} + +// getConversation fetches a conversation by ID and ends the test if that fails. +func (suite *ConversationTestSuite) getConversation(conversationID string) *gtsmodel.Conversation { + conversation, err := suite.db.GetConversationByID(context.Background(), conversationID) + if err != nil { + suite.FailNow(err.Error()) + } + return conversation +} + +// If we delete a status that is in a conversation but not the last status, +// the conversation's last status should not change. +func (suite *ConversationTestSuite) TestDeleteNonLastStatus() { + conversation := suite.cf.NewTestConversation(suite.testAccount, 0) + initial := conversation.LastStatus + reply := suite.cf.NewTestStatus(suite.testAccount, conversation.ThreadID, 1*time.Second, initial) + conversation = suite.cf.SetLastStatus(conversation, reply) + + suite.deleteStatus(initial.ID) + conversation = suite.getConversation(conversation.ID) + suite.Equal(reply.ID, conversation.LastStatusID) +} + +// If we delete the last status in a conversation that has other statuses, +// a previous status should become the new last status. +func (suite *ConversationTestSuite) TestDeleteLastStatus() { + conversation := suite.cf.NewTestConversation(suite.testAccount, 0) + initial := conversation.LastStatus + reply := suite.cf.NewTestStatus(suite.testAccount, conversation.ThreadID, 1*time.Second, initial) + conversation = suite.cf.SetLastStatus(conversation, reply) + conversation = suite.getConversation(conversation.ID) + + suite.deleteStatus(reply.ID) + conversation = suite.getConversation(conversation.ID) + suite.Equal(initial.ID, conversation.LastStatusID) +} + +// If we delete the only status in a conversation, +// the conversation should be deleted as well. +func (suite *ConversationTestSuite) TestDeleteOnlyStatus() { + conversation := suite.cf.NewTestConversation(suite.testAccount, 0) + initial := conversation.LastStatus + + suite.deleteStatus(initial.ID) + _, err := suite.db.GetConversationByID(context.Background(), conversation.ID) + suite.ErrorIs(err, db.ErrNoEntries) +} + +func TestConversationTestSuite(t *testing.T) { + suite.Run(t, new(ConversationTestSuite)) +} diff --git a/internal/db/bundb/migrations/20240611190733_add_conversations.go b/internal/db/bundb/migrations/20240611190733_add_conversations.go new file mode 100644 index 000000000..25b226aff --- /dev/null +++ b/internal/db/bundb/migrations/20240611190733_add_conversations.go @@ -0,0 +1,78 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package migrations + +import ( + "context" + + gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/uptrace/bun" +) + +// Note: this migration has an advanced migration followup. +// See Conversations.MigrateDMs(). +func init() { + up := func(ctx context.Context, db *bun.DB) error { + return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + for _, model := range []interface{}{ + >smodel.Conversation{}, + >smodel.ConversationToStatus{}, + } { + if _, err := tx. + NewCreateTable(). + Model(model). + IfNotExists(). + Exec(ctx); err != nil { + return err + } + } + + // Add indexes to the conversations table. + for index, columns := range map[string][]string{ + "conversations_account_id_idx": { + "account_id", + }, + "conversations_last_status_id_idx": { + "last_status_id", + }, + } { + if _, err := tx. + NewCreateIndex(). + Model(>smodel.Conversation{}). + Index(index). + Column(columns...). + IfNotExists(). + Exec(ctx); err != nil { + return err + } + } + + return nil + }) + } + + down := func(ctx context.Context, db *bun.DB) error { + return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + return nil + }) + } + + if err := Migrations.Register(up, down); err != nil { + panic(err) + } +} diff --git a/internal/db/bundb/migrations/20240712005536_add_advanced_migrations.go b/internal/db/bundb/migrations/20240712005536_add_advanced_migrations.go new file mode 100644 index 000000000..183065285 --- /dev/null +++ b/internal/db/bundb/migrations/20240712005536_add_advanced_migrations.go @@ -0,0 +1,49 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package migrations + +import ( + "context" + + gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/uptrace/bun" +) + +// Create the advanced migrations table. +func init() { + up := func(ctx context.Context, db *bun.DB) error { + return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + _, err := tx. + NewCreateTable(). + Model((*gtsmodel.AdvancedMigration)(nil)). + IfNotExists(). + Exec(ctx) + return err + }) + } + + down := func(ctx context.Context, db *bun.DB) error { + return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + return nil + }) + } + + if err := Migrations.Register(up, down); err != nil { + panic(err) + } +} diff --git a/internal/db/bundb/status.go b/internal/db/bundb/status.go index dfb97cff1..b0ed32e0e 100644 --- a/internal/db/bundb/status.go +++ b/internal/db/bundb/status.go @@ -682,3 +682,35 @@ func (s *statusDB) getStatusBoostIDs(ctx context.Context, statusID string) ([]st return statusIDs, nil }) } + +func (s *statusDB) MaxDirectStatusID(ctx context.Context) (string, error) { + maxID := "" + if err := s.db. + NewSelect(). + Model((*gtsmodel.Status)(nil)). + ColumnExpr("COALESCE(MAX(?), '')", bun.Ident("id")). + Where("? = ?", bun.Ident("visibility"), gtsmodel.VisibilityDirect). + Scan(ctx, &maxID); // nocollapse + err != nil { + return "", err + } + return maxID, nil +} + +func (s *statusDB) GetDirectStatusIDsBatch(ctx context.Context, minID string, maxIDInclusive string, count int) ([]string, error) { + var statusIDs []string + if err := s.db. + NewSelect(). + Model((*gtsmodel.Status)(nil)). + Column("id"). + Where("? = ?", bun.Ident("visibility"), gtsmodel.VisibilityDirect). + Where("? > ?", bun.Ident("id"), minID). + Where("? <= ?", bun.Ident("id"), maxIDInclusive). + Order("id ASC"). + Limit(count). + Scan(ctx, &statusIDs); // nocollapse + err != nil { + return nil, err + } + return statusIDs, nil +} diff --git a/internal/db/conversation.go b/internal/db/conversation.go new file mode 100644 index 000000000..3d0b4213e --- /dev/null +++ b/internal/db/conversation.go @@ -0,0 +1,52 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package db + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/paging" +) + +type Conversation interface { + // GetConversationByID gets a single conversation by ID. + GetConversationByID(ctx context.Context, id string) (*gtsmodel.Conversation, error) + + // GetConversationByThreadAndAccountIDs retrieves a conversation by thread ID and participant account IDs, if it exists. + GetConversationByThreadAndAccountIDs(ctx context.Context, threadID string, accountID string, otherAccountIDs []string) (*gtsmodel.Conversation, error) + + // GetConversationsByOwnerAccountID gets all conversations owned by the given account, + // with optional paging based on last status ID. + GetConversationsByOwnerAccountID(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Conversation, error) + + // UpsertConversation creates or updates a conversation. + UpsertConversation(ctx context.Context, conversation *gtsmodel.Conversation, columns ...string) error + + // LinkConversationToStatus creates a conversation-to-status link. + LinkConversationToStatus(ctx context.Context, statusID string, conversationID string) error + + // DeleteConversationByID deletes a conversation, removing it from the owning account's conversation list. + DeleteConversationByID(ctx context.Context, id string) error + + // DeleteConversationsByOwnerAccountID deletes all conversations owned by the given account. + DeleteConversationsByOwnerAccountID(ctx context.Context, accountID string) error + + // DeleteStatusFromConversations handles when a status is deleted by updating or deleting conversations for which it was the last status. + DeleteStatusFromConversations(ctx context.Context, statusID string) error +} diff --git a/internal/db/db.go b/internal/db/db.go index a148d778a..4b2152732 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -26,8 +26,10 @@ const ( type DB interface { Account Admin + AdvancedMigration Application Basic + Conversation Domain Emoji HeaderFilter diff --git a/internal/db/status.go b/internal/db/status.go index 88ae12a12..ade900728 100644 --- a/internal/db/status.go +++ b/internal/db/status.go @@ -78,4 +78,16 @@ type Status interface { // GetStatusChildren gets the child statuses of a given status. GetStatusChildren(ctx context.Context, statusID string) ([]*gtsmodel.Status, error) + + // MaxDirectStatusID returns the newest ID across all DM statuses. + // Returns the empty string with no error if there are no DM statuses yet. + // It is used only by the conversation advanced migration. + MaxDirectStatusID(ctx context.Context) (string, error) + + // GetDirectStatusIDsBatch returns up to count DM status IDs strictly greater than minID + // and less than or equal to maxIDInclusive. Note that this is different from most of our paging, + // which uses a maxID and returns IDs strictly less than that, because it's called with the result of + // MaxDirectStatusID, and expects to eventually return the status with that ID. + // It is used only by the conversation advanced migration. + GetDirectStatusIDsBatch(ctx context.Context, minID string, maxIDInclusive string, count int) ([]string, error) } diff --git a/internal/db/test/conversation.go b/internal/db/test/conversation.go new file mode 100644 index 000000000..95713927e --- /dev/null +++ b/internal/db/test/conversation.go @@ -0,0 +1,122 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package test + +import ( + "context" + "crypto/rand" + "time" + + "github.com/oklog/ulid" + "github.com/superseriousbusiness/gotosocial/internal/ap" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +type testSuite interface { + FailNow(string, ...interface{}) bool +} + +// ConversationFactory can be embedded or included by test suites that want to generate statuses and conversations. +type ConversationFactory struct { + // Test suite, or at least the methods from it that we care about. + suite testSuite + // Test DB. + db db.DB + + // TestStart is the timestamp used as a base for timestamps and ULIDs in any given test. + TestStart time.Time +} + +// SetupSuite should be called by the SetupSuite of test suites that use this mixin. +func (f *ConversationFactory) SetupSuite(suite testSuite) { + f.suite = suite +} + +// SetupTest should be called by the SetupTest of test suites that use this mixin. +func (f *ConversationFactory) SetupTest(db db.DB) { + f.db = db + f.TestStart = time.Now() +} + +// NewULID is a version of id.NewULID that uses the test start time and an offset instead of the real time. +func (f *ConversationFactory) NewULID(offset time.Duration) string { + ulid, err := ulid.New( + ulid.Timestamp(f.TestStart.Add(offset)), rand.Reader, + ) + if err != nil { + panic(err) + } + return ulid.String() +} + +func (f *ConversationFactory) NewTestStatus(localAccount *gtsmodel.Account, threadID string, nowOffset time.Duration, inReplyToStatus *gtsmodel.Status) *gtsmodel.Status { + statusID := f.NewULID(nowOffset) + createdAt := f.TestStart.Add(nowOffset) + status := >smodel.Status{ + ID: statusID, + CreatedAt: createdAt, + UpdatedAt: createdAt, + URI: "http://localhost:8080/users/" + localAccount.Username + "/statuses/" + statusID, + AccountID: localAccount.ID, + AccountURI: localAccount.URI, + Local: util.Ptr(true), + ThreadID: threadID, + Visibility: gtsmodel.VisibilityDirect, + ActivityStreamsType: ap.ObjectNote, + Federated: util.Ptr(true), + } + if inReplyToStatus != nil { + status.InReplyToID = inReplyToStatus.ID + status.InReplyToURI = inReplyToStatus.URI + status.InReplyToAccountID = inReplyToStatus.AccountID + } + if err := f.db.PutStatus(context.Background(), status); err != nil { + f.suite.FailNow(err.Error()) + } + return status +} + +// NewTestConversation creates a new status and adds it to a new unread conversation, returning the conversation. +func (f *ConversationFactory) NewTestConversation(localAccount *gtsmodel.Account, nowOffset time.Duration) *gtsmodel.Conversation { + threadID := f.NewULID(nowOffset) + status := f.NewTestStatus(localAccount, threadID, nowOffset, nil) + conversation := >smodel.Conversation{ + ID: f.NewULID(nowOffset), + AccountID: localAccount.ID, + ThreadID: status.ThreadID, + Read: util.Ptr(false), + } + f.SetLastStatus(conversation, status) + return conversation +} + +// SetLastStatus sets an already stored status as the last status of a new or already stored conversation, +// and returns the updated conversation. +func (f *ConversationFactory) SetLastStatus(conversation *gtsmodel.Conversation, status *gtsmodel.Status) *gtsmodel.Conversation { + conversation.LastStatusID = status.ID + conversation.LastStatus = status + if err := f.db.UpsertConversation(context.Background(), conversation, "last_status_id"); err != nil { + f.suite.FailNow(err.Error()) + } + if err := f.db.LinkConversationToStatus(context.Background(), conversation.ID, status.ID); err != nil { + f.suite.FailNow(err.Error()) + } + return conversation +} diff --git a/internal/gtsmodel/advancedmigration.go b/internal/gtsmodel/advancedmigration.go new file mode 100644 index 000000000..d9ce9d543 --- /dev/null +++ b/internal/gtsmodel/advancedmigration.go @@ -0,0 +1,32 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package gtsmodel + +import ( + "time" +) + +// AdvancedMigration stores state for an "advanced migration", which is a migration +// that doesn't fit into the Bun migration framework. +type AdvancedMigration struct { + ID string `bun:",pk,nullzero,notnull,unique"` // id of this migration (preassigned, not a ULID) + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created + UpdatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item last updated + StateJSON []byte `bun:",nullzero"` // JSON dump of the migration state + Finished *bool `bun:",nullzero,notnull,default:false"` // has this migration finished? +} diff --git a/internal/gtsmodel/conversation.go b/internal/gtsmodel/conversation.go new file mode 100644 index 000000000..f03f27458 --- /dev/null +++ b/internal/gtsmodel/conversation.go @@ -0,0 +1,77 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package gtsmodel + +import ( + "slices" + "strings" + "time" + + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +// Conversation represents direct messages between the owner account and a set of other accounts. +type Conversation struct { + // ID of this item in the database. + ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` + + // When was this item created? + CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` + + // When was this item last updated? + UpdatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` + + // Account that owns the conversation. + AccountID string `bun:"type:CHAR(26),nullzero,notnull,unique:conversations_thread_id_account_id_other_accounts_key_uniq,unique:conversations_account_id_last_status_id_uniq"` + Account *Account `bun:"-"` + + // Other accounts participating in the conversation. + // Doesn't include the owner. May be empty in the case of a DM to yourself. + OtherAccountIDs []string `bun:"other_account_ids,array"` + OtherAccounts []*Account `bun:"-"` + + // Denormalized lookup key derived from unique OtherAccountIDs, sorted and concatenated with commas. + // May be empty in the case of a DM to yourself. + OtherAccountsKey string `bun:",notnull,unique:conversations_thread_id_account_id_other_accounts_key_uniq"` + + // Thread that the conversation is part of. + ThreadID string `bun:"type:CHAR(26),nullzero,notnull,unique:conversations_thread_id_account_id_other_accounts_key_uniq"` + + // ID of the last status in this conversation. + LastStatusID string `bun:"type:CHAR(26),nullzero,notnull,unique:conversations_account_id_last_status_id_uniq"` + LastStatus *Status `bun:"-"` + + // Has the owner read all statuses in this conversation? + Read *bool `bun:",default:false"` +} + +// ConversationOtherAccountsKey creates an OtherAccountsKey from a list of OtherAccountIDs. +func ConversationOtherAccountsKey(otherAccountIDs []string) string { + otherAccountIDs = util.UniqueStrings(otherAccountIDs) + slices.Sort(otherAccountIDs) + return strings.Join(otherAccountIDs, ",") +} + +// ConversationToStatus is an intermediate struct to facilitate the many2many relationship between a conversation and its statuses, +// including but not limited to the last status. These are used only when deleting a status from a conversation. +type ConversationToStatus struct { + ConversationID string `bun:"type:CHAR(26),unique:conversation_to_statuses_conversation_id_status_id_uniq,nullzero,notnull"` + Conversation *Conversation `bun:"rel:belongs-to"` + StatusID string `bun:"type:CHAR(26),unique:conversation_to_statuses_conversation_id_status_id_uniq,nullzero,notnull"` + Status *Status `bun:"rel:belongs-to"` +} diff --git a/internal/processing/account/delete.go b/internal/processing/account/delete.go index 075e94544..702b46cda 100644 --- a/internal/processing/account/delete.go +++ b/internal/processing/account/delete.go @@ -460,6 +460,14 @@ func (p *Processor) deleteAccountPeripheral(ctx context.Context, account *gtsmod // TODO: add status mutes here when they're implemented. + // Delete all conversations owned by given account. + // Conversations in which it has only participated will be retained; + // they can always be deleted by their owners. + if err := p.state.DB.DeleteConversationsByOwnerAccountID(ctx, account.ID); // nocollapse + err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("error deleting conversations owned by account: %w", err) + } + // Delete all poll votes owned by given account. if err := p.state.DB.DeletePollVotesByAccountID(ctx, account.ID); // nocollapse err != nil && !errors.Is(err, db.ErrNoEntries) { diff --git a/internal/processing/advancedmigrations/advancedmigrations.go b/internal/processing/advancedmigrations/advancedmigrations.go new file mode 100644 index 000000000..3f1876539 --- /dev/null +++ b/internal/processing/advancedmigrations/advancedmigrations.go @@ -0,0 +1,48 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package advancedmigrations + +import ( + "context" + "fmt" + + "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" +) + +// Processor holds references to any other processor that has migrations to run. +type Processor struct { + conversations *conversations.Processor +} + +func New( + conversations *conversations.Processor, +) Processor { + return Processor{ + conversations: conversations, + } +} + +// Migrate runs all advanced migrations. +// Errors should be in the same format thrown by other server or testrig startup failures. +func (p *Processor) Migrate(ctx context.Context) error { + if err := p.conversations.MigrateDMsToConversations(ctx); err != nil { + return fmt.Errorf("error running conversations advanced migration: %w", err) + } + + return nil +} diff --git a/internal/processing/conversations/conversations.go b/internal/processing/conversations/conversations.go new file mode 100644 index 000000000..d95740605 --- /dev/null +++ b/internal/processing/conversations/conversations.go @@ -0,0 +1,126 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations + +import ( + "context" + "errors" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" + "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +type Processor struct { + state *state.State + converter *typeutils.Converter + filter *visibility.Filter +} + +func New( + state *state.State, + converter *typeutils.Converter, + filter *visibility.Filter, +) Processor { + return Processor{ + state: state, + converter: converter, + filter: filter, + } +} + +const conversationNotFoundHelpText = "conversation not found" + +// getConversationOwnedBy gets a conversation by ID and checks that it is owned by the given account. +func (p *Processor) getConversationOwnedBy( + ctx context.Context, + id string, + requestingAccount *gtsmodel.Account, +) (*gtsmodel.Conversation, gtserror.WithCode) { + // Get the conversation so that we can check its owning account ID. + conversation, err := p.state.DB.GetConversationByID(ctx, id) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting conversation %s for account %s: %w", + id, + requestingAccount.ID, + err, + ), + ) + } + if conversation == nil { + return nil, gtserror.NewErrorNotFound( + gtserror.Newf( + "conversation %s not found: %w", + id, + err, + ), + conversationNotFoundHelpText, + ) + } + if conversation.AccountID != requestingAccount.ID { + return nil, gtserror.NewErrorNotFound( + gtserror.Newf( + "conversation %s not owned by account %s: %w", + id, + requestingAccount.ID, + err, + ), + conversationNotFoundHelpText, + ) + } + + return conversation, nil +} + +// getFiltersAndMutes gets the given account's filters and compiled mute list. +func (p *Processor) getFiltersAndMutes( + ctx context.Context, + requestingAccount *gtsmodel.Account, +) ([]*gtsmodel.Filter, *usermute.CompiledUserMuteList, gtserror.WithCode) { + filters, err := p.state.DB.GetFiltersForAccountID(ctx, requestingAccount.ID) + if err != nil { + return nil, nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting filters for account %s: %w", + requestingAccount.ID, + err, + ), + ) + } + + mutes, err := p.state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAccount.ID, nil) + if err != nil { + return nil, nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting mutes for account %s: %w", + requestingAccount.ID, + err, + ), + ) + } + compiledMutes := usermute.NewCompiledUserMuteList(mutes) + + return filters, compiledMutes, nil +} diff --git a/internal/processing/conversations/conversations_test.go b/internal/processing/conversations/conversations_test.go new file mode 100644 index 000000000..cc7ec617e --- /dev/null +++ b/internal/processing/conversations/conversations_test.go @@ -0,0 +1,151 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/db" + dbtest "github.com/superseriousbusiness/gotosocial/internal/db/test" + "github.com/superseriousbusiness/gotosocial/internal/email" + "github.com/superseriousbusiness/gotosocial/internal/federation" + "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/media" + "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/storage" + "github.com/superseriousbusiness/gotosocial/internal/transport" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type ConversationsTestSuite struct { + // standard suite interfaces + suite.Suite + db db.DB + tc *typeutils.Converter + storage *storage.Driver + state state.State + mediaManager *media.Manager + transportController transport.Controller + federator *federation.Federator + emailSender email.Sender + sentEmails map[string]string + filter *visibility.Filter + + // standard suite models + testTokens map[string]*gtsmodel.Token + testClients map[string]*gtsmodel.Client + testApplications map[string]*gtsmodel.Application + testUsers map[string]*gtsmodel.User + testAccounts map[string]*gtsmodel.Account + testFollows map[string]*gtsmodel.Follow + testAttachments map[string]*gtsmodel.MediaAttachment + testStatuses map[string]*gtsmodel.Status + + // module being tested + conversationsProcessor conversations.Processor + + // Owner of test conversations + testAccount *gtsmodel.Account + + // Mixin for conversation tests + dbtest.ConversationFactory +} + +func (suite *ConversationsTestSuite) getClientMsg(timeout time.Duration) (*messages.FromClientAPI, bool) { + ctx := context.Background() + ctx, cncl := context.WithTimeout(ctx, timeout) + defer cncl() + return suite.state.Workers.Client.Queue.PopCtx(ctx) +} + +func (suite *ConversationsTestSuite) SetupSuite() { + suite.testTokens = testrig.NewTestTokens() + suite.testClients = testrig.NewTestClients() + suite.testApplications = testrig.NewTestApplications() + suite.testUsers = testrig.NewTestUsers() + suite.testAccounts = testrig.NewTestAccounts() + suite.testFollows = testrig.NewTestFollows() + suite.testAttachments = testrig.NewTestAttachments() + suite.testStatuses = testrig.NewTestStatuses() + + suite.ConversationFactory.SetupSuite(suite) +} + +func (suite *ConversationsTestSuite) SetupTest() { + suite.state.Caches.Init() + testrig.StartNoopWorkers(&suite.state) + + testrig.InitTestConfig() + testrig.InitTestLog() + + suite.db = testrig.NewTestDB(&suite.state) + suite.state.DB = suite.db + suite.tc = typeutils.NewConverter(&suite.state) + suite.filter = visibility.NewFilter(&suite.state) + + testrig.StartTimelines( + &suite.state, + suite.filter, + suite.tc, + ) + + suite.storage = testrig.NewInMemoryStorage() + suite.state.Storage = suite.storage + suite.mediaManager = testrig.NewTestMediaManager(&suite.state) + + suite.transportController = testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../testrig/media")) + suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager) + suite.sentEmails = make(map[string]string) + suite.emailSender = testrig.NewEmailSender("../../../web/template/", suite.sentEmails) + + suite.conversationsProcessor = conversations.New(&suite.state, suite.tc, suite.filter) + testrig.StandardDBSetup(suite.db, nil) + testrig.StandardStorageSetup(suite.storage, "../../../testrig/media") + + suite.ConversationFactory.SetupTest(suite.db) + + suite.testAccount = suite.testAccounts["local_account_1"] +} + +func (suite *ConversationsTestSuite) TearDownTest() { + conversationModels := []interface{}{ + (*gtsmodel.Conversation)(nil), + (*gtsmodel.ConversationToStatus)(nil), + } + for _, model := range conversationModels { + if err := suite.db.DropTable(context.Background(), model); err != nil { + log.Error(context.Background(), err) + } + } + + testrig.StandardDBTeardown(suite.db) + testrig.StandardStorageTeardown(suite.storage) + testrig.StopWorkers(&suite.state) +} + +func TestConversationsTestSuite(t *testing.T) { + suite.Run(t, new(ConversationsTestSuite)) +} diff --git a/internal/processing/conversations/delete.go b/internal/processing/conversations/delete.go new file mode 100644 index 000000000..5cbdd00a5 --- /dev/null +++ b/internal/processing/conversations/delete.go @@ -0,0 +1,45 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *Processor) Delete( + ctx context.Context, + requestingAccount *gtsmodel.Account, + id string, +) gtserror.WithCode { + // Get the conversation so that we can check its owning account ID. + conversation, errWithCode := p.getConversationOwnedBy(gtscontext.SetBarebones(ctx), id, requestingAccount) + if errWithCode != nil { + return errWithCode + } + + // Delete the conversation. + if err := p.state.DB.DeleteConversationByID(ctx, conversation.ID); err != nil { + return gtserror.NewErrorInternalError(err) + } + + return nil +} diff --git a/internal/processing/conversations/delete_test.go b/internal/processing/conversations/delete_test.go new file mode 100644 index 000000000..23b4f1c1a --- /dev/null +++ b/internal/processing/conversations/delete_test.go @@ -0,0 +1,27 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations_test + +import "context" + +func (suite *ConversationsTestSuite) TestDelete() { + conversation := suite.NewTestConversation(suite.testAccount, 0) + + err := suite.conversationsProcessor.Delete(context.Background(), suite.testAccount, conversation.ID) + suite.NoError(err) +} diff --git a/internal/processing/conversations/get.go b/internal/processing/conversations/get.go new file mode 100644 index 000000000..0c7832cae --- /dev/null +++ b/internal/processing/conversations/get.go @@ -0,0 +1,101 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations + +import ( + "context" + "errors" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +// GetAll returns conversations owned by the given account. +// The additional parameters can be used for paging. +func (p *Processor) GetAll( + ctx context.Context, + requestingAccount *gtsmodel.Account, + page *paging.Page, +) (*apimodel.PageableResponse, gtserror.WithCode) { + conversations, err := p.state.DB.GetConversationsByOwnerAccountID( + ctx, + requestingAccount.ID, + page, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting conversations for account %s: %w", + requestingAccount.ID, + err, + ), + ) + } + + // Check for empty response. + count := len(conversations) + if len(conversations) == 0 { + return util.EmptyPageableResponse(), nil + } + + // Get the lowest and highest last status ID values, used for paging. + lo := conversations[count-1].LastStatusID + hi := conversations[0].LastStatusID + + items := make([]interface{}, 0, count) + + filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount) + if errWithCode != nil { + return nil, errWithCode + } + + for _, conversation := range conversations { + // Convert conversation to frontend API model. + apiConversation, err := p.converter.ConversationToAPIConversation( + ctx, + conversation, + requestingAccount, + filters, + mutes, + ) + if err != nil { + log.Errorf( + ctx, + "error converting conversation %s to API representation: %v", + conversation.ID, + err, + ) + continue + } + + // Append conversation to return items. + items = append(items, apiConversation) + } + + return paging.PackageResponse(paging.ResponseParams{ + Items: items, + Path: "/api/v1/conversations", + Next: page.Next(lo, hi), + Prev: page.Prev(lo, hi), + }), nil +} diff --git a/internal/processing/conversations/get_test.go b/internal/processing/conversations/get_test.go new file mode 100644 index 000000000..7b3d60749 --- /dev/null +++ b/internal/processing/conversations/get_test.go @@ -0,0 +1,65 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations_test + +import ( + "context" + "time" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" +) + +func (suite *ConversationsTestSuite) TestGetAll() { + conversation := suite.NewTestConversation(suite.testAccount, 0) + + resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil) + if suite.NoError(err) && suite.Len(resp.Items, 1) && suite.IsType((*apimodel.Conversation)(nil), resp.Items[0]) { + apiConversation := resp.Items[0].(*apimodel.Conversation) + suite.Equal(conversation.ID, apiConversation.ID) + suite.True(apiConversation.Unread) + } +} + +// Test that conversations with newer last status IDs are returned earlier. +func (suite *ConversationsTestSuite) TestGetAllOrder() { + // Create a new conversation. + conversation1 := suite.NewTestConversation(suite.testAccount, 0) + + // Create another new conversation with a last status newer than conversation1's. + conversation2 := suite.NewTestConversation(suite.testAccount, 1*time.Second) + + // Add an even newer status than that to conversation1. + conversation1Status2 := suite.NewTestStatus(suite.testAccount, conversation1.LastStatus.ThreadID, 2*time.Second, conversation1.LastStatus) + conversation1.LastStatusID = conversation1Status2.ID + if err := suite.db.UpsertConversation(context.Background(), conversation1, "last_status_id"); err != nil { + suite.FailNow(err.Error()) + } + + resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil) + if suite.NoError(err) && suite.Len(resp.Items, 2) { + // conversation1 should be the first conversation returned. + apiConversation1 := resp.Items[0].(*apimodel.Conversation) + suite.Equal(conversation1.ID, apiConversation1.ID) + // It should have the newest status added to it. + suite.Equal(conversation1.LastStatusID, conversation1Status2.ID) + + // conversation2 should be the second conversation returned. + apiConversation2 := resp.Items[1].(*apimodel.Conversation) + suite.Equal(conversation2.ID, apiConversation2.ID) + } +} diff --git a/internal/processing/conversations/migrate.go b/internal/processing/conversations/migrate.go new file mode 100644 index 000000000..959ffcca4 --- /dev/null +++ b/internal/processing/conversations/migrate.go @@ -0,0 +1,131 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations + +import ( + "context" + "encoding/json" + "errors" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +const advancedMigrationID = "20240611190733_add_conversations" +const statusBatchSize = 100 + +type AdvancedMigrationState struct { + MinID string + MaxIDInclusive string +} + +func (p *Processor) MigrateDMsToConversations(ctx context.Context) error { + advancedMigration, err := p.state.DB.GetAdvancedMigration(ctx, advancedMigrationID) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("couldn't get advanced migration with ID %s: %w", advancedMigrationID, err) + } + state := AdvancedMigrationState{} + if advancedMigration != nil { + // There was a previous migration. + if *advancedMigration.Finished { + // This migration has already been run to completion; we don't need to run it again. + return nil + } + // Otherwise, pick up where we left off. + if err := json.Unmarshal(advancedMigration.StateJSON, &state); err != nil { + // This should never happen. + return gtserror.Newf("couldn't deserialize advanced migration state from JSON: %w", err) + } + } else { + // Start at the beginning. + state.MinID = id.Lowest + + // Find the max ID of all existing statuses. + // This will be the last one we migrate; + // newer ones will be handled by the normal conversation flow. + state.MaxIDInclusive, err = p.state.DB.MaxDirectStatusID(ctx) + if err != nil { + return gtserror.Newf("couldn't get max DM status ID for migration: %w", err) + } + + // Save a new advanced migration record. + advancedMigration = >smodel.AdvancedMigration{ + ID: advancedMigrationID, + Finished: util.Ptr(false), + } + if advancedMigration.StateJSON, err = json.Marshal(state); err != nil { + // This should never happen. + return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err) + } + if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { + return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) + } + } + + log.Info(ctx, "migrating DMs to conversations…") + + // In batches, get all statuses up to and including the max ID, + // and update conversations for each in order. + for { + // Get status IDs for this batch. + statusIDs, err := p.state.DB.GetDirectStatusIDsBatch(ctx, state.MinID, state.MaxIDInclusive, statusBatchSize) + if err != nil { + return gtserror.Newf("couldn't get DM status ID batch for migration: %w", err) + } + if len(statusIDs) == 0 { + break + } + log.Infof(ctx, "migrating %d DMs starting after %s", len(statusIDs), state.MinID) + + // Load the batch by IDs. + statuses, err := p.state.DB.GetStatusesByIDs(ctx, statusIDs) + if err != nil { + return gtserror.Newf("couldn't get DM statuses for migration: %w", err) + } + + // Update conversations for each status. Don't generate notifications. + for _, status := range statuses { + if _, err := p.UpdateConversationsForStatus(ctx, status); err != nil { + return gtserror.Newf("couldn't update conversations for status %s during migration: %w", status.ID, err) + } + } + + // Save the migration state with the new min ID. + state.MinID = statusIDs[len(statusIDs)-1] + if advancedMigration.StateJSON, err = json.Marshal(state); err != nil { + // This should never happen. + return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err) + } + if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { + return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) + } + } + + // Mark the migration as finished. + advancedMigration.Finished = util.Ptr(true) + if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { + return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) + } + + log.Info(ctx, "finished migrating DMs to conversations.") + return nil +} diff --git a/internal/processing/conversations/migrate_test.go b/internal/processing/conversations/migrate_test.go new file mode 100644 index 000000000..b625e59ba --- /dev/null +++ b/internal/processing/conversations/migrate_test.go @@ -0,0 +1,85 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations_test + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/db/bundb" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +// Test that we can migrate DMs to conversations. +// This test assumes that we're using the standard test fixtures, which contain some conversation-eligible DMs. +func (suite *ConversationsTestSuite) TestMigrateDMsToConversations() { + advancedMigrationID := "20240611190733_add_conversations" + ctx := context.Background() + rawDB := (suite.db).(*bundb.DBService).DB() + + // Precondition: we shouldn't have any conversations yet. + numConversations := 0 + if err := rawDB.NewSelect(). + Model((*gtsmodel.Conversation)(nil)). + ColumnExpr("COUNT(*)"). + Scan(ctx, &numConversations); // nocollapse + err != nil { + suite.FailNow(err.Error()) + } + suite.Zero(numConversations) + + // Precondition: there is no record of the conversations advanced migration. + _, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) + suite.ErrorIs(err, db.ErrNoEntries) + + // Run the migration, which should not fail. + if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil { + suite.FailNow(err.Error()) + } + + // We should now have some conversations. + if err := rawDB.NewSelect(). + Model((*gtsmodel.Conversation)(nil)). + ColumnExpr("COUNT(*)"). + Scan(ctx, &numConversations); // nocollapse + err != nil { + suite.FailNow(err.Error()) + } + suite.NotZero(numConversations) + + // The advanced migration should now be marked as finished. + advancedMigration, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) + if err != nil { + suite.FailNow(err.Error()) + } + if suite.NotNil(advancedMigration) && suite.NotNil(advancedMigration.Finished) { + suite.True(*advancedMigration.Finished) + } + + // Run the migration again, which should not fail. + if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil { + suite.FailNow(err.Error()) + } + + // However, it shouldn't have done anything, so the advanced migration should not have been updated. + advancedMigration2, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) + if err != nil { + suite.FailNow(err.Error()) + } + suite.Equal(advancedMigration.UpdatedAt, advancedMigration2.UpdatedAt) +} diff --git a/internal/processing/conversations/read.go b/internal/processing/conversations/read.go new file mode 100644 index 000000000..512a004a3 --- /dev/null +++ b/internal/processing/conversations/read.go @@ -0,0 +1,65 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations + +import ( + "context" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +func (p *Processor) Read( + ctx context.Context, + requestingAccount *gtsmodel.Account, + id string, +) (*apimodel.Conversation, gtserror.WithCode) { + // Get the conversation, including participating accounts and last status. + conversation, errWithCode := p.getConversationOwnedBy(ctx, id, requestingAccount) + if errWithCode != nil { + return nil, errWithCode + } + + // Mark the conversation as read. + conversation.Read = util.Ptr(true) + if err := p.state.DB.UpsertConversation(ctx, conversation, "read"); err != nil { + err = gtserror.Newf("DB error updating conversation %s: %w", id, err) + return nil, gtserror.NewErrorInternalError(err) + } + + filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount) + if errWithCode != nil { + return nil, errWithCode + } + + apiConversation, err := p.converter.ConversationToAPIConversation( + ctx, + conversation, + requestingAccount, + filters, + mutes, + ) + if err != nil { + err = gtserror.Newf("error converting conversation %s to API representation: %w", id, err) + return nil, gtserror.NewErrorInternalError(err) + } + + return apiConversation, nil +} diff --git a/internal/processing/conversations/read_test.go b/internal/processing/conversations/read_test.go new file mode 100644 index 000000000..ebd8f7fe5 --- /dev/null +++ b/internal/processing/conversations/read_test.go @@ -0,0 +1,34 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations_test + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +func (suite *ConversationsTestSuite) TestRead() { + conversation := suite.NewTestConversation(suite.testAccount, 0) + + suite.False(util.PtrOrValue(conversation.Read, false)) + apiConversation, err := suite.conversationsProcessor.Read(context.Background(), suite.testAccount, conversation.ID) + if suite.NoError(err) { + suite.False(apiConversation.Unread) + } +} diff --git a/internal/processing/conversations/update.go b/internal/processing/conversations/update.go new file mode 100644 index 000000000..7445994ae --- /dev/null +++ b/internal/processing/conversations/update.go @@ -0,0 +1,242 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations + +import ( + "context" + "errors" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +// ConversationNotification carries the arguments to processing/stream.Processor.Conversation. +type ConversationNotification struct { + // AccountID of a local account to deliver the notification to. + AccountID string + // Conversation as the notification payload. + Conversation *apimodel.Conversation +} + +// UpdateConversationsForStatus updates all conversations related to a status, +// and returns a map from local account IDs to conversation notifications that should be sent to them. +func (p *Processor) UpdateConversationsForStatus(ctx context.Context, status *gtsmodel.Status) ([]ConversationNotification, error) { + if status.Visibility != gtsmodel.VisibilityDirect { + // Only DMs are considered part of conversations. + return nil, nil + } + if status.BoostOfID != "" { + // Boosts can't be part of conversations. + // FUTURE: This may change if we ever implement quote posts. + return nil, nil + } + if status.ThreadID == "" { + // If the status doesn't have a thread ID, it didn't mention a local account, + // and thus can't be part of a conversation. + return nil, nil + } + + // We need accounts to be populated for this. + if err := p.state.DB.PopulateStatus(ctx, status); err != nil { + return nil, gtserror.Newf("DB error populating status %s: %w", status.ID, err) + } + + // The account which authored the status plus all mentioned accounts. + allParticipantsSet := make(map[string]*gtsmodel.Account, 1+len(status.Mentions)) + allParticipantsSet[status.AccountID] = status.Account + for _, mention := range status.Mentions { + allParticipantsSet[mention.TargetAccountID] = mention.TargetAccount + } + + // Create or update conversations for and send notifications to each local participant. + notifications := make([]ConversationNotification, 0, len(allParticipantsSet)) + for _, participant := range allParticipantsSet { + if participant.IsRemote() { + continue + } + localAccount := participant + + // If the status is not visible to this account, skip processing it for this account. + visible, err := p.filter.StatusVisible(ctx, localAccount, status) + if err != nil { + log.Errorf( + ctx, + "error checking status %s visibility for account %s: %v", + status.ID, + localAccount.ID, + err, + ) + continue + } else if !visible { + continue + } + + // Is the status filtered or muted for this user? + // Converting the status to an API status runs the filter/mute checks. + filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, localAccount) + if errWithCode != nil { + log.Error(ctx, errWithCode) + continue + } + _, err = p.converter.StatusToAPIStatus( + ctx, + status, + localAccount, + statusfilter.FilterContextNotifications, + filters, + mutes, + ) + if err != nil { + // If the status matched a hide filter, skip processing it for this account. + // If there was another kind of error, log that and skip it anyway. + if !errors.Is(err, statusfilter.ErrHideStatus) { + log.Errorf( + ctx, + "error checking status %s filtering/muting for account %s: %v", + status.ID, + localAccount.ID, + err, + ) + } + continue + } + + // Collect other accounts participating in the conversation. + otherAccounts := make([]*gtsmodel.Account, 0, len(allParticipantsSet)-1) + otherAccountIDs := make([]string, 0, len(allParticipantsSet)-1) + for accountID, account := range allParticipantsSet { + if accountID != localAccount.ID { + otherAccounts = append(otherAccounts, account) + otherAccountIDs = append(otherAccountIDs, accountID) + } + } + + // Check for a previously existing conversation, if there is one. + conversation, err := p.state.DB.GetConversationByThreadAndAccountIDs( + ctx, + status.ThreadID, + localAccount.ID, + otherAccountIDs, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + log.Errorf( + ctx, + "error trying to find a previous conversation for status %s and account %s: %v", + status.ID, + localAccount.ID, + err, + ) + continue + } + + if conversation == nil { + // Create a new conversation. + conversation = >smodel.Conversation{ + ID: id.NewULID(), + AccountID: localAccount.ID, + OtherAccountIDs: otherAccountIDs, + OtherAccounts: otherAccounts, + OtherAccountsKey: gtsmodel.ConversationOtherAccountsKey(otherAccountIDs), + ThreadID: status.ThreadID, + Read: util.Ptr(true), + } + } + + // Assume that if the conversation owner posted the status, they've already read it. + statusAuthoredByConversationOwner := status.AccountID == conversation.AccountID + + // Update the conversation. + // If there is no previous last status or this one is more recently created, set it as the last status. + if conversation.LastStatus == nil || conversation.LastStatus.CreatedAt.Before(status.CreatedAt) { + conversation.LastStatusID = status.ID + conversation.LastStatus = status + } + // If the conversation is unread, leave it marked as unread. + // If the conversation is read but this status might not have been, mark the conversation as unread. + if !statusAuthoredByConversationOwner { + conversation.Read = util.Ptr(false) + } + + // Create or update the conversation. + err = p.state.DB.UpsertConversation(ctx, conversation) + if err != nil { + log.Errorf( + ctx, + "error creating or updating conversation %s for status %s and account %s: %v", + conversation.ID, + status.ID, + localAccount.ID, + err, + ) + continue + } + + // Link the conversation to the status. + if err := p.state.DB.LinkConversationToStatus(ctx, conversation.ID, status.ID); err != nil { + log.Errorf( + ctx, + "error linking conversation %s to status %s: %v", + conversation.ID, + status.ID, + err, + ) + continue + } + + // Convert the conversation to API representation. + apiConversation, err := p.converter.ConversationToAPIConversation( + ctx, + conversation, + localAccount, + filters, + mutes, + ) + if err != nil { + // If the conversation's last status matched a hide filter, skip it. + // If there was another kind of error, log that and skip it anyway. + if !errors.Is(err, statusfilter.ErrHideStatus) { + log.Errorf( + ctx, + "error converting conversation %s to API representation for account %s: %v", + status.ID, + localAccount.ID, + err, + ) + } + continue + } + + // Generate a notification, + // unless the status was authored by the user who would be notified, + // in which case they already know. + if status.AccountID != localAccount.ID { + notifications = append(notifications, ConversationNotification{ + AccountID: localAccount.ID, + Conversation: apiConversation, + }) + } + } + + return notifications, nil +} diff --git a/internal/processing/conversations/update_test.go b/internal/processing/conversations/update_test.go new file mode 100644 index 000000000..8ba2800fe --- /dev/null +++ b/internal/processing/conversations/update_test.go @@ -0,0 +1,54 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package conversations_test + +import ( + "context" +) + +// Test that we can create conversations when a new status comes in. +func (suite *ConversationsTestSuite) TestUpdateConversationsForStatus() { + ctx := context.Background() + + // Precondition: the test user shouldn't have any conversations yet. + conversations, err := suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil) + if err != nil { + suite.FailNow(err.Error()) + } + suite.Empty(conversations) + + // Create a status. + threadID := suite.NewULID(0) + status := suite.NewTestStatus(suite.testAccount, threadID, 0, nil) + + // Update conversations for it. + notifications, err := suite.conversationsProcessor.UpdateConversationsForStatus(ctx, status) + if err != nil { + suite.FailNow(err.Error()) + } + + // In this test, the user is DMing themself, and should not receive a notification from that. + suite.Empty(notifications) + + // The test user should have a conversation now. + conversations, err = suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil) + if err != nil { + suite.FailNow(err.Error()) + } + suite.NotEmpty(conversations) +} diff --git a/internal/processing/processor.go b/internal/processing/processor.go index fb6b05d80..a07df76e1 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -27,7 +27,9 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/oauth" "github.com/superseriousbusiness/gotosocial/internal/processing/account" "github.com/superseriousbusiness/gotosocial/internal/processing/admin" + "github.com/superseriousbusiness/gotosocial/internal/processing/advancedmigrations" "github.com/superseriousbusiness/gotosocial/internal/processing/common" + "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" "github.com/superseriousbusiness/gotosocial/internal/processing/fedi" filtersv1 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v1" filtersv2 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v2" @@ -70,22 +72,24 @@ type Processor struct { SUB-PROCESSORS */ - account account.Processor - admin admin.Processor - fedi fedi.Processor - filtersv1 filtersv1.Processor - filtersv2 filtersv2.Processor - list list.Processor - markers markers.Processor - media media.Processor - polls polls.Processor - report report.Processor - search search.Processor - status status.Processor - stream stream.Processor - timeline timeline.Processor - user user.Processor - workers workers.Processor + account account.Processor + admin admin.Processor + advancedmigrations advancedmigrations.Processor + conversations conversations.Processor + fedi fedi.Processor + filtersv1 filtersv1.Processor + filtersv2 filtersv2.Processor + list list.Processor + markers markers.Processor + media media.Processor + polls polls.Processor + report report.Processor + search search.Processor + status status.Processor + stream stream.Processor + timeline timeline.Processor + user user.Processor + workers workers.Processor } func (p *Processor) Account() *account.Processor { @@ -96,6 +100,14 @@ func (p *Processor) Admin() *admin.Processor { return &p.admin } +func (p *Processor) AdvancedMigrations() *advancedmigrations.Processor { + return &p.advancedmigrations +} + +func (p *Processor) Conversations() *conversations.Processor { + return &p.conversations +} + func (p *Processor) Fedi() *fedi.Processor { return &p.fedi } @@ -188,6 +200,7 @@ func NewProcessor( // processors + pin them to this struct. processor.account = account.New(&common, state, converter, mediaManager, federator, filter, parseMentionFunc) processor.admin = admin.New(&common, state, cleaner, federator, converter, mediaManager, federator.TransportController(), emailSender) + processor.conversations = conversations.New(state, converter, filter) processor.fedi = fedi.New(state, &common, converter, federator, filter) processor.filtersv1 = filtersv1.New(state, converter, &processor.stream) processor.filtersv2 = filtersv2.New(state, converter, &processor.stream) @@ -200,6 +213,9 @@ func NewProcessor( processor.status = status.New(state, &common, &processor.polls, federator, converter, filter, parseMentionFunc) processor.user = user.New(state, converter, oauthServer, emailSender) + // The advanced migrations processor sequences advanced migrations from all other processors. + processor.advancedmigrations = advancedmigrations.New(&processor.conversations) + // Workers processor handles asynchronous // worker jobs; instantiate it separately // and pass subset of sub processors it needs. @@ -212,6 +228,7 @@ func NewProcessor( &processor.account, &processor.media, &processor.stream, + &processor.conversations, ) return processor diff --git a/internal/processing/stream/conversation.go b/internal/processing/stream/conversation.go new file mode 100644 index 000000000..a0236c459 --- /dev/null +++ b/internal/processing/stream/conversation.go @@ -0,0 +1,44 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package stream + +import ( + "context" + "encoding/json" + + "codeberg.org/gruf/go-byteutil" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/stream" +) + +// Conversation streams the given conversation to any open, appropriate streams belonging to the given account. +func (p *Processor) Conversation(ctx context.Context, accountID string, conversation *apimodel.Conversation) { + b, err := json.Marshal(conversation) + if err != nil { + log.Errorf(ctx, "error marshaling json: %v", err) + return + } + p.streams.Post(ctx, accountID, stream.Message{ + Payload: byteutil.B2S(b), + Event: stream.EventTypeConversation, + Stream: []string{ + stream.TimelineDirect, + }, + }) +} diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go index 49a68d27a..35c2c31b7 100644 --- a/internal/processing/workers/fromclientapi_test.go +++ b/internal/processing/workers/fromclientapi_test.go @@ -50,6 +50,8 @@ func (suite *FromClientAPITestSuite) newStatus( visibility gtsmodel.Visibility, replyToStatus *gtsmodel.Status, boostOfStatus *gtsmodel.Status, + mentionedAccounts []*gtsmodel.Account, + createThread bool, ) *gtsmodel.Status { var ( protocol = config.GetProtocol() @@ -102,6 +104,39 @@ func (suite *FromClientAPITestSuite) newStatus( newStatus.Visibility = boostOfStatus.Visibility } + for _, mentionedAccount := range mentionedAccounts { + newMention := >smodel.Mention{ + ID: id.NewULID(), + StatusID: newStatus.ID, + Status: newStatus, + OriginAccountID: account.ID, + OriginAccountURI: account.URI, + OriginAccount: account, + TargetAccountID: mentionedAccount.ID, + TargetAccount: mentionedAccount, + Silent: util.Ptr(false), + } + + newStatus.Mentions = append(newStatus.Mentions, newMention) + newStatus.MentionIDs = append(newStatus.MentionIDs, newMention.ID) + + if err := state.DB.PutMention(ctx, newMention); err != nil { + suite.FailNow(err.Error()) + } + } + + if createThread { + newThread := >smodel.Thread{ + ID: id.NewULID(), + } + + newStatus.ThreadID = newThread.ID + + if err := state.DB.PutThread(ctx, newThread); err != nil { + suite.FailNow(err.Error()) + } + } + // Put the status in the db, to mimic what would // have already happened earlier up the flow. if err := state.DB.PutStatus(ctx, newStatus); err != nil { @@ -168,6 +203,31 @@ func (suite *FromClientAPITestSuite) statusJSON( return string(statusJSON) } +func (suite *FromClientAPITestSuite) conversationJSON( + ctx context.Context, + typeConverter *typeutils.Converter, + conversation *gtsmodel.Conversation, + requestingAccount *gtsmodel.Account, +) string { + apiConversation, err := typeConverter.ConversationToAPIConversation( + ctx, + conversation, + requestingAccount, + nil, + nil, + ) + if err != nil { + suite.FailNow(err.Error()) + } + + conversationJSON, err := json.Marshal(apiConversation) + if err != nil { + suite.FailNow(err.Error()) + } + + return string(conversationJSON) +} + func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { testStructs := suite.SetupTestStructs() defer suite.TearDownTestStructs(testStructs) @@ -194,6 +254,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { gtsmodel.VisibilityPublic, nil, nil, + nil, + false, ) ) @@ -303,6 +365,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() { gtsmodel.VisibilityPublic, suite.testStatuses["local_account_2_status_1"], nil, + nil, + false, ) ) @@ -362,6 +426,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() { gtsmodel.VisibilityPublic, suite.testStatuses["local_account_1_status_1"], nil, + nil, + false, ) threadMute = >smodel.ThreadMute{ ID: "01HD3KRMBB1M85QRWHD912QWRE", @@ -420,6 +486,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() { gtsmodel.VisibilityPublic, nil, suite.testStatuses["local_account_1_status_1"], + nil, + false, ) threadMute = >smodel.ThreadMute{ ID: "01HD3KRMBB1M85QRWHD912QWRE", @@ -483,6 +551,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis gtsmodel.VisibilityPublic, suite.testStatuses["local_account_2_status_1"], nil, + nil, + false, ) ) @@ -556,6 +626,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis gtsmodel.VisibilityPublic, suite.testStatuses["local_account_2_status_1"], nil, + nil, + false, ) ) @@ -634,6 +706,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli gtsmodel.VisibilityPublic, suite.testStatuses["local_account_2_status_1"], nil, + nil, + false, ) ) @@ -704,6 +778,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() { gtsmodel.VisibilityPublic, nil, suite.testStatuses["local_account_2_status_1"], + nil, + false, ) ) @@ -765,6 +841,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() { gtsmodel.VisibilityPublic, nil, suite.testStatuses["local_account_2_status_1"], + nil, + false, ) ) @@ -807,6 +885,159 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() { ) } +// A DM to a local user should create a conversation and accompanying notification. +func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversation() { + testStructs := suite.SetupTestStructs() + defer suite.TearDownTestStructs(testStructs) + + var ( + ctx = context.Background() + postingAccount = suite.testAccounts["local_account_2"] + receivingAccount = suite.testAccounts["local_account_1"] + streams = suite.openStreams(ctx, + testStructs.Processor, + receivingAccount, + nil, + ) + homeStream = streams[stream.TimelineHome] + directStream = streams[stream.TimelineDirect] + + // turtle posts a new top-level DM mentioning zork. + status = suite.newStatus( + ctx, + testStructs.State, + postingAccount, + gtsmodel.VisibilityDirect, + nil, + nil, + []*gtsmodel.Account{receivingAccount}, + true, + ) + ) + + // Process the new status. + if err := testStructs.Processor.Workers().ProcessFromClientAPI( + ctx, + &messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: status, + Origin: postingAccount, + }, + ); err != nil { + suite.FailNow(err.Error()) + } + + // Locate the conversation which should now exist for zork. + conversation, err := testStructs.State.DB.GetConversationByThreadAndAccountIDs( + ctx, + status.ThreadID, + receivingAccount.ID, + []string{postingAccount.ID}, + ) + if err != nil { + suite.FailNow(err.Error()) + } + + // Check status in home stream. + suite.checkStreamed( + homeStream, + true, + "", + stream.EventTypeUpdate, + ) + + // Check mention notification in home stream. + suite.checkStreamed( + homeStream, + true, + "", + stream.EventTypeNotification, + ) + + // Check conversation in direct stream. + conversationJSON := suite.conversationJSON( + ctx, + testStructs.TypeConverter, + conversation, + receivingAccount, + ) + suite.checkStreamed( + directStream, + true, + conversationJSON, + stream.EventTypeConversation, + ) +} + +// A public message to a local user should not result in a conversation notification. +func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreateConversation() { + testStructs := suite.SetupTestStructs() + defer suite.TearDownTestStructs(testStructs) + + var ( + ctx = context.Background() + postingAccount = suite.testAccounts["local_account_2"] + receivingAccount = suite.testAccounts["local_account_1"] + streams = suite.openStreams(ctx, + testStructs.Processor, + receivingAccount, + nil, + ) + homeStream = streams[stream.TimelineHome] + directStream = streams[stream.TimelineDirect] + + // turtle posts a new top-level public message mentioning zork. + status = suite.newStatus( + ctx, + testStructs.State, + postingAccount, + gtsmodel.VisibilityPublic, + nil, + nil, + []*gtsmodel.Account{receivingAccount}, + true, + ) + ) + + // Process the new status. + if err := testStructs.Processor.Workers().ProcessFromClientAPI( + ctx, + &messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: status, + Origin: postingAccount, + }, + ); err != nil { + suite.FailNow(err.Error()) + } + + // Check status in home stream. + suite.checkStreamed( + homeStream, + true, + "", + stream.EventTypeUpdate, + ) + + // Check mention notification in home stream. + suite.checkStreamed( + homeStream, + true, + "", + stream.EventTypeNotification, + ) + + // Check for absence of conversation notification in direct stream. + suite.checkStreamed( + directStream, + false, + "", + "", + ) +} + func (suite *FromClientAPITestSuite) TestProcessStatusDelete() { testStructs := suite.SetupTestStructs() defer suite.TearDownTestStructs(testStructs) diff --git a/internal/processing/workers/surface.go b/internal/processing/workers/surface.go index 5ec905ae8..1a7dbbfe5 100644 --- a/internal/processing/workers/surface.go +++ b/internal/processing/workers/surface.go @@ -20,6 +20,7 @@ package workers import ( "github.com/superseriousbusiness/gotosocial/internal/email" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" "github.com/superseriousbusiness/gotosocial/internal/processing/stream" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" @@ -32,9 +33,10 @@ import ( // - sending a notification to a user // - sending an email type Surface struct { - State *state.State - Converter *typeutils.Converter - Stream *stream.Processor - Filter *visibility.Filter - EmailSender email.Sender + State *state.State + Converter *typeutils.Converter + Stream *stream.Processor + Filter *visibility.Filter + EmailSender email.Sender + Conversations *conversations.Processor } diff --git a/internal/processing/workers/surfacenotify_test.go b/internal/processing/workers/surfacenotify_test.go index 18d0277ae..937ddeca2 100644 --- a/internal/processing/workers/surfacenotify_test.go +++ b/internal/processing/workers/surfacenotify_test.go @@ -39,11 +39,12 @@ func (suite *SurfaceNotifyTestSuite) TestSpamNotifs() { defer suite.TearDownTestStructs(testStructs) surface := &workers.Surface{ - State: testStructs.State, - Converter: testStructs.TypeConverter, - Stream: testStructs.Processor.Stream(), - Filter: visibility.NewFilter(testStructs.State), - EmailSender: testStructs.EmailSender, + State: testStructs.State, + Converter: testStructs.TypeConverter, + Stream: testStructs.Processor.Stream(), + Filter: visibility.NewFilter(testStructs.State), + EmailSender: testStructs.EmailSender, + Conversations: testStructs.Processor.Conversations(), } var ( diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go index 41d7f6f2a..8ac8293ed 100644 --- a/internal/processing/workers/surfacetimeline.go +++ b/internal/processing/workers/surfacetimeline.go @@ -36,8 +36,8 @@ import ( // and LIST timelines of accounts that follow the status author. // // It will also handle notifications for any mentions attached to -// the account, and notifications for any local accounts that want -// to know when this account posts. +// the account, notifications for any local accounts that want +// to know when this account posts, and conversations containing the status. func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error { // Ensure status fully populated; including account, mentions, etc. if err := s.State.DB.PopulateStatus(ctx, status); err != nil { @@ -73,6 +73,15 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel. return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err) } + // Update any conversations containing this status, and send conversation notifications. + notifications, err := s.Conversations.UpdateConversationsForStatus(ctx, status) + if err != nil { + return gtserror.Newf("error updating conversations for status %s: %w", status.ID, err) + } + for _, notification := range notifications { + s.Stream.Conversation(ctx, notification.AccountID, notification.Conversation) + } + return nil } diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go index 780e5ca14..915370976 100644 --- a/internal/processing/workers/util.go +++ b/internal/processing/workers/util.go @@ -137,6 +137,11 @@ func (u *utils) wipeStatus( errs.Appendf("error deleting status from timelines: %w", err) } + // delete this status from any conversations that it's part of + if err := u.state.DB.DeleteStatusFromConversations(ctx, statusToDelete.ID); err != nil { + errs.Appendf("error deleting status from conversations: %w", err) + } + // finally, delete the status itself if err := u.state.DB.DeleteStatusByID(ctx, statusToDelete.ID); err != nil { errs.Appendf("error deleting status: %w", err) diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go index 6b4cc07a6..c7f67b025 100644 --- a/internal/processing/workers/workers.go +++ b/internal/processing/workers/workers.go @@ -22,6 +22,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/federation" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/processing/account" + "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" "github.com/superseriousbusiness/gotosocial/internal/processing/media" "github.com/superseriousbusiness/gotosocial/internal/processing/stream" "github.com/superseriousbusiness/gotosocial/internal/state" @@ -44,6 +45,7 @@ func New( account *account.Processor, media *media.Processor, stream *stream.Processor, + conversations *conversations.Processor, ) Processor { // Init federate logic // wrapper struct. @@ -56,11 +58,12 @@ func New( // Init surface logic // wrapper struct. surface := &Surface{ - State: state, - Converter: converter, - Stream: stream, - Filter: filter, - EmailSender: emailSender, + State: state, + Converter: converter, + Stream: stream, + Filter: filter, + EmailSender: emailSender, + Conversations: conversations, } // Init shared util funcs. diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go index f66190d75..3093fd93a 100644 --- a/internal/processing/workers/workers_test.go +++ b/internal/processing/workers/workers_test.go @@ -108,6 +108,7 @@ func (suite *WorkersTestSuite) openStreams(ctx context.Context, processor *proce stream.TimelineHome, stream.TimelinePublic, stream.TimelineNotifications, + stream.TimelineDirect, } { stream, err := processor.Stream().Open(ctx, account, streamType) if err != nil { diff --git a/internal/stream/stream.go b/internal/stream/stream.go index e843a1b76..0a352133a 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -46,6 +46,10 @@ const ( // EventTypeFiltersChanged -- the user's filters // (including keywords and statuses) have changed. EventTypeFiltersChanged = "filters_changed" + + // EventTypeConversation -- a user + // should be shown an updated conversation. + EventTypeConversation = "conversation" ) const ( diff --git a/internal/typeutils/internaltofrontend.go b/internal/typeutils/internaltofrontend.go index 7d2889b05..a13304bd8 100644 --- a/internal/typeutils/internaltofrontend.go +++ b/internal/typeutils/internaltofrontend.go @@ -1739,6 +1739,67 @@ func (c *Converter) NotificationToAPINotification( }, nil } +// ConversationToAPIConversation converts a conversation into its API representation. +// The conversation status will be filtered using the notification filter context, +// and may be nil if the status was hidden. +func (c *Converter) ConversationToAPIConversation( + ctx context.Context, + conversation *gtsmodel.Conversation, + requestingAccount *gtsmodel.Account, + filters []*gtsmodel.Filter, + mutes *usermute.CompiledUserMuteList, +) (*apimodel.Conversation, error) { + apiConversation := &apimodel.Conversation{ + ID: conversation.ID, + Unread: !*conversation.Read, + } + for _, account := range conversation.OtherAccounts { + var apiAccount *apimodel.Account + blocked, err := c.state.DB.IsEitherBlocked(ctx, requestingAccount.ID, account.ID) + if err != nil { + return nil, gtserror.Newf( + "DB error checking blocks between accounts %s and %s: %w", + requestingAccount.ID, + account.ID, + err, + ) + } + if blocked || account.IsSuspended() { + apiAccount, err = c.AccountToAPIAccountBlocked(ctx, account) + } else { + apiAccount, err = c.AccountToAPIAccountPublic(ctx, account) + } + if err != nil { + return nil, gtserror.Newf( + "error converting account %s to API representation: %w", + account.ID, + err, + ) + } + apiConversation.Accounts = append(apiConversation.Accounts, *apiAccount) + } + if conversation.LastStatus != nil { + var err error + apiConversation.LastStatus, err = c.StatusToAPIStatus( + ctx, + conversation.LastStatus, + requestingAccount, + statusfilter.FilterContextNotifications, + filters, + mutes, + ) + if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) { + return nil, gtserror.Newf( + "error converting status %s to API representation: %w", + conversation.LastStatus.ID, + err, + ) + } + } + + return apiConversation, nil +} + // DomainPermToAPIDomainPerm converts a gts model domin block or allow into an api domain permission. func (c *Converter) DomainPermToAPIDomainPerm( ctx context.Context, diff --git a/test/envparsing.sh b/test/envparsing.sh index 22abff48a..83dfb85fc 100755 --- a/test/envparsing.sh +++ b/test/envparsing.sh @@ -32,6 +32,8 @@ EXPECT=$(cat << "EOF" "block-mem-ratio": 2, "boost-of-ids-mem-ratio": 3, "client-mem-ratio": 0.1, + "conversation-last-status-ids-mem-ratio": 2, + "conversation-mem-ratio": 1, "emoji-category-mem-ratio": 0.1, "emoji-mem-ratio": 3, "filter-keyword-mem-ratio": 0.5,