/* * 版权所有 (c) 上海元泓软件科技有限公司 2022. * 严禁通过任何媒介未经授权复制本文件. * * 作者:mic * Email:funui@outlook.com */ package wsgnet import ( "fmt" "git.yhrjkj.com/YuanHong/YHEchoPackage/YHProto" "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" "github.com/panjf2000/gnet/v2" goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine" "github.com/zeromicro/go-zero/core/logx" "sync" "sync/atomic" "time" ) type GnetServer struct { gnet.BuiltinEventEngine workerPool *goPool.Pool //链接的对象汇总 //connectedSockets sync.Map //心跳 tick time.Duration eng gnet.Engine network string addr string connected int32 maxConnected int32 handlers sync.Map } func (s *GnetServer) addRoutes(r featuredRoutes) { for _, route := range r.routes { s.handlers.Store(route.Command, route.Handler) } } func (s *GnetServer) OnBoot(eng gnet.Engine) (action gnet.Action) { logx.Infof("服务启动成功 %s ", fmt.Sprintf("%s://%s", s.network, s.addr)) s.eng = eng return } func (s *GnetServer) OnShutdown(eng gnet.Engine) { //关闭工作池 logx.Infof("关闭服务 %s ", fmt.Sprintf("%s://%s", s.network, s.addr)) s.workerPool.Release() } func (s *GnetServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { c.SetContext(new(wsCodec)) connected := atomic.AddInt32(&s.connected, 1) maxConnected := atomic.LoadInt32(&s.maxConnected) if connected > maxConnected { atomic.SwapInt32(&s.maxConnected, connected) } logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected)) return } func (s *GnetServer) OnClose(c gnet.Conn, err error) (action gnet.Action) { if err != nil { logx.Infof("error occurred on connection=%s, %v\n", c.RemoteAddr().String(), err) } connected := atomic.AddInt32(&s.connected, -1) logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected)) return } func (s *GnetServer) OnTraffic(c gnet.Conn) (action gnet.Action) { wsc := c.Context().(*wsCodec) if wsc.readBufferBytes(c) == gnet.Close { return gnet.Close } ok, action := wsc.upgrade(c) if !ok { return action } if wsc.buf.Len() <= 0 { return gnet.None } messages, err := wsc.Decode(c) if err != nil { return gnet.Close } if messages == nil { return gnet.None } var ( pkg YHProto.EchoPackage ) for _, message := range messages { switch message.OpCode { case ws.OpContinuation: case ws.OpText: logx.Infof("conn[%v] receive [op=%v] [msg=%v, len=%d]", c.RemoteAddr().String(), message.OpCode, string(message.Payload), len(message.Payload)) err = wsutil.WriteServerMessage(c, ws.OpText, message.Payload) if err != nil { logx.Infof("conn[%v] [err=%v]", c.RemoteAddr().String(), err.Error()) return gnet.Close } case ws.OpBinary: err = pkg.DecodeWsMessage(message) if err == YHProto.ErrNotEnoughStream { break } if err != nil { logx.Errorf("invalid packet: %v", err) return gnet.Close } handler, ok := s.handlers.Load(pkg.H.Command) if !ok { logx.Errorf("illegal command{%d}", pkg.H.Command) return gnet.Close } //处理pkg _ = s.workerPool.Submit( func() { h := handler.(HandlerFunc) h(c, &pkg) }) case ws.OpClose: return gnet.Close case ws.OpPing: case ws.OpPong: } } return } func (s *GnetServer) OnTick() (delay time.Duration, action gnet.Action) { return }