247 lines
6.1 KiB
Go
247 lines
6.1 KiB
Go
// Package index provides index-level operations for OpenSearch.
|
|
package index
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
|
|
"es-demo/client"
|
|
|
|
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
|
|
)
|
|
|
|
// Sentinel errors returned by the functions in this package. Callers should
// compare against them with errors.Is, because some call sites wrap them with
// additional context.
var (
	// ErrIndexNotFound reports that the requested index does not exist.
	ErrIndexNotFound = errors.New("index not found")

	// ErrIndexAlreadyExists reports an attempt to create an index whose name
	// is already taken.
	ErrIndexAlreadyExists = errors.New("index already exists")

	// ErrInvalidIndexName reports that the supplied index name is not usable.
	ErrInvalidIndexName = errors.New("invalid index name")
)
|
|
|
|
// IndexSettings holds the shard and replica counts of an index.
//
// Both fields carry the omitempty JSON option, so a zero value is left out of
// the marshalled document instead of being sent as 0.
// NOTE(review): this means an explicit replica count of 0 cannot be expressed
// through this type; changing that would require a different field type.
type IndexSettings struct {
	// NumberOfShards is serialized as "number_of_shards".
	NumberOfShards int `json:"number_of_shards,omitempty"`
	// NumberOfReplicas is serialized as "number_of_replicas".
	NumberOfReplicas int `json:"number_of_replicas,omitempty"`
}
|
|
|
|
// IndexConfig represents the complete configuration for creating an index.
|
|
type IndexConfig struct {
|
|
Settings *IndexSettings `json:"settings,omitempty"`
|
|
Mappings map[string]any `json:"mappings,omitempty"`
|
|
Aliases map[string]any `json:"aliases,omitempty"`
|
|
}
|
|
|
|
// IndexInfo represents information about an index.
|
|
type IndexInfo struct {
|
|
Name string
|
|
Settings *IndexSettings
|
|
Mappings map[string]any
|
|
Aliases map[string]any
|
|
}
|
|
|
|
// CreateIndex creates a new index with the specified configuration.
|
|
func CreateIndex(ctx context.Context, c *client.Client, name string, config *IndexConfig) error {
|
|
if name == "" {
|
|
return ErrInvalidIndexName
|
|
}
|
|
|
|
// Build request body
|
|
var body io.Reader
|
|
if config != nil {
|
|
data, err := json.Marshal(config)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal index config: %w", err)
|
|
}
|
|
body = bytes.NewReader(data)
|
|
}
|
|
|
|
req := opensearchapi.IndicesCreateRequest{
|
|
Index: name,
|
|
Body: body,
|
|
}
|
|
|
|
res, err := req.Do(ctx, c.GetClient())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to execute create index request: %w", err)
|
|
}
|
|
defer func() {
|
|
if closeErr := res.Body.Close(); closeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
|
}
|
|
}()
|
|
|
|
if res.IsError() {
|
|
bodyBytes, _ := io.ReadAll(res.Body)
|
|
if res.StatusCode == 400 {
|
|
return fmt.Errorf("%w: %s", ErrIndexAlreadyExists, string(bodyBytes))
|
|
}
|
|
return fmt.Errorf("create index failed with status %s: %s", res.Status(), string(bodyBytes))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetIndex retrieves information about an index.
|
|
func GetIndex(ctx context.Context, c *client.Client, name string) (*IndexInfo, error) {
|
|
if name == "" {
|
|
return nil, ErrInvalidIndexName
|
|
}
|
|
|
|
req := opensearchapi.IndicesGetRequest{
|
|
Index: []string{name},
|
|
}
|
|
|
|
res, err := req.Do(ctx, c.GetClient())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to execute get index request: %w", err)
|
|
}
|
|
defer func() {
|
|
if closeErr := res.Body.Close(); closeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
|
}
|
|
}()
|
|
|
|
if res.StatusCode == 404 {
|
|
return nil, ErrIndexNotFound
|
|
}
|
|
|
|
if res.IsError() {
|
|
bodyBytes, _ := io.ReadAll(res.Body)
|
|
return nil, fmt.Errorf("get index failed with status %s: %s", res.Status(), string(bodyBytes))
|
|
}
|
|
|
|
// Parse response
|
|
var response map[string]struct {
|
|
Settings struct {
|
|
Index struct {
|
|
NumberOfShards string `json:"number_of_shards"`
|
|
NumberOfReplicas string `json:"number_of_replicas"`
|
|
} `json:"index"`
|
|
} `json:"settings"`
|
|
Mappings map[string]any `json:"mappings"`
|
|
Aliases map[string]any `json:"aliases"`
|
|
}
|
|
|
|
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
|
|
return nil, fmt.Errorf("failed to decode index info: %w", err)
|
|
}
|
|
|
|
indexData, exists := response[name]
|
|
if !exists {
|
|
return nil, ErrIndexNotFound
|
|
}
|
|
|
|
info := &IndexInfo{
|
|
Name: name,
|
|
Mappings: indexData.Mappings,
|
|
Aliases: indexData.Aliases,
|
|
}
|
|
|
|
return info, nil
|
|
}
|
|
|
|
// DeleteIndex deletes an index.
|
|
func DeleteIndex(ctx context.Context, c *client.Client, name string) error {
|
|
if name == "" {
|
|
return ErrInvalidIndexName
|
|
}
|
|
|
|
req := opensearchapi.IndicesDeleteRequest{
|
|
Index: []string{name},
|
|
}
|
|
|
|
res, err := req.Do(ctx, c.GetClient())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to execute delete index request: %w", err)
|
|
}
|
|
defer func() {
|
|
if closeErr := res.Body.Close(); closeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
|
}
|
|
}()
|
|
|
|
if res.StatusCode == 404 {
|
|
return ErrIndexNotFound
|
|
}
|
|
|
|
if res.IsError() {
|
|
bodyBytes, _ := io.ReadAll(res.Body)
|
|
return fmt.Errorf("delete index failed with status %s: %s", res.Status(), string(bodyBytes))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// IndexExists checks if an index exists.
|
|
func IndexExists(ctx context.Context, c *client.Client, name string) (bool, error) {
|
|
if name == "" {
|
|
return false, ErrInvalidIndexName
|
|
}
|
|
|
|
req := opensearchapi.IndicesExistsRequest{
|
|
Index: []string{name},
|
|
}
|
|
|
|
res, err := req.Do(ctx, c.GetClient())
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to execute exists request: %w", err)
|
|
}
|
|
defer func() {
|
|
if closeErr := res.Body.Close(); closeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
|
}
|
|
}()
|
|
|
|
return res.StatusCode == 200, nil
|
|
}
|
|
|
|
// ListIndices lists all indices matching a pattern (use "*" for all).
|
|
func ListIndices(ctx context.Context, c *client.Client, pattern string) ([]string, error) {
|
|
if pattern == "" {
|
|
pattern = "*"
|
|
}
|
|
|
|
req := opensearchapi.CatIndicesRequest{
|
|
Index: []string{pattern},
|
|
Format: "json",
|
|
}
|
|
|
|
res, err := req.Do(ctx, c.GetClient())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to execute list indices request: %w", err)
|
|
}
|
|
defer func() {
|
|
if closeErr := res.Body.Close(); closeErr != nil {
|
|
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
|
}
|
|
}()
|
|
|
|
if res.IsError() {
|
|
bodyBytes, _ := io.ReadAll(res.Body)
|
|
return nil, fmt.Errorf("list indices failed with status %s: %s", res.Status(), string(bodyBytes))
|
|
}
|
|
|
|
var indices []struct {
|
|
Index string `json:"index"`
|
|
}
|
|
|
|
if err := json.NewDecoder(res.Body).Decode(&indices); err != nil {
|
|
return nil, fmt.Errorf("failed to decode indices list: %w", err)
|
|
}
|
|
|
|
result := make([]string, len(indices))
|
|
for i, idx := range indices {
|
|
result[i] = idx.Index
|
|
}
|
|
|
|
return result, nil
|
|
}
|