🆕 新增MQ消息通道

This commit is contained in:
李寻欢
2024-02-19 14:17:26 +08:00
parent ea4262adf0
commit 1413181bd4
8 changed files with 211 additions and 29 deletions

46
mq/handler.go Normal file
View File

@@ -0,0 +1,46 @@
package mq
import (
"encoding/json"
"go-wechat/common/current"
"go-wechat/model"
"go-wechat/types"
"log"
"strings"
)
// parse
// @description: 解析消息
// @param msg
func parse(msg []byte) {
var m model.Message
if err := json.Unmarshal(msg, &m); err != nil {
log.Printf("消息解析失败: %v", err)
log.Printf("消息内容: %d -> %v", len(msg), string(msg))
return
}
// 记录原始数据
m.Raw = string(msg)
// 提取出群成员信息
// Sys类型的消息正文不包含微信 Id所以不需要处理
if m.IsGroup() && m.Type != types.MsgTypeSys {
// 群消息,处理一下消息和发信人
groupUser := strings.Split(m.Content, "\n")[0]
groupUser = strings.ReplaceAll(groupUser, ":", "")
// 如果两个id一致说明是系统发的
if m.FromUser != groupUser {
m.GroupUser = groupUser
}
// 用户的操作单独提出来处理一下
m.Content = strings.Join(strings.Split(m.Content, "\n")[1:], "\n")
}
log.Printf("收到新微信消息\n消息来源: %s\n群成员: %s\n消息类型: %v\n消息内容: %s", m.FromUser, m.GroupUser, m.Type, m.Content)
// 插件不为空,开始执行
if p := current.GetRobotMessageHandler(); p != nil {
p(&m)
}
return
}

114
mq/rabbitmq.go Normal file
View File

@@ -0,0 +1,114 @@
package mq
import (
amqp "github.com/rabbitmq/amqp091-go"
"go-wechat/config"
"log"
)
// MQ连接对象
var conn *amqp.Connection
var channel *amqp.Channel
// 交换机名称
const exchangeName = "wechat-message"
// Init
// @description: 初始化MQ
func Init() {
// 读取MQ连接配置
mqUrl := config.Conf.Mq.RabbitMQ.GetURL()
if mqUrl == "" {
log.Panicf("MQ配置异常")
}
var err error
// 创建MQ连接
if conn, err = amqp.Dial(mqUrl); err != nil {
log.Panicf("RabbitMQ连接失败: %s", err)
}
//获取channel
if channel, err = conn.Channel(); err != nil {
log.Panicf("打开Channel失败: %s", err)
}
log.Println("RabbitMQ连接成功")
go Receive()
log.Println("开始监听消息")
}
// Receive
// @description: 接收消息
func Receive() (err error) {
// 创建交换机
if err = channel.ExchangeDeclare(
exchangeName,
"fanout",
true,
false,
//true表示这个exchange不可以被client用来推送消息仅用来进行exchange和exchange之间的绑定
false,
false,
nil,
); err != nil {
log.Printf("声明Exchange失败: %s", err)
return
}
// 创建队列
var q amqp.Queue
q, err = channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true,
false,
nil,
)
if err != nil {
log.Printf("无法声明Queue: %s", err)
return
}
// 绑定队列到 exchange 中
err = channel.QueueBind(
q.Name,
//在pub/sub模式下这里的key要为空
"",
exchangeName,
false,
nil)
if err != nil {
log.Printf("绑定队列失败: %s", err)
return
}
// 消费消息
var messages <-chan amqp.Delivery
messages, err = channel.Consume(
q.Name,
"",
false, // 不自动ack手动处理这样即使消费者挂掉消息也不会丢失
false,
false,
false,
nil,
)
if err != nil {
log.Printf("无法使用队列: %s", err)
return
}
for {
msg, ok := <-messages
if !ok {
log.Printf("获取消息失败")
return Receive()
}
log.Printf("收到消息: %s", msg.Body)
parse(msg.Body)
// ACK消息
if err = msg.Ack(true); err != nil {
log.Printf("ACK消息失败: %s", err)
continue
}
}
}