Searching...

gkafka

gkafka模块实现了对Kafka消息队列系统的客户端功能封装,支持分组消费、指定起始位置消费等特性,并提供简便易用的API接口。

模块安装

go get -u github.com/gogf/gkafka

或者使用go.mod:

require github.com/gogf/gkafka latest

使用方式

import "github.com/gogf/gkafka"

接口文档

https://godoc.org/github.com/gogf/gkafka

使用示例

生产者

package main

import (
    "fmt"
    "github.com/gogf/gkafka"
    "time"
)

// newKafkaClientProducer builds a gkafka client for the producer example.
// The client is configured for the given topic, points at the broker on
// localhost:9092, and has AutoMarkOffset switched off.
func newKafkaClientProducer(topic string) *gkafka.Client {
    cfg := gkafka.NewConfig()
    cfg.Topics = topic
    cfg.Servers = "localhost:9092"
    cfg.AutoMarkOffset = false
    return gkafka.NewClient(cfg)
}

// main is the producer example: once per second it passes the current time,
// formatted as a string, to SyncSend on a client for the "test" topic. A send
// error is printed and the loop carries on.
func main() {
    producer := newKafkaClientProducer("test")
    defer producer.Close()
    for {
        payload := time.Now().String()
        fmt.Println("produce:", payload)
        err := producer.SyncSend(&gkafka.Message{Value: []byte(payload)})
        if err != nil {
            fmt.Println(err)
        }
        time.Sleep(time.Second)
    }
}

消费者

package main

import (
    "fmt"
    "github.com/gogf/gkafka"
)

// newKafkaClientConsumer builds a gkafka client for the consumer example.
// The client is configured for the given topic and consumer group, points at
// the broker on localhost:9092, and has AutoMarkOffset switched off, so the
// caller marks offsets itself.
func newKafkaClientConsumer(topic, group string) *gkafka.Client {
    cfg := gkafka.NewConfig()
    cfg.Topics = topic
    cfg.GroupId = group
    cfg.Servers = "localhost:9092"
    cfg.AutoMarkOffset = false
    return gkafka.NewClient(cfg)
}

// main is the consumer example: it receives messages from the "test" topic as
// a member of "test-group", prints each message's partition, offset and value,
// and then marks its offset explicitly. The first receive error is printed and
// ends the loop.
func main() {
    topic, group := "test", "test-group"
    consumer := newKafkaClientConsumer(topic, group)
    defer consumer.Close()
    for {
        msg, err := consumer.Receive()
        if err != nil {
            fmt.Println(err)
            break
        }
        fmt.Println("consume:", msg.Partition, msg.Offset, string(msg.Value))
        // AutoMarkOffset is disabled in the client config, so mark it here.
        msg.MarkOffset()
    }
}