In this example we are going to show how OpenSearch can be used in Golang to bulk upsert documents and manually refresh index at the end of the operation. Along with that, we upsert a single document and automatically refresh index. Finally we get a document by its id. One thing to note, we directly work with index alias rather than the concrete index.


Files


docker-compose.yaml


You can access UI using http://0.0.0.0:5601/ and login with admin:Adm1n_Pa55w0rd?.


version: "3.8"

services:
opensearch-node:
image: opensearchproject/opensearch:2.13.0
container_name: opensearch-node
environment:
- discovery.type=single-node
- OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=Adm1n_Pa55w0rd?
ports:
- 9200:9200 # Rest HTTP
- 9600:9600 # Dashboard HTTP

opensearch-dashboard:
image: opensearchproject/opensearch-dashboards:2.13.0
container_name: opensearch-dashboard
environment:
OPENSEARCH_HOSTS: '["https://opensearch-node:9200"]'
ports:
- 5601:5601

Makefile


ADDRESS = https://0.0.0.0:9200
CREDENTIALS = admin:Adm1n_Pa55w0rd?

.PHONY: opensearch-up
opensearch-up: ## Bring up OpenSearch instance.
docker-compose up

.PHONY: index-create
index-create: ## Create index and alias.
@curl -X PUT --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_index" -H "Content-Type: application/json" -d '{"aliases":{"order_index_alias":{}}}'; echo
@curl -X PUT --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_meta_index" -H "Content-Type: application/json" -d '{"aliases":{"order_meta_index_alias":{}}}'; echo

.PHONY: index-clear
index-clear: ## Delete all documents in index.
@curl -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_index/_delete_by_query" -H "Content-Type: application/json" -d '{"query":{"match_all":{}}}'; echo
@curl -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_meta_index/_delete_by_query" -H "Content-Type: application/json" -d '{"query":{"match_all":{}}}'; echo

.PHONY: index-load
index-load: ## Populate index with test documents.
@curl -i -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/_bulk" -H "Content-Type: application/json" --data-binary @script/opensearch/fixture/order_index.txt; echo
@curl -i -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/_bulk" -H "Content-Type: application/json" --data-binary @script/opensearch/fixture/order_meta_index.txt; echo

order_index.txt


{"index":{"_index":"order_index","_id":"id-1"}}
{"id":"id-1","data":{"label":{"list":["one"],"active":false,"region":["Euro"]}}}
{"index":{"_index":"order_index","_id":"id-2"}}
{"id":"id-2","data":{"label":{"list":["one","two"],"active":true,"region":["Asia"]}}}
{"index":{"_index":"order_index","_id":"id-3"}}
{"id":"id-3","data":{"label":{"list":["one","two","three"],"active":false,"region":["Africa"]}}}
{"index":{"_index":"order_index","_id":"id-4"}}
{"id":"id-4","data":{"label":{"list":["two","three"],"active":true,"region":["Euro"]}}}
{"index":{"_index":"order_index","_id":"id-5"}}
{"id":"id-5","data":{"label":{"list":["three"],"active":true,"region":["Africa", "Asia"]}}}

order_meta_index.txt


{"index":{"_index":"order_meta_index","_id":"id-1"}}
{"updated_at": "2024-12-01T12:02:02.022022Z"}

xopensearch.go


package xopensearch

import (
"crypto/tls"
"fmt"
"net/http"

"github.com/opensearch-project/opensearch-go"
)

type Config struct {
Insecure bool
Username string
Password string
Addresses []string
}

// New returns a new OpenSearch client instance.
func New(cfg Config) (*opensearch.Client, error) {
cli, err := opensearch.NewClient(opensearch.Config{
Username: cfg.Username,
Password: cfg.Password,
Addresses: cfg.Addresses,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: cfg.Insecure,
},
},
})
if err != nil {
return nil, fmt.Errorf("unable to create client: %w", err)
}

res, err := cli.Info()
if err != nil {
return nil, fmt.Errorf("unable to get client info: %w", err)
}
defer res.Body.Close()

if res.IsError() {
return nil, fmt.Errorf("unexpected response: %s - %s", res.Status(), res.String())
}

return cli, nil
}

connection.go


func main() {
client, err := xopensearch.New(xopensearch.Config{
Insecure: true,
Username: "admin",
Password: "Adm1n_Pa55w0rd?",
Addresses: []string{"https://localhost:9200"},
})
if err != nil {
log.Fatalln("connection:", err)
}
}

order_meta.go


# shopping/src/model/storage/document/order_meta.go

package document

import (
"time"
)

type OrderMeta struct {
UpdatedAt time.Time `json:"updated_at"`
}

order.go


# shopping/src/model/storage/document/order.go

package document

import (
"encoding/json"
)

type Order struct {
ID string `json:"id"`
Data json.RawMessage `json:"data"`
}

opensearch.go


You should implement custom errors and return them instead.


# shopping/src/storage/opensearch/opensearch.go

package opensearch

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"

"shopping/src/model/storage/document"

"github.com/opensearch-project/opensearch-go"
"github.com/opensearch-project/opensearch-go/opensearchapi"
)

type Opensearch struct {
Client *opensearch.Client
OrderIndexAlias string
OrderMetaAlias string
}

