Bu örnekte, Golang kullanarak bir dosya yüklemek için AWS multipart özelliğini kullanacağız. Dosya bölümleri eşzamanlı olarak ele alınacaktır. İşte bazı detaylar ve öneriler.


İşlemde herhangi bir terslik olursa ve goroutinler bir hata döndürürse, önceden yüklenen tüm parçalar iptal edilecektir. AWS günlükleri, işlemin hata yapması durumunda bazı kısım yüklemeleri için 500 veya 200 yanıt kodu bildirebilir. Bu normaldir çünkü akışta (goroutine) bir hata oluştuğunda bağlam iptal edilecektir. Bununla birlikte, halihazırda çalışan bazı goroutinler, henüz iptal edilmemiş bir contexte sahip olacaktır, bu nedenle günlüklerde 200 rapor edecektir. İptal edilmiş bir contexti devralanlar, bunun yerine 500 rapor edecekler. Her ikisi de beklenen davranıştır. Her iki durumda da sonunda tüm akış başarıyla durdurulacak, bu yüzden endişelenmenize gerek yok. Öte yandan, her şey yolunda giderse, çok parçalı yükleme tamamlanmadan önce tüm goroutinler beklenecektir.


Arabellek okunduktan sonra yeniden deneme başarısız olursa, bir şeyler ters gitti demektir. Böyle bir durumda, arabellek zaten okunduğu için yeniden denenmez. Bu, boş bir arabellek yükleme girişiminden kaçınmak için bir tür kısa devredir. Bunun yerine süreci sonlandırır.


Tüm varsayılanlar, hız yerine kaynak kullanım verimliliğini hedeflemeyi amaçlar. Her biri bellekte MB'larca veriye sahip olan çok sayıda aktif bağlantı için çok sayıda goroutine oluşturarak uygulamayı çökertmek istemezsiniz. Meşgul bir uygulamanız varsa, belleğin tükenmesini önlemek için maksimum işleyici sayısını 2'de bırakmanızı öneririm, aksi takdirde uygulama hızla çökebilir. Çok parçalı karşıya yüklemelerin genellikle engellemeyen işlemler olduğu göz önüne alındığında, açık bir neden olmadıkça acele etmeye gerek yoktur. Bunun yerine, bellek verimliliği öncelik olmalıdır. Genellikle eşzamanlı kullanıcılar olacaktır, bu nedenle belleği paylaşmak iyi bir fikirdir.


Ön bilgilendirme


"Contextler bir yapı türü içinde saklanmamalıdır" (ref). Hepimizin bu tavsiyeyi bildiğini varsayıyorum. Ancak ben, bir yapı türü içinde bir context saklıyorum. Neden? Çünkü bu tavsiye Go ekibi tarafından makul ve gerçekçi bir nedenle gevşetilmiştir. Örneğimizde, contexti struct içinde depolamak kesinlikle mantıklıdır, aksi takdirde goroutinleri ve iptalleri tek başına temiz bir şekilde yönetemezsiniz. Bu tavsiyenin yeni hali "Contextler mantıklı olmadıkça bir yapı türü içinde saklanmamalıdır" şeklindedir. Gerekçeyle ilgileniyorsanız işte Russ Cox'un yorumu ve GitHub yorumları/PR'lar (1, 2, 3).


Temel kullanım


Aşağıda bir ana dosyam var ama bu hızlı bir kullanım örneği.


ctx := context.Background()

// AWS config
cfg, err := aws.NewConfig(ctx)
if err != nil {
log.Fatalln(err)
}

// AWS S3 client
s3 := aws.NewS3(cfg, "root-bucket")

// Multipart uploader instance
up, err := s3.CreateMultipartUpload(ctx, aws.MultipartUploadConfig{
Key: uuid.NewString(),
Filename: "uuid-list.txt",
Mime: "text/plain",
})
if err != nil {
return err
}
defer up.Abort()

// Version 1: Write single part (line)
err := up.Write(scanner.Text() + "\n")
if err != nil {
return err
}

// Version 2: Write many parts (lines)
for _, row := range some_data_slice {
err := up.Write(row)
if err != nil {
return err
}
}

