YHEchoPackage/ws/gnetserver.go

156 lines
3.4 KiB
Go

/*
* 版权所有 (c) 上海元泓软件科技有限公司 2022.
* 严禁通过任何媒介未经授权复制本文件.
*
* 作者:mic
* Email:funui@outlook.com
*/
package wsgnet
import (
"fmt"
"git.yhrjkj.com/YuanHong/YHEchoPackage/YHProto"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/panjf2000/gnet/v2"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
"github.com/zeromicro/go-zero/core/logx"
"sync"
"sync/atomic"
"time"
)
type GnetServer struct {
gnet.BuiltinEventEngine
workerPool *goPool.Pool
//链接的对象汇总
//connectedSockets sync.Map
//心跳
tick time.Duration
eng gnet.Engine
network string
addr string
connected int32
maxConnected int32
handlers sync.Map
}
func (s *GnetServer) addRoutes(r featuredRoutes) {
for _, route := range r.routes {
s.handlers.Store(route.Command, route.Handler)
}
}
func (s *GnetServer) OnBoot(eng gnet.Engine) (action gnet.Action) {
logx.Infof("服务启动成功 %s ",
fmt.Sprintf("%s://%s", s.network, s.addr))
s.eng = eng
return
}
func (s *GnetServer) OnShutdown(eng gnet.Engine) {
//关闭工作池
logx.Infof("关闭服务 %s ",
fmt.Sprintf("%s://%s", s.network, s.addr))
s.workerPool.Release()
}
func (s *GnetServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
c.SetContext(new(wsCodec))
connected := atomic.AddInt32(&s.connected, 1)
maxConnected := atomic.LoadInt32(&s.maxConnected)
if connected > maxConnected {
atomic.SwapInt32(&s.maxConnected, connected)
}
logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected))
return
}
func (s *GnetServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
if err != nil {
logx.Infof("error occurred on connection=%s, %v\n", c.RemoteAddr().String(), err)
}
connected := atomic.AddInt32(&s.connected, -1)
logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected))
return
}
func (s *GnetServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
wsc := c.Context().(*wsCodec)
if wsc.readBufferBytes(c) == gnet.Close {
return gnet.Close
}
ok, action := wsc.upgrade(c)
if !ok {
return action
}
if wsc.buf.Len() <= 0 {
return gnet.None
}
messages, err := wsc.Decode(c)
if err != nil {
return gnet.Close
}
if messages == nil {
return gnet.None
}
for _, message := range messages {
var (
pkg YHProto.EchoPackage
)
switch message.OpCode {
case ws.OpContinuation:
case ws.OpText:
logx.Infof("conn[%v] receive [op=%v] [msg=%v, len=%d]", c.RemoteAddr().String(), message.OpCode, string(message.Payload), len(message.Payload))
err = wsutil.WriteServerMessage(c, ws.OpText, message.Payload)
if err != nil {
logx.Infof("conn[%v] [err=%v]", c.RemoteAddr().String(), err.Error())
return gnet.Close
}
case ws.OpBinary:
err = pkg.DecodeWsMessage(message)
if err == YHProto.ErrNotEnoughStream {
break
}
if err != nil {
logx.Errorf("invalid packet: %v", err)
return gnet.Close
}
handler, ok := s.handlers.Load(pkg.H.Command)
if !ok {
logx.Errorf("illegal command{%d}", pkg.H.Command)
return gnet.Close
}
//处理pkg
_ = s.workerPool.Submit(
func() {
h := handler.(HandlerFunc)
h(c, &pkg)
})
case ws.OpClose:
return gnet.Close
case ws.OpPing:
case ws.OpPong:
}
}
return
}
func (s *GnetServer) OnTick() (delay time.Duration, action gnet.Action) {
return
}