// UpsertOrders bulk inserts Order related documents if not found otherwise
// updates.
func (o Opensearch) UpsertOrders(ctx context.Context, Orders []*document.Order) error {
var body strings.Builder

for _, Order := range Orders {
body.WriteString(fmt.Sprintf(`{"update":{"_id":"%s","_index":"%s"}}`, Order.ID, o.OrderIndexAlias))
body.WriteString("\n")

doc, err := json.Marshal(map[string]any{"doc": Order, "doc_as_upsert": true})
if err != nil {
return fmt.Errorf("marshal: %s: %w", Order.ID, err)
}

body.Write(doc)
body.WriteString("\n")
}

req := opensearchapi.BulkRequest{
Body: strings.NewReader(body.String()),
}

res, err := req.Do(ctx, o.Client)
if err != nil {
return fmt.Errorf("do: %w", err)
}
defer res.Body.Close()

if res.IsError() {
return fmt.Errorf("unexpected response: %s - %s", res.Status(), res.String())
}

return nil
}

// RefreshOrders refreshes index and index alias no matter which one is set.
func (o Opensearch) RefreshOrders(ctx context.Context) error {
req := opensearchapi.IndicesRefreshRequest{
Index: []string{o.OrderIndexAlias},
}

res, err := req.Do(ctx, o.Client)
if err != nil {
return fmt.Errorf("do: %w", err)
}
defer res.Body.Close()

if res.IsError() {
return fmt.Errorf("unexpected response: %s - %s", res.Status(), res.String())
}

return nil
}

// UpsertOrderMeta inserts an Order related meta document if not found otherwise
// updates it which is followed by an immediate index refresh.
func (o Opensearch) UpsertOrderMeta(ctx context.Context, meta document.OrderMeta, orderID string) error {
var body strings.Builder
body.WriteString(fmt.Sprintf(`{"update":{"_id":"%s","_index":"%s"}}`, orderID, o.OrderMetaAlias))
body.WriteString("\n")

doc, err := json.Marshal(map[string]any{"doc": meta, "doc_as_upsert": true})
if err != nil {
return fmt.Errorf("marshal: %w", err)
}

body.Write(doc)
body.WriteString("\n")

req := opensearchapi.BulkRequest{
Body: strings.NewReader(body.String()),
Refresh: "true",
}

res, err := req.Do(ctx, o.Client)
if err != nil {
return fmt.Errorf("do: %w", err)
}
defer res.Body.Close()

if res.IsError() {
return fmt.Errorf("unexpected response: %s - %s", res.Status(), res.String())
}

return nil
}

// FindOrderMeta returns a document if found.
func (o Opensearch) FindOrderMeta(ctx context.Context, orderID string) (document.OrderMeta, error) {
req := opensearchapi.GetRequest{
Index: o.OrderMetaAlias,
DocumentID: orderID,
}

res, err := req.Do(ctx, o.Client)
if err != nil {
return document.OrderMeta{}, fmt.Errorf("do: %w", err)
}
defer res.Body.Close()

if res.IsError() {
if res.StatusCode == http.StatusNotFound {
return document.OrderMeta{}, nil
}

return document.OrderMeta{}, fmt.Errorf("unexpected response: %s - %s", res.Status(), res.String())
}

var (
doc = document.OrderMeta{}
found = response{Model: &doc}
)

if err := json.NewDecoder(res.Body).Decode(&found); err != nil {
return document.OrderMeta{}, fmt.Errorf("response decode: %w", err)
}

if !found.Found {
return document.OrderMeta{}, nil
}

return doc, nil
}

response.go


# shopping/src/storage/opensearch/response.go

package opensearch

type response struct {
Found bool `json:"found"`
Model any `json:"_source"`
}

Example commands


Create index with alias


PUT /order_index
{
"aliases": {
"order_index_alias": {}
}
}

PUT /order_meta_index
{
"aliases": {
"order_meta_index_alias": {}
}
}

$ curl -i -X PUT \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index" \
-H "Content-Type: application/json" \
-d '{"aliases":{"order_index_alias":{}}}'

$ curl -i -X PUT \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_meta_index" \
-H "Content-Type: application/json" \
-d '{"aliases":{"order_meta_index_alias":{}}}'

Bulk upsert


$ curl -i -X POST \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/_bulk" \
-H "Content-Type: application/json" \
-d '
{"update":{"_id":"order-path-1","_index":"order_index_alias"}}
{"doc":{"id":"order-id-1","path":"order-path-1","data":{"key":"val"}}, "doc_as_upsert": true}
{"update":{"_id":"order-path-2","_index":"order_index_alias"}}
{"doc":{"id":"order-id-2","path":"order-path-2","data":{"key":"val"}}, "doc_as_upsert": true}
'

Find document by ID


$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_doc/order-path-1"

$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search" \
-H "Content-Type: application/json" \
-d '{"query":{"terms":{"_id":["order-path-1"]}}}'

List documents


$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search"

$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search?size=20&pretty=true"

Refresh index


$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_refresh"

Delete all documents in an index


$ curl -i -X POST \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_delete_by_query" \
-H "Content-Type: application/json" \
-d '{"query":{"match_all":{}}}'

Nested search


$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search" \
-H "Content-Type: application/json" \
-d '
{
"query": {
"bool": {
"must": [
{
"match": {
"data.label.list": "one"
}
},
{
"match": {
"data.label.region": "Asia"
}
},
{
"match": {
"data.label.active": true
}
}
]
}
}
}
'

References