feat: 增加ism管理接口

test: 索引模板和ism的单元测试和集成测试
This commit is contained in:
mouseleee
2025-11-16 23:00:31 +08:00
parent da3883205c
commit fc14798af5
10 changed files with 1777 additions and 108 deletions

245
operations/index/ism.go Normal file
View File

@@ -0,0 +1,245 @@
// Package index provides index-level operations for OpenSearch.
package index
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"es-demo/client"
)
// ISMPolicyManager implements PolicyManager for AWS OpenSearch ISM.
type ISMPolicyManager struct {
client *client.Client
}
// NewISMPolicyManager creates a new ISM policy manager for AWS OpenSearch.
func NewISMPolicyManager(c *client.Client) PolicyManager {
return &ISMPolicyManager{client: c}
}
// PutPolicy creates or updates an ISM policy.
func (m *ISMPolicyManager) PutPolicy(ctx context.Context, name string, policy *Policy) error {
if name == "" {
return fmt.Errorf("policy name cannot be empty")
}
if err := policy.Validate(); err != nil {
return fmt.Errorf("invalid policy: %w", err)
}
// Build ISM policy structure
ismPolicy := map[string]interface{}{
"policy": map[string]interface{}{
"description": policy.Description,
"default_state": policy.DefaultState,
"states": policy.States,
},
}
// Add ISM template if provided
if len(policy.ISMTemplate) > 0 {
ismPolicy["policy"].(map[string]interface{})["ism_template"] = policy.ISMTemplate
}
body, err := json.Marshal(ismPolicy)
if err != nil {
return fmt.Errorf("failed to marshal policy: %w", err)
}
// ISM API endpoint: PUT _plugins/_ism/policies/{policy_name}
req, err := http.NewRequestWithContext(
ctx,
http.MethodPut,
fmt.Sprintf("/_plugins/_ism/policies/%s", name),
bytes.NewReader(body),
)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
res, err := m.client.GetClient().Perform(req)
if err != nil {
return fmt.Errorf("failed to execute put policy request: %w", err)
}
defer func() {
if closeErr := res.Body.Close(); closeErr != nil {
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
}
}()
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated {
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("put policy failed with status %d: %s", res.StatusCode, string(bodyBytes))
}
return nil
}
// GetPolicy retrieves an ISM policy by name.
func (m *ISMPolicyManager) GetPolicy(ctx context.Context, name string) (*Policy, error) {
if name == "" {
return nil, fmt.Errorf("policy name cannot be empty")
}
// ISM API endpoint: GET _plugins/_ism/policies/{policy_name}
req, err := http.NewRequestWithContext(
ctx,
http.MethodGet,
fmt.Sprintf("/_plugins/_ism/policies/%s", name),
nil,
)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
res, err := m.client.GetClient().Perform(req)
if err != nil {
return nil, fmt.Errorf("failed to execute get policy request: %w", err)
}
defer func() {
if closeErr := res.Body.Close(); closeErr != nil {
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
}
}()
if res.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("policy %q not found", name)
}
if res.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("get policy failed with status %d: %s", res.StatusCode, string(bodyBytes))
}
// Parse response
var response struct {
ID string `json:"_id"`
Version int `json:"_version"`
SeqNo int `json:"_seq_no"`
Policy struct {
Description string `json:"description"`
DefaultState string `json:"default_state"`
States []State `json:"states"`
ISMTemplate []ISMTemplate `json:"ism_template,omitempty"`
} `json:"policy"`
}
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
return nil, fmt.Errorf("failed to decode policy response: %w", err)
}
policy := &Policy{
Description: response.Policy.Description,
DefaultState: response.Policy.DefaultState,
States: response.Policy.States,
ISMTemplate: response.Policy.ISMTemplate,
}
return policy, nil
}
// DeletePolicy deletes an ISM policy by name.
func (m *ISMPolicyManager) DeletePolicy(ctx context.Context, name string) error {
if name == "" {
return fmt.Errorf("policy name cannot be empty")
}
// ISM API endpoint: DELETE _plugins/_ism/policies/{policy_name}
req, err := http.NewRequestWithContext(
ctx,
http.MethodDelete,
fmt.Sprintf("/_plugins/_ism/policies/%s", name),
nil,
)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
res, err := m.client.GetClient().Perform(req)
if err != nil {
return fmt.Errorf("failed to execute delete policy request: %w", err)
}
defer func() {
if closeErr := res.Body.Close(); closeErr != nil {
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
}
}()
if res.StatusCode == http.StatusNotFound {
return fmt.Errorf("policy %q not found", name)
}
if res.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("delete policy failed with status %d: %s", res.StatusCode, string(bodyBytes))
}
return nil
}
// ListPolicies retrieves all ISM policies.
func (m *ISMPolicyManager) ListPolicies(ctx context.Context) (map[string]*Policy, error) {
// ISM API endpoint: GET _plugins/_ism/policies
req, err := http.NewRequestWithContext(
ctx,
http.MethodGet,
"/_plugins/_ism/policies",
nil,
)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
res, err := m.client.GetClient().Perform(req)
if err != nil {
return nil, fmt.Errorf("failed to execute list policies request: %w", err)
}
defer func() {
if closeErr := res.Body.Close(); closeErr != nil {
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
}
}()
if res.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("list policies failed with status %d: %s", res.StatusCode, string(bodyBytes))
}
// Parse response
var response struct {
Policies []struct {
ID string `json:"_id"`
Policy struct {
Description string `json:"description"`
DefaultState string `json:"default_state"`
States []State `json:"states"`
ISMTemplate []ISMTemplate `json:"ism_template,omitempty"`
} `json:"policy"`
} `json:"policies"`
TotalPolicies int `json:"total_policies"`
}
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
return nil, fmt.Errorf("failed to decode policies response: %w", err)
}
policies := make(map[string]*Policy, len(response.Policies))
for _, item := range response.Policies {
policies[item.ID] = &Policy{
Description: item.Policy.Description,
DefaultState: item.Policy.DefaultState,
States: item.Policy.States,
ISMTemplate: item.Policy.ISMTemplate,
}
}
return policies, nil
}

