[feature] add per-uri dereferencer locks (#2291)

This commit is contained in:
kim
2023-10-31 11:12:22 +00:00
committed by GitHub
parent 51d0a0bba5
commit ce71a5a790
54 changed files with 2432 additions and 2719 deletions

View File

@@ -115,7 +115,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u
}
// Create and pass-through a new bare-bones model for deref.
return d.enrichStatus(ctx, requestUser, uri, &gtsmodel.Status{
return d.enrichStatusSafely(ctx, requestUser, uri, &gtsmodel.Status{
Local: func() *bool { var false bool; return &false }(),
URI: uriStr,
}, nil)
@@ -131,7 +131,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u
}
// Try to update + deref existing status model.
latest, apubStatus, err := d.enrichStatus(ctx,
latest, apubStatus, err := d.enrichStatusSafely(ctx,
requestUser,
uri,
status,
@@ -140,10 +140,6 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u
if err != nil {
log.Errorf(ctx, "error enriching remote status: %v", err)
// Update fetch-at to slow re-attempts.
status.FetchedAt = time.Now()
_ = d.state.DB.UpdateStatus(ctx, status, "fetched_at")
// Fallback to existing.
return status, nil, nil
}
@@ -166,8 +162,8 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st
return nil, nil, gtserror.Newf("invalid status uri %q: %w", status.URI, err)
}
// Try to update + deref existing status model.
latest, apubStatus, err := d.enrichStatus(ctx,
// Try to update + deref the passed status model.
latest, apubStatus, err := d.enrichStatusSafely(ctx,
requestUser,
uri,
status,
@@ -189,7 +185,7 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st
// This is a more optimized form of manually enqueueing .UpdateStatus() to the federation worker, since it only enqueues update if necessary.
func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) {
// Check whether needs update.
if statusUpToDate(status) {
if !force && statusUpToDate(status) {
return
}
@@ -202,17 +198,81 @@ func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser strin
// Enqueue a worker function to re-fetch this status async.
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
latest, apubStatus, err := d.enrichStatus(ctx, requestUser, uri, status, apubStatus)
latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, apubStatus)
if err != nil {
log.Errorf(ctx, "error enriching remote status: %v", err)
return
}
// This status was updated, re-dereference the whole thread.
d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus)
if apubStatus != nil {
// This status was updated, re-dereference the whole thread.
d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus)
}
})
}
// enrichStatusSafely wraps enrichStatus() to perform
// it within the State{}.FedLocks mutexmap, which protects
// dereferencing actions with per-URI mutex locks.
func (d *Dereferencer) enrichStatusSafely(
ctx context.Context,
requestUser string,
uri *url.URL,
status *gtsmodel.Status,
apubStatus ap.Statusable,
) (*gtsmodel.Status, ap.Statusable, error) {
uriStr := status.URI
if status.ID != "" {
// This is an existing status, first try to populate it. This
// is required by the checks below for existing tags, media etc.
if err := d.state.DB.PopulateStatus(ctx, status); err != nil {
log.Errorf(ctx, "error populating existing status %s: %v", uriStr, err)
}
}
// Acquire per-URI deref lock, wraping unlock
// to safely defer in case of panic, while still
// performing more granular unlocks when needed.
unlock := d.state.FedLocks.Lock(uriStr)
unlock = doOnce(unlock)
defer unlock()
// Perform status enrichment with passed vars.
latest, apubStatus, err := d.enrichStatus(ctx,
requestUser,
uri,
status,
apubStatus,
)
if gtserror.StatusCode(err) >= 400 {
// Update fetch-at to slow re-attempts.
status.FetchedAt = time.Now()
_ = d.state.DB.UpdateStatus(ctx, status, "fetched_at")
}
// Unlock now
// we're done.
unlock()
if errors.Is(err, db.ErrAlreadyExists) {
// Ensure AP model isn't set,
// otherwise this indicates WE
// enriched the status.
apubStatus = nil
// DATA RACE! We likely lost out to another goroutine
// in a call to db.Put(Status). Look again in DB by URI.
latest, err = d.state.DB.GetStatusByURI(ctx, status.URI)
if err != nil {
err = gtserror.Newf("error getting status %s from database after race: %w", uriStr, err)
}
}
return latest, apubStatus, err
}
// enrichStatus will enrich the given status, whether a new
// barebones model, or existing model from the database.
// It handles necessary dereferencing, database updates, etc.
@@ -258,15 +318,10 @@ func (d *Dereferencer) enrichStatus(
return nil, nil, gtserror.New("attributedTo was empty")
}
// Ensure we have the author account of the status dereferenced (+ up-to-date).
if author, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil {
if status.AccountID == "" {
// Provided status account is nil, i.e. this is a new status / author, so a deref fail is unrecoverable.
return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err)
}
} else if status.AccountID != "" && status.AccountID != author.ID {
// There already existed an account for this status author, but account ID changed. This shouldn't happen!
log.Warnf(ctx, "status author account ID changed: old=%s new=%s", status.AccountID, author.ID)
// Ensure we have the author account of the status dereferenced (+ up-to-date). If this is a new status
// (i.e. status.AccountID == "") then any error here is irrecoverable. AccountID must ALWAYS be set.
if _, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil && status.AccountID == "" {
return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err)
}
// ActivityPub model was recently dereferenced, so assume that passed status
@@ -303,7 +358,7 @@ func (d *Dereferencer) enrichStatus(
}
// Ensure the status' tags are populated, (changes are expected / okay).
if err := d.fetchStatusTags(ctx, latestStatus); err != nil {
if err := d.fetchStatusTags(ctx, status, latestStatus); err != nil {
return nil, nil, gtserror.Newf("error populating tags for status %s: %w", uri, err)
}
@@ -323,13 +378,6 @@ func (d *Dereferencer) enrichStatus(
//
// This is new, put the status in the database.
err := d.state.DB.PutStatus(ctx, latestStatus)
if errors.Is(err, db.ErrAlreadyExists) {
// TODO: replace this quick fix with per-URI deref locks.
latestStatus, err = d.state.DB.GetStatusByURI(ctx, latestStatus.URI)
return latestStatus, nil, err
}
if err != nil {
return nil, nil, gtserror.Newf("error putting in database: %w", err)
}
@@ -545,36 +593,41 @@ func (d *Dereferencer) threadStatus(ctx context.Context, status *gtsmodel.Status
return nil
}
func (d *Dereferencer) fetchStatusTags(ctx context.Context, status *gtsmodel.Status) error {
func (d *Dereferencer) fetchStatusTags(ctx context.Context, existing, status *gtsmodel.Status) error {
// Allocate new slice to take the yet-to-be determined tag IDs.
status.TagIDs = make([]string, len(status.Tags))
for i := range status.Tags {
placeholder := status.Tags[i]
tag := status.Tags[i]
// Look for existing tag with this name first.
tag, err := d.state.DB.GetTagByName(ctx, placeholder.Name)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf(ctx, "db error getting tag %s: %v", tag.Name, err)
// Look for tag in existing status with name.
existing, ok := existing.GetTagByName(tag.Name)
if ok && existing.ID != "" {
status.Tags[i] = existing
status.TagIDs[i] = existing.ID
continue
}
if tag == nil {
// Create new ID for tag name.
tag = &gtsmodel.Tag{
ID: id.NewULID(),
Name: placeholder.Name,
}
// Insert this tag with new name into the database.
if err := d.state.DB.PutTag(ctx, tag); err != nil {
log.Errorf(ctx, "db error putting tag %s: %v", tag.Name, err)
continue
}
// Look for existing tag with name in the database.
existing, err := d.state.DB.GetTagByName(ctx, tag.Name)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("db error getting tag %s: %w", tag.Name, err)
} else if existing != nil {
status.Tags[i] = existing
status.TagIDs[i] = existing.ID
continue
}
// Set the *new* tag and ID.
status.Tags[i] = tag
// Create new ID for tag.
tag.ID = id.NewULID()
// Insert this tag with new name into the database.
if err := d.state.DB.PutTag(ctx, tag); err != nil {
log.Errorf(ctx, "db error putting tag %s: %v", tag.Name, err)
continue
}
// Set new tag ID in slice.
status.TagIDs[i] = tag.ID
}
@@ -600,10 +653,10 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp
status.AttachmentIDs = make([]string, len(status.Attachments))
for i := range status.Attachments {
placeholder := status.Attachments[i]
attachment := status.Attachments[i]
// Look for existing media attachment with remoet URL first.
existing, ok := existing.GetAttachmentByRemoteURL(placeholder.RemoteURL)
existing, ok := existing.GetAttachmentByRemoteURL(attachment.RemoteURL)
if ok && existing.ID != "" && *existing.Cached {
status.Attachments[i] = existing
status.AttachmentIDs[i] = existing.ID
@@ -611,9 +664,9 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp
}
// Ensure a valid media attachment remote URL.
remoteURL, err := url.Parse(placeholder.RemoteURL)
remoteURL, err := url.Parse(attachment.RemoteURL)
if err != nil {
log.Errorf(ctx, "invalid remote media url %q: %v", placeholder.RemoteURL, err)
log.Errorf(ctx, "invalid remote media url %q: %v", attachment.RemoteURL, err)
continue
}
@@ -622,9 +675,9 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp
return tsport.DereferenceMedia(ctx, remoteURL)
}, status.AccountID, &media.AdditionalMediaInfo{
StatusID: &status.ID,
RemoteURL: &placeholder.RemoteURL,
Description: &placeholder.Description,
Blurhash: &placeholder.Blurhash,
RemoteURL: &attachment.RemoteURL,
Description: &attachment.Description,
Blurhash: &attachment.Blurhash,
})
if err != nil {
log.Errorf(ctx, "error processing attachment: %v", err)
@@ -632,15 +685,15 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp
}
// Force attachment loading *right now*.
media, err := processing.LoadAttachment(ctx)
attachment, err = processing.LoadAttachment(ctx)
if err != nil {
log.Errorf(ctx, "error loading attachment: %v", err)
continue
}
// Set the *new* attachment and ID.
status.Attachments[i] = media
status.AttachmentIDs[i] = media.ID
status.Attachments[i] = attachment
status.AttachmentIDs[i] = attachment.ID
}
for i := 0; i < len(status.AttachmentIDs); {