diff --git a/.idea/GOHCache.xml b/.idea/GOHCache.xml
index bbc7165..476721a 100644
--- a/.idea/GOHCache.xml
+++ b/.idea/GOHCache.xml
@@ -17,20 +17,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -48,8 +34,6 @@
-
-
@@ -57,7 +41,6 @@
-
@@ -65,41 +48,17 @@
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -131,33 +90,17 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -165,6 +108,13 @@
+
+
+
+
+
+
+
@@ -175,8 +125,6 @@
-
-
@@ -184,7 +132,6 @@
-
@@ -192,7 +139,6 @@
-
@@ -200,7 +146,6 @@
-
@@ -219,140 +164,10 @@
-
-
-
-
-
-
-
-
+
\ No newline at end of file
diff --git a/tcp/tcp.go b/tcp/tcp.go
index a2c1354..e213816 100644
--- a/tcp/tcp.go
+++ b/tcp/tcp.go
@@ -2,11 +2,12 @@ package tcp
import (
"context"
+ "encoding/binary"
"fmt"
"sync"
"time"
- "git.magicany.cc/black1552/gf-common/pool"
+ "git.magicany.cc/black1552/gin-base/pool"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gtcp"
"github.com/gogf/gf/v2/os/glog"
@@ -181,6 +182,8 @@ func (s *TCPServer) receiveMessages(conn *TcpConnection) {
now := time.Now()
conn.Mutex.Lock()
conn.LastUsed = now
+ // 将读取的数据添加到连接的缓冲区
+ conn.buffer = append(conn.buffer, buffer[:n]...)
conn.Mutex.Unlock()
// 更新SQLite中的连接信息
@@ -193,33 +196,62 @@ func (s *TCPServer) receiveMessages(conn *TcpConnection) {
}
}
- // 处理消息
- data := make([]byte, n)
- copy(data, buffer[:n])
-
- msg := &TcpMessage{
- Id: fmt.Sprintf("msg_%d", gtime.TimestampNano()),
- ConnId: conn.Id,
- Data: data,
- Timestamp: time.Now(),
- IsSend: false,
- }
-
- // 使用协程池处理消息,避免阻塞
- grpool.AddWithRecover(s.ctx, func(ctx context.Context) {
- if s.MessageHandler != nil {
- if err := s.MessageHandler(conn, msg); err != nil {
- s.Logger.Error(s.ctx, fmt.Sprintf("Message handling error: %v", err))
- }
- }
- }, func(ctx context.Context, err error) {
- s.Logger.Error(ctx, fmt.Sprintf("Message handling error: %v", err))
- })
+ // 解析消息帧
+ s.parseMessageFrames(conn)
}
}
}
}
+// parseMessageFrames 解析消息帧
+func (s *TCPServer) parseMessageFrames(conn *TcpConnection) {
+ conn.Mutex.Lock()
+ defer conn.Mutex.Unlock()
+
+ for {
+ // 检查缓冲区是否有足够的数据来读取长度前缀
+ if len(conn.buffer) < messageLengthPrefixSize {
+ // 数据不足,等待下一次读取
+ return
+ }
+
+ // 读取长度前缀
+ length := binary.BigEndian.Uint32(conn.buffer[:messageLengthPrefixSize])
+
+ // 检查缓冲区是否有足够的数据来读取完整的消息
+ if len(conn.buffer) < messageLengthPrefixSize+int(length) {
+ // 数据不足,等待下一次读取
+ return
+ }
+
+ // 提取消息数据
+ data := conn.buffer[messageLengthPrefixSize : messageLengthPrefixSize+int(length)]
+
+ // 移除已处理的消息数据
+ conn.buffer = conn.buffer[messageLengthPrefixSize+int(length):]
+
+ // 创建消息对象
+ msg := &TcpMessage{
+ Id: fmt.Sprintf("msg_%d", gtime.TimestampNano()),
+ ConnId: conn.Id,
+ Data: data,
+ Timestamp: time.Now(),
+ IsSend: false,
+ }
+
+ // 使用协程池处理消息,避免阻塞
+ grpool.AddWithRecover(s.ctx, func(ctx context.Context) {
+ if s.MessageHandler != nil {
+ if err := s.MessageHandler(conn, msg); err != nil {
+ s.Logger.Error(s.ctx, fmt.Sprintf("Message handling error: %v", err))
+ }
+ }
+ }, func(ctx context.Context, err error) {
+ s.Logger.Error(ctx, fmt.Sprintf("Message handling error: %v", err))
+ })
+ }
+}
+
// SendTo 发送消息到指定连接
func (s *TCPServer) SendTo(connID string, data []byte) error {
conn := s.Connection.Get(connID)
@@ -249,8 +281,15 @@ func (s *TCPServer) sendMessage(conn *TcpConnection, data []byte) error {
// 设置写入超时
conn.Server.SetWriteDeadline(time.Now().Add(s.Config.WriteTimeout))
+ // 创建消息帧:4字节长度前缀 + 消息数据
+ frame := make([]byte, messageLengthPrefixSize+len(data))
+ // 写入长度前缀(大端序)
+ binary.BigEndian.PutUint32(frame[:messageLengthPrefixSize], uint32(len(data)))
+ // 写入消息数据
+ copy(frame[messageLengthPrefixSize:], data)
+
// 发送数据
- _, err := conn.Server.Write(data)
+ _, err := conn.Server.Write(frame)
if err != nil {
return err
}
diff --git a/tcp/tcpConfig.go b/tcp/tcpConfig.go
index 91f425c..5179872 100644
--- a/tcp/tcpConfig.go
+++ b/tcp/tcpConfig.go
@@ -7,6 +7,11 @@ import (
"github.com/gogf/gf/v2/net/gtcp"
)
+// 消息帧格式:4字节长度前缀 + 消息数据
+const (
+ messageLengthPrefixSize = 4 // 消息长度前缀大小(4字节)
+)
+
// TcpPoolConfig TCP连接池配置
type TcpPoolConfig struct {
BufferSize int `json:"bufferSize"` // 缓冲区大小
@@ -26,6 +31,7 @@ type TcpConnection struct {
LastUsed time.Time `json:"lastUsed"` // 最后使用时间
CreatedAt time.Time `json:"createdAt"` // 创建时间
Mutex sync.RWMutex `json:"-"` // 读写锁
+ buffer []byte `json:"-"` // 用于存储未处理的字节数据
}
// TcpMessage TCP消息结构