feat: opensearch客户端初始化 feat: 索引模板接口 ai: 开发准则 chore: TDD流水线脚本
This commit is contained in:
58
operations/cluster/cluster.go
Normal file
58
operations/cluster/cluster.go
Normal file
@@ -0,0 +1,58 @@
|
||||
// Package cluster provides cluster-level operations for OpenSearch administration.
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"es-demo/client"
|
||||
|
||||
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
|
||||
)
|
||||
|
||||
// Info represents essential information about the OpenSearch cluster.
|
||||
type Info struct {
|
||||
Name string `json:"name"`
|
||||
ClusterName string `json:"cluster_name"`
|
||||
ClusterUUID string `json:"cluster_uuid"`
|
||||
Version struct {
|
||||
Number string `json:"number"`
|
||||
BuildType string `json:"build_type"`
|
||||
BuildHash string `json:"build_hash"`
|
||||
BuildDate string `json:"build_date"`
|
||||
BuildSnapshot bool `json:"build_snapshot"`
|
||||
LuceneVersion string `json:"lucene_version"`
|
||||
MinimumWireCompatibilityVersion string `json:"minimum_wire_compatibility_version"`
|
||||
MinimumIndexCompatibilityVersion string `json:"minimum_index_compatibility_version"`
|
||||
} `json:"version"`
|
||||
Tagline string `json:"tagline"`
|
||||
}
|
||||
|
||||
// GetInfo retrieves essential information about the OpenSearch cluster.
|
||||
// It returns the cluster information or an error if the request fails.
|
||||
func GetInfo(ctx context.Context, c *client.Client) (*Info, error) {
|
||||
req := opensearchapi.InfoRequest{}
|
||||
|
||||
res, err := req.Do(ctx, c.GetClient())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to execute info request: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := res.Body.Close(); closeErr != nil {
|
||||
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
||||
}
|
||||
}()
|
||||
|
||||
if res.IsError() {
|
||||
return nil, fmt.Errorf("request failed with status: %s", res.Status())
|
||||
}
|
||||
|
||||
var info Info
|
||||
if err := json.NewDecoder(res.Body).Decode(&info); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode cluster info: %w", err)
|
||||
}
|
||||
|
||||
return &info, nil
|
||||
}
|
||||
40
operations/cluster/cluster_test.go
Normal file
40
operations/cluster/cluster_test.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"es-demo/client"
|
||||
)
|
||||
|
||||
func TestGetInfo(t *testing.T) {
|
||||
// This is a unit test that doesn't require real connection
|
||||
// Integration tests should be run separately
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
cfg := &client.Config{
|
||||
Endpoint: "https://example.com",
|
||||
Region: "us-east-1",
|
||||
AccessKey: "test-key",
|
||||
SecretKey: "test-secret",
|
||||
}
|
||||
|
||||
c, err := client.NewClient(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create client: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// This will fail without a real cluster, which is expected in unit tests
|
||||
_, err = GetInfo(ctx, c)
|
||||
// We just verify the method signature is correct
|
||||
// Real integration tests would check actual responses
|
||||
if err == nil {
|
||||
t.Log("GetInfo succeeded (unexpected in unit test)")
|
||||
}
|
||||
}
|
||||
217
operations/index/template.go
Normal file
217
operations/index/template.go
Normal file
@@ -0,0 +1,217 @@
|
||||
// Package index provides index-level operations for OpenSearch.
|
||||
package index
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"es-demo/client"
|
||||
|
||||
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrInvalidTemplate is returned when the template configuration is invalid.
|
||||
ErrInvalidTemplate = errors.New("invalid template configuration")
|
||||
|
||||
// ErrTemplateNotFound is returned when the specified template does not exist.
|
||||
ErrTemplateNotFound = errors.New("template not found")
|
||||
)
|
||||
|
||||
// Template represents an OpenSearch index template.
|
||||
type Template struct {
|
||||
// IndexPatterns defines the index patterns this template applies to.
|
||||
IndexPatterns []string `json:"index_patterns"`
|
||||
|
||||
// Settings contains index settings like number of shards and replicas.
|
||||
Settings map[string]any `json:"settings,omitempty"`
|
||||
|
||||
// Mappings defines the field mappings for the index.
|
||||
Mappings map[string]any `json:"mappings,omitempty"`
|
||||
|
||||
// Aliases defines index aliases.
|
||||
Aliases map[string]any `json:"aliases,omitempty"`
|
||||
|
||||
// Priority determines template precedence when multiple templates match.
|
||||
Priority int `json:"priority,omitempty"`
|
||||
|
||||
// Version is used for external version management.
|
||||
Version int `json:"version,omitempty"`
|
||||
}
|
||||
|
||||
// TemplateResponse represents the response when getting a template.
|
||||
type TemplateResponse struct {
|
||||
IndexTemplates []struct {
|
||||
Name string `json:"name"`
|
||||
IndexTemplate Template `json:"index_template"`
|
||||
} `json:"index_templates"`
|
||||
}
|
||||
|
||||
// PutTemplate creates or updates an index template.
|
||||
func PutTemplate(ctx context.Context, c *client.Client, name string, template *Template) error {
|
||||
if err := validateTemplate(template); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if name == "" {
|
||||
return fmt.Errorf("%w: template name is required", ErrInvalidTemplate)
|
||||
}
|
||||
|
||||
body, err := json.Marshal(template)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal template: %w", err)
|
||||
}
|
||||
|
||||
req := opensearchapi.IndicesPutIndexTemplateRequest{
|
||||
Name: name,
|
||||
Body: bytes.NewReader(body),
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, c.GetClient())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute put template request: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := res.Body.Close(); closeErr != nil {
|
||||
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
||||
}
|
||||
}()
|
||||
|
||||
if res.IsError() {
|
||||
bodyBytes, _ := io.ReadAll(res.Body)
|
||||
return fmt.Errorf("put template failed with status %s: %s", res.Status(), string(bodyBytes))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetTemplate retrieves an index template by name.
|
||||
func GetTemplate(ctx context.Context, c *client.Client, name string) (*Template, error) {
|
||||
if name == "" {
|
||||
return nil, fmt.Errorf("%w: template name is required", ErrInvalidTemplate)
|
||||
}
|
||||
|
||||
req := opensearchapi.IndicesGetIndexTemplateRequest{
|
||||
Name: []string{name},
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, c.GetClient())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to execute get template request: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := res.Body.Close(); closeErr != nil {
|
||||
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
||||
}
|
||||
}()
|
||||
|
||||
if res.IsError() {
|
||||
if res.StatusCode == 404 {
|
||||
return nil, ErrTemplateNotFound
|
||||
}
|
||||
bodyBytes, _ := io.ReadAll(res.Body)
|
||||
return nil, fmt.Errorf("get template failed with status %s: %s", res.Status(), string(bodyBytes))
|
||||
}
|
||||
|
||||
var response TemplateResponse
|
||||
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode template response: %w", err)
|
||||
}
|
||||
|
||||
if len(response.IndexTemplates) == 0 {
|
||||
return nil, ErrTemplateNotFound
|
||||
}
|
||||
|
||||
return &response.IndexTemplates[0].IndexTemplate, nil
|
||||
}
|
||||
|
||||
// DeleteTemplate deletes an index template.
|
||||
func DeleteTemplate(ctx context.Context, c *client.Client, name string) error {
|
||||
if name == "" {
|
||||
return fmt.Errorf("%w: template name is required", ErrInvalidTemplate)
|
||||
}
|
||||
|
||||
req := opensearchapi.IndicesDeleteIndexTemplateRequest{
|
||||
Name: name,
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, c.GetClient())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute delete template request: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := res.Body.Close(); closeErr != nil {
|
||||
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
||||
}
|
||||
}()
|
||||
|
||||
if res.IsError() {
|
||||
if res.StatusCode == 404 {
|
||||
return ErrTemplateNotFound
|
||||
}
|
||||
bodyBytes, _ := io.ReadAll(res.Body)
|
||||
return fmt.Errorf("delete template failed with status %s: %s", res.Status(), string(bodyBytes))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListTemplates retrieves all index templates or templates matching a pattern.
|
||||
func ListTemplates(ctx context.Context, c *client.Client, names ...string) (map[string]*Template, error) {
|
||||
var nameList []string
|
||||
if len(names) > 0 {
|
||||
nameList = names
|
||||
}
|
||||
|
||||
req := opensearchapi.IndicesGetIndexTemplateRequest{
|
||||
Name: nameList,
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, c.GetClient())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to execute list templates request: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := res.Body.Close(); closeErr != nil {
|
||||
fmt.Fprintf(os.Stderr, "warning: failed to close response body: %v\n", closeErr)
|
||||
}
|
||||
}()
|
||||
|
||||
if res.IsError() {
|
||||
if res.StatusCode == 404 {
|
||||
return make(map[string]*Template), nil
|
||||
}
|
||||
bodyBytes, _ := io.ReadAll(res.Body)
|
||||
return nil, fmt.Errorf("list templates failed with status %s: %s", res.Status(), string(bodyBytes))
|
||||
}
|
||||
|
||||
var response TemplateResponse
|
||||
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode templates response: %w", err)
|
||||
}
|
||||
|
||||
templates := make(map[string]*Template)
|
||||
for _, item := range response.IndexTemplates {
|
||||
templates[item.Name] = &item.IndexTemplate
|
||||
}
|
||||
|
||||
return templates, nil
|
||||
}
|
||||
|
||||
// validateTemplate validates the template configuration.
|
||||
func validateTemplate(template *Template) error {
|
||||
if template == nil {
|
||||
return fmt.Errorf("%w: template cannot be nil", ErrInvalidTemplate)
|
||||
}
|
||||
|
||||
if len(template.IndexPatterns) == 0 {
|
||||
return fmt.Errorf("%w: index patterns are required", ErrInvalidTemplate)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
180
operations/index/template_test.go
Normal file
180
operations/index/template_test.go
Normal file
@@ -0,0 +1,180 @@
|
||||
package index
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"es-demo/client"
|
||||
)
|
||||
|
||||
func TestPutTemplate(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
cfg := &client.Config{
|
||||
Endpoint: "https://example.com",
|
||||
Region: "us-east-1",
|
||||
AccessKey: "test-key",
|
||||
SecretKey: "test-secret",
|
||||
}
|
||||
|
||||
c, err := client.NewClient(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create client: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
template := &Template{
|
||||
IndexPatterns: []string{"test-*"},
|
||||
Settings: map[string]any{
|
||||
"number_of_shards": 1,
|
||||
"number_of_replicas": 0,
|
||||
},
|
||||
Mappings: map[string]any{
|
||||
"properties": map[string]any{
|
||||
"timestamp": map[string]any{
|
||||
"type": "date",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// This will fail without a real cluster
|
||||
err = PutTemplate(ctx, c, "test-template", template)
|
||||
if err == nil {
|
||||
t.Log("PutTemplate succeeded (unexpected in unit test)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetTemplate(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
cfg := &client.Config{
|
||||
Endpoint: "https://example.com",
|
||||
Region: "us-east-1",
|
||||
AccessKey: "test-key",
|
||||
SecretKey: "test-secret",
|
||||
}
|
||||
|
||||
c, err := client.NewClient(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create client: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// This will fail without a real cluster
|
||||
_, err = GetTemplate(ctx, c, "test-template")
|
||||
if err == nil {
|
||||
t.Log("GetTemplate succeeded (unexpected in unit test)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteTemplate(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
cfg := &client.Config{
|
||||
Endpoint: "https://example.com",
|
||||
Region: "us-east-1",
|
||||
AccessKey: "test-key",
|
||||
SecretKey: "test-secret",
|
||||
}
|
||||
|
||||
c, err := client.NewClient(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create client: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// This will fail without a real cluster
|
||||
err = DeleteTemplate(ctx, c, "test-template")
|
||||
if err == nil {
|
||||
t.Log("DeleteTemplate succeeded (unexpected in unit test)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestListTemplates(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
cfg := &client.Config{
|
||||
Endpoint: "https://example.com",
|
||||
Region: "us-east-1",
|
||||
AccessKey: "test-key",
|
||||
SecretKey: "test-secret",
|
||||
}
|
||||
|
||||
c, err := client.NewClient(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create client: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// This will fail without a real cluster
|
||||
_, err = ListTemplates(ctx, c)
|
||||
if err == nil {
|
||||
t.Log("ListTemplates succeeded (unexpected in unit test)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTemplate_Validate(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
template *Template
|
||||
wantErr error
|
||||
}{
|
||||
{
|
||||
name: "valid template",
|
||||
template: &Template{
|
||||
IndexPatterns: []string{"logs-*"},
|
||||
Settings: map[string]any{
|
||||
"number_of_shards": 1,
|
||||
},
|
||||
},
|
||||
wantErr: nil,
|
||||
},
|
||||
{
|
||||
name: "nil template",
|
||||
template: nil,
|
||||
wantErr: ErrInvalidTemplate,
|
||||
},
|
||||
{
|
||||
name: "empty index patterns",
|
||||
template: &Template{
|
||||
IndexPatterns: []string{},
|
||||
},
|
||||
wantErr: ErrInvalidTemplate,
|
||||
},
|
||||
{
|
||||
name: "nil index patterns",
|
||||
template: &Template{
|
||||
IndexPatterns: nil,
|
||||
},
|
||||
wantErr: ErrInvalidTemplate,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := validateTemplate(tt.template)
|
||||
if !errors.Is(err, tt.wantErr) {
|
||||
t.Errorf("validateTemplate() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user