313 lines
7.5 KiB
Go
313 lines
7.5 KiB
Go
// Package index provides index-level operations for OpenSearch.
|
|
package index
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
|
|
"es-demo/client"
|
|
|
|
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
|
|
)
|
|
|
|
// ErrDocumentNotFound is the sentinel error reported when the requested
// document does not exist.
var ErrDocumentNotFound = fmt.Errorf("document not found")
|
|
|
|
// Document is a single OpenSearch document: its identifier plus its body.
type Document struct {
	// ID is the document identifier. It may be left empty when indexing, in
	// which case OpenSearch generates one.
	ID string

	// Source holds the document fields; it is JSON-encoded when the document
	// is sent to OpenSearch.
	Source map[string]any
}
|
|
|
|
// IndexDocument indexes a document (create or update).
|
|
// If ID is empty, OpenSearch will generate one automatically.
|
|
func IndexDocument(ctx context.Context, c *client.Client, indexName string, doc *Document) (string, error) {
|
|
if indexName == "" {
|
|
return "", ErrInvalidIndexName
|
|
}
|
|
|
|
if doc == nil || doc.Source == nil {
|
|
return "", fmt.Errorf("document cannot be nil")
|
|
}
|
|
|
|
data, err := json.Marshal(doc.Source)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to marshal document: %w", err)
|
|
}
|
|
|
|
req := opensearchapi.IndexRequest{
|
|
Index: indexName,
|
|
DocumentID: doc.ID,
|
|
Body: bytes.NewReader(data),
|
|
Refresh: "true", // Make document immediately searchable
|
|
}
|
|
|
|
res, err := req.Do(ctx, c.GetClient())
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to execute index document request: %w", err)
|
|
}
|
|
defer func() {
|
|
if closeErr := res.Body.Close(); closeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
|
}
|
|
}()
|
|
|
|
if res.IsError() {
|
|
bodyBytes, _ := io.ReadAll(res.Body)
|
|
return "", fmt.Errorf("index document failed with status %s: %s", res.Status(), string(bodyBytes))
|
|
}
|
|
|
|
// Parse response to get document ID
|
|
var response struct {
|
|
ID string `json:"_id"`
|
|
}
|
|
|
|
bodyBytes, _ := io.ReadAll(res.Body)
|
|
if err := json.Unmarshal(bodyBytes, &response); err != nil {
|
|
return "", fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return response.ID, nil
|
|
}
|
|
|
|
// GetDocument retrieves a document by ID.
|
|
func GetDocument(ctx context.Context, c *client.Client, indexName string, docID string) (*Document, error) {
|
|
if indexName == "" {
|
|
return nil, ErrInvalidIndexName
|
|
}
|
|
|
|
if docID == "" {
|
|
return nil, fmt.Errorf("document ID cannot be empty")
|
|
}
|
|
|
|
req := opensearchapi.GetRequest{
|
|
Index: indexName,
|
|
DocumentID: docID,
|
|
}
|
|
|
|
res, err := req.Do(ctx, c.GetClient())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to execute get document request: %w", err)
|
|
}
|
|
defer func() {
|
|
if closeErr := res.Body.Close(); closeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
|
}
|
|
}()
|
|
|
|
if res.StatusCode == 404 {
|
|
return nil, ErrDocumentNotFound
|
|
}
|
|
|
|
if res.IsError() {
|
|
bodyBytes, _ := io.ReadAll(res.Body)
|
|
return nil, fmt.Errorf("get document failed with status %s: %s", res.Status(), string(bodyBytes))
|
|
}
|
|
|
|
var response struct {
|
|
ID string `json:"_id"`
|
|
Found bool `json:"found"`
|
|
Source map[string]any `json:"_source"`
|
|
}
|
|
|
|
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
|
|
return nil, fmt.Errorf("failed to decode document: %w", err)
|
|
}
|
|
|
|
if !response.Found {
|
|
return nil, ErrDocumentNotFound
|
|
}
|
|
|
|
return &Document{
|
|
ID: response.ID,
|
|
Source: response.Source,
|
|
}, nil
|
|
}
|
|
|
|
// UpdateDocument updates a document by ID using partial update.
|
|
func UpdateDocument(ctx context.Context, c *client.Client, indexName string, docID string, partialDoc map[string]any) error {
|
|
if indexName == "" {
|
|
return ErrInvalidIndexName
|
|
}
|
|
|
|
if docID == "" {
|
|
return fmt.Errorf("document ID cannot be empty")
|
|
}
|
|
|
|
if len(partialDoc) == 0 {
|
|
return fmt.Errorf("partial document cannot be empty")
|
|
}
|
|
|
|
updateBody := map[string]any{
|
|
"doc": partialDoc,
|
|
}
|
|
|
|
data, err := json.Marshal(updateBody)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal update body: %w", err)
|
|
}
|
|
|
|
req := opensearchapi.UpdateRequest{
|
|
Index: indexName,
|
|
DocumentID: docID,
|
|
Body: bytes.NewReader(data),
|
|
Refresh: "true",
|
|
}
|
|
|
|
res, err := req.Do(ctx, c.GetClient())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to execute update document request: %w", err)
|
|
}
|
|
defer func() {
|
|
if closeErr := res.Body.Close(); closeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
|
}
|
|
}()
|
|
|
|
if res.StatusCode == 404 {
|
|
return ErrDocumentNotFound
|
|
}
|
|
|
|
if res.IsError() {
|
|
bodyBytes, _ := io.ReadAll(res.Body)
|
|
return fmt.Errorf("update document failed with status %s: %s", res.Status(), string(bodyBytes))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteDocument deletes a document by ID.
|
|
func DeleteDocument(ctx context.Context, c *client.Client, indexName string, docID string) error {
|
|
if indexName == "" {
|
|
return ErrInvalidIndexName
|
|
}
|
|
|
|
if docID == "" {
|
|
return fmt.Errorf("document ID cannot be empty")
|
|
}
|
|
|
|
req := opensearchapi.DeleteRequest{
|
|
Index: indexName,
|
|
DocumentID: docID,
|
|
Refresh: "true",
|
|
}
|
|
|
|
res, err := req.Do(ctx, c.GetClient())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to execute delete document request: %w", err)
|
|
}
|
|
defer func() {
|
|
if closeErr := res.Body.Close(); closeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
|
}
|
|
}()
|
|
|
|
if res.StatusCode == 404 {
|
|
return ErrDocumentNotFound
|
|
}
|
|
|
|
if res.IsError() {
|
|
bodyBytes, _ := io.ReadAll(res.Body)
|
|
return fmt.Errorf("delete document failed with status %s: %s", res.Status(), string(bodyBytes))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// BulkIndexDocuments indexes multiple documents in a single request.
|
|
func BulkIndexDocuments(ctx context.Context, c *client.Client, indexName string, docs []*Document) error {
|
|
if indexName == "" {
|
|
return ErrInvalidIndexName
|
|
}
|
|
|
|
if len(docs) == 0 {
|
|
return fmt.Errorf("documents cannot be empty")
|
|
}
|
|
|
|
// Build bulk request body
|
|
var buf bytes.Buffer
|
|
for _, doc := range docs {
|
|
// Action line
|
|
action := map[string]any{
|
|
"index": map[string]any{
|
|
"_index": indexName,
|
|
},
|
|
}
|
|
if doc.ID != "" {
|
|
action["index"].(map[string]any)["_id"] = doc.ID
|
|
}
|
|
|
|
actionData, err := json.Marshal(action)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal action: %w", err)
|
|
}
|
|
buf.Write(actionData)
|
|
buf.WriteByte('\n')
|
|
|
|
// Document line
|
|
docData, err := json.Marshal(doc.Source)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal document: %w", err)
|
|
}
|
|
buf.Write(docData)
|
|
buf.WriteByte('\n')
|
|
}
|
|
|
|
req := opensearchapi.BulkRequest{
|
|
Body: &buf,
|
|
Refresh: "true",
|
|
}
|
|
|
|
res, err := req.Do(ctx, c.GetClient())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to execute bulk request: %w", err)
|
|
}
|
|
defer func() {
|
|
if closeErr := res.Body.Close(); closeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
|
}
|
|
}()
|
|
|
|
if res.IsError() {
|
|
bodyBytes, _ := io.ReadAll(res.Body)
|
|
return fmt.Errorf("bulk request failed with status %s: %s", res.Status(), string(bodyBytes))
|
|
}
|
|
|
|
// Check for errors in bulk response
|
|
var response struct {
|
|
Errors bool `json:"errors"`
|
|
Items []map[string]struct {
|
|
Error *struct {
|
|
Type string `json:"type"`
|
|
Reason string `json:"reason"`
|
|
} `json:"error"`
|
|
} `json:"items"`
|
|
}
|
|
|
|
bodyBytes, _ := io.ReadAll(res.Body)
|
|
if err := json.Unmarshal(bodyBytes, &response); err != nil {
|
|
return fmt.Errorf("failed to decode bulk response: %w", err)
|
|
}
|
|
|
|
if response.Errors {
|
|
// Collect error details
|
|
var errorMsg string
|
|
for i, item := range response.Items {
|
|
for _, itemData := range item {
|
|
if itemData.Error != nil {
|
|
errorMsg += fmt.Sprintf("doc %d: %s - %s; ", i, itemData.Error.Type, itemData.Error.Reason)
|
|
}
|
|
}
|
|
}
|
|
return fmt.Errorf("bulk indexing had errors: %s", errorMsg)
|
|
}
|
|
|
|
return nil
|
|
}
|