consumer.go

  1. package main
  import (
  	"fmt"
  	"log"
  	"os"
  	"os/signal"
  	"strings"
  	"sync"

  	"github.com/Shopify/sarama"
  )
  // Package-level state shared by main and the goroutines it starts.
  var (
  	// wg counts the per-partition consumer goroutines so the program can
  	// wait for all of them to finish before shutting down.
  	wg sync.WaitGroup

  	// logger writes to stderr with the standard date/time flags. It is also
  	// installed as sarama's own logger in main. The prefix ends with a space
  	// so it does not run into the timestamp that follows it.
  	logger = log.New(os.Stderr, "[sarama] ", log.LstdFlags)
  )
  14. func main() {
  15. sarama.Logger = logger
  16. consumer, err := sarama.NewConsumer(strings.Split("localhost:9092", ","), nil)
  17. if err != nil {
  18. logger.Println("Failed to start consumer: %s", err)
  19. }
  20. partitionList, err := consumer.Partitions("hello")
  21. if err != nil {
  22. logger.Println("Failed to get the list of partitions: ", err)
  23. }
  24. for partition := range partitionList {
  25. pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest)
  26. if err != nil {
  27. logger.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
  28. }
  29. defer pc.AsyncClose()
  30. wg.Add(1)
  31. go func(sarama.PartitionConsumer) {
  32. defer wg.Done()
  33. for msg := range pc.Messages() {
  34. fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
  35. fmt.Println()
  36. }
  37. }(pc)
  38. }
  39. wg.Wait()
  40. logger.Println("Done consuming topic hello")
  41. consumer.Close()
  42. }

producer.go

  1. package main
  2. import (
  3. "github.com/Shopify/sarama"
  4. "log"
  5. "os"
  6. "strings"
  7. )
  var (
  	// logger writes to stderr with the standard date/time flags. It is also
  	// installed as sarama's own logger in main. The prefix ends with a space
  	// so it does not run into the timestamp that follows it.
  	logger = log.New(os.Stderr, "[sarama] ", log.LstdFlags)
  )
  11. func main() {
  12. sarama.Logger = logger
  13. config := sarama.NewConfig()
  14. config.Producer.RequiredAcks = sarama.WaitForAll
  15. config.Producer.Partitioner = sarama.NewRandomPartitioner
  16. msg := &sarama.ProducerMessage{}
  17. msg.Topic = "hello"
  18. msg.Partition = int32(-1)
  19. msg.Key = sarama.StringEncoder("key")
  20. msg.Value = sarama.ByteEncoder("你好, 世界!")
  21. producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
  22. if err != nil {
  23. logger.Println("Failed to produce message: %s", err)
  24. os.Exit(500)
  25. }
  26. defer producer.Close()
  27. partition, offset, err := producer.SendMessage(msg)
  28. if err != nil {
  29. logger.Println("Failed to produce message: ", err)
  30. }
  31. logger.Printf("partition=%d, offset=%d\n", partition, offset)
  32. }