// Write (flush) any remaining parts
tot, err := up.Flush(ctx)
if err != nil {
return err
}

log.Println("uploaded parts:", tot)

Dosyalar


config.go


Gördüğünüz gibi bu, test amacıyla Localstack kullanıyor.


package aws

import (
"context"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
)

func NewConfig(ctx context.Context) (aws.Config, error) {
return config.LoadDefaultConfig(ctx,
config.WithRegion("eu-west-1"),
config.WithSharedConfigProfile("localstack"),
config.WithEndpointResolver(aws.EndpointResolverFunc(func(_, _ string) (aws.Endpoint, error) {
return aws.Endpoint{
URL: "http://localhost:4566",
HostnameImmutable: true,
}, nil
})),
)
}

s3.go


package aws

import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"golang.org/x/sync/errgroup"
)

type S3 struct {
client *s3.Client
signer *s3.PresignClient
bucket string
}

func NewS3(config aws.Config, bucket string) S3 {
client := s3.NewFromConfig(config)

return S3{
client: client,
signer: s3.NewPresignClient(client),
bucket: bucket,
}
}

func (s S3) CreateMultipartUpload(ctx context.Context, cfg MultipartUploadConfig) (*MultipartUpload, error) {
if cfg.Key == "" || cfg.Filename == "" {
return nil, errors.New("required field: Key")
}

if cfg.Filename == "" {
return nil, errors.New("required field: Filename")
}

if cfg.Mime == "" {
return nil, errors.New("required field: Mime")
}

if cfg.Size != 0 && cfg.Size < multipartUploadMinPartSize {
return nil, fmt.Errorf("invalid value: Size: minimum required value is %d", multipartUploadMinPartSize)
}

if cfg.Bucket == "" {
cfg.Bucket = s.bucket
}

if cfg.Workers == 0 {
cfg.Workers = 1
}

if cfg.Expiry == 0 {
cfg.Expiry = 172800
}
exp := time.Now().Add(time.Second * time.Duration(cfg.Expiry))

if cfg.Size == 0 {
cfg.Size = multipartUploadMinPartSize
}

inp := &s3.CreateMultipartUploadInput{
Bucket: &cfg.Bucket,
Key: &cfg.Key,
ContentType: &cfg.Mime,
Expires: &exp,
ContentDisposition: aws.String(fmt.Sprintf(`attachment; filename="%s"`, cfg.Filename)),
}

res, err := s.client.CreateMultipartUpload(ctx, inp)
if err != nil {
return nil, fmt.Errorf("create multipart upload: %w", err)
}

erg, erc := errgroup.WithContext(ctx)

return &MultipartUpload{
client: s.client,
config: cfg,
goroutineGroup: erg,
goroutineContext: erc,
buffer: &bytes.Buffer{},
workers: make(chan struct{}, cfg.Workers),
mux: &sync.Mutex{},
parts: make(map[int]*string),
cursor: 1,
id: *res.UploadId,
}, nil
}

multipart_upload.go


package aws

import (
"bytes"
"context"
"fmt"
"sync"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"golang.org/x/sync/errgroup"

v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
)

const multipartUploadMinPartSize = 5 * 1024 * 1024

// MultipartUploadConfig defines multipart upload specifications.
// Example:
//
// aws.MultipartUploadConfig{
// Key: "some-unique-key",
// Filename: "some-filename",
// Mime: "some-valid-mime-type",
// }
//
// aws.MultipartUploadConfig{
// Key: "some-unique-key",
// Filename: "some-filename",
// Mime: "some-valid-mime-type",
// Bucket: "some-bucket",
// Workers: 2,
// Expiry: 86400, // 1 day
// Retry: 3,
// Size: 10 * 1024 * 1024, // 10MB
// }
type MultipartUploadConfig struct {
// Object key for which the multipart upload was initiated. Required.
Key string

// Used to name file for downloading. Required.
Filename string

// The MIME type representing the format of the object data. Required.
Mime string

// The name of the bucket where the object is stored. If not set, it falls
// back to S3 level root bucket.
Bucket string

// Specifis the maximum amount of goroutines to run at a time to handle part
// uploads. If not set, it falls back to 1. The higher it goes, the more
// stress on memory. An ideal value would be no more than 2 for better
// memory efficiency.
Workers int

// Specifies the maximum time (in seconds) a failed/dangling multipart
// upload can live in a S3 bucket before being automatically removed using
// lifecycle management service even if you forget to call abort. Parts of
// an incomplete multipart upload are invisible however incur fees for the
// allocations. If not set, it falls back to 2 days.
Expiry int

// Forces retrying a failed part upload in case of an error. If not set, no
// retry.
Retry int

// Specifies the minimum buffer size in bytes for each upload part. It is
// defaulted to 5 MB but the last part has no limit.
// Ref: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
Size int
}

