Files
es-demo/operations/index/index.go

247 lines
6.1 KiB
Go

// Package index provides index-level operations for OpenSearch.
package index
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"es-demo/client"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
)
var (
// ErrIndexNotFound is returned when the specified index does not exist.
ErrIndexNotFound = errors.New("index not found")
// ErrIndexAlreadyExists is returned when trying to create an index that already exists.
ErrIndexAlreadyExists = errors.New("index already exists")
// ErrInvalidIndexName is returned when the index name is invalid.
ErrInvalidIndexName = errors.New("invalid index name")
)
// IndexSettings represents the settings for an index.
type IndexSettings struct {
NumberOfShards int `json:"number_of_shards,omitempty"`
NumberOfReplicas int `json:"number_of_replicas,omitempty"`
}
// IndexConfig represents the complete configuration for creating an index.
type IndexConfig struct {
Settings *IndexSettings `json:"settings,omitempty"`
Mappings map[string]any `json:"mappings,omitempty"`
Aliases map[string]any `json:"aliases,omitempty"`
}
// IndexInfo represents information about an index.
type IndexInfo struct {
Name string
Settings *IndexSettings
Mappings map[string]any
Aliases map[string]any
}
// CreateIndex creates a new index with the specified configuration.
func CreateIndex(ctx context.Context, c *client.Client, name string, config *IndexConfig) error {
if name == "" {
return ErrInvalidIndexName
}
// Build request body
var body io.Reader
if config != nil {
data, err := json.Marshal(config)
if err != nil {
return fmt.Errorf("failed to marshal index config: %w", err)
}
body = bytes.NewReader(data)
}
req := opensearchapi.IndicesCreateRequest{
Index: name,
Body: body,
}
res, err := req.Do(ctx, c.GetClient())
if err != nil {
return fmt.Errorf("failed to execute create index request: %w", err)
}
defer func() {
if closeErr := res.Body.Close(); closeErr != nil {
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
}
}()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
if res.StatusCode == 400 {
return fmt.Errorf("%w: %s", ErrIndexAlreadyExists, string(bodyBytes))
}
return fmt.Errorf("create index failed with status %s: %s", res.Status(), string(bodyBytes))
}
return nil
}
// GetIndex retrieves information about an index.
func GetIndex(ctx context.Context, c *client.Client, name string) (*IndexInfo, error) {
if name == "" {
return nil, ErrInvalidIndexName
}
req := opensearchapi.IndicesGetRequest{
Index: []string{name},
}
res, err := req.Do(ctx, c.GetClient())
if err != nil {
return nil, fmt.Errorf("failed to execute get index request: %w", err)
}
defer func() {
if closeErr := res.Body.Close(); closeErr != nil {
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
}
}()
if res.StatusCode == 404 {
return nil, ErrIndexNotFound
}
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("get index failed with status %s: %s", res.Status(), string(bodyBytes))
}
// Parse response
var response map[string]struct {
Settings struct {
Index struct {
NumberOfShards string `json:"number_of_shards"`
NumberOfReplicas string `json:"number_of_replicas"`
} `json:"index"`
} `json:"settings"`
Mappings map[string]any `json:"mappings"`
Aliases map[string]any `json:"aliases"`
}
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
return nil, fmt.Errorf("failed to decode index info: %w", err)
}
indexData, exists := response[name]
if !exists {
return nil, ErrIndexNotFound
}
info := &IndexInfo{
Name: name,
Mappings: indexData.Mappings,
Aliases: indexData.Aliases,
}
return info, nil
}
// DeleteIndex deletes an index.
func DeleteIndex(ctx context.Context, c *client.Client, name string) error {
if name == "" {
return ErrInvalidIndexName
}
req := opensearchapi.IndicesDeleteRequest{
Index: []string{name},
}
res, err := req.Do(ctx, c.GetClient())
if err != nil {
return fmt.Errorf("failed to execute delete index request: %w", err)
}
defer func() {
if closeErr := res.Body.Close(); closeErr != nil {
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
}
}()
if res.StatusCode == 404 {
return ErrIndexNotFound
}
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("delete index failed with status %s: %s", res.Status(), string(bodyBytes))
}
return nil
}
// IndexExists checks if an index exists.
func IndexExists(ctx context.Context, c *client.Client, name string) (bool, error) {
if name == "" {
return false, ErrInvalidIndexName
}
req := opensearchapi.IndicesExistsRequest{
Index: []string{name},
}
res, err := req.Do(ctx, c.GetClient())
if err != nil {
return false, fmt.Errorf("failed to execute exists request: %w", err)
}
defer func() {
if closeErr := res.Body.Close(); closeErr != nil {
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
}
}()
return res.StatusCode == 200, nil
}
// ListIndices lists all indices matching a pattern (use "*" for all).
func ListIndices(ctx context.Context, c *client.Client, pattern string) ([]string, error) {
if pattern == "" {
pattern = "*"
}
req := opensearchapi.CatIndicesRequest{
Index: []string{pattern},
Format: "json",
}
res, err := req.Do(ctx, c.GetClient())
if err != nil {
return nil, fmt.Errorf("failed to execute list indices request: %w", err)
}
defer func() {
if closeErr := res.Body.Close(); closeErr != nil {
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
}
}()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("list indices failed with status %s: %s", res.Status(), string(bodyBytes))
}
var indices []struct {
Index string `json:"index"`
}
if err := json.NewDecoder(res.Body).Decode(&indices); err != nil {
return nil, fmt.Errorf("failed to decode indices list: %w", err)
}
result := make([]string, len(indices))
for i, idx := range indices {
result[i] = idx.Index
}
return result, nil
}