Files
mouseleee fc14798af5 feat: 增加ism管理接口
test: 索引模板和ism的单元测试和集成测试
2025-11-16 23:00:31 +08:00

246 lines
6.7 KiB
Go

// Package index provides index-level operations for OpenSearch.
package index
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"

	"es-demo/client"
)
// ISMPolicyManager implements PolicyManager for AWS OpenSearch ISM.
//
// Its methods build HTTP requests against the /_plugins/_ism/policies
// endpoints and execute them through the wrapped client.
type ISMPolicyManager struct {
// client is the project client wrapper; methods call client.GetClient()
// to obtain the object whose Perform method sends each request.
client *client.Client
}
// NewISMPolicyManager returns a PolicyManager backed by the ISM endpoints of
// AWS OpenSearch. Every request issued by the returned manager goes through c.
func NewISMPolicyManager(c *client.Client) PolicyManager {
	manager := new(ISMPolicyManager)
	manager.client = c
	return manager
}
// PutPolicy creates or updates the ISM policy identified by name.
//
// The policy is validated locally, wrapped in the {"policy": {...}} envelope
// and sent with PUT /_plugins/_ism/policies/{name}. The ism_template field is
// only included when the policy defines at least one template.
//
// An error is returned when name is empty, policy is nil or fails validation,
// the request cannot be built or executed, or the server answers with a status
// other than 200 OK or 201 Created (the response body is included in the
// error text).
//
// NOTE(review): the request carries no if_seq_no/if_primary_term parameters;
// confirm that the target cluster accepts overwriting an existing policy this
// way.
func (m *ISMPolicyManager) PutPolicy(ctx context.Context, name string, policy *Policy) error {
	if name == "" {
		return fmt.Errorf("policy name cannot be empty")
	}
	// The policy fields are dereferenced below, so reject nil explicitly
	// instead of panicking.
	if policy == nil {
		return fmt.Errorf("policy cannot be nil")
	}
	if err := policy.Validate(); err != nil {
		return fmt.Errorf("invalid policy: %w", err)
	}

	// Build ISM policy structure.
	policyBody := map[string]interface{}{
		"description":   policy.Description,
		"default_state": policy.DefaultState,
		"states":        policy.States,
	}
	// Add ISM template if provided.
	if len(policy.ISMTemplate) > 0 {
		policyBody["ism_template"] = policy.ISMTemplate
	}

	body, err := json.Marshal(map[string]interface{}{"policy": policyBody})
	if err != nil {
		return fmt.Errorf("failed to marshal policy: %w", err)
	}

	// ISM API endpoint: PUT _plugins/_ism/policies/{policy_name}
	// The name is path-escaped so that reserved characters such as '/', '?'
	// or '#' stay part of the policy ID instead of altering the request target.
	req, err := http.NewRequestWithContext(
		ctx,
		http.MethodPut,
		"/_plugins/_ism/policies/"+url.PathEscape(name),
		bytes.NewReader(body),
	)
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	res, err := m.client.GetClient().Perform(req)
	if err != nil {
		return fmt.Errorf("failed to execute put policy request: %w", err)
	}
	defer func() {
		// Best-effort drain of any unread bytes so the underlying connection
		// can be reused; a failure here does not affect the result.
		_, _ = io.Copy(io.Discard, res.Body)
		if closeErr := res.Body.Close(); closeErr != nil {
			fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
		}
	}()

	if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated {
		// The body is only used to enrich the error message, so a read
		// failure is deliberately ignored.
		bodyBytes, _ := io.ReadAll(res.Body)
		return fmt.Errorf("put policy failed with status %d: %s", res.StatusCode, string(bodyBytes))
	}
	return nil
}
// GetPolicy retrieves the ISM policy identified by name.
//
// It issues GET /_plugins/_ism/policies/{name} and converts the "policy"
// object of the response into a Policy. Response fields outside the "policy"
// object are ignored.
//
// An error is returned when name is empty, the request cannot be built or
// executed, the server answers 404 Not Found (reported as
// `policy "<name>" not found`), the server answers with any other non-200
// status, or the response body cannot be decoded.
func (m *ISMPolicyManager) GetPolicy(ctx context.Context, name string) (*Policy, error) {
	if name == "" {
		return nil, fmt.Errorf("policy name cannot be empty")
	}

	// ISM API endpoint: GET _plugins/_ism/policies/{policy_name}
	// The name is path-escaped so that reserved characters such as '/', '?'
	// or '#' stay part of the policy ID instead of altering the request target.
	req, err := http.NewRequestWithContext(
		ctx,
		http.MethodGet,
		"/_plugins/_ism/policies/"+url.PathEscape(name),
		nil,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}

	res, err := m.client.GetClient().Perform(req)
	if err != nil {
		return nil, fmt.Errorf("failed to execute get policy request: %w", err)
	}
	defer func() {
		// Best-effort drain of any unread bytes (404 bodies, trailing data
		// after the JSON document) so the connection can be reused.
		_, _ = io.Copy(io.Discard, res.Body)
		if closeErr := res.Body.Close(); closeErr != nil {
			fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
		}
	}()

	if res.StatusCode == http.StatusNotFound {
		return nil, fmt.Errorf("policy %q not found", name)
	}
	if res.StatusCode != http.StatusOK {
		// The body is only used to enrich the error message, so a read
		// failure is deliberately ignored.
		bodyBytes, _ := io.ReadAll(res.Body)
		return nil, fmt.Errorf("get policy failed with status %d: %s", res.StatusCode, string(bodyBytes))
	}

	// Only the "policy" object is consumed, so that is all that is decoded.
	var response struct {
		Policy struct {
			Description  string        `json:"description"`
			DefaultState string        `json:"default_state"`
			States       []State       `json:"states"`
			ISMTemplate  []ISMTemplate `json:"ism_template,omitempty"`
		} `json:"policy"`
	}
	if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
		return nil, fmt.Errorf("failed to decode policy response: %w", err)
	}

	return &Policy{
		Description:  response.Policy.Description,
		DefaultState: response.Policy.DefaultState,
		States:       response.Policy.States,
		ISMTemplate:  response.Policy.ISMTemplate,
	}, nil
}
// DeletePolicy deletes the ISM policy identified by name.
//
// It issues DELETE /_plugins/_ism/policies/{name}.
//
// An error is returned when name is empty, the request cannot be built or
// executed, the server answers 404 Not Found (reported as
// `policy "<name>" not found`), or the server answers with any other non-200
// status (the response body is included in the error text).
func (m *ISMPolicyManager) DeletePolicy(ctx context.Context, name string) error {
	if name == "" {
		return fmt.Errorf("policy name cannot be empty")
	}

	// ISM API endpoint: DELETE _plugins/_ism/policies/{policy_name}
	// The name is path-escaped so that reserved characters such as '/', '?'
	// or '#' stay part of the policy ID instead of altering the request target.
	req, err := http.NewRequestWithContext(
		ctx,
		http.MethodDelete,
		"/_plugins/_ism/policies/"+url.PathEscape(name),
		nil,
	)
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	res, err := m.client.GetClient().Perform(req)
	if err != nil {
		return fmt.Errorf("failed to execute delete policy request: %w", err)
	}
	defer func() {
		// Best-effort drain of any unread bytes so the underlying connection
		// can be reused; a failure here does not affect the result.
		_, _ = io.Copy(io.Discard, res.Body)
		if closeErr := res.Body.Close(); closeErr != nil {
			fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
		}
	}()

	switch res.StatusCode {
	case http.StatusOK:
		return nil
	case http.StatusNotFound:
		return fmt.Errorf("policy %q not found", name)
	default:
		// The body is only used to enrich the error message, so a read
		// failure is deliberately ignored.
		bodyBytes, _ := io.ReadAll(res.Body)
		return fmt.Errorf("delete policy failed with status %d: %s", res.StatusCode, string(bodyBytes))
	}
}
// ListPolicies retrieves all ISM policies, keyed by policy ID.
//
// GET /_plugins/_ism/policies is a paginated endpoint (the ISM plugin returns
// 20 entries per request unless told otherwise), so a single request would
// silently truncate the result. This method requests successive pages with
// explicit size/from parameters until the listing is exhausted.
//
// An error is returned when any page request cannot be built or executed,
// the server answers with a non-200 status, or a response body cannot be
// decoded. On success the returned map is never nil.
func (m *ISMPolicyManager) ListPolicies(ctx context.Context) (map[string]*Policy, error) {
	const pageSize = 100

	policies := make(map[string]*Policy)
	for from := 0; ; from += pageSize {
		known := len(policies)
		count, total, err := m.listPoliciesPage(ctx, from, pageSize, policies)
		if err != nil {
			return nil, err
		}
		// Stop on a short page, when the reported total has been reached, or
		// when a page contributed nothing new (guards against a server that
		// ignores the paging parameters).
		if count < pageSize || len(policies) == known || (total > 0 && from+count >= total) {
			return policies, nil
		}
	}
}

