refactor(tcp): 重构TCP消息处理机制

- 修改导入路径从gf-common到gin-base
- 实现消息帧解析功能,支持4字节长度前缀协议
- 添加连接缓冲区机制处理分包粘包问题
- 移除旧的消息处理逻辑并替换为新的帧解析方法
- 更新消息发送逻辑以使用帧格式传输数据
- 调整GOHCache配置文件以匹配新的代码结构
main v1.0.1007
black1552 2026-03-02 09:25:43 +08:00
parent 2520655fbd
commit 86116618e4
3 changed files with 107 additions and 242 deletions

View File

@ -17,20 +17,6 @@
</set>
</value>
</entry>
<entry key="BadgerDB">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/db/badger.go" />
</set>
</value>
</entry>
<entry key="BadgerPool">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/pool/badger.go" />
</set>
</value>
</entry>
<entry key="BaseConfig">
<value>
<set>
@ -48,8 +34,6 @@
<entry key="Config">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/db/badger.go" />
<option value="file://$PROJECT_DIR$/../gf-common/server/ws/websocket.go" />
<option value="file://$PROJECT_DIR$/ws/websocket.go" />
</set>
</value>
@ -57,7 +41,6 @@
<entry key="Connection">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/server/ws/websocket.go" />
<option value="file://$PROJECT_DIR$/ws/websocket.go" />
</set>
</value>
@ -65,41 +48,17 @@
<entry key="ConnectionInfo">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/pool/badger.go" />
</set>
</value>
</entry>
<entry key="ConnectionMeta">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/db/connection_store.go" />
<option value="file://$PROJECT_DIR$/pool/common.go" />
</set>
</value>
</entry>
<entry key="ConnectionPool">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/tcp/tcp.go" />
<option value="file://$PROJECT_DIR$/../gf-common/tcp/tcp_badger.go" />
<option value="file://$PROJECT_DIR$/tcp/tcp.go" />
</set>
</value>
</entry>
<entry key="ConnectionPoolBadger">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/tcp/tcpConfig.go" />
<option value="file://$PROJECT_DIR$/../gf-common/tcp/tcp_badger.go" />
</set>
</value>
</entry>
<entry key="ConnectionStore">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/db/connection_store.go" />
</set>
</value>
</entry>
<entry key="Curd">
<value>
<set>
@ -131,33 +90,17 @@
<entry key="Manager">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/server/ws/websocket.go" />
<option value="file://$PROJECT_DIR$/ws/websocket.go" />
</set>
</value>
</entry>
<entry key="ManagerBadger">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/server/ws/websocket_badger.go" />
</set>
</value>
</entry>
<entry key="Msg">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/server/ws/websocket.go" />
<option value="file://$PROJECT_DIR$/ws/websocket.go" />
</set>
</value>
</entry>
<entry key="OfflineMessage">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/db/connection_store.go" />
</set>
</value>
</entry>
<entry key="Paginate">
<value>
<set>
@ -165,6 +108,13 @@
</set>
</value>
</entry>
<entry key="SQLitePool">
<value>
<set>
<option value="file://$PROJECT_DIR$/pool/sqlite.go" />
</set>
</value>
</entry>
<entry key="ServerConfig">
<value>
<set>
@ -175,8 +125,6 @@
<entry key="TCPServer">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/tcp/tcp.go" />
<option value="file://$PROJECT_DIR$/../gf-common/tcp/tcp_badger.go" />
<option value="file://$PROJECT_DIR$/tcp/tcp.go" />
</set>
</value>
@ -184,7 +132,6 @@
<entry key="TcpConnection">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/tcp/tcpConfig.go" />
<option value="file://$PROJECT_DIR$/tcp/tcpConfig.go" />
</set>
</value>
@ -192,7 +139,6 @@
<entry key="TcpMessage">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/tcp/tcpConfig.go" />
<option value="file://$PROJECT_DIR$/tcp/tcpConfig.go" />
</set>
</value>
@ -200,7 +146,6 @@
<entry key="TcpPoolConfig">
<value>
<set>
<option value="file://$PROJECT_DIR$/../gf-common/tcp/tcpConfig.go" />
<option value="file://$PROJECT_DIR$/tcp/tcpConfig.go" />
</set>
</value>
@ -219,140 +164,10 @@
</set>
</value>
</entry>
<entry key="sTcp">
<value>
<set>
<option value="file://$PROJECT_DIR$/../yingji/socket-server/internal/service/tcp.go" />
</set>
</value>
</entry>
</map>
</option>
<option name="scannedPathMapping">
<map>
<entry key="file://$PROJECT_DIR$/../gf-common/db/badger.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772154321394" />
<option name="schema">
<list>
<option value="BadgerDB" />
<option value="Config" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../gf-common/db/connection_store.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772154031118" />
<option name="schema">
<list>
<option value="ConnectionMeta" />
<option value="OfflineMessage" />
<option value="ConnectionStore" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../gf-common/example_badger.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772154243481" />
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../gf-common/pool/badger.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772156220301" />
<option name="schema">
<list>
<option value="ConnectionInfo" />
<option value="BadgerPool" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../gf-common/server/ws/example.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772156267433" />
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../gf-common/server/ws/websocket.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772156237755" />
<option name="schema">
<list>
<option value="Config" />
<option value="Connection" />
<option value="Manager" />
<option value="Msg" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../gf-common/server/ws/websocket_badger.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772154194371" />
<option name="schema">
<list>
<option value="ManagerBadger" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../gf-common/tcp/example.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772156261275" />
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../gf-common/tcp/tcp.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772156252696" />
<option name="schema">
<list>
<option value="TCPServer" />
<option value="ConnectionPool" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../gf-common/tcp/tcpConfig.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772154391708" />
<option name="schema">
<list>
<option value="TcpPoolConfig" />
<option value="TcpConnection" />
<option value="TcpMessage" />
<option value="ConnectionPoolBadger" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../gf-common/tcp/tcp_badger.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772154435736" />
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/config/fun.go">
<value>
<ScannedPath>
@ -456,6 +271,30 @@
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/pool/common.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772181310902" />
<option name="schema">
<list>
<option value="ConnectionInfo" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/pool/sqlite.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772181310902" />
<option name="schema">
<list>
<option value="SQLitePool" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/response/code.go">
<value>
<ScannedPath>
@ -481,14 +320,14 @@
<entry key="file://$PROJECT_DIR$/tcp/example.go">
<value>
<ScannedPath>
<option name="lastModified" value="1770025697304" />
<option name="lastModified" value="1772159997490" />
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/tcp/tcp.go">
<value>
<ScannedPath>
<option name="lastModified" value="1770025697304" />
<option name="lastModified" value="1772413469943" />
<option name="schema">
<list>
<option value="TCPServer" />
@ -501,7 +340,7 @@
<entry key="file://$PROJECT_DIR$/tcp/tcpConfig.go">
<value>
<ScannedPath>
<option name="lastModified" value="1770025697304" />
<option name="lastModified" value="1772186068135" />
<option name="schema">
<list>
<option value="TcpPoolConfig" />
@ -548,7 +387,7 @@
<entry key="file://$PROJECT_DIR$/ws/example.go">
<value>
<ScannedPath>
<option name="lastModified" value="1770025697531" />
<option name="lastModified" value="1772181056448" />
</ScannedPath>
</value>
</entry>
@ -567,54 +406,35 @@
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../yingji/socket-server/internal/service/tcp.go">
<value>
<ScannedPath>
<option name="lastModified" value="1772157204272" />
<option name="schema">
<list>
<option value="sTcp" />
</list>
</option>
</ScannedPath>
</value>
</entry>
</map>
</option>
<option name="tableStructMapping">
<map>
<entry key="api" value="Api" />
<entry key="api_file" value="ApiFile" />
<entry key="badger_db" value="BadgerDB" />
<entry key="badger_pool" value="BadgerPool" />
<entry key="base_config" value="BaseConfig" />
<entry key="client" value="Client" />
<entry key="config" value="Config" />
<entry key="connection" value="Connection" />
<entry key="connection_info" value="ConnectionInfo" />
<entry key="connection_meta" value="ConnectionMeta" />
<entry key="connection_pool" value="ConnectionPool" />
<entry key="connection_pool_badger" value="ConnectionPoolBadger" />
<entry key="connection_store" value="ConnectionStore" />
<entry key="curd" value="Curd" />
<entry key="data_base_config" value="DataBaseConfig" />
<entry key="jwt_claims" value="JWTClaims" />
<entry key="jwt_config" value="JwtConfig" />
<entry key="manager" value="Manager" />
<entry key="manager_badger" value="ManagerBadger" />
<entry key="msg" value="Msg" />
<entry key="offline_message" value="OfflineMessage" />
<entry key="paginate" value="Paginate" />
<entry key="res_file" value="resFile" />
<entry key="response" value="response" />
<entry key="s_tcp" value="sTcp" />
<entry key="server_config" value="ServerConfig" />
<entry key="sq_lite_pool" value="SQLitePool" />
<entry key="tcp_connection" value="TcpConnection" />
<entry key="tcp_message" value="TcpMessage" />
<entry key="tcp_pool_config" value="TcpPoolConfig" />
<entry key="tcp_server" value="TCPServer" />
</map>
</option>
<option name="lastTimeChecked" value="1772152595532" />
<option name="lastTimeChecked" value="1772413295191" />
</component>
</project>

View File

@ -2,11 +2,12 @@ package tcp
import (
"context"
"encoding/binary"
"fmt"
"sync"
"time"
"git.magicany.cc/black1552/gf-common/pool"
"git.magicany.cc/black1552/gin-base/pool"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gtcp"
"github.com/gogf/gf/v2/os/glog"
@ -181,6 +182,8 @@ func (s *TCPServer) receiveMessages(conn *TcpConnection) {
now := time.Now()
conn.Mutex.Lock()
conn.LastUsed = now
// 将读取的数据添加到连接的缓冲区
conn.buffer = append(conn.buffer, buffer[:n]...)
conn.Mutex.Unlock()
// 更新SQLite中的连接信息
@ -193,10 +196,41 @@ func (s *TCPServer) receiveMessages(conn *TcpConnection) {
}
}
// 处理消息
data := make([]byte, n)
copy(data, buffer[:n])
// 解析消息帧
s.parseMessageFrames(conn)
}
}
}
}
// parseMessageFrames 解析消息帧
func (s *TCPServer) parseMessageFrames(conn *TcpConnection) {
conn.Mutex.Lock()
defer conn.Mutex.Unlock()
for {
// 检查缓冲区是否有足够的数据来读取长度前缀
if len(conn.buffer) < messageLengthPrefixSize {
// 数据不足,等待下一次读取
return
}
// 读取长度前缀
length := binary.BigEndian.Uint32(conn.buffer[:messageLengthPrefixSize])
// 检查缓冲区是否有足够的数据来读取完整的消息
if len(conn.buffer) < messageLengthPrefixSize+int(length) {
// 数据不足,等待下一次读取
return
}
// 提取消息数据
data := conn.buffer[messageLengthPrefixSize : messageLengthPrefixSize+int(length)]
// 移除已处理的消息数据
conn.buffer = conn.buffer[messageLengthPrefixSize+int(length):]
// 创建消息对象
msg := &TcpMessage{
Id: fmt.Sprintf("msg_%d", gtime.TimestampNano()),
ConnId: conn.Id,
@ -216,8 +250,6 @@ func (s *TCPServer) receiveMessages(conn *TcpConnection) {
s.Logger.Error(ctx, fmt.Sprintf("Message handling error: %v", err))
})
}
}
}
}
// SendTo 发送消息到指定连接
@ -249,8 +281,15 @@ func (s *TCPServer) sendMessage(conn *TcpConnection, data []byte) error {
// 设置写入超时
conn.Server.SetWriteDeadline(time.Now().Add(s.Config.WriteTimeout))
// 创建消息帧4字节长度前缀 + 消息数据
frame := make([]byte, messageLengthPrefixSize+len(data))
// 写入长度前缀(大端序)
binary.BigEndian.PutUint32(frame[:messageLengthPrefixSize], uint32(len(data)))
// 写入消息数据
copy(frame[messageLengthPrefixSize:], data)
// 发送数据
_, err := conn.Server.Write(data)
_, err := conn.Server.Write(frame)
if err != nil {
return err
}

View File

@ -7,6 +7,11 @@ import (
"github.com/gogf/gf/v2/net/gtcp"
)
// 消息帧格式4字节长度前缀 + 消息数据
const (
messageLengthPrefixSize = 4 // 消息长度前缀大小4字节
)
// TcpPoolConfig TCP连接池配置
type TcpPoolConfig struct {
BufferSize int `json:"bufferSize"` // 缓冲区大小
@ -26,6 +31,7 @@ type TcpConnection struct {
LastUsed time.Time `json:"lastUsed"` // 最后使用时间
CreatedAt time.Time `json:"createdAt"` // 创建时间
Mutex sync.RWMutex `json:"-"` // 读写锁
buffer []byte `json:"-"` // 用于存储未处理的字节数据
}
// TcpMessage TCP消息结构