diff --git a/tcp_client.go b/tcp_client.go index 79afc44..f350722 100644 --- a/tcp_client.go +++ b/tcp_client.go @@ -1,7 +1,12 @@ package netProgram import ( + "encoding/gob" + "errors" + "fmt" + "io" "log" + "math/rand" "net" "sync" "time" @@ -99,3 +104,350 @@ func TcpBacklogClient() { wg.Wait() } + +func TcpClientRW() { + // tcp服务端地址 + serverAddress := "127.0.0.1:5678" // IPv6 4 + + // A. 建立连接 + conn, err := net.DialTimeout(tcp, serverAddress, time.Second) + //conn, err := net.Dial(tcp, serverAddress) + if err != nil { + log.Println(err) + return + } + // 保证关闭 + defer conn.Close() + log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr()) + + // B.从服务端接收数据,SerRead + buf := make([]byte, 1024) + rn, err := conn.Read(buf) + if err != nil { + log.Println(err) + } + log.Println("received from server data is:", string(buf[:rn])) + + // C.向服务器端发送数据,SerWrite + wn, err := conn.Write([]byte("send some data from client" + "\n")) + if err != nil { + log.Println(err) + } + log.Printf("client write len is %d\n", wn) +} + +func TcpWClient() { + // tcp服务端地址 + serverAddress := "127.0.0.1:5678" // IPv6 4 + + // A. 建立连接 + conn, err := net.DialTimeout(tcp, serverAddress, time.Second) + //conn, err := net.Dial(tcp, serverAddress) + if err != nil { + log.Println(err) + return + } + // 保证关闭 + defer conn.Close() + log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr()) + // + //select {} + + // 1. 阻塞Read + //buf := make([]byte, 1024) + //rn, err := conn.SerRead(buf) + //if err != nil { + // log.Println(err) + //} + //log.Println("received from server data is:", string(buf[:rn])) + + // 2. 循环读 + conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + for { + + buf := make([]byte, 10) + rn, err := conn.Read(buf) + if err != nil { + log.Println(err) + break + } + log.Println("received from server data is:", string(buf[:rn])) + } +} + +func TcpClientRWConcurrency() { + // tcp服务端地址 + serverAddress := "127.0.0.1:5678" // IPv6 4 + + // A. 建立连接 + conn, err := net.DialTimeout(tcp, serverAddress, time.Second) + //conn, err := net.Dial(tcp, serverAddress) + if err != nil { + log.Println(err) + return + } + // 保证关闭 + defer conn.Close() + log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr()) + + wg := sync.WaitGroup{} + // 并发的写 + wg.Add(1) + go CliWrite(conn, &wg) + + // 并发的读 + wg.Add(1) + go CliRead(conn, &wg) + + wg.Wait() +} + +func CliWrite(conn net.Conn, wg *sync.WaitGroup) { + defer wg.Done() + for { + // B.向客户端发送数据,SerWrite + wn, err := conn.Write([]byte("send some data from client" + "\n")) + if err != nil { + log.Println(err) + } + log.Printf("client write len is %d\n", wn) + time.Sleep(3 * time.Second) + } + +} + +func CliRead(conn net.Conn, wg *sync.WaitGroup) { + defer wg.Done() + for { + // C.从客户端接收数据,SerRead + buf := make([]byte, 1024) + rn, err := conn.Read(buf) + if err != nil { + log.Println(err) + } + log.Println("received from server data is:", string(buf[:rn])) + } +} + +func TcpClientFormat() { + // tcp服务端地址 + serverAddress := "127.0.0.1:5678" // IPv6 4 + + // A. 建立连接 + conn, err := net.DialTimeout(tcp, serverAddress, time.Second) + //conn, err := net.Dial(tcp, serverAddress) + if err != nil { + log.Println(err) + return + } + // 保证关闭 + defer conn.Close() + log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr()) + + wg := sync.WaitGroup{} + + // 接收端 + wg.Add(1) + go CliReadFormat(conn, &wg) + + wg.Wait() +} + +func CliReadFormat(conn net.Conn, wg *sync.WaitGroup) { + defer wg.Done() + for { + // 从客户端接收数据 + // 接收到数据后,先解码 + + // 传递的消息类型 + type Message struct { + ID uint `json:"id,omitempty"` + Code string `json:"code,omitempty"` + Content string `json:"content,omitempty"` + } + message := Message{} + + // 1, JSON解码 + //// 创建解码器 + //decoder := json.NewDecoder(conn) + //// 利用解码器进行解码 + //// 解码操作,从conn中读取内容,成功会将解码后的结果,赋值到message变量 + //if err := decoder.Decode(&message); err != nil { + // log.Println(err) + // continue + //} + //log.Println(message) + + // 2, GOB解码 + // 创建解码器 + decoder := gob.NewDecoder(conn) + // 利用解码器进行解码 + // 解码操作,从conn中读取内容,成功会将解码后的结果,赋值到message变量 + if err := decoder.Decode(&message); err != nil { + log.Println(err) + continue + } + log.Println(message) + + } +} + +// 短连接示例 +func TcpClientSort() { + // tcp服务端地址 + serverAddress := "127.0.0.1:5678" // IPv6 4 + // A. 建立连接 + conn, err := net.DialTimeout(tcp, serverAddress, time.Second) + if err != nil { + log.Println(err) + return + } + // 保证关闭 + defer conn.Close() + log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr()) + + wg := sync.WaitGroup{} + + // 接收端 + wg.Add(1) + go CliReadSort(conn, &wg) + + wg.Wait() +} + +func CliReadSort(conn net.Conn, wg *sync.WaitGroup) { + defer wg.Done() + // 传递的消息类型 + type Message struct { + ID uint `json:"id,omitempty"` + Code string `json:"code,omitempty"` + Content string `json:"content,omitempty"` + } + message := Message{} + for { + // 从客户端接收数据 + // 接收到数据后,先解码 + // GOB解码 + // 创建解码器 + decoder := gob.NewDecoder(conn) + // 利用解码器进行解码 + // 解码操作,从conn中读取内容,成功会将解码后的结果,赋值到message变量 + err := decoder.Decode(&message) + // 错误 io.EOF 时,表示连接被给关闭 + if err != nil && errors.Is(err, io.EOF) { + log.Println(err) + log.Println("link was closed") + break + } + log.Println(message) + } +} + +// 响应服务端的心跳检测 +func TcpClientHB() { + // tcp服务端地址 + serverAddress := "127.0.0.1:5678" // IPv6 4 + // A. 建立连接 + conn, err := net.DialTimeout(tcp, serverAddress, time.Second) + if err != nil { + log.Println(err) + return + } + // 保证关闭 + defer conn.Close() + log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr()) + + wg := sync.WaitGroup{} + // 接收端 + wg.Add(1) + go CliReadPing(conn, &wg) + + wg.Wait() +} + +// 传递的消息类型 +type MessageHB struct { + ID uint `json:"id,omitempty"` + Code string `json:"code,omitempty"` + Content string `json:"content,omitempty"` + Time time.Time `json:"time,omitempty"` +} + +func CliReadPing(conn net.Conn, wg *sync.WaitGroup) { + defer wg.Done() + // 传递的消息类型 + message := MessageHB{} + for { + // GOB解码 + decoder := gob.NewDecoder(conn) + // 解码操作,从conn中读取内容,成功会将解码后的结果,赋值到message变量 + err := decoder.Decode(&message) + // 错误 io.EOF 时,表示连接被给关闭 + if err != nil && errors.Is(err, io.EOF) { + log.Println(err) + break + } + // 判断是为为 ping 类型消息 + if message.Code == "PING-SERVER" { + log.Println("receive ping from", conn.RemoteAddr()) + CliWritePong(conn, message) + } + } +} + +func CliWritePong(conn net.Conn, pingMsg MessageHB) { + pongMsg := MessageHB{ + ID: uint(rand.Int()), + Code: "PONG-CLIENT", + Content: fmt.Sprintf("pingID:%d", pingMsg.ID), + Time: time.Now(), + } + + // GOB, 二进制编码 + // 创建编码器 + encoder := gob.NewEncoder(conn) + // 利用编码器进行编码 + // encode 成功后,会写入到conn,已经完成了conn.Write() + if err := encoder.Encode(pongMsg); err != nil { + log.Println(err) + return + } + log.Println("pong was send to", conn.RemoteAddr()) + return +} + +// 连接池使用 +func TcpClientPool() { + // tcp服务端地址 + serverAddress := "127.0.0.1:5678" // IPv6 4 + // A,建立连接池 + pool, err := NewTcpPool(serverAddress, PoolConfig{ + Factory: &TcpConnFactory{}, + InitConnNum: 4, + }) + if err != nil { + log.Fatalln(err) + } + log.Println(pool, len(pool.idleList)) + + wg := sync.WaitGroup{} + clientNum := 50 + wg.Add(clientNum) + // B, 复用连接池中的连接 + for i := 0; i < clientNum; i++ { + // goroutine 模拟独立的客户端 + go func(wg *sync.WaitGroup) { + defer wg.Done() + // 获取连接 + conn, err := pool.Get() + if err != nil { + log.Println(err) + return + } + //log.Println(conn) + // 回收连接 + pool.Put(conn) + }(&wg) + } + wg.Wait() +} diff --git a/tcp_pool.go b/tcp_pool.go new file mode 100644 index 0000000..88fb2ba --- /dev/null +++ b/tcp_pool.go @@ -0,0 +1,282 @@ +package netProgram + +import ( + "errors" + "log" + "net" + "sync" + "time" +) + +// 连接池接口 +type Pool interface { + // 获取连接 + Get() (net.Conn, error) + // 放回连接, 不是关闭 + Put(net.Conn) error + // 释放池, 关闭全部连接 + Release() error + // 有效连接的长度 + Len() int +} + +// 连接池配置结构 +type PoolConfig struct { + //初始连接数, 池初始化时的连接数 + InitConnNum int + //最大连接数, 池中最多支持多少连接 + MaxConnNum int + //最大空闲连接数, 池中最多有多少可用的连接 + MaxIdleNum int + //空闲连接超时时间, 多久后空闲连接会被释放 + IdleTimeout time.Duration + + // 建立连接的超时时间 + // DialTimeout time.Duration + + // 连接工厂 + Factory ConnFactory +} + +// 空闲连接类型(管理的连接) +type IdleConn struct { + // 连接本身 + conn net.Conn + // 放入池子的时间,用于判断是否空间超时 + putTime time.Time +} + +// 连接池结构 +type TcpPool struct { + // 配置信息 + config PoolConfig + + // 运行时信息 + // 使用的连接数量 + openingConnNum int + // 空闲连接链表 + idleList chan *IdleConn + + // 连接地址 + addr string + + // 并发安全锁 + mu sync.RWMutex +} + +// 连接工厂接口 +type ConnFactory interface { + // 生产连接 + Factory(addr string) (net.Conn, error) + // 关闭连接 + Close(net.Conn) error + // Ping + Ping(net.Conn) error +} + +// Tcp连接工厂类型 +type TcpConnFactory struct{} + +// 产生连接方法 +func (*TcpConnFactory) Factory(addr string) (net.Conn, error) { + // 校验参数的合理性 + if addr == "" { + return nil, errors.New("addr is empty") + } + + // 建立连接 + conn, err := net.DialTimeout("tcp", addr, 5*time.Second) + if err != nil { + return nil, err + } + + // return + return conn, nil +} + +// 关闭连接 +func (*TcpConnFactory) Close(conn net.Conn) error { + return conn.Close() +} + +func (*TcpConnFactory) Ping(conn net.Conn) error { + return nil +} + +const ( + defaultMaxConnNum = 10 + defaultInitConnNum = 1 +) + +// 创建TcpPool对象 +func NewTcpPool(addr string, poolConfig PoolConfig) (*TcpPool, error) { + // 1校验参数 + if addr == "" { + return nil, errors.New("addr is empty") + } + + // 校验工厂的存在 + if poolConfig.Factory == nil { + return nil, errors.New("factory is not exists") + } + + // 最大连接数 + if poolConfig.MaxConnNum == 0 { + //a,return错误 + //return nil, errors.New("max conn num is zero") + + //b,人为修改一个合理的 + poolConfig.MaxConnNum = defaultMaxConnNum + } + // 初始化连接数 + if poolConfig.InitConnNum == 0 { + poolConfig.InitConnNum = defaultInitConnNum + } else if poolConfig.InitConnNum > poolConfig.MaxConnNum { + poolConfig.InitConnNum = poolConfig.MaxConnNum + } + // 合理化最大空闲连接数 + if poolConfig.MaxIdleNum == 0 { + poolConfig.MaxIdleNum = poolConfig.InitConnNum + } else if poolConfig.MaxIdleNum > poolConfig.MaxConnNum { + poolConfig.MaxIdleNum = poolConfig.MaxConnNum + } + + // 2初始化TcpPool对象 + pool := TcpPool{ + config: poolConfig, + openingConnNum: 0, + idleList: make(chan *IdleConn, poolConfig.MaxIdleNum), + addr: addr, + mu: sync.RWMutex{}, + } + + // 3初始化连接 + // 根据InitConnNum的配置来创建 + for i := 0; i < poolConfig.InitConnNum; i++ { + conn, err := pool.config.Factory.Factory(addr) + if err != nil { + // 通常意味着,连接池初始化失败 + // 释放可能已经存在的连接 + pool.Release() + return nil, err + } + // 连接创建成功 + // 加入到空闲连接队列中 + pool.idleList <- &IdleConn{ + conn: conn, + putTime: time.Now(), + } + } + + // 4返回 + return &pool, nil +} + +// TcpPool 实现 Pool 接口 +func (pool *TcpPool) Get() (net.Conn, error) { + // 1锁定 + pool.mu.Lock() + defer pool.mu.Unlock() + + // 2获取空闲连接,若没有则创建连接 + for { + select { + // 获取空闲连接 + case idleConn, ok := <-pool.idleList: + // 判断channel是否被关闭 + if !ok { + return nil, errors.New("idle list closed") + } + + // 判断连接是否超时 + //pool.config.IdleTimeout, idleConn.putTime + if pool.config.IdleTimeout > 0 { // 设置了超时时间 + // putTime + timeout 是否在 now 之前 + if idleConn.putTime.Add(pool.config.IdleTimeout).Before(time.Now()) { + // 关闭连接,继续查找下一个连接 + _ = pool.config.Factory.Close(idleConn.conn) + continue + } + } + + // 判断连接是否可用 + if err := pool.config.Factory.Ping(idleConn.conn); err != nil { + // ping 失败,连接不可用 + // 关闭连接,继续查找 + _ = pool.config.Factory.Close(idleConn.conn) + continue + } + + // 找到了可用的空闲连接 + log.Println("get conn from Idle") + // 使用的连接计数 + pool.openingConnNum++ + // 返回连接 + return idleConn.conn, nil + + // 创建连接 + default: + // a判断是否还可以继续创建 + // 基于开放的连接是否已经达到了连接池最大的连接数 + if pool.openingConnNum >= pool.config.MaxConnNum { + return nil, errors.New("max opening connection") + // 另一种方案,就是阻塞 + //continue + } + + // b创建连接 + conn, err := pool.config.Factory.Factory(pool.addr) + if err != nil { + return nil, err + } + + // c正确创建了可用的连接 + log.Println("get conn from Factory") + // 使用的连接计数 + pool.openingConnNum++ + // 返回连接 + return conn, nil + } + } +} + +func (pool *TcpPool) Put(conn net.Conn) error { + // 1锁 + pool.mu.Lock() + defer pool.mu.Unlock() + + // 2做一些校验 + if conn == nil { + return errors.New("connection is not exists") + } + // 判断空闲连接列表是否存在 + if pool.idleList == nil { + // 关闭连接 + _ = pool.config.Factory.Close(conn) + return errors.New("idle list is not exists") + } + + // 3放回连接 + select { + // 放回连接 + case pool.idleList <- &IdleConn{ + conn: conn, + putTime: time.Now(), + }: + // 只要可以发送成功,任务完成 + // 更新开放的连接数量 + pool.openingConnNum-- + return nil + // 关闭连接 + default: + _ = pool.config.Factory.Close(conn) + return nil + } +} +func (*TcpPool) Release() error { + log.Println("release all connections") + return nil +} +func (*TcpPool) Len() int { + return 0 +} diff --git a/tcp_server.go b/tcp_server.go index ab0fb6f..b100bce 100644 --- a/tcp_server.go +++ b/tcp_server.go @@ -1,8 +1,16 @@ package netProgram import ( + "bytes" + "context" + "encoding/gob" + "encoding/json" + "errors" + "io" "log" + "math/rand" "net" + "sync" "time" ) @@ -44,6 +52,33 @@ func TcpServer() { } } +// 基本读写操作 +func TcpServerRW() { + // A. 基于某个地址建立监听 + // 服务端地址 + address := ":5678" // Any IP or version + listener, err := net.Listen(tcp, address) + if err != nil { + log.Fatalln(err) + } + // 关闭监听 + defer listener.Close() + log.Printf("%s server is listening on %s\n", tcp, listener.Addr()) + + // B. 接受连接请求 + // 循环接受 + for { + // 阻塞接受 + conn, err := listener.Accept() + if err != nil { + log.Println(err) + } + + // 处理连接,读写 + go HandleConn(conn) + } +} + // 服务端 func TcpBacklogServer() { // A. 基于某个地址建立监听 @@ -74,3 +109,490 @@ func TcpBacklogServer() { }(conn) } } + +// 处理每个连接 +func HandleConn(conn net.Conn) { + // 日志连接的远程地址(client addr) + log.Printf("accept from %s\n", conn.RemoteAddr()) + + // A.保证连接关闭 + defer conn.Close() + + // B.向客户端发送数据,SerWrite + wn, err := conn.Write([]byte("send some data from server" + "\n")) + if err != nil { + log.Println(err) + } + log.Printf("server write len is %d\n", wn) + + // C.从客户端接收数据,SerRead + buf := make([]byte, 1024) + rn, err := conn.Read(buf) + if err != nil { + log.Println(err) + } + log.Println("received from client data is:", string(buf[:rn])) +} + +// 基本读写操作 +func TcpW() { + // A. 基于某个地址建立监听 + // 服务端地址 + address := ":5678" // Any IP or version + listener, err := net.Listen(tcp, address) + if err != nil { + log.Fatalln(err) + } + // 关闭监听 + defer listener.Close() + log.Printf("%s server is listening on %s\n", tcp, listener.Addr()) + + // B. 接受连接请求 + // 循环接受 + for { + // 阻塞接受 + conn, err := listener.Accept() + if err != nil { + log.Println(err) + } + + // 处理连接,读写 + go HandleConnW(conn) + } +} + +// 处理每个连接 +func HandleConnW(conn net.Conn) { + // 日志连接的远程地址(client addr) + log.Printf("accept from %s\n", conn.RemoteAddr()) + + // A.保证连接关闭 + defer conn.Close() + + // 1. 严谨的判断是否写入成功 + //data := []byte("send some data from server" + "\n") + //wn, err := conn.SerWrite(data) + //if err != nil { + // log.Println(err) + //} + //// 若要严谨的判断是否写入成功,需要: + //if err == nil && wn == len(data) { + // log.Println("write success") + //} + //log.Printf("server write len is %d\n", wn) + + // 2. 写操作会被阻塞 + //for i := 0; i < 300000; i++ { + // data := []byte("send some data from server" + "\n") + // wn, err := conn.SerWrite(data) + // if err != nil { + // log.Fatalln(err) + // } + // log.Printf("%d, server write len is %d\n", i, wn) + //} + + // 不执行任何写操作 + //time.Sleep(5 * time.Second) + + // 写入一次 + data := []byte("send some data from server" + "\n") + wn, err := conn.Write(data) + if err != nil { + log.Println(err) + } + // 若要严谨的判断是否写入成功,需要: + if err == nil && wn == len(data) { + log.Println("write success") + } + log.Printf("server write len is %d\n", wn) +} + +// 并发的读和写操作,全双工 +func TcpServerRWConcurrency() { + // A. 基于某个地址建立监听 + // 服务端地址 + address := ":5678" // Any IP or version + listener, err := net.Listen(tcp, address) + if err != nil { + log.Fatalln(err) + } + // 关闭监听 + defer listener.Close() + log.Printf("%s server is listening on %s\n", tcp, listener.Addr()) + + // B. 接受连接请求 + // 循环接受 + for { + // 阻塞接受 + conn, err := listener.Accept() + if err != nil { + log.Println(err) + } + + // 处理连接,读写 + go HandleConnConcurrency(conn) + } +} + +// 处理每个连接 +func HandleConnConcurrency(conn net.Conn) { + // 日志连接的远程地址(client addr) + log.Printf("accept from %s\n", conn.RemoteAddr()) + // A.保证连接关闭 + defer conn.Close() + + wg := sync.WaitGroup{} + // 并发的写 + wg.Add(1) + go SerWrite(conn, &wg, "abcd") + wg.Add(1) + go SerWrite(conn, &wg, "efgh") + wg.Add(1) + go SerWrite(conn, &wg, "ijkl") + + // 并发的读 + wg.Add(1) + go SerRead(conn, &wg) + + wg.Wait() +} + +func SerWrite(conn net.Conn, wg *sync.WaitGroup, data string) { + defer wg.Done() + // B.向客户端发送数据,SerWrite + for { + wn, err := conn.Write([]byte(data + "\n")) + if err != nil { + log.Println(err) + } + log.Printf("server write len is %d\n", wn) + time.Sleep(1 * time.Second) + } +} + +func SerRead(conn net.Conn, wg *sync.WaitGroup) { + defer wg.Done() + for { + // C.从客户端接收数据,SerRead + buf := make([]byte, 1024) + rn, err := conn.Read(buf) + if err != nil { + log.Println(err) + } + log.Println("received from client data is:", string(buf[:rn])) + } +} + +// 格式化传输 +func TcpServerFormat() { + // A. 基于某个地址建立监听 + // 服务端地址 + address := ":5678" // Any IP or version + listener, err := net.Listen(tcp, address) + if err != nil { + log.Fatalln(err) + } + // 关闭监听 + defer listener.Close() + log.Printf("%s server is listening on %s\n", tcp, listener.Addr()) + + // B. 接受连接请求 + // 循环接受 + for { + // 阻塞接受 + conn, err := listener.Accept() + if err != nil { + log.Println(err) + } + + // 处理连接,读写 + go HandleConnFormat(conn) + } +} + +func HandleConnFormat(conn net.Conn) { + // 日志连接的远程地址(client addr) + log.Printf("accept from %s\n", conn.RemoteAddr()) + // A.保证连接关闭 + defer conn.Close() + + wg := sync.WaitGroup{} + wg.Add(1) + // 发送端, + go SerWriteFormat(conn, &wg) + wg.Wait() +} + +func SerWriteFormat(conn net.Conn, wg *sync.WaitGroup) { + defer wg.Done() + for { + // 向客户端发送数据 + // 数据编码后发送 + + // 创建需要传递的数据 + // 自定义的消息结构类型 + type Message struct { + ID uint `json:"id,omitempty"` + Code string `json:"code,omitempty"` + Content string `json:"content,omitempty"` + } + message := Message{ + ID: uint(rand.Int()), + Code: "SERVER-STANDARD", + Content: "message from server", + } + // 编码后数据的展示 + var buf bytes.Buffer + encoderData := json.NewEncoder(&buf) + //encoderData := gob.NewEncoder(&buf) + if err := encoderData.Encode(message); err != nil { + log.Println(err) + continue + } + log.Println(buf.String()) + + // 1, JSON, 文本编码 + //// 创建编码器 + //encoder := json.NewEncoder(conn) + //// 利用编码器进行编码 + //// encode 成功后,会写入到conn,已经完成了conn.Write() + //if err := encoder.Encode(message); err != nil { + // log.Println(err) + // continue + //} + //log.Println("message was send") + + // 2, GOB, 二进制编码 + // 创建编码器 + encoder := gob.NewEncoder(conn) + // 利用编码器进行编码 + // encode 成功后,会写入到conn,已经完成了conn.Write() + if err := encoder.Encode(message); err != nil { + log.Println(err) + continue + } + log.Println("message was send") + + time.Sleep(1 * time.Second) + } +} + +// 短连接编程示例 +func TcpServerSort() { + // A. 基于某个地址建立监听 + // 服务端地址 + address := ":5678" // Any IP or version + listener, err := net.Listen(tcp, address) + if err != nil { + log.Fatalln(err) + } + // 关闭监听 + defer listener.Close() + log.Printf("%s server is listening on %s\n", tcp, listener.Addr()) + + // B. 接受连接请求 + // 循环接受 + for { + // 阻塞接受 + conn, err := listener.Accept() + if err != nil { + log.Println(err) + } + + // 处理连接,读写 + go HandleConnSort(conn) + } +} + +func HandleConnSort(conn net.Conn) { + // 日志连接的远程地址(client addr) + log.Printf("accept from %s\n", conn.RemoteAddr()) + // A.保证连接关闭 + defer conn.Close() + + wg := sync.WaitGroup{} + wg.Add(1) + // 发送端, + go SerWriteSort(conn, &wg) + wg.Wait() +} + +func SerWriteSort(conn net.Conn, wg *sync.WaitGroup) { + defer wg.Done() + + // 创建需要传递的数据 + // 自定义的消息结构类型 + type Message struct { + ID uint `json:"id,omitempty"` + Code string `json:"code,omitempty"` + Content string `json:"content,omitempty"` + } + message := Message{ + ID: uint(rand.Int()), + Code: "SERVER-STANDARD", + Content: "message from server", + } + + // GOB, 二进制编码 + // 创建编码器 + encoder := gob.NewEncoder(conn) + // 利用编码器进行编码 + // encode 成功后,会写入到conn,已经完成了conn.Write() + if err := encoder.Encode(message); err != nil { + log.Println(err) + return + } + log.Println("message was send") + log.Println("link will be close") + return +} + +// 心跳检测 +func TcpServerHB() { + // A. 基于某个地址建立监听 + // 服务端地址 + address := ":5678" // Any IP or version + listener, err := net.Listen(tcp, address) + if err != nil { + log.Fatalln(err) + } + // 关闭监听 + defer listener.Close() + log.Printf("%s server is listening on %s\n", tcp, listener.Addr()) + + // B. 接受连接请求 + // 循环接受 + for { + // 阻塞接受 + conn, err := listener.Accept() + if err != nil { + log.Println(err) + } + + // 处理连接,读写 + go HandleConnHB(conn) + } +} + +func HandleConnHB(conn net.Conn) { + // 日志连接的远程地址(client addr) + log.Printf("accept from %s\n", conn.RemoteAddr()) + // A.保证连接关闭 + defer func() { + conn.Close() + log.Println("connection be closed") + }() + + wg := sync.WaitGroup{} + + // 独立的goroutine,在连接建立后,周期发送ping + wg.Add(1) + // 发送ping + go SerPing(conn, &wg) + wg.Wait() +} + +func SerPing(conn net.Conn, wg *sync.WaitGroup) { + defer wg.Done() + + // 启动接收pong + ctx, cancel := context.WithCancel(context.Background()) + go SerReadPong(conn, ctx) + + // ping失败的次数 + const maxPingNum = 3 + pingErrCounter := 0 + + //周期性的发送 + //利用 time.Ticker + ticker := time.NewTicker(2 * time.Second) + for t := range ticker.C { + pingMsg := MessageHB{ + ID: uint(rand.Int()), + Code: "PING-SERVER", + Time: t, + } + + // GOB, 二进制编码 + encoder := gob.NewEncoder(conn) + // encode 成功后,会写入到conn,已经完成了conn.Write() + if err := encoder.Encode(pingMsg); err != nil { + log.Println(err) + // 连接有问题的情况 + // 累加错误计数器 + pingErrCounter++ + // 判断是否到达上限 + if pingErrCounter == maxPingNum { + // 心跳失败 + // 终止pong的处理 + cancel() + return + } + } + log.Printf("ping send to %s, ping id is %d\n", conn.RemoteAddr(), pingMsg.ID) + } +} +func SerReadPong(conn net.Conn, ctx context.Context) { + + for { + // 处理Ping结束 + select { + case <-ctx.Done(): + return + default: + message := MessageHB{} + // GOB解码 + decoder := gob.NewDecoder(conn) + // 解码操作,从conn中读取内容,成功会将解码后的结果,赋值到message变量 + err := decoder.Decode(&message) + // 错误 io.EOF 时,表示连接被给关闭 + if err != nil && errors.Is(err, io.EOF) { + log.Println(err) + break + } + // 判断是为为 pong 类型消息 + if message.Code == "PONG-CLIENT" { + log.Printf("receive pong from %s, %s\n", conn.RemoteAddr(), message.Content) + } + } + } +} + +// 测试连接池服务端 +func TcpServerPool() { + // A. 基于某个地址建立监听 + // 服务端地址 + address := ":5678" // Any IP or version + listener, err := net.Listen(tcp, address) + if err != nil { + log.Fatalln(err) + } + // 关闭监听 + defer listener.Close() + log.Printf("%s server is listening on %s\n", tcp, listener.Addr()) + + // B. 接受连接请求 + // 循环接受 + for { + // 阻塞接受 + conn, err := listener.Accept() + if err != nil { + log.Println(err) + } + + // 处理连接,读写 + go HandleConnPool(conn) + } +} + +func HandleConnPool(conn net.Conn) { + // 日志连接的远程地址(client addr) + log.Printf("accept from %s\n", conn.RemoteAddr()) + // A.保证连接关闭 + defer func() { + conn.Close() + log.Println("connection be closed") + }() + + select {} +} diff --git a/tcp_test.go b/tcp_test.go index 9a185ea..165a930 100644 --- a/tcp_test.go +++ b/tcp_test.go @@ -21,3 +21,59 @@ func TestTcpBacklogServer(t *testing.T) { func TestTcpBacklogClient(t *testing.T) { TcpBacklogClient() } + +func TestTcpServerRW(t *testing.T) { + TcpServerRW() +} + +func TestTcpClientRW(t *testing.T) { + TcpClientRW() +} + +// Wirte方法测试 +func TestTcpW(t *testing.T) { + TcpW() +} +func TestTcpWClient(t *testing.T) { + TcpWClient() +} + +// 并发读写的测试 +func TestTcpServerRWConcurrency(t *testing.T) { + TcpServerRWConcurrency() +} +func TestTcpClientRWConcurrency(t *testing.T) { + TcpClientRWConcurrency() +} + +// 格式化消息的测试 +func TestTcpServerFormat(t *testing.T) { + TcpServerFormat() +} +func TestTcpClientFormat(t *testing.T) { + TcpClientFormat() +} + +// 短连接测试 +func TestTcpServerSort(t *testing.T) { + TcpServerSort() +} +func TestTcpClientSort(t *testing.T) { + TcpClientSort() +} + +// 长连接下的HeartBeat +func TestTcpServerHB(t *testing.T) { + TcpServerHB() +} +func TestTcpClientHB(t *testing.T) { + TcpClientHB() +} + +// 连接池测试 +func TestTcpServerPool(t *testing.T) { + TcpServerPool() +} +func TestTcpClientPool(t *testing.T) { + TcpClientPool() +}