// listPoliciesPage fetches one page of at most size policies starting at
// offset from and stores them in dst keyed by policy ID. It returns the number
// of policies contained in the page and the total_policies value reported by
// the server.
func (m *ISMPolicyManager) listPoliciesPage(ctx context.Context, from, size int, dst map[string]*Policy) (int, int, error) {
	// ISM API endpoint: GET _plugins/_ism/policies
	req, err := http.NewRequestWithContext(
		ctx,
		http.MethodGet,
		fmt.Sprintf("/_plugins/_ism/policies?size=%d&from=%d", size, from),
		nil,
	)
	if err != nil {
		return 0, 0, fmt.Errorf("failed to create request: %w", err)
	}

	res, err := m.client.GetClient().Perform(req)
	if err != nil {
		return 0, 0, fmt.Errorf("failed to execute list policies request: %w", err)
	}
	defer func() {
		// Best-effort drain of any unread bytes so the underlying connection
		// can be reused for the next page.
		_, _ = io.Copy(io.Discard, res.Body)
		if closeErr := res.Body.Close(); closeErr != nil {
			fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
		}
	}()

	if res.StatusCode != http.StatusOK {
		// The body is only used to enrich the error message, so a read
		// failure is deliberately ignored.
		bodyBytes, _ := io.ReadAll(res.Body)
		return 0, 0, fmt.Errorf("list policies failed with status %d: %s", res.StatusCode, string(bodyBytes))
	}

	// Parse response.
	var response struct {
		Policies []struct {
			ID     string `json:"_id"`
			Policy struct {
				Description  string        `json:"description"`
				DefaultState string        `json:"default_state"`
				States       []State       `json:"states"`
				ISMTemplate  []ISMTemplate `json:"ism_template,omitempty"`
			} `json:"policy"`
		} `json:"policies"`
		TotalPolicies int `json:"total_policies"`
	}
	if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
		return 0, 0, fmt.Errorf("failed to decode policies response: %w", err)
	}

	for _, item := range response.Policies {
		dst[item.ID] = &Policy{
			Description:  item.Policy.Description,
			DefaultState: item.Policy.DefaultState,
			States:       item.Policy.States,
			ISMTemplate:  item.Policy.ISMTemplate,
		}
	}
	return len(response.Policies), response.TotalPolicies, nil
}