事件模型:
核心代码:
/**
启动tcpServer
*/
func startTcpServer(port string) {
addr := gxnet.HostAddress2("localhost", port)
server := getty.NewTCPServer(
getty.WithLocalAddress(addr),
)
// run server
server.RunEventLoop(newSession)
log.Info("server bind addr{%s} ok!", port)
}
func newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetKeepAlive(true)
session.SetPkgHandler(irisPkgHandler)
session.SetEventListener(irisMsgHandler)
session.SetCronPeriod(1000 * 10)
log.Debug("client new session:%s\n", session.Stat())
return nil
}
設置包处理器, 用于消息的编解码,需实现ReadWriter 接口,让用户可以自己实现具体的序列化/反序列化逻辑
session.SetPkgHandler(irisPkgHandler)
設置監控接口, 通过这些接口可以知道每个网络连接在每个阶段的状态, 需实现EventListener接口,
session.SetEventListener(irisMsgHandler)
- 「OnOpen」:连接建立时提供给用户使用,若当前连接总数超过用户设定的连接数,则可以返回一个非 nil 的 error,Getty 就会在初始阶段关闭这个连接。
- 「OnError」:用于连接有异常时的监控,Getty 执行这个接口后关闭连接
- 「OnClose」:用于连接关闭时的监控,Getty 执行这个接口后关闭连接。
- 「OnMessage」:当 Getty 调用 Reader 接口成功从 TCP流/UDP/WebSocket 网络中解析出一个 package 后,通过这个接口把数据包交给用户处理。
- 「OnCron」:定时接口,用户可以在这里接口函数中执行心跳检测等一些定时逻辑。
设置定时接口的触发周期
session.SetCronPeriod(1000 * 10)
完整示例:
package main
import (
"flag"
"fmt"
getty "github.com/AlexStocks/getty/transport"
gxnet "github.com/AlexStocks/goext/net"
log "github.com/sirupsen/logrus"
conf "iris/config"
"iris/global"
"iris/tcp/handler"
"iris/web/router"
"net"
"net/http"
"time"
)
var (
cfgPath string
irisMsgHandler = handler.NewEchoMessageHandler()
irisPkgHandler = handler.NewEchoPackageHandler()
)
func init() {
flag.StringVar(&cfgPath, "c", "./etc/config.yaml", "")
flag.Parse()
}
func main() {
println("starting.....")
err := conf.Init(cfgPath)
if err != nil {
log.WithFields(log.Fields{"cfg_path": cfgPath}).WithError(err).Error("[main] config init error")
return
}
log.Infof("conf:%v", conf.GConfig)
//初始化全局变量
global.Init()
go startTcpServer(conf.GConfig.TcpServer.Port)
router, err := router.InitRouter()
if err != nil {
log.WithError(err).Error("[main] init router error")
return
}
server := &http.Server{
Addr: ":" + conf.GConfig.WebServer.Port,
Handler: router,
ReadTimeout: time.Second * 60,
WriteTimeout: time.Second * 60,
MaxHeaderBytes: http.DefaultMaxHeaderBytes,
}
err = server.ListenAndServe()
if err != nil {
log.WithError(err).Error("[main] start web server error")
return
}
}
/**
启动tcpServer
*/
func startTcpServer(port string) {
addr := gxnet.HostAddress2("localhost", port)
server := getty.NewTCPServer(
getty.WithLocalAddress(addr),
)
// run server
server.RunEventLoop(newSession)
log.Info("server bind addr{%s} ok!", port)
}
func newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetKeepAlive(true)
session.SetPkgHandler(irisPkgHandler)
session.SetEventListener(irisMsgHandler)
session.SetCronPeriod(1000 * 10)
log.Debug("client new session:%s\n", session.Stat())
return nil
}
定义 handler:
package handler
import (
"encoding/hex"
getty "github.com/AlexStocks/getty/transport"
log "github.com/sirupsen/logrus"
"iris/constant"
"iris/global"
"iris/utils"
"strings"
)
type IrisMessageHandler struct{}
func NewEchoMessageHandler() *IrisMessageHandler {
return &IrisMessageHandler{}
}
func (h *IrisMessageHandler) OnOpen(session getty.Session) error {
log.Info("client connected:" + session.RemoteAddr())
return nil
}
func (h *IrisMessageHandler) OnError(session getty.Session, err error) {
log.Errorf("session{%s} got error{%v}.", session.Stat(), err)
}
func (h *IrisMessageHandler) OnClose(session getty.Session) {
log.Infof("session{%s} is closing......", session.Stat())
}
func (h *IrisMessageHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Infof("get echo package{%s}", pkg)
}
func (h *IrisMessageHandler) OnCron(session getty.Session) {
//心跳处理
log.Info("心跳处理")
}
定义包处理器, 用于消息的编解码,需实现ReadWriter 接口,让用户可以自己实现具体的序列化/反序列化逻辑
type IrisPackageHandler struct{}
func NewEchoPackageHandler() *IrisPackageHandler {
return &IrisPackageHandler{}
}
func (h *IrisPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
log.Info("receive:" + string(data))
return string(data), len(data), nil
}
func (h *IrisPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
log.Info("send:" + pkg.(string))
return hex.DecodeString(pkg.(string))
}