// Package index provides index-level operations for OpenSearch. package index import ( "bytes" "context" "encoding/json" "fmt" "io" "os" "es-demo/client" "github.com/opensearch-project/opensearch-go/v2/opensearchapi" ) var ( // ErrDocumentNotFound is returned when the specified document does not exist. ErrDocumentNotFound = fmt.Errorf("document not found") ) // Document represents a document in OpenSearch. type Document struct { ID string Source map[string]any } // IndexDocument indexes a document (create or update). // If ID is empty, OpenSearch will generate one automatically. func IndexDocument(ctx context.Context, c *client.Client, indexName string, doc *Document) (string, error) { if indexName == "" { return "", ErrInvalidIndexName } if doc == nil || doc.Source == nil { return "", fmt.Errorf("document cannot be nil") } data, err := json.Marshal(doc.Source) if err != nil { return "", fmt.Errorf("failed to marshal document: %w", err) } req := opensearchapi.IndexRequest{ Index: indexName, DocumentID: doc.ID, Body: bytes.NewReader(data), Refresh: "true", // Make document immediately searchable } res, err := req.Do(ctx, c.GetClient()) if err != nil { return "", fmt.Errorf("failed to execute index document request: %w", err) } defer func() { if closeErr := res.Body.Close(); closeErr != nil { fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) } }() if res.IsError() { bodyBytes, _ := io.ReadAll(res.Body) return "", fmt.Errorf("index document failed with status %s: %s", res.Status(), string(bodyBytes)) } // Parse response to get document ID var response struct { ID string `json:"_id"` } bodyBytes, _ := io.ReadAll(res.Body) if err := json.Unmarshal(bodyBytes, &response); err != nil { return "", fmt.Errorf("failed to decode response: %w", err) } return response.ID, nil } // GetDocument retrieves a document by ID. func GetDocument(ctx context.Context, c *client.Client, indexName string, docID string) (*Document, error) { if indexName == "" { return nil, ErrInvalidIndexName } if docID == "" { return nil, fmt.Errorf("document ID cannot be empty") } req := opensearchapi.GetRequest{ Index: indexName, DocumentID: docID, } res, err := req.Do(ctx, c.GetClient()) if err != nil { return nil, fmt.Errorf("failed to execute get document request: %w", err) } defer func() { if closeErr := res.Body.Close(); closeErr != nil { fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) } }() if res.StatusCode == 404 { return nil, ErrDocumentNotFound } if res.IsError() { bodyBytes, _ := io.ReadAll(res.Body) return nil, fmt.Errorf("get document failed with status %s: %s", res.Status(), string(bodyBytes)) } var response struct { ID string `json:"_id"` Found bool `json:"found"` Source map[string]any `json:"_source"` } if err := json.NewDecoder(res.Body).Decode(&response); err != nil { return nil, fmt.Errorf("failed to decode document: %w", err) } if !response.Found { return nil, ErrDocumentNotFound } return &Document{ ID: response.ID, Source: response.Source, }, nil } // UpdateDocument updates a document by ID using partial update. func UpdateDocument(ctx context.Context, c *client.Client, indexName string, docID string, partialDoc map[string]any) error { if indexName == "" { return ErrInvalidIndexName } if docID == "" { return fmt.Errorf("document ID cannot be empty") } if len(partialDoc) == 0 { return fmt.Errorf("partial document cannot be empty") } updateBody := map[string]any{ "doc": partialDoc, } data, err := json.Marshal(updateBody) if err != nil { return fmt.Errorf("failed to marshal update body: %w", err) } req := opensearchapi.UpdateRequest{ Index: indexName, DocumentID: docID, Body: bytes.NewReader(data), Refresh: "true", } res, err := req.Do(ctx, c.GetClient()) if err != nil { return fmt.Errorf("failed to execute update document request: %w", err) } defer func() { if closeErr := res.Body.Close(); closeErr != nil { fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) } }() if res.StatusCode == 404 { return ErrDocumentNotFound } if res.IsError() { bodyBytes, _ := io.ReadAll(res.Body) return fmt.Errorf("update document failed with status %s: %s", res.Status(), string(bodyBytes)) } return nil } // DeleteDocument deletes a document by ID. func DeleteDocument(ctx context.Context, c *client.Client, indexName string, docID string) error { if indexName == "" { return ErrInvalidIndexName } if docID == "" { return fmt.Errorf("document ID cannot be empty") } req := opensearchapi.DeleteRequest{ Index: indexName, DocumentID: docID, Refresh: "true", } res, err := req.Do(ctx, c.GetClient()) if err != nil { return fmt.Errorf("failed to execute delete document request: %w", err) } defer func() { if closeErr := res.Body.Close(); closeErr != nil { fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) } }() if res.StatusCode == 404 { return ErrDocumentNotFound } if res.IsError() { bodyBytes, _ := io.ReadAll(res.Body) return fmt.Errorf("delete document failed with status %s: %s", res.Status(), string(bodyBytes)) } return nil } // BulkIndexDocuments indexes multiple documents in a single request. func BulkIndexDocuments(ctx context.Context, c *client.Client, indexName string, docs []*Document) error { if indexName == "" { return ErrInvalidIndexName } if len(docs) == 0 { return fmt.Errorf("documents cannot be empty") } // Build bulk request body var buf bytes.Buffer for _, doc := range docs { // Action line action := map[string]any{ "index": map[string]any{ "_index": indexName, }, } if doc.ID != "" { action["index"].(map[string]any)["_id"] = doc.ID } actionData, err := json.Marshal(action) if err != nil { return fmt.Errorf("failed to marshal action: %w", err) } buf.Write(actionData) buf.WriteByte('\n') // Document line docData, err := json.Marshal(doc.Source) if err != nil { return fmt.Errorf("failed to marshal document: %w", err) } buf.Write(docData) buf.WriteByte('\n') } req := opensearchapi.BulkRequest{ Body: &buf, Refresh: "true", } res, err := req.Do(ctx, c.GetClient()) if err != nil { return fmt.Errorf("failed to execute bulk request: %w", err) } defer func() { if closeErr := res.Body.Close(); closeErr != nil { fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) } }() if res.IsError() { bodyBytes, _ := io.ReadAll(res.Body) return fmt.Errorf("bulk request failed with status %s: %s", res.Status(), string(bodyBytes)) } // Check for errors in bulk response var response struct { Errors bool `json:"errors"` Items []map[string]struct { Error *struct { Type string `json:"type"` Reason string `json:"reason"` } `json:"error"` } `json:"items"` } bodyBytes, _ := io.ReadAll(res.Body) if err := json.Unmarshal(bodyBytes, &response); err != nil { return fmt.Errorf("failed to decode bulk response: %w", err) } if response.Errors { // Collect error details var errorMsg string for i, item := range response.Items { for _, itemData := range item { if itemData.Error != nil { errorMsg += fmt.Sprintf("doc %d: %s - %s; ", i, itemData.Error.Type, itemData.Error.Reason) } } } return fmt.Errorf("bulk indexing had errors: %s", errorMsg) } return nil }