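// Command-line Kafka utility built on the Sarama client. Depending on its first
// argument it runs a small web server whose button triggers a producer, runs a
// consumer-group consumer, or fetches messages from one partition into a file.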
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"crypto/sha512"
|
|
"fmt"
|
|
"html/template"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/IBM/sarama"
|
|
"github.com/xdg/scram"
|
|
)
|
|
|
|
// SCRAMClientGeneratorFunc 必须指向一个返回 sarama.SCRAMClient 接口的函数
|
|
// 这里的 func() sarama.SCRAMClient { ... } 就是所需的实现。
|
|
func createScramClient() sarama.SCRAMClient {
|
|
// 传入用于 SCRAM 认证的 Hash 算法,这里是 SHA-512
|
|
return &XDGSCRAMClient{HashGeneratorFcn: sha512.New}
|
|
}
|
|
|
|
// XDGSCRAMClient 是对 github.com/xdg/scram/client 库的包装
|
|
// 使得它符合 Sarama 的 sarama.SCRAMClient 接口
|
|
type XDGSCRAMClient struct {
|
|
*scram.Client
|
|
*scram.ClientConversation
|
|
scram.HashGeneratorFcn
|
|
}
|
|
|
|
// Begin 用于启动 SCRAM 认证流程
|
|
func (x *XDGSCRAMClient) Begin(userName, password, salt string) (err error) {
|
|
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, salt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
x.ClientConversation = x.Client.NewConversation()
|
|
return nil
|
|
}
|
|
|
|
// Step 用于处理 SCRAM 认证中的每一步挑战和响应
|
|
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
|
|
response, err = x.ClientConversation.Step(challenge)
|
|
return
|
|
}
|
|
|
|
// Done 检查 SCRAM 认证是否完成
|
|
func (x *XDGSCRAMClient) Done() bool {
|
|
return x.ClientConversation.Done()
|
|
}
|
|
|
|
// --- Kafka connection settings ---
const (
	// BrokerList is the single bootstrap broker address handed to Sarama.
	//
	// For a Kafka instance listening on the host's port 9092, use instead:
	//   BrokerList = "localhost:9092"
	//   Topic      = "test-topic"
	//
	// Production broker (disabled):
	//   BrokerList = "k8s-middlewa-kafkakaf-8362de7238-60b500b1ed68bc69.elb.us-west-2.amazonaws.com:9094"
	BrokerList = "k8s-kafka-kafkaclu-ea0121c7a5-0f29d8348027ca52.elb.us-west-2.amazonaws.com:9094"

	// Topic is the topic that is produced to, consumed from and fetched from.
	Topic = "message-users"

	// GroupID is the consumer group ID used by the consumer role.
	GroupID = "go-sarama-consumer-group-v1"
)
|
|
|
|
// --- Web server settings ---

// WebServerAddress is the address and port the web server listens on.
const WebServerAddress = ":8080"
|
|
|
|
// 初始化 Sarama 配置
|
|
func newConfig() *sarama.Config {
|
|
config := sarama.NewConfig()
|
|
// 假设您使用的 Kafka 版本为 4.0.0 或更高 (Sarama 库会兼容)
|
|
config.Version = sarama.V4_0_0_0 // 请根据您的 Kafka 版本进行调整
|
|
// 生产者配置
|
|
config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认
|
|
config.Producer.Retry.Max = 5
|
|
config.Producer.Return.Successes = true // 必须开启,以便同步生产者可以等待确认
|
|
// 消费者配置:使用 Consumer Group 模式
|
|
if Topic == "message-users" {
|
|
// 2. 启用 SASL
|
|
config.Net.SASL.Enable = true
|
|
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
|
return createScramClient()
|
|
}
|
|
|
|
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
|
|
|
// 4. 设置用户名和密码
|
|
config.Net.SASL.User = "kafka-user"
|
|
config.Net.SASL.Password = "Nrp0vwxgFJXfydV8zISPxjp8RSETjOEq"
|
|
// prd
|
|
// config.Net.SASL.Password = "oP0rACG2ka5EBVWfuAyR1fslH4yAlLZX"
|
|
}
|
|
|
|
config.Consumer.Return.Errors = true
|
|
config.Consumer.Group.Session.Timeout = 20 * time.Second
|
|
config.Metadata.Timeout = 15 * time.Second
|
|
return config
|
|
}
|
|
|
|
// --------------------------------------------------------------------------------------
|
|
// 生产者逻辑
|
|
// --------------------------------------------------------------------------------------
|
|
func startProducer(config *sarama.Config) {
|
|
// 使用同步生产者 (SyncProducer)
|
|
producer, err := sarama.NewSyncProducer([]string{BrokerList}, config)
|
|
if err != nil {
|
|
log.Printf("Error creating sync producer: %v", err) // 改为 Printf 以避免程序退出
|
|
return
|
|
}
|
|
defer producer.Close()
|
|
log.Println("Producer started. Sending messages from long1.txt...")
|
|
|
|
f, err := os.Open("./long1.txt")
|
|
if err != nil {
|
|
log.Printf("failed to read file: %v", err) // 改为 Printf
|
|
return
|
|
}
|
|
defer f.Close()
|
|
|
|
bs, _ := io.ReadAll(f)
|
|
str := string(bs)
|
|
lines := strings.Split(str, "\n")
|
|
|
|
messageID := 0
|
|
// 循环发送消息
|
|
for _, line := range lines {
|
|
// 忽略空行
|
|
if strings.TrimSpace(line) == "" {
|
|
continue
|
|
}
|
|
|
|
// 1. 构造 ProducerRecord
|
|
msg := &sarama.ProducerMessage{
|
|
Topic: Topic,
|
|
Key: sarama.StringEncoder(fmt.Sprintf("Key-%d", messageID)),
|
|
Value: sarama.StringEncoder(line),
|
|
}
|
|
|
|
// 2. 发送消息
|
|
partition, offset, err := producer.SendMessage(msg)
|
|
if err != nil {
|
|
log.Printf("Failed to send message: %v", err)
|
|
} else {
|
|
log.Printf("Producer sent: Topic=%s | Partition=%d | Offset=%d | Value='%s'", Topic, partition, offset, line)
|
|
messageID++
|
|
}
|
|
|
|
// 3. 暂停 (根据需求调整)
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
log.Println("Producer finished sending all messages.")
|
|
}
|
|
|
|
// fetchPartitionMessages connects to a specific partition starting from the given offset,
|
|
// writes up to limit messages into outputPath (one per line), then returns.
|
|
// If filterKey is not empty, only messages with matching key are written.
|
|
// If withTimestamp is true, prepends formatted timestamp to each message.
|
|
func fetchPartitionMessages(config *sarama.Config, partition int32, offset int64, limit int, outputPath string, filterKey string, withTimestamp bool) error {
|
|
if limit <= 0 {
|
|
return fmt.Errorf("limit must be > 0")
|
|
}
|
|
|
|
client, err := sarama.NewClient([]string{BrokerList}, config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
consumer, err := sarama.NewConsumerFromClient(client)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer consumer.Close()
|
|
|
|
pc, err := consumer.ConsumePartition(Topic, partition, offset)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer pc.Close()
|
|
|
|
outFile, err := os.Create(outputPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer outFile.Close()
|
|
|
|
writer := bufio.NewWriter(outFile)
|
|
defer writer.Flush()
|
|
|
|
received := 0
|
|
scanned := 0
|
|
filterEnabled := filterKey != ""
|
|
|
|
// 设置超时和最大扫描数
|
|
timeout := time.After(15 * time.Second)
|
|
maxScan := limit * 1000 // 如果过滤模式下扫描了这么多消息还没找到足够的匹配,就放弃
|
|
if !filterEnabled {
|
|
maxScan = limit // 非过滤模式下,扫描数就是 limit
|
|
}
|
|
|
|
for received < limit && scanned < maxScan {
|
|
select {
|
|
case <-timeout:
|
|
log.Printf("Fetch timeout after 30s. Retrieved %d messages (scanned %d)", received, scanned)
|
|
return nil // 超时不算错误,返回已获取的消息
|
|
case err, ok := <-pc.Errors():
|
|
if !ok {
|
|
return fmt.Errorf("partition consumer error channel closed")
|
|
}
|
|
if err != nil {
|
|
return err.Err
|
|
}
|
|
case msg, ok := <-pc.Messages():
|
|
if !ok {
|
|
log.Printf("Partition consumer closed. Retrieved %d messages (scanned %d)", received, scanned)
|
|
return nil // channel 关闭说明没有更多消息了
|
|
}
|
|
if msg == nil {
|
|
continue
|
|
}
|
|
|
|
scanned++
|
|
|
|
// 如果启用了 key 过滤,检查 key 是否匹配
|
|
if filterEnabled {
|
|
msgKey := string(msg.Key)
|
|
if msgKey != filterKey {
|
|
continue // 跳过不匹配的消息
|
|
}
|
|
log.Printf("Found matching message %d/%d (scanned %d total)", received+1, limit, scanned)
|
|
}
|
|
|
|
// 构造要写入的内容
|
|
var line string
|
|
if withTimestamp {
|
|
// 格式化时间戳为易读格式: 2006-01-02 15:04:05.000
|
|
timestampStr := msg.Timestamp.Format("2006-01-02 15:04:05.000")
|
|
line = fmt.Sprintf("%s %s\n", timestampStr, string(msg.Value))
|
|
} else {
|
|
line = string(msg.Value) + "\n"
|
|
}
|
|
|
|
if _, err := writer.WriteString(line); err != nil {
|
|
return err
|
|
}
|
|
received++
|
|
}
|
|
}
|
|
|
|
if scanned >= maxScan && received < limit {
|
|
log.Printf("Reached max scan limit (%d messages scanned). Retrieved %d matching messages", scanned, received)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// --------------------------------------------------------------------------------------
// Consumer logic
// --------------------------------------------------------------------------------------

// ConsumerHandler is the stateless handler passed to the consumer group; its
// Setup, Cleanup and ConsumeClaim methods implement the
// sarama.ConsumerGroupHandler interface.
type ConsumerHandler struct{}
|
|
|
|
// Setup 是在新会话开始时运行的,在 ConsumeClaim 之前
|
|
func (consumer *ConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
|
|
log.Println("Consumer Group session setup complete.")
|
|
return nil
|
|
}
|
|
|
|
// Cleanup 是在会话结束时运行的,在所有 ConsumeClaim 协程退出后
|
|
func (consumer *ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
|
|
log.Println("Consumer Group session cleanup complete.")
|
|
return nil
|
|
}
|
|
|
|
// ConsumeClaim 必须启动一个消费循环,处理分配给当前 consumer 的分区消息
|
|
func (consumer *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
|
for message := range claim.Messages() {
|
|
log.Printf("Consumer Received: Topic=%s | Partition=%d | Offset=%d | Value='%s' (Timestamp: %v)",
|
|
message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
|
|
session.MarkMessage(message, "")
|
|
}
|
|
log.Printf("Consumer for Partition %d stopped or rebalancing...", claim.Partition())
|
|
return nil
|
|
}
|
|
|
|
func startConsumer(config *sarama.Config) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
group, err := sarama.NewConsumerGroup([]string{BrokerList}, GroupID, config)
|
|
if err != nil {
|
|
log.Fatalf("Error creating consumer group client: %v", err)
|
|
}
|
|
defer group.Close()
|
|
|
|
log.Printf("Consumer Group '%s' started. Listening on topic '%s'...", GroupID, Topic)
|
|
wg := &sync.WaitGroup{}
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
handler := ConsumerHandler{}
|
|
for {
|
|
if err := group.Consume(ctx, []string{Topic}, &handler); err != nil {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
log.Printf("Error from consumer group: %v", err)
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
}
|
|
}()
|
|
|
|
signals := make(chan os.Signal, 1)
|
|
signal.Notify(signals, os.Interrupt)
|
|
if _, ok := <-signals; ok {
|
|
log.Println("Interrupt signal received. Shutting down consumer...")
|
|
cancel()
|
|
}
|
|
|
|
wg.Wait()
|
|
log.Println("Consumer shutdown complete.")
|
|
}
|
|
|
|
// --------------------------------------------------------------------------------------
// Web server logic
// --------------------------------------------------------------------------------------

// indexPageHTML is the single-page UI: one button that POSTs to
// /start-producer and shows the "message" field of the JSON reply.
const indexPageHTML = `
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Kafka Web Producer</title>
    <style>
        body { font-family: Arial, sans-serif; display: flex; justify-content: center; align-items: center; height: 100vh; margin: 0; background-color: #f4f4f4; }
        .container { text-align: center; }
        #startButton {
            padding: 15px 30px;
            font-size: 18px;
            cursor: pointer;
            background-color: #4CAF50;
            color: white;
            border: none;
            border-radius: 5px;
            transition: background-color 0.3s;
        }
        #startButton:hover { background-color: #45a049; }
        #status { margin-top: 20px; font-size: 16px; color: #333; }
    </style>
</head>
<body>
    <div class="container">
        <h1>Kafka 消息生产者</h1>
        <button id="startButton">开始发送消息</button>
        <p id="status"></p>
    </div>
    <script>
        document.getElementById('startButton').addEventListener('click', function() {
            const statusElem = document.getElementById('status');
            statusElem.textContent = '正在请求服务器开始发送消息...';

            fetch('/start-producer', { method: 'POST' })
                .then(response => response.json())
                .then(data => {
                    console.log('Success:', data);
                    statusElem.textContent = data.message;
                })
                .catch((error) => {
                    console.error('Error:', error);
                    statusElem.textContent = '请求失败: ' + error;
                });
        });
    </script>
</body>
</html>
`

// indexPageTemplate is parsed once at start-up instead of on every request.
// The template text is a constant, so a parse failure is a programming error.
var indexPageTemplate = template.Must(template.New("index").Parse(indexPageHTML))

// handleRoot serves the operator page with the "start producer" button.
func handleRoot(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	if err := indexPageTemplate.Execute(w, nil); err != nil {
		// Part of the response may already have been written, so the status
		// code can no longer be changed; record the failure instead.
		log.Printf("Error rendering index page: %v", err)
	}
}
|
|
|
|
// handleStartProducer 处理开始生产消息的请求
|
|
func handleStartProducer(config *sarama.Config) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
log.Println("Received request to start producer...")
|
|
|
|
// 使用 goroutine 在后台运行生产者,避免阻塞 HTTP 响应
|
|
go startProducer(config)
|
|
|
|
// 立即返回响应给前端
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
fmt.Fprintln(w, `{"message": "生产者已在后台启动,请查看服务器日志确认消息发送状态。"}`)
|
|
}
|
|
}
|
|
|
|
// startWebServer 启动 Web 服务
|
|
func startWebServer(config *sarama.Config) {
|
|
// 注册 HTTP 路由处理器
|
|
http.HandleFunc("/", handleRoot)
|
|
http.HandleFunc("/start-producer", handleStartProducer(config))
|
|
|
|
log.Printf("Web server starting on %s...", WebServerAddress)
|
|
log.Println("Visit http://localhost:8080 in your browser.")
|
|
|
|
// 启动 HTTP 服务器
|
|
if err := http.ListenAndServe(WebServerAddress, nil); err != nil {
|
|
log.Fatalf("Failed to start web server: %v", err)
|
|
}
|
|
}
|
|
|
|
// --------------------------------------------------------------------------------------
|
|
// 主函数与命令行处理
|
|
// --------------------------------------------------------------------------------------
|
|
func main() {
|
|
if len(os.Args) < 2 {
|
|
fmt.Println("Usage: go run main.go <web|consumer|fetch>")
|
|
fmt.Println(" web: Starts the web server to trigger the producer.")
|
|
fmt.Println(" consumer: Starts the Kafka consumer.")
|
|
fmt.Println(" fetch <partition> <offset|oldest|latest> <count> <output-file> [key] [--with-timestamp]: Fetches messages to a file.")
|
|
fmt.Println(" If [key] is provided, only messages with that key will be fetched.")
|
|
fmt.Println(" If --with-timestamp is provided, timestamp will be prepended to each message.")
|
|
return
|
|
}
|
|
|
|
// 初始化配置
|
|
config := newConfig()
|
|
|
|
role := strings.ToLower(os.Args[1])
|
|
switch role {
|
|
case "web":
|
|
startWebServer(config) // 启动 Web 服务
|
|
case "consumer":
|
|
startConsumer(config) // 启动消费者
|
|
case "fetch":
|
|
if len(os.Args) < 6 {
|
|
fmt.Println("Usage: go run main.go fetch <partition> <offset|oldest|latest> <count> <output-file> [key] [--with-timestamp]")
|
|
return
|
|
}
|
|
|
|
partitionVal, err := strconv.Atoi(os.Args[2])
|
|
if err != nil {
|
|
log.Fatalf("invalid partition: %v", err)
|
|
}
|
|
|
|
offsetArg := strings.ToLower(os.Args[3])
|
|
var offset int64
|
|
switch offsetArg {
|
|
case "oldest":
|
|
offset = sarama.OffsetOldest
|
|
case "latest":
|
|
offset = sarama.OffsetNewest
|
|
default:
|
|
offset, err = strconv.ParseInt(offsetArg, 10, 64)
|
|
if err != nil {
|
|
log.Fatalf("invalid offset: %v", err)
|
|
}
|
|
}
|
|
|
|
limit, err := strconv.Atoi(os.Args[4])
|
|
if err != nil {
|
|
log.Fatalf("invalid count: %v", err)
|
|
}
|
|
|
|
outputPath := os.Args[5]
|
|
|
|
// 可选的 key 参数和 timestamp 标志
|
|
var filterKey string
|
|
withTimestamp := false
|
|
|
|
// 检查剩余的参数
|
|
for i := 6; i < len(os.Args); i++ {
|
|
arg := os.Args[i]
|
|
if arg == "--with-timestamp" || arg == "-t" {
|
|
withTimestamp = true
|
|
} else if filterKey == "" {
|
|
// 第一个非标志参数作为 filterKey
|
|
filterKey = arg
|
|
}
|
|
}
|
|
|
|
if filterKey != "" {
|
|
log.Printf("Filtering by key: %s", filterKey)
|
|
}
|
|
if withTimestamp {
|
|
log.Printf("Will include timestamps in output")
|
|
}
|
|
|
|
if err := fetchPartitionMessages(config, int32(partitionVal), offset, limit, outputPath, filterKey, withTimestamp); err != nil {
|
|
log.Fatalf("fetch failed: %v", err)
|
|
}
|
|
|
|
offsetDesc := offsetArg
|
|
if offsetArg != "oldest" && offsetArg != "latest" {
|
|
offsetDesc = fmt.Sprintf("%d", offset)
|
|
}
|
|
if filterKey != "" {
|
|
log.Printf("Fetched %d messages with key '%s' from partition %d starting at %s into %s", limit, filterKey, partitionVal, offsetDesc, outputPath)
|
|
} else {
|
|
log.Printf("Fetched %d messages from partition %d starting at %s into %s", limit, partitionVal, offsetDesc, outputPath)
|
|
}
|
|
default:
|
|
fmt.Println("Invalid argument. Usage: go run main.go <web|consumer|fetch>")
|
|
}
|
|
}
|