09/02/2025 - AWS, DOCKER, ELASTICSEARCH, GO
Bu önceki örneğe benzer ancak istemci API v4'ü kullanacağız. OpenSearch'ün Golang'da toplu upsert
belgeleri bulmak ve listelemek için nasıl kullanılabileceğini göstereceğiz.
Not: Örneği biraz kaba bıraktım çünkü asıl mesele olan gerçek OpenSearch istemcisine ve depolama uygulamalarına odaklanmanızı istiyorum. Geriye kalanları toparlamayı size bırakıyorum.
├── docker-compose.yaml
├── main.go
├── pkg
│ ├── xerror
│ │ └── error.go
│ └── xopensearch
│ └── opensearch.go
└── src
├── api
│ └── comment.go
├── model
│ ├── api
│ │ └── comment.go
│ └── storage
│ └── document
│ ├── comment.go
│ └── response.go
└── storage
└── opensearch
├── decode.go
├── opensearch.go
├── page.go
├── query.go
└── response.go
services:
opensearch-node:
image: opensearchproject/opensearch:2.13.0
container_name: opensearch-node
environment:
- discovery.type=single-node
- OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=5tr0ng_Pa55w0rd
ports:
- 9200:9200 # Rest HTTP
- 9600:9600 # Dashboard HTTP
opensearch-dashboard:
image: opensearchproject/opensearch-dashboards:2.13.0
container_name: opensearch-dashboard
environment:
OPENSEARCH_HOSTS: '["https://opensearch-node:9200"]'
ports:
- 5601:5601
package xerror
import "errors"
var (
ErrNotFound = errors.New("not found")
)
package xopensearch
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"strings"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/opensearch-project/opensearch-go/v4"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/opensearch-project/opensearch-go/v4/signer/awsv2"
)
// Config is used to configure OpenSearch client.
type Config struct {
// Insecure is used when working in local environment.
Insecure bool
// Username for HTTP Basic Authentication.
Username string
// Password for HTTP Basic Authentication.
Password string
// A list of nodes to connect to.
Addresses []string
// AWSSigner indicates that the requests must be signed with AWS signer.
// This is used for deployment environments only.
AWSSigner bool
// AWSConfig is used to sign requests only if `AWSSigner` is `true`.
AWSConfig aws.Config
}
type OpenSearch struct {
client *opensearch.Client
}
// New returns a new OpenSearch type.
func New(config Config) (OpenSearch, error) {
cfg := opensearch.Config{
Addresses: config.Addresses,
}
if config.Username != "" {
cfg.Username = config.Username
}
if config.Password != "" {
cfg.Password = config.Password
}
if config.Insecure {
cfg.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: config.Insecure,
},
}
}
if config.AWSSigner {
sig, err := awsv2.NewSignerWithService(config.AWSConfig, "es")
if err != nil {
return OpenSearch{}, fmt.Errorf("unable to create signer: %w", err)
}
cfg.Signer = sig
}
cli, err := opensearchapi.NewClient(opensearchapi.Config{Client: cfg})
if err != nil {
return OpenSearch{}, fmt.Errorf("unable to create client: %w", err)
}
return OpenSearch{
client: cli.Client,
}, nil
}
// Client return OpenSearch client.
func (o OpenSearch) Client() *opensearch.Client {
return o.client
}
// CreateIndex creates a concrete index with optional body parameter. If it
// already exists, nil is returned.
func (o OpenSearch) CreateIndex(ctx context.Context, index, body string) error {
exits, err := o.client.Do(ctx, opensearchapi.IndicesExistsReq{
Indices: []string{index},
}, nil)
if err != nil {
return fmt.Errorf("index exists: %w", err)
}
defer exits.Body.Close()
if exits.StatusCode == http.StatusOK {
return nil
}
if exits.StatusCode != http.StatusNotFound {
return fmt.Errorf("unexpected response: %s", exits.String())
}
create, err := o.client.Do(ctx, opensearchapi.IndicesCreateReq{
Index: index,
Body: strings.NewReader(body),
}, nil)
if err != nil {
return fmt.Errorf("index create: %w", err)
}
defer create.Body.Close()
if create.IsError() {
return fmt.Errorf("invalid response: %s", create.String())
}
return nil
}
// CreateAlias creates an alias that points to an existing concrete index. If it
// already exists, nil is returned.
func (o OpenSearch) CreateAlias(ctx context.Context, index, alias string) error {
exits, err := o.client.Do(ctx, opensearchapi.AliasExistsReq{
Indices: []string{alias},
}, nil)
if err != nil {
return fmt.Errorf("alias exists: %w", err)
}
defer exits.Body.Close()
if exits.StatusCode == http.StatusOK {
return nil
}
if exits.StatusCode != http.StatusNotFound {
return fmt.Errorf("unexpected response: %s", exits.String())
}
create, err := o.client.Do(ctx, opensearchapi.AliasPutReq{
Indices: []string{index},
Alias: alias,
}, nil)
if err != nil {
return fmt.Errorf("alias create: %w", err)
}
defer create.Body.Close()
if create.IsError() {
return fmt.Errorf("invalid response: %s", create.String())
}
return nil
}
package main
import (
"context"
"log/slog"
"net/http"
"opensearch/pkg/xopensearch"
"opensearch/src/api"
"opensearch/src/storage/opensearch"
)
func main() {
ctx := context.Background()
ops, err := xopensearch.New(xopensearch.Config{
Insecure: true,
Username: "admin",
Password: "5tr0ng_Pa55w0rd",
Addresses: []string{"https://localhost:9200"},
})
if err != nil {
slog.ErrorContext(ctx, "Create OpenSearch client", "error", err)
return
}
var (
indexName = "blog"
indexBody = `{
"mappings": {
"properties": {
"type": {
"type": "keyword"
},
"id": {
"type": "keyword"
},
"user_id": {
"type": "keyword"
},
"post_id": {
"type": "keyword"
},
"text": {
"type": "text"
},
"date": {
"type": "date"
}
}
},
"aliases": {
"alias_20250209_1244": {}
}
}`
)
if err := ops.CreateIndex(ctx, indexName, indexBody); err != nil {
slog.ErrorContext(ctx, "Create OpenSearch index", "error", err)
return
}
storage := opensearch.OpenSearch{
Client: ops.Client(),
Index: indexName,
}
comment := api.Comment{
Storage: storage,
}
router := http.NewServeMux()
router.HandleFunc("PUT /api/v1/comments", comment.Create)
router.HandleFunc("GET /api/v1/comments/{id}", comment.Find)
router.HandleFunc("POST /api/v1/comments", comment.List)
http.ListenAndServe(":8080", router)
}
package api
import (
"context"
"encoding/json"
"errors"
"log/slog"
"net/http"
"time"
"opensearch/pkg/xerror"
"opensearch/src/model/api"
"opensearch/src/model/storage/document"
"github.com/google/uuid"
)
type storer interface {
UpsertComments(ctx context.Context, comments []*document.Comment) (*document.BulkResponse, error)
FindComment(ctx context.Context, id string) (document.Comment, error)
ListComments(ctx context.Context, param document.ListParam) ([]*document.Comment, int, error)
}
type Comment struct {
Storage storer
}
func (c Comment) Create(w http.ResponseWriter, r *http.Request) {
var req []api.CreateCommentRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
slog.ErrorContext(r.Context(), "Unable to decode request", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
comments := make([]*document.Comment, 0, len(req))
for _, comment := range req {
comments = append(comments, &document.Comment{
Type: "comment",
ID: uuid.New().String(),
PostID: comment.PostID,
UserID: comment.UserID,
Text: comment.Text,
Date: time.Now().UTC(),
})
}
res, err := c.Storage.UpsertComments(r.Context(), comments)
if err != nil {
slog.ErrorContext(r.Context(), "Unable to upsert comments", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if res.AnyError {
for _, operation := range res.Operations {
if operation.Update.Status > 399 {
slog.WarnContext(r.Context(), "Unable to upsert comment",
"data.id", operation.Update.ID,
"data.reason", operation.Update.Error.Reason,
)
}
}
}
w.WriteHeader(http.StatusOK)
}
func (c Comment) Find(w http.ResponseWriter, r *http.Request) {
res, err := c.Storage.FindComment(r.Context(), r.PathValue("id"))
if err != nil {
if errors.Is(err, xerror.ErrNotFound) {
w.WriteHeader(http.StatusNotFound)
return
}
slog.ErrorContext(r.Context(), "Unable to find comment", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
doc := api.FindCommentResponse{
Comment: api.Comment{
ID: res.ID,
UserID: res.UserID,
PostID: res.PostID,
Text: res.Text,
Date: res.Date,
},
}
dat, err := json.Marshal(doc)
if err != nil {
slog.ErrorContext(r.Context(), "Unable to marshal response", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Add("Content-type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(dat)
}
func (c Comment) List(w http.ResponseWriter, r *http.Request) {
var req api.ListCommentsRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
slog.ErrorContext(r.Context(), "Unable to decode request", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
res, tot, err := c.Storage.ListComments(r.Context(), document.ListParam{
Page: req.Page,
Size: req.Size,
UserID: req.UserID,
PostID: req.PostID,
})
if err != nil {
slog.ErrorContext(r.Context(), "Unable to lis comments", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
docs := make([]*api.Comment, 0, len(res))
for _, comment := range res {
docs = append(docs, &api.Comment{
ID: comment.ID,
UserID: comment.UserID,
PostID: comment.PostID,
Text: comment.Text,
Date: comment.Date,
})
}
result := api.ListCommentsResponse{
Comments: docs,
Total: tot,
}
dat, err := json.Marshal(result)
if err != nil {
slog.ErrorContext(r.Context(), "Unable to marshal response", "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Add("Content-type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(dat)
}
package api
import "time"
type Comment struct {
ID string `json:"id"`
UserID string `json:"user_id"`
PostID string `json:"post_id"`
Text string `json:"text"`
Date time.Time `json:"date"`
}
type CreateCommentRequest struct {
UserID string `json:"user_id"`
PostID string `json:"post_id"`
Text string `json:"text"`
}
type FindCommentResponse struct {
Comment
}
type ListCommentsRequest struct {
Page *int `json:"page"`
Size *int `json:"size"`
UserID *string `json:"user_id"`
PostID *string `json:"post_id"`
}
type ListCommentsResponse struct {
Comments []*Comment `json:"comments"`
Total int `json:"total"`
}
package document
import (
"time"
)
type Comment struct {
Type string `json:"type"`
ID string `json:"id"`
UserID string `json:"user_id"`
PostID string `json:"post_id"`
Text string `json:"text"`
Date time.Time `json:"date"`
}
type ListParam struct {
Page *int
Size *int
UserID *string
PostID *string
}
package document
type BulkResponse struct {
AnyError bool `json:"errors"`
Operations []*BulkResponseOperation `json:"items"`
}
type BulkResponseOperation struct {
Update *BulkResponseItem `json:"update,omitempty"`
}
type BulkResponseItem struct {
ID string `json:"_id"`
Status int `json:"status"`
Error *BulkResponseError `json:"error,omitempty"`
}
type BulkResponseError struct {
Reason string `json:"reason"`
}
package opensearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"opensearch/pkg/xerror"
"opensearch/src/model/storage/document"
"github.com/opensearch-project/opensearch-go/v4"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
)
type OpenSearch struct {
Client *opensearch.Client
Index string
}
func (o OpenSearch) UpsertComments(ctx context.Context, comments []*document.Comment) (*document.BulkResponse, error) {
body := &bytes.Buffer{}
for _, comment := range comments {
doc, err := json.Marshal(map[string]any{"doc": comment, "doc_as_upsert": true})
if err != nil {
return nil, fmt.Errorf("marshal: %s: %w", comment.ID, err)
}
body.WriteString(`{"update":{"_id":"` + comment.ID + `"}}`)
body.WriteString("\n")
body.Write(doc)
body.WriteString("\n")
}
req := opensearchapi.BulkReq{
Index: o.Index,
Body: body,
Params: opensearchapi.BulkParams{
Refresh: "true",
},
}
res, err := o.Client.Do(ctx, req, nil)
if err != nil {
return nil, fmt.Errorf("do: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return nil, fmt.Errorf("unexpected response: %s", res.String())
}
result := &document.BulkResponse{}
if err := json.NewDecoder(res.Body).Decode(result); err != nil {
return nil, fmt.Errorf("response decode: %w", err)
}
return result, nil
}
func (o OpenSearch) FindComment(ctx context.Context, id string) (document.Comment, error) {
req := opensearchapi.DocumentGetReq{
Index: o.Index,
DocumentID: id,
}
res, err := o.Client.Do(ctx, req, nil)
if err != nil {
return document.Comment{}, fmt.Errorf("do: %w", err)
}
defer res.Body.Close()
if res.IsError() {
if res.StatusCode == http.StatusNotFound {
return document.Comment{}, xerror.ErrNotFound
}
return document.Comment{}, fmt.Errorf("unexpected response: %s", res.String())
}
var (
src document.Comment
doc = getResponse{Source: &src}
)
if err := json.NewDecoder(res.Body).Decode(&doc); err != nil {
return document.Comment{}, fmt.Errorf("response decode: %w", err)
}
if !doc.Found {
return document.Comment{}, nil
}
return src, nil
}
func (o OpenSearch) ListComments(ctx context.Context, param document.ListParam) ([]*document.Comment, int, error) {
req := opensearchapi.SearchReq{
Indices: []string{o.Index},
Body: listCommentsQuery(param),
}
res, err := o.Client.Do(ctx, req, nil)
if err != nil {
return nil, 0, err
}
defer res.Body.Close()
var (
comments []*document.Comment
total int
)
if err := decodeMany(res, &comments, &total); err != nil {
return nil, 0, err
}
return comments, total, nil
}
package opensearch
import "encoding/json"
type getResponse struct {
Found bool `json:"found"`
Source any `json:"_source"`
}
type searchResponse struct {
Hits struct {
Total struct {
Value int `json:"value"`
} `json:"total"`
Hits []struct {
ID string `json:"_id"`
Source json.RawMessage `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
package opensearch
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"github.com/opensearch-project/opensearch-go/v4"
)
func decodeMany(res *opensearch.Response, src any, total *int) error {
if res.IsError() {
if res.StatusCode == http.StatusNotFound {
return fmt.Errorf("no record found: %d", res.StatusCode)
}
return fmt.Errorf("unexpected response: %s", res.String())
}
if res.Body == nil {
return errors.New("nil response body")
}
var result searchResponse
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return fmt.Errorf("decode record: %w", err)
}
if result.Hits.Total.Value == 0 {
return nil
}
documents := make([]json.RawMessage, 0, len(result.Hits.Hits))
for _, hit := range result.Hits.Hits {
documents = append(documents, hit.Source)
}
data, err := json.Marshal(documents)
if err != nil {
return fmt.Errorf("marshal records: %w", err)
}
if err := json.Unmarshal(data, src); err != nil {
return fmt.Errorf("decode record sources: %w", err)
}
*total = result.Hits.Total.Value
return nil
}
package opensearch
// page returns page size and page from values.
func page(page, size *int) (int, int) {
pgno := 0
if page != nil && *page != 0 {
pgno = *page
}
pgsz := 100
if size != nil && *size != 0 && *size <= 100 {
pgsz = *size
}
pgfr := 0
if pgno != 0 {
pgfr = (pgsz * pgno) - pgsz
}
return pgsz, pgfr
}
package opensearch
import (
"context"
"fmt"
"io"
"log/slog"
"strings"
"opensearch/src/model/storage/document"
)
func listCommentsQuery(param document.ListParam) io.Reader {
size, from := page(param.Page, param.Size)
must := `{"term":{"type":"comment"}}`
if param.UserID != nil && *param.UserID != "" {
must += `,{"term":{"user_id":"` + *param.UserID + `"}}`
}
if param.PostID != nil && *param.PostID != "" {
must += `,{"term":{"post_id":"` + *param.PostID + `"}}`
}
query := fmt.Sprintf(`{"query":{"bool":{"must":[%s]}},"size":%d,"from":%d}`, must, size, from)
slog.InfoContext(context.Background(), "List comments", "data.query", query)
return strings.NewReader(query)
}
curl --location --request PUT 'http://localhost:8080/api/v1/comments' \
--header 'Content-Type: application/json' \
--data '[
{
"user_id": "e4d1ca02-dff1-47d7-b193-0486e40c2733",
"post_id": "ef2f15e4-690d-4eea-aeb3-ff45dd70f460",
"text": "Perferendis assumenda dolor officiis similique ex doloribus et qui doloribus."
},
{
"user_id": "7be2e4e3-0421-4d16-92c2-7ad481156bfa",
"post_id": "d877f91d-c0f7-4c61-bf40-8399b350214d",
"text": "Perspiciatis architecto iste dolores assumenda quam aperiam officiis officiis tenetur."
},
{
"user_id": "0440e7cb-ede0-48df-94ee-ba5c810c9696",
"post_id": "7b856264-514a-49e3-b2bb-1ca63929b8a9",
"text": "Tenetur explicabo qui deleniti ab repudiandae quasi tempore explicabo."
}
]'
curl --location 'http://localhost:8080/api/v1/comments/424adee8-dc89-4126-859d-b6d8b067ce4f'
curl --location 'http://localhost:8080/api/v1/comments' \
--header 'Content-Type: application/json' \
--data '{
"page": 1,
"size": 3,
"user_id": "27a7c30b-cea0-4d49-8ab3-8712823ec9e8",
"post_id": "41571b9f-68bc-4c41-b63b-924877906eb5"
}'