type MultipartUpload struct {
client *s3.Client
config MultipartUploadConfig
goroutineGroup *errgroup.Group
goroutineContext context.Context
buffer *bytes.Buffer
workers chan struct{}
mux *sync.Mutex
parts map[int]*string
cursor int
id string
abort bool
}

// Write writes a new data to a buffer for the part upload.
func (m *MultipartUpload) Write(row string) error {
if _, err := m.buffer.WriteString(row); err != nil {
m.mux.Lock()
m.abort = true
m.mux.Unlock()

return fmt.Errorf("write string: %w", err)
}

if m.buffer.Len() < m.config.Size {
return nil
}

m.workers <- struct{}{}

buffer := &bytes.Buffer{}
buffer.Write(m.buffer.Bytes())
part := m.cursor

m.goroutineGroup.Go(func() error {
err := m.upload(buffer, part)
if err != nil {
m.mux.Lock()
m.abort = true
m.mux.Unlock()
}

<-m.workers

return err
})

m.buffer.Reset()
m.cursor++

return nil
}

// Flush uploads last remaining part, waits for all ongoing uploads to end
// before completing the whole upload operation. In case an error occurs,
// `abort` method is called.
func (m *MultipartUpload) Flush(ctx context.Context) (int, error) {
m.goroutineGroup.Go(func() error {
return m.upload(m.buffer, m.cursor)
})

if err := m.goroutineGroup.Wait(); err != nil {
m.mux.Lock()
m.abort = true
m.mux.Unlock()

return 0, fmt.Errorf("goroutine group wait: %w", err)
}

total := len(m.parts)
parts := make([]types.CompletedPart, total)

for i, tag := range m.parts {
parts[i-1] = types.CompletedPart{
PartNumber: int32(i),
ETag: tag,
}
}

inp := &s3.CompleteMultipartUploadInput{
Bucket: &m.config.Bucket,
Key: &m.config.Key,
UploadId: &m.id,
MultipartUpload: &types.CompletedMultipartUpload{Parts: parts},
}

if _, err := m.client.CompleteMultipartUpload(ctx, inp); err != nil {
m.mux.Lock()
m.abort = true
m.mux.Unlock()

return 0, fmt.Errorf("complete multipart upload: %w", err)
}

return total, nil
}

// upload tries to upload a new part and saves its ETag. In case all tries are
// exhausted, last error is returned.
func (m *MultipartUpload) upload(buffer *bytes.Buffer, part int) error {
if buffer.Len() == 0 {
return nil
}

var (
try int
res *s3.UploadPartOutput
err error
)

inp := &s3.UploadPartInput{
Bucket: &m.config.Bucket,
Key: &m.config.Key,
UploadId: &m.id,
PartNumber: int32(part),
Body: buffer,
}

opt := s3.WithAPIOptions(
v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware,
)

for try <= m.config.Retry {
select {
case <-m.goroutineContext.Done():
return m.goroutineContext.Err()
default:
}

res, err = m.client.UploadPart(m.goroutineContext, inp, opt)
if err == nil {
m.mux.Lock()
m.parts[part] = res.ETag
m.mux.Unlock()

return nil
}

if buffer.Len() == 0 {
return fmt.Errorf("upload part: exiting retry due to an empty buffer: %w", err)
}

try++
}

return fmt.Errorf("upload part: %w", err)
}

// Abort is automatically called after whole upload operation to determine if
// the operation should be aborted or not.
func (m *MultipartUpload) Abort() error {
if !m.abort {
return nil
}

inp := &s3.AbortMultipartUploadInput{
Bucket: &m.config.Bucket,
Key: &m.config.Key,
UploadId: &m.id,
}

if _, err := m.client.AbortMultipartUpload(context.Background(), inp); err != nil {
return fmt.Errorf("abort multipart upload: %w", err)
}

return nil
}

main.go


package main

import (
"aws/aws"
"bufio"
"context"
"log"
"os"

"github.com/google/uuid"
)

func main() {
ctx := context.Background()

// AWS config
cfg, err := aws.NewConfig(ctx)
if err != nil {
log.Fatalln(err)
}

// AWS S3 client
s3 := aws.NewS3(cfg, "root-bucket")

if err := upload(ctx, s3); err != nil {
log.Fatalln(err)
}
}

func upload(ctx context.Context, s3 aws.S3) error {
// Dummy data fixtures
file, err := os.Open("fixtures.txt")
if err != nil {
return err
}
defer file.Close()

// Multipart uploader instance
up, err := s3.CreateMultipartUpload(ctx, aws.MultipartUploadConfig{
Key: uuid.NewString(),
Filename: "uuid-list.txt",
Mime: "text/plain",
})
if err != nil {
return err
}
defer up.Abort()

// Read fixtures line by line and upload
scanner := bufio.NewScanner(file)
for scanner.Scan() {
err := up.Write(scanner.Text() + "\n")
if err != nil {
return err
}
}
if err := scanner.Err(); err != nil {
return err
}

// Upload (flush) any remaining parts
tot, err := up.Flush(ctx)
if err != nil {
return err
}

log.Println("uploaded parts:", tot)

return nil
}

Testler


2023-07-29T13:30:42.374  INFO --- [   asgi_gw_3]: AWS s3.CreateMultipartUpload => 200
2023-07-29T13:30:43.862 INFO --- [ asgi_gw_10]: AWS s3.UploadPart => 200
2023-07-29T13:30:44.326 INFO --- [ asgi_gw_6]: AWS s3.UploadPart => 200
2023-07-29T13:30:45.145 INFO --- [ asgi_gw_7]: AWS s3.UploadPart => 200
...
2023-07-29T13:31:09.269 INFO --- [ asgi_gw_1]: AWS s3.UploadPart => 200
2023-07-29T13:31:10.525 INFO --- [ asgi_gw_9]: AWS s3.UploadPart => 200
2023-07-29T13:31:10.537 INFO --- [ asgi_gw_8]: AWS s3.UploadPart => 200
2023-07-29T13:31:12.897 INFO --- [ asgi_gw_4]: AWS s3.CompleteMultipartUpload => 200

2023-07-29T13:33:04.837  INFO --- [   asgi_gw_4]: AWS s3.CreateMultipartUpload => 200
2023-07-29T13:33:05.339 INFO --- [ asgi_gw_7]: AWS s3.UploadPart => 200
2023-07-29T13:33:05.703 INFO --- [ asgi_gw_1]: AWS s3.UploadPart => 200
2023-07-29T13:33:06.129 INFO --- [ asgi_gw_3]: AWS s3.UploadPart => 200
...
2023-07-29T13:33:07.153 INFO --- [ asgi_gw_4]: AWS s3.UploadPart => 200
2023-07-29T13:33:07.504 INFO --- [ asgi_gw_7]: AWS s3.UploadPart => 200
2023-07-29T13:33:07.887 INFO --- [ asgi_gw_1]: AWS s3.UploadPart => 200
... something has gone wrong along the way and we have the error below hence abort here
2023-07-29T13:33:16.514 INFO --- [ asgi_gw_9]: AWS s3.AbortMultipartUpload => 204

2023/07/29 13:33:16 goroutine group wait: upload part: exiting retry due to an empty buffer: operation error S3: UploadPart, https response error StatusCode: 0, RequestID: , HostID: , canceled, context deadline exceeded