/* * 版权所有 (c) 上海元泓软件科技有限公司 2022. * 严禁通过任何媒介未经授权复制本文件. * * 作者:mic * Email:funui@outlook.com */ package tcpgnet import ( "fmt" "git.yhrjkj.com/YuanHong/YHEchoPackage/YHProto" "git.yhrjkj.com/YuanHong/YHEchoPackage/define" "github.com/panjf2000/gnet/v2" goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine" "github.com/zeromicro/go-zero/core/logx" "sync" "sync/atomic" "time" ) type GnetServer struct { gnet.BuiltinEventEngine workerPool *goPool.Pool //链接的对象汇总 //connectedSockets sync.Map //心跳 tick time.Duration eng gnet.Engine network string addr string connected int32 maxConnected int32 handlers sync.Map } func (s *GnetServer) addRoutes(r featuredRoutes) { for _, route := range r.routes { if route.Command > define.SYS_NET_ENV_START { logx.Errorf("命令标识不能超过%d", define.SYS_NET_ENV_START) continue } s.handlers.Store(route.Command, route.Handler) } } func (s *GnetServer) OnBoot(eng gnet.Engine) (action gnet.Action) { logx.Infof("服务启动成功 %s ", fmt.Sprintf("%s://%s", s.network, s.addr)) s.eng = eng if handler, ok := s.handlers.Load(define.SYS_NET_ENV_BOOT); ok { _ = s.workerPool.Submit( func() { h := handler.(HandlerFunc) h(nil, nil) }) } //处理pkg return } func (s *GnetServer) OnShutdown(eng gnet.Engine) { //关闭工作池 logx.Infof("关闭服务 %s ", fmt.Sprintf("%s://%s", s.network, s.addr)) if handler, ok := s.handlers.Load(define.SYS_NET_ENV_SHUTDOWN); ok { _ = s.workerPool.Submit( func() { h := handler.(HandlerFunc) h(nil, nil) }) } s.workerPool.Release() } func (s *GnetServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { connected := atomic.AddInt32(&s.connected, 1) maxConnected := atomic.LoadInt32(&s.maxConnected) if connected > maxConnected { atomic.SwapInt32(&s.maxConnected, connected) } logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected)) if handler, ok := s.handlers.Load(define.SYS_NET_ENV_OPEN); ok { _ = s.workerPool.Submit( func() { h := handler.(HandlerFunc) h(c, nil) }) } return } func (s *GnetServer) OnClose(c gnet.Conn, err error) (action gnet.Action) { if err != nil { logx.Infof("error occurred on connection=%s, %v\n", c.RemoteAddr().String(), err) } connected := atomic.AddInt32(&s.connected, -1) logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected)) if handler, ok := s.handlers.Load(define.SYS_NET_ENV_CLOSE); ok { _ = s.workerPool.Submit( func() { h := handler.(HandlerFunc) h(c, nil) }) } return } func (s *GnetServer) OnTraffic(c gnet.Conn) (action gnet.Action) { for { var ( err error pkg YHProto.EchoPackage ) err = pkg.DecodeGnet(c) if err == YHProto.ErrNotEnoughStream { break } if err != nil { logx.Errorf("invalid packet: %v", err) return gnet.Close } handler, ok := s.handlers.Load(pkg.H.Command) if !ok { logx.Errorf("illegal command{%d}", pkg.H.Command) return gnet.Close } //处理pkg _ = s.workerPool.Submit( func() { h := handler.(HandlerFunc) h(c, &pkg) }) } return } func (s *GnetServer) OnTick() (delay time.Duration, action gnet.Action) { return }