View File

@@ -0,0 +1,334 @@
package index
import (
"context"
"testing"
"es-demo/client"
"es-demo/config"
)
// setupISMIntegrationTest loads configuration and creates a client for ISM integration tests.
func setupISMIntegrationTest(t *testing.T) *client.Client {
t.Helper()
// Load configuration from project root
if err := config.Load("../../.env"); err != nil {
t.Logf("warning: failed to load .env file: %v", err)
}
cfg := &client.Config{
Endpoint: config.Endpoint,
Region: config.Region,
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
return c
}
// TestISMPutPolicy tests creating and updating ISM policies.
func TestISMPutPolicy(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
c := setupISMIntegrationTest(t)
// Create policy manager
pm := NewISMPolicyManager(c)
// Test policy
testPolicy := &Policy{
Description: "Test ISM policy",
DefaultState: "hot",
States: []State{
{
Name: "hot",
Actions: []Action{
NewAction("rollover", map[string]interface{}{
"min_index_age": "1d",
}),
},
Transitions: []Transition{
{
StateName: "delete",
Conditions: &Conditions{
MinIndexAge: "7d",
},
},
},
},
{
Name: "delete",
Actions: []Action{
NewAction("delete", map[string]interface{}{}),
},
},
},
ISMTemplate: []ISMTemplate{
{
IndexPatterns: []string{"test-*"},
Priority: 100,
},
},
}
ctx := context.Background()
policyName := "test-ism-policy"
// Create policy
if err := pm.PutPolicy(ctx, policyName, testPolicy); err != nil {
t.Fatalf("PutPolicy() error = %v", err)
}
// Cleanup
defer func() {
if err := pm.DeletePolicy(ctx, policyName); err != nil {
t.Logf("cleanup: failed to delete policy: %v", err)
}
}()
t.Logf("Successfully created ISM policy: %s", policyName)
}
// TestISMGetPolicy tests retrieving ISM policies.
func TestISMGetPolicy(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
c := setupISMIntegrationTest(t)
pm := NewISMPolicyManager(c)
ctx := context.Background()
policyName := "test-get-policy"
// Create a policy first
testPolicy := &Policy{
Description: "Test get policy",
DefaultState: "hot",
States: []State{
{
Name: "hot",
Actions: []Action{
NewAction("rollover", map[string]interface{}{
"min_index_age": "1d",
}),
},
},
},
}
if err := pm.PutPolicy(ctx, policyName, testPolicy); err != nil {
t.Fatalf("setup: failed to create policy: %v", err)
}
defer func() {
if err := pm.DeletePolicy(ctx, policyName); err != nil {
t.Logf("cleanup: failed to delete policy: %v", err)
}
}()
// Get the policy
retrievedPolicy, err := pm.GetPolicy(ctx, policyName)
if err != nil {
t.Fatalf("GetPolicy() error = %v", err)
}
if retrievedPolicy.Description != testPolicy.Description {
t.Errorf("Description = %v, want %v", retrievedPolicy.Description, testPolicy.Description)
}
if retrievedPolicy.DefaultState != testPolicy.DefaultState {
t.Errorf("DefaultState = %v, want %v", retrievedPolicy.DefaultState, testPolicy.DefaultState)
}
if len(retrievedPolicy.States) != len(testPolicy.States) {
t.Errorf("States count = %v, want %v", len(retrievedPolicy.States), len(testPolicy.States))
}
}
// TestISMDeletePolicy tests deleting ISM policies.
func TestISMDeletePolicy(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
c := setupISMIntegrationTest(t)
pm := NewISMPolicyManager(c)
ctx := context.Background()
policyName := "test-delete-policy"
// Create a policy
testPolicy := &Policy{
Description: "Test delete policy",
DefaultState: "hot",
States: []State{
{
Name: "hot",
},
},
}
if err := pm.PutPolicy(ctx, policyName, testPolicy); err != nil {
t.Fatalf("setup: failed to create policy: %v", err)
}
// Delete the policy
if err := pm.DeletePolicy(ctx, policyName); err != nil {
t.Fatalf("DeletePolicy() error = %v", err)
}
// Verify it's deleted
_, err := pm.GetPolicy(ctx, policyName)
if err == nil {
t.Error("GetPolicy() should return error for deleted policy")
}
}
// TestISMListPolicies tests listing all ISM policies.
func TestISMListPolicies(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
c := setupISMIntegrationTest(t)
pm := NewISMPolicyManager(c)
ctx := context.Background()
// List policies
policies, err := pm.ListPolicies(ctx)
if err != nil {
t.Fatalf("ListPolicies() error = %v", err)
}
t.Logf("Found %d policies", len(policies))
// Create a test policy to verify it appears in the list
policyName := "test-list-policy"
testPolicy := &Policy{
Description: "Test list policy",
DefaultState: "hot",
States: []State{
{
Name: "hot",
},
},
}
if err := pm.PutPolicy(ctx, policyName, testPolicy); err != nil {
t.Fatalf("setup: failed to create policy: %v", err)
}
defer func() {
if err := pm.DeletePolicy(ctx, policyName); err != nil {
t.Logf("cleanup: failed to delete policy: %v", err)
}
}()
// List again and verify the new policy exists
policies, err = pm.ListPolicies(ctx)
if err != nil {
t.Fatalf("ListPolicies() error = %v", err)
}
if _, exists := policies[policyName]; !exists {
t.Errorf("ListPolicies() should include policy %q", policyName)
}
}
// TestPolicy_Validate tests policy validation.
func TestPolicy_Validate(t *testing.T) {
tests := []struct {
name string
policy *Policy
wantErr bool
}{
{
name: "valid ISM policy",
policy: &Policy{
Description: "Test policy",
DefaultState: "hot",
States: []State{
{Name: "hot"},
{Name: "warm"},
},
},
wantErr: false,
},
{
name: "nil policy",
policy: nil,
wantErr: true,
},
{
name: "missing default_state",
policy: &Policy{
States: []State{
{Name: "hot"},
},
},
wantErr: true,
},
{
name: "default_state not in states",
policy: &Policy{
DefaultState: "cold",
States: []State{
{Name: "hot"},
{Name: "warm"},
},
},
wantErr: true,
},
{
name: "no states or phases",
policy: &Policy{
Description: "Empty policy",
},
wantErr: true,
},
{
name: "both ISM and ILM defined",
policy: &Policy{
DefaultState: "hot",
States: []State{{Name: "hot"}},
Phases: map[string]Phase{
"hot": {},
},
},
wantErr: true,
},
{
name: "valid ILM policy",
policy: &Policy{
Phases: map[string]Phase{
"hot": {
MinAge: "0ms",
},
"delete": {
MinAge: "30d",
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.policy.Validate()
if (err != nil) != tt.wantErr {
t.Errorf("Policy.Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

View File

@@ -0,0 +1,503 @@
package index
import (
"context"
"encoding/json"
"strings"
"testing"
"es-demo/client"
"es-demo/config"
)
// TestISMPolicyManager_PutPolicy_Validation tests policy validation
func TestISMPolicyManager_PutPolicy_Validation(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
if err := config.Load(".env"); err != nil {
t.Logf("warning: failed to load .env file: %v", err)
}
config.Init()
cfg := &client.Config{
Endpoint: config.Endpoint,
Region: config.Region,
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
pm := NewISMPolicyManager(c)
ctx := context.Background()
tests := []struct {
name string
policyName string
policy *Policy
wantErr bool
}{
{
name: "empty policy name",
policyName: "",
policy: &Policy{
DefaultState: "hot",
States: []State{{Name: "hot"}},
},
wantErr: true,
},
{
name: "invalid policy - nil",
policyName: "test",
policy: nil,
wantErr: true,
},
{
name: "invalid policy - no states",
policyName: "test",
policy: &Policy{
Description: "Test",
},
wantErr: true,
},
{
name: "invalid policy - missing default_state",
policyName: "test",
policy: &Policy{
States: []State{{Name: "hot"}},
},
wantErr: true,
},
{
name: "invalid policy - default_state not in states",
policyName: "test",
policy: &Policy{
DefaultState: "cold",
States: []State{{Name: "hot"}},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := pm.PutPolicy(ctx, tt.policyName, tt.policy)
if (err != nil) != tt.wantErr {
t.Errorf("PutPolicy() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
// TestISMPolicyManager_GetPolicy_NotFound tests GetPolicy when policy doesn't exist
func TestISMPolicyManager_GetPolicy_NotFound(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
if err := config.Load(".env"); err != nil {
t.Logf("warning: failed to load .env file: %v", err)
}
config.Init()
cfg := &client.Config{
Endpoint: config.Endpoint,
Region: config.Region,
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
pm := NewISMPolicyManager(c)
ctx := context.Background()
_, err = pm.GetPolicy(ctx, "non-existent-policy-12345")
if err == nil {
t.Error("GetPolicy() should return error for non-existent policy")
}
}
// TestISMPolicyManager_DeletePolicy_NotFound tests DeletePolicy when policy doesn't exist
func TestISMPolicyManager_DeletePolicy_NotFound(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
if err := config.Load(".env"); err != nil {
t.Logf("warning: failed to load .env file: %v", err)
}
config.Init()
cfg := &client.Config{
Endpoint: config.Endpoint,
Region: config.Region,
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
pm := NewISMPolicyManager(c)
ctx := context.Background()
err = pm.DeletePolicy(ctx, "non-existent-policy-12345")
if err == nil {
t.Error("DeletePolicy() should return error for non-existent policy")
}
}
// TestISMPolicyManager_ListPolicies_EmptyCheck tests ListPolicies returns valid map
func TestISMPolicyManager_ListPolicies_EmptyCheck(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
if err := config.Load(".env"); err != nil {
t.Logf("warning: failed to load .env file: %v", err)
}
config.Init()
cfg := &client.Config{
Endpoint: config.Endpoint,
Region: config.Region,
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
pm := NewISMPolicyManager(c)
ctx := context.Background()
policies, err := pm.ListPolicies(ctx)
if err != nil {
t.Fatalf("ListPolicies() error = %v", err)
}
if policies == nil {
t.Error("ListPolicies() should not return nil map")
}
}
// TestPolicy_ValidateEdgeCases tests additional edge cases for policy validation
func TestPolicy_ValidateEdgeCases(t *testing.T) {
tests := []struct {
name string
policy *Policy
wantErr bool
}{
{
name: "nil policy",
policy: nil,
wantErr: true,
},
{
name: "empty states and empty phases",
policy: &Policy{
Description: "Empty policy",
},
wantErr: true,
},
{
name: "both ISM states and ILM phases",
policy: &Policy{
DefaultState: "hot",
States: []State{{Name: "hot"}},
Phases: map[string]Phase{
"hot": {MinAge: "0ms"},
},
},
wantErr: true,
},
{
name: "valid ISM policy with transitions",
policy: &Policy{
Description: "Test",
DefaultState: "hot",
States: []State{
{
Name: "hot",
Transitions: []Transition{
{
StateName: "warm",
Conditions: &Conditions{
MinIndexAge: "1d",
},
},
},
},
{
Name: "warm",
},
},
},
wantErr: false,
},
{
name: "valid ISM policy with actions",
policy: &Policy{
DefaultState: "hot",
States: []State{
{
Name: "hot",
Actions: []Action{
{
Config: map[string]interface{}{
"rollover": map[string]interface{}{
"min_index_age": "1d",
},
},
},
},
},
},
},
wantErr: false,
},
{
name: "valid ILM policy with multiple phases",
policy: &Policy{
Description: "ILM policy",
Phases: map[string]Phase{
"hot": {
MinAge: "0ms",
Actions: map[string]interface{}{
"rollover": map[string]interface{}{
"max_age": "30d",
},
},
},
"delete": {
MinAge: "90d",
Actions: map[string]interface{}{
"delete": map[string]interface{}{},
},
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.policy.Validate()
if (err != nil) != tt.wantErr {
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
// TestPolicy_JSONMarshaling tests policy JSON marshaling
func TestPolicy_JSONMarshaling(t *testing.T) {
policy := &Policy{
Description: "Test policy",
DefaultState: "hot",
States: []State{
{
Name: "hot",
Actions: []Action{
{
Config: map[string]interface{}{
"rollover": map[string]interface{}{
"min_index_age": "1d",
},
},
},
},
},
},
ISMTemplate: []ISMTemplate{
{
IndexPatterns: []string{"test-*"},
Priority: 100,
},
},
}
// Marshal
data, err := json.Marshal(policy)
if err != nil {
t.Fatalf("json.Marshal() error = %v", err)
}
// Unmarshal
var decoded Policy
if err := json.Unmarshal(data, &decoded); err != nil {
t.Fatalf("json.Unmarshal() error = %v", err)
}
// Verify
if decoded.Description != policy.Description {
t.Errorf("Description = %v, want %v", decoded.Description, policy.Description)
}
if decoded.DefaultState != policy.DefaultState {
t.Errorf("DefaultState = %v, want %v", decoded.DefaultState, policy.DefaultState)
}
if len(decoded.States) != len(policy.States) {
t.Errorf("States length = %d, want %d", len(decoded.States), len(policy.States))
}
if len(decoded.ISMTemplate) != len(policy.ISMTemplate) {
t.Errorf("ISMTemplate length = %d, want %d", len(decoded.ISMTemplate), len(policy.ISMTemplate))
}
}
// TestPolicy_MarshalError tests error handling in policy marshaling
func TestPolicy_MarshalError(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
if err := config.Load(".env"); err != nil {
t.Logf("warning: failed to load .env file: %v", err)
}
config.Init()
cfg := &client.Config{
Endpoint: config.Endpoint,
Region: config.Region,
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
pm := NewISMPolicyManager(c)
ctx := context.Background()
// 创建一个包含不可序列化数据的策略
policy := &Policy{
DefaultState: "hot",
States: []State{
{
Name: "hot",
Actions: []Action{
{
Config: map[string]interface{}{
"invalid": make(chan int), // channels 不能被 JSON 序列化
},
},
},
},
},
}
err = pm.PutPolicy(ctx, "test", policy)
if err == nil {
t.Error("PutPolicy() should return error for invalid JSON")
}
if !strings.Contains(err.Error(), "marshal") {
t.Errorf("Error should mention marshal, got: %v", err)
}
}
// TestState_Transitions tests state transition logic
func TestState_Transitions(t *testing.T) {
state := State{
Name: "hot",
Transitions: []Transition{
{
StateName: "warm",
Conditions: &Conditions{
MinIndexAge: "1d",
},
},
{
StateName: "delete",
Conditions: &Conditions{
MinIndexAge: "7d",
},
},
},
}
if len(state.Transitions) != 2 {
t.Errorf("Expected 2 transitions, got %d", len(state.Transitions))
}
if state.Transitions[0].StateName != "warm" {
t.Errorf("First transition state = %v, want warm", state.Transitions[0].StateName)
}
if state.Transitions[1].Conditions.MinIndexAge != "7d" {
t.Errorf("Second transition min_index_age = %v, want 7d", state.Transitions[1].Conditions.MinIndexAge)
}
}
// TestAction_Config tests action configuration
func TestAction_Config(t *testing.T) {
action := Action{
Config: map[string]interface{}{
"rollover": map[string]interface{}{
"min_index_age": "1d",
"min_doc_count": 1000000,
},
},
}
rollover, ok := action.Config["rollover"].(map[string]interface{})
if !ok {
t.Fatal("Failed to get rollover config")
}
if rollover["min_index_age"] != "1d" {
t.Errorf("min_index_age = %v, want 1d", rollover["min_index_age"])
}
if rollover["min_doc_count"] != 1000000 {
t.Errorf("min_doc_count = %v, want 1000000", rollover["min_doc_count"])
}
}
// TestISMTemplate_Patterns tests ISM template index patterns
func TestISMTemplate_Patterns(t *testing.T) {
template := ISMTemplate{
IndexPatterns: []string{"logs-*", "metrics-*", "traces-*"},
Priority: 100,
}
if len(template.IndexPatterns) != 3 {
t.Errorf("Expected 3 index patterns, got %d", len(template.IndexPatterns))
}
if template.Priority != 100 {
t.Errorf("Priority = %d, want 100", template.Priority)
}
// Test JSON marshaling
data, err := json.Marshal(template)
if err != nil {
t.Fatalf("json.Marshal() error = %v", err)
}
var decoded ISMTemplate
if err := json.Unmarshal(data, &decoded); err != nil {
t.Fatalf("json.Unmarshal() error = %v", err)
}
if len(decoded.IndexPatterns) != len(template.IndexPatterns) {
t.Errorf("Decoded index patterns length = %d, want %d", len(decoded.IndexPatterns), len(template.IndexPatterns))
}
}

149
operations/index/policy.go Normal file
View File

@@ -0,0 +1,149 @@
// Package index provides index-level operations for OpenSearch.
package index
import (
"context"
"encoding/json"
"errors"
)
var (
// ErrInvalidPolicy is returned when the policy configuration is invalid.
ErrInvalidPolicy = errors.New("invalid policy configuration")
)
// PolicyManager defines the interface for managing index lifecycle policies.
// This interface abstracts both AWS OpenSearch ISM (Index State Management)
// and Elasticsearch ILM (Index Lifecycle Management) to provide a unified API.
type PolicyManager interface {
// PutPolicy creates or updates a lifecycle policy.
PutPolicy(ctx context.Context, name string, policy *Policy) error
// GetPolicy retrieves a lifecycle policy by name.
GetPolicy(ctx context.Context, name string) (*Policy, error)
// DeletePolicy deletes a lifecycle policy by name.
DeletePolicy(ctx context.Context, name string) error
// ListPolicies retrieves all lifecycle policies.
ListPolicies(ctx context.Context) (map[string]*Policy, error)
}
// Policy represents a generic index lifecycle policy.
// It can be converted to ISM (AWS OpenSearch) or ILM (Elasticsearch) format.
type Policy struct {
// Description of the policy
Description string `json:"description,omitempty"`
// DefaultState is the initial state (ISM specific)
DefaultState string `json:"default_state,omitempty"`
// States defines the lifecycle states (ISM format)
States []State `json:"states,omitempty"`
// Phases defines the lifecycle phases (ILM format)
// This field is used when targeting Elasticsearch
Phases map[string]Phase `json:"phases,omitempty"`
// ISMTemplate for applying policy to indices (ISM specific)
ISMTemplate []ISMTemplate `json:"ism_template,omitempty"`
}
// State represents a state in ISM (AWS OpenSearch).
type State struct {
Name string `json:"name"`
Actions []Action `json:"actions,omitempty"`
Transitions []Transition `json:"transitions,omitempty"`
}
// Action represents an action within a state.
// In ISM, actions are serialized as a single key-value object where
// the key is the action type and the value is the configuration.
type Action struct {
// Config contains the action type as the key and its configuration as the value
// Example: {"rollover": {"min_index_age": "1d"}}
Config map[string]interface{}
}
// NewAction creates a new action with the specified type and configuration.
func NewAction(actionType string, config map[string]interface{}) Action {
return Action{
Config: map[string]interface{}{
actionType: config,
},
}
}
// MarshalJSON implements custom JSON marshaling for Action.
// It directly marshals the Config map instead of wrapping it in a "Config" field.
func (a Action) MarshalJSON() ([]byte, error) {
return json.Marshal(a.Config)
}
// UnmarshalJSON implements custom JSON unmarshaling for Action.
func (a *Action) UnmarshalJSON(data []byte) error {
return json.Unmarshal(data, &a.Config)
}
// Transition defines when to move to the next state.
type Transition struct {
StateName string `json:"state_name"`
Conditions *Conditions `json:"conditions,omitempty"`
}
// Conditions defines the conditions for state transitions.
type Conditions struct {
MinIndexAge string `json:"min_index_age,omitempty"`
MinDocCount *int64 `json:"min_doc_count,omitempty"`
MinSize string `json:"min_size,omitempty"`
}
// Phase represents a phase in ILM (Elasticsearch).
// This is for future Elasticsearch compatibility.
type Phase struct {
MinAge string `json:"min_age,omitempty"`
Actions map[string]interface{} `json:"actions,omitempty"`
}
// ISMTemplate defines index patterns for automatic policy application.
type ISMTemplate struct {
IndexPatterns []string `json:"index_patterns"`
Priority int `json:"priority"`
}
// Validate checks if the policy configuration is valid.
func (p *Policy) Validate() error {
if p == nil {
return ErrInvalidPolicy
}
// For ISM: must have at least one state with a default_state
if len(p.States) > 0 {
if p.DefaultState == "" {
return errors.New("default_state is required when states are defined")
}
// Verify default_state exists in states
found := false
for _, state := range p.States {
if state.Name == p.DefaultState {
found = true
break
}
}
if !found {
return errors.New("default_state must match one of the defined states")
}
}
// For ILM: must have at least one phase
if len(p.Phases) > 0 && len(p.States) > 0 {
return errors.New("cannot define both ISM states and ILM phases in the same policy")
}
if len(p.States) == 0 && len(p.Phases) == 0 {
return errors.New("policy must define either states (ISM) or phases (ILM)")
}
return nil
}

View File

@@ -62,7 +62,37 @@ func PutTemplate(ctx context.Context, c *client.Client, name string, template *T
return fmt.Errorf("%w: template name is required", ErrInvalidTemplate)
}
body, err := json.Marshal(template)
// Build template object with only non-empty fields
templateObj := make(map[string]any)
if len(template.Settings) > 0 {
templateObj["settings"] = template.Settings
}
if len(template.Mappings) > 0 {
templateObj["mappings"] = template.Mappings
}
if len(template.Aliases) > 0 {
templateObj["aliases"] = template.Aliases
}
// Wrap template in template field for the API
requestBody := map[string]any{
"index_patterns": template.IndexPatterns,
}
// Only add template field if it has content
if len(templateObj) > 0 {
requestBody["template"] = templateObj
}
// Add optional fields
if template.Priority > 0 {
requestBody["priority"] = template.Priority
}
if template.Version > 0 {
requestBody["version"] = template.Version
}
body, err := json.Marshal(requestBody)
if err != nil {
return fmt.Errorf("failed to marshal template: %w", err)
}

View File

@@ -2,23 +2,27 @@ package index
import (
"context"
"errors"
"testing"
"time"
"es-demo/client"
"es-demo/config"
)
func TestPutTemplate(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
// setupIntegrationTest loads configuration and creates a client for integration tests.
func setupIntegrationTest(t *testing.T) *client.Client {
t.Helper()
// Load configuration from project root
if err := config.Load("../../.env"); err != nil {
t.Logf("warning: failed to load .env file: %v", err)
}
cfg := &client.Config{
Endpoint: "https://example.com",
Region: "us-east-1",
AccessKey: "test-key",
SecretKey: "test-secret",
Endpoint: config.Endpoint,
Region: config.Region,
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}
c, err := client.NewClient(cfg)
@@ -26,11 +30,139 @@ func TestPutTemplate(t *testing.T) {
t.Fatalf("failed to create client: %v", err)
}
return c
}
// TestTemplateIntegration tests complete template lifecycle
func TestTemplateIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
c := setupIntegrationTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
templateName := "test-template-integration"
// Cleanup before test
_ = DeleteTemplate(ctx, c, templateName)
// Test 1: Create template
template := &Template{
IndexPatterns: []string{"test-integration-*"},
Settings: map[string]any{
"number_of_shards": 1,
"number_of_replicas": 0,
},
Mappings: map[string]any{
"properties": map[string]any{
"timestamp": map[string]any{
"type": "date",
},
"message": map[string]any{
"type": "text",
},
},
},
Priority: 100,
}
t.Run("PutTemplate", func(t *testing.T) {
err := PutTemplate(ctx, c, templateName, template)
if err != nil {
t.Fatalf("PutTemplate() error = %v", err)
}
t.Log("Template created successfully")
})
// Test 2: Get template
t.Run("GetTemplate", func(t *testing.T) {
retrieved, err := GetTemplate(ctx, c, templateName)
if err != nil {
t.Fatalf("GetTemplate() error = %v", err)
}
if len(retrieved.IndexPatterns) != len(template.IndexPatterns) {
t.Errorf("IndexPatterns = %v, want %v", retrieved.IndexPatterns, template.IndexPatterns)
}
if retrieved.Priority != template.Priority {
t.Errorf("Priority = %d, want %d", retrieved.Priority, template.Priority)
}
t.Logf("Retrieved template: %d index patterns, priority %d", len(retrieved.IndexPatterns), retrieved.Priority)
})
// Test 3: Update template
t.Run("UpdateTemplate", func(t *testing.T) {
template.Priority = 200
err := PutTemplate(ctx, c, templateName, template)
if err != nil {
t.Fatalf("PutTemplate() (update) error = %v", err)
}
retrieved, err := GetTemplate(ctx, c, templateName)
if err != nil {
t.Fatalf("GetTemplate() error = %v", err)
}
if retrieved.Priority != 200 {
t.Errorf("Updated Priority = %d, want 200", retrieved.Priority)
}
t.Log("Template updated successfully")
})
// Test 4: List templates
t.Run("ListTemplates", func(t *testing.T) {
templates, err := ListTemplates(ctx, c)
if err != nil {
t.Fatalf("ListTemplates() error = %v", err)
}
if _, exists := templates[templateName]; !exists {
t.Errorf("Template %q not found in list", templateName)
}
t.Logf("Found %d templates", len(templates))
})
// Test 5: Delete template
t.Run("DeleteTemplate", func(t *testing.T) {
err := DeleteTemplate(ctx, c, templateName)
if err != nil {
t.Fatalf("DeleteTemplate() error = %v", err)
}
// Verify deletion
_, err = GetTemplate(ctx, c, templateName)
if err == nil {
t.Error("GetTemplate() should return error after deletion")
}
t.Log("Template deleted successfully")
})
}
func TestPutTemplate(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
c := setupIntegrationTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
templateName := "test-put-template"
defer func() {
_ = DeleteTemplate(context.Background(), c, templateName)
}()
template := &Template{
IndexPatterns: []string{"test-*"},
IndexPatterns: []string{"test-put-*"},
Settings: map[string]any{
"number_of_shards": 1,
"number_of_replicas": 0,
@@ -44,11 +176,12 @@ func TestPutTemplate(t *testing.T) {
},
}
// This will fail without a real cluster
err = PutTemplate(ctx, c, "test-template", template)
if err == nil {
t.Log("PutTemplate succeeded (unexpected in unit test)")
err := PutTemplate(ctx, c, templateName, template)
if err != nil {
t.Fatalf("PutTemplate() error = %v", err)
}
t.Log("PutTemplate succeeded")
}
func TestGetTemplate(t *testing.T) {
@@ -56,26 +189,41 @@ func TestGetTemplate(t *testing.T) {
t.Skip("skipping integration test in short mode")
}
cfg := &client.Config{
Endpoint: "https://example.com",
Region: "us-east-1",
AccessKey: "test-key",
SecretKey: "test-secret",
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
c := setupIntegrationTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// This will fail without a real cluster
_, err = GetTemplate(ctx, c, "test-template")
if err == nil {
t.Log("GetTemplate succeeded (unexpected in unit test)")
templateName := "test-get-template"
// Create a template first
template := &Template{
IndexPatterns: []string{"test-get-*"},
Settings: map[string]any{
"number_of_shards": 1,
},
}
err := PutTemplate(ctx, c, templateName, template)
if err != nil {
t.Fatalf("setup: PutTemplate() error = %v", err)
}
defer func() {
_ = DeleteTemplate(context.Background(), c, templateName)
}()
// Test getting the template
retrieved, err := GetTemplate(ctx, c, templateName)
if err != nil {
t.Fatalf("GetTemplate() error = %v", err)
}
if len(retrieved.IndexPatterns) == 0 {
t.Error("GetTemplate() returned empty index patterns")
}
t.Logf("GetTemplate succeeded: %v", retrieved.IndexPatterns)
}
func TestDeleteTemplate(t *testing.T) {
@@ -83,26 +231,30 @@ func TestDeleteTemplate(t *testing.T) {
t.Skip("skipping integration test in short mode")
}
cfg := &client.Config{
Endpoint: "https://example.com",
Region: "us-east-1",
AccessKey: "test-key",
SecretKey: "test-secret",
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
c := setupIntegrationTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// This will fail without a real cluster
err = DeleteTemplate(ctx, c, "test-template")
if err == nil {
t.Log("DeleteTemplate succeeded (unexpected in unit test)")
templateName := "test-delete-template"
// Create a template
template := &Template{
IndexPatterns: []string{"test-delete-*"},
}
err := PutTemplate(ctx, c, templateName, template)
if err != nil {
t.Fatalf("setup: PutTemplate() error = %v", err)
}
// Test deletion
err = DeleteTemplate(ctx, c, templateName)
if err != nil {
t.Fatalf("DeleteTemplate() error = %v", err)
}
t.Log("DeleteTemplate succeeded")
}
func TestListTemplates(t *testing.T) {
@@ -110,71 +262,19 @@ func TestListTemplates(t *testing.T) {
t.Skip("skipping integration test in short mode")
}
cfg := &client.Config{
Endpoint: "https://example.com",
Region: "us-east-1",
AccessKey: "test-key",
SecretKey: "test-secret",
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
c := setupIntegrationTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// This will fail without a real cluster
_, err = ListTemplates(ctx, c)
if err == nil {
t.Log("ListTemplates succeeded (unexpected in unit test)")
}
}
func TestTemplate_Validate(t *testing.T) {
tests := []struct {
name string
template *Template
wantErr error
}{
{
name: "valid template",
template: &Template{
IndexPatterns: []string{"logs-*"},
Settings: map[string]any{
"number_of_shards": 1,
},
},
wantErr: nil,
},
{
name: "nil template",
template: nil,
wantErr: ErrInvalidTemplate,
},
{
name: "empty index patterns",
template: &Template{
IndexPatterns: []string{},
},
wantErr: ErrInvalidTemplate,
},
{
name: "nil index patterns",
template: &Template{
IndexPatterns: nil,
},
wantErr: ErrInvalidTemplate,
},
templates, err := ListTemplates(ctx, c)
if err != nil {
t.Fatalf("ListTemplates() error = %v", err)
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateTemplate(tt.template)
if !errors.Is(err, tt.wantErr) {
t.Errorf("validateTemplate() error = %v, wantErr %v", err, tt.wantErr)
}
})
if templates == nil {
t.Error("ListTemplates() returned nil")
}
t.Logf("ListTemplates succeeded: found %d templates", len(templates))
}

View File

@@ -0,0 +1,292 @@
package index
import (
"context"
"encoding/json"
"testing"
"es-demo/client"
"es-demo/config"
)
// TestPutTemplate_Validation tests template validation in PutTemplate
func TestPutTemplate_Validation(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
// 加载真实配置用于集成测试
if err := config.Load(".env"); err != nil {
t.Logf("warning: failed to load .env file: %v", err)
}
config.Init()
cfg := &client.Config{
Endpoint: config.Endpoint,
Region: config.Region,
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
ctx := context.Background()
tests := []struct {
name string
tmplName string
template *Template
wantErr bool
}{
{
name: "empty template name",
tmplName: "",
template: &Template{
IndexPatterns: []string{"test-*"},
},
wantErr: true,
},
{
name: "nil template",
tmplName: "test",
template: nil,
wantErr: true,
},
{
name: "empty index patterns",
tmplName: "test",
template: &Template{
IndexPatterns: []string{},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := PutTemplate(ctx, c, tt.tmplName, tt.template)
if (err != nil) != tt.wantErr {
t.Errorf("PutTemplate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
// TestGetTemplate_NotFound tests GetTemplate when template doesn't exist
func TestGetTemplate_NotFound(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
if err := config.Load(".env"); err != nil {
t.Logf("warning: failed to load .env file: %v", err)
}
config.Init()
cfg := &client.Config{
Endpoint: config.Endpoint,
Region: config.Region,
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
ctx := context.Background()
// 尝试获取不存在的模板
_, err = GetTemplate(ctx, c, "non-existent-template-12345")
if err == nil {
t.Error("GetTemplate() should return error for non-existent template")
}
}
// TestDeleteTemplate_NotFound tests DeleteTemplate when template doesn't exist
func TestDeleteTemplate_NotFound(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
if err := config.Load(".env"); err != nil {
t.Logf("warning: failed to load .env file: %v", err)
}
config.Init()
cfg := &client.Config{
Endpoint: config.Endpoint,
Region: config.Region,
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
ctx := context.Background()
// 尝试删除不存在的模板
err = DeleteTemplate(ctx, c, "non-existent-template-12345")
if err == nil {
t.Error("DeleteTemplate() should return error for non-existent template")
}
}
// TestValidateTemplate_EdgeCases tests edge cases in template validation
func TestValidateTemplate_EdgeCases(t *testing.T) {
tests := []struct {
name string
template *Template
wantErr bool
}{
{
name: "nil template",
template: nil,
wantErr: true,
},
{
name: "empty index patterns",
template: &Template{
IndexPatterns: []string{},
},
wantErr: true,
},
{
name: "nil index patterns",
template: &Template{
IndexPatterns: nil,
},
wantErr: true,
},
{
name: "valid minimal template",
template: &Template{
IndexPatterns: []string{"test-*"},
},
wantErr: false,
},
{
name: "valid template with settings",
template: &Template{
IndexPatterns: []string{"logs-*", "metrics-*"},
Settings: map[string]any{
"number_of_shards": 2,
},
},
wantErr: false,
},
{
name: "valid template with all fields",
template: &Template{
IndexPatterns: []string{"app-*"},
Settings: map[string]any{
"number_of_shards": 1,
},
Mappings: map[string]any{
"properties": map[string]any{
"timestamp": map[string]any{
"type": "date",
},
},
},
Aliases: map[string]any{
"my-alias": map[string]any{},
},
Priority: 100,
Version: 1,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateTemplate(tt.template)
if (err != nil) != tt.wantErr {
t.Errorf("validateTemplate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
// TestListTemplates_EmptyResponse tests ListTemplates with no templates
func TestListTemplates_EmptyResponse(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
if err := config.Load(".env"); err != nil {
t.Logf("warning: failed to load .env file: %v", err)
}
config.Init()
cfg := &client.Config{
Endpoint: config.Endpoint,
Region: config.Region,
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}
c, err := client.NewClient(cfg)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
ctx := context.Background()
// ListTemplates 应该始终返回至少一个空的 map
templates, err := ListTemplates(ctx, c)
if err != nil {
t.Fatalf("ListTemplates() error = %v", err)
}
if templates == nil {
t.Error("ListTemplates() should not return nil map")
}
}
// TestTemplate_JSONMarshaling tests template JSON marshaling
func TestTemplate_JSONMarshaling(t *testing.T) {
template := &Template{
IndexPatterns: []string{"test-*"},
Settings: map[string]any{
"number_of_shards": 1,
},
Mappings: map[string]any{
"properties": map[string]any{
"field1": map[string]any{
"type": "text",
},
},
},
Priority: 100,
}
// Marshal
data, err := json.Marshal(template)
if err != nil {
t.Fatalf("json.Marshal() error = %v", err)
}
// Unmarshal
var decoded Template
if err := json.Unmarshal(data, &decoded); err != nil {
t.Fatalf("json.Unmarshal() error = %v", err)
}
// Verify
if len(decoded.IndexPatterns) != len(template.IndexPatterns) {
t.Errorf("IndexPatterns length = %d, want %d", len(decoded.IndexPatterns), len(template.IndexPatterns))
}
if decoded.Priority != template.Priority {
t.Errorf("Priority = %d, want %d", decoded.Priority, template.Priority)
}
}