01/11/2024 - AWS, DOCKER, ELASTICSEARCH, GO
Bu örnekte, OpenSearch ile Golang'da belgelerin toplu olarak nasıl eklenip güncelleneceğini (upsert) ve işlemin sonunda dizinin manuel olarak nasıl yenileneceğini (refresh) göstereceğiz. Bununla birlikte, tek bir belgeyi ekleyip/güncelleyip (upsert) dizini otomatik olarak yenileyeceğiz. Son olarak, bir belgeyi kimliğine göre çekeceğiz. Dikkat edilmesi gereken bir nokta, somut dizin (index) yerine doğrudan dizin takma adıyla (index alias) çalıştığımızdır.
http://0.0.0.0:5601/ adresinden arayüze ulaşıp admin:Adm1n_Pa55w0rd? bilgileriyle giriş yapacağız.
# Local single-node OpenSearch plus OpenSearch Dashboards for development.
# The admin password below is a throwaway development credential.
version: "3.8"

services:
  opensearch-node:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-node
    environment:
      - discovery.type=single-node
      - OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=Adm1n_Pa55w0rd?
    ports:
      - 9200:9200 # REST API
      - 9600:9600 # Performance Analyzer

  opensearch-dashboard:
    image: opensearchproject/opensearch-dashboards:2.13.0
    container_name: opensearch-dashboard
    environment:
      OPENSEARCH_HOSTS: '["https://opensearch-node:9200"]'
    ports:
      - 5601:5601 # Dashboards UI
# Helper targets for the local OpenSearch instance.
# ADDRESS and CREDENTIALS match the docker-compose development setup; curl runs
# with --insecure because the node serves a self-signed certificate.
ADDRESS = https://0.0.0.0:9200
CREDENTIALS = admin:Adm1n_Pa55w0rd?

.PHONY: opensearch-up
opensearch-up: ## Bring up OpenSearch instance.
	docker-compose up

.PHONY: index-create
index-create: ## Create index and alias.
	@curl -X PUT --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_index" -H "Content-Type: application/json" -d '{"aliases":{"order_index_alias":{}}}'; echo
	@curl -X PUT --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_meta_index" -H "Content-Type: application/json" -d '{"aliases":{"order_meta_index_alias":{}}}'; echo

.PHONY: index-clear
index-clear: ## Delete all documents in index.
	@curl -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_index/_delete_by_query" -H "Content-Type: application/json" -d '{"query":{"match_all":{}}}'; echo
	@curl -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/order_meta_index/_delete_by_query" -H "Content-Type: application/json" -d '{"query":{"match_all":{}}}'; echo

.PHONY: index-load
index-load: ## Populate index with test documents.
	@curl -i -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/_bulk" -H "Content-Type: application/json" --data-binary @script/opensearch/fixture/order_index.txt; echo
	@curl -i -X POST --insecure -u "${CREDENTIALS}" "${ADDRESS}/_bulk" -H "Content-Type: application/json" --data-binary @script/opensearch/fixture/order_meta_index.txt; echo
{"index":{"_index":"order_index","_id":"id-1"}}
{"id":"id-1","data":{"label":{"list":["one"],"active":false,"region":["Euro"]}}}
{"index":{"_index":"order_index","_id":"id-2"}}
{"id":"id-2","data":{"label":{"list":["one","two"],"active":true,"region":["Asia"]}}}
{"index":{"_index":"order_index","_id":"id-3"}}
{"id":"id-3","data":{"label":{"list":["one","two","three"],"active":false,"region":["Africa"]}}}
{"index":{"_index":"order_index","_id":"id-4"}}
{"id":"id-4","data":{"label":{"list":["two","three"],"active":true,"region":["Euro"]}}}
{"index":{"_index":"order_index","_id":"id-5"}}
{"id":"id-5","data":{"label":{"list":["three"],"active":true,"region":["Africa", "Asia"]}}}
{"index":{"_index":"order_meta_index","_id":"id-1"}}
{"updated_at": "2024-12-01T12:02:02.022022Z"}
package xopensearch
import (
"crypto/tls"
"fmt"
"net/http"
"github.com/opensearch-project/opensearch-go"
)
// Config carries the settings New needs to build an OpenSearch client.
type Config struct {
	// Insecure is forwarded to the TLS configuration as InsecureSkipVerify.
	Insecure bool
	// Username and Password are the credentials handed to the client.
	Username, Password string
	// Addresses lists the node URLs the client connects to.
	Addresses []string
}
// New builds an OpenSearch client from cfg and confirms the cluster is
// reachable by issuing an Info request before handing the client back.
//
// An error is returned when the client cannot be constructed, when the Info
// request fails, or when the cluster answers with an error status.
func New(cfg Config) (*opensearch.Client, error) {
	tlsCfg := &tls.Config{InsecureSkipVerify: cfg.Insecure}

	settings := opensearch.Config{
		Addresses: cfg.Addresses,
		Username:  cfg.Username,
		Password:  cfg.Password,
		Transport: &http.Transport{TLSClientConfig: tlsCfg},
	}

	client, err := opensearch.NewClient(settings)
	if err != nil {
		return nil, fmt.Errorf("unable to create client: %w", err)
	}

	// Probe the cluster once so misconfiguration surfaces at start-up.
	info, err := client.Info()
	if err != nil {
		return nil, fmt.Errorf("unable to get client info: %w", err)
	}
	defer info.Body.Close()

	if !info.IsError() {
		return client, nil
	}

	return nil, fmt.Errorf("unexpected response: %s - %s", info.Status(), info.String())
}
// main connects to the local development OpenSearch node and aborts the
// program when the connection cannot be established.
func main() {
	client, err := xopensearch.New(xopensearch.Config{
		Insecure:  true,
		Username:  "admin",
		Password:  "Adm1n_Pa55w0rd?",
		Addresses: []string{"https://localhost:9200"},
	})
	if err != nil {
		log.Fatalln("connection:", err)
	}

	// The example ends once the connection is verified. Go rejects unused
	// locals, so the client is discarded explicitly until it is handed to the
	// storage layer.
	_ = client
}
# shopping/src/model/storage/document/order_meta.go
package document
import (
"time"
)
// OrderMeta is the meta document stored alongside an order.
type OrderMeta struct {
	// UpdatedAt is serialised under the "updated_at" JSON key.
	UpdatedAt time.Time `json:"updated_at"`
}
# shopping/src/model/storage/document/order.go
package document
import (
"encoding/json"
)
// Order is the order document as it is stored in the index.
type Order struct {
	// ID is serialised under the "id" JSON key.
	ID string `json:"id"`
	// Data holds the order payload as raw, already-encoded JSON so it is
	// passed through without being decoded into a Go type.
	Data json.RawMessage `json:"data"`
}
Burada kendi özel hata türlerinizi tanımlayıp bunları döndürmelisiniz.
# shopping/src/storage/opensearch/opensearch.go
package opensearch
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"shopping/src/model/storage/document"
"github.com/opensearch-project/opensearch-go"
"github.com/opensearch-project/opensearch-go/opensearchapi"
)
// Opensearch is the storage adapter for order documents. Every request goes
// through the configured aliases rather than concrete index names.
type Opensearch struct {
	// Client is the OpenSearch client used to execute every request.
	Client *opensearch.Client
	// OrderIndexAlias is the target of order upserts and refreshes;
	// OrderMetaAlias is the target of order meta upserts and lookups.
	OrderIndexAlias, OrderMetaAlias string
}
// UpsertOrders bulk-upserts the given orders through the order index alias:
// a document is inserted when its ID is unknown and partially updated
// otherwise. The index is not refreshed by this call; use RefreshOrders when
// the changes must become searchable immediately.
//
// An empty slice is a no-op. An error is returned for a nil element, a
// marshalling or transport failure, a non-2xx response, or a bulk response
// that reports item-level failures.
func (o Opensearch) UpsertOrders(ctx context.Context, Orders []*document.Order) error {
	if len(Orders) == 0 {
		// The bulk endpoint rejects an empty body, so there is nothing to send.
		return nil
	}

	var body strings.Builder
	for i, order := range Orders {
		if order == nil {
			return fmt.Errorf("order at index %d is nil", i)
		}

		// Marshal the action line instead of formatting it, so IDs that
		// contain quotes or backslashes cannot corrupt the NDJSON payload.
		action, err := json.Marshal(map[string]any{
			"update": map[string]string{"_id": order.ID, "_index": o.OrderIndexAlias},
		})
		if err != nil {
			return fmt.Errorf("marshal action: %s: %w", order.ID, err)
		}

		doc, err := json.Marshal(map[string]any{"doc": order, "doc_as_upsert": true})
		if err != nil {
			return fmt.Errorf("marshal: %s: %w", order.ID, err)
		}

		body.Write(action)
		body.WriteString("\n")
		body.Write(doc)
		body.WriteString("\n")
	}

	req := opensearchapi.BulkRequest{
		Body: strings.NewReader(body.String()),
	}

	res, err := req.Do(ctx, o.Client)
	if err != nil {
		return fmt.Errorf("do: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return fmt.Errorf("unexpected response: %s - %s", res.Status(), res.String())
	}

	// The bulk API answers 200 even when individual items fail; failures are
	// only visible in the "errors" flag and the per-item results.
	var result struct {
		Errors bool `json:"errors"`
		Items  []map[string]struct {
			ID     string          `json:"_id"`
			Status int             `json:"status"`
			Error  json.RawMessage `json:"error"`
		} `json:"items"`
	}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return fmt.Errorf("response decode: %w", err)
	}
	if !result.Errors {
		return nil
	}

	// Report the first failed item; it is usually enough to diagnose the batch.
	for _, item := range result.Items {
		for action, outcome := range item {
			if len(outcome.Error) > 0 {
				return fmt.Errorf("bulk %s: %s: status %d: %s", action, outcome.ID, outcome.Status, outcome.Error)
			}
		}
	}

	return fmt.Errorf("bulk: response reported errors")
}
// RefreshOrders issues a refresh against the order index alias so that
// previously written documents become visible to search.
//
// An error is returned when the request fails or the cluster answers with an
// error status.
func (o Opensearch) RefreshOrders(ctx context.Context) error {
	refresh := opensearchapi.IndicesRefreshRequest{Index: []string{o.OrderIndexAlias}}

	resp, err := refresh.Do(ctx, o.Client)
	if err != nil {
		return fmt.Errorf("do: %w", err)
	}
	defer resp.Body.Close()

	if !resp.IsError() {
		return nil
	}

	return fmt.Errorf("unexpected response: %s - %s", resp.Status(), resp.String())
}
// UpsertOrderMeta upserts the meta document of the order identified by
// orderID through the order meta alias: it is inserted when missing and
// partially updated otherwise. The request asks for an immediate refresh, so
// the change is visible as soon as the call returns.
//
// An error is returned for a marshalling or transport failure, a non-2xx
// response, or a bulk response that reports an item-level failure.
func (o Opensearch) UpsertOrderMeta(ctx context.Context, meta document.OrderMeta, orderID string) error {
	// Marshal the action line instead of formatting it, so an ID that
	// contains quotes or backslashes cannot corrupt the NDJSON payload.
	action, err := json.Marshal(map[string]any{
		"update": map[string]string{"_id": orderID, "_index": o.OrderMetaAlias},
	})
	if err != nil {
		return fmt.Errorf("marshal action: %w", err)
	}

	doc, err := json.Marshal(map[string]any{"doc": meta, "doc_as_upsert": true})
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}

	var body strings.Builder
	body.Write(action)
	body.WriteString("\n")
	body.Write(doc)
	body.WriteString("\n")

	req := opensearchapi.BulkRequest{
		Body:    strings.NewReader(body.String()),
		Refresh: "true",
	}

	res, err := req.Do(ctx, o.Client)
	if err != nil {
		return fmt.Errorf("do: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return fmt.Errorf("unexpected response: %s - %s", res.Status(), res.String())
	}

	// The bulk API answers 200 even when the item itself failed; the failure
	// is only visible in the "errors" flag and the per-item result.
	var result struct {
		Errors bool `json:"errors"`
		Items  []map[string]struct {
			ID     string          `json:"_id"`
			Status int             `json:"status"`
			Error  json.RawMessage `json:"error"`
		} `json:"items"`
	}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return fmt.Errorf("response decode: %w", err)
	}
	if !result.Errors {
		return nil
	}

	for _, item := range result.Items {
		for op, outcome := range item {
			if len(outcome.Error) > 0 {
				return fmt.Errorf("bulk %s: %s: status %d: %s", op, outcome.ID, outcome.Status, outcome.Error)
			}
		}
	}

	return fmt.Errorf("bulk: response reported errors")
}
// FindOrderMeta looks up the meta document of the order identified by orderID
// through the order meta alias.
//
// A missing document is not an error: the zero OrderMeta is returned with a
// nil error both for a 404 response and for a response whose "found" flag is
// false. Transport failures, other error statuses and undecodable bodies are
// returned as errors.
func (o Opensearch) FindOrderMeta(ctx context.Context, orderID string) (document.OrderMeta, error) {
	var none document.OrderMeta

	get := opensearchapi.GetRequest{Index: o.OrderMetaAlias, DocumentID: orderID}

	resp, err := get.Do(ctx, o.Client)
	if err != nil {
		return none, fmt.Errorf("do: %w", err)
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode == http.StatusNotFound:
		return none, nil
	case resp.IsError():
		return none, fmt.Errorf("unexpected response: %s - %s", resp.Status(), resp.String())
	}

	// Point the envelope's _source at meta so the decoder fills it directly.
	var meta document.OrderMeta
	envelope := response{Model: &meta}
	if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
		return none, fmt.Errorf("response decode: %w", err)
	}

	if envelope.Found {
		return meta, nil
	}

	return none, nil
}
# shopping/src/storage/opensearch/response.go
package opensearch
// response is the envelope used to decode a get-document reply.
type response struct {
	// Found mirrors the "found" flag of the reply.
	Found bool `json:"found"`
	// Model receives the "_source" object. Set it to a pointer before
	// decoding so the document is unmarshalled into the pointed-to value.
	Model any `json:"_source"`
}
PUT /order_index
{
"aliases": {
"order_index_alias": {}
}
}
PUT /order_meta_index
{
"aliases": {
"order_meta_index_alias": {}
}
}
$ curl -i -X PUT \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index" \
-H "Content-Type: application/json" \
-d '{"aliases":{"order_index_alias":{}}}'
$ curl -i -X PUT \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_meta_index" \
-H "Content-Type: application/json" \
-d '{"aliases":{"order_meta_index_alias":{}}}'
$ curl -i -X POST \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/_bulk" \
-H "Content-Type: application/json" \
-d '
{"update":{"_id":"order-path-1","_index":"order_index_alias"}}
{"doc":{"id":"order-id-1","path":"order-path-1","data":{"key":"val"}}, "doc_as_upsert": true}
{"update":{"_id":"order-path-2","_index":"order_index_alias"}}
{"doc":{"id":"order-id-2","path":"order-path-2","data":{"key":"val"}}, "doc_as_upsert": true}
'
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_doc/order-path-1"
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search" \
-H "Content-Type: application/json" \
-d '{"query":{"terms":{"_id":["order-path-1"]}}}'
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search"
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search?size=20&pretty=true"
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_refresh"
$ curl -i -X POST \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_delete_by_query" \
-H "Content-Type: application/json" \
-d '{"query":{"match_all":{}}}'
$ curl -i -X GET \
--insecure -u "admin:Adm1n_Pa55w0rd?" \
"https://0.0.0.0:9200/order_index_alias/_search" \
-H "Content-Type: application/json" \
-d '
{
"query": {
"bool": {
"must": [
{
"match": {
"data.label.list": "one"
}
},
{
"match": {
"data.label.region": "Asia"
}
},
{
"match": {
"data.label.active": true
}
}
]
}
}
}
'