09/02/2025 - AWS, DOCKER, ELASTICSEARCH, GO
This is similar to the previous example, but this time we will be using client API v4. We are going to show how OpenSearch can be used in Golang to bulk upsert,
find and list documents.
Note: I left the example a bit rough because I want you to focus on the actual OpenSearch client and storage implementations, which are the main point here. I leave it up to you to tidy up the whole thing.
├── docker-compose.yaml
├── main.go
├── pkg
│ ├── xerror
│ │ └── error.go
│ └── xopensearch
│ └── opensearch.go
└── src
├── api
│ └── comment.go
├── model
│ ├── api
│ │ └── comment.go
│ └── storage
│ └── document
│ ├── comment.go
│ └── response.go
└── storage
└── opensearch
├── decode.go
├── opensearch.go
├── page.go
├── query.go
└── response.go
services:
opensearch-node:
image: opensearchproject/opensearch:2.13.0
container_name: opensearch-node
environment:
- discovery.type=single-node
- OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=5tr0ng_Pa55w0rd
ports:
- 9200:9200 # Rest HTTP
- 9600:9600 # Performance Analyzer
opensearch-dashboard:
image: opensearchproject/opensearch-dashboards:2.13.0
container_name: opensearch-dashboard
environment:
OPENSEARCH_HOSTS: '["https://opensearch-node:9200"]'
ports:
- 5601:5601
package xerror
import "errors"
// ErrNotFound is the sentinel error reported when a requested resource does
// not exist. Callers should compare against it with errors.Is.
var ErrNotFound = errors.New("not found")
package xopensearch
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"strings"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/opensearch-project/opensearch-go/v4"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/opensearch-project/opensearch-go/v4/signer/awsv2"
)
// Config holds the settings New uses to build the OpenSearch client.
type Config struct {
// Insecure is meant for local environments. When true, New installs an HTTP
// transport that skips TLS certificate verification.
Insecure bool
// Username for HTTP Basic Authentication.
Username string
// Password for HTTP Basic Authentication.
Password string
// Addresses is the list of nodes to connect to.
Addresses []string
// AWSSigner indicates that the requests must be signed with the AWS signer.
// This is used for deployment environments only.
AWSSigner bool
// AWSConfig is used to sign requests only if `AWSSigner` is `true`.
AWSConfig aws.Config
}
// OpenSearch wraps an OpenSearch client and exposes helpers for creating
// indices and aliases.
type OpenSearch struct {
// client is the low-level client every request is sent through.
client *opensearch.Client
}
// New builds an OpenSearch wrapper from config.
//
// Basic-auth credentials and node addresses are copied as-is. When
// config.Insecure is set, TLS certificate verification is disabled. When
// config.AWSSigner is set, requests are signed for the "es" service using
// config.AWSConfig. An error is returned if either the signer or the client
// cannot be created.
func New(config Config) (OpenSearch, error) {
	clientCfg := opensearch.Config{
		Addresses: config.Addresses,
		// Empty credentials equal the zero value, so they can be copied
		// unconditionally.
		Username: config.Username,
		Password: config.Password,
	}

	if config.Insecure {
		clientCfg.Transport = &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		}
	}

	if config.AWSSigner {
		signer, err := awsv2.NewSignerWithService(config.AWSConfig, "es")
		if err != nil {
			return OpenSearch{}, fmt.Errorf("unable to create signer: %w", err)
		}
		clientCfg.Signer = signer
	}

	apiClient, err := opensearchapi.NewClient(opensearchapi.Config{Client: clientCfg})
	if err != nil {
		return OpenSearch{}, fmt.Errorf("unable to create client: %w", err)
	}

	return OpenSearch{client: apiClient.Client}, nil
}
// Client returns the underlying OpenSearch client.
func (o OpenSearch) Client() *opensearch.Client {
return o.client
}
// CreateIndex makes sure the concrete index exists. It first asks the cluster
// whether the index is present: a 200 means there is nothing to do and nil is
// returned, a 404 triggers creation with body as the request payload, and any
// other status is reported as an error.
func (o OpenSearch) CreateIndex(ctx context.Context, index, body string) error {
	existsReq := opensearchapi.IndicesExistsReq{Indices: []string{index}}
	existsRes, err := o.client.Do(ctx, existsReq, nil)
	if err != nil {
		return fmt.Errorf("index exists: %w", err)
	}
	defer existsRes.Body.Close()

	switch existsRes.StatusCode {
	case http.StatusOK:
		// Already there.
		return nil
	case http.StatusNotFound:
		// Missing: continue with creation below.
	default:
		return fmt.Errorf("unexpected response: %s", existsRes.String())
	}

	createReq := opensearchapi.IndicesCreateReq{
		Index: index,
		Body:  strings.NewReader(body),
	}
	createRes, err := o.client.Do(ctx, createReq, nil)
	if err != nil {
		return fmt.Errorf("index create: %w", err)
	}
	defer createRes.Body.Close()

	if createRes.IsError() {
		return fmt.Errorf("invalid response: %s", createRes.String())
	}

	return nil
}
// CreateAlias makes sure alias exists and points at the concrete index. A 200
// from the existence check returns nil without changes, a 404 triggers the
// alias creation, and any other status is reported as an error.
//
// NOTE(review): the existence check passes the alias name through the Indices
// field of AliasExistsReq; confirm against the client API whether a dedicated
// alias field should be used instead.
func (o OpenSearch) CreateAlias(ctx context.Context, index, alias string) error {
	existsReq := opensearchapi.AliasExistsReq{Indices: []string{alias}}
	existsRes, err := o.client.Do(ctx, existsReq, nil)
	if err != nil {
		return fmt.Errorf("alias exists: %w", err)
	}
	defer existsRes.Body.Close()

	switch existsRes.StatusCode {
	case http.StatusOK:
		// Already there.
		return nil
	case http.StatusNotFound:
		// Missing: continue with creation below.
	default:
		return fmt.Errorf("unexpected response: %s", existsRes.String())
	}

	putReq := opensearchapi.AliasPutReq{
		Indices: []string{index},
		Alias:   alias,
	}
	putRes, err := o.client.Do(ctx, putReq, nil)
	if err != nil {
		return fmt.Errorf("alias create: %w", err)
	}
	defer putRes.Body.Close()

	if putRes.IsError() {
		return fmt.Errorf("invalid response: %s", putRes.String())
	}

	return nil
}
package main
import (
"context"
"log/slog"
"net/http"
"opensearch/pkg/xopensearch"
"opensearch/src/api"
"opensearch/src/storage/opensearch"
)
// main wires the example together: it creates the OpenSearch client, makes
// sure the "blog" index exists, registers the comment handlers and serves
// HTTP on :8080. Any startup or serve failure is logged before returning.
func main() {
	ctx := context.Background()

	ops, err := xopensearch.New(xopensearch.Config{
		Insecure:  true,
		Username:  "admin",
		Password:  "5tr0ng_Pa55w0rd",
		Addresses: []string{"https://localhost:9200"},
	})
	if err != nil {
		slog.ErrorContext(ctx, "Create OpenSearch client", "error", err)
		return
	}

	var (
		indexName = "blog"
		indexBody = `{
"mappings": {
"properties": {
"type": {
"type": "keyword"
},
"id": {
"type": "keyword"
},
"user_id": {
"type": "keyword"
},
"post_id": {
"type": "keyword"
},
"text": {
"type": "text"
},
"date": {
"type": "date"
}
}
},
"aliases": {
"alias_20250209_1244": {}
}
}`
	)

	if err := ops.CreateIndex(ctx, indexName, indexBody); err != nil {
		slog.ErrorContext(ctx, "Create OpenSearch index", "error", err)
		return
	}

	storage := opensearch.OpenSearch{
		Client: ops.Client(),
		Index:  indexName,
	}

	comment := api.Comment{
		Storage: storage,
	}

	router := http.NewServeMux()
	router.HandleFunc("PUT /api/v1/comments", comment.Create)
	router.HandleFunc("GET /api/v1/comments/{id}", comment.Find)
	router.HandleFunc("POST /api/v1/comments", comment.List)

	// ListenAndServe only returns on failure (e.g. the port is already in
	// use); surface that instead of exiting silently.
	if err := http.ListenAndServe(":8080", router); err != nil {
		slog.ErrorContext(ctx, "Serve HTTP", "error", err)
	}
}
package api
import (
"context"
"encoding/json"
"errors"
"log/slog"
"net/http"
"time"
"opensearch/pkg/xerror"
"opensearch/src/model/api"
"opensearch/src/model/storage/document"
"github.com/google/uuid"
)
// storer is the persistence behaviour the comment handlers depend on.
type storer interface {
// UpsertComments writes the given comments in bulk and returns the
// per-operation outcome.
UpsertComments(ctx context.Context, comments []*document.Comment) (*document.BulkResponse, error)
// FindComment returns the comment with the given id. The Find handler maps
// xerror.ErrNotFound to a 404 response.
FindComment(ctx context.Context, id string) (document.Comment, error)
// ListComments returns the comments selected by param together with a total
// count.
ListComments(ctx context.Context, param document.ListParam) ([]*document.Comment, int, error)
}
// Comment groups the HTTP handlers for the comment endpoints.
type Comment struct {
// Storage is the backend the handlers read from and write to.
Storage storer
}
// Create decodes a JSON array of comments from the request body, assigns each
// one a fresh UUID and the current UTC time, and bulk upserts them.
//
// It responds with 400 when the body is not valid JSON, 500 when the storage
// call fails, and 200 otherwise. Individual operations that failed inside an
// otherwise successful bulk call are logged as warnings.
func (c Comment) Create(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	var req []api.CreateCommentRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		slog.ErrorContext(ctx, "Unable to decode request", "error", err)
		// A body that cannot be decoded is a client error, not a server one.
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	comments := make([]*document.Comment, 0, len(req))
	for _, comment := range req {
		comments = append(comments, &document.Comment{
			Type:   "comment",
			ID:     uuid.New().String(),
			PostID: comment.PostID,
			UserID: comment.UserID,
			Text:   comment.Text,
			Date:   time.Now().UTC(),
		})
	}

	res, err := c.Storage.UpsertComments(ctx, comments)
	if err != nil {
		slog.ErrorContext(ctx, "Unable to upsert comments", "error", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	if res != nil && res.AnyError {
		for _, operation := range res.Operations {
			// Update and Error are optional pointers in the decoded
			// response; guard them so a sparse item cannot panic the handler.
			if operation == nil || operation.Update == nil || operation.Update.Status <= 399 {
				continue
			}

			reason := ""
			if operation.Update.Error != nil {
				reason = operation.Update.Error.Reason
			}

			slog.WarnContext(ctx, "Unable to upsert comment",
				"data.id", operation.Update.ID,
				"data.reason", reason,
			)
		}
	}

	w.WriteHeader(http.StatusOK)
}
// Find looks up the comment identified by the "id" path value and writes it
// as JSON. It responds with 404 when the storage reports xerror.ErrNotFound
// and with 500 on any other storage or encoding failure.
func (c Comment) Find(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	stored, err := c.Storage.FindComment(ctx, r.PathValue("id"))
	switch {
	case errors.Is(err, xerror.ErrNotFound):
		w.WriteHeader(http.StatusNotFound)
		return
	case err != nil:
		slog.ErrorContext(ctx, "Unable to find comment", "error", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// Copy the storage document into the API shape field by field.
	var out api.FindCommentResponse
	out.ID = stored.ID
	out.UserID = stored.UserID
	out.PostID = stored.PostID
	out.Text = stored.Text
	out.Date = stored.Date

	payload, err := json.Marshal(out)
	if err != nil {
		slog.ErrorContext(ctx, "Unable to marshal response", "error", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.Header().Add("Content-type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write(payload)
}
// List decodes paging and filter parameters from the JSON request body, asks
// the storage for the matching comments and writes them, together with the
// total count, as JSON.
//
// It responds with 400 when the body is not valid JSON and with 500 when the
// storage call or the response encoding fails.
func (c Comment) List(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	var req api.ListCommentsRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		slog.ErrorContext(ctx, "Unable to decode request", "error", err)
		// A body that cannot be decoded is a client error, not a server one.
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	res, tot, err := c.Storage.ListComments(ctx, document.ListParam{
		Page:   req.Page,
		Size:   req.Size,
		UserID: req.UserID,
		PostID: req.PostID,
	})
	if err != nil {
		slog.ErrorContext(ctx, "Unable to list comments", "error", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// Allocate even when res is empty so the field encodes as [] and not null.
	docs := make([]*api.Comment, 0, len(res))
	for _, comment := range res {
		docs = append(docs, &api.Comment{
			ID:     comment.ID,
			UserID: comment.UserID,
			PostID: comment.PostID,
			Text:   comment.Text,
			Date:   comment.Date,
		})
	}

	result := api.ListCommentsResponse{
		Comments: docs,
		Total:    tot,
	}

	dat, err := json.Marshal(result)
	if err != nil {
		slog.ErrorContext(ctx, "Unable to marshal response", "error", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.Header().Add("Content-type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write(dat)
}
package api
import "time"
// Comment is the JSON representation of a comment in API responses.
type Comment struct {
ID string `json:"id"`
UserID string `json:"user_id"`
PostID string `json:"post_id"`
Text string `json:"text"`
Date time.Time `json:"date"`
}
// CreateCommentRequest is one element of the JSON array accepted by the
// create endpoint. The ID and date are assigned by the handler.
type CreateCommentRequest struct {
UserID string `json:"user_id"`
PostID string `json:"post_id"`
Text string `json:"text"`
}
// FindCommentResponse is the body returned by the find endpoint. Comment is
// embedded, so its fields are encoded at the top level of the JSON object.
type FindCommentResponse struct {
Comment
}
// ListCommentsRequest is the body accepted by the list endpoint. All fields
// are pointers so that an omitted field can be told apart from a zero value.
type ListCommentsRequest struct {
// Page and Size control paging.
Page *int `json:"page"`
Size *int `json:"size"`
// UserID and PostID are filters.
UserID *string `json:"user_id"`
PostID *string `json:"post_id"`
}
// ListCommentsResponse is the body returned by the list endpoint.
type ListCommentsResponse struct {
// Comments holds the comments of the requested page.
Comments []*Comment `json:"comments"`
// Total is the total count reported by the storage layer.
Total int `json:"total"`
}
package document
import (
"time"
)
// Comment is the document shape stored in and read back from the index.
type Comment struct {
// Type tags the kind of document; the API handler sets it to "comment".
Type string `json:"type"`
ID string `json:"id"`
UserID string `json:"user_id"`
PostID string `json:"post_id"`
Text string `json:"text"`
Date time.Time `json:"date"`
}
// ListParam carries the optional paging and filter values for listing
// comments. A nil field means the value was not supplied.
type ListParam struct {
Page *int
Size *int
UserID *string
PostID *string
}
package document
// BulkResponse is the subset of a bulk response body that callers inspect.
type BulkResponse struct {
// AnyError is decoded from the "errors" field.
AnyError bool `json:"errors"`
// Operations is decoded from the "items" array.
Operations []*BulkResponseOperation `json:"items"`
}
// BulkResponseOperation is one entry of the bulk response "items" array. Only
// the "update" action is decoded; Update is nil when that key is absent.
type BulkResponseOperation struct {
Update *BulkResponseItem `json:"update,omitempty"`
}
// BulkResponseItem is the result of a single bulk operation.
type BulkResponseItem struct {
ID string `json:"_id"`
// Status is the per-operation status code; the API handler treats values
// above 399 as failures.
Status int `json:"status"`
// Error is nil when the item carries no "error" object.
Error *BulkResponseError `json:"error,omitempty"`
}
// BulkResponseError holds the failure reason of a single bulk operation.
type BulkResponseError struct {
Reason string `json:"reason"`
}
package opensearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"opensearch/pkg/xerror"
"opensearch/src/model/storage/document"
"github.com/opensearch-project/opensearch-go/v4"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
)
// OpenSearch is the comment storage backed by a single OpenSearch index.
type OpenSearch struct {
// Client is used to send every request.
Client *opensearch.Client
// Index is the index all operations target.
Index string
}
// UpsertComments writes comments to the index with a single bulk request.
// Each comment becomes an "update" action keyed by its ID with
// "doc_as_upsert" enabled, and the request asks for an immediate refresh.
//
// Nil entries are skipped. If there is nothing to send, an empty response is
// returned without contacting the cluster. A non-nil error means the request
// as a whole failed; per-document failures are reported through the returned
// BulkResponse.
func (o OpenSearch) UpsertComments(ctx context.Context, comments []*document.Comment) (*document.BulkResponse, error) {
	body := &bytes.Buffer{}

	// Encode writes one JSON value followed by a newline, which is exactly the
	// line format the bulk body needs. Encoding the action line, instead of
	// concatenating the ID into it, keeps the body valid whatever the ID holds.
	enc := json.NewEncoder(body)

	for _, comment := range comments {
		if comment == nil {
			continue
		}

		action := map[string]any{"update": map[string]string{"_id": comment.ID}}
		if err := enc.Encode(action); err != nil {
			return nil, fmt.Errorf("marshal: %s: %w", comment.ID, err)
		}

		doc := map[string]any{"doc": comment, "doc_as_upsert": true}
		if err := enc.Encode(doc); err != nil {
			return nil, fmt.Errorf("marshal: %s: %w", comment.ID, err)
		}
	}

	// An empty bulk body would be sent as-is and answered with an error.
	if body.Len() == 0 {
		return &document.BulkResponse{}, nil
	}

	req := opensearchapi.BulkReq{
		Index: o.Index,
		Body:  body,
		Params: opensearchapi.BulkParams{
			Refresh: "true",
		},
	}

	res, err := o.Client.Do(ctx, req, nil)
	if err != nil {
		return nil, fmt.Errorf("do: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, fmt.Errorf("unexpected response: %s", res.String())
	}

	result := &document.BulkResponse{}
	if err := json.NewDecoder(res.Body).Decode(result); err != nil {
		return nil, fmt.Errorf("response decode: %w", err)
	}

	return result, nil
}
// FindComment fetches the document with the given id from the index.
//
// It returns xerror.ErrNotFound when the cluster answers 404 or when the
// response body reports found=false, and a wrapped error for any other
// failure.
func (o OpenSearch) FindComment(ctx context.Context, id string) (document.Comment, error) {
	req := opensearchapi.DocumentGetReq{
		Index:      o.Index,
		DocumentID: id,
	}

	res, err := o.Client.Do(ctx, req, nil)
	if err != nil {
		return document.Comment{}, fmt.Errorf("do: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		if res.StatusCode == http.StatusNotFound {
			return document.Comment{}, xerror.ErrNotFound
		}
		return document.Comment{}, fmt.Errorf("unexpected response: %s", res.String())
	}

	// Point Source at src so "_source" is decoded straight into the comment.
	var (
		src document.Comment
		doc = getResponse{Source: &src}
	)
	if err := json.NewDecoder(res.Body).Decode(&doc); err != nil {
		return document.Comment{}, fmt.Errorf("response decode: %w", err)
	}

	// A missing document must not be reported as a successful empty result.
	if !doc.Found {
		return document.Comment{}, xerror.ErrNotFound
	}

	return src, nil
}
// ListComments searches the index with the query built from param and
// returns the decoded comments together with the total hit count.
func (o OpenSearch) ListComments(ctx context.Context, param document.ListParam) ([]*document.Comment, int, error) {
	req := opensearchapi.SearchReq{
		Indices: []string{o.Index},
		Body:    listCommentsQuery(param),
	}

	res, err := o.Client.Do(ctx, req, nil)
	if err != nil {
		// Wrapped like the other storage methods so the failing step is visible.
		return nil, 0, fmt.Errorf("do: %w", err)
	}
	defer res.Body.Close()

	var (
		comments []*document.Comment
		total    int
	)
	if err := decodeMany(res, &comments, &total); err != nil {
		return nil, 0, err
	}

	return comments, total, nil
}
package opensearch
import "encoding/json"
// getResponse is the subset of a get-document response that is decoded.
type getResponse struct {
// Found is decoded from the "found" field.
Found bool `json:"found"`
// Source receives "_source". Callers set it to a pointer to the target type
// before decoding so the document is unmarshalled into that value.
Source any `json:"_source"`
}
// searchResponse is the subset of a search response that is decoded.
type searchResponse struct {
Hits struct {
// Total.Value is the hit count reported by the search.
Total struct {
Value int `json:"value"`
} `json:"total"`
// Hits holds the returned documents; Source is kept raw so it can be
// decoded into the caller's type later.
Hits []struct {
ID string `json:"_id"`
Source json.RawMessage `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
package opensearch
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"github.com/opensearch-project/opensearch-go/v4"
)
// decodeMany decodes a search response into src and total.
//
// src must be a pointer that a JSON array of the hits' "_source" documents
// can be unmarshalled into (for example *[]*T). total, when non-nil, receives
// the reported hit count, including zero. With zero hits src is left
// untouched. An error is returned for an error response, a nil body or
// undecodable JSON.
func decodeMany(res *opensearch.Response, src any, total *int) error {
	if res.IsError() {
		if res.StatusCode == http.StatusNotFound {
			return fmt.Errorf("no record found: %d", res.StatusCode)
		}
		return fmt.Errorf("unexpected response: %s", res.String())
	}

	if res.Body == nil {
		return errors.New("nil response body")
	}

	var result searchResponse
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return fmt.Errorf("decode record: %w", err)
	}

	if result.Hits.Total.Value == 0 {
		// Write the zero so a caller that reuses total never sees a stale count.
		if total != nil {
			*total = 0
		}
		return nil
	}

	// Re-assemble the raw sources into one JSON array so they can be decoded
	// into the caller's slice type in a single Unmarshal.
	documents := make([]json.RawMessage, 0, len(result.Hits.Hits))
	for _, hit := range result.Hits.Hits {
		documents = append(documents, hit.Source)
	}

	data, err := json.Marshal(documents)
	if err != nil {
		return fmt.Errorf("marshal records: %w", err)
	}

	if err := json.Unmarshal(data, src); err != nil {
		return fmt.Errorf("decode record sources: %w", err)
	}

	if total != nil {
		*total = result.Hits.Total.Value
	}

	return nil
}
package opensearch
// page converts an optional 1-based page number and an optional page size
// into the "size" and "from" values of a search request, returned in that
// order.
//
// A nil, zero or negative page means the first page. A nil, zero, negative or
// greater-than-100 size falls back to 100. The returned values are therefore
// never negative.
func page(page, size *int) (int, int) {
	const (
		defaultSize = 100
		maxSize     = 100
	)

	pgsz := defaultSize
	if size != nil && *size > 0 && *size <= maxSize {
		pgsz = *size
	}

	pgfr := 0
	if page != nil && *page > 1 {
		pgfr = (*page - 1) * pgsz
	}

	return pgsz, pgfr
}
package opensearch
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"strings"

	"opensearch/src/model/storage/document"
)
// listCommentsQuery builds the search request body for listing comments.
//
// The query always restricts results to documents whose "type" is "comment"
// and adds a term clause for user_id and/or post_id when the corresponding
// parameter is set and non-empty. Paging is derived from param.Page and
// param.Size. The generated query is logged at info level.
func listCommentsQuery(param document.ListParam) io.Reader {
	size, from := page(param.Page, param.Size)

	must := `{"term":{"type":"comment"}}`
	if param.UserID != nil && *param.UserID != "" {
		must += "," + termClause("user_id", *param.UserID)
	}
	if param.PostID != nil && *param.PostID != "" {
		must += "," + termClause("post_id", *param.PostID)
	}

	query := fmt.Sprintf(`{"query":{"bool":{"must":[%s]}},"size":%d,"from":%d}`, must, size, from)

	slog.InfoContext(context.Background(), "List comments", "data.query", query)

	return strings.NewReader(query)
}

// termClause renders a single term clause with value encoded as a JSON string,
// so quotes or backslashes in caller-supplied values cannot alter the query.
func termClause(field, value string) string {
	// Marshalling a plain string cannot fail, so the error is safely ignored.
	quoted, _ := json.Marshal(value)

	return `{"term":{"` + field + `":` + string(quoted) + `}}`
}
curl --location --request PUT 'http://localhost:8080/api/v1/comments' \
--header 'Content-Type: application/json' \
--data '[
{
"user_id": "e4d1ca02-dff1-47d7-b193-0486e40c2733",
"post_id": "ef2f15e4-690d-4eea-aeb3-ff45dd70f460",
"text": "Perferendis assumenda dolor officiis similique ex doloribus et qui doloribus."
},
{
"user_id": "7be2e4e3-0421-4d16-92c2-7ad481156bfa",
"post_id": "d877f91d-c0f7-4c61-bf40-8399b350214d",
"text": "Perspiciatis architecto iste dolores assumenda quam aperiam officiis officiis tenetur."
},
{
"user_id": "0440e7cb-ede0-48df-94ee-ba5c810c9696",
"post_id": "7b856264-514a-49e3-b2bb-1ca63929b8a9",
"text": "Tenetur explicabo qui deleniti ab repudiandae quasi tempore explicabo."
}
]'
curl --location 'http://localhost:8080/api/v1/comments/424adee8-dc89-4126-859d-b6d8b067ce4f'
curl --location 'http://localhost:8080/api/v1/comments' \
--header 'Content-Type: application/json' \
--data '{
"page": 1,
"size": 3,
"user_id": "27a7c30b-cea0-4d49-8ab3-8712823ec9e8",
"post_id": "41571b9f-68bc-4c41-b63b-924877906eb5"
}'