155 lines
3.3 KiB
Go
155 lines
3.3 KiB
Go
/*
|
|
* 版权所有 (c) 上海元泓软件科技有限公司 2022.
|
|
* 严禁通过任何媒介未经授权复制本文件.
|
|
*
|
|
* 作者:mic
|
|
* Email:funui@outlook.com
|
|
*/
|
|
|
|
package wsgnet
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/gobwas/ws"
|
|
"github.com/gobwas/ws/wsutil"
|
|
"github.com/panjf2000/gnet/v2"
|
|
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
type GnetServer struct {
|
|
gnet.BuiltinEventEngine
|
|
workerPool *goPool.Pool
|
|
//链接的对象汇总
|
|
//connectedSockets sync.Map
|
|
//心跳
|
|
tick time.Duration
|
|
|
|
eng gnet.Engine
|
|
|
|
network string
|
|
addr string
|
|
|
|
connected int32
|
|
maxConnected int32
|
|
|
|
handlers sync.Map
|
|
}
|
|
|
|
func (s *GnetServer) addRoutes(r featuredRoutes) {
|
|
for _, route := range r.routes {
|
|
s.handlers.Store(route.Command, route.Handler)
|
|
}
|
|
}
|
|
|
|
func (s *GnetServer) OnBoot(eng gnet.Engine) (action gnet.Action) {
|
|
logx.Infof("服务启动成功 %s ",
|
|
fmt.Sprintf("%s://%s", s.network, s.addr))
|
|
s.eng = eng
|
|
|
|
return
|
|
}
|
|
|
|
func (s *GnetServer) OnShutdown(eng gnet.Engine) {
|
|
//关闭工作池
|
|
logx.Infof("关闭服务 %s ",
|
|
fmt.Sprintf("%s://%s", s.network, s.addr))
|
|
s.workerPool.Release()
|
|
}
|
|
|
|
func (s *GnetServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
|
|
c.SetContext(new(wsCodec))
|
|
connected := atomic.AddInt32(&s.connected, 1)
|
|
maxConnected := atomic.LoadInt32(&s.maxConnected)
|
|
if connected > maxConnected {
|
|
atomic.SwapInt32(&s.maxConnected, connected)
|
|
}
|
|
logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected))
|
|
|
|
return
|
|
}
|
|
|
|
func (s *GnetServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
|
|
if err != nil {
|
|
logx.Infof("error occurred on connection=%s, %v\n", c.RemoteAddr().String(), err)
|
|
}
|
|
connected := atomic.AddInt32(&s.connected, -1)
|
|
logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected))
|
|
return
|
|
}
|
|
|
|
func (s *GnetServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
|
|
wsc := c.Context().(*wsCodec)
|
|
if wsc.readBufferBytes(c) == gnet.Close {
|
|
return gnet.Close
|
|
}
|
|
ok, action := wsc.upgrade(c)
|
|
if !ok {
|
|
return action
|
|
}
|
|
|
|
if wsc.buf.Len() <= 0 {
|
|
return gnet.None
|
|
}
|
|
messages, err := wsc.Decode(c)
|
|
if err != nil {
|
|
return gnet.Close
|
|
}
|
|
|
|
if messages == nil {
|
|
return gnet.None
|
|
}
|
|
var (
|
|
pkg YHProto.EchoPackage
|
|
)
|
|
|
|
for _, message := range messages {
|
|
switch message.OpCode {
|
|
case ws.OpContinuation:
|
|
case ws.OpText:
|
|
logx.Infof("conn[%v] receive [op=%v] [msg=%v, len=%d]", c.RemoteAddr().String(), message.OpCode, string(message.Payload), len(message.Payload))
|
|
err = wsutil.WriteServerMessage(c, ws.OpText, message.Payload)
|
|
if err != nil {
|
|
logx.Infof("conn[%v] [err=%v]", c.RemoteAddr().String(), err.Error())
|
|
return gnet.Close
|
|
}
|
|
case ws.OpBinary:
|
|
err = pkg.DecodeWsMessage(message)
|
|
if err == YHProto.ErrNotEnoughStream {
|
|
break
|
|
}
|
|
if err != nil {
|
|
logx.Errorf("invalid packet: %v", err)
|
|
return gnet.Close
|
|
}
|
|
|
|
handler, ok := s.handlers.Load(pkg.H.Command)
|
|
if !ok {
|
|
logx.Errorf("illegal command{%d}", pkg.H.Command)
|
|
return gnet.Close
|
|
}
|
|
//处理pkg
|
|
_ = s.workerPool.Submit(
|
|
func() {
|
|
h := handler.(HandlerFunc)
|
|
h(c, &pkg)
|
|
})
|
|
case ws.OpClose:
|
|
return gnet.Close
|
|
case ws.OpPing:
|
|
case ws.OpPong:
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s *GnetServer) OnTick() (delay time.Duration, action gnet.Action) {
|
|
|
|
return
|
|
}
|