Files
es-demo/operations/index/document.go

313 lines
7.5 KiB
Go

// Package index provides index-level operations for OpenSearch.
package index
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"es-demo/client"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
)
// ErrDocumentNotFound is the sentinel returned by the document operations in
// this package when OpenSearch answers with HTTP 404, and by GetDocument when
// the response reports "found": false. Callers should test for it with
// errors.Is.
var ErrDocumentNotFound = fmt.Errorf("document not found")
// Document is a single OpenSearch document: an identifier together with the
// body that is stored under it.
type Document struct {
	// ID identifies the document within its index. When indexing, it may be
	// left empty so that OpenSearch assigns one.
	ID string

	// Source is the document body; the functions in this file serialize it
	// with encoding/json before sending it.
	Source map[string]any
}
// IndexDocument indexes doc into indexName, creating it or replacing an
// existing document with the same ID.
//
// If doc.ID is empty, OpenSearch generates an ID. In every case the ID taken
// from the response's "_id" field is returned. The request is sent with
// Refresh "true" so the document is immediately searchable.
//
// It returns ErrInvalidIndexName for an empty indexName and a validation error
// when doc or doc.Source is nil. Other errors are wrapped, covering:
//   - marshalling the source,
//   - transport failures,
//   - non-2xx responses (the error includes the status and raw body),
//   - decoding the response.
func IndexDocument(ctx context.Context, c *client.Client, indexName string, doc *Document) (string, error) {
	if indexName == "" {
		return "", ErrInvalidIndexName
	}
	if doc == nil {
		return "", fmt.Errorf("document cannot be nil")
	}
	if doc.Source == nil {
		return "", fmt.Errorf("document source cannot be nil")
	}

	data, err := json.Marshal(doc.Source)
	if err != nil {
		return "", fmt.Errorf("failed to marshal document: %w", err)
	}

	req := opensearchapi.IndexRequest{
		Index:      indexName,
		DocumentID: doc.ID,
		Body:       bytes.NewReader(data),
		Refresh:    "true", // Make document immediately searchable
	}
	res, err := req.Do(ctx, c.GetClient())
	if err != nil {
		return "", fmt.Errorf("failed to execute index document request: %w", err)
	}
	defer func() {
		if closeErr := res.Body.Close(); closeErr != nil {
			fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
		}
	}()

	if res.IsError() {
		// Best-effort read: the body only enriches the error message.
		bodyBytes, _ := io.ReadAll(res.Body)
		return "", fmt.Errorf("index document failed with status %s: %s", res.Status(), string(bodyBytes))
	}

	// Only the assigned document ID is needed from the response.
	var response struct {
		ID string `json:"_id"`
	}
	// Decoding from the stream surfaces read failures as errors instead of
	// silently parsing a truncated body.
	if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}
	return response.ID, nil
}
// GetDocument fetches the document stored under docID in indexName.
//
// Error cases:
//   - empty indexName: ErrInvalidIndexName;
//   - empty docID: a validation error;
//   - HTTP 404, or a body reporting "found": false: ErrDocumentNotFound;
//   - any other non-2xx status: an error carrying the status and raw body.
func GetDocument(ctx context.Context, c *client.Client, indexName string, docID string) (*Document, error) {
	switch {
	case indexName == "":
		return nil, ErrInvalidIndexName
	case docID == "":
		return nil, fmt.Errorf("document ID cannot be empty")
	}

	req := opensearchapi.GetRequest{Index: indexName, DocumentID: docID}
	res, err := req.Do(ctx, c.GetClient())
	if err != nil {
		return nil, fmt.Errorf("failed to execute get document request: %w", err)
	}
	closeBody := func() {
		if cerr := res.Body.Close(); cerr != nil {
			fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", cerr)
		}
	}
	defer closeBody()

	switch {
	case res.StatusCode == 404:
		return nil, ErrDocumentNotFound
	case res.IsError():
		// Best-effort read: the body is only used to enrich the error.
		raw, _ := io.ReadAll(res.Body)
		return nil, fmt.Errorf("get document failed with status %s: %s", res.Status(), string(raw))
	}

	// Only the fields needed to build a Document, plus the found flag.
	var hit struct {
		ID     string         `json:"_id"`
		Found  bool           `json:"found"`
		Source map[string]any `json:"_source"`
	}
	if err := json.NewDecoder(res.Body).Decode(&hit); err != nil {
		return nil, fmt.Errorf("failed to decode document: %w", err)
	}
	if !hit.Found {
		return nil, ErrDocumentNotFound
	}

	return &Document{ID: hit.ID, Source: hit.Source}, nil
}
// UpdateDocument applies a partial update to document docID in indexName.
//
// partialDoc is sent wrapped as {"doc": partialDoc}, with Refresh "true".
// Error cases:
//   - empty indexName: ErrInvalidIndexName;
//   - empty docID or partialDoc: a validation error;
//   - HTTP 404: ErrDocumentNotFound;
//   - any other non-2xx status: an error with the status and raw body.
func UpdateDocument(ctx context.Context, c *client.Client, indexName string, docID string, partialDoc map[string]any) error {
	switch {
	case indexName == "":
		return ErrInvalidIndexName
	case docID == "":
		return fmt.Errorf("document ID cannot be empty")
	case len(partialDoc) == 0:
		return fmt.Errorf("partial document cannot be empty")
	}

	payload, err := json.Marshal(map[string]any{"doc": partialDoc})
	if err != nil {
		return fmt.Errorf("failed to marshal update body: %w", err)
	}

	req := opensearchapi.UpdateRequest{
		Index:      indexName,
		DocumentID: docID,
		Body:       bytes.NewReader(payload),
		Refresh:    "true",
	}
	res, err := req.Do(ctx, c.GetClient())
	if err != nil {
		return fmt.Errorf("failed to execute update document request: %w", err)
	}
	closeBody := func() {
		if cerr := res.Body.Close(); cerr != nil {
			fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", cerr)
		}
	}
	defer closeBody()

	switch {
	case res.StatusCode == 404:
		return ErrDocumentNotFound
	case res.IsError():
		// Best-effort read: the body is only used to enrich the error.
		raw, _ := io.ReadAll(res.Body)
		return fmt.Errorf("update document failed with status %s: %s", res.Status(), string(raw))
	default:
		return nil
	}
}
// DeleteDocument removes document docID from indexName.
//
// The request is sent with Refresh "true". Error cases:
//   - empty indexName: ErrInvalidIndexName;
//   - empty docID: a validation error;
//   - HTTP 404: ErrDocumentNotFound;
//   - any other non-2xx status: an error containing the status and raw body.
func DeleteDocument(ctx context.Context, c *client.Client, indexName string, docID string) error {
	switch {
	case indexName == "":
		return ErrInvalidIndexName
	case docID == "":
		return fmt.Errorf("document ID cannot be empty")
	}

	req := opensearchapi.DeleteRequest{Index: indexName, DocumentID: docID, Refresh: "true"}
	res, err := req.Do(ctx, c.GetClient())
	if err != nil {
		return fmt.Errorf("failed to execute delete document request: %w", err)
	}
	defer func() {
		if cerr := res.Body.Close(); cerr != nil {
			fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", cerr)
		}
	}()

	if res.StatusCode == 404 {
		return ErrDocumentNotFound
	}
	if !res.IsError() {
		return nil
	}
	// Best-effort read: the body is only used to enrich the error.
	raw, _ := io.ReadAll(res.Body)
	return fmt.Errorf("delete document failed with status %s: %s", res.Status(), string(raw))
}
// BulkIndexDocuments indexes docs into indexName with a single _bulk request.
//
// Each document becomes an "index" action. A document with an empty ID is
// sent without "_id", leaving the ID to OpenSearch. The request uses Refresh
// "true" so the documents are immediately searchable.
//
// Nothing is sent if indexName is empty (ErrInvalidIndexName), docs is empty,
// or any element or its Source is nil. A 2xx response can still report
// per-item failures. When its "errors" flag is set, every failed item's type
// and reason are collected into the returned error.
func BulkIndexDocuments(ctx context.Context, c *client.Client, indexName string, docs []*Document) error {
	if indexName == "" {
		return ErrInvalidIndexName
	}
	if len(docs) == 0 {
		return fmt.Errorf("documents cannot be empty")
	}

	// bulkIndexMeta is the metadata of an "index" action line. _id is omitted
	// when empty.
	type bulkIndexMeta struct {
		Index string `json:"_index"`
		ID    string `json:"_id,omitempty"`
	}
	type bulkAction struct {
		Index bulkIndexMeta `json:"index"`
	}

	// The bulk body is newline-delimited JSON: one action line followed by
	// one source line per document.
	var buf bytes.Buffer
	for i, doc := range docs {
		// Validate before dereferencing. A nil Source would otherwise be sent
		// as the literal "null", which IndexDocument also rejects.
		if doc == nil {
			return fmt.Errorf("document %d cannot be nil", i)
		}
		if doc.Source == nil {
			return fmt.Errorf("document %d source cannot be nil", i)
		}

		actionData, err := json.Marshal(bulkAction{Index: bulkIndexMeta{Index: indexName, ID: doc.ID}})
		if err != nil {
			return fmt.Errorf("failed to marshal action: %w", err)
		}
		buf.Write(actionData)
		buf.WriteByte('\n')

		docData, err := json.Marshal(doc.Source)
		if err != nil {
			return fmt.Errorf("failed to marshal document: %w", err)
		}
		buf.Write(docData)
		buf.WriteByte('\n')
	}

	req := opensearchapi.BulkRequest{
		Body:    &buf,
		Refresh: "true",
	}
	res, err := req.Do(ctx, c.GetClient())
	if err != nil {
		return fmt.Errorf("failed to execute bulk request: %w", err)
	}
	defer func() {
		if closeErr := res.Body.Close(); closeErr != nil {
			fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
		}
	}()

	if res.IsError() {
		// Best-effort read: the body only enriches the error message.
		bodyBytes, _ := io.ReadAll(res.Body)
		return fmt.Errorf("bulk request failed with status %s: %s", res.Status(), string(bodyBytes))
	}

	// Per-item results. Each item is keyed by its action name ("index" here).
	var response struct {
		Errors bool `json:"errors"`
		Items  []map[string]struct {
			Error *struct {
				Type   string `json:"type"`
				Reason string `json:"reason"`
			} `json:"error"`
		} `json:"items"`
	}
	// Decoding from the stream surfaces read failures instead of ignoring them.
	if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
		return fmt.Errorf("failed to decode bulk response: %w", err)
	}
	if !response.Errors {
		return nil
	}

	// The bulk API returns items in request order, so i is the position of
	// the failed document in docs. The details are built in a buffer to avoid
	// quadratic string concatenation.
	var details bytes.Buffer
	for i, item := range response.Items {
		for _, result := range item {
			if result.Error != nil {
				fmt.Fprintf(&details, "doc %d: %s - %s; ", i, result.Error.Type, result.Error.Reason)
			}
		}
	}
	return fmt.Errorf("bulk indexing had errors: %s", details.String())
}