Golang轻量级高并发Socket框架介绍

发表时间: 2024-06-12 19:59

chitchat 是一个基于 Golang socket 的轻量级、支持高并发操作的开发框架。

无论是服务器还是客户端,都只需要三个步骤就可以正常工作:

  1. 告诉它注册(监听)的地址;
  2. 告诉它读取数据后如何处理;
  3. 正确处理错误通知。
package chitchatimport (    "encoding/json"    "errors"    "fmt"    "time")type MasterRoler interface {    Listen() error    Close() error}type NodeRoler interface {    Register() error    Leave() error}type Node struct {    roleMaster bool    local ipx    remote ipx    //For Nodes/master    leave func() error    //For MasterNode    registeredIP []string    closesignal chan struct{}}type ipx struct {    ipaddr string    ipport string}type pingStruct struct {    Data string    Id int}func registerNode(str []byte, s ReadFuncer) error {    if sx := string(str); sx != "" {    fmt.Println(sx)    x := s.Addon().(*Node)    x.registeredIP = append(x.registeredIP, x.remote.ipaddr)    go daemonHBChecker(x.remote, x.closesignal)    fmt.Println(x.registeredIP)    }    return nil}func hb4node(str []byte, s ReadFuncer) error {    v := new(pingStruct)    err := json.Unmarshal(str, v)    if err != nil {    return err    }    if v.Data == "heartbeat ping" {        if err := s.Write(pingStruct{            Data: "heartbeat pong",            Id: v.Id + 1,            }); err == nil {            return errors.New("succeed")        }    return errors.New("writing data error")    }    return errors.New("err message received")}func hb4master(str []byte, s ReadFuncer) error {    defer s.Close()    v := new(pingStruct)    err := json.Unmarshal(str, v)    if err != nil {    return err    }    if v.Data == "heartbeat pong" {    return errors.New("succeed")    }    return errors.New("err message received")}func (t *Node) daemonHBListener() error { //for Nodes listen Master's hbc    fmt.Println("HBListen start.")    defer fmt.Println("->HBListen quit.")    s := NewServer(t.local.ipaddr+":"+"7939", '\n', hb4node, nil)    t.leave = s.Cut    if err := s.Listen(); err != nil {    return err    }    go func() {    defer fmt.Println("->HBL err Daemon closed.")    fmt.Println("HBL err Daemon start.")    timeout := time.Second * 10    timer := time.NewTimer(timeout)    for {    select {    case v, ok := 
<-s.ErrChan():    if ok {    if v.Err.Error() == "succeed" { //node sends succeed.    timer.Reset(timeout)    }    } else {    return    }    case <-timer.C:    err := s.Cut()    if err != nil {    println(err)    }    fmt.Println("!Found Master is Dead")    return    //TODO: Timeout, master is dead.    }    }    }()    return nil}func daemonHBChecker(ip ipx, csignal <-chan struct{}) { //for master check    defer fmt.Println("->HBChecker quit")    fmt.Println("HBChecker start.")    i := time.NewTicker(3 * time.Second)    failedTimes := 0    for {        select {        case <-csignal:        return        case <-i.C:        fmt.Println("-----------------------------------")        c := NewClient(ip.ipaddr+":"+"7939", '\n', hb4master, nil)        c.SetDeadLine(2 * time.Second)        if err := c.Dial(); err != nil {        //TODO: Failed once.        failedTimes++        break    }    go func() {    fmt.Println("HBC err Daemon start.")    for {        v, ok := <-c.ErrChan()        if ok {        if v.Err.Error() == "err message received" {            failedTimes++        } else if v.Err.Error() == "hbc succeed" {            failedTimes = 0        }        } else {            fmt.Println("->HBC err Daemon closed.")            return        }    }    }()    if err := c.Write(pingStruct{    Data: "heartbeat ping",    Id: 0,    }); err != nil {    failedTimes++    break    }    } //break to here    fmt.Println(ip.ipaddr+":"+ip.ipport+" failed time: ", failedTimes)    if failedTimes > 3 {    //TODO: this connection is failed.    
return    }    }}func iportSplitter(socket string) *ipx {    flag := false    s1 := make([]byte, 0)    for i := 0; i < len(socket); i++ {    if socket[i] == ':' {        flag = true        continue    }    if !flag {        s1 = append(s1, socket[i])    } else {        return &ipx{string(s1), socket[i:]}    }    }    return nil}func NewNode(remoteAddr string) NodeRoler {    return &Node{    roleMaster: false,    remote: *iportSplitter(remoteAddr),    registeredIP: nil,    }}func NewMaster(ipAddr string) MasterRoler {    t := iportSplitter(ipAddr)    return &Node{        roleMaster: true,        local: *t,        remote: *t,        registeredIP: make([]string, 0),        closesignal: make(chan struct{}),        }    }    func (t *Node) Listen() error {    server := NewServer(t.local.ipaddr+":"+t.local.ipport, 0, registerNode, t)    if err := server.Listen(); err != nil {    return err    }    t.leave = server.Cut    return nil}func (t *Node) Register() error {    slave := NewClient(t.remote.ipaddr+":"+t.remote.ipport, 0, nil, nil)    if err := slave.Dial(); err != nil {        return err    }    t.local = *iportSplitter(slave.GetLocalAddr())    if err := slave.Write("hello"); err != nil {        return err    }    slave.Close()    return t.daemonHBListener()}func (t *Node) Leave() error {    return t.leave()}func (t *Node) Close() error {    err := t.leave()    if err != nil {        return err    }    close(t.closesignal)    return nil}