Golang快速接入RocketMQ实现消息队列业务

D
dashen46 2025-01-22T08:02:12+08:00
0 0 212

引言

消息队列是现代分布式系统中常用的组件,它能够实现高效、可靠的消息传递。RocketMQ是阿里巴巴开源的一款分布式消息队列系统,具有高可用、高性能、高可靠等特点。本文将介绍如何使用Golang快速接入RocketMQ,以实现消息队列业务。

目录

  1. 环境准备
  2. 安装RocketMQ Go客户端
  3. 创建Producer
  4. 创建Consumer
  5. 使用RocketMQ实现消息队列业务
  6. 总结

1. 环境准备

在开始之前,我们需要确保已经安装好了以下环境:

2. 安装RocketMQ Go客户端

Golang提供了许多与RocketMQ交互的开源库,本文使用的是github.com/apache/rocketmq-client-go

使用以下命令安装RocketMQ Go客户端:

go get github.com/apache/rocketmq-client-go

3. 创建Producer

首先,我们需要创建一个Producer,用于发送消息到RocketMQ。

package main

import (
    "fmt"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
    p, _ := rocketmq.NewProducer(
        producer.WithNameServer([]string{"127.0.0.1:9876"}),
    )
    err := p.Start()
    if err != nil {
        panic(err)
    }
    defer p.Shutdown()

    msg := &primitive.Message{
        Topic: "test_topic",
        Body:  []byte("Hello RocketMQ!"),
    }
    result, err := p.SendSync(context.Background(), msg)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Send message success. Result: %v\n", result)
}

在上述代码中,首先需要创建一个Producer实例,并指定RocketMQ的NameServer地址。然后,调用Start()方法启动Producer实例,成功启动后我们可以通过SendSync()方法发送同步消息。通过上述代码,我们成功发送了一条消息到名为test_topic的主题。

4. 创建Consumer

接下来,我们需要创建一个Consumer,用于从RocketMQ接收消息。

package main

import (
    "context"
    "fmt"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

type TestMessageListener struct{}

func (l *TestMessageListener) ConsumeMessage(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    for _, msg := range msgs {
        fmt.Printf("Receive message: %s\n", msg.Body)
    }
    return consumer.ConsumeSuccess, nil
}

func main() {
    c, _ := rocketmq.NewPushConsumer(
        consumer.WithNameServer([]string{"127.0.0.1:9876"}),
    )
    err := c.Subscribe("test_topic", consumer.MessageSelector{}, &TestMessageListener{})
    if err != nil {
        panic(err)
    }
    err = c.Start()
    if err != nil {
        panic(err)
    }
    defer c.Shutdown()

    // Keep running
    select {}
}

在上述代码中,我们创建了一个名为TestMessageListener的消息监听器。它实现了ConsumeMessage方法,用于处理从RocketMQ接收到的消息。在main函数中,我们创建了一个PushConsumer实例,并指定了RocketMQ的NameServer地址。然后,我们订阅名为test_topic的主题,指定了消息选择器为空,并将消息监听器设置为TestMessageListener。最后,调用Start()方法启动Consumer实例,并通过一个空的select{}使程序保持运行。这样,我们就成功创建了一个Consumer,并可以接收从RocketMQ传递过来的消息。

5. 使用RocketMQ实现消息队列业务

在实际的业务中,我们可以使用RocketMQ来实现许多有用的场景,例如消息广播、消息订阅、消息顺序发送等。

下面是一个使用RocketMQ实现消息广播的示例代码:

package main

import (
    "context"
    "fmt"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "github.com/apache/rocketmq-client-go/v2/producer"
)

type TestMessageListener struct{}

func (l *TestMessageListener) ConsumeMessage(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    for _, msg := range msgs {
        fmt.Printf("Receive message: %s\n", msg.Body)
    }
    return consumer.ConsumeSuccess, nil
}

func main() {
    p, _ := rocketmq.NewProducer(
        producer.WithNameServer([]string{"127.0.0.1:9876"}),
    )
    err := p.Start()
    if err != nil {
        panic(err)
    }
    defer p.Shutdown()

    // 发送消息到主题topic1,并广播给所有消费者
    msg := &primitive.Message{
        Topic: "topic1",
        Body:  []byte("Hello RocketMQ!"),
    }
    _, err = p.SendSync(context.Background(), msg)
    if err != nil {
        panic(err)
    }

    c, _ := rocketmq.NewPushConsumer(
        consumer.WithNameServer([]string{"127.0.0.1:9876"}),
    )
    err = c.Subscribe("topic1", consumer.MessageSelector{}, &TestMessageListener{})
    if err != nil {
        panic(err)
    }
    err = c.Start()
    if err != nil {
        panic(err)
    }
    defer c.Shutdown()

    // Keep running
    select {}
}

在上述代码中,我们首先创建了一个Producer实例,并通过SendSync()方法发送一条消息到名为topic1的主题。然后,我们创建了一个PushConsumer实例,并通过Subscribe()方法订阅名为topic1的主题,并将消息监听器设置为TestMessageListener。最后,我们通过一个空的select{}使程序保持运行。

6. 总结

经过以上步骤,我们成功地使用Golang接入RocketMQ,实现了消息队列业务。RocketMQ具有高可用、高性能、高可靠等特点,并且Golang提供了许多与RocketMQ交互的开源库,非常方便快捷。希望本文能够对你的开发工作有所帮助!

参考链接:

相似文章

    评论 (0)