添加go-zero协议代码

master v0.0.6
mic 2023-02-09 13:35:14 +08:00
parent d53b3f1d39
commit ae8c4fc9d0
15 changed files with 951 additions and 0 deletions

1
go.mod
View File

@ -14,6 +14,7 @@ require (
github.com/gobwas/pool v0.2.1 // indirect
github.com/mattn/go-colorable v0.1.9 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/panjf2000/ants/v2 v2.7.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.opentelemetry.io/otel v1.11.0 // indirect

42
tcp/config.go 100644
View File

@ -0,0 +1,42 @@
/*
* (c) 2023.
* .
*
* :mic
* Email:funui@outlook.com
*/
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package tcp
type (
// A EchoConf is a socket API server config.
EchoConf struct {
// Valid network schemes:
// tcp - bind to both IPv4 and IPv6
// tcp4 - IPv4
// tcp6 - IPv6
// udp - bind to both IPv4 and IPv6
// udp4 - IPv4
// udp6 - IPv6
// unix - Unix Domain Socket
Network string
Host string `json:",default=0.0.0.0"`
Port int
CertFile string `json:",optional"`
KeyFile string `json:",optional"`
MaxConns int `json:",default=10000"`
// 协议包信息
Timeout int64 `json:",default=15000"`
Magic uint32 //包头
Key string //包数据校验Key
}
)

54
tcp/engine.go 100644
View File

@ -0,0 +1,54 @@
/*
* (c) 2023.
* .
*
* :mic
* Email:funui@outlook.com
*/
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package tcp
import (
"fmt"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
"github.com/zeromicro/go-zero/core/logx"
"yh-core/common/sapi/tcp/internal"
)
type engine struct {
conf EchoConf
routes featuredRoutes
}
func newEngine(c EchoConf) *engine {
svr := &engine{
conf: c,
}
return svr
}
func (ng *engine) addRoutes(r featuredRoutes) {
ng.routes = r
}
func (ng *engine) start() error {
ss := &GnetServer{
workerPool: goPool.Default(),
network: ng.conf.Network,
addr: fmt.Sprintf("%s:%d", ng.conf.Host, ng.conf.Port),
}
ss.addRoutes(ng.routes)
return internal.StartEcho(ss, ng.conf.Network, ng.conf.Host, ng.conf.Port)
}
func (ng *engine) stop() error {
internal.StopEcho(ng.conf.Network, ng.conf.Host, ng.conf.Port)
return logx.Close()
}

135
tcp/gnetserver.go 100644
View File

@ -0,0 +1,135 @@
/*
* (c) 2023.
* .
*
* :mic
* Email:funui@outlook.com
*/
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package tcp
import (
"fmt"
"github.com/panjf2000/gnet/v2"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
"github.com/zeromicro/go-zero/core/logx"
"sync"
"sync/atomic"
"time"
)
type GnetServer struct {
gnet.BuiltinEventEngine
workerPool *goPool.Pool
//链接的对象汇总
//connectedSockets sync.Map
//心跳
tick time.Duration
eng gnet.Engine
network string
addr string
connected int32
maxConnected int32
handlers sync.Map
}
func (s *GnetServer) addRoutes(r featuredRoutes) {
for _, route := range r.routes {
s.handlers.Store(route.Command, route.Handler)
}
}
func (s *GnetServer) OnBoot(eng gnet.Engine) (action gnet.Action) {
logx.Infof("服务启动成功 %s ",
fmt.Sprintf("%s://%s", s.network, s.addr))
s.eng = eng
return
}
func (s *GnetServer) OnShutdown(eng gnet.Engine) {
//关闭工作池
logx.Infof("关闭服务 %s ",
fmt.Sprintf("%s://%s", s.network, s.addr))
s.workerPool.Release()
}
func (s *GnetServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
connected := atomic.AddInt32(&s.connected, 1)
maxConnected := atomic.LoadInt32(&s.maxConnected)
if connected > maxConnected {
atomic.SwapInt32(&s.maxConnected, connected)
}
logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected))
return
}
func (s *GnetServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
if err != nil {
logx.Infof("error occurred on connection=%s, %v\n", c.RemoteAddr().String(), err)
}
connected := atomic.AddInt32(&s.connected, -1)
logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected))
return
}
func (s *GnetServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
var (
err error
pkg YHProto.EchoPackage
)
for {
err = pkg.DecodeGnet(c)
if err == YHProto.ErrNotEnoughStream {
break
}
if err != nil {
logx.Errorf("invalid packet: %v", err)
return gnet.Close
}
handler, ok := s.handlers.Load(pkg.H.Command)
if !ok {
logx.Errorf("illegal command{%d}", pkg.H.Command)
return gnet.Close
}
//处理pkg
_ = s.workerPool.Submit(
func() {
h := handler.(HandlerFunc)
h(c, &pkg)
})
}
return
}
func (s *GnetServer) OnTick() (delay time.Duration, action gnet.Action) {
return
}

