322 lines
9.3 KiB
Go
322 lines
9.3 KiB
Go
package services
|
||
|
||
import (
|
||
"bytes"
|
||
"encoding/json"
|
||
"fmt"
|
||
"goalfymax-admin/pkg/utils"
|
||
"io"
|
||
"net/http"
|
||
"time"
|
||
|
||
"goalfymax-admin/internal/config"
|
||
"goalfymax-admin/internal/models"
|
||
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
// GatewayClient 网关客户端
|
||
type GatewayClient struct {
|
||
baseURL string
|
||
timeout time.Duration
|
||
logger *utils.Logger
|
||
token string
|
||
}
|
||
|
||
// NewGatewayClient 创建网关客户端
|
||
func NewGatewayClient(baseURL string, timeout time.Duration, logger *utils.Logger) *GatewayClient {
|
||
return &GatewayClient{
|
||
baseURL: baseURL,
|
||
timeout: timeout,
|
||
logger: logger,
|
||
}
|
||
}
|
||
|
||
// acquireToken 从配置的登录接口获取新的网关 token(不做过期判定)
|
||
func (c *GatewayClient) acquireToken() (string, error) {
|
||
return "admin_control_0807", nil
|
||
|
||
cfg := config.GetConfig()
|
||
loginURL := cfg.Gateway.Auth.LoginURL
|
||
key := cfg.Gateway.Auth.Key
|
||
payload, _ := json.Marshal(map[string]string{"key": key})
|
||
req, err := http.NewRequest("POST", loginURL, bytes.NewBuffer(payload))
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
req.Header.Set("Content-Type", "application/json")
|
||
client := &http.Client{Timeout: c.timeout}
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
defer resp.Body.Close()
|
||
body, _ := io.ReadAll(resp.Body)
|
||
if resp.StatusCode != http.StatusOK {
|
||
return "", fmt.Errorf("login status: %d %s", resp.StatusCode, string(body))
|
||
}
|
||
var out struct {
|
||
Success bool `json:"success"`
|
||
Token string `json:"token"`
|
||
}
|
||
if err := json.Unmarshal(body, &out); err != nil {
|
||
return "", err
|
||
}
|
||
if !out.Success || out.Token == "" {
|
||
return "", fmt.Errorf("login failed: %s", string(body))
|
||
}
|
||
c.logger.Info("login succeeded", zap.String("token", out.Token))
|
||
c.token = out.Token
|
||
return c.token, nil
|
||
}
|
||
|
||
// doWithAuth 发送请求,自动注入token;若401则重取token并重试一次
|
||
func (c *GatewayClient) doWithAuth(req *http.Request) (*http.Response, error) {
|
||
if c.token == "" {
|
||
var err error
|
||
if c.token, err = c.acquireToken(); err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
req.Header.Set("Authorization", "Bearer "+c.token)
|
||
client := &http.Client{Timeout: c.timeout}
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if resp.StatusCode == http.StatusUnauthorized {
|
||
// 读尽响应体以复用连接
|
||
io.Copy(io.Discard, resp.Body)
|
||
resp.Body.Close()
|
||
if _, err := c.acquireToken(); err != nil {
|
||
return nil, err
|
||
}
|
||
// 重试一次
|
||
// 重新构建请求体(仅当是可重读的bytes.Buffer);这里假设上层构造的Body为bytes.Buffer或nil
|
||
// 如果是一次性流,上层应改为传入可重读体
|
||
if req.GetBody != nil {
|
||
bodyRc, _ := req.GetBody()
|
||
req.Body = bodyRc
|
||
}
|
||
req.Header.Set("Authorization", "Bearer "+c.token)
|
||
return client.Do(req)
|
||
}
|
||
return resp, nil
|
||
}
|
||
|
||
// GetQuotaHistory 获取配额历史数据
|
||
func (c *GatewayClient) GetQuotaHistory(req *models.QuotaHistoryRequest) (*models.QuotaHistoryResponse, error) {
|
||
// 构建请求URL
|
||
url := fmt.Sprintf("%s/aigateway-admin/api/quotas/history", c.baseURL)
|
||
|
||
// 序列化请求数据
|
||
jsonData, err := json.Marshal(req)
|
||
if err != nil {
|
||
c.logger.Error("序列化请求数据失败", zap.Error(err))
|
||
return nil, fmt.Errorf("序列化请求数据失败: %w", err)
|
||
}
|
||
|
||
// 创建HTTP请求
|
||
httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||
if err != nil {
|
||
c.logger.Error("创建HTTP请求失败", zap.Error(err))
|
||
return nil, fmt.Errorf("创建HTTP请求失败: %w", err)
|
||
}
|
||
|
||
// 设置请求头
|
||
httpReq.Header.Set("Content-Type", "application/json")
|
||
httpReq.Header.Set("Accept", "application/json")
|
||
|
||
// 发送请求
|
||
c.logger.Debug("发送配额历史查询请求",
|
||
zap.String("url", url),
|
||
zap.String("data", string(jsonData)),
|
||
)
|
||
|
||
resp, err := c.doWithAuth(httpReq)
|
||
if err != nil {
|
||
c.logger.Error("发送HTTP请求失败", zap.Error(err))
|
||
return nil, fmt.Errorf("发送HTTP请求失败: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
// 读取响应
|
||
body, err := io.ReadAll(resp.Body)
|
||
if err != nil {
|
||
c.logger.Error("读取响应数据失败", zap.Error(err))
|
||
return nil, fmt.Errorf("读取响应数据失败: %w", err)
|
||
}
|
||
|
||
// 检查HTTP状态码
|
||
if resp.StatusCode != http.StatusOK {
|
||
c.logger.Error("网关返回错误状态码",
|
||
zap.Int("status_code", resp.StatusCode),
|
||
zap.String("response", string(body)),
|
||
)
|
||
return nil, fmt.Errorf("网关返回错误状态码: %d", resp.StatusCode)
|
||
}
|
||
|
||
// 解析响应
|
||
var response models.QuotaHistoryResponse
|
||
if err := json.Unmarshal(body, &response); err != nil {
|
||
c.logger.Error("解析响应数据失败", zap.Error(err))
|
||
return nil, fmt.Errorf("解析响应数据失败: %w", err)
|
||
}
|
||
|
||
c.logger.Info("配额历史查询成功",
|
||
zap.Int("data_count", len(response.Data)),
|
||
zap.Bool("success", response.Success),
|
||
)
|
||
|
||
return &response, nil
|
||
}
|
||
|
||
// GetQuotaRules 获取配额规则列表(代理到网关),携带Authorization
|
||
func (c *GatewayClient) GetQuotaRules(authToken string) (*models.QuotaRulesResponse, error) {
|
||
url := fmt.Sprintf("%s/aigateway-admin/api/quotas/rules", c.baseURL)
|
||
|
||
httpReq, err := http.NewRequest("GET", url, nil)
|
||
if err != nil {
|
||
c.logger.Error("创建HTTP请求失败", zap.Error(err))
|
||
return nil, fmt.Errorf("创建HTTP请求失败: %w", err)
|
||
}
|
||
httpReq.Header.Set("Accept", "application/json")
|
||
if authToken != "" {
|
||
httpReq.Header.Set("Authorization", authToken)
|
||
}
|
||
|
||
c.logger.Debug("请求配额规则列表", zap.String("url", url))
|
||
resp, err := c.doWithAuth(httpReq)
|
||
if err != nil {
|
||
c.logger.Error("发送HTTP请求失败", zap.Error(err))
|
||
return nil, fmt.Errorf("发送HTTP请求失败: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
body, err := io.ReadAll(resp.Body)
|
||
if err != nil {
|
||
c.logger.Error("读取响应数据失败", zap.Error(err))
|
||
return nil, fmt.Errorf("读取响应数据失败: %w", err)
|
||
}
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
c.logger.Error("网关返回错误状态码", zap.Int("status_code", resp.StatusCode), zap.String("response", string(body)))
|
||
return nil, fmt.Errorf("网关返回错误状态码: %d", resp.StatusCode)
|
||
}
|
||
|
||
var response models.QuotaRulesResponse
|
||
if err := json.Unmarshal(body, &response); err != nil {
|
||
c.logger.Error("解析响应数据失败", zap.Error(err))
|
||
return nil, fmt.Errorf("解析响应数据失败: %w", err)
|
||
}
|
||
|
||
c.logger.Info("获取配额规则成功")
|
||
return &response, nil
|
||
}
|
||
|
||
// CreateQuotaRule 创建配额规则(代理网关)
|
||
func (c *GatewayClient) CreateQuotaRule(authToken string, body any) (*models.QuotaRulesResponse, error) {
|
||
url := fmt.Sprintf("%s/aigateway-admin/api/quotas/rules", c.baseURL)
|
||
payload, _ := json.Marshal(body)
|
||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(payload))
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
req.Header.Set("Content-Type", "application/json")
|
||
if authToken != "" {
|
||
req.Header.Set("Authorization", authToken)
|
||
}
|
||
resp, err := c.doWithAuth(req)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer resp.Body.Close()
|
||
b, _ := io.ReadAll(resp.Body)
|
||
if resp.StatusCode != http.StatusOK {
|
||
return nil, fmt.Errorf("gateway status: %d %s", resp.StatusCode, string(b))
|
||
}
|
||
var out models.QuotaRulesResponse
|
||
if err := json.Unmarshal(b, &out); err != nil {
|
||
return nil, err
|
||
}
|
||
return &out, nil
|
||
}
|
||
|
||
// UpdateQuotaRule 更新配额规则(代理网关)
|
||
func (c *GatewayClient) UpdateQuotaRule(authToken string, id string, body any) (*models.QuotaRulesResponse, error) {
|
||
url := fmt.Sprintf("%s/aigateway-admin/api/quotas/rules/%s", c.baseURL, id)
|
||
payload, _ := json.Marshal(body)
|
||
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(payload))
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
req.Header.Set("Content-Type", "application/json")
|
||
if authToken != "" {
|
||
req.Header.Set("Authorization", authToken)
|
||
}
|
||
resp, err := c.doWithAuth(req)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer resp.Body.Close()
|
||
b, _ := io.ReadAll(resp.Body)
|
||
if resp.StatusCode != http.StatusOK {
|
||
return nil, fmt.Errorf("gateway status: %d %s", resp.StatusCode, string(b))
|
||
}
|
||
var out models.QuotaRulesResponse
|
||
if err := json.Unmarshal(b, &out); err != nil {
|
||
return nil, err
|
||
}
|
||
return &out, nil
|
||
}
|
||
|
||
// DeleteQuotaRule 删除配额规则(代理网关)
|
||
func (c *GatewayClient) DeleteQuotaRule(authToken string, id string) (*models.QuotaRulesResponse, error) {
|
||
url := fmt.Sprintf("%s/aigateway-admin/api/quotas/rules/%s", c.baseURL, id)
|
||
req, err := http.NewRequest("DELETE", url, nil)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if authToken != "" {
|
||
req.Header.Set("Authorization", authToken)
|
||
}
|
||
resp, err := c.doWithAuth(req)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer resp.Body.Close()
|
||
b, _ := io.ReadAll(resp.Body)
|
||
if resp.StatusCode != http.StatusOK {
|
||
return nil, fmt.Errorf("gateway status: %d %s", resp.StatusCode, string(b))
|
||
}
|
||
var out models.QuotaRulesResponse
|
||
if err := json.Unmarshal(b, &out); err != nil {
|
||
return nil, err
|
||
}
|
||
return &out, nil
|
||
}
|
||
|
||
// HealthCheck 健康检查
|
||
func (c *GatewayClient) HealthCheck() error {
|
||
url := fmt.Sprintf("%s/aigateway-admin/health", c.baseURL)
|
||
|
||
client := &http.Client{
|
||
Timeout: c.timeout,
|
||
}
|
||
|
||
resp, err := client.Get(url)
|
||
if err != nil {
|
||
c.logger.Error("网关健康检查失败", zap.Error(err))
|
||
return fmt.Errorf("网关健康检查失败: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
c.logger.Error("网关健康检查返回错误状态码", zap.Int("status_code", resp.StatusCode))
|
||
return fmt.Errorf("网关健康检查返回错误状态码: %d", resp.StatusCode)
|
||
}
|
||
|
||
c.logger.Info("网关健康检查成功")
|
||
return nil
|
||
}
|