From ae8c4fc9d0af3928163f40919885ac7bc0a35a35 Mon Sep 17 00:00:00 2001 From: mic <562710164> Date: Thu, 9 Feb 2023 13:35:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0go-zero=E5=8D=8F=E8=AE=AE?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + tcp/config.go | 42 ++++++++++ tcp/engine.go | 54 +++++++++++++ tcp/gnetserver.go | 135 ++++++++++++++++++++++++++++++++ tcp/internal/server.go | 26 +++++++ tcp/server.go | 70 +++++++++++++++++ tcp/types.go | 38 +++++++++ ws/codec.go | 169 +++++++++++++++++++++++++++++++++++++++++ ws/config.go | 42 ++++++++++ ws/engine.go | 46 +++++++++++ ws/gnetserver.go | 154 +++++++++++++++++++++++++++++++++++++ ws/helper.go | 38 +++++++++ ws/internal/server.go | 28 +++++++ ws/server.go | 71 +++++++++++++++++ ws/types.go | 37 +++++++++ 15 files changed, 951 insertions(+) create mode 100644 tcp/config.go create mode 100644 tcp/engine.go create mode 100644 tcp/gnetserver.go create mode 100644 tcp/internal/server.go create mode 100644 tcp/server.go create mode 100644 tcp/types.go create mode 100644 ws/codec.go create mode 100644 ws/config.go create mode 100644 ws/engine.go create mode 100644 ws/gnetserver.go create mode 100644 ws/helper.go create mode 100644 ws/internal/server.go create mode 100644 ws/server.go create mode 100644 ws/types.go diff --git a/go.mod b/go.mod index 8148967..b4b52b5 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/gobwas/pool v0.2.1 // indirect github.com/mattn/go-colorable v0.1.9 // indirect github.com/mattn/go-isatty v0.0.14 // indirect + github.com/panjf2000/ants/v2 v2.7.1 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect go.opentelemetry.io/otel v1.11.0 // indirect diff --git a/tcp/config.go b/tcp/config.go new file mode 100644 index 0000000..241b8c9 --- /dev/null +++ b/tcp/config.go @@ -0,0 +1,42 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2023. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package tcp + +type ( + // A EchoConf is a socket API server config. + EchoConf struct { + // Valid network schemes: + // tcp - bind to both IPv4 and IPv6 + // tcp4 - IPv4 + // tcp6 - IPv6 + // udp - bind to both IPv4 and IPv6 + // udp4 - IPv4 + // udp6 - IPv6 + // unix - Unix Domain Socket + Network string + Host string `json:",default=0.0.0.0"` + Port int + CertFile string `json:",optional"` + KeyFile string `json:",optional"` + MaxConns int `json:",default=10000"` + + // 协议包信息 + Timeout int64 `json:",default=15000"` + Magic uint32 //包头 + Key string //包数据校验Key + } +) diff --git a/tcp/engine.go b/tcp/engine.go new file mode 100644 index 0000000..a32aa4e --- /dev/null +++ b/tcp/engine.go @@ -0,0 +1,54 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2023. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package tcp + +import ( + "fmt" + goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine" + "github.com/zeromicro/go-zero/core/logx" + "yh-core/common/sapi/tcp/internal" +) + +type engine struct { + conf EchoConf + routes featuredRoutes +} + +func newEngine(c EchoConf) *engine { + svr := &engine{ + conf: c, + } + return svr +} +func (ng *engine) addRoutes(r featuredRoutes) { + ng.routes = r +} + +func (ng *engine) start() error { + ss := &GnetServer{ + workerPool: goPool.Default(), + network: ng.conf.Network, + addr: fmt.Sprintf("%s:%d", ng.conf.Host, ng.conf.Port), + } + ss.addRoutes(ng.routes) + + return internal.StartEcho(ss, ng.conf.Network, ng.conf.Host, ng.conf.Port) +} +func (ng *engine) stop() error { + internal.StopEcho(ng.conf.Network, ng.conf.Host, ng.conf.Port) + return logx.Close() +} diff --git a/tcp/gnetserver.go b/tcp/gnetserver.go new file mode 100644 index 0000000..40db9e5 --- /dev/null +++ b/tcp/gnetserver.go @@ -0,0 +1,135 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2023. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package tcp + +import ( + "fmt" + + "github.com/panjf2000/gnet/v2" + goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine" + "github.com/zeromicro/go-zero/core/logx" + "sync" + "sync/atomic" + "time" +) + +type GnetServer struct { + gnet.BuiltinEventEngine + workerPool *goPool.Pool + //链接的对象汇总 + //connectedSockets sync.Map + //心跳 + tick time.Duration + + eng gnet.Engine + + network string + addr string + + connected int32 + maxConnected int32 + + handlers sync.Map +} + +func (s *GnetServer) addRoutes(r featuredRoutes) { + for _, route := range r.routes { + s.handlers.Store(route.Command, route.Handler) + } +} + +func (s *GnetServer) OnBoot(eng gnet.Engine) (action gnet.Action) { + logx.Infof("服务启动成功 %s ", + fmt.Sprintf("%s://%s", s.network, s.addr)) + s.eng = eng + + return +} + +func (s *GnetServer) OnShutdown(eng gnet.Engine) { + //关闭工作池 + logx.Infof("关闭服务 %s ", + fmt.Sprintf("%s://%s", s.network, s.addr)) + s.workerPool.Release() +} + +func (s *GnetServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { + + connected := atomic.AddInt32(&s.connected, 1) + maxConnected := atomic.LoadInt32(&s.maxConnected) + if connected > maxConnected { + atomic.SwapInt32(&s.maxConnected, connected) + } + logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected)) + + return +} + +func (s *GnetServer) OnClose(c gnet.Conn, err error) (action gnet.Action) { + if err != nil { + logx.Infof("error occurred on connection=%s, %v\n", c.RemoteAddr().String(), err) + } + connected := atomic.AddInt32(&s.connected, -1) + logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected)) + return +} + +func (s *GnetServer) OnTraffic(c gnet.Conn) (action gnet.Action) { + + var ( + err error + pkg YHProto.EchoPackage + ) + + for { + err = pkg.DecodeGnet(c) + if err == YHProto.ErrNotEnoughStream { + break + } + if err != nil { + logx.Errorf("invalid packet: %v", err) + return gnet.Close + } + + handler, ok := s.handlers.Load(pkg.H.Command) + if !ok { + logx.Errorf("illegal command{%d}", pkg.H.Command) + return gnet.Close + } + //处理pkg + _ = s.workerPool.Submit( + func() { + h := handler.(HandlerFunc) + h(c, &pkg) + }) + } + + return +} + +func (s *GnetServer) OnTick() (delay time.Duration, action gnet.Action) { + + return +} diff --git a/tcp/internal/server.go b/tcp/internal/server.go new file mode 100644 index 0000000..f89b4c9 --- /dev/null +++ b/tcp/internal/server.go @@ -0,0 +1,26 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package internal + +import ( + "context" + "fmt" + "github.com/panjf2000/gnet/v2" +) + +func StartEcho(eventHandler gnet.EventHandler, network string, host string, port int) error { + + if network == "ws" { + return gnet.Run(eventHandler, fmt.Sprintf("tcp://%s:%d", host, port)) + } + return gnet.Run(eventHandler, fmt.Sprintf("%s://%s:%d", network, host, port)) +} +func StopEcho(network string, host string, port int) { + gnet.Stop(context.Background(), fmt.Sprintf("%s://%s:%d", network, host, port)) +} diff --git a/tcp/server.go b/tcp/server.go new file mode 100644 index 0000000..de7f483 --- /dev/null +++ b/tcp/server.go @@ -0,0 +1,70 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2023. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package tcp + +import ( + "log" +) + +// A EchoServer is a rpc server. +type Server struct { + echo *engine +} + +func MustNewServer(c EchoConf) *Server { + //设置协议信息 + YHProto.SetEchoPkgMagic(c.Magic) + YHProto.SetEchoPkgKey(c.Key) + + server, err := NewServer(c) + if err != nil { + log.Fatal(err) + } + return server +} +func NewServer(c EchoConf) (*Server, error) { + server := &Server{ + echo: newEngine(c), + } + return server, nil +} + +// AddRoutes add given routes into the Server. +func (s *Server) AddRoutes(rs []Route) { + r := featuredRoutes{ + routes: rs, + } + s.echo.addRoutes(r) +} + +// AddRoute adds given route into the Server. +func (s *Server) AddRoute(r Route) { + s.AddRoutes([]Route{r}) +} + +func (s *Server) Start() { + err := s.echo.start() + if err != nil { + log.Fatal(err) + } +} +func (s *Server) Stop() { + err := s.echo.stop() + if err != nil { + return + } +} diff --git a/tcp/types.go b/tcp/types.go new file mode 100644 index 0000000..59fe500 --- /dev/null +++ b/tcp/types.go @@ -0,0 +1,38 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2023. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package tcp + +import ( + "github.com/panjf2000/gnet/v2" + "yh-core/common/sapi/tcp/protocol" +) + +type ( + HandlerFunc func(gnet.Conn, *protocol.EchoPackage) + //PackageHandler interface { + // Handle(gnet.Conn, *protocol.EchoPackage) error + //} + // A handlers is a socket cmd. + Route struct { + Command uint16 //operation command code + Handler HandlerFunc + } + + featuredRoutes struct { + routes []Route + } +) diff --git a/ws/codec.go b/ws/codec.go new file mode 100644 index 0000000..3d39efb --- /dev/null +++ b/ws/codec.go @@ -0,0 +1,169 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2023. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package wsgnet + +import ( + "bytes" + "fmt" + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" + "github.com/panjf2000/gnet/v2" + "github.com/zeromicro/go-zero/core/logx" + "io" +) + +type wsCodec struct { + upgraded bool // 链接是否升级 + buf bytes.Buffer // 从实际socket中读取到的数据缓存 + wsMsgBuf wsMessageBuf // ws 消息缓存 +} + +type wsMessageBuf struct { + firstHeader *ws.Header + curHeader *ws.Header + cachedBuf bytes.Buffer +} + +type readWrite struct { + io.Reader + io.Writer +} + +func (w *wsCodec) upgrade(c gnet.Conn) (ok bool, action gnet.Action) { + if w.upgraded { + ok = true + return + } + buf := &w.buf + tmpReader := bytes.NewReader(buf.Bytes()) + oldLen := tmpReader.Len() + logx.Infof("do Upgrade") + + hs, err := ws.Upgrade(readWrite{tmpReader, c}) + skipN := oldLen - tmpReader.Len() + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { //数据不完整 + return + } + buf.Next(skipN) + logx.Infof("conn[%v] [err=%v]", c.RemoteAddr().String(), err.Error()) + action = gnet.Close + return + } + buf.Next(skipN) + logx.Infof("conn[%v] upgrade websocket protocol! Handshake: %v", c.RemoteAddr().String(), hs) + if err != nil { + logx.Infof("conn[%v] [err=%v]", c.RemoteAddr().String(), err.Error()) + action = gnet.Close + return + } + ok = true + w.upgraded = true + return +} +func (w *wsCodec) readBufferBytes(c gnet.Conn) gnet.Action { + size := c.InboundBuffered() + buf := make([]byte, size, size) + read, err := c.Read(buf) + if err != nil { + logx.Infof("read err! %w", err) + return gnet.Close + } + if read < size { + logx.Infof("read bytes len err! size: %d read: %d", size, read) + return gnet.Close + } + w.buf.Write(buf) + return gnet.None +} +func (w *wsCodec) Decode(c gnet.Conn) (outs []wsutil.Message, err error) { + fmt.Println("do Decode") + messages, err := w.readWsMessages() + if err != nil { + logx.Infof("Error reading message! %v", err) + return nil, err + } + if messages == nil || len(messages) <= 0 { //没有读到完整数据 不处理 + return + } + for _, message := range messages { + if message.OpCode.IsControl() { + err = wsutil.HandleClientControlMessage(c, message) + if err != nil { + return + } + continue + } + if message.OpCode == ws.OpText || message.OpCode == ws.OpBinary { + outs = append(outs, message) + } + } + return +} + +func (w *wsCodec) readWsMessages() (messages []wsutil.Message, err error) { + msgBuf := &w.wsMsgBuf + in := &w.buf + for { + if msgBuf.curHeader == nil { + if in.Len() < ws.MinHeaderSize { //头长度至少是2 + return + } + var head ws.Header + if in.Len() >= ws.MaxHeaderSize { + head, err = ws.ReadHeader(in) + if err != nil { + return messages, err + } + } else { //有可能不完整,构建新的 reader 读取 head 读取成功才实际对 in 进行读操作 + tmpReader := bytes.NewReader(in.Bytes()) + oldLen := tmpReader.Len() + head, err = ws.ReadHeader(tmpReader) + skipN := oldLen - tmpReader.Len() + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { //数据不完整 + return messages, nil + } + in.Next(skipN) + return nil, err + } + in.Next(skipN) + } + + msgBuf.curHeader = &head + err = ws.WriteHeader(&msgBuf.cachedBuf, head) + if err != nil { + return nil, err + } + } + dataLen := (int)(msgBuf.curHeader.Length) + if dataLen > 0 { + if in.Len() >= dataLen { + _, err = io.CopyN(&msgBuf.cachedBuf, in, int64(dataLen)) + if err != nil { + return + } + } else { //数据不完整 + fmt.Println(in.Len(), dataLen) + logx.Infof("incomplete data") + return + } + } + if msgBuf.curHeader.Fin { //当前 header 已经是一个完整消息 + messages, err = wsutil.ReadClientMessage(&msgBuf.cachedBuf, messages) + if err != nil { + return nil, err + } + msgBuf.cachedBuf.Reset() + } else { + logx.Infof("The data is split into multiple frames") + } + msgBuf.curHeader = nil + } +} diff --git a/ws/config.go b/ws/config.go new file mode 100644 index 0000000..b062533 --- /dev/null +++ b/ws/config.go @@ -0,0 +1,42 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2023. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package wsgnet + +type ( + // A EchoConf is a socket API server config. + EchoConf struct { + // Valid network schemes: + // tcp - bind to both IPv4 and IPv6 + // tcp4 - IPv4 + // tcp6 - IPv6 + // udp - bind to both IPv4 and IPv6 + // udp4 - IPv4 + // udp6 - IPv6 + // unix - Unix Domain Socket + Network string + Host string `json:",default=0.0.0.0"` + Port int + CertFile string `json:",optional"` + KeyFile string `json:",optional"` + MaxConns int `json:",default=10000"` + + // 协议包信息 + Timeout int64 `json:",default=15000"` + Magic uint32 //包头 + Key string //包数据校验Key + } +) diff --git a/ws/engine.go b/ws/engine.go new file mode 100644 index 0000000..c297c00 --- /dev/null +++ b/ws/engine.go @@ -0,0 +1,46 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package wsgnet + +import ( + "fmt" + goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine" + "github.com/zeromicro/go-zero/core/logx" + "yh-core/common/sapi/ws/internal" +) + +type engine struct { + conf EchoConf + routes featuredRoutes +} + +func newEngine(c EchoConf) *engine { + svr := &engine{ + conf: c, + } + return svr +} +func (ng *engine) addRoutes(r featuredRoutes) { + ng.routes = r +} + +func (ng *engine) start() error { + ss := &GnetServer{ + workerPool: goPool.Default(), + network: ng.conf.Network, + addr: fmt.Sprintf("%s:%d", ng.conf.Host, ng.conf.Port), + } + ss.addRoutes(ng.routes) + + return internal.StartEcho(ss, ng.conf.Network, ng.conf.Host, ng.conf.Port) +} +func (ng *engine) stop() error { + internal.StopEcho(ng.conf.Network, ng.conf.Host, ng.conf.Port) + return logx.Close() +} diff --git a/ws/gnetserver.go b/ws/gnetserver.go new file mode 100644 index 0000000..0a66b37 --- /dev/null +++ b/ws/gnetserver.go @@ -0,0 +1,154 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package wsgnet + +import ( + "fmt" + + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" + "github.com/panjf2000/gnet/v2" + goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine" + "github.com/zeromicro/go-zero/core/logx" + "sync" + "sync/atomic" + "time" +) + +type GnetServer struct { + gnet.BuiltinEventEngine + workerPool *goPool.Pool + //链接的对象汇总 + //connectedSockets sync.Map + //心跳 + tick time.Duration + + eng gnet.Engine + + network string + addr string + + connected int32 + maxConnected int32 + + handlers sync.Map +} + +func (s *GnetServer) addRoutes(r featuredRoutes) { + for _, route := range r.routes { + s.handlers.Store(route.Command, route.Handler) + } +} + +func (s *GnetServer) OnBoot(eng gnet.Engine) (action gnet.Action) { + logx.Infof("服务启动成功 %s ", + fmt.Sprintf("%s://%s", s.network, s.addr)) + s.eng = eng + + return +} + +func (s *GnetServer) OnShutdown(eng gnet.Engine) { + //关闭工作池 + logx.Infof("关闭服务 %s ", + fmt.Sprintf("%s://%s", s.network, s.addr)) + s.workerPool.Release() +} + +func (s *GnetServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { + c.SetContext(new(wsCodec)) + connected := atomic.AddInt32(&s.connected, 1) + maxConnected := atomic.LoadInt32(&s.maxConnected) + if connected > maxConnected { + atomic.SwapInt32(&s.maxConnected, connected) + } + logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected)) + + return +} + +func (s *GnetServer) OnClose(c gnet.Conn, err error) (action gnet.Action) { + if err != nil { + logx.Infof("error occurred on connection=%s, %v\n", c.RemoteAddr().String(), err) + } + connected := atomic.AddInt32(&s.connected, -1) + logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected)) + return +} + +func (s *GnetServer) OnTraffic(c gnet.Conn) (action gnet.Action) { + wsc := c.Context().(*wsCodec) + if wsc.readBufferBytes(c) == gnet.Close { + return gnet.Close + } + ok, action := wsc.upgrade(c) + if !ok { + return action + } + + if wsc.buf.Len() <= 0 { + return gnet.None + } + messages, err := wsc.Decode(c) + if err != nil { + return gnet.Close + } + + if messages == nil { + return gnet.None + } + var ( + pkg YHProto.EchoPackage + ) + + for _, message := range messages { + switch message.OpCode { + case ws.OpContinuation: + case ws.OpText: + logx.Infof("conn[%v] receive [op=%v] [msg=%v, len=%d]", c.RemoteAddr().String(), message.OpCode, string(message.Payload), len(message.Payload)) + err = wsutil.WriteServerMessage(c, ws.OpText, message.Payload) + if err != nil { + logx.Infof("conn[%v] [err=%v]", c.RemoteAddr().String(), err.Error()) + return gnet.Close + } + case ws.OpBinary: + err = pkg.DecodeWsMessage(message) + if err == YHProto.ErrNotEnoughStream { + break + } + if err != nil { + logx.Errorf("invalid packet: %v", err) + return gnet.Close + } + + handler, ok := s.handlers.Load(pkg.H.Command) + if !ok { + logx.Errorf("illegal command{%d}", pkg.H.Command) + return gnet.Close + } + //处理pkg + _ = s.workerPool.Submit( + func() { + h := handler.(HandlerFunc) + h(c, &pkg) + }) + case ws.OpClose: + return gnet.Close + case ws.OpPing: + case ws.OpPong: + } + } + + return +} + +func (s *GnetServer) OnTick() (delay time.Duration, action gnet.Action) { + + return +} diff --git a/ws/helper.go b/ws/helper.go new file mode 100644 index 0000000..d87e689 --- /dev/null +++ b/ws/helper.go @@ -0,0 +1,38 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2023. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2023. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package wsgnet + +import ( + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" + "github.com/panjf2000/gnet/v2" +) + +func WriteServerMessage(c gnet.Conn, op ws.OpCode, p []byte) error { + if c.Context() != nil { + err := wsutil.WriteServerMessage(c, op, p) + if err != nil { + return err + } + } else { + _, err := c.Write(p) + if err != nil { + return err + } + } + return nil +} diff --git a/ws/internal/server.go b/ws/internal/server.go new file mode 100644 index 0000000..5e6b3f3 --- /dev/null +++ b/ws/internal/server.go @@ -0,0 +1,28 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package internal + +import ( + "context" + "errors" + "fmt" + "github.com/panjf2000/gnet/v2" +) + +func StartEcho(eventHandler gnet.EventHandler, network string, host string, port int) error { + + if network == "ws" { + return gnet.Run(eventHandler, fmt.Sprintf("tcp://%s:%d", host, port), gnet.WithReusePort(true)) + } + return errors.New(" only network ws!") + +} +func StopEcho(network string, host string, port int) { + gnet.Stop(context.Background(), fmt.Sprintf("%s://%s:%d", network, host, port)) +} diff --git a/ws/server.go b/ws/server.go new file mode 100644 index 0000000..3313fcf --- /dev/null +++ b/ws/server.go @@ -0,0 +1,71 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2023. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package wsgnet + +import ( + "git.yhrjkj.com/YuanHong/YHEchoPackage/YHProto" + "log" +) + +// A EchoServer is a rpc server. +type Server struct { + echo *engine +} + +func MustNewServer(c EchoConf) *Server { + //设置协议信息 + YHProto.SetEchoPkgMagic(c.Magic) + YHProto.SetEchoPkgKey(c.Key) + + server, err := NewServer(c) + if err != nil { + log.Fatal(err) + } + return server +} +func NewServer(c EchoConf) (*Server, error) { + server := &Server{ + echo: newEngine(c), + } + return server, nil +} + +// AddRoutes add given routes into the Server. +func (s *Server) AddRoutes(rs []Route) { + r := featuredRoutes{ + routes: rs, + } + s.echo.addRoutes(r) +} + +// AddRoute adds given route into the Server. +func (s *Server) AddRoute(r Route) { + s.AddRoutes([]Route{r}) +} + +func (s *Server) Start() { + err := s.echo.start() + if err != nil { + log.Fatal(err) + } +} +func (s *Server) Stop() { + err := s.echo.stop() + if err != nil { + return + } +} diff --git a/ws/types.go b/ws/types.go new file mode 100644 index 0000000..0a7856f --- /dev/null +++ b/ws/types.go @@ -0,0 +1,37 @@ +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2023. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +/* + * 版权所有 (c) 上海元泓软件科技有限公司 2022. + * 严禁通过任何媒介未经授权复制本文件. + * + * 作者:mic + * Email:funui@outlook.com + */ + +package wsgnet + +import ( + "github.com/panjf2000/gnet/v2" +) + +type ( + HandlerFunc func(gnet.Conn, *YHProto.EchoPackage) + //PackageHandler interface { + // Handle(gnet.Conn, *protocol.EchoPackage) error + //} + // A handlers is a socket cmd. + Route struct { + Command uint16 //operation command code + Handler HandlerFunc + } + + featuredRoutes struct { + routes []Route + } +)