View File

@ -0,0 +1,26 @@
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package internal
import (
"context"
"fmt"
"github.com/panjf2000/gnet/v2"
)
func StartEcho(eventHandler gnet.EventHandler, network string, host string, port int) error {
if network == "ws" {
return gnet.Run(eventHandler, fmt.Sprintf("tcp://%s:%d", host, port))
}
return gnet.Run(eventHandler, fmt.Sprintf("%s://%s:%d", network, host, port))
}
func StopEcho(network string, host string, port int) {
gnet.Stop(context.Background(), fmt.Sprintf("%s://%s:%d", network, host, port))
}

70
tcp/server.go 100644
View File

@ -0,0 +1,70 @@
/*
* (c) 2023.
* .
*
* :mic
* Email:funui@outlook.com
*/
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package tcp
import (
"log"
)
// A EchoServer is a rpc server.
type Server struct {
echo *engine
}
func MustNewServer(c EchoConf) *Server {
//设置协议信息
YHProto.SetEchoPkgMagic(c.Magic)
YHProto.SetEchoPkgKey(c.Key)
server, err := NewServer(c)
if err != nil {
log.Fatal(err)
}
return server
}
func NewServer(c EchoConf) (*Server, error) {
server := &Server{
echo: newEngine(c),
}
return server, nil
}
// AddRoutes add given routes into the Server.
func (s *Server) AddRoutes(rs []Route) {
r := featuredRoutes{
routes: rs,
}
s.echo.addRoutes(r)
}
// AddRoute adds given route into the Server.
func (s *Server) AddRoute(r Route) {
s.AddRoutes([]Route{r})
}
func (s *Server) Start() {
err := s.echo.start()
if err != nil {
log.Fatal(err)
}
}
func (s *Server) Stop() {
err := s.echo.stop()
if err != nil {
return
}
}

38
tcp/types.go 100644
View File

@ -0,0 +1,38 @@
/*
* (c) 2023.
* .
*
* :mic
* Email:funui@outlook.com
*/
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package tcp
import (
"github.com/panjf2000/gnet/v2"
"yh-core/common/sapi/tcp/protocol"
)
type (
HandlerFunc func(gnet.Conn, *protocol.EchoPackage)
//PackageHandler interface {
// Handle(gnet.Conn, *protocol.EchoPackage) error
//}
// A handlers is a socket cmd.
Route struct {
Command uint16 //operation command code
Handler HandlerFunc
}
featuredRoutes struct {
routes []Route
}
)

169
ws/codec.go 100644
View File

@ -0,0 +1,169 @@
/*
* (c) 2023.
* .
*
* :mic
* Email:funui@outlook.com
*/
package wsgnet
import (
"bytes"
"fmt"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/panjf2000/gnet/v2"
"github.com/zeromicro/go-zero/core/logx"
"io"
)
type wsCodec struct {
upgraded bool // 链接是否升级
buf bytes.Buffer // 从实际socket中读取到的数据缓存
wsMsgBuf wsMessageBuf // ws 消息缓存
}
type wsMessageBuf struct {
firstHeader *ws.Header
curHeader *ws.Header
cachedBuf bytes.Buffer
}
type readWrite struct {
io.Reader
io.Writer
}
func (w *wsCodec) upgrade(c gnet.Conn) (ok bool, action gnet.Action) {
if w.upgraded {
ok = true
return
}
buf := &w.buf
tmpReader := bytes.NewReader(buf.Bytes())
oldLen := tmpReader.Len()
logx.Infof("do Upgrade")
hs, err := ws.Upgrade(readWrite{tmpReader, c})
skipN := oldLen - tmpReader.Len()
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF { //数据不完整
return
}
buf.Next(skipN)
logx.Infof("conn[%v] [err=%v]", c.RemoteAddr().String(), err.Error())
action = gnet.Close
return
}
buf.Next(skipN)
logx.Infof("conn[%v] upgrade websocket protocol! Handshake: %v", c.RemoteAddr().String(), hs)
if err != nil {
logx.Infof("conn[%v] [err=%v]", c.RemoteAddr().String(), err.Error())
action = gnet.Close
return
}
ok = true
w.upgraded = true
return
}
func (w *wsCodec) readBufferBytes(c gnet.Conn) gnet.Action {
size := c.InboundBuffered()
buf := make([]byte, size, size)
read, err := c.Read(buf)
if err != nil {
logx.Infof("read err! %w", err)
return gnet.Close
}
if read < size {
logx.Infof("read bytes len err! size: %d read: %d", size, read)
return gnet.Close
}
w.buf.Write(buf)
return gnet.None
}
func (w *wsCodec) Decode(c gnet.Conn) (outs []wsutil.Message, err error) {
fmt.Println("do Decode")
messages, err := w.readWsMessages()
if err != nil {
logx.Infof("Error reading message! %v", err)
return nil, err
}
if messages == nil || len(messages) <= 0 { //没有读到完整数据 不处理
return
}
for _, message := range messages {
if message.OpCode.IsControl() {
err = wsutil.HandleClientControlMessage(c, message)
if err != nil {
return
}
continue
}
if message.OpCode == ws.OpText || message.OpCode == ws.OpBinary {
outs = append(outs, message)
}
}
return
}
func (w *wsCodec) readWsMessages() (messages []wsutil.Message, err error) {
msgBuf := &w.wsMsgBuf
in := &w.buf
for {
if msgBuf.curHeader == nil {
if in.Len() < ws.MinHeaderSize { //头长度至少是2
return
}
var head ws.Header
if in.Len() >= ws.MaxHeaderSize {
head, err = ws.ReadHeader(in)
if err != nil {
return messages, err
}
} else { //有可能不完整,构建新的 reader 读取 head 读取成功才实际对 in 进行读操作
tmpReader := bytes.NewReader(in.Bytes())
oldLen := tmpReader.Len()
head, err = ws.ReadHeader(tmpReader)
skipN := oldLen - tmpReader.Len()
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF { //数据不完整
return messages, nil
}
in.Next(skipN)
return nil, err
}
in.Next(skipN)
}
msgBuf.curHeader = &head
err = ws.WriteHeader(&msgBuf.cachedBuf, head)
if err != nil {
return nil, err
}
}
dataLen := (int)(msgBuf.curHeader.Length)
if dataLen > 0 {
if in.Len() >= dataLen {
_, err = io.CopyN(&msgBuf.cachedBuf, in, int64(dataLen))
if err != nil {
return
}
} else { //数据不完整
fmt.Println(in.Len(), dataLen)
logx.Infof("incomplete data")
return
}
}
if msgBuf.curHeader.Fin { //当前 header 已经是一个完整消息
messages, err = wsutil.ReadClientMessage(&msgBuf.cachedBuf, messages)
if err != nil {
return nil, err
}
msgBuf.cachedBuf.Reset()
} else {
logx.Infof("The data is split into multiple frames")
}
msgBuf.curHeader = nil
}
}

42
ws/config.go 100644
View File

@ -0,0 +1,42 @@
/*
* (c) 2023.
* .
*
* :mic
* Email:funui@outlook.com
*/
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package wsgnet
type (
// A EchoConf is a socket API server config.
EchoConf struct {
// Valid network schemes:
// tcp - bind to both IPv4 and IPv6
// tcp4 - IPv4
// tcp6 - IPv6
// udp - bind to both IPv4 and IPv6
// udp4 - IPv4
// udp6 - IPv6
// unix - Unix Domain Socket
Network string
Host string `json:",default=0.0.0.0"`
Port int
CertFile string `json:",optional"`
KeyFile string `json:",optional"`
MaxConns int `json:",default=10000"`
// 协议包信息
Timeout int64 `json:",default=15000"`
Magic uint32 //包头
Key string //包数据校验Key
}
)

46
ws/engine.go 100644
View File

@ -0,0 +1,46 @@
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package wsgnet
import (
"fmt"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
"github.com/zeromicro/go-zero/core/logx"
"yh-core/common/sapi/ws/internal"
)
type engine struct {
conf EchoConf
routes featuredRoutes
}
func newEngine(c EchoConf) *engine {
svr := &engine{
conf: c,
}
return svr
}
func (ng *engine) addRoutes(r featuredRoutes) {
ng.routes = r
}
func (ng *engine) start() error {
ss := &GnetServer{
workerPool: goPool.Default(),
network: ng.conf.Network,
addr: fmt.Sprintf("%s:%d", ng.conf.Host, ng.conf.Port),
}
ss.addRoutes(ng.routes)
return internal.StartEcho(ss, ng.conf.Network, ng.conf.Host, ng.conf.Port)
}
func (ng *engine) stop() error {
internal.StopEcho(ng.conf.Network, ng.conf.Host, ng.conf.Port)
return logx.Close()
}

154
ws/gnetserver.go 100644
View File

@ -0,0 +1,154 @@
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package wsgnet
import (
"fmt"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/panjf2000/gnet/v2"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
"github.com/zeromicro/go-zero/core/logx"
"sync"
"sync/atomic"
"time"
)
type GnetServer struct {
gnet.BuiltinEventEngine
workerPool *goPool.Pool
//链接的对象汇总
//connectedSockets sync.Map
//心跳
tick time.Duration
eng gnet.Engine
network string
addr string
connected int32
maxConnected int32
handlers sync.Map
}
func (s *GnetServer) addRoutes(r featuredRoutes) {
for _, route := range r.routes {
s.handlers.Store(route.Command, route.Handler)
}
}
func (s *GnetServer) OnBoot(eng gnet.Engine) (action gnet.Action) {
logx.Infof("服务启动成功 %s ",
fmt.Sprintf("%s://%s", s.network, s.addr))
s.eng = eng
return
}
func (s *GnetServer) OnShutdown(eng gnet.Engine) {
//关闭工作池
logx.Infof("关闭服务 %s ",
fmt.Sprintf("%s://%s", s.network, s.addr))
s.workerPool.Release()
}
func (s *GnetServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
c.SetContext(new(wsCodec))
connected := atomic.AddInt32(&s.connected, 1)
maxConnected := atomic.LoadInt32(&s.maxConnected)
if connected > maxConnected {
atomic.SwapInt32(&s.maxConnected, connected)
}
logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected))
return
}
func (s *GnetServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
if err != nil {
logx.Infof("error occurred on connection=%s, %v\n", c.RemoteAddr().String(), err)
}
connected := atomic.AddInt32(&s.connected, -1)
logx.Infof("在线:%d,最高在线:%d", connected, atomic.LoadInt32(&s.maxConnected))
return
}
func (s *GnetServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
wsc := c.Context().(*wsCodec)
if wsc.readBufferBytes(c) == gnet.Close {
return gnet.Close
}
ok, action := wsc.upgrade(c)
if !ok {
return action
}
if wsc.buf.Len() <= 0 {
return gnet.None
}
messages, err := wsc.Decode(c)
if err != nil {
return gnet.Close
}
if messages == nil {
return gnet.None
}
var (
pkg YHProto.EchoPackage
)
for _, message := range messages {
switch message.OpCode {
case ws.OpContinuation:
case ws.OpText:
logx.Infof("conn[%v] receive [op=%v] [msg=%v, len=%d]", c.RemoteAddr().String(), message.OpCode, string(message.Payload), len(message.Payload))
err = wsutil.WriteServerMessage(c, ws.OpText, message.Payload)
if err != nil {
logx.Infof("conn[%v] [err=%v]", c.RemoteAddr().String(), err.Error())
return gnet.Close
}
case ws.OpBinary:
err = pkg.DecodeWsMessage(message)
if err == YHProto.ErrNotEnoughStream {
break
}
if err != nil {
logx.Errorf("invalid packet: %v", err)
return gnet.Close
}
handler, ok := s.handlers.Load(pkg.H.Command)
if !ok {
logx.Errorf("illegal command{%d}", pkg.H.Command)
return gnet.Close
}
//处理pkg
_ = s.workerPool.Submit(
func() {
h := handler.(HandlerFunc)
h(c, &pkg)
})
case ws.OpClose:
return gnet.Close
case ws.OpPing:
case ws.OpPong:
}
}
return
}
func (s *GnetServer) OnTick() (delay time.Duration, action gnet.Action) {
return
}

38
ws/helper.go 100644
View File

@ -0,0 +1,38 @@
/*
* (c) 2023.
* .
*
* :mic
* Email:funui@outlook.com
*/
/*
* (c) 2023.
* .
*
* :mic
* Email:funui@outlook.com
*/
package wsgnet
import (
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/panjf2000/gnet/v2"
)
func WriteServerMessage(c gnet.Conn, op ws.OpCode, p []byte) error {
if c.Context() != nil {
err := wsutil.WriteServerMessage(c, op, p)
if err != nil {
return err
}
} else {
_, err := c.Write(p)
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,28 @@
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package internal
import (
"context"
"errors"
"fmt"
"github.com/panjf2000/gnet/v2"
)
func StartEcho(eventHandler gnet.EventHandler, network string, host string, port int) error {
if network == "ws" {
return gnet.Run(eventHandler, fmt.Sprintf("tcp://%s:%d", host, port), gnet.WithReusePort(true))
}
return errors.New(" only network ws!")
}
func StopEcho(network string, host string, port int) {
gnet.Stop(context.Background(), fmt.Sprintf("%s://%s:%d", network, host, port))
}

71
ws/server.go 100644
View File

@ -0,0 +1,71 @@
/*
* (c) 2023.
* .
*
* :mic
* Email:funui@outlook.com
*/
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package wsgnet
import (
"git.yhrjkj.com/YuanHong/YHEchoPackage/YHProto"
"log"
)
// A EchoServer is a rpc server.
type Server struct {
echo *engine
}
func MustNewServer(c EchoConf) *Server {
//设置协议信息
YHProto.SetEchoPkgMagic(c.Magic)
YHProto.SetEchoPkgKey(c.Key)
server, err := NewServer(c)
if err != nil {
log.Fatal(err)
}
return server
}
func NewServer(c EchoConf) (*Server, error) {
server := &Server{
echo: newEngine(c),
}
return server, nil
}
// AddRoutes add given routes into the Server.
func (s *Server) AddRoutes(rs []Route) {
r := featuredRoutes{
routes: rs,
}
s.echo.addRoutes(r)
}
// AddRoute adds given route into the Server.
func (s *Server) AddRoute(r Route) {
s.AddRoutes([]Route{r})
}
func (s *Server) Start() {
err := s.echo.start()
if err != nil {
log.Fatal(err)
}
}
func (s *Server) Stop() {
err := s.echo.stop()
if err != nil {
return
}
}

37
ws/types.go 100644
View File

@ -0,0 +1,37 @@
/*
* (c) 2023.
* .
*
* :mic
* Email:funui@outlook.com
*/
/*
* (c) 2022.
* .
*
* :mic
* Email:funui@outlook.com
*/
package wsgnet
import (
"github.com/panjf2000/gnet/v2"
)
type (
HandlerFunc func(gnet.Conn, *YHProto.EchoPackage)
//PackageHandler interface {
// Handle(gnet.Conn, *protocol.EchoPackage) error
//}
// A handlers is a socket cmd.
Route struct {
Command uint16 //operation command code
Handler HandlerFunc
}
featuredRoutes struct {
routes []Route
}
)