In this example we are going to use the AWS S3 multipart upload feature to upload a file using Golang. File parts will be handled concurrently. Here are some details and suggestions.


If anything goes wrong within the process and a goroutine returns an error, all previously uploaded parts will be aborted. In that case, AWS logs might report a mix of 500 and 200 response codes for the part uploads. This is normal: when an error occurs in one goroutine, the shared context is cancelled, but some already running goroutines will not have observed the cancellation yet, hence they still report 200 in the logs. Those that inherit the cancelled context will report 500 instead. Both are expected behaviour; either way the whole flow is successfully aborted at the end, so you shouldn't worry. On the other hand, if everything goes fine, all goroutines are waited on before the multipart upload is completed.
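

To make that cancellation behaviour concrete, here is a minimal, self-contained sketch (independent of the uploader code below; names are illustrative) of how errgroup.WithContext propagates the first error:


package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	grp, ctx := errgroup.WithContext(context.Background())

	// The first goroutine fails immediately, which cancels ctx.
	grp.Go(func() error {
		return errors.New("part upload failed")
	})

	// A sibling that checks ctx after the failure observes the cancellation
	// (the 500-style case); one that finished its work before checking would
	// still return nil (the 200-style case).
	grp.Go(func() error {
		time.Sleep(10 * time.Millisecond)
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			return nil
		}
	})

	// Wait returns the first non-nil error: "part upload failed".
	fmt.Println(grp.Wait())
}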


If a retry fails after the buffer has already been consumed by a previous attempt, it means something has gone wrong, and it won't retry again because there is nothing left to send. This is a kind of short circuit to avoid attempting to upload an empty buffer. Instead, it terminates the process.
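

If you wanted retries to survive a consumed body instead, one hypothetical alternative (not what the code below does) is to snapshot the part bytes once and hand the SDK a fresh reader per attempt:


package aws

import (
	"bytes"
	"context"

	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// uploadPartWithRetry is a hypothetical variant of the upload method in
// multipart_upload.go: it snapshots the part bytes once and gives the SDK a
// fresh bytes.Reader per attempt, so a half-consumed body can never cut the
// retry loop short.
func uploadPartWithRetry(ctx context.Context, client *s3.Client, inp *s3.UploadPartInput, data []byte, retries int) (*s3.UploadPartOutput, error) {
	var (
		res *s3.UploadPartOutput
		err error
	)

	for try := 0; try <= retries; try++ {
		// Each attempt gets a rewound view over the same snapshot.
		inp.Body = bytes.NewReader(data)

		if res, err = client.UploadPart(ctx, inp); err == nil {
			return res, nil
		}
	}

	return nil, err
}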


All the defaults favour resource efficiency over speed. You don't want to take down the application by spinning up many goroutines across many active connections, each holding MBs of data in memory. If you have a busy application, I would suggest leaving max workers at 2 to prevent exhausting memory; otherwise the application might quickly go down. Given multipart uploads are often non-blocking operations, there is no need to rush unless there is an explicit reason. Instead, memory efficiency should be the priority, especially since many concurrent users will be sharing the same memory.
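

As a back-of-the-envelope bound, in-flight memory per active upload is roughly (Workers + 1) × Size: up to Workers buffered copies being uploaded, plus the buffer still being filled. For example:


// Roughly (2 + 1) x 5MB = ~15MB held in memory per active upload.
cfg := aws.MultipartUploadConfig{
	Key:      "some-unique-key",
	Filename: "some-filename",
	Mime:     "text/plain",
	Workers:  2,               // at most two concurrent part uploads
	Size:     5 * 1024 * 1024, // 5MB minimum part size
}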


Heads up


"Contexts should not be stored inside a struct type" (ref). I assume we all know this advice. However I am storing a context inside a struct type. Why? It is because this advice was relaxed by Go team for a sensible and realistic reason. In our example it makes absolute sense to store context in struct otherwise you wouldn't be cleanly able to manage goroutines and cancellations in isolation. Now the advice is more like "Contexts should not be stored inside a struct type UNLESS it makes sense". Here's Russ Cox's comment and GitHub comments/PRs if you are interested in the rationale (1, 2, 3).


Basic usage


The full main file is further below; this is just a quick usage example.


ctx := context.Background()

// AWS config
cfg, err := aws.NewConfig(ctx)
if err != nil {
	log.Fatalln(err)
}

// AWS S3 client
s3 := aws.NewS3(cfg, "root-bucket")

// Multipart uploader instance
up, err := s3.CreateMultipartUpload(ctx, aws.MultipartUploadConfig{
	Key:      uuid.NewString(),
	Filename: "uuid-list.txt",
	Mime:     "text/plain",
})
if err != nil {
	return err
}
defer up.Abort()

// Version 1: Write a single part (line)
if err := up.Write(scanner.Text() + "\n"); err != nil {
	return err
}

// Version 2: Write many parts (lines)
for _, row := range someDataSlice {
	if err := up.Write(row); err != nil {
		return err
	}
}

// Write (flush) any remaining parts
tot, err := up.Flush(ctx)
if err != nil {
	return err
}

log.Println("uploaded parts:", tot)

Files


config.go


As you can see, this uses Localstack for testing purposes.


package aws

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
)

func NewConfig(ctx context.Context) (aws.Config, error) {
	return config.LoadDefaultConfig(ctx,
		config.WithRegion("eu-west-1"),
		config.WithSharedConfigProfile("localstack"),
		config.WithEndpointResolver(aws.EndpointResolverFunc(func(_, _ string) (aws.Endpoint, error) {
			return aws.Endpoint{
				URL:               "http://localhost:4566",
				HostnameImmutable: true,
			}, nil
		})),
	)
}
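

The "localstack" shared config profile is assumed to exist on your machine. Localstack accepts any dummy credentials, so a setup along these lines (illustrative values) is all it needs:


# ~/.aws/config
[profile localstack]
region = eu-west-1

# ~/.aws/credentials
[localstack]
aws_access_key_id = test
aws_secret_access_key = test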

s3.go


package aws

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"golang.org/x/sync/errgroup"
)

type S3 struct {
	client *s3.Client
	signer *s3.PresignClient
	bucket string
}

func NewS3(config aws.Config, bucket string) S3 {
	client := s3.NewFromConfig(config)

	return S3{
		client: client,
		signer: s3.NewPresignClient(client),
		bucket: bucket,
	}
}

func (s S3) CreateMultipartUpload(ctx context.Context, cfg MultipartUploadConfig) (*MultipartUpload, error) {
	if cfg.Key == "" {
		return nil, errors.New("required field: Key")
	}

	if cfg.Filename == "" {
		return nil, errors.New("required field: Filename")
	}

	if cfg.Mime == "" {
		return nil, errors.New("required field: Mime")
	}

	if cfg.Size != 0 && cfg.Size < multipartUploadMinPartSize {
		return nil, fmt.Errorf("invalid value: Size: minimum required value is %d", multipartUploadMinPartSize)
	}

	if cfg.Bucket == "" {
		cfg.Bucket = s.bucket
	}

	if cfg.Workers == 0 {
		cfg.Workers = 1
	}

	if cfg.Expiry == 0 {
		cfg.Expiry = 172800
	}
	exp := time.Now().Add(time.Second * time.Duration(cfg.Expiry))

	if cfg.Size == 0 {
		cfg.Size = multipartUploadMinPartSize
	}

	inp := &s3.CreateMultipartUploadInput{
		Bucket:             &cfg.Bucket,
		Key:                &cfg.Key,
		ContentType:        &cfg.Mime,
		Expires:            &exp,
		ContentDisposition: aws.String(fmt.Sprintf(`attachment; filename="%s"`, cfg.Filename)),
	}

	res, err := s.client.CreateMultipartUpload(ctx, inp)
	if err != nil {
		return nil, fmt.Errorf("create multipart upload: %w", err)
	}

	erg, erc := errgroup.WithContext(ctx)

	return &MultipartUpload{
		client:           s.client,
		config:           cfg,
		goroutineGroup:   erg,
		goroutineContext: erc,
		buffer:           &bytes.Buffer{},
		workers:          make(chan struct{}, cfg.Workers),
		mux:              &sync.Mutex{},
		parts:            make(map[int]*string),
		cursor:           1,
		id:               *res.UploadId,
	}, nil
}

multipart_upload.go


package aws

import (
	"bytes"
	"context"
	"fmt"
	"sync"

	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"golang.org/x/sync/errgroup"

	v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
)

const multipartUploadMinPartSize = 5 * 1024 * 1024

// MultipartUploadConfig defines multipart upload specifications.
// Example:
//
//	aws.MultipartUploadConfig{
//		Key:      "some-unique-key",
//		Filename: "some-filename",
//		Mime:     "some-valid-mime-type",
//	}
//
//	aws.MultipartUploadConfig{
//		Key:      "some-unique-key",
//		Filename: "some-filename",
//		Mime:     "some-valid-mime-type",
//		Bucket:   "some-bucket",
//		Workers:  2,
//		Expiry:   86400, // 1 day
//		Retry:    3,
//		Size:     10 * 1024 * 1024, // 10MB
//	}
type MultipartUploadConfig struct {
	// Object key for which the multipart upload was initiated. Required.
	Key string

	// Used to name the file for downloading. Required.
	Filename string

	// The MIME type representing the format of the object data. Required.
	Mime string

	// The name of the bucket where the object is stored. If not set, it falls
	// back to the S3 level root bucket.
	Bucket string

	// Specifies the maximum number of goroutines to run at a time to handle
	// part uploads. If not set, it falls back to 1. The higher it goes, the
	// more stress on memory. An ideal value would be no more than 2 for
	// better memory efficiency.
	Workers int

	// Specifies the maximum time (in seconds) a failed/dangling multipart
	// upload can live in an S3 bucket before being automatically removed by
	// lifecycle management, even if you forget to call abort (see the example
	// lifecycle rule after this file). Parts of an incomplete multipart
	// upload are invisible, yet they still incur fees for the allocations.
	// If not set, it falls back to 2 days.
	Expiry int

	// Forces retrying a failed part upload in case of an error. If not set,
	// no retry.
	Retry int

	// Specifies the minimum buffer size in bytes for each upload part. It
	// defaults to 5MB; the last part has no lower limit.
	// Ref: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
	Size int
}

type MultipartUpload struct {
	client           *s3.Client
	config           MultipartUploadConfig
	goroutineGroup   *errgroup.Group
	goroutineContext context.Context
	buffer           *bytes.Buffer
	workers          chan struct{}
	mux              *sync.Mutex
	parts            map[int]*string
	cursor           int
	id               string
	abort            bool
}

// Write writes new data to the buffer and kicks off a part upload once the
// buffer reaches the configured part size.
func (m *MultipartUpload) Write(row string) error {
	if _, err := m.buffer.WriteString(row); err != nil {
		m.mux.Lock()
		m.abort = true
		m.mux.Unlock()

		return fmt.Errorf("write string: %w", err)
	}

	if m.buffer.Len() < m.config.Size {
		return nil
	}

	// Block until a worker slot is free.
	m.workers <- struct{}{}

	// Copy the accumulated bytes so the goroutine owns its own buffer.
	buffer := &bytes.Buffer{}
	buffer.Write(m.buffer.Bytes())
	part := m.cursor

	m.goroutineGroup.Go(func() error {
		err := m.upload(buffer, part)
		if err != nil {
			m.mux.Lock()
			m.abort = true
			m.mux.Unlock()
		}

		<-m.workers

		return err
	})

	m.buffer.Reset()
	m.cursor++

	return nil
}

// Flush uploads the last remaining part and waits for all ongoing uploads to
// end before completing the whole upload operation. In case an error occurs,
// the operation is flagged so the deferred `Abort` call aborts it.
func (m *MultipartUpload) Flush(ctx context.Context) (int, error) {
	m.goroutineGroup.Go(func() error {
		return m.upload(m.buffer, m.cursor)
	})

	if err := m.goroutineGroup.Wait(); err != nil {
		m.mux.Lock()
		m.abort = true
		m.mux.Unlock()

		return 0, fmt.Errorf("goroutine group wait: %w", err)
	}

	total := len(m.parts)
	parts := make([]types.CompletedPart, total)

	// Part numbers start at 1, hence the i-1 index.
	for i, tag := range m.parts {
		parts[i-1] = types.CompletedPart{
			PartNumber: int32(i),
			ETag:       tag,
		}
	}

	inp := &s3.CompleteMultipartUploadInput{
		Bucket:          &m.config.Bucket,
		Key:             &m.config.Key,
		UploadId:        &m.id,
		MultipartUpload: &types.CompletedMultipartUpload{Parts: parts},
	}

	if _, err := m.client.CompleteMultipartUpload(ctx, inp); err != nil {
		m.mux.Lock()
		m.abort = true
		m.mux.Unlock()

		return 0, fmt.Errorf("complete multipart upload: %w", err)
	}

	return total, nil
}

// upload tries to upload a new part and saves its ETag. In case all tries
// are exhausted, the last error is returned.
func (m *MultipartUpload) upload(buffer *bytes.Buffer, part int) error {
	if buffer.Len() == 0 {
		return nil
	}

	var (
		try int
		res *s3.UploadPartOutput
		err error
	)

	inp := &s3.UploadPartInput{
		Bucket:     &m.config.Bucket,
		Key:        &m.config.Key,
		UploadId:   &m.id,
		PartNumber: int32(part),
		Body:       buffer,
	}

	opt := s3.WithAPIOptions(
		v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware,
	)

	for try <= m.config.Retry {
		// Bail out early if a sibling goroutine has already failed.
		select {
		case <-m.goroutineContext.Done():
			return m.goroutineContext.Err()
		default:
		}

		res, err = m.client.UploadPart(m.goroutineContext, inp, opt)
		if err == nil {
			m.mux.Lock()
			m.parts[part] = res.ETag
			m.mux.Unlock()

			return nil
		}

		// The failed attempt may have consumed the buffer; retrying with an
		// empty body is pointless, so short circuit.
		if buffer.Len() == 0 {
			return fmt.Errorf("upload part: exiting retry due to an empty buffer: %w", err)
		}

		try++
	}

	return fmt.Errorf("upload part: %w", err)
}

// Abort aborts the multipart upload if any step flagged the operation for
// abortion; otherwise it is a no-op. It is meant to be deferred right after
// creating the multipart upload.
func (m *MultipartUpload) Abort() error {
	if !m.abort {
		return nil
	}

	inp := &s3.AbortMultipartUploadInput{
		Bucket:   &m.config.Bucket,
		Key:      &m.config.Key,
		UploadId: &m.id,
	}

	if _, err := m.client.AbortMultipartUpload(context.Background(), inp); err != nil {
		return fmt.Errorf("abort multipart upload: %w", err)
	}

	return nil
}
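

As the Expiry field's comment mentions, cleaning up dangling uploads is a lifecycle management concern. On real S3 this is typically a bucket lifecycle rule rather than something set per upload; a minimal rule (illustrative only, matching the 2-day default) could look like:


{
  "Rules": [
    {
      "ID": "abort-incomplete-multipart-uploads",
      "Status": "Enabled",
      "Filter": {},
      "AbortIncompleteMultipartUpload": { "DaysAfterInitiation": 2 }
    }
  ]
}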

main.go


package main

import (
	"aws/aws"
	"bufio"
	"context"
	"log"
	"os"

	"github.com/google/uuid"
)

func main() {
	ctx := context.Background()

	// AWS config
	cfg, err := aws.NewConfig(ctx)
	if err != nil {
		log.Fatalln(err)
	}

	// AWS S3 client
	s3 := aws.NewS3(cfg, "root-bucket")

	if err := upload(ctx, s3); err != nil {
		log.Fatalln(err)
	}
}

func upload(ctx context.Context, s3 aws.S3) error {
	// Dummy data fixtures
	file, err := os.Open("fixtures.txt")
	if err != nil {
		return err
	}
	defer file.Close()

	// Multipart uploader instance
	up, err := s3.CreateMultipartUpload(ctx, aws.MultipartUploadConfig{
		Key:      uuid.NewString(),
		Filename: "uuid-list.txt",
		Mime:     "text/plain",
	})
	if err != nil {
		return err
	}
	defer up.Abort()

	// Read fixtures line by line and upload
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		if err := up.Write(scanner.Text() + "\n"); err != nil {
			return err
		}
	}
	if err := scanner.Err(); err != nil {
		return err
	}

	// Upload (flush) any remaining parts
	tot, err := up.Flush(ctx)
	if err != nil {
		return err
	}

	log.Println("uploaded parts:", tot)

	return nil
}
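

fixtures.txt is assumed to be a reasonably large text file. If you need one, a throwaway generator along these lines works (a hypothetical helper, not part of the example):


package main

import (
	"log"
	"os"

	"github.com/google/uuid"
)

// Generates a fixtures.txt large enough to trigger several 5MB parts.
// Purely illustrative; any large text file works.
func main() {
	file, err := os.Create("fixtures.txt")
	if err != nil {
		log.Fatalln(err)
	}
	defer file.Close()

	for i := 0; i < 1_000_000; i++ { // ~37MB of UUID lines
		if _, err := file.WriteString(uuid.NewString() + "\n"); err != nil {
			log.Fatalln(err)
		}
	}
}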

Tests


Two runs against Localstack follow: the first completes successfully; the second fails mid-flow and aborts.


2023-07-29T13:30:42.374  INFO --- [   asgi_gw_3]: AWS s3.CreateMultipartUpload => 200
2023-07-29T13:30:43.862 INFO --- [ asgi_gw_10]: AWS s3.UploadPart => 200
2023-07-29T13:30:44.326 INFO --- [ asgi_gw_6]: AWS s3.UploadPart => 200
2023-07-29T13:30:45.145 INFO --- [ asgi_gw_7]: AWS s3.UploadPart => 200
...
2023-07-29T13:31:09.269 INFO --- [ asgi_gw_1]: AWS s3.UploadPart => 200
2023-07-29T13:31:10.525 INFO --- [ asgi_gw_9]: AWS s3.UploadPart => 200
2023-07-29T13:31:10.537 INFO --- [ asgi_gw_8]: AWS s3.UploadPart => 200
2023-07-29T13:31:12.897 INFO --- [ asgi_gw_4]: AWS s3.CompleteMultipartUpload => 200

2023-07-29T13:33:04.837  INFO --- [   asgi_gw_4]: AWS s3.CreateMultipartUpload => 200
2023-07-29T13:33:05.339 INFO --- [ asgi_gw_7]: AWS s3.UploadPart => 200
2023-07-29T13:33:05.703 INFO --- [ asgi_gw_1]: AWS s3.UploadPart => 200
2023-07-29T13:33:06.129 INFO --- [ asgi_gw_3]: AWS s3.UploadPart => 200
...
2023-07-29T13:33:07.153 INFO --- [ asgi_gw_4]: AWS s3.UploadPart => 200
2023-07-29T13:33:07.504 INFO --- [ asgi_gw_7]: AWS s3.UploadPart => 200
2023-07-29T13:33:07.887 INFO --- [ asgi_gw_1]: AWS s3.UploadPart => 200
... something has gone wrong along the way and we got the error below, hence the abort here
2023-07-29T13:33:16.514 INFO --- [ asgi_gw_9]: AWS s3.AbortMultipartUpload => 204

2023/07/29 13:33:16 goroutine group wait: upload part: exiting retry due to an empty buffer: operation error S3: UploadPart, https response error StatusCode: 0, RequestID: , HostID: , canceled, context deadline exceeded