01/11/2024 - DOCKER, ELASTICSEARCH, GO
In this example we are going to show how OpenSearch can be used in Golang to bulk upsert documents and manually refresh the index at the end of the operation. Along with that, we upsert a single document and automatically refresh the index. Finally, we get a document by its id. One thing to note: we work directly with the index alias rather than the concrete index.
You can access the UI at http://0.0.0.0:5601/ and log in with the credentials admin:Adm1n_Pa55w0rd? (the trailing question mark is part of the password).
# Local development stack: a single OpenSearch node plus OpenSearch Dashboards.
version: "3.8"
services:
opensearch-node:
image: opensearchproject/opensearch:2.13.0
container_name: opensearch-node
environment:
- discovery.type=single-node
- OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=Adm1n_Pa55w0rd?
ports:
- 9200:9200 # REST API
- 9600:9600 # Performance Analyzer
opensearch-dashboard:
image: opensearchproject/opensearch-dashboards:2.13.0
container_name: opensearch-dashboard
environment:
OPENSEARCH_HOSTS: '["https://opensearch-node:9200"]'
ports:
- 5601:5601 # Dashboards UI
# Base URL of the OpenSearch REST API used by the curl targets below.
ADDRESS = https://0.0.0.0:9200
# Basic-auth credentials (user:password) handed to curl via -u.
CREDENTIALS = admin:Adm1n_Pa55w0rd?
.PHONY: opensearch-up
opensearch-up: ## Bring up OpenSearch instance.
docker-compose up
# Creates order_index and order_meta_index, each together with its "_alias" alias.
# --insecure makes curl skip TLS certificate verification.
.PHONY: index-create
index-create: ## Create index and alias.
@curl -X PUT --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_index" -H "Content-Type: application/json" -d '{"aliases":{"order_index_alias":{}}}'; echo
@curl -X PUT --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_meta_index" -H "Content-Type: application/json" -d '{"aliases":{"order_meta_index_alias":{}}}'; echo
# Removes every document with a match_all delete-by-query; the indices themselves are kept.
.PHONY: index-clear
index-clear: ## Delete all documents in index.
@curl -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_index/_delete_by_query" -H "Content-Type: application/json" -d '{"query":{"match_all":{}}}'; echo
@curl -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_meta_index/_delete_by_query" -H "Content-Type: application/json" -d '{"query":{"match_all":{}}}'; echo
# Posts the fixture files to the _bulk endpoint. --data-binary sends the file
# unmodified, keeping the newlines that separate the bulk lines.
.PHONY: index-load
index-load: ## Populate index with test documents.
@curl -i -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/_bulk" -H "Content-Type: application/json" --data-binary @script/opensearch/fixture/order_index.txt; echo
@curl -i -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/_bulk" -H "Content-Type: application/json" --data-binary @script/opensearch/fixture/order_meta_index.txt; echo
{"index":{"_index":"order_index","_id":"id-1"}}
{"id":"id-1","data":{"label":{"list":["one"],"active":false,"region":["Euro"]}}}
{"index":{"_index":"order_index","_id":"id-2"}}
{"id":"id-2","data":{"label":{"list":["one","two"],"active":true,"region":["Asia"]}}}
{"index":{"_index":"order_index","_id":"id-3"}}
{"id":"id-3","data":{"label":{"list":["one","two","three"],"active":false,"region":["Africa"]}}}
{"index":{"_index":"order_index","_id":"id-4"}}
{"id":"id-4","data":{"label":{"list":["two","three"],"active":true,"region":["Euro"]}}}
{"index":{"_index":"order_index","_id":"id-5"}}
{"id":"id-5","data":{"label":{"list":["three"],"active":true,"region":["Africa", "Asia"]}}}
{"index":{"_index":"order_meta_index","_id":"id-1"}}
{"updated_at": "2024-12-01T12:02:02.022022Z"}
package xopensearch
import (
"crypto/tls"
"fmt"
"net/http"
"github.com/opensearch-project/opensearch-go"
)
// Config holds the settings New uses to build an OpenSearch client.
type Config struct {
// Insecure is copied into tls.Config.InsecureSkipVerify; true disables
// verification of the server certificate.
Insecure bool
// Username and Password are forwarded to the client configuration.
Username string
Password string
// Addresses lists the node URLs forwarded to the client configuration.
Addresses []string
}
// New builds an OpenSearch client from cfg and calls the Info endpoint once
// before handing the client back. It returns an error when the client cannot
// be created, when the Info call fails, or when the server answers with an
// error status.
func New(cfg Config) (*opensearch.Client, error) {
	tlsConfig := &tls.Config{InsecureSkipVerify: cfg.Insecure}

	settings := opensearch.Config{
		Addresses: cfg.Addresses,
		Username:  cfg.Username,
		Password:  cfg.Password,
		Transport: &http.Transport{TLSClientConfig: tlsConfig},
	}

	client, err := opensearch.NewClient(settings)
	if err != nil {
		return nil, fmt.Errorf("unable to create client: %w", err)
	}

	// One round trip up front so connection problems are reported by New
	// rather than by the first real request.
	info, err := client.Info()
	if err != nil {
		return nil, fmt.Errorf("unable to get client info: %w", err)
	}
	defer info.Body.Close()

	if info.IsError() {
		return nil, fmt.Errorf("unexpected response: %s - %s", info.Status(), info.String())
	}

	return client, nil
}
// main connects to the local OpenSearch node and terminates with a fatal log
// message when the client cannot be created.
func main() {
	client, err := xopensearch.New(xopensearch.Config{
		Insecure:  true,
		Username:  "admin",
		Password:  "Adm1n_Pa55w0rd?",
		Addresses: []string{"https://localhost:9200"},
	})
	if err != nil {
		log.Fatalln("connection:", err)
	}

	// The snippet does not use the client any further. Without this blank
	// assignment the Go compiler rejects the program ("declared and not
	// used"). Hand the client to the storage layer at this point.
	_ = client
}
# shopping/src/model/storage/document/order_meta.go
package document
import (
"time"
)
// OrderMeta is the metadata document kept for an order.
type OrderMeta struct {
// UpdatedAt is serialized under the "updated_at" JSON key.
UpdatedAt time.Time `json:"updated_at"`
}
# shopping/src/model/storage/document/order.go
package document
import (
"encoding/json"
)
// Order is the order document as it is serialized to JSON.
type Order struct {
// ID is serialized under the "id" JSON key.
ID string `json:"id"`
// Data carries the "data" payload as raw JSON, so it is passed through
// without being decoded into a Go type.
Data json.RawMessage `json:"data"`
}
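Note: the code below returns generic errors built with fmt.Errorf, and FindOrderMeta returns an empty document with a nil error when nothing is found. You should implement custom errors and return them instead.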
# shopping/src/storage/opensearch/opensearch.go
package opensearch
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"shopping/src/model/storage/document"
"github.com/opensearch-project/opensearch-go"
"github.com/opensearch-project/opensearch-go/opensearchapi"
)
// Opensearch groups the OpenSearch client with the index names the order
// storage methods operate on.
type Opensearch struct {
// Client executes every request issued by the methods of this type.
Client *opensearch.Client
// OrderIndexAlias is the index name targeted by UpsertOrders and
// RefreshOrders.
OrderIndexAlias string
// OrderMetaAlias is the index name targeted by UpsertOrderMeta and
// FindOrderMeta.
OrderMetaAlias string
}
// UpsertOrders bulk inserts Order related documents if not found, otherwise
// updates them. Every order becomes one "update" action with
// "doc_as_upsert" set, addressed to OrderIndexAlias with the order ID as the
// document ID. The index is not refreshed by this call.
//
// It returns nil without contacting the server when Orders is empty. It
// returns an error when an element is nil, when marshalling fails, when the
// request fails, when the server answers with an error status, or when the
// bulk response reports that at least one action failed.
func (o Opensearch) UpsertOrders(ctx context.Context, Orders []*document.Order) error {
	// Nothing to send; skip the round trip.
	if len(Orders) == 0 {
		return nil
	}

	var body strings.Builder
	for i, order := range Orders {
		if order == nil {
			return fmt.Errorf("marshal: order at position %d is nil", i)
		}

		// The action line is marshalled rather than formatted by hand so that
		// IDs containing quotes or backslashes still yield valid JSON.
		action, err := json.Marshal(map[string]any{
			"update": map[string]string{"_id": order.ID, "_index": o.OrderIndexAlias},
		})
		if err != nil {
			return fmt.Errorf("marshal: %s: %w", order.ID, err)
		}

		doc, err := json.Marshal(map[string]any{"doc": order, "doc_as_upsert": true})
		if err != nil {
			return fmt.Errorf("marshal: %s: %w", order.ID, err)
		}

		body.Write(action)
		body.WriteByte('\n')
		body.Write(doc)
		body.WriteByte('\n')
	}

	req := opensearchapi.BulkRequest{
		Body: strings.NewReader(body.String()),
	}

	res, err := req.Do(ctx, o.Client)
	if err != nil {
		return fmt.Errorf("do: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return fmt.Errorf("unexpected response: %s - %s", res.Status(), res.String())
	}

	// A bulk request can succeed at the HTTP level while individual actions
	// fail; those failures are only visible in the response body.
	var result struct {
		Errors bool `json:"errors"`
		Items  []map[string]struct {
			ID     string          `json:"_id"`
			Status int             `json:"status"`
			Error  json.RawMessage `json:"error"`
		} `json:"items"`
	}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return fmt.Errorf("response decode: %w", err)
	}
	if !result.Errors {
		return nil
	}

	// Report the first failed action so the caller gets a concrete reason.
	for _, item := range result.Items {
		for action, detail := range item {
			if len(detail.Error) > 0 {
				return fmt.Errorf("bulk %s: %s: status %d: %s", action, detail.ID, detail.Status, detail.Error)
			}
		}
	}

	return fmt.Errorf("bulk: response reported errors")
}
// RefreshOrders sends an index refresh request for the name stored in
// OrderIndexAlias. It returns an error when the request fails or when the
// server answers with an error status.
func (o Opensearch) RefreshOrders(ctx context.Context) error {
	refresh := opensearchapi.IndicesRefreshRequest{Index: []string{o.OrderIndexAlias}}

	resp, err := refresh.Do(ctx, o.Client)
	if err != nil {
		return fmt.Errorf("do: %w", err)
	}
	defer resp.Body.Close()

	if !resp.IsError() {
		return nil
	}

	return fmt.Errorf("unexpected response: %s - %s", resp.Status(), resp.String())
}
// UpsertOrderMeta inserts an Order related meta document if not found,
// otherwise updates it. The request is sent with refresh set to "true", so
// the index is refreshed as part of the same call. The document is addressed
// to OrderMetaAlias with orderID as the document ID.
//
// It returns an error when marshalling fails, when the request fails, when
// the server answers with an error status, or when the bulk response reports
// that the action failed.
func (o Opensearch) UpsertOrderMeta(ctx context.Context, meta document.OrderMeta, orderID string) error {
	// The action line is marshalled rather than formatted by hand so that an
	// ID containing quotes or backslashes still yields valid JSON.
	action, err := json.Marshal(map[string]any{
		"update": map[string]string{"_id": orderID, "_index": o.OrderMetaAlias},
	})
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}

	doc, err := json.Marshal(map[string]any{"doc": meta, "doc_as_upsert": true})
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}

	var body strings.Builder
	body.Write(action)
	body.WriteByte('\n')
	body.Write(doc)
	body.WriteByte('\n')

	req := opensearchapi.BulkRequest{
		Body:    strings.NewReader(body.String()),
		Refresh: "true",
	}

	res, err := req.Do(ctx, o.Client)
	if err != nil {
		return fmt.Errorf("do: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return fmt.Errorf("unexpected response: %s - %s", res.Status(), res.String())
	}

	// A bulk request can succeed at the HTTP level while the action itself
	// fails; that failure is only visible in the response body.
	var result struct {
		Errors bool `json:"errors"`
		Items  []map[string]struct {
			ID     string          `json:"_id"`
			Status int             `json:"status"`
			Error  json.RawMessage `json:"error"`
		} `json:"items"`
	}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return fmt.Errorf("response decode: %w", err)
	}
	if !result.Errors {
		return nil
	}

	for _, item := range result.Items {
		for name, detail := range item {
			if len(detail.Error) > 0 {
				return fmt.Errorf("bulk %s: %s: status %d: %s", name, detail.ID, detail.Status, detail.Error)
			}
		}
	}

	return fmt.Errorf("bulk: response reported errors")
}
// FindOrderMeta fetches the meta document stored under orderID in
// OrderMetaAlias.
//
// A missing document is not an error: when the server answers 404, or the
// reply carries "found": false, the zero OrderMeta is returned with a nil
// error. An error is returned when the request fails, when the server answers
// with any other error status, or when the reply cannot be decoded.
func (o Opensearch) FindOrderMeta(ctx context.Context, orderID string) (document.OrderMeta, error) {
	var none document.OrderMeta

	lookup := opensearchapi.GetRequest{Index: o.OrderMetaAlias, DocumentID: orderID}

	resp, err := lookup.Do(ctx, o.Client)
	if err != nil {
		return none, fmt.Errorf("do: %w", err)
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode == http.StatusNotFound:
		return none, nil
	case resp.IsError():
		return none, fmt.Errorf("unexpected response: %s - %s", resp.Status(), resp.String())
	}

	// The envelope points at meta, so decoding "_source" fills meta directly.
	var meta document.OrderMeta
	envelope := response{Model: &meta}
	if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
		return none, fmt.Errorf("response decode: %w", err)
	}

	if envelope.Found {
		return meta, nil
	}

	return none, nil
}
# shopping/src/storage/opensearch/response.go
package opensearch
// response is the subset of a get-document reply that FindOrderMeta decodes.
type response struct {
// Found mirrors the "found" flag of the reply.
Found bool `json:"found"`
// Model receives the "_source" object. FindOrderMeta sets it to a pointer
// to the target struct before decoding.
Model any `json:"_source"`
}
PUT /order_index
{
"aliases": {
"order_index_alias": {}
}
}
PUT /order_meta_index
{
"aliases": {
"order_meta_index_alias": {}
}
}
# Create order_index together with its alias order_index_alias.
$ curl -i -X PUT \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index" \
-H "Content-Type: application/json" \
-d '{"aliases":{"order_index_alias":{}}}'
# Create order_meta_index together with its alias order_meta_index_alias.
$ curl -i -X PUT \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_meta_index" \
-H "Content-Type: application/json" \
-d '{"aliases":{"order_meta_index_alias":{}}}'
# Bulk upsert two documents through the alias (update action with doc_as_upsert).
$ curl -i -X POST \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/_bulk" \
-H "Content-Type: application/json" \
-d '
{"update":{"_id":"order-path-1","_index":"order_index_alias"}}
{"doc":{"id":"order-id-1","path":"order-path-1","data":{"key":"val"}}, "doc_as_upsert": true}
{"update":{"_id":"order-path-2","_index":"order_index_alias"}}
{"doc":{"id":"order-id-2","path":"order-path-2","data":{"key":"val"}}, "doc_as_upsert": true}
'
# Fetch a single document by its id.
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_doc/order-path-1"
# Search for documents whose _id is in the given list.
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search" \
-H "Content-Type: application/json" \
-d '{"query":{"terms":{"_id":["order-path-1"]}}}'
# Search without a query body.
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search"
# Same search, asking for up to 20 hits and pretty-printed JSON.
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search?size=20&pretty=true"
# Refresh the index behind the alias.
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_refresh"
# Delete every document with a match_all delete-by-query.
$ curl -i -X POST \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_delete_by_query" \
-H "Content-Type: application/json" \
-d '{"query":{"match_all":{}}}'
# Search with a bool query: all three match clauses under "must" have to hold.
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search" \
-H "Content-Type: application/json" \
-d '
{
"query": {
"bool": {
"must": [
{
"match": {
"data.label.list": "one"
}
},
{
"match": {
"data.label.region": "Asia"
}
},
{
"match": {
"data.label.active": true
}
}
]
}
}
}
'