gf-common/mqtt/client/mqtt.go

221 lines
5.9 KiB
Go

package client
import (
"context"
"fmt"
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gogf/gf/v2/os/glog"
)
// Client MQTT客户端结构
type Client struct {
client mqtt.Client
opts *mqtt.ClientOptions
ctx context.Context
subscribed map[string]byte
subMutex sync.RWMutex
callback mqtt.MessageHandler
// 错误处理相关
onConnectionLost func(error)
onReconnect func()
onConnect func()
onSubscriptionError func(error)
onPublishError func(error)
}
// NewClientWithAuth 创建带用户名密码认证的MQTT客户端
func NewClientWithAuth(ctx context.Context, broker, clientId, username, password string) *Client {
c := &Client{
ctx: ctx,
subscribed: make(map[string]byte),
}
opts := mqtt.NewClientOptions()
opts.AddBroker(broker)
opts.SetClientID(clientId)
opts.SetUsername(username)
opts.SetPassword(password)
// 设置连接丢失处理函数
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
glog.Error(ctx, "MQTT连接断开:", err)
if c.onConnectionLost != nil {
c.onConnectionLost(err)
}
})
// 设置重连处理函数
opts.SetReconnectingHandler(func(client mqtt.Client, opts *mqtt.ClientOptions) {
glog.Info(ctx, "MQTT客户端正在尝试重连...")
if c.onReconnect != nil {
c.onReconnect()
}
})
// 设置连接成功处理函数,重连成功后重新订阅主题
opts.SetOnConnectHandler(func(client mqtt.Client) {
glog.Info(ctx, "MQTT客户端重新连接成功")
if c.onConnect != nil {
c.onConnect()
}
// 重连成功后重新订阅主题
go c.resubscribe()
})
// 设置其他选项...
mqttClient := mqtt.NewClient(opts)
c.client = mqttClient
c.opts = opts
return c
}
// Connect 连接到MQTT服务器
func (c *Client) Connect() error {
glog.Info(c.ctx, "开始连接到MQTT服务器...")
token := c.client.Connect()
glog.Info(c.ctx, "等待连接完成...")
// 使用更长的超时时间,避免网络延迟导致连接失败
if token.WaitTimeout(30 * time.Second) {
glog.Info(c.ctx, "连接操作完成")
if token.Error() != nil {
err := fmt.Errorf("连接到MQTT代理时发生错误: %w", token.Error())
glog.Error(c.ctx, "连接MQTT服务器时发生错误:", token.Error())
return err
}
} else {
// 连接超时
err := fmt.Errorf("连接到MQTT服务器超时")
glog.Error(c.ctx, "连接MQTT服务器超时")
return err
}
glog.Info(c.ctx, "成功连接到MQTT服务器")
return nil
}
// Disconnect 断开MQTT连接
func (c *Client) Disconnect() {
if c.client.IsConnected() {
c.client.Disconnect(250)
glog.Info(c.ctx, "已断开MQTT连接")
}
}
// SubscribeMultiple 同时订阅多个主题
func (c *Client) SubscribeMultiple(topics map[string]byte, callback mqtt.MessageHandler) error {
// 保存订阅信息
c.subMutex.Lock()
for topic, qos := range topics {
c.subscribed[topic] = qos
}
c.callback = callback
c.subMutex.Unlock()
token := c.client.SubscribeMultiple(topics, callback)
// 增加订阅超时时间
if token.WaitTimeout(30*time.Second) && token.Error() != nil {
err := fmt.Errorf("同时订阅多个主题出现错误: %w", token.Error())
glog.Error(c.ctx, "订阅主题时发生错误:", token.Error())
if c.onSubscriptionError != nil {
c.onSubscriptionError(err)
}
return err
}
// 检查订阅是否成功
if token.WaitTimeout(30 * time.Second) {
glog.Info(c.ctx, "成功订阅主题:", topics)
} else {
err := fmt.Errorf("订阅主题超时: %v", topics)
glog.Error(c.ctx, "订阅主题超时:", topics)
if c.onSubscriptionError != nil {
c.onSubscriptionError(err)
}
return err
}
return nil
}
// Publish 发布消息
func (c *Client) Publish(topic string, qos byte, retained bool, payload interface{}) error {
token := c.client.Publish(topic, qos, retained, payload)
// 增加发布超时时间
if token.WaitTimeout(30*time.Second) && token.Error() != nil {
err := fmt.Errorf("发送消息到主题%s出现错误: %w", topic, token.Error())
glog.Error(c.ctx, "发布消息到主题", topic, "时发生错误:", token.Error())
if c.onPublishError != nil {
c.onPublishError(err)
}
return err
}
// 检查发布是否成功
if token.WaitTimeout(30 * time.Second) {
glog.Info(c.ctx, "成功发布消息到主题:", topic)
} else {
err := fmt.Errorf("发布消息到主题%s超时", topic)
glog.Error(c.ctx, "发布消息到主题超时:", topic)
if c.onPublishError != nil {
c.onPublishError(err)
}
return err
}
return nil
}
// resubscribe 重新订阅主题
func (c *Client) resubscribe() {
c.subMutex.RLock()
defer c.subMutex.RUnlock()
if len(c.subscribed) == 0 {
glog.Info(c.ctx, "没有需要重新订阅的主题")
return
}
// 复制订阅信息避免并发问题
topics := make(map[string]byte)
for topic, qos := range c.subscribed {
topics[topic] = qos
}
glog.Info(c.ctx, "开始重新订阅主题:", topics)
// 增加重新订阅的超时时间
token := c.client.SubscribeMultiple(topics, c.callback)
if token.WaitTimeout(30 * time.Second) {
if token.Error() != nil {
err := fmt.Errorf("重新订阅主题时发生错误: %w", token.Error())
glog.Error(c.ctx, "重新订阅主题时发生错误:", token.Error())
if c.onSubscriptionError != nil {
c.onSubscriptionError(err)
}
} else {
glog.Info(c.ctx, "重新订阅主题成功")
}
} else {
err := fmt.Errorf("重新订阅主题超时")
glog.Error(c.ctx, "重新订阅主题超时")
if c.onSubscriptionError != nil {
c.onSubscriptionError(err)
}
}
}
// IsConnected 检查是否连接
func (c *Client) IsConnected() bool {
isConnected := c.client.IsConnected()
glog.Debug(c.ctx, "检查连接状态:", isConnected)
return isConnected
}
// Subscribe 单独订阅一个主题
func (c *Client) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) error {
topics := map[string]byte{topic: qos}
return c.SubscribeMultiple(topics, callback)
}