mirror of
https://github.com/superseriousbusiness/gotosocial
synced 2025-06-05 21:59:39 +02:00
[chore]: Bump github.com/minio/minio-go/v7 from 7.0.44 to 7.0.47 (#1348)
Bumps [github.com/minio/minio-go/v7](https://github.com/minio/minio-go) from 7.0.44 to 7.0.47. - [Release notes](https://github.com/minio/minio-go/releases) - [Commits](https://github.com/minio/minio-go/compare/v7.0.44...v7.0.47) --- updated-dependencies: - dependency-name: github.com/minio/minio-go/v7 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
This commit is contained in:
215
vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
generated
vendored
215
vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
generated
vendored
@ -28,6 +28,7 @@ import (
|
||||
"net/url"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/minio/minio-go/v7/pkg/s3utils"
|
||||
@ -44,7 +45,9 @@ import (
|
||||
func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
|
||||
reader io.Reader, size int64, opts PutObjectOptions,
|
||||
) (info UploadInfo, err error) {
|
||||
if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
|
||||
if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
|
||||
info, err = c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
|
||||
} else if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
|
||||
// Verify if the reader implements ReadAt and it is not a *minio.Object then we will use parallel uploader.
|
||||
info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
|
||||
} else {
|
||||
@ -266,6 +269,9 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
|
||||
// Sort all completed parts.
|
||||
sort.Sort(completedParts(complMultipartUpload.Parts))
|
||||
|
||||
opts = PutObjectOptions{
|
||||
ServerSideEncryption: opts.ServerSideEncryption,
|
||||
}
|
||||
if withChecksum {
|
||||
// Add hash of hashes.
|
||||
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||
@ -278,7 +284,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
|
||||
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
|
||||
}
|
||||
|
||||
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
|
||||
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
|
||||
if err != nil {
|
||||
return UploadInfo{}, err
|
||||
}
|
||||
@ -425,6 +431,211 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
|
||||
// Sort all completed parts.
|
||||
sort.Sort(completedParts(complMultipartUpload.Parts))
|
||||
|
||||
opts = PutObjectOptions{
|
||||
ServerSideEncryption: opts.ServerSideEncryption,
|
||||
}
|
||||
if len(crcBytes) > 0 {
|
||||
// Add hash of hashes.
|
||||
crc.Reset()
|
||||
crc.Write(crcBytes)
|
||||
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
|
||||
}
|
||||
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
|
||||
if err != nil {
|
||||
return UploadInfo{}, err
|
||||
}
|
||||
|
||||
uploadInfo.Size = totalUploadedSize
|
||||
return uploadInfo, nil
|
||||
}
|
||||
|
||||
// putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel.
|
||||
// This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.
|
||||
func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string,
|
||||
reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
|
||||
// Input validation.
|
||||
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
|
||||
return UploadInfo{}, err
|
||||
}
|
||||
|
||||
if err = s3utils.CheckValidObjectName(objectName); err != nil {
|
||||
return UploadInfo{}, err
|
||||
}
|
||||
|
||||
if !opts.SendContentMd5 {
|
||||
if opts.UserMetadata == nil {
|
||||
opts.UserMetadata = make(map[string]string, 1)
|
||||
}
|
||||
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
|
||||
}
|
||||
|
||||
// Cancel all when an error occurs.
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
// Calculate the optimal parts info for a given size.
|
||||
totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
|
||||
if err != nil {
|
||||
return UploadInfo{}, err
|
||||
}
|
||||
|
||||
// Initiates a new multipart request
|
||||
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
|
||||
if err != nil {
|
||||
return UploadInfo{}, err
|
||||
}
|
||||
delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
|
||||
|
||||
// Aborts the multipart upload if the function returns
|
||||
// any error, since we do not resume we should purge
|
||||
// the parts which have been uploaded to relinquish
|
||||
// storage space.
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
|
||||
}
|
||||
}()
|
||||
|
||||
// Create checksums
|
||||
// CRC32C is ~50% faster on AMD64 @ 30GB/s
|
||||
var crcBytes []byte
|
||||
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||
md5Hash := c.md5Hasher()
|
||||
defer md5Hash.Close()
|
||||
|
||||
// Total data read and written to server. should be equal to 'size' at the end of the call.
|
||||
var totalUploadedSize int64
|
||||
|
||||
// Initialize parts uploaded map.
|
||||
partsInfo := make(map[int]ObjectPart)
|
||||
|
||||
// Create a buffer.
|
||||
nBuffers := int64(opts.NumThreads)
|
||||
bufs := make(chan []byte, nBuffers)
|
||||
all := make([]byte, nBuffers*partSize)
|
||||
for i := int64(0); i < nBuffers; i++ {
|
||||
bufs <- all[i*partSize : i*partSize+partSize]
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
errCh := make(chan error, opts.NumThreads)
|
||||
|
||||
reader = newHook(reader, opts.Progress)
|
||||
|
||||
// Part number always starts with '1'.
|
||||
var partNumber int
|
||||
for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
|
||||
// Proceed to upload the part.
|
||||
var buf []byte
|
||||
select {
|
||||
case buf = <-bufs:
|
||||
case err = <-errCh:
|
||||
cancel()
|
||||
wg.Wait()
|
||||
return UploadInfo{}, err
|
||||
}
|
||||
|
||||
if int64(len(buf)) != partSize {
|
||||
return UploadInfo{}, fmt.Errorf("read buffer < %d than expected partSize: %d", len(buf), partSize)
|
||||
}
|
||||
|
||||
length, rerr := readFull(reader, buf)
|
||||
if rerr == io.EOF && partNumber > 1 {
|
||||
// Done
|
||||
break
|
||||
}
|
||||
|
||||
if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
|
||||
cancel()
|
||||
wg.Wait()
|
||||
return UploadInfo{}, rerr
|
||||
}
|
||||
|
||||
// Calculate md5sum.
|
||||
customHeader := make(http.Header)
|
||||
if !opts.SendContentMd5 {
|
||||
// Add CRC32C instead.
|
||||
crc.Reset()
|
||||
crc.Write(buf[:length])
|
||||
cSum := crc.Sum(nil)
|
||||
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
|
||||
crcBytes = append(crcBytes, cSum...)
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(partNumber int) {
|
||||
// Avoid declaring variables in the for loop
|
||||
var md5Base64 string
|
||||
|
||||
if opts.SendContentMd5 {
|
||||
md5Hash.Reset()
|
||||
md5Hash.Write(buf[:length])
|
||||
md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
|
||||
}
|
||||
|
||||
defer wg.Done()
|
||||
p := uploadPartParams{
|
||||
bucketName: bucketName,
|
||||
objectName: objectName,
|
||||
uploadID: uploadID,
|
||||
reader: bytes.NewReader(buf[:length]),
|
||||
partNumber: partNumber,
|
||||
md5Base64: md5Base64,
|
||||
size: int64(length),
|
||||
sse: opts.ServerSideEncryption,
|
||||
streamSha256: !opts.DisableContentSha256,
|
||||
customHeader: customHeader,
|
||||
}
|
||||
objPart, uerr := c.uploadPart(ctx, p)
|
||||
if uerr != nil {
|
||||
errCh <- uerr
|
||||
}
|
||||
|
||||
// Save successfully uploaded part metadata.
|
||||
mu.Lock()
|
||||
partsInfo[partNumber] = objPart
|
||||
mu.Unlock()
|
||||
|
||||
// Send buffer back so it can be reused.
|
||||
bufs <- buf
|
||||
}(partNumber)
|
||||
|
||||
// Save successfully uploaded size.
|
||||
totalUploadedSize += int64(length)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Collect any error
|
||||
select {
|
||||
case err = <-errCh:
|
||||
return UploadInfo{}, err
|
||||
default:
|
||||
}
|
||||
|
||||
// Complete multipart upload.
|
||||
var complMultipartUpload completeMultipartUpload
|
||||
|
||||
// Loop over total uploaded parts to save them in
|
||||
// Parts array before completing the multipart request.
|
||||
for i := 1; i < partNumber; i++ {
|
||||
part, ok := partsInfo[i]
|
||||
if !ok {
|
||||
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
|
||||
}
|
||||
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
|
||||
ETag: part.ETag,
|
||||
PartNumber: part.PartNumber,
|
||||
ChecksumCRC32: part.ChecksumCRC32,
|
||||
ChecksumCRC32C: part.ChecksumCRC32C,
|
||||
ChecksumSHA1: part.ChecksumSHA1,
|
||||
ChecksumSHA256: part.ChecksumSHA256,
|
||||
})
|
||||
}
|
||||
|
||||
// Sort all completed parts.
|
||||
sort.Sort(completedParts(complMultipartUpload.Parts))
|
||||
|
||||
opts = PutObjectOptions{}
|
||||
if len(crcBytes) > 0 {
|
||||
// Add hash of hashes.
|
||||
|
Reference in New Issue
Block a user