package main import ( "bufio" "context" "crypto/sha512" "fmt" "html/template" "io" "log" "net/http" "os" "os/signal" "strconv" "strings" "sync" "time" "github.com/IBM/sarama" "github.com/xdg/scram" ) // SCRAMClientGeneratorFunc 必须指向一个返回 sarama.SCRAMClient 接口的函数 // 这里的 func() sarama.SCRAMClient { ... } 就是所需的实现。 func createScramClient() sarama.SCRAMClient { // 传入用于 SCRAM 认证的 Hash 算法,这里是 SHA-512 return &XDGSCRAMClient{HashGeneratorFcn: sha512.New} } // XDGSCRAMClient 是对 github.com/xdg/scram/client 库的包装 // 使得它符合 Sarama 的 sarama.SCRAMClient 接口 type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn } // Begin 用于启动 SCRAM 认证流程 func (x *XDGSCRAMClient) Begin(userName, password, salt string) (err error) { x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, salt) if err != nil { return err } x.ClientConversation = x.Client.NewConversation() return nil } // Step 用于处理 SCRAM 认证中的每一步挑战和响应 func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { response, err = x.ClientConversation.Step(challenge) return } // Done 检查 SCRAM 认证是否完成 func (x *XDGSCRAMClient) Done() bool { return x.ClientConversation.Done() } // --- 配置常量 --- const ( // 假设 Kafka 运行在宿主机的 9092 端口 // BrokerList = "localhost:9092" // Topic = "test-topic" BrokerList = "k8s-kafka-kafkaclu-ea0121c7a5-0f29d8348027ca52.elb.us-west-2.amazonaws.com:9094" // prd // BrokerList = "k8s-middlewa-kafkakaf-8362de7238-60b500b1ed68bc69.elb.us-west-2.amazonaws.com:9094" Topic = "message-users" GroupID = "go-sarama-consumer-group-v1" // 消费组 ID ) // --- Web 服务相关常量 --- const ( WebServerAddress = ":8080" // Web 服务监听的地址和端口 ) // 初始化 Sarama 配置 func newConfig() *sarama.Config { config := sarama.NewConfig() // 假设您使用的 Kafka 版本为 4.0.0 或更高 (Sarama 库会兼容) config.Version = sarama.V4_0_0_0 // 请根据您的 Kafka 版本进行调整 // 生产者配置 config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认 config.Producer.Retry.Max = 5 config.Producer.Return.Successes = true // 必须开启,以便同步生产者可以等待确认 // 消费者配置:使用 Consumer Group 模式 if Topic == "message-users" { // 2. 启用 SASL config.Net.SASL.Enable = true config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return createScramClient() } config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 // 4. 设置用户名和密码 config.Net.SASL.User = "kafka-user" config.Net.SASL.Password = "Nrp0vwxgFJXfydV8zISPxjp8RSETjOEq" // prd // config.Net.SASL.Password = "oP0rACG2ka5EBVWfuAyR1fslH4yAlLZX" } config.Consumer.Return.Errors = true config.Consumer.Group.Session.Timeout = 20 * time.Second config.Metadata.Timeout = 15 * time.Second return config } // -------------------------------------------------------------------------------------- // 生产者逻辑 // -------------------------------------------------------------------------------------- func startProducer(config *sarama.Config) { // 使用同步生产者 (SyncProducer) producer, err := sarama.NewSyncProducer([]string{BrokerList}, config) if err != nil { log.Printf("Error creating sync producer: %v", err) // 改为 Printf 以避免程序退出 return } defer producer.Close() log.Println("Producer started. Sending messages from long1.txt...") f, err := os.Open("./long1.txt") if err != nil { log.Printf("failed to read file: %v", err) // 改为 Printf return } defer f.Close() bs, _ := io.ReadAll(f) str := string(bs) lines := strings.Split(str, "\n") messageID := 0 // 循环发送消息 for _, line := range lines { // 忽略空行 if strings.TrimSpace(line) == "" { continue } // 1. 构造 ProducerRecord msg := &sarama.ProducerMessage{ Topic: Topic, Key: sarama.StringEncoder(fmt.Sprintf("Key-%d", messageID)), Value: sarama.StringEncoder(line), } // 2. 发送消息 partition, offset, err := producer.SendMessage(msg) if err != nil { log.Printf("Failed to send message: %v", err) } else { log.Printf("Producer sent: Topic=%s | Partition=%d | Offset=%d | Value='%s'", Topic, partition, offset, line) messageID++ } // 3. 暂停 (根据需求调整) time.Sleep(10 * time.Millisecond) } log.Println("Producer finished sending all messages.") } // fetchPartitionMessages connects to a specific partition starting from the given offset, // writes up to limit messages into outputPath (one per line), then returns. // If filterKey is not empty, only messages with matching key are written. // If withTimestamp is true, prepends formatted timestamp to each message. func fetchPartitionMessages(config *sarama.Config, partition int32, offset int64, limit int, outputPath string, filterKey string, withTimestamp bool) error { if limit <= 0 { return fmt.Errorf("limit must be > 0") } client, err := sarama.NewClient([]string{BrokerList}, config) if err != nil { return err } defer client.Close() consumer, err := sarama.NewConsumerFromClient(client) if err != nil { return err } defer consumer.Close() pc, err := consumer.ConsumePartition(Topic, partition, offset) if err != nil { return err } defer pc.Close() outFile, err := os.Create(outputPath) if err != nil { return err } defer outFile.Close() writer := bufio.NewWriter(outFile) defer writer.Flush() received := 0 scanned := 0 filterEnabled := filterKey != "" // 设置超时和最大扫描数 timeout := time.After(15 * time.Second) maxScan := limit * 1000 // 如果过滤模式下扫描了这么多消息还没找到足够的匹配,就放弃 if !filterEnabled { maxScan = limit // 非过滤模式下,扫描数就是 limit } for received < limit && scanned < maxScan { select { case <-timeout: log.Printf("Fetch timeout after 30s. Retrieved %d messages (scanned %d)", received, scanned) return nil // 超时不算错误,返回已获取的消息 case err, ok := <-pc.Errors(): if !ok { return fmt.Errorf("partition consumer error channel closed") } if err != nil { return err.Err } case msg, ok := <-pc.Messages(): if !ok { log.Printf("Partition consumer closed. Retrieved %d messages (scanned %d)", received, scanned) return nil // channel 关闭说明没有更多消息了 } if msg == nil { continue } scanned++ // 如果启用了 key 过滤,检查 key 是否匹配 if filterEnabled { msgKey := string(msg.Key) if msgKey != filterKey { continue // 跳过不匹配的消息 } log.Printf("Found matching message %d/%d (scanned %d total)", received+1, limit, scanned) } // 构造要写入的内容 var line string if withTimestamp { // 格式化时间戳为易读格式: 2006-01-02 15:04:05.000 timestampStr := msg.Timestamp.Format("2006-01-02 15:04:05.000") line = fmt.Sprintf("%s %s\n", timestampStr, string(msg.Value)) } else { line = string(msg.Value) + "\n" } if _, err := writer.WriteString(line); err != nil { return err } received++ } } if scanned >= maxScan && received < limit { log.Printf("Reached max scan limit (%d messages scanned). Retrieved %d matching messages", scanned, received) } return nil } // -------------------------------------------------------------------------------------- // 消费者逻辑 (保持不变) // -------------------------------------------------------------------------------------- // ConsumerHandler 结构体实现了 sarama.ConsumerGroupHandler 接口 type ConsumerHandler struct{} // Setup 是在新会话开始时运行的,在 ConsumeClaim 之前 func (consumer *ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { log.Println("Consumer Group session setup complete.") return nil } // Cleanup 是在会话结束时运行的,在所有 ConsumeClaim 协程退出后 func (consumer *ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { log.Println("Consumer Group session cleanup complete.") return nil } // ConsumeClaim 必须启动一个消费循环,处理分配给当前 consumer 的分区消息 func (consumer *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { log.Printf("Consumer Received: Topic=%s | Partition=%d | Offset=%d | Value='%s' (Timestamp: %v)", message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp) session.MarkMessage(message, "") } log.Printf("Consumer for Partition %d stopped or rebalancing...", claim.Partition()) return nil } func startConsumer(config *sarama.Config) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() group, err := sarama.NewConsumerGroup([]string{BrokerList}, GroupID, config) if err != nil { log.Fatalf("Error creating consumer group client: %v", err) } defer group.Close() log.Printf("Consumer Group '%s' started. Listening on topic '%s'...", GroupID, Topic) wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() handler := ConsumerHandler{} for { if err := group.Consume(ctx, []string{Topic}, &handler); err != nil { if ctx.Err() != nil { return } log.Printf("Error from consumer group: %v", err) time.Sleep(5 * time.Second) } } }() signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) if _, ok := <-signals; ok { log.Println("Interrupt signal received. Shutting down consumer...") cancel() } wg.Wait() log.Println("Consumer shutdown complete.") } // -------------------------------------------------------------------------------------- // Web 服务逻辑 // -------------------------------------------------------------------------------------- // handleRoot 处理根路径请求,显示操作页面 func handleRoot(w http.ResponseWriter, r *http.Request) { // 定义一个简单的 HTML 页面模板 const htmlTemplate = `