package main import ( "bufio" "context" "crypto/sha512" "fmt" "html/template" "io" "log" "net/http" "os" "os/signal" "strconv" "strings" "sync" "time" "github.com/IBM/sarama" "github.com/xdg/scram" ) // SCRAMClientGeneratorFunc 必须指向一个返回 sarama.SCRAMClient 接口的函数 // 这里的 func() sarama.SCRAMClient { ... } 就是所需的实现。 func createScramClient() sarama.SCRAMClient { // 传入用于 SCRAM 认证的 Hash 算法,这里是 SHA-512 return &XDGSCRAMClient{HashGeneratorFcn: sha512.New} } // XDGSCRAMClient 是对 github.com/xdg/scram/client 库的包装 // 使得它符合 Sarama 的 sarama.SCRAMClient 接口 type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn } // Begin 用于启动 SCRAM 认证流程 func (x *XDGSCRAMClient) Begin(userName, password, salt string) (err error) { x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, salt) if err != nil { return err } x.ClientConversation = x.Client.NewConversation() return nil } // Step 用于处理 SCRAM 认证中的每一步挑战和响应 func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { response, err = x.ClientConversation.Step(challenge) return } // Done 检查 SCRAM 认证是否完成 func (x *XDGSCRAMClient) Done() bool { return x.ClientConversation.Done() } // --- 配置常量 --- const ( // 假设 Kafka 运行在宿主机的 9092 端口 // BrokerList = "localhost:9092" // Topic = "test-topic" BrokerList = "k8s-kafka-kafkaclu-ea0121c7a5-0f29d8348027ca52.elb.us-west-2.amazonaws.com:9094" // prd // BrokerList = "k8s-middlewa-kafkakaf-8362de7238-60b500b1ed68bc69.elb.us-west-2.amazonaws.com:9094" Topic = "message-users" GroupID = "go-sarama-consumer-group-v1" // 消费组 ID ) // --- Web 服务相关常量 --- const ( WebServerAddress = ":8080" // Web 服务监听的地址和端口 ) // 初始化 Sarama 配置 func newConfig() *sarama.Config { config := sarama.NewConfig() // 假设您使用的 Kafka 版本为 4.0.0 或更高 (Sarama 库会兼容) config.Version = sarama.V4_0_0_0 // 请根据您的 Kafka 版本进行调整 // 生产者配置 config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认 config.Producer.Retry.Max = 5 config.Producer.Return.Successes = true // 必须开启,以便同步生产者可以等待确认 // 消费者配置:使用 Consumer Group 模式 if Topic == "message-users" { // 2. 启用 SASL config.Net.SASL.Enable = true config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return createScramClient() } config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 // 4. 设置用户名和密码 config.Net.SASL.User = "kafka-user" config.Net.SASL.Password = "Nrp0vwxgFJXfydV8zISPxjp8RSETjOEq" // prd // config.Net.SASL.Password = "oP0rACG2ka5EBVWfuAyR1fslH4yAlLZX" } config.Consumer.Return.Errors = true config.Consumer.Group.Session.Timeout = 20 * time.Second config.Metadata.Timeout = 15 * time.Second return config } // -------------------------------------------------------------------------------------- // 生产者逻辑 // -------------------------------------------------------------------------------------- func startProducer(config *sarama.Config) { // 使用同步生产者 (SyncProducer) producer, err := sarama.NewSyncProducer([]string{BrokerList}, config) if err != nil { log.Printf("Error creating sync producer: %v", err) // 改为 Printf 以避免程序退出 return } defer producer.Close() log.Println("Producer started. Sending messages from long1.txt...") f, err := os.Open("./long1.txt") if err != nil { log.Printf("failed to read file: %v", err) // 改为 Printf return } defer f.Close() bs, _ := io.ReadAll(f) str := string(bs) lines := strings.Split(str, "\n") messageID := 0 // 循环发送消息 for _, line := range lines { // 忽略空行 if strings.TrimSpace(line) == "" { continue } // 1. 构造 ProducerRecord msg := &sarama.ProducerMessage{ Topic: Topic, Key: sarama.StringEncoder(fmt.Sprintf("Key-%d", messageID)), Value: sarama.StringEncoder(line), } // 2. 发送消息 partition, offset, err := producer.SendMessage(msg) if err != nil { log.Printf("Failed to send message: %v", err) } else { log.Printf("Producer sent: Topic=%s | Partition=%d | Offset=%d | Value='%s'", Topic, partition, offset, line) messageID++ } // 3. 暂停 (根据需求调整) time.Sleep(10 * time.Millisecond) } log.Println("Producer finished sending all messages.") } // fetchPartitionMessages connects to a specific partition starting from the given offset, // writes up to limit messages into outputPath (one per line), then returns. // If filterKey is not empty, only messages with matching key are written. // If withTimestamp is true, prepends formatted timestamp to each message. func fetchPartitionMessages(config *sarama.Config, partition int32, offset int64, limit int, outputPath string, filterKey string, withTimestamp bool) error { if limit <= 0 { return fmt.Errorf("limit must be > 0") } client, err := sarama.NewClient([]string{BrokerList}, config) if err != nil { return err } defer client.Close() consumer, err := sarama.NewConsumerFromClient(client) if err != nil { return err } defer consumer.Close() pc, err := consumer.ConsumePartition(Topic, partition, offset) if err != nil { return err } defer pc.Close() outFile, err := os.Create(outputPath) if err != nil { return err } defer outFile.Close() writer := bufio.NewWriter(outFile) defer writer.Flush() received := 0 scanned := 0 filterEnabled := filterKey != "" // 设置超时和最大扫描数 timeout := time.After(15 * time.Second) maxScan := limit * 1000 // 如果过滤模式下扫描了这么多消息还没找到足够的匹配,就放弃 if !filterEnabled { maxScan = limit // 非过滤模式下,扫描数就是 limit } for received < limit && scanned < maxScan { select { case <-timeout: log.Printf("Fetch timeout after 30s. Retrieved %d messages (scanned %d)", received, scanned) return nil // 超时不算错误,返回已获取的消息 case err, ok := <-pc.Errors(): if !ok { return fmt.Errorf("partition consumer error channel closed") } if err != nil { return err.Err } case msg, ok := <-pc.Messages(): if !ok { log.Printf("Partition consumer closed. Retrieved %d messages (scanned %d)", received, scanned) return nil // channel 关闭说明没有更多消息了 } if msg == nil { continue } scanned++ // 如果启用了 key 过滤,检查 key 是否匹配 if filterEnabled { msgKey := string(msg.Key) if msgKey != filterKey { continue // 跳过不匹配的消息 } log.Printf("Found matching message %d/%d (scanned %d total)", received+1, limit, scanned) } // 构造要写入的内容 var line string if withTimestamp { // 格式化时间戳为易读格式: 2006-01-02 15:04:05.000 timestampStr := msg.Timestamp.Format("2006-01-02 15:04:05.000") line = fmt.Sprintf("%s %s\n", timestampStr, string(msg.Value)) } else { line = string(msg.Value) + "\n" } if _, err := writer.WriteString(line); err != nil { return err } received++ } } if scanned >= maxScan && received < limit { log.Printf("Reached max scan limit (%d messages scanned). Retrieved %d matching messages", scanned, received) } return nil } // -------------------------------------------------------------------------------------- // 消费者逻辑 (保持不变) // -------------------------------------------------------------------------------------- // ConsumerHandler 结构体实现了 sarama.ConsumerGroupHandler 接口 type ConsumerHandler struct{} // Setup 是在新会话开始时运行的,在 ConsumeClaim 之前 func (consumer *ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { log.Println("Consumer Group session setup complete.") return nil } // Cleanup 是在会话结束时运行的,在所有 ConsumeClaim 协程退出后 func (consumer *ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { log.Println("Consumer Group session cleanup complete.") return nil } // ConsumeClaim 必须启动一个消费循环,处理分配给当前 consumer 的分区消息 func (consumer *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { log.Printf("Consumer Received: Topic=%s | Partition=%d | Offset=%d | Value='%s' (Timestamp: %v)", message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp) session.MarkMessage(message, "") } log.Printf("Consumer for Partition %d stopped or rebalancing...", claim.Partition()) return nil } func startConsumer(config *sarama.Config) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() group, err := sarama.NewConsumerGroup([]string{BrokerList}, GroupID, config) if err != nil { log.Fatalf("Error creating consumer group client: %v", err) } defer group.Close() log.Printf("Consumer Group '%s' started. Listening on topic '%s'...", GroupID, Topic) wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() handler := ConsumerHandler{} for { if err := group.Consume(ctx, []string{Topic}, &handler); err != nil { if ctx.Err() != nil { return } log.Printf("Error from consumer group: %v", err) time.Sleep(5 * time.Second) } } }() signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) if _, ok := <-signals; ok { log.Println("Interrupt signal received. Shutting down consumer...") cancel() } wg.Wait() log.Println("Consumer shutdown complete.") } // -------------------------------------------------------------------------------------- // Web 服务逻辑 // -------------------------------------------------------------------------------------- // handleRoot 处理根路径请求,显示操作页面 func handleRoot(w http.ResponseWriter, r *http.Request) { // 定义一个简单的 HTML 页面模板 const htmlTemplate = ` Kafka Web Producer

Kafka 消息生产者

` // 解析并执行模板 tmpl, err := template.New("index").Parse(htmlTemplate) if err != nil { http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } tmpl.Execute(w, nil) } // handleStartProducer 处理开始生产消息的请求 func handleStartProducer(config *sarama.Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) return } log.Println("Received request to start producer...") // 使用 goroutine 在后台运行生产者,避免阻塞 HTTP 响应 go startProducer(config) // 立即返回响应给前端 w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) fmt.Fprintln(w, `{"message": "生产者已在后台启动,请查看服务器日志确认消息发送状态。"}`) } } // startWebServer 启动 Web 服务 func startWebServer(config *sarama.Config) { // 注册 HTTP 路由处理器 http.HandleFunc("/", handleRoot) http.HandleFunc("/start-producer", handleStartProducer(config)) log.Printf("Web server starting on %s...", WebServerAddress) log.Println("Visit http://localhost:8080 in your browser.") // 启动 HTTP 服务器 if err := http.ListenAndServe(WebServerAddress, nil); err != nil { log.Fatalf("Failed to start web server: %v", err) } } // -------------------------------------------------------------------------------------- // 主函数与命令行处理 // -------------------------------------------------------------------------------------- func main() { if len(os.Args) < 2 { fmt.Println("Usage: go run main.go ") fmt.Println(" web: Starts the web server to trigger the producer.") fmt.Println(" consumer: Starts the Kafka consumer.") fmt.Println(" fetch [key] [--with-timestamp]: Fetches messages to a file.") fmt.Println(" If [key] is provided, only messages with that key will be fetched.") fmt.Println(" If --with-timestamp is provided, timestamp will be prepended to each message.") return } // 初始化配置 config := newConfig() role := strings.ToLower(os.Args[1]) switch role { case "web": startWebServer(config) // 启动 Web 服务 case "consumer": startConsumer(config) // 启动消费者 case "fetch": if len(os.Args) < 6 { fmt.Println("Usage: go run main.go fetch [key] [--with-timestamp]") return } partitionVal, err := strconv.Atoi(os.Args[2]) if err != nil { log.Fatalf("invalid partition: %v", err) } offsetArg := strings.ToLower(os.Args[3]) var offset int64 switch offsetArg { case "oldest": offset = sarama.OffsetOldest case "latest": offset = sarama.OffsetNewest default: offset, err = strconv.ParseInt(offsetArg, 10, 64) if err != nil { log.Fatalf("invalid offset: %v", err) } } limit, err := strconv.Atoi(os.Args[4]) if err != nil { log.Fatalf("invalid count: %v", err) } outputPath := os.Args[5] // 可选的 key 参数和 timestamp 标志 var filterKey string withTimestamp := false // 检查剩余的参数 for i := 6; i < len(os.Args); i++ { arg := os.Args[i] if arg == "--with-timestamp" || arg == "-t" { withTimestamp = true } else if filterKey == "" { // 第一个非标志参数作为 filterKey filterKey = arg } } if filterKey != "" { log.Printf("Filtering by key: %s", filterKey) } if withTimestamp { log.Printf("Will include timestamps in output") } if err := fetchPartitionMessages(config, int32(partitionVal), offset, limit, outputPath, filterKey, withTimestamp); err != nil { log.Fatalf("fetch failed: %v", err) } offsetDesc := offsetArg if offsetArg != "oldest" && offsetArg != "latest" { offsetDesc = fmt.Sprintf("%d", offset) } if filterKey != "" { log.Printf("Fetched %d messages with key '%s' from partition %d starting at %s into %s", limit, filterKey, partitionVal, offsetDesc, outputPath) } else { log.Printf("Fetched %d messages from partition %d starting at %s into %s", limit, partitionVal, offsetDesc, outputPath) } default: fmt.Println("Invalid argument. Usage: go run main.go ") } }