This is similar to the previous example but we will be using client API v4. We are going to show how OpenSearch can be used in Golang to bulk upsert documents, finding and listing documents.


Note: I left the example a bit rough because I want you to focus on the actual OpenSearch client and storage implementations which is the main point here. I leave it up to you to tidy up the whole thing.


Structure


├── docker-compose.yaml
├── main.go
├── pkg
│   ├── xerror
│   │   └── error.go
│   └── xopensearch
│   └── opensearch.go
└── src
├── api
│   └── comment.go
├── model
│   ├── api
│   │   └── comment.go
│   └── storage
│   └── document
│   ├── comment.go
│   └── response.go
└── storage
└── opensearch
├── decode.go
├── opensearch.go
├── page.go
├── query.go
└── response.go

Files


docker-compose.yaml


services:
opensearch-node:
image: opensearchproject/opensearch:2.13.0
container_name: opensearch-node
environment:
- discovery.type=single-node
- OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=5tr0ng_Pa55w0rd
ports:
- 9200:9200 # Rest HTTP
- 9600:9600 # Dashboard HTTP

opensearch-dashboard:
image: opensearchproject/opensearch-dashboards:2.13.0
container_name: opensearch-dashboard
environment:
OPENSEARCH_HOSTS: '["https://opensearch-node:9200"]'
ports:
- 5601:5601

error.go


package xerror

import "errors"

var (
ErrNotFound = errors.New("not found")
)

opensearch.go


package xopensearch

import (
"context"
"crypto/tls"
"fmt"
"net/http"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/opensearch-project/opensearch-go/v4"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/opensearch-project/opensearch-go/v4/signer/awsv2"
)

// Config is used to configure OpenSearch client.
type Config struct {
// Insecure is used when working in local environment.
Insecure bool

// Username for HTTP Basic Authentication.
Username string

// Password for HTTP Basic Authentication.
Password string

// A list of nodes to connect to.
Addresses []string

// AWSSigner indicates that the requests must be signed with AWS signer.
// This is used for deployment environments only.
AWSSigner bool

// AWSConfig is used to sign requests only if `AWSSigner` is `true`.
AWSConfig aws.Config
}

type OpenSearch struct {
client *opensearch.Client
}

// New returns a new OpenSearch type.
func New(config Config) (OpenSearch, error) {
cfg := opensearch.Config{
Addresses: config.Addresses,
}

if config.Username != "" {
cfg.Username = config.Username
}

if config.Password != "" {
cfg.Password = config.Password
}

if config.Insecure {
cfg.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: config.Insecure,
},
}
}

if config.AWSSigner {
sig, err := awsv2.NewSignerWithService(config.AWSConfig, "es")
if err != nil {
return OpenSearch{}, fmt.Errorf("unable to create signer: %w", err)
}

cfg.Signer = sig
}

cli, err := opensearchapi.NewClient(opensearchapi.Config{Client: cfg})
if err != nil {
return OpenSearch{}, fmt.Errorf("unable to create client: %w", err)
}

return OpenSearch{
client: cli.Client,
}, nil
}

// Client return OpenSearch client.
func (o OpenSearch) Client() *opensearch.Client {
return o.client
}

// CreateIndex creates a concrete index with optional body parameter. If it
// already exists, nil is returned.
func (o OpenSearch) CreateIndex(ctx context.Context, index, body string) error {
exits, err := o.client.Do(ctx, opensearchapi.IndicesExistsReq{
Indices: []string{index},
}, nil)
if err != nil {
return fmt.Errorf("index exists: %w", err)
}
defer exits.Body.Close()

if exits.StatusCode == http.StatusOK {
return nil
}

if exits.StatusCode != http.StatusNotFound {
return fmt.Errorf("unexpected response: %s", exits.String())
}

create, err := o.client.Do(ctx, opensearchapi.IndicesCreateReq{
Index: index,
Body: strings.NewReader(body),
}, nil)
if err != nil {
return fmt.Errorf("index create: %w", err)
}
defer create.Body.Close()

if create.IsError() {
return fmt.Errorf("invalid response: %s", create.String())
}

return nil
}

// CreateAlias creates an alias that points to an existing concrete index. If it
// already exists, nil is returned.
func (o OpenSearch) CreateAlias(ctx context.Context, index, alias string) error {
exits, err := o.client.Do(ctx, opensearchapi.AliasExistsReq{
Indices: []string{alias},
}, nil)
if err != nil {
return fmt.Errorf("alias exists: %w", err)
}
defer exits.Body.Close()

if exits.StatusCode == http.StatusOK {
return nil
}

if exits.StatusCode != http.StatusNotFound {
return fmt.Errorf("unexpected response: %s", exits.String())
}

create, err := o.client.Do(ctx, opensearchapi.AliasPutReq{
Indices: []string{index},
Alias: alias,
}, nil)
if err != nil {
return fmt.Errorf("alias create: %w", err)
}
defer create.Body.Close()

if create.IsError() {
return fmt.Errorf("invalid response: %s", create.String())
}

return nil
}

main.go


package main

import (
"context"
"log/slog"
"net/http"

"opensearch/pkg/xopensearch"
"opensearch/src/api"
"opensearch/src/storage/opensearch"
)

func main() {
ctx := context.Background()

ops, err := xopensearch.New(xopensearch.Config{
Insecure: true,
Username: "admin",
Password: "5tr0ng_Pa55w0rd",
Addresses: []string{"https://localhost:9200"},
})
if err != nil {
slog.ErrorContext(ctx, "Create OpenSearch client", "error", err)

return
}

var (
indexName = "blog"
indexBody = `{
"mappings": {
"properties": {
"type": {
"type": "keyword"
},
"id": {
"type": "keyword"
},
"user_id": {
"type": "keyword"
},
"post_id": {
"type": "keyword"
},
"text": {
"type": "text"
},
"date": {
"type": "date"
}
}
},
"aliases": {
"alias_20250209_1244": {}
}
}`
)

if err := ops.CreateIndex(ctx, indexName, indexBody); err != nil {
slog.ErrorContext(ctx, "Create OpenSearch index", "error", err)

return
}

storage := opensearch.OpenSearch{
Client: ops.Client(),
Index: indexName,
}

comment := api.Comment{
Storage: storage,
}

router := http.NewServeMux()
router.HandleFunc("PUT /api/v1/comments", comment.Create)
router.HandleFunc("GET /api/v1/comments/{id}", comment.Find)
router.HandleFunc("POST /api/v1/comments", comment.List)

http.ListenAndServe(":8080", router)
}

api/comment.go


package api

import (
"context"
"encoding/json"
"errors"
"log/slog"
"net/http"
"time"

"opensearch/pkg/xerror"
"opensearch/src/model/api"
"opensearch/src/model/storage/document"

"github.com/google/uuid"
)

type storer interface {
UpsertComments(ctx context.Context, comments []*document.Comment) (*document.BulkResponse, error)
FindComment(ctx context.Context, id string) (document.Comment, error)
ListComments(ctx context.Context, param document.ListParam) ([]*document.Comment, int, error)
}

type Comment struct {
Storage storer
}

func (c Comment) Create(w http.ResponseWriter, r *http.Request) {
var req []api.CreateCommentRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
slog.ErrorContext(r.Context(), "Unable to decode request", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

comments := make([]*document.Comment, 0, len(req))

for _, comment := range req {
comments = append(comments, &document.Comment{
Type: "comment",
ID: uuid.New().String(),
PostID: comment.PostID,
UserID: comment.UserID,
Text: comment.Text,
Date: time.Now().UTC(),
})
}

res, err := c.Storage.UpsertComments(r.Context(), comments)
if err != nil {
slog.ErrorContext(r.Context(), "Unable to upsert comments", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

if res.AnyError {
for _, operation := range res.Operations {
if operation.Update.Status > 399 {
slog.WarnContext(r.Context(), "Unable to upsert comment",
"data.id", operation.Update.ID,
"data.reason", operation.Update.Error.Reason,
)
}
}
}

w.WriteHeader(http.StatusOK)
}

func (c Comment) Find(w http.ResponseWriter, r *http.Request) {
res, err := c.Storage.FindComment(r.Context(), r.PathValue("id"))
if err != nil {
if errors.Is(err, xerror.ErrNotFound) {
w.WriteHeader(http.StatusNotFound)
return
}

slog.ErrorContext(r.Context(), "Unable to find comment", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

doc := api.FindCommentResponse{
Comment: api.Comment{
ID: res.ID,
UserID: res.UserID,
PostID: res.PostID,
Text: res.Text,
Date: res.Date,
},
}

dat, err := json.Marshal(doc)
if err != nil {
slog.ErrorContext(r.Context(), "Unable to marshal response", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

w.Header().Add("Content-type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(dat)
}

func (c Comment) List(w http.ResponseWriter, r *http.Request) {
var req api.ListCommentsRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
slog.ErrorContext(r.Context(), "Unable to decode request", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

res, tot, err := c.Storage.ListComments(r.Context(), document.ListParam{
Page: req.Page,
Size: req.Size,
UserID: req.UserID,
PostID: req.PostID,
})
if err != nil {
slog.ErrorContext(r.Context(), "Unable to lis comments", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

docs := make([]*api.Comment, 0, len(res))

for _, comment := range res {
docs = append(docs, &api.Comment{
ID: comment.ID,
UserID: comment.UserID,
PostID: comment.PostID,
Text: comment.Text,
Date: comment.Date,
})
}

result := api.ListCommentsResponse{
Comments: docs,
Total: tot,
}

dat, err := json.Marshal(result)
if err != nil {
slog.ErrorContext(r.Context(), "Unable to marshal response", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

w.Header().Add("Content-type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(dat)
}

api/model/comment.go


package api

import "time"

type Comment struct {
ID string `json:"id"`
UserID string `json:"user_id"`
PostID string `json:"post_id"`
Text string `json:"text"`
Date time.Time `json:"date"`
}

type CreateCommentRequest struct {
UserID string `json:"user_id"`
PostID string `json:"post_id"`
Text string `json:"text"`
}

type FindCommentResponse struct {
Comment
}

type ListCommentsRequest struct {
Page *int `json:"page"`
Size *int `json:"size"`
UserID *string `json:"user_id"`
PostID *string `json:"post_id"`
}

type ListCommentsResponse struct {
Comments []*Comment `json:"comments"`
Total int `json:"total"`
}

document/comment.go


package document

import (
"time"
)

type Comment struct {
Type string `json:"type"`
ID string `json:"id"`
UserID string `json:"user_id"`
PostID string `json:"post_id"`
Text string `json:"text"`
Date time.Time `json:"date"`
}

type ListParam struct {
Page *int
Size *int
UserID *string
PostID *string
}

document/response.go


package document

type BulkResponse struct {
AnyError bool `json:"errors"`
Operations []*BulkResponseOperation `json:"items"`
}

type BulkResponseOperation struct {
Update *BulkResponseItem `json:"update,omitempty"`
}

type BulkResponseItem struct {
ID string `json:"_id"`
Status int `json:"status"`
Error *BulkResponseError `json:"error,omitempty"`
}

type BulkResponseError struct {
Reason string `json:"reason"`
}

opensearch/opensearch.go


package opensearch

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"

"opensearch/pkg/xerror"
"opensearch/src/model/storage/document"

"github.com/opensearch-project/opensearch-go/v4"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
)

type OpenSearch struct {
Client *opensearch.Client
Index string
}

func (o OpenSearch) UpsertComments(ctx context.Context, comments []*document.Comment) (*document.BulkResponse, error) {
body := &bytes.Buffer{}

for _, comment := range comments {
doc, err := json.Marshal(map[string]any{"doc": comment, "doc_as_upsert": true})
if err != nil {
return nil, fmt.Errorf("marshal: %s: %w", comment.ID, err)
}

body.WriteString(`{"update":{"_id":"` + comment.ID + `"}}`)
body.WriteString("\n")
body.Write(doc)
body.WriteString("\n")
}

req := opensearchapi.BulkReq{
Index: o.Index,
Body: body,
Params: opensearchapi.BulkParams{
Refresh: "true",
},
}

res, err := o.Client.Do(ctx, req, nil)
if err != nil {
return nil, fmt.Errorf("do: %w", err)
}
defer res.Body.Close()

if res.IsError() {
return nil, fmt.Errorf("unexpected response: %s", res.String())
}

result := &document.BulkResponse{}

if err := json.NewDecoder(res.Body).Decode(result); err != nil {
return nil, fmt.Errorf("response decode: %w", err)
}

return result, nil
}

func (o OpenSearch) FindComment(ctx context.Context, id string) (document.Comment, error) {
req := opensearchapi.DocumentGetReq{
Index: o.Index,
DocumentID: id,
}

res, err := o.Client.Do(ctx, req, nil)
if err != nil {
return document.Comment{}, fmt.Errorf("do: %w", err)
}
defer res.Body.Close()

if res.IsError() {
if res.StatusCode == http.StatusNotFound {
return document.Comment{}, xerror.ErrNotFound
}

return document.Comment{}, fmt.Errorf("unexpected response: %s", res.String())
}

var (
src document.Comment
doc = getResponse{Source: &src}
)

if err := json.NewDecoder(res.Body).Decode(&doc); err != nil {
return document.Comment{}, fmt.Errorf("response decode: %w", err)
}

if !doc.Found {
return document.Comment{}, nil
}

return src, nil
}

func (o OpenSearch) ListComments(ctx context.Context, param document.ListParam) ([]*document.Comment, int, error) {
req := opensearchapi.SearchReq{
Indices: []string{o.Index},
Body: listCommentsQuery(param),
}

res, err := o.Client.Do(ctx, req, nil)
if err != nil {
return nil, 0, err
}
defer res.Body.Close()

var (
comments []*document.Comment
total int
)

if err := decodeMany(res, &comments, &total); err != nil {
return nil, 0, err
}

return comments, total, nil
}

opensearch/response.go


package opensearch

import "encoding/json"

type getResponse struct {
Found bool `json:"found"`
Source any `json:"_source"`
}

type searchResponse struct {
Hits struct {
Total struct {
Value int `json:"value"`
} `json:"total"`
Hits []struct {
ID string `json:"_id"`
Source json.RawMessage `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}

opensearch/decode.go


package opensearch

import (
"encoding/json"
"errors"
"fmt"
"net/http"

"github.com/opensearch-project/opensearch-go/v4"
)

func decodeMany(res *opensearch.Response, src any, total *int) error {
if res.IsError() {
if res.StatusCode == http.StatusNotFound {
return fmt.Errorf("no record found: %d", res.StatusCode)
}

return fmt.Errorf("unexpected response: %s", res.String())
}

if res.Body == nil {
return errors.New("nil response body")
}

var result searchResponse

if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return fmt.Errorf("decode record: %w", err)
}

if result.Hits.Total.Value == 0 {
return nil
}

documents := make([]json.RawMessage, 0, len(result.Hits.Hits))
for _, hit := range result.Hits.Hits {
documents = append(documents, hit.Source)
}

data, err := json.Marshal(documents)
if err != nil {
return fmt.Errorf("marshal records: %w", err)
}

if err := json.Unmarshal(data, src); err != nil {
return fmt.Errorf("decode record sources: %w", err)
}

*total = result.Hits.Total.Value

return nil
}

opensearch/page.go


package opensearch

// page returns page size and page from values.
func page(page, size *int) (int, int) {
pgno := 0
if page != nil && *page != 0 {
pgno = *page
}

pgsz := 100
if size != nil && *size != 0 && *size <= 100 {
pgsz = *size
}

pgfr := 0
if pgno != 0 {
pgfr = (pgsz * pgno) - pgsz
}

return pgsz, pgfr
}

opensearch/query.go


package opensearch

import (
"context"
"fmt"
"io"
"log/slog"
"strings"

"opensearch/src/model/storage/document"
)

func listCommentsQuery(param document.ListParam) io.Reader {
size, from := page(param.Page, param.Size)

must := `{"term":{"type":"comment"}}`

if param.UserID != nil && *param.UserID != "" {
must += `,{"term":{"user_id":"` + *param.UserID + `"}}`
}

if param.PostID != nil && *param.PostID != "" {
must += `,{"term":{"post_id":"` + *param.PostID + `"}}`
}

query := fmt.Sprintf(`{"query":{"bool":{"must":[%s]}},"size":%d,"from":%d}`, must, size, from)

slog.InfoContext(context.Background(), "List comments", "data.query", query)

return strings.NewReader(query)
}

API


curl --location --request PUT 'http://localhost:8080/api/v1/comments' \
--header 'Content-Type: application/json' \
--data '[
{
"user_id": "e4d1ca02-dff1-47d7-b193-0486e40c2733",
"post_id": "ef2f15e4-690d-4eea-aeb3-ff45dd70f460",
"text": "Perferendis assumenda dolor officiis similique ex doloribus et qui doloribus."
},
{
"user_id": "7be2e4e3-0421-4d16-92c2-7ad481156bfa",
"post_id": "d877f91d-c0f7-4c61-bf40-8399b350214d",
"text": "Perspiciatis architecto iste dolores assumenda quam aperiam officiis officiis tenetur."
},
{
"user_id": "0440e7cb-ede0-48df-94ee-ba5c810c9696",
"post_id": "7b856264-514a-49e3-b2bb-1ca63929b8a9",
"text": "Tenetur explicabo qui deleniti ab repudiandae quasi tempore explicabo."
}
]'

curl --location 'http://localhost:8080/api/v1/comments/424adee8-dc89-4126-859d-b6d8b067ce4f'

curl --location 'http://localhost:8080/api/v1/comments' \
--header 'Content-Type: application/json' \
--data '{
"page": 1,
"size": 3,
"user_id": "27a7c30b-cea0-4d49-8ab3-8712823ec9e8",
"post_id": "41571b9f-68bc-4c41-b63b-924877906eb5"
}'