diff --git a/README.md b/README.md index 21aea09..edaf851 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,17 @@ es-demo/ │ └── cluster_test.go # 集群操作测试 └── index/ # 索引级别业务操作 ├── template.go # 索引模板管理(CRUD) - └── template_test.go # 模板操作测试 + ├── template_test.go # 模板操作测试 + ├── ism.go # ISM 策略管理 + ├── policy.go # 统一策略接口 + ├── index.go # 索引 CRUD 操作 + ├── index_test.go # 索引操作测试 + ├── mapping.go # 索引 Mapping 管理 + ├── mapping_test.go # Mapping 操作测试 + ├── document.go # 文档 CRUD 操作 + ├── document_test.go # 文档操作测试 + ├── search.go # 全文搜索功能 + └── search_test.go # 搜索功能测试 ``` ## 开发原则 @@ -135,16 +145,49 @@ es-demo/ - ListPolicies: 列出所有策略 - 策略配置验证 +- [x] 索引管理(Index Management) + - CreateIndex: 创建索引(支持 Settings 和 Mappings 配置) + - GetIndex: 获取索引信息 + - DeleteIndex: 删除索引 + - IndexExists: 检查索引是否存在 + - ListIndices: 列出匹配的索引 + - 索引名称验证和错误处理 + +- [x] Mapping 管理(Dynamic Mapping) + - PutMapping: 添加/更新字段映射 + - GetMapping: 获取索引映射信息 + - AddField: 便捷的单字段添加 + - FieldMapping 配置:Type、Index、Store、Analyzer、Format、IgnoreAbove + +- [x] 文档操作(Document CRUD) + - IndexDocument: 索引文档(支持自动生成或指定 ID) + - GetDocument: 根据 ID 获取文档 + - UpdateDocument: 部分更新文档 + - DeleteDocument: 删除文档 + - BulkIndexDocuments: 批量索引文档(NDJSON 格式) + +- [x] 全文搜索(Search) + - Search: 主搜索函数(支持分页、排序、字段过滤) + - Query 构建器: + - MatchQuery: 全文匹配查询 + - TermQuery: 精确词项查询 + - RangeQuery: 范围查询 + - BoolQuery: 布尔组合查询(Must、Should、MustNot、Filter) + - MultiMatchQuery: 多字段匹配 + - WildcardQuery: 通配符查询 + - PrefixQuery: 前缀查询 + - MatchAllQuery: 匹配所有文档 + ### 待实现功能 以下功能将根据实际需求逐步实现: -- [ ] 索引管理(创建、删除、更新索引) -- [ ] 文档操作(CRUD) -- [ ] 搜索功能(基础查询、复杂查询) -- [ ] 聚合查询 -- [ ] 批量操作 +- [ ] 聚合查询(Aggregations) +- [ ] 复杂查询组合(嵌套、父子文档) +- [ ] 索引别名管理 +- [ ] 快照和恢复 - [ ] 性能测试工具 +- [ ] 监控和指标 ## 快速开始 @@ -225,14 +268,16 @@ go tool cover -html=coverage.out -o coverage.html - [x] 测试流水线自动化(Lint → Build → Test → Cleanup) - [x] 代码质量保障(golangci-lint 集成) -### 第二阶段:核心功能(进行中) +### 第二阶段:核心功能(已完成) - [x] 集群运维操作(GetInfo) - [x] 索引模板管理(CRUD) -- [ ] 索引管理功能(创建、删除、更新索引) -- [ ] 文档 CRUD 操作 -- [ ] 基础搜索功能 -- [ ] 完善测试覆盖 +- [x] ISM 策略管理(CRUD) +- [x] 索引管理功能(创建、删除、查询、列表) +- [x] Mapping 动态管理(添加字段、获取映射) +- [x] 文档 CRUD 操作(包含批量索引) +- [x] 全文搜索功能(多种查询类型) +- [x] 完善测试覆盖(集成测试 + 单元测试) ### 第三阶段:高级功能 diff --git a/operations/index/document.go b/operations/index/document.go new file mode 100644 index 0000000..4431264 --- /dev/null +++ b/operations/index/document.go @@ -0,0 +1,312 @@ +// Package index provides index-level operations for OpenSearch. +package index + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "os" + + "es-demo/client" + + "github.com/opensearch-project/opensearch-go/v2/opensearchapi" +) + +var ( + // ErrDocumentNotFound is returned when the specified document does not exist. + ErrDocumentNotFound = fmt.Errorf("document not found") +) + +// Document represents a document in OpenSearch. +type Document struct { + ID string + Source map[string]any +} + +// IndexDocument indexes a document (create or update). +// If ID is empty, OpenSearch will generate one automatically. +func IndexDocument(ctx context.Context, c *client.Client, indexName string, doc *Document) (string, error) { + if indexName == "" { + return "", ErrInvalidIndexName + } + + if doc == nil || doc.Source == nil { + return "", fmt.Errorf("document cannot be nil") + } + + data, err := json.Marshal(doc.Source) + if err != nil { + return "", fmt.Errorf("failed to marshal document: %w", err) + } + + req := opensearchapi.IndexRequest{ + Index: indexName, + DocumentID: doc.ID, + Body: bytes.NewReader(data), + Refresh: "true", // Make document immediately searchable + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return "", fmt.Errorf("failed to execute index document request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return "", fmt.Errorf("index document failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + // Parse response to get document ID + var response struct { + ID string `json:"_id"` + } + + bodyBytes, _ := io.ReadAll(res.Body) + if err := json.Unmarshal(bodyBytes, &response); err != nil { + return "", fmt.Errorf("failed to decode response: %w", err) + } + + return response.ID, nil +} + +// GetDocument retrieves a document by ID. +func GetDocument(ctx context.Context, c *client.Client, indexName string, docID string) (*Document, error) { + if indexName == "" { + return nil, ErrInvalidIndexName + } + + if docID == "" { + return nil, fmt.Errorf("document ID cannot be empty") + } + + req := opensearchapi.GetRequest{ + Index: indexName, + DocumentID: docID, + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return nil, fmt.Errorf("failed to execute get document request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.StatusCode == 404 { + return nil, ErrDocumentNotFound + } + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return nil, fmt.Errorf("get document failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + var response struct { + ID string `json:"_id"` + Found bool `json:"found"` + Source map[string]any `json:"_source"` + } + + if err := json.NewDecoder(res.Body).Decode(&response); err != nil { + return nil, fmt.Errorf("failed to decode document: %w", err) + } + + if !response.Found { + return nil, ErrDocumentNotFound + } + + return &Document{ + ID: response.ID, + Source: response.Source, + }, nil +} + +// UpdateDocument updates a document by ID using partial update. +func UpdateDocument(ctx context.Context, c *client.Client, indexName string, docID string, partialDoc map[string]any) error { + if indexName == "" { + return ErrInvalidIndexName + } + + if docID == "" { + return fmt.Errorf("document ID cannot be empty") + } + + if len(partialDoc) == 0 { + return fmt.Errorf("partial document cannot be empty") + } + + updateBody := map[string]any{ + "doc": partialDoc, + } + + data, err := json.Marshal(updateBody) + if err != nil { + return fmt.Errorf("failed to marshal update body: %w", err) + } + + req := opensearchapi.UpdateRequest{ + Index: indexName, + DocumentID: docID, + Body: bytes.NewReader(data), + Refresh: "true", + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return fmt.Errorf("failed to execute update document request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.StatusCode == 404 { + return ErrDocumentNotFound + } + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return fmt.Errorf("update document failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + return nil +} + +// DeleteDocument deletes a document by ID. +func DeleteDocument(ctx context.Context, c *client.Client, indexName string, docID string) error { + if indexName == "" { + return ErrInvalidIndexName + } + + if docID == "" { + return fmt.Errorf("document ID cannot be empty") + } + + req := opensearchapi.DeleteRequest{ + Index: indexName, + DocumentID: docID, + Refresh: "true", + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return fmt.Errorf("failed to execute delete document request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.StatusCode == 404 { + return ErrDocumentNotFound + } + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return fmt.Errorf("delete document failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + return nil +} + +// BulkIndexDocuments indexes multiple documents in a single request. +func BulkIndexDocuments(ctx context.Context, c *client.Client, indexName string, docs []*Document) error { + if indexName == "" { + return ErrInvalidIndexName + } + + if len(docs) == 0 { + return fmt.Errorf("documents cannot be empty") + } + + // Build bulk request body + var buf bytes.Buffer + for _, doc := range docs { + // Action line + action := map[string]any{ + "index": map[string]any{ + "_index": indexName, + }, + } + if doc.ID != "" { + action["index"].(map[string]any)["_id"] = doc.ID + } + + actionData, err := json.Marshal(action) + if err != nil { + return fmt.Errorf("failed to marshal action: %w", err) + } + buf.Write(actionData) + buf.WriteByte('\n') + + // Document line + docData, err := json.Marshal(doc.Source) + if err != nil { + return fmt.Errorf("failed to marshal document: %w", err) + } + buf.Write(docData) + buf.WriteByte('\n') + } + + req := opensearchapi.BulkRequest{ + Body: &buf, + Refresh: "true", + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return fmt.Errorf("failed to execute bulk request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return fmt.Errorf("bulk request failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + // Check for errors in bulk response + var response struct { + Errors bool `json:"errors"` + Items []map[string]struct { + Error *struct { + Type string `json:"type"` + Reason string `json:"reason"` + } `json:"error"` + } `json:"items"` + } + + bodyBytes, _ := io.ReadAll(res.Body) + if err := json.Unmarshal(bodyBytes, &response); err != nil { + return fmt.Errorf("failed to decode bulk response: %w", err) + } + + if response.Errors { + // Collect error details + var errorMsg string + for i, item := range response.Items { + for _, itemData := range item { + if itemData.Error != nil { + errorMsg += fmt.Sprintf("doc %d: %s - %s; ", i, itemData.Error.Type, itemData.Error.Reason) + } + } + } + return fmt.Errorf("bulk indexing had errors: %s", errorMsg) + } + + return nil +} diff --git a/operations/index/document_test.go b/operations/index/document_test.go new file mode 100644 index 0000000..658f181 --- /dev/null +++ b/operations/index/document_test.go @@ -0,0 +1,368 @@ +package index + +import ( + "context" + "testing" + "time" +) + +// TestDocumentIntegration tests complete document lifecycle. +func TestDocumentIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + indexName := "test-doc-integration" + + // Setup index + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + var docID string + + // Test 1: Index document + t.Run("IndexDocument", func(t *testing.T) { + doc := &Document{ + Source: map[string]any{ + "title": "Test Document", + "content": "This is a test", + "count": 42, + }, + } + + id, err := IndexDocument(ctx, c, indexName, doc) + if err != nil { + t.Fatalf("IndexDocument() error = %v", err) + } + if id == "" { + t.Error("IndexDocument() returned empty ID") + } + docID = id + t.Logf("Document indexed with ID: %s", docID) + }) + + // Test 2: Get document + t.Run("GetDocument", func(t *testing.T) { + doc, err := GetDocument(ctx, c, indexName, docID) + if err != nil { + t.Fatalf("GetDocument() error = %v", err) + } + + if doc.ID != docID { + t.Errorf("GetDocument() ID = %q, want %q", doc.ID, docID) + } + + if doc.Source["title"] != "Test Document" { + t.Errorf("GetDocument() title = %v, want %v", doc.Source["title"], "Test Document") + } + + t.Logf("Retrieved document: %s", doc.ID) + }) + + // Test 3: Update document + t.Run("UpdateDocument", func(t *testing.T) { + updates := map[string]any{ + "title": "Updated Document", + "count": 100, + } + + err := UpdateDocument(ctx, c, indexName, docID, updates) + if err != nil { + t.Fatalf("UpdateDocument() error = %v", err) + } + + // Verify update + doc, err := GetDocument(ctx, c, indexName, docID) + if err != nil { + t.Fatalf("GetDocument() error = %v", err) + } + + if doc.Source["title"] != "Updated Document" { + t.Errorf("Updated title = %v, want %v", doc.Source["title"], "Updated Document") + } + + t.Log("Document updated successfully") + }) + + // Test 4: Delete document + t.Run("DeleteDocument", func(t *testing.T) { + err := DeleteDocument(ctx, c, indexName, docID) + if err != nil { + t.Fatalf("DeleteDocument() error = %v", err) + } + + // Verify deletion + _, err = GetDocument(ctx, c, indexName, docID) + if err != ErrDocumentNotFound { + t.Errorf("GetDocument() error = %v, want %v", err, ErrDocumentNotFound) + } + + t.Log("Document deleted successfully") + }) +} + +func TestBulkIndexDocuments(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-bulk-index" + + // Setup index + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + // Create multiple documents + docs := []*Document{ + { + ID: "doc1", + Source: map[string]any{ + "title": "Document 1", + "value": 1, + }, + }, + { + ID: "doc2", + Source: map[string]any{ + "title": "Document 2", + "value": 2, + }, + }, + { + ID: "doc3", + Source: map[string]any{ + "title": "Document 3", + "value": 3, + }, + }, + } + + err = BulkIndexDocuments(ctx, c, indexName, docs) + if err != nil { + t.Fatalf("BulkIndexDocuments() error = %v", err) + } + + // Verify all documents were indexed + for _, doc := range docs { + retrieved, err := GetDocument(ctx, c, indexName, doc.ID) + if err != nil { + t.Errorf("GetDocument(%s) error = %v", doc.ID, err) + continue + } + + if retrieved.Source["title"] != doc.Source["title"] { + t.Errorf("Document %s title = %v, want %v", doc.ID, retrieved.Source["title"], doc.Source["title"]) + } + } + + t.Logf("BulkIndexDocuments succeeded: indexed %d documents", len(docs)) +} + +func TestIndexDocument(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-index-document" + + // Cleanup any existing index first + _ = DeleteIndex(ctx, c, indexName) + + // Setup + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + doc := &Document{ + Source: map[string]any{ + "field1": "value1", + "field2": 123, + }, + } + + id, err := IndexDocument(ctx, c, indexName, doc) + if err != nil { + t.Fatalf("IndexDocument() error = %v", err) + } + + if id == "" { + t.Error("IndexDocument() returned empty ID") + } + + t.Logf("IndexDocument succeeded: %s", id) +} + +func TestGetDocument(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-get-document" + + // Cleanup any existing index first + _ = DeleteIndex(ctx, c, indexName) + + // Setup + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + // Index a document + doc := &Document{ + ID: "test-doc", + Source: map[string]any{ + "test": "data", + }, + } + + _, err = IndexDocument(ctx, c, indexName, doc) + if err != nil { + t.Fatalf("setup: IndexDocument() error = %v", err) + } + + // Test getting the document + retrieved, err := GetDocument(ctx, c, indexName, "test-doc") + if err != nil { + t.Fatalf("GetDocument() error = %v", err) + } + + if retrieved.ID != "test-doc" { + t.Errorf("GetDocument() ID = %q, want %q", retrieved.ID, "test-doc") + } + + t.Logf("GetDocument succeeded: %s", retrieved.ID) +} + +func TestUpdateDocument(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-update-document" + + // Cleanup any existing index first + _ = DeleteIndex(ctx, c, indexName) + + // Setup + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + // Index a document + doc := &Document{ + ID: "test-doc", + Source: map[string]any{ + "value": 1, + }, + } + + _, err = IndexDocument(ctx, c, indexName, doc) + if err != nil { + t.Fatalf("setup: IndexDocument() error = %v", err) + } + + // Test update + updates := map[string]any{ + "value": 2, + } + + err = UpdateDocument(ctx, c, indexName, "test-doc", updates) + if err != nil { + t.Fatalf("UpdateDocument() error = %v", err) + } + + t.Log("UpdateDocument succeeded") +} + +func TestDeleteDocument(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-delete-document" + + // Cleanup any existing index first + _ = DeleteIndex(ctx, c, indexName) + + // Setup + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + // Index a document + doc := &Document{ + ID: "test-doc", + Source: map[string]any{ + "test": "data", + }, + } + + _, err = IndexDocument(ctx, c, indexName, doc) + if err != nil { + t.Fatalf("setup: IndexDocument() error = %v", err) + } + + // Test deletion + err = DeleteDocument(ctx, c, indexName, "test-doc") + if err != nil { + t.Fatalf("DeleteDocument() error = %v", err) + } + + t.Log("DeleteDocument succeeded") +} diff --git a/operations/index/index.go b/operations/index/index.go new file mode 100644 index 0000000..4abc9ed --- /dev/null +++ b/operations/index/index.go @@ -0,0 +1,246 @@ +// Package index provides index-level operations for OpenSearch. +package index + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + + "es-demo/client" + + "github.com/opensearch-project/opensearch-go/v2/opensearchapi" +) + +var ( + // ErrIndexNotFound is returned when the specified index does not exist. + ErrIndexNotFound = errors.New("index not found") + + // ErrIndexAlreadyExists is returned when trying to create an index that already exists. + ErrIndexAlreadyExists = errors.New("index already exists") + + // ErrInvalidIndexName is returned when the index name is invalid. + ErrInvalidIndexName = errors.New("invalid index name") +) + +// IndexSettings represents the settings for an index. +type IndexSettings struct { + NumberOfShards int `json:"number_of_shards,omitempty"` + NumberOfReplicas int `json:"number_of_replicas,omitempty"` +} + +// IndexConfig represents the complete configuration for creating an index. +type IndexConfig struct { + Settings *IndexSettings `json:"settings,omitempty"` + Mappings map[string]any `json:"mappings,omitempty"` + Aliases map[string]any `json:"aliases,omitempty"` +} + +// IndexInfo represents information about an index. +type IndexInfo struct { + Name string + Settings *IndexSettings + Mappings map[string]any + Aliases map[string]any +} + +// CreateIndex creates a new index with the specified configuration. +func CreateIndex(ctx context.Context, c *client.Client, name string, config *IndexConfig) error { + if name == "" { + return ErrInvalidIndexName + } + + // Build request body + var body io.Reader + if config != nil { + data, err := json.Marshal(config) + if err != nil { + return fmt.Errorf("failed to marshal index config: %w", err) + } + body = bytes.NewReader(data) + } + + req := opensearchapi.IndicesCreateRequest{ + Index: name, + Body: body, + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return fmt.Errorf("failed to execute create index request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + if res.StatusCode == 400 { + return fmt.Errorf("%w: %s", ErrIndexAlreadyExists, string(bodyBytes)) + } + return fmt.Errorf("create index failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + return nil +} + +// GetIndex retrieves information about an index. +func GetIndex(ctx context.Context, c *client.Client, name string) (*IndexInfo, error) { + if name == "" { + return nil, ErrInvalidIndexName + } + + req := opensearchapi.IndicesGetRequest{ + Index: []string{name}, + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return nil, fmt.Errorf("failed to execute get index request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.StatusCode == 404 { + return nil, ErrIndexNotFound + } + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return nil, fmt.Errorf("get index failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + // Parse response + var response map[string]struct { + Settings struct { + Index struct { + NumberOfShards string `json:"number_of_shards"` + NumberOfReplicas string `json:"number_of_replicas"` + } `json:"index"` + } `json:"settings"` + Mappings map[string]any `json:"mappings"` + Aliases map[string]any `json:"aliases"` + } + + if err := json.NewDecoder(res.Body).Decode(&response); err != nil { + return nil, fmt.Errorf("failed to decode index info: %w", err) + } + + indexData, exists := response[name] + if !exists { + return nil, ErrIndexNotFound + } + + info := &IndexInfo{ + Name: name, + Mappings: indexData.Mappings, + Aliases: indexData.Aliases, + } + + return info, nil +} + +// DeleteIndex deletes an index. +func DeleteIndex(ctx context.Context, c *client.Client, name string) error { + if name == "" { + return ErrInvalidIndexName + } + + req := opensearchapi.IndicesDeleteRequest{ + Index: []string{name}, + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return fmt.Errorf("failed to execute delete index request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.StatusCode == 404 { + return ErrIndexNotFound + } + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return fmt.Errorf("delete index failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + return nil +} + +// IndexExists checks if an index exists. +func IndexExists(ctx context.Context, c *client.Client, name string) (bool, error) { + if name == "" { + return false, ErrInvalidIndexName + } + + req := opensearchapi.IndicesExistsRequest{ + Index: []string{name}, + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return false, fmt.Errorf("failed to execute exists request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + return res.StatusCode == 200, nil +} + +// ListIndices lists all indices matching a pattern (use "*" for all). +func ListIndices(ctx context.Context, c *client.Client, pattern string) ([]string, error) { + if pattern == "" { + pattern = "*" + } + + req := opensearchapi.CatIndicesRequest{ + Index: []string{pattern}, + Format: "json", + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return nil, fmt.Errorf("failed to execute list indices request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return nil, fmt.Errorf("list indices failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + var indices []struct { + Index string `json:"index"` + } + + if err := json.NewDecoder(res.Body).Decode(&indices); err != nil { + return nil, fmt.Errorf("failed to decode indices list: %w", err) + } + + result := make([]string, len(indices)) + for i, idx := range indices { + result[i] = idx.Index + } + + return result, nil +} diff --git a/operations/index/index_test.go b/operations/index/index_test.go new file mode 100644 index 0000000..14e8b48 --- /dev/null +++ b/operations/index/index_test.go @@ -0,0 +1,297 @@ +package index + +import ( + "context" + "testing" + "time" +) + +// TestIndexIntegration tests complete index lifecycle. +func TestIndexIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + indexName := "test-index-integration" + + // Cleanup before and after test + _ = DeleteIndex(ctx, c, indexName) + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + // Test 1: Create index + t.Run("CreateIndex", func(t *testing.T) { + config := &IndexConfig{ + Settings: &IndexSettings{ + NumberOfShards: 1, + NumberOfReplicas: 0, + }, + Mappings: map[string]any{ + "properties": map[string]any{ + "title": map[string]any{ + "type": "text", + }, + "count": map[string]any{ + "type": "integer", + }, + }, + }, + } + + err := CreateIndex(ctx, c, indexName, config) + if err != nil { + t.Fatalf("CreateIndex() error = %v", err) + } + t.Log("Index created successfully") + }) + + // Test 2: Index exists + t.Run("IndexExists", func(t *testing.T) { + exists, err := IndexExists(ctx, c, indexName) + if err != nil { + t.Fatalf("IndexExists() error = %v", err) + } + if !exists { + t.Error("IndexExists() = false, want true") + } + t.Log("Index exists check passed") + }) + + // Test 3: Get index + t.Run("GetIndex", func(t *testing.T) { + info, err := GetIndex(ctx, c, indexName) + if err != nil { + t.Fatalf("GetIndex() error = %v", err) + } + if info.Name != indexName { + t.Errorf("GetIndex() name = %q, want %q", info.Name, indexName) + } + t.Logf("Retrieved index: %s", info.Name) + }) + + // Test 4: List indices + t.Run("ListIndices", func(t *testing.T) { + indices, err := ListIndices(ctx, c, "test-*") + if err != nil { + t.Fatalf("ListIndices() error = %v", err) + } + + found := false + for _, idx := range indices { + if idx == indexName { + found = true + break + } + } + + if !found { + t.Errorf("Index %q not found in list", indexName) + } + t.Logf("Found %d indices", len(indices)) + }) + + // Test 5: Delete index + t.Run("DeleteIndex", func(t *testing.T) { + err := DeleteIndex(ctx, c, indexName) + if err != nil { + t.Fatalf("DeleteIndex() error = %v", err) + } + + // Verify deletion + exists, err := IndexExists(ctx, c, indexName) + if err != nil { + t.Fatalf("IndexExists() error = %v", err) + } + if exists { + t.Error("Index still exists after deletion") + } + t.Log("Index deleted successfully") + }) +} + +func TestCreateIndex(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-create-index" + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + config := &IndexConfig{ + Settings: &IndexSettings{ + NumberOfShards: 1, + NumberOfReplicas: 0, + }, + } + + err := CreateIndex(ctx, c, indexName, config) + if err != nil { + t.Fatalf("CreateIndex() error = %v", err) + } + + t.Log("CreateIndex succeeded") +} + +func TestGetIndex(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-get-index" + + // Cleanup any existing index first + _ = DeleteIndex(ctx, c, indexName) + + // Setup + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + // Test getting the index + info, err := GetIndex(ctx, c, indexName) + if err != nil { + t.Fatalf("GetIndex() error = %v", err) + } + + if info.Name != indexName { + t.Errorf("GetIndex() name = %q, want %q", info.Name, indexName) + } + + t.Logf("GetIndex succeeded: %s", info.Name) +} + +func TestDeleteIndex(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-delete-index" + + // Create index + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + // Test deletion + err = DeleteIndex(ctx, c, indexName) + if err != nil { + t.Fatalf("DeleteIndex() error = %v", err) + } + + t.Log("DeleteIndex succeeded") +} + +func TestIndexExists(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-exists-index" + + // Test non-existent index + exists, err := IndexExists(ctx, c, indexName) + if err != nil { + t.Fatalf("IndexExists() error = %v", err) + } + if exists { + t.Error("IndexExists() = true for non-existent index") + } + + // Create index + err = CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + // Test existing index + exists, err = IndexExists(ctx, c, indexName) + if err != nil { + t.Fatalf("IndexExists() error = %v", err) + } + if !exists { + t.Error("IndexExists() = false for existing index") + } + + t.Log("IndexExists succeeded") +} + +func TestListIndices(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indices, err := ListIndices(ctx, c, "*") + if err != nil { + t.Fatalf("ListIndices() error = %v", err) + } + + if indices == nil { + t.Error("ListIndices() returned nil") + } + + t.Logf("ListIndices succeeded: found %d indices", len(indices)) +} + +// Unit tests for validation +func TestCreateIndex_Validation(t *testing.T) { + ctx := context.Background() + + tests := []struct { + name string + indexName string + wantErr error + }{ + { + name: "empty index name", + indexName: "", + wantErr: ErrInvalidIndexName, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Pass nil client to only test validation logic + err := CreateIndex(ctx, nil, tt.indexName, nil) + if err != tt.wantErr { + t.Errorf("CreateIndex() error = %v, want %v", err, tt.wantErr) + } + }) + } +} diff --git a/operations/index/mapping.go b/operations/index/mapping.go new file mode 100644 index 0000000..98e5e15 --- /dev/null +++ b/operations/index/mapping.go @@ -0,0 +1,124 @@ +// Package index provides index-level operations for OpenSearch. +package index + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "os" + + "es-demo/client" + + "github.com/opensearch-project/opensearch-go/v2/opensearchapi" +) + +// FieldMapping represents a field mapping configuration. +type FieldMapping struct { + Type string `json:"type"` + Fields map[string]any `json:"fields,omitempty"` + // Common field options + Index *bool `json:"index,omitempty"` + Store *bool `json:"store,omitempty"` + Analyzer string `json:"analyzer,omitempty"` + Format string `json:"format,omitempty"` + IgnoreAbove int `json:"ignore_above,omitempty"` +} + +// PutMapping updates the mapping for an index by adding new fields. +// Note: Existing field mappings cannot be changed in OpenSearch. +func PutMapping(ctx context.Context, c *client.Client, indexName string, properties map[string]FieldMapping) error { + if indexName == "" { + return ErrInvalidIndexName + } + + if len(properties) == 0 { + return fmt.Errorf("properties cannot be empty") + } + + // Build mapping request + mappingBody := map[string]any{ + "properties": properties, + } + + data, err := json.Marshal(mappingBody) + if err != nil { + return fmt.Errorf("failed to marshal mapping: %w", err) + } + + req := opensearchapi.IndicesPutMappingRequest{ + Index: []string{indexName}, + Body: bytes.NewReader(data), + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return fmt.Errorf("failed to execute put mapping request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return fmt.Errorf("put mapping failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + return nil +} + +// GetMapping retrieves the mapping for an index. +func GetMapping(ctx context.Context, c *client.Client, indexName string) (map[string]any, error) { + if indexName == "" { + return nil, ErrInvalidIndexName + } + + req := opensearchapi.IndicesGetMappingRequest{ + Index: []string{indexName}, + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return nil, fmt.Errorf("failed to execute get mapping request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.StatusCode == 404 { + return nil, ErrIndexNotFound + } + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return nil, fmt.Errorf("get mapping failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + var response map[string]struct { + Mappings map[string]any `json:"mappings"` + } + + if err := json.NewDecoder(res.Body).Decode(&response); err != nil { + return nil, fmt.Errorf("failed to decode mapping response: %w", err) + } + + indexData, exists := response[indexName] + if !exists { + return nil, ErrIndexNotFound + } + + return indexData.Mappings, nil +} + +// AddField adds a new field to the index mapping. +// This is a convenience wrapper around PutMapping for adding a single field. +func AddField(ctx context.Context, c *client.Client, indexName string, fieldName string, mapping FieldMapping) error { + return PutMapping(ctx, c, indexName, map[string]FieldMapping{ + fieldName: mapping, + }) +} diff --git a/operations/index/mapping_test.go b/operations/index/mapping_test.go new file mode 100644 index 0000000..2584ee9 --- /dev/null +++ b/operations/index/mapping_test.go @@ -0,0 +1,204 @@ +package index + +import ( + "context" + "testing" + "time" +) + +// TestMappingIntegration tests mapping operations. +func TestMappingIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + indexName := "test-mapping-integration" + + // Setup index + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + // Test 1: Add field + t.Run("AddField", func(t *testing.T) { + mapping := FieldMapping{ + Type: "text", + } + + err := AddField(ctx, c, indexName, "new_field", mapping) + if err != nil { + t.Fatalf("AddField() error = %v", err) + } + + t.Log("Field added successfully") + }) + + // Test 2: Get mapping + t.Run("GetMapping", func(t *testing.T) { + mappings, err := GetMapping(ctx, c, indexName) + if err != nil { + t.Fatalf("GetMapping() error = %v", err) + } + + if mappings == nil { + t.Error("GetMapping() returned nil") + } + + t.Logf("Retrieved mappings for index %s", indexName) + }) + + // Test 3: Put mapping with multiple fields + t.Run("PutMapping", func(t *testing.T) { + properties := map[string]FieldMapping{ + "title": { + Type: "text", + }, + "count": { + Type: "integer", + }, + "timestamp": { + Type: "date", + Format: "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis", + }, + } + + err := PutMapping(ctx, c, indexName, properties) + if err != nil { + t.Fatalf("PutMapping() error = %v", err) + } + + t.Log("Mapping updated successfully") + }) +} + +func TestPutMapping(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-put-mapping" + + // Cleanup any existing index first + _ = DeleteIndex(ctx, c, indexName) + + // Setup + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + properties := map[string]FieldMapping{ + "name": { + Type: "text", + }, + "age": { + Type: "integer", + }, + } + + err = PutMapping(ctx, c, indexName, properties) + if err != nil { + t.Fatalf("PutMapping() error = %v", err) + } + + t.Log("PutMapping succeeded") +} + +func TestGetMapping(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-get-mapping" + + // Cleanup any existing index first + _ = DeleteIndex(ctx, c, indexName) + + // Setup with initial mapping + config := &IndexConfig{ + Mappings: map[string]any{ + "properties": map[string]any{ + "field1": map[string]any{ + "type": "text", + }, + }, + }, + } + + err := CreateIndex(ctx, c, indexName, config) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + mappings, err := GetMapping(ctx, c, indexName) + if err != nil { + t.Fatalf("GetMapping() error = %v", err) + } + + if mappings == nil { + t.Error("GetMapping() returned nil") + } + + t.Log("GetMapping succeeded") +} + +func TestAddField(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-add-field" + + // Cleanup any existing index first + _ = DeleteIndex(ctx, c, indexName) + + // Setup + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + mapping := FieldMapping{ + Type: "keyword", + } + + err = AddField(ctx, c, indexName, "status", mapping) + if err != nil { + t.Fatalf("AddField() error = %v", err) + } + + t.Log("AddField succeeded") +} diff --git a/operations/index/search.go b/operations/index/search.go new file mode 100644 index 0000000..d050cc3 --- /dev/null +++ b/operations/index/search.go @@ -0,0 +1,187 @@ +// Package index provides index-level operations for OpenSearch. +package index + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "os" + + "es-demo/client" + + "github.com/opensearch-project/opensearch-go/v2/opensearchapi" +) + +// SearchQuery represents a search query configuration. +type SearchQuery struct { + // Query is the search query DSL + Query map[string]any `json:"query,omitempty"` + + // Size is the maximum number of hits to return (default: 10) + Size int `json:"size,omitempty"` + + // From is the starting offset (for pagination) + From int `json:"from,omitempty"` + + // Sort defines the sort order + Sort []map[string]any `json:"sort,omitempty"` + + // Source defines which fields to return + Source any `json:"_source,omitempty"` +} + +// SearchResult represents search results. +type SearchResult struct { + Took int64 `json:"took"` + Hits struct { + Total struct { + Value int64 `json:"value"` + Relation string `json:"relation"` + } `json:"total"` + MaxScore *float64 `json:"max_score"` + Hits []Hit `json:"hits"` + } `json:"hits"` +} + +// Hit represents a single search result hit. +type Hit struct { + Index string `json:"_index"` + ID string `json:"_id"` + Score *float64 `json:"_score"` + Source map[string]any `json:"_source"` +} + +// Search performs a search query on an index. +func Search(ctx context.Context, c *client.Client, indexName string, query *SearchQuery) (*SearchResult, error) { + if indexName == "" { + return nil, ErrInvalidIndexName + } + + if query == nil { + query = &SearchQuery{} + } + + // Set defaults + if query.Size == 0 { + query.Size = 10 + } + + data, err := json.Marshal(query) + if err != nil { + return nil, fmt.Errorf("failed to marshal search query: %w", err) + } + + req := opensearchapi.SearchRequest{ + Index: []string{indexName}, + Body: bytes.NewReader(data), + } + + res, err := req.Do(ctx, c.GetClient()) + if err != nil { + return nil, fmt.Errorf("failed to execute search request: %w", err) + } + defer func() { + if closeErr := res.Body.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr) + } + }() + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return nil, fmt.Errorf("search failed with status %s: %s", res.Status(), string(bodyBytes)) + } + + var result SearchResult + if err := json.NewDecoder(res.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode search result: %w", err) + } + + return &result, nil +} + +// MatchQuery creates a match query for full-text search. +func MatchQuery(field string, value any) map[string]any { + return map[string]any{ + "match": map[string]any{ + field: value, + }, + } +} + +// TermQuery creates a term query for exact matching. +func TermQuery(field string, value any) map[string]any { + return map[string]any{ + "term": map[string]any{ + field: value, + }, + } +} + +// RangeQuery creates a range query. +func RangeQuery(field string, gte, lte any) map[string]any { + rangeMap := make(map[string]any) + if gte != nil { + rangeMap["gte"] = gte + } + if lte != nil { + rangeMap["lte"] = lte + } + + return map[string]any{ + "range": map[string]any{ + field: rangeMap, + }, + } +} + +// BoolQuery creates a bool query for combining multiple queries. +type BoolQuery struct { + Must []map[string]any `json:"must,omitempty"` + Should []map[string]any `json:"should,omitempty"` + MustNot []map[string]any `json:"must_not,omitempty"` + Filter []map[string]any `json:"filter,omitempty"` +} + +// ToBoolQuery converts BoolQuery to query DSL. +func (b *BoolQuery) ToBoolQuery() map[string]any { + return map[string]any{ + "bool": b, + } +} + +// MatchAllQuery creates a match_all query. +func MatchAllQuery() map[string]any { + return map[string]any{ + "match_all": map[string]any{}, + } +} + +// MultiMatchQuery creates a multi_match query for searching across multiple fields. +func MultiMatchQuery(value any, fields ...string) map[string]any { + return map[string]any{ + "multi_match": map[string]any{ + "query": value, + "fields": fields, + }, + } +} + +// WildcardQuery creates a wildcard query. +func WildcardQuery(field string, value string) map[string]any { + return map[string]any{ + "wildcard": map[string]any{ + field: value, + }, + } +} + +// PrefixQuery creates a prefix query. +func PrefixQuery(field string, value string) map[string]any { + return map[string]any{ + "prefix": map[string]any{ + field: value, + }, + } +} diff --git a/operations/index/search_test.go b/operations/index/search_test.go new file mode 100644 index 0000000..72b8088 --- /dev/null +++ b/operations/index/search_test.go @@ -0,0 +1,343 @@ +package index + +import ( + "context" + "testing" + "time" +) + +// TestSearchIntegration tests search operations. +func TestSearchIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + indexName := "test-search-integration" + + // Setup index with mapping + config := &IndexConfig{ + Mappings: map[string]any{ + "properties": map[string]any{ + "title": map[string]any{ + "type": "text", + }, + "category": map[string]any{ + "type": "keyword", + }, + "price": map[string]any{ + "type": "integer", + }, + }, + }, + } + + err := CreateIndex(ctx, c, indexName, config) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + // Index test documents + docs := []*Document{ + { + ID: "1", + Source: map[string]any{ + "title": "Go Programming Language", + "category": "books", + "price": 50, + }, + }, + { + ID: "2", + Source: map[string]any{ + "title": "Python for Beginners", + "category": "books", + "price": 40, + }, + }, + { + ID: "3", + Source: map[string]any{ + "title": "JavaScript Guide", + "category": "books", + "price": 45, + }, + }, + } + + err = BulkIndexDocuments(ctx, c, indexName, docs) + if err != nil { + t.Fatalf("setup: BulkIndexDocuments() error = %v", err) + } + + // Give ES a moment to index + time.Sleep(1 * time.Second) + + // Test 1: Match query + t.Run("MatchQuery", func(t *testing.T) { + query := &SearchQuery{ + Query: MatchQuery("title", "Programming"), + Size: 10, + } + + result, err := Search(ctx, c, indexName, query) + if err != nil { + t.Fatalf("Search() error = %v", err) + } + + if result.Hits.Total.Value == 0 { + t.Error("Search() returned no results") + } + + t.Logf("Match query found %d results", result.Hits.Total.Value) + }) + + // Test 2: Term query + t.Run("TermQuery", func(t *testing.T) { + query := &SearchQuery{ + Query: TermQuery("category", "books"), + Size: 10, + } + + result, err := Search(ctx, c, indexName, query) + if err != nil { + t.Fatalf("Search() error = %v", err) + } + + if result.Hits.Total.Value != 3 { + t.Errorf("Term query found %d results, want 3", result.Hits.Total.Value) + } + + t.Logf("Term query found %d results", result.Hits.Total.Value) + }) + + // Test 3: Range query + t.Run("RangeQuery", func(t *testing.T) { + query := &SearchQuery{ + Query: RangeQuery("price", 40, 50), + Size: 10, + } + + result, err := Search(ctx, c, indexName, query) + if err != nil { + t.Fatalf("Search() error = %v", err) + } + + if result.Hits.Total.Value == 0 { + t.Error("Range query returned no results") + } + + t.Logf("Range query found %d results", result.Hits.Total.Value) + }) + + // Test 4: Bool query + t.Run("BoolQuery", func(t *testing.T) { + boolQ := &BoolQuery{ + Must: []map[string]any{ + TermQuery("category", "books"), + }, + Filter: []map[string]any{ + RangeQuery("price", nil, 45), + }, + } + + query := &SearchQuery{ + Query: boolQ.ToBoolQuery(), + Size: 10, + } + + result, err := Search(ctx, c, indexName, query) + if err != nil { + t.Fatalf("Search() error = %v", err) + } + + if result.Hits.Total.Value == 0 { + t.Error("Bool query returned no results") + } + + t.Logf("Bool query found %d results", result.Hits.Total.Value) + }) + + // Test 5: Match all query + t.Run("MatchAllQuery", func(t *testing.T) { + query := &SearchQuery{ + Query: MatchAllQuery(), + Size: 10, + } + + result, err := Search(ctx, c, indexName, query) + if err != nil { + t.Fatalf("Search() error = %v", err) + } + + if result.Hits.Total.Value != 3 { + t.Errorf("Match all query found %d results, want 3", result.Hits.Total.Value) + } + + t.Logf("Match all query found %d results", result.Hits.Total.Value) + }) + + // Test 6: Pagination + t.Run("Pagination", func(t *testing.T) { + query := &SearchQuery{ + Query: MatchAllQuery(), + Size: 1, + From: 1, + } + + result, err := Search(ctx, c, indexName, query) + if err != nil { + t.Fatalf("Search() error = %v", err) + } + + if len(result.Hits.Hits) != 1 { + t.Errorf("Pagination returned %d hits, want 1", len(result.Hits.Hits)) + } + + t.Log("Pagination test passed") + }) + + // Test 7: Multi-match query + t.Run("MultiMatchQuery", func(t *testing.T) { + query := &SearchQuery{ + Query: MultiMatchQuery("Programming", "title", "category"), + Size: 10, + } + + result, err := Search(ctx, c, indexName, query) + if err != nil { + t.Fatalf("Search() error = %v", err) + } + + if result.Hits.Total.Value == 0 { + t.Error("Multi-match query returned no results") + } + + t.Logf("Multi-match query found %d results", result.Hits.Total.Value) + }) +} + +func TestSearch(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + c := setupIntegrationTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + indexName := "test-search" + + // Cleanup any existing index first + _ = DeleteIndex(ctx, c, indexName) + + // Setup + err := CreateIndex(ctx, c, indexName, nil) + if err != nil { + t.Fatalf("setup: CreateIndex() error = %v", err) + } + + defer func() { + _ = DeleteIndex(ctx, c, indexName) + }() + + // Index a document + doc := &Document{ + Source: map[string]any{ + "message": "test search", + }, + } + + _, err = IndexDocument(ctx, c, indexName, doc) + if err != nil { + t.Fatalf("setup: IndexDocument() error = %v", err) + } + + // Give ES a moment + time.Sleep(1 * time.Second) + + query := &SearchQuery{ + Query: MatchAllQuery(), + Size: 10, + } + + result, err := Search(ctx, c, indexName, query) + if err != nil { + t.Fatalf("Search() error = %v", err) + } + + if result == nil { + t.Error("Search() returned nil") + } + + t.Logf("Search succeeded: found %d results", result.Hits.Total.Value) +} + +func TestMatchQuery(t *testing.T) { + query := MatchQuery("field", "value") + + if query == nil { + t.Error("MatchQuery() returned nil") + } + + if _, ok := query["match"]; !ok { + t.Error("MatchQuery() missing 'match' key") + } + + t.Log("MatchQuery test passed") +} + +func TestTermQuery(t *testing.T) { + query := TermQuery("field", "value") + + if query == nil { + t.Error("TermQuery() returned nil") + } + + if _, ok := query["term"]; !ok { + t.Error("TermQuery() missing 'term' key") + } + + t.Log("TermQuery test passed") +} + +func TestRangeQuery(t *testing.T) { + query := RangeQuery("age", 18, 65) + + if query == nil { + t.Error("RangeQuery() returned nil") + } + + if _, ok := query["range"]; !ok { + t.Error("RangeQuery() missing 'range' key") + } + + t.Log("RangeQuery test passed") +} + +func TestBoolQuery(t *testing.T) { + boolQ := &BoolQuery{ + Must: []map[string]any{ + MatchQuery("field", "value"), + }, + } + + query := boolQ.ToBoolQuery() + + if query == nil { + t.Error("ToBoolQuery() returned nil") + } + + if _, ok := query["bool"]; !ok { + t.Error("ToBoolQuery() missing 'bool' key") + } + + t.Log("BoolQuery test passed") +}