[bugfix] Fix possible race condition in federatingdb (#490)

Signed-off-by: kim <grufwub@gmail.com>
This commit is contained in:
kim 2022-04-28 10:18:27 +01:00 committed by GitHub
parent 8e80f983b3
commit cc5f2e98b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 131 additions and 220 deletions

4
go.mod
View File

@ -4,6 +4,7 @@ go 1.18
require ( require (
codeberg.org/gruf/go-errors v1.0.5 codeberg.org/gruf/go-errors v1.0.5
codeberg.org/gruf/go-mutexes v1.1.2
codeberg.org/gruf/go-runners v1.2.0 codeberg.org/gruf/go-runners v1.2.0
codeberg.org/gruf/go-store v1.3.6 codeberg.org/gruf/go-store v1.3.6
github.com/ReneKroon/ttlcache v1.7.0 github.com/ReneKroon/ttlcache v1.7.0
@ -31,7 +32,7 @@ require (
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.10.0 github.com/spf13/viper v1.10.0
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
github.com/superseriousbusiness/activity v1.0.1-0.20220405135100-18e8f86a760a github.com/superseriousbusiness/activity v1.1.0-gts
github.com/superseriousbusiness/exif-terminator v0.2.0 github.com/superseriousbusiness/exif-terminator v0.2.0
github.com/superseriousbusiness/oauth2/v4 v4.3.2-SSB github.com/superseriousbusiness/oauth2/v4 v4.3.2-SSB
github.com/tdewolff/minify/v2 v2.9.22 github.com/tdewolff/minify/v2 v2.9.22
@ -54,7 +55,6 @@ require (
codeberg.org/gruf/go-fastpath v1.0.2 // indirect codeberg.org/gruf/go-fastpath v1.0.2 // indirect
codeberg.org/gruf/go-format v1.0.3 // indirect codeberg.org/gruf/go-format v1.0.3 // indirect
codeberg.org/gruf/go-hashenc v1.0.1 // indirect codeberg.org/gruf/go-hashenc v1.0.1 // indirect
codeberg.org/gruf/go-mutexes v1.1.2 // indirect
codeberg.org/gruf/go-pools v1.0.2 // indirect codeberg.org/gruf/go-pools v1.0.2 // indirect
github.com/aymerick/douceur v0.2.0 // indirect github.com/aymerick/douceur v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect

4
go.sum
View File

@ -535,8 +535,8 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/superseriousbusiness/activity v1.0.1-0.20220405135100-18e8f86a760a h1:tKr18ijUgZ+PCM/n+St6uO2BaPgkReRtM3IJHC/Otf4= github.com/superseriousbusiness/activity v1.1.0-gts h1:BSnMzs/84s0Zme7BngE9iJAHV7g1Bv1nhLCP0aJtU3I=
github.com/superseriousbusiness/activity v1.0.1-0.20220405135100-18e8f86a760a/go.mod h1:AZw0Xb4Oju8rmaJCZ21gc5CPg47MmNgyac+Hx5jo8VM= github.com/superseriousbusiness/activity v1.1.0-gts/go.mod h1:AZw0Xb4Oju8rmaJCZ21gc5CPg47MmNgyac+Hx5jo8VM=
github.com/superseriousbusiness/exif-terminator v0.2.0 h1:C21KOUr54E37qTqYS7WJX0J83sNzzCwBEy0KXyDprqU= github.com/superseriousbusiness/exif-terminator v0.2.0 h1:C21KOUr54E37qTqYS7WJX0J83sNzzCwBEy0KXyDprqU=
github.com/superseriousbusiness/exif-terminator v0.2.0/go.mod h1:DHJuKguXqyOVqB/oyOylutEDIZCbkYsn2GZFNSUDT9E= github.com/superseriousbusiness/exif-terminator v0.2.0/go.mod h1:DHJuKguXqyOVqB/oyOylutEDIZCbkYsn2GZFNSUDT9E=
github.com/superseriousbusiness/go-jpeg-image-structure/v2 v2.0.0-20220321154430-d89a106fdabe h1:ksl2oCx/Qo8sNDc3Grb8WGKBM9nkvhCm25uvlT86azE= github.com/superseriousbusiness/go-jpeg-image-structure/v2 v2.0.0-20220321154430-d89a106fdabe h1:ksl2oCx/Qo8sNDc3Grb8WGKBM9nkvhCm25uvlT86azE=

View File

@ -20,9 +20,8 @@ package federatingdb
import ( import (
"context" "context"
"sync"
"time"
"codeberg.org/gruf/go-mutexes"
"github.com/superseriousbusiness/activity/pub" "github.com/superseriousbusiness/activity/pub"
"github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/activity/streams/vocab"
"github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/db"
@ -41,9 +40,7 @@ type DB interface {
// FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface. // FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface.
// It doesn't care what the underlying implementation of the DB interface is, as long as it works. // It doesn't care what the underlying implementation of the DB interface is, as long as it works.
type federatingDB struct { type federatingDB struct {
mutex sync.Mutex locks mutexes.MutexMap
locks map[string]*mutex
pool sync.Pool
db db.DB db db.DB
typeConverter typeutils.TypeConverter typeConverter typeutils.TypeConverter
} }
@ -51,29 +48,9 @@ type federatingDB struct {
// New returns a DB interface using the given database and config // New returns a DB interface using the given database and config
func New(db db.DB) DB { func New(db db.DB) DB {
fdb := federatingDB{ fdb := federatingDB{
mutex: sync.Mutex{}, locks: mutexes.NewMap(-1, -1), // use defaults
locks: make(map[string]*mutex, 100),
pool: sync.Pool{New: func() interface{} { return &mutex{} }},
db: db, db: db,
typeConverter: typeutils.NewConverter(db), typeConverter: typeutils.NewConverter(db),
} }
go fdb.cleanupLocks()
return &fdb return &fdb
} }
func (db *federatingDB) cleanupLocks() {
for {
// Sleep for a minute...
time.Sleep(time.Minute)
// Delete unused locks from map
db.mutex.Lock()
for id, mu := range db.locks {
if !mu.inUse() {
delete(db.locks, id)
db.pool.Put(mu)
}
}
db.mutex.Unlock()
}
}

View File

@ -22,10 +22,6 @@ import (
"context" "context"
"errors" "errors"
"net/url" "net/url"
"sync"
"sync/atomic"
"github.com/sirupsen/logrus"
) )
// Lock takes a lock for the object at the specified id. If an error // Lock takes a lock for the object at the specified id. If an error
@ -39,83 +35,10 @@ import (
// processes require tight loops acquiring and releasing locks. // processes require tight loops acquiring and releasing locks.
// //
// Used to ensure race conditions in multiple requests do not occur. // Used to ensure race conditions in multiple requests do not occur.
func (f *federatingDB) Lock(c context.Context, id *url.URL) error { func (f *federatingDB) Lock(c context.Context, id *url.URL) (func(), error) {
// Before any other Database methods are called, the relevant `id`
// entries are locked to allow for fine-grained concurrency.
// Strategy: create a new lock, if stored, continue. Otherwise, lock the
// existing mutex.
if id == nil { if id == nil {
return errors.New("Lock: id was nil") return nil, errors.New("Lock: id was nil")
} }
idStr := id.String() unlock := f.locks.Lock(id.String())
return unlock, nil
// Acquire map lock
f.mutex.Lock()
// Get mutex, or create new
mu, ok := f.locks[idStr]
if !ok {
mu, ok = f.pool.Get().(*mutex)
if !ok {
logrus.Panic("Lock: pool entry was not a *mutex")
}
f.locks[idStr] = mu
}
// Unlock map, acquire mutex lock
f.mutex.Unlock()
mu.Lock()
return nil
}
// Unlock makes the lock for the object at the specified id available.
// If an error is returned, the lock must have still been freed.
//
// Used to ensure race conditions in multiple requests do not occur.
func (f *federatingDB) Unlock(c context.Context, id *url.URL) error {
// Once Go-Fed is done calling Database methods, the relevant `id`
// entries are unlocked.
if id == nil {
return errors.New("Unlock: id was nil")
}
idStr := id.String()
// Check map for mutex
f.mutex.Lock()
mu, ok := f.locks[idStr]
f.mutex.Unlock()
if !ok {
return errors.New("missing an id in unlock")
}
// Unlock the mutex
mu.Unlock()
return nil
}
// mutex defines a mutex we can check the lock status of.
// this is not perfect, but it's good enough for a semi
// regular mutex cleanup routine
type mutex struct {
mu sync.Mutex
st uint32
}
// inUse returns if the mutex is in use
func (mu *mutex) inUse() bool {
return atomic.LoadUint32(&mu.st) == 1
}
// Lock acquire mutex lock
func (mu *mutex) Lock() {
mu.mu.Lock()
atomic.StoreUint32(&mu.st, 1)
}
// Unlock releases mutex lock
func (mu *mutex) Unlock() {
mu.mu.Unlock()
atomic.StoreUint32(&mu.st, 0)
} }

View File

@ -19,12 +19,7 @@ type Database interface {
// processes require tight loops acquiring and releasing locks. // processes require tight loops acquiring and releasing locks.
// //
// Used to ensure race conditions in multiple requests do not occur. // Used to ensure race conditions in multiple requests do not occur.
Lock(c context.Context, id *url.URL) error Lock(c context.Context, id *url.URL) (unlock func(), err error)
// Unlock makes the lock for the object at the specified id available.
// If an error is returned, the lock must have still been freed.
//
// Used to ensure race conditions in multiple requests do not occur.
Unlock(c context.Context, id *url.URL) error
// InboxContains returns true if the OrderedCollection at 'inbox' // InboxContains returns true if the OrderedCollection at 'inbox'
// contains the specified 'id'. // contains the specified 'id'.
// //

View File

@ -263,11 +263,12 @@ func (w FederatingWrappedCallbacks) create(c context.Context, a vocab.ActivitySt
if err != nil { if err != nil {
return err return err
} }
err = w.db.Lock(c, id) var unlock func()
unlock, err = w.db.Lock(c, id)
if err != nil { if err != nil {
return err return err
} }
defer w.db.Unlock(c, id) defer unlock()
if err := w.db.Create(c, t); err != nil { if err := w.db.Create(c, t); err != nil {
return err return err
} }
@ -304,11 +305,12 @@ func (w FederatingWrappedCallbacks) update(c context.Context, a vocab.ActivitySt
if err != nil { if err != nil {
return err return err
} }
err = w.db.Lock(c, id) var unlock func()
unlock, err = w.db.Lock(c, id)
if err != nil { if err != nil {
return err return err
} }
defer w.db.Unlock(c, id) defer unlock()
if err := w.db.Update(c, t); err != nil { if err := w.db.Update(c, t); err != nil {
return err return err
} }
@ -341,11 +343,12 @@ func (w FederatingWrappedCallbacks) deleteFn(c context.Context, a vocab.Activity
if err != nil { if err != nil {
return err return err
} }
err = w.db.Lock(c, id) var unlock func()
unlock, err = w.db.Lock(c, id)
if err != nil { if err != nil {
return err return err
} }
defer w.db.Unlock(c, id) defer unlock()
if err := w.db.Delete(c, id); err != nil { if err := w.db.Delete(c, id); err != nil {
return err return err
} }
@ -373,16 +376,16 @@ func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivitySt
// //
// If not then don't send a response. It was federated to us as an FYI, // If not then don't send a response. It was federated to us as an FYI,
// by mistake, or some other reason. // by mistake, or some other reason.
if err := w.db.Lock(c, w.inboxIRI); err != nil { unlock, err := w.db.Lock(c, w.inboxIRI)
if err != nil {
return err return err
} }
// WARNING: Unlock not deferred. // WARNING: Unlock not deferred.
actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI) actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI)
unlock() // unlock even on error
if err != nil { if err != nil {
w.db.Unlock(c, w.inboxIRI)
return err return err
} }
w.db.Unlock(c, w.inboxIRI)
// Unlock must be called by now and every branch above. // Unlock must be called by now and every branch above.
isMe := false isMe := false
if w.OnFollow != OnFollowDoNothing { if w.OnFollow != OnFollowDoNothing {
@ -434,13 +437,14 @@ func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivitySt
// //
// If automatically rejecting, do not update the // If automatically rejecting, do not update the
// followers collection. // followers collection.
if err := w.db.Lock(c, actorIRI); err != nil { unlock, err := w.db.Lock(c, actorIRI)
if err != nil {
return err return err
} }
// WARNING: Unlock not deferred. // WARNING: Unlock not deferred.
followers, err := w.db.Followers(c, actorIRI) followers, err := w.db.Followers(c, actorIRI)
if err != nil { if err != nil {
w.db.Unlock(c, actorIRI) unlock()
return err return err
} }
items := followers.GetActivityStreamsItems() items := followers.GetActivityStreamsItems()
@ -451,21 +455,23 @@ func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivitySt
for _, elem := range recipients { for _, elem := range recipients {
items.PrependIRI(elem) items.PrependIRI(elem)
} }
if err = w.db.Update(c, followers); err != nil { err = w.db.Update(c, followers)
w.db.Unlock(c, actorIRI) unlock() // unlock even on error
if err != nil {
return err return err
} }
w.db.Unlock(c, actorIRI)
// Unlock must be called by now and every branch above. // Unlock must be called by now and every branch above.
} }
// Lock without defer! // Lock without defer!
w.db.Lock(c, w.inboxIRI) unlock, err := w.db.Lock(c, w.inboxIRI)
outboxIRI, err := w.db.OutboxForInbox(c, w.inboxIRI) if err != nil {
return err
}
outboxIRI, err := w.db.OutboxForInbox(c, w.inboxIRI)
unlock() // unlock after, regardless
if err != nil { if err != nil {
w.db.Unlock(c, w.inboxIRI)
return err return err
} }
w.db.Unlock(c, w.inboxIRI)
// Everything must be unlocked by now. // Everything must be unlocked by now.
if err := w.addNewIds(c, response); err != nil { if err := w.addNewIds(c, response); err != nil {
return err return err
@ -484,16 +490,16 @@ func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivitySt
op := a.GetActivityStreamsObject() op := a.GetActivityStreamsObject()
if op != nil && op.Len() > 0 { if op != nil && op.Len() > 0 {
// Get this actor's id. // Get this actor's id.
if err := w.db.Lock(c, w.inboxIRI); err != nil { unlock, err := w.db.Lock(c, w.inboxIRI)
if err != nil {
return err return err
} }
// WARNING: Unlock not deferred. // WARNING: Unlock not deferred.
actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI) actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI)
unlock() // unlock after regardless
if err != nil { if err != nil {
w.db.Unlock(c, w.inboxIRI)
return err return err
} }
w.db.Unlock(c, w.inboxIRI)
// Unlock must be called by now and every branch above. // Unlock must be called by now and every branch above.
// //
// Determine if we are in a follow on the 'object' property. // Determine if we are in a follow on the 'object' property.
@ -568,10 +574,11 @@ func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivitySt
// Use an anonymous function to properly scope the // Use an anonymous function to properly scope the
// database lock, immediately call it. // database lock, immediately call it.
err = func() error { err = func() error {
if err := w.db.Lock(c, maybeMyFollowIRI); err != nil { unlock, err := w.db.Lock(c, maybeMyFollowIRI)
if err != nil {
return err return err
} }
defer w.db.Unlock(c, maybeMyFollowIRI) defer unlock()
t, err := w.db.Get(c, maybeMyFollowIRI) t, err := w.db.Get(c, maybeMyFollowIRI)
if err != nil { if err != nil {
return err return err
@ -630,13 +637,14 @@ func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivitySt
return err return err
} }
// Add the peer to our following collection. // Add the peer to our following collection.
if err := w.db.Lock(c, actorIRI); err != nil { unlock, err := w.db.Lock(c, actorIRI)
if err != nil {
return err return err
} }
// WARNING: Unlock not deferred. // WARNING: Unlock not deferred.
following, err := w.db.Following(c, actorIRI) following, err := w.db.Following(c, actorIRI)
if err != nil { if err != nil {
w.db.Unlock(c, actorIRI) unlock()
return err return err
} }
items := following.GetActivityStreamsItems() items := following.GetActivityStreamsItems()
@ -647,16 +655,16 @@ func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivitySt
for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() { for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() {
id, err := ToId(iter) id, err := ToId(iter)
if err != nil { if err != nil {
w.db.Unlock(c, actorIRI) unlock()
return err return err
} }
items.PrependIRI(id) items.PrependIRI(id)
} }
if err = w.db.Update(c, following); err != nil { err = w.db.Update(c, following)
w.db.Unlock(c, actorIRI) unlock() // unlock after regardless
if err != nil {
return err return err
} }
w.db.Unlock(c, actorIRI)
// Unlock must be called by now and every branch above. // Unlock must be called by now and every branch above.
} }
} }
@ -729,10 +737,11 @@ func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStre
if err != nil { if err != nil {
return err return err
} }
if err := w.db.Lock(c, objId); err != nil { unlock, err := w.db.Lock(c, objId)
if err != nil {
return err return err
} }
defer w.db.Unlock(c, objId) defer unlock()
if owns, err := w.db.Owns(c, objId); err != nil { if owns, err := w.db.Owns(c, objId); err != nil {
return err return err
} else if !owns { } else if !owns {
@ -810,10 +819,11 @@ func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.Activity
if err != nil { if err != nil {
return err return err
} }
if err := w.db.Lock(c, objId); err != nil { unlock, err := w.db.Lock(c, objId)
if err != nil {
return err return err
} }
defer w.db.Unlock(c, objId) defer unlock()
if owns, err := w.db.Owns(c, objId); err != nil { if owns, err := w.db.Owns(c, objId); err != nil {
return err return err
} else if !owns { } else if !owns {

View File

@ -64,18 +64,20 @@ func NewActivityStreamsHandlerScheme(db Database, clock Clock, scheme string) Ha
} }
isASRequest = true isASRequest = true
id := requestId(r, scheme) id := requestId(r, scheme)
var unlock func()
// Lock and obtain a copy of the requested ActivityStreams value // Lock and obtain a copy of the requested ActivityStreams value
err = db.Lock(c, id) unlock, err = db.Lock(c, id)
if err != nil { if err != nil {
return return
} }
// WARNING: Unlock not deferred // WARNING: Unlock not deferred
t, err := db.Get(c, id) t, err := db.Get(c, id)
unlock() // unlock even on error
if err != nil { if err != nil {
db.Unlock(c, id)
return return
} }
db.Unlock(c, id)
// Unlock must have been called by this point and in every // Unlock must have been called by this point and in every
// branch above // branch above
if t == nil { if t == nil {

View File

@ -148,7 +148,7 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL,
// Obtain the id of the activity // Obtain the id of the activity
id := activity.GetJSONLDId() id := activity.GetJSONLDId()
// Acquire a lock for the id. To be held for the rest of execution. // Acquire a lock for the id. To be held for the rest of execution.
err := a.db.Lock(c, id.Get()) unlock, err := a.db.Lock(c, id.Get())
if err != nil { if err != nil {
return err return err
} }
@ -157,19 +157,18 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL,
// If the database already contains the activity, exit early. // If the database already contains the activity, exit early.
exists, err := a.db.Exists(c, id.Get()) exists, err := a.db.Exists(c, id.Get())
if err != nil { if err != nil {
a.db.Unlock(c, id.Get()) unlock()
return err return err
} else if exists { } else if exists {
a.db.Unlock(c, id.Get()) unlock()
return nil return nil
} }
// Attempt to create the activity entry. // Attempt to create the activity entry.
err = a.db.Create(c, activity) err = a.db.Create(c, activity)
unlock() // unlock even on error return
if err != nil { if err != nil {
a.db.Unlock(c, id.Get())
return err return err
} }
a.db.Unlock(c, id.Get())
// Unlock by this point and in every branch above. // Unlock by this point and in every branch above.
// //
// 2. The values of 'to', 'cc', or 'audience' are Collections owned by // 2. The values of 'to', 'cc', or 'audience' are Collections owned by
@ -212,19 +211,19 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL,
if err != nil { if err != nil {
return err return err
} }
err = a.db.Lock(c, iri) var unlock func()
unlock, err = a.db.Lock(c, iri)
if err != nil { if err != nil {
return err return err
} }
// WARNING: Unlock is not deferred // WARNING: Unlock is not deferred
if owns, err := a.db.Owns(c, iri); err != nil { owns, err := a.db.Owns(c, iri)
a.db.Unlock(c, iri) unlock() // unlock even on error
if err != nil {
return err return err
} else if !owns { } else if !owns {
a.db.Unlock(c, iri)
continue continue
} }
a.db.Unlock(c, iri)
// Unlock by this point and in every branch above. // Unlock by this point and in every branch above.
myIRIs = append(myIRIs, iri) myIRIs = append(myIRIs, iri)
} }
@ -236,7 +235,8 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL,
col := make(map[string]itemser) col := make(map[string]itemser)
oCol := make(map[string]orderedItemser) oCol := make(map[string]orderedItemser)
for _, iri := range myIRIs { for _, iri := range myIRIs {
err = a.db.Lock(c, iri) var unlock func()
unlock, err = a.db.Lock(c, iri)
if err != nil { if err != nil {
return err return err
} }
@ -249,20 +249,20 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL,
if im, ok := t.(orderedItemser); ok { if im, ok := t.(orderedItemser); ok {
oCol[iri.String()] = im oCol[iri.String()] = im
colIRIs = append(colIRIs, iri) colIRIs = append(colIRIs, iri)
defer a.db.Unlock(c, iri) defer unlock()
} else { } else {
a.db.Unlock(c, iri) unlock() // unlock instantly
} }
} else if streams.IsOrExtendsActivityStreamsCollection(t) { } else if streams.IsOrExtendsActivityStreamsCollection(t) {
if im, ok := t.(itemser); ok { if im, ok := t.(itemser); ok {
col[iri.String()] = im col[iri.String()] = im
colIRIs = append(colIRIs, iri) colIRIs = append(colIRIs, iri)
defer a.db.Unlock(c, iri) defer unlock()
} else { } else {
a.db.Unlock(c, iri) unlock() // unlock instantly
} }
} else { } else {
a.db.Unlock(c, iri) unlock() // unlock instantly
} }
} }
// If we own none of the Collection IRIs in 'to', 'cc', or 'audience' // If we own none of the Collection IRIs in 'to', 'cc', or 'audience'
@ -409,17 +409,17 @@ func (a *sideEffectActor) Deliver(c context.Context, outboxIRI *url.URL, activit
// WrapInCreate wraps an object with a Create activity. // WrapInCreate wraps an object with a Create activity.
func (a *sideEffectActor) WrapInCreate(c context.Context, obj vocab.Type, outboxIRI *url.URL) (create vocab.ActivityStreamsCreate, err error) { func (a *sideEffectActor) WrapInCreate(c context.Context, obj vocab.Type, outboxIRI *url.URL) (create vocab.ActivityStreamsCreate, err error) {
err = a.db.Lock(c, outboxIRI) var unlock func()
unlock, err = a.db.Lock(c, outboxIRI)
if err != nil { if err != nil {
return return
} }
// WARNING: No deferring the Unlock // WARNING: No deferring the Unlock
actorIRI, err := a.db.ActorForOutbox(c, outboxIRI) actorIRI, err := a.db.ActorForOutbox(c, outboxIRI)
unlock() // unlock after regardless
if err != nil { if err != nil {
a.db.Unlock(c, outboxIRI)
return return
} }
a.db.Unlock(c, outboxIRI)
// Unlock the lock at this point and every branch above // Unlock the lock at this point and every branch above
return wrapInCreate(c, obj, actorIRI) return wrapInCreate(c, obj, actorIRI)
} }
@ -447,26 +447,25 @@ func (a *sideEffectActor) deliverToRecipients(c context.Context, boxIRI *url.URL
func (a *sideEffectActor) addToOutbox(c context.Context, outboxIRI *url.URL, activity Activity) error { func (a *sideEffectActor) addToOutbox(c context.Context, outboxIRI *url.URL, activity Activity) error {
// Set the activity in the database first. // Set the activity in the database first.
id := activity.GetJSONLDId() id := activity.GetJSONLDId()
err := a.db.Lock(c, id.Get()) unlock, err := a.db.Lock(c, id.Get())
if err != nil { if err != nil {
return err return err
} }
// WARNING: Unlock not deferred // WARNING: Unlock not deferred
err = a.db.Create(c, activity) err = a.db.Create(c, activity)
unlock() // unlock after regardless
if err != nil { if err != nil {
a.db.Unlock(c, id.Get())
return err return err
} }
a.db.Unlock(c, id.Get())
// WARNING: Unlock(c, id) should be called by this point and in every // WARNING: Unlock(c, id) should be called by this point and in every
// return before here. // return before here.
// //
// Acquire a lock to read the outbox. Defer release. // Acquire a lock to read the outbox. Defer release.
err = a.db.Lock(c, outboxIRI) unlock, err = a.db.Lock(c, outboxIRI)
if err != nil { if err != nil {
return err return err
} }
defer a.db.Unlock(c, outboxIRI) defer unlock()
outbox, err := a.db.GetOutbox(c, outboxIRI) outbox, err := a.db.GetOutbox(c, outboxIRI)
if err != nil { if err != nil {
return err return err
@ -491,11 +490,12 @@ func (a *sideEffectActor) addToOutbox(c context.Context, outboxIRI *url.URL, act
// Returns true when the activity is novel. // Returns true when the activity is novel.
func (a *sideEffectActor) addToInboxIfNew(c context.Context, inboxIRI *url.URL, activity Activity) (isNew bool, err error) { func (a *sideEffectActor) addToInboxIfNew(c context.Context, inboxIRI *url.URL, activity Activity) (isNew bool, err error) {
// Acquire a lock to read the inbox. Defer release. // Acquire a lock to read the inbox. Defer release.
err = a.db.Lock(c, inboxIRI) var unlock func()
unlock, err = a.db.Lock(c, inboxIRI)
if err != nil { if err != nil {
return return
} }
defer a.db.Unlock(c, inboxIRI) defer unlock()
// Obtain the id of the activity // Obtain the id of the activity
id := activity.GetJSONLDId() id := activity.GetJSONLDId()
// If the inbox already contains the URL, early exit. // If the inbox already contains the URL, early exit.
@ -539,19 +539,18 @@ func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, inboxIRI *
types, iris := getInboxForwardingValues(val) types, iris := getInboxForwardingValues(val)
// For IRIs, simply check if we own them. // For IRIs, simply check if we own them.
for _, iri := range iris { for _, iri := range iris {
err := a.db.Lock(c, iri) unlock, err := a.db.Lock(c, iri)
if err != nil { if err != nil {
return false, err return false, err
} }
// WARNING: Unlock is not deferred // WARNING: Unlock is not deferred
if owns, err := a.db.Owns(c, iri); err != nil { owns, err := a.db.Owns(c, iri)
a.db.Unlock(c, iri) unlock() // unlock after regardless
if err != nil {
return false, err return false, err
} else if owns { } else if owns {
a.db.Unlock(c, iri)
return true, nil return true, nil
} }
a.db.Unlock(c, iri)
// Unlock by this point and in every branch above // Unlock by this point and in every branch above
} }
// For embedded literals, check the id. // For embedded literals, check the id.
@ -560,19 +559,19 @@ func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, inboxIRI *
if err != nil { if err != nil {
return false, err return false, err
} }
err = a.db.Lock(c, id) var unlock func()
unlock, err = a.db.Lock(c, id)
if err != nil { if err != nil {
return false, err return false, err
} }
// WARNING: Unlock is not deferred // WARNING: Unlock is not deferred
if owns, err := a.db.Owns(c, id); err != nil { owns, err := a.db.Owns(c, id)
a.db.Unlock(c, id) unlock() // unlock after regardless
if err != nil {
return false, err return false, err
} else if owns { } else if owns {
a.db.Unlock(c, id)
return true, nil return true, nil
} }
a.db.Unlock(c, id)
// Unlock by this point and in every branch above // Unlock by this point and in every branch above
} }
// Recur Preparation: Try fetching the IRIs so we can recur into them. // Recur Preparation: Try fetching the IRIs so we can recur into them.
@ -683,7 +682,8 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit
foundInboxesFromDB := []*url.URL{} foundInboxesFromDB := []*url.URL{}
for _, actorIRI := range r { for _, actorIRI := range r {
// BEGIN LOCK // BEGIN LOCK
err = a.db.Lock(c, actorIRI) var unlock func()
unlock, err = a.db.Lock(c, actorIRI)
if err != nil { if err != nil {
return return
} }
@ -691,7 +691,7 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit
inboxes, err := a.db.InboxesForIRI(c, actorIRI) inboxes, err := a.db.InboxesForIRI(c, actorIRI)
if err != nil { if err != nil {
// bail on error // bail on error
a.db.Unlock(c, actorIRI) unlock()
return nil, err return nil, err
} }
@ -699,16 +699,13 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit
// we have a hit // we have a hit
foundInboxesFromDB = append(foundInboxesFromDB, inboxes...) foundInboxesFromDB = append(foundInboxesFromDB, inboxes...)
// if we found inboxes for this iri, we should remove it from // if we found inboxes for this iri, we should remove it from
// the list of actors/iris we still need to dereference // the list of actors/iris we still need to dereference
r = removeOne(r, actorIRI) r = removeOne(r, actorIRI)
} }
// END LOCK // END LOCK
a.db.Unlock(c, actorIRI) unlock()
if err != nil {
return nil, err
}
} }
// look for any actors' inboxes that weren't already discovered above; // look for any actors' inboxes that weren't already discovered above;
@ -733,25 +730,25 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit
targets = append(targets, foundInboxesFromRemote...) targets = append(targets, foundInboxesFromRemote...)
// Get inboxes of sender. // Get inboxes of sender.
err = a.db.Lock(c, outboxIRI) var unlock func()
unlock, err = a.db.Lock(c, outboxIRI)
if err != nil { if err != nil {
return return
} }
// WARNING: No deferring the Unlock // WARNING: No deferring the Unlock
actorIRI, err := a.db.ActorForOutbox(c, outboxIRI) actorIRI, err := a.db.ActorForOutbox(c, outboxIRI)
unlock() // unlock after regardless
if err != nil { if err != nil {
a.db.Unlock(c, outboxIRI)
return return
} }
a.db.Unlock(c, outboxIRI)
// Get the inbox on the sender. // Get the inbox on the sender.
err = a.db.Lock(c, actorIRI) unlock, err = a.db.Lock(c, actorIRI)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// BEGIN LOCK // BEGIN LOCK
thisActor, err := a.db.Get(c, actorIRI) thisActor, err := a.db.Get(c, actorIRI)
a.db.Unlock(c, actorIRI) unlock()
// END LOCK -- Still need to handle err // END LOCK -- Still need to handle err
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -259,11 +259,12 @@ func (w SocialWrappedCallbacks) create(c context.Context, a vocab.ActivityStream
if err != nil { if err != nil {
return err return err
} }
err = w.db.Lock(c, id) var unlock func()
unlock, err = w.db.Lock(c, id)
if err != nil { if err != nil {
return err return err
} }
defer w.db.Unlock(c, id) defer unlock()
if err := w.db.Create(c, obj); err != nil { if err := w.db.Create(c, obj); err != nil {
return err return err
} }
@ -301,11 +302,11 @@ func (w SocialWrappedCallbacks) update(c context.Context, a vocab.ActivityStream
// Create anonymous loop function to be able to properly scope the defer // Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration. // for the database lock at each iteration.
loopFn := func(idx int, loopId *url.URL) error { loopFn := func(idx int, loopId *url.URL) error {
err := w.db.Lock(c, loopId) unlock, err := w.db.Lock(c, loopId)
if err != nil { if err != nil {
return err return err
} }
defer w.db.Unlock(c, loopId) defer unlock()
t, err := w.db.Get(c, loopId) t, err := w.db.Get(c, loopId)
if err != nil { if err != nil {
return err return err
@ -371,11 +372,11 @@ func (w SocialWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStre
// Create anonymous loop function to be able to properly scope the defer // Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration. // for the database lock at each iteration.
loopFn := func(idx int, loopId *url.URL) error { loopFn := func(idx int, loopId *url.URL) error {
err := w.db.Lock(c, loopId) unlock, err := w.db.Lock(c, loopId)
if err != nil { if err != nil {
return err return err
} }
defer w.db.Unlock(c, loopId) defer unlock()
t, err := w.db.Get(c, loopId) t, err := w.db.Get(c, loopId)
if err != nil { if err != nil {
return err return err
@ -458,23 +459,24 @@ func (w SocialWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsL
return ErrObjectRequired return ErrObjectRequired
} }
// Get this actor's IRI. // Get this actor's IRI.
if err := w.db.Lock(c, w.outboxIRI); err != nil { unlock, err := w.db.Lock(c, w.outboxIRI)
if err != nil {
return err return err
} }
// WARNING: Unlock not deferred. // WARNING: Unlock not deferred.
actorIRI, err := w.db.ActorForOutbox(c, w.outboxIRI) actorIRI, err := w.db.ActorForOutbox(c, w.outboxIRI)
unlock() // unlock even on error
if err != nil { if err != nil {
w.db.Unlock(c, w.outboxIRI)
return err return err
} }
w.db.Unlock(c, w.outboxIRI)
// Unlock must be called by now and every branch above. // Unlock must be called by now and every branch above.
// //
// Now obtain this actor's 'liked' collection. // Now obtain this actor's 'liked' collection.
if err := w.db.Lock(c, actorIRI); err != nil { unlock, err = w.db.Lock(c, actorIRI)
if err != nil {
return err return err
} }
defer w.db.Unlock(c, actorIRI) defer unlock()
liked, err := w.db.Liked(c, actorIRI) liked, err := w.db.Liked(c, actorIRI)
if err != nil { if err != nil {
return err return err

View File

@ -753,7 +753,8 @@ func mustHaveActivityActorsMatchObjectActors(c context.Context,
actors vocab.ActivityStreamsActorProperty, actors vocab.ActivityStreamsActorProperty,
op vocab.ActivityStreamsObjectProperty, op vocab.ActivityStreamsObjectProperty,
newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error), newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error),
boxIRI *url.URL) error { boxIRI *url.URL,
) error {
activityActorMap := make(map[string]bool, actors.Len()) activityActorMap := make(map[string]bool, actors.Len())
for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() { for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
id, err := ToId(iter) id, err := ToId(iter)
@ -808,7 +809,8 @@ func mustHaveActivityActorsMatchObjectActors(c context.Context,
func add(c context.Context, func add(c context.Context,
op vocab.ActivityStreamsObjectProperty, op vocab.ActivityStreamsObjectProperty,
target vocab.ActivityStreamsTargetProperty, target vocab.ActivityStreamsTargetProperty,
db Database) error { db Database,
) error {
opIds := make([]*url.URL, 0, op.Len()) opIds := make([]*url.URL, 0, op.Len())
for iter := op.Begin(); iter != op.End(); iter = iter.Next() { for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
id, err := ToId(iter) id, err := ToId(iter)
@ -828,10 +830,11 @@ func add(c context.Context,
// Create anonymous loop function to be able to properly scope the defer // Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration. // for the database lock at each iteration.
loopFn := func(t *url.URL) error { loopFn := func(t *url.URL) error {
if err := db.Lock(c, t); err != nil { unlock, err := db.Lock(c, t)
if err != nil {
return err return err
} }
defer db.Unlock(c, t) defer unlock()
if owns, err := db.Owns(c, t); err != nil { if owns, err := db.Owns(c, t); err != nil {
return err return err
} else if !owns { } else if !owns {
@ -889,7 +892,8 @@ func add(c context.Context,
func remove(c context.Context, func remove(c context.Context,
op vocab.ActivityStreamsObjectProperty, op vocab.ActivityStreamsObjectProperty,
target vocab.ActivityStreamsTargetProperty, target vocab.ActivityStreamsTargetProperty,
db Database) error { db Database,
) error {
opIds := make(map[string]bool, op.Len()) opIds := make(map[string]bool, op.Len())
for iter := op.Begin(); iter != op.End(); iter = iter.Next() { for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
id, err := ToId(iter) id, err := ToId(iter)
@ -909,10 +913,11 @@ func remove(c context.Context,
// Create anonymous loop function to be able to properly scope the defer // Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration. // for the database lock at each iteration.
loopFn := func(t *url.URL) error { loopFn := func(t *url.URL) error {
if err := db.Lock(c, t); err != nil { unlock, err := db.Lock(c, t)
if err != nil {
return err return err
} }
defer db.Unlock(c, t) defer unlock()
if owns, err := db.Owns(c, t); err != nil { if owns, err := db.Owns(c, t); err != nil {
return err return err
} else if !owns { } else if !owns {

2
vendor/modules.txt vendored
View File

@ -284,7 +284,7 @@ github.com/stretchr/testify/suite
# github.com/subosito/gotenv v1.2.0 # github.com/subosito/gotenv v1.2.0
## explicit ## explicit
github.com/subosito/gotenv github.com/subosito/gotenv
# github.com/superseriousbusiness/activity v1.0.1-0.20220405135100-18e8f86a760a # github.com/superseriousbusiness/activity v1.1.0-gts
## explicit; go 1.18 ## explicit; go 1.18
github.com/superseriousbusiness/activity/pub github.com/superseriousbusiness/activity/pub
github.com/superseriousbusiness/activity/streams github.com/superseriousbusiness/activity/streams