From 5e22f2c2c0987c1616138c562321b2e65f5939f0 Mon Sep 17 00:00:00 2001 From: black1552 Date: Fri, 27 Feb 2026 09:47:23 +0800 Subject: [PATCH] =?UTF-8?q?feat(pool):=20=E6=B7=BB=E5=8A=A0=E5=9F=BA?= =?UTF-8?q?=E4=BA=8EBadgerDB=E7=9A=84=E8=BF=9E=E6=8E=A5=E6=B1=A0=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增BadgerPool结构体,支持WebSocket和TCP连接类型 - 实现连接的增删改查功能,包括内存缓存机制提升性能 - 添加按类型查询连接、统计连接数量等辅助方法 - 实现清理非活跃连接的功能,支持定期维护 - 更新示例代码以处理初始化错误并改进错误处理 - 添加BadgerDB依赖及其相关间接依赖包 --- go.mod | 11 ++ go.sum | 59 +++++++- pool/badger.go | 331 +++++++++++++++++++++++++++++++++++++++++ server/ws/example.go | 5 +- server/ws/websocket.go | 81 +++++++++- tcp/example.go | 6 +- tcp/tcp.go | 82 +++++++++- 7 files changed, 565 insertions(+), 10 deletions(-) create mode 100644 pool/badger.go diff --git a/go.mod b/go.mod index b536415..d743d8a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module git.magicany.cc/black1552/gf-common go 1.24.3 require ( + github.com/dgraph-io/badger/v4 v4.2.0 github.com/duke-git/lancet/v2 v2.3.7 github.com/eclipse/paho.mqtt.golang v1.5.1 github.com/gogf/gf/contrib/drivers/mysql/v2 v2.9.3 @@ -21,9 +22,12 @@ require ( require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/BurntSushi/toml v1.5.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/clbanning/mxj/v2 v2.7.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect + github.com/dgraph-io/ristretto v0.1.1 // indirect + github.com/dustin/go-humanize v1.0.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/fatih/color v1.18.0 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect @@ -32,11 +36,16 @@ require ( github.com/go-sql-driver/mysql v1.9.2 // indirect github.com/gogf/gf/contrib/registry/file/v2 v2.9.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/glog v1.2.5 // indirect + github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.3 // indirect + github.com/google/flatbuffers v1.12.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grokify/html-strip-tags-go v0.1.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect + github.com/klauspost/compress v1.12.3 // indirect github.com/magiconair/properties v1.8.10 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -44,6 +53,7 @@ require ( github.com/olekukonko/errors v1.1.0 // indirect github.com/olekukonko/ll v0.0.9 // indirect github.com/olekukonko/tablewriter v1.0.9 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/richardlehane/mscfb v1.0.4 // indirect github.com/richardlehane/msoleps v1.0.4 // indirect github.com/rivo/uniseg v0.4.7 // indirect @@ -53,6 +63,7 @@ require ( go.etcd.io/etcd/api/v3 v3.5.17 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.17 // indirect go.etcd.io/etcd/client/v3 v3.5.17 // indirect + go.opencensus.io v0.22.5 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel v1.37.0 // indirect go.opentelemetry.io/otel/metric v1.37.0 // indirect diff --git a/go.sum b/go.sum index 62cf618..3fa7833 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,17 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/clbanning/mxj/v2 v2.7.0 h1:WA/La7UGCanFe5NpHF0Q3DNtnCsVoxbPKuyBNHWRyME= github.com/clbanning/mxj/v2 v2.7.0/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= @@ -13,8 +19,16 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs= +github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= +github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= +github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/duke-git/lancet/v2 v2.3.7 h1:nnNBA9KyoqwbPm4nFmEFVIbXeAmpqf6IDCH45+HHHNs= github.com/duke-git/lancet/v2 v2.3.7/go.mod h1:zGa2R4xswg6EG9I6WnyubDbFO/+A/RROxIbXcwryTsc= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= @@ -43,8 +57,21 @@ github.com/gogf/gf/v2 v2.9.3 h1:qjN4s55FfUzxZ1AE8vUHNDX3V0eIOUGXhF2DjRTVZQ4= github.com/gogf/gf/v2 v2.9.3/go.mod h1:w6rcfD13SmO7FKI80k9LSLiSMGqpMYp50Nfkrrc2sEE= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.2.5 h1:DrW6hGnjIhtvhOIiAKT6Psh/Kd/ldepEa81DKeiRJ5I= +github.com/golang/glog v1.2.5/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= +github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -59,6 +86,8 @@ github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= +github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -82,8 +111,9 @@ github.com/olekukonko/ll v0.0.9 h1:Y+1YqDfVkqMWuEQMclsF9HUR5+a82+dxJuL1HHSRpxI= github.com/olekukonko/ll v0.0.9/go.mod h1:En+sEW0JNETl26+K8eZ6/W4UQ7CYSrrgg/EdIYT2H8g= github.com/olekukonko/tablewriter v1.0.9 h1:XGwRsYLC2bY7bNd93Dk51bcPZksWZmLYuaTHR0FqfL8= github.com/olekukonko/tablewriter v1.0.9/go.mod h1:5c+EBPeSqvXnLLgkm9isDdzR3wjfBkHR9Nhfp3NWrzo= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/richardlehane/mscfb v1.0.4 h1:WULscsljNPConisD5hR0+OyZjwK46Pfyr6mPu5ZawpM= @@ -98,6 +128,7 @@ github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0t github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= @@ -118,6 +149,8 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.17 h1:XxnDXAWq2pnxqx76ljWwiQ9jylbpC4rvkAeRVOU go.etcd.io/etcd/client/pkg/v3 v3.5.17/go.mod h1:4DqK1TKacp/86nJk4FLQqo6Mn2vvQFBmruW3pP14H/w= go.etcd.io/etcd/client/v3 v3.5.17 h1:o48sINNeWz5+pjy/Z0+HKpj/xSnBkuVhVvXkjEXbqZY= go.etcd.io/etcd/client/v3 v3.5.17/go.mod h1:j2d4eXTHWkT2ClBgnnEPm/Wuu7jsqku41v9DZ3OtjQo= +go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= +go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= @@ -144,14 +177,21 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20221208152030-732eee02a75a h1:4iLhBPcpqFmylhnkbY3W0ONLUYYkDAW9xMFLfxgsvCw= golang.org/x/exp v0.0.0-20221208152030-732eee02a75a/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ= golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -160,18 +200,25 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= @@ -181,6 +228,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -192,10 +241,16 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b h1:ULiyYQ0FdsJhwwZUwbaXpZF5yUE3h+RA+gxvBu37ucc= google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:oDOGiMSXHL4sDTJvFvIB9nRQCGdLP1o/iVaqQK8zB+M= google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b h1:zPKJod4w6F1+nRGDI9ubnXYhU9NSWoFAijkHkUXeTK8= google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= @@ -206,6 +261,7 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= @@ -217,3 +273,4 @@ gorm.io/driver/mysql v1.6.0 h1:eNbLmNTpPpTOVZi8MMxCi2aaIm0ZpInbORNXDwyLGvg= gorm.io/driver/mysql v1.6.0/go.mod h1:D/oCC2GWK3M/dqoLxnOlaNKmXz8WNTfcS9y5ovaSqKo= gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg= gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pool/badger.go b/pool/badger.go new file mode 100644 index 0000000..fe5ebee --- /dev/null +++ b/pool/badger.go @@ -0,0 +1,331 @@ +package pool + +import ( + "context" + "encoding/json" + "fmt" + "path/filepath" + "sync" + "time" + + "github.com/dgraph-io/badger/v4" + "github.com/gogf/gf/v2/os/gfile" +) + +// ConnType 连接类型 +type ConnType string + +const ( + ConnTypeWebSocket ConnType = "websocket" + ConnTypeTCP ConnType = "tcp" +) + +// ConnectionInfo 连接信息 +type ConnectionInfo struct { + ID string `json:"id"` + Type ConnType `json:"type"` + Address string `json:"address"` + IsActive bool `json:"isActive"` + LastUsed time.Time `json:"lastUsed"` + CreatedAt time.Time `json:"createdAt"` + // 额外的连接数据,根据不同类型存储不同的信息 + Data map[string]interface{} `json:"data"` +} + +// BadgerPool BadgerDB连接池 +type BadgerPool struct { + db *badger.DB + mutex sync.RWMutex + ctx context.Context + cancel context.CancelFunc + // 内存缓存,提高并发性能 + cache map[string]*ConnectionInfo +} + +// NewBadgerPool 创建BadgerDB连接池 +func NewBadgerPool() (*BadgerPool, error) { + ctx, cancel := context.WithCancel(context.Background()) + + db, err := badger.Open(badger.DefaultOptions(filepath.Join(gfile.Pwd(), "badger"))) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to open badger db: %w", err) + } + + return &BadgerPool{ + db: db, + ctx: ctx, + cancel: cancel, + cache: make(map[string]*ConnectionInfo), + }, nil +} + +// Close 关闭连接池 +func (p *BadgerPool) Close() error { + p.cancel() + return p.db.Close() +} + +// Add 添加连接 +func (p *BadgerPool) Add(conn *ConnectionInfo) error { + p.mutex.Lock() + defer p.mutex.Unlock() + + // 序列化连接信息 + data, err := json.Marshal(conn) + if err != nil { + return fmt.Errorf("failed to marshal connection info: %w", err) + } + + // 存储到BadgerDB + err = p.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(conn.ID), data) + }) + if err != nil { + return fmt.Errorf("failed to store connection: %w", err) + } + + // 更新内存缓存 + p.cache[conn.ID] = conn + return nil +} + +// Get 获取连接 +func (p *BadgerPool) Get(connID string) (*ConnectionInfo, error) { + p.mutex.RLock() + // 先从内存缓存获取 + if conn, ok := p.cache[connID]; ok { + p.mutex.RUnlock() + return conn, nil + } + p.mutex.RUnlock() + + // 从BadgerDB获取 + var connInfo ConnectionInfo + err := p.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(connID)) + if err != nil { + return err + } + return item.Value(func(val []byte) error { + return json.Unmarshal(val, &connInfo) + }) + }) + if err != nil { + if err == badger.ErrKeyNotFound { + return nil, nil + } + return nil, fmt.Errorf("failed to get connection: %w", err) + } + + // 更新内存缓存 + p.mutex.Lock() + p.cache[connID] = &connInfo + p.mutex.Unlock() + + return &connInfo, nil +} + +// Remove 移除连接 +func (p *BadgerPool) Remove(connID string) error { + p.mutex.Lock() + defer p.mutex.Unlock() + + // 从BadgerDB删除 + err := p.db.Update(func(txn *badger.Txn) error { + return txn.Delete([]byte(connID)) + }) + if err != nil { + return fmt.Errorf("failed to remove connection: %w", err) + } + + // 从内存缓存删除 + delete(p.cache, connID) + return nil +} + +// Update 更新连接信息 +func (p *BadgerPool) Update(conn *ConnectionInfo) error { + p.mutex.Lock() + defer p.mutex.Unlock() + + // 序列化连接信息 + data, err := json.Marshal(conn) + if err != nil { + return fmt.Errorf("failed to marshal connection info: %w", err) + } + + // 存储到BadgerDB + err = p.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(conn.ID), data) + }) + if err != nil { + return fmt.Errorf("failed to update connection: %w", err) + } + + // 更新内存缓存 + p.cache[conn.ID] = conn + return nil +} + +// GetAll 获取所有连接 +func (p *BadgerPool) GetAll() ([]*ConnectionInfo, error) { + p.mutex.RLock() + // 如果内存缓存不为空,直接返回缓存 + if len(p.cache) > 0 { + conns := make([]*ConnectionInfo, 0, len(p.cache)) + for _, conn := range p.cache { + conns = append(conns, conn) + } + p.mutex.RUnlock() + return conns, nil + } + p.mutex.RUnlock() + + // 从BadgerDB获取所有连接 + var conns []*ConnectionInfo + err := p.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchSize = 10 + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + var connInfo ConnectionInfo + err := item.Value(func(val []byte) error { + return json.Unmarshal(val, &connInfo) + }) + if err != nil { + return err + } + conns = append(conns, &connInfo) + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("failed to get all connections: %w", err) + } + + // 更新内存缓存 + p.mutex.Lock() + for _, conn := range conns { + p.cache[conn.ID] = conn + } + p.mutex.Unlock() + + return conns, nil +} + +// GetByType 根据类型获取连接 +func (p *BadgerPool) GetByType(connType ConnType) ([]*ConnectionInfo, error) { + allConns, err := p.GetAll() + if err != nil { + return nil, err + } + + var filtered []*ConnectionInfo + for _, conn := range allConns { + if conn.Type == connType { + filtered = append(filtered, conn) + } + } + + return filtered, nil +} + +// Count 获取连接数量 +func (p *BadgerPool) Count() (int, error) { + p.mutex.RLock() + // 如果内存缓存不为空,直接返回缓存大小 + if len(p.cache) > 0 { + count := len(p.cache) + p.mutex.RUnlock() + return count, nil + } + p.mutex.RUnlock() + + // 从BadgerDB统计数量 + var count int + err := p.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchSize = 10 + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + count++ + } + return nil + }) + if err != nil { + return 0, fmt.Errorf("failed to count connections: %w", err) + } + + return count, nil +} + +// GetAllConnIDs 获取所有在线连接的ID列表 +func (p *BadgerPool) GetAllConnIDs() ([]string, error) { + p.mutex.RLock() + // 如果内存缓存不为空,从缓存中提取在线连接的ID + if len(p.cache) > 0 { + ids := make([]string, 0, len(p.cache)) + for id, conn := range p.cache { + if conn.IsActive { + ids = append(ids, id) + } + } + p.mutex.RUnlock() + return ids, nil + } + p.mutex.RUnlock() + + // 从BadgerDB获取所有在线连接的ID + var ids []string + err := p.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchSize = 10 + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + var connInfo ConnectionInfo + err := item.Value(func(val []byte) error { + return json.Unmarshal(val, &connInfo) + }) + if err != nil { + return err + } + if connInfo.IsActive { + ids = append(ids, string(item.Key())) + } + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("failed to get all connection IDs: %w", err) + } + + return ids, nil +} + +// CleanupInactive 清理不活跃的连接 +func (p *BadgerPool) CleanupInactive(duration time.Duration) error { + allConns, err := p.GetAll() + if err != nil { + return err + } + + now := time.Now() + for _, conn := range allConns { + if !conn.IsActive || now.Sub(conn.LastUsed) > duration { + if err := p.Remove(conn.ID); err != nil { + return err + } + } + } + + return nil +} diff --git a/server/ws/example.go b/server/ws/example.go index 1420de9..1831f55 100644 --- a/server/ws/example.go +++ b/server/ws/example.go @@ -19,7 +19,10 @@ func NewWs() *Manager { } // 2. 创建管理器 - m := NewManager(customConfig) + m, err := NewManager(customConfig) + if err != nil { + log.Fatalf("Failed to create manager: %v", err) + } // 3. 覆盖业务回调(核心:自定义消息处理逻辑) // 连接建立回调 diff --git a/server/ws/websocket.go b/server/ws/websocket.go index e3d9b2f..9dcb220 100644 --- a/server/ws/websocket.go +++ b/server/ws/websocket.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "git.magicany.cc/black1552/gf-common/pool" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/gtime" @@ -92,7 +93,8 @@ type Connection struct { type Manager struct { config *Config // 配置 upgrader *websocket.Upgrader // HTTP升级器 - connections map[string]*Connection // 所有在线连接(connID -> Connection) + connections map[string]*Connection // 内存中的连接(connID -> Connection) + badgerPool *pool.BadgerPool // BadgerDB连接池 mutex sync.RWMutex // 读写锁(保护connections) // 业务回调:收到消息时触发(用户自定义处理逻辑) OnMessage func(connID string, msgType int, data any) @@ -148,7 +150,7 @@ func (c *Config) Merge(other *Config) *Config { } // NewManager 创建连接管理器 -func NewManager(config *Config) *Manager { +func NewManager(config *Config) (*Manager, error) { defaultConfig := DefaultConfig() finalConfig := defaultConfig.Merge(config) // 初始化升级器 @@ -170,10 +172,17 @@ func NewManager(config *Config) *Manager { }, } + // 初始化BadgerDB连接池 + badgerPool, err := pool.NewBadgerPool() + if err != nil { + return nil, fmt.Errorf("failed to create badger pool: %w", err) + } + return &Manager{ config: finalConfig, upgrader: upgrader, connections: make(map[string]*Connection), + badgerPool: badgerPool, mutex: sync.RWMutex{}, // 默认回调(用户可覆盖) OnMessage: func(connID string, msgType int, data any) { @@ -185,7 +194,7 @@ func NewManager(config *Config) *Manager { OnDisconnect: func(connID string, err error) { log.Printf("[默认回调] 连接[%s]已关闭:%v", connID, err) }, - } + }, nil } // Upgrade HTTP升级为WebSocket连接 @@ -237,6 +246,24 @@ func (m *Manager) Upgrade(w http.ResponseWriter, r *http.Request, connID string) m.connections[connID] = wsConn m.mutex.Unlock() + // 存储到BadgerDB + connInfo := &pool.ConnectionInfo{ + ID: connID, + Type: pool.ConnTypeWebSocket, + Address: r.RemoteAddr, + IsActive: true, + LastUsed: time.Now(), + CreatedAt: time.Now(), + Data: map[string]interface{}{ + "origin": r.Header.Get("Origin"), + "userAgent": r.Header.Get("User-Agent"), + }, + } + if err := m.badgerPool.Add(connInfo); err != nil { + log.Printf("[错误] 存储连接到BadgerDB失败:%v", err) + // 不影响连接建立,仅记录错误 + } + // 触发连接建立回调 m.OnConnect(connID) @@ -282,6 +309,18 @@ func (c *Connection) ReadPump() { return } + // 更新最后使用时间 + now := time.Now() + // 从BadgerDB获取连接信息并更新 + connInfo, err := c.manager.badgerPool.Get(c.connID) + if err == nil && connInfo != nil { + connInfo.LastUsed = now + if err := c.manager.badgerPool.Update(connInfo); err != nil { + log.Printf("[错误] 更新BadgerDB连接信息失败:%v", err) + // 不影响消息处理,仅记录错误 + } + } + // 尝试解析JSON格式的心跳消息(精准判断,替代包含判断) isHeartbeat := false // 先尝试解析为JSON对象 @@ -369,6 +408,19 @@ func (c *Connection) Send(data []byte) error { if err != nil { return fmt.Errorf("发送消息失败:%w", err) } + + // 更新最后使用时间 + now := time.Now() + // 从BadgerDB获取连接信息并更新 + connInfo, err := c.manager.badgerPool.Get(c.connID) + if err == nil && connInfo != nil { + connInfo.LastUsed = now + if err := c.manager.badgerPool.Update(connInfo); err != nil { + log.Printf("[错误] 更新BadgerDB连接信息失败:%v", err) + // 不影响消息发送,仅记录错误 + } + } + return nil } } @@ -394,6 +446,12 @@ func (c *Connection) Close(err error) { delete(c.manager.connections, c.connID) c.manager.mutex.Unlock() + // 从BadgerDB移除 + if err := c.manager.badgerPool.Remove(c.connID); err != nil { + log.Printf("[错误] 从BadgerDB移除连接失败:%v", err) + // 不影响连接关闭,仅记录错误 + } + // 触发断开回调 c.manager.OnDisconnect(c.connID, err) @@ -462,12 +520,18 @@ func (m *Manager) GetAllConn() map[string]*Connection { return connCopy } +// GetConn 获取指定连接 func (m *Manager) GetConn(connID string) *Connection { m.mutex.RLock() defer m.mutex.RUnlock() return m.connections[connID] } +// GetAllConnIDs 获取所有在线连接的ID列表 +func (m *Manager) GetAllConnIDs() ([]string, error) { + return m.badgerPool.GetAllConnIDs() +} + // CloseAll 关闭所有连接 func (m *Manager) CloseAll() { m.mutex.RLock() @@ -486,3 +550,14 @@ func (m *Manager) CloseAll() { } } } + +// Close 关闭管理器,清理资源 +func (m *Manager) Close() error { + // 关闭所有连接 + m.CloseAll() + // 关闭BadgerDB连接池 + if m.badgerPool != nil { + return m.badgerPool.Close() + } + return nil +} diff --git a/tcp/example.go b/tcp/example.go index cd54038..7101501 100644 --- a/tcp/example.go +++ b/tcp/example.go @@ -18,7 +18,11 @@ func Example() { } // 创建TCP服务器 - server := NewTCPServer("0.0.0.0:8888", config) + server, err := NewTCPServer("0.0.0.0:8888", config) + if err != nil { + fmt.Printf("Failed to create server: %v\n", err) + return + } // 设置消息处理函数 server.SetMessageHandler(func(conn *TcpConnection, msg *TcpMessage) error { diff --git a/tcp/tcp.go b/tcp/tcp.go index 73b5bd3..c248909 100644 --- a/tcp/tcp.go +++ b/tcp/tcp.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "git.magicany.cc/black1552/gf-common/pool" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/gtcp" "github.com/gogf/gf/v2/os/glog" @@ -32,18 +33,26 @@ type TCPServer struct { // ConnectionPool 连接池结构 type ConnectionPool struct { connections map[string]*TcpConnection + badgerPool *pool.BadgerPool mutex sync.RWMutex config *TcpPoolConfig logger *glog.Logger } // NewTCPServer 创建一个新的TCP服务器 -func NewTCPServer(address string, config *TcpPoolConfig) *TCPServer { +func NewTCPServer(address string, config *TcpPoolConfig) (*TCPServer, error) { logger := g.Log(address) ctx, cancel := context.WithCancel(context.Background()) + // 初始化BadgerDB连接池 + badgerPool, err := pool.NewBadgerPool() + if err != nil { + return nil, fmt.Errorf("failed to create badger pool: %w", err) + } + pool := &ConnectionPool{ connections: make(map[string]*TcpConnection), + badgerPool: badgerPool, config: config, logger: logger, } @@ -58,7 +67,7 @@ func NewTCPServer(address string, config *TcpPoolConfig) *TCPServer { } server.Listener = gtcp.NewServer(address, server.handleConnection) - return server + return server, nil } // SetMessageHandler 设置消息处理函数 @@ -86,6 +95,11 @@ func (s *TCPServer) Stop() error { s.Listener.Close() s.wg.Wait() s.Connection.Clear() + // 关闭BadgerDB连接池 + if err := s.Connection.badgerPool.Close(); err != nil { + s.Logger.Error(s.ctx, fmt.Sprintf("Failed to close BadgerDB pool: %v", err)) + // 不影响服务器停止,仅记录错误 + } s.Logger.Info(s.ctx, "TCP server stopped") return nil } @@ -109,6 +123,23 @@ func (s *TCPServer) handleConnection(conn *gtcp.Conn) { s.Connection.Add(tcpConn) s.Logger.Info(s.ctx, fmt.Sprintf("New connection established: %s", connID)) + // 存储到BadgerDB + connInfo := &pool.ConnectionInfo{ + ID: connID, + Type: pool.ConnTypeTCP, + Address: conn.RemoteAddr().String(), + IsActive: true, + LastUsed: time.Now(), + CreatedAt: time.Now(), + Data: map[string]interface{}{ + "localAddress": conn.LocalAddr().String(), + }, + } + if err := s.Connection.badgerPool.Add(connInfo); err != nil { + s.Logger.Error(s.ctx, fmt.Sprintf("Failed to store connection to BadgerDB: %v", err)) + // 不影响连接建立,仅记录错误 + } + // 启动消息接收协程 go s.receiveMessages(tcpConn) } @@ -121,6 +152,11 @@ func (s *TCPServer) receiveMessages(conn *TcpConnection) { } s.Connection.Remove(conn.Id) conn.Server.Close() + // 从BadgerDB移除 + if err := s.Connection.badgerPool.Remove(conn.Id); err != nil { + s.Logger.Error(s.ctx, fmt.Sprintf("Failed to remove connection from BadgerDB: %v", err)) + // 不影响连接关闭,仅记录错误 + } s.Logger.Info(s.ctx, fmt.Sprintf("Connection closed: %s", conn.Id)) }() @@ -142,10 +178,21 @@ func (s *TCPServer) receiveMessages(conn *TcpConnection) { if n > 0 { // 更新最后使用时间 + now := time.Now() conn.Mutex.Lock() - conn.LastUsed = time.Now() + conn.LastUsed = now conn.Mutex.Unlock() + // 更新BadgerDB中的连接信息 + connInfo, err := s.Connection.badgerPool.Get(conn.Id) + if err == nil && connInfo != nil { + connInfo.LastUsed = now + if err := s.Connection.badgerPool.Update(connInfo); err != nil { + s.Logger.Error(s.ctx, fmt.Sprintf("Failed to update connection in BadgerDB: %v", err)) + // 不影响消息处理,仅记录错误 + } + } + // 处理消息 data := make([]byte, n) copy(data, buffer[:n]) @@ -209,7 +256,19 @@ func (s *TCPServer) sendMessage(conn *TcpConnection, data []byte) error { } // 更新最后使用时间 - conn.LastUsed = time.Now() + now := time.Now() + conn.LastUsed = now + + // 更新BadgerDB中的连接信息 + connInfo, err := s.Connection.badgerPool.Get(conn.Id) + if err == nil && connInfo != nil { + connInfo.LastUsed = now + if err := s.Connection.badgerPool.Update(connInfo); err != nil { + s.Logger.Error(s.ctx, fmt.Sprintf("Failed to update connection in BadgerDB: %v", err)) + // 不影响消息发送,仅记录错误 + } + } + return nil } @@ -224,11 +283,21 @@ func (s *TCPServer) Kick(connID string) error { conn.Server.Close() // 从连接池移除 s.Connection.Remove(connID) + // 从BadgerDB移除 + if err := s.Connection.badgerPool.Remove(connID); err != nil { + s.Logger.Error(s.ctx, fmt.Sprintf("Failed to remove connection from BadgerDB: %v", err)) + // 不影响连接关闭,仅记录错误 + } s.Logger.Info(s.ctx, fmt.Sprintf("Kicked connection: %s", connID)) return nil } +// GetAllConnIDs 获取所有在线连接的ID列表 +func (s *TCPServer) GetAllConnIDs() ([]string, error) { + return s.Connection.GetAllConnIDs() +} + // Add 添加连接到连接池 func (p *ConnectionPool) Add(conn *TcpConnection) { p.mutex.Lock() @@ -278,3 +347,8 @@ func (p *ConnectionPool) Count() int { defer p.mutex.RUnlock() return len(p.connections) } + +// GetAllConnIDs 获取所有在线连接的ID列表 +func (p *ConnectionPool) GetAllConnIDs() ([]string, error) { + return p.badgerPool.GetAllConnIDs() +}