main
han-joker 1 year ago
parent 4cc459c6de
commit 314b448830

@ -1,7 +1,12 @@
package netProgram
import (
"encoding/gob"
"errors"
"fmt"
"io"
"log"
"math/rand"
"net"
"sync"
"time"
@ -99,3 +104,350 @@ func TcpBacklogClient() {
wg.Wait()
}
func TcpClientRW() {
// tcp服务端地址
serverAddress := "127.0.0.1:5678" // IPv6 4
// A. 建立连接
conn, err := net.DialTimeout(tcp, serverAddress, time.Second)
//conn, err := net.Dial(tcp, serverAddress)
if err != nil {
log.Println(err)
return
}
// 保证关闭
defer conn.Close()
log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr())
// B.从服务端接收数据SerRead
buf := make([]byte, 1024)
rn, err := conn.Read(buf)
if err != nil {
log.Println(err)
}
log.Println("received from server data is:", string(buf[:rn]))
// C.向服务器端发送数据SerWrite
wn, err := conn.Write([]byte("send some data from client" + "\n"))
if err != nil {
log.Println(err)
}
log.Printf("client write len is %d\n", wn)
}
func TcpWClient() {
// tcp服务端地址
serverAddress := "127.0.0.1:5678" // IPv6 4
// A. 建立连接
conn, err := net.DialTimeout(tcp, serverAddress, time.Second)
//conn, err := net.Dial(tcp, serverAddress)
if err != nil {
log.Println(err)
return
}
// 保证关闭
defer conn.Close()
log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr())
//
//select {}
// 1. 阻塞Read
//buf := make([]byte, 1024)
//rn, err := conn.SerRead(buf)
//if err != nil {
// log.Println(err)
//}
//log.Println("received from server data is:", string(buf[:rn]))
// 2. 循环读
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
for {
buf := make([]byte, 10)
rn, err := conn.Read(buf)
if err != nil {
log.Println(err)
break
}
log.Println("received from server data is:", string(buf[:rn]))
}
}
func TcpClientRWConcurrency() {
// tcp服务端地址
serverAddress := "127.0.0.1:5678" // IPv6 4
// A. 建立连接
conn, err := net.DialTimeout(tcp, serverAddress, time.Second)
//conn, err := net.Dial(tcp, serverAddress)
if err != nil {
log.Println(err)
return
}
// 保证关闭
defer conn.Close()
log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr())
wg := sync.WaitGroup{}
// 并发的写
wg.Add(1)
go CliWrite(conn, &wg)
// 并发的读
wg.Add(1)
go CliRead(conn, &wg)
wg.Wait()
}
func CliWrite(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for {
// B.向客户端发送数据SerWrite
wn, err := conn.Write([]byte("send some data from client" + "\n"))
if err != nil {
log.Println(err)
}
log.Printf("client write len is %d\n", wn)
time.Sleep(3 * time.Second)
}
}
func CliRead(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for {
// C.从客户端接收数据SerRead
buf := make([]byte, 1024)
rn, err := conn.Read(buf)
if err != nil {
log.Println(err)
}
log.Println("received from server data is:", string(buf[:rn]))
}
}
func TcpClientFormat() {
// tcp服务端地址
serverAddress := "127.0.0.1:5678" // IPv6 4
// A. 建立连接
conn, err := net.DialTimeout(tcp, serverAddress, time.Second)
//conn, err := net.Dial(tcp, serverAddress)
if err != nil {
log.Println(err)
return
}
// 保证关闭
defer conn.Close()
log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr())
wg := sync.WaitGroup{}
// 接收端
wg.Add(1)
go CliReadFormat(conn, &wg)
wg.Wait()
}
func CliReadFormat(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for {
// 从客户端接收数据
// 接收到数据后,先解码
// 传递的消息类型
type Message struct {
ID uint `json:"id,omitempty"`
Code string `json:"code,omitempty"`
Content string `json:"content,omitempty"`
}
message := Message{}
// 1, JSON解码
//// 创建解码器
//decoder := json.NewDecoder(conn)
//// 利用解码器进行解码
//// 解码操作从conn中读取内容成功会将解码后的结果赋值到message变量
//if err := decoder.Decode(&message); err != nil {
// log.Println(err)
// continue
//}
//log.Println(message)
// 2, GOB解码
// 创建解码器
decoder := gob.NewDecoder(conn)
// 利用解码器进行解码
// 解码操作从conn中读取内容成功会将解码后的结果赋值到message变量
if err := decoder.Decode(&message); err != nil {
log.Println(err)
continue
}
log.Println(message)
}
}
// 短连接示例
func TcpClientSort() {
// tcp服务端地址
serverAddress := "127.0.0.1:5678" // IPv6 4
// A. 建立连接
conn, err := net.DialTimeout(tcp, serverAddress, time.Second)
if err != nil {
log.Println(err)
return
}
// 保证关闭
defer conn.Close()
log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr())
wg := sync.WaitGroup{}
// 接收端
wg.Add(1)
go CliReadSort(conn, &wg)
wg.Wait()
}
func CliReadSort(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
// 传递的消息类型
type Message struct {
ID uint `json:"id,omitempty"`
Code string `json:"code,omitempty"`
Content string `json:"content,omitempty"`
}
message := Message{}
for {
// 从客户端接收数据
// 接收到数据后,先解码
// GOB解码
// 创建解码器
decoder := gob.NewDecoder(conn)
// 利用解码器进行解码
// 解码操作从conn中读取内容成功会将解码后的结果赋值到message变量
err := decoder.Decode(&message)
// 错误 io.EOF 时,表示连接被给关闭
if err != nil && errors.Is(err, io.EOF) {
log.Println(err)
log.Println("link was closed")
break
}
log.Println(message)
}
}
// 响应服务端的心跳检测
func TcpClientHB() {
// tcp服务端地址
serverAddress := "127.0.0.1:5678" // IPv6 4
// A. 建立连接
conn, err := net.DialTimeout(tcp, serverAddress, time.Second)
if err != nil {
log.Println(err)
return
}
// 保证关闭
defer conn.Close()
log.Printf("connection is establish, client addr is %s\n", conn.LocalAddr())
wg := sync.WaitGroup{}
// 接收端
wg.Add(1)
go CliReadPing(conn, &wg)
wg.Wait()
}
// 传递的消息类型
type MessageHB struct {
ID uint `json:"id,omitempty"`
Code string `json:"code,omitempty"`
Content string `json:"content,omitempty"`
Time time.Time `json:"time,omitempty"`
}
func CliReadPing(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
// 传递的消息类型
message := MessageHB{}
for {
// GOB解码
decoder := gob.NewDecoder(conn)
// 解码操作从conn中读取内容成功会将解码后的结果赋值到message变量
err := decoder.Decode(&message)
// 错误 io.EOF 时,表示连接被给关闭
if err != nil && errors.Is(err, io.EOF) {
log.Println(err)
break
}
// 判断是为为 ping 类型消息
if message.Code == "PING-SERVER" {
log.Println("receive ping from", conn.RemoteAddr())
CliWritePong(conn, message)
}
}
}
func CliWritePong(conn net.Conn, pingMsg MessageHB) {
pongMsg := MessageHB{
ID: uint(rand.Int()),
Code: "PONG-CLIENT",
Content: fmt.Sprintf("pingID:%d", pingMsg.ID),
Time: time.Now(),
}
// GOB, 二进制编码
// 创建编码器
encoder := gob.NewEncoder(conn)
// 利用编码器进行编码
// encode 成功后会写入到conn已经完成了conn.Write()
if err := encoder.Encode(pongMsg); err != nil {
log.Println(err)
return
}
log.Println("pong was send to", conn.RemoteAddr())
return
}
// 连接池使用
func TcpClientPool() {
// tcp服务端地址
serverAddress := "127.0.0.1:5678" // IPv6 4
// A建立连接池
pool, err := NewTcpPool(serverAddress, PoolConfig{
Factory: &TcpConnFactory{},
InitConnNum: 4,
})
if err != nil {
log.Fatalln(err)
}
log.Println(pool, len(pool.idleList))
wg := sync.WaitGroup{}
clientNum := 50
wg.Add(clientNum)
// B, 复用连接池中的连接
for i := 0; i < clientNum; i++ {
// goroutine 模拟独立的客户端
go func(wg *sync.WaitGroup) {
defer wg.Done()
// 获取连接
conn, err := pool.Get()
if err != nil {
log.Println(err)
return
}
//log.Println(conn)
// 回收连接
pool.Put(conn)
}(&wg)
}
wg.Wait()
}

@ -0,0 +1,282 @@
package netProgram
import (
"errors"
"log"
"net"
"sync"
"time"
)
// 连接池接口
type Pool interface {
// 获取连接
Get() (net.Conn, error)
// 放回连接, 不是关闭
Put(net.Conn) error
// 释放池, 关闭全部连接
Release() error
// 有效连接的长度
Len() int
}
// 连接池配置结构
type PoolConfig struct {
//初始连接数, 池初始化时的连接数
InitConnNum int
//最大连接数, 池中最多支持多少连接
MaxConnNum int
//最大空闲连接数, 池中最多有多少可用的连接
MaxIdleNum int
//空闲连接超时时间, 多久后空闲连接会被释放
IdleTimeout time.Duration
// 建立连接的超时时间
// DialTimeout time.Duration
// 连接工厂
Factory ConnFactory
}
// 空闲连接类型(管理的连接)
type IdleConn struct {
// 连接本身
conn net.Conn
// 放入池子的时间,用于判断是否空间超时
putTime time.Time
}
// 连接池结构
type TcpPool struct {
// 配置信息
config PoolConfig
// 运行时信息
// 使用的连接数量
openingConnNum int
// 空闲连接链表
idleList chan *IdleConn
// 连接地址
addr string
// 并发安全锁
mu sync.RWMutex
}
// 连接工厂接口
type ConnFactory interface {
// 生产连接
Factory(addr string) (net.Conn, error)
// 关闭连接
Close(net.Conn) error
// Ping
Ping(net.Conn) error
}
// Tcp连接工厂类型
type TcpConnFactory struct{}
// 产生连接方法
func (*TcpConnFactory) Factory(addr string) (net.Conn, error) {
// 校验参数的合理性
if addr == "" {
return nil, errors.New("addr is empty")
}
// 建立连接
conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
if err != nil {
return nil, err
}
// return
return conn, nil
}
// 关闭连接
func (*TcpConnFactory) Close(conn net.Conn) error {
return conn.Close()
}
func (*TcpConnFactory) Ping(conn net.Conn) error {
return nil
}
const (
defaultMaxConnNum = 10
defaultInitConnNum = 1
)
// 创建TcpPool对象
func NewTcpPool(addr string, poolConfig PoolConfig) (*TcpPool, error) {
// 1校验参数
if addr == "" {
return nil, errors.New("addr is empty")
}
// 校验工厂的存在
if poolConfig.Factory == nil {
return nil, errors.New("factory is not exists")
}
// 最大连接数
if poolConfig.MaxConnNum == 0 {
//a,return错误
//return nil, errors.New("max conn num is zero")
//b,人为修改一个合理的
poolConfig.MaxConnNum = defaultMaxConnNum
}
// 初始化连接数
if poolConfig.InitConnNum == 0 {
poolConfig.InitConnNum = defaultInitConnNum
} else if poolConfig.InitConnNum > poolConfig.MaxConnNum {
poolConfig.InitConnNum = poolConfig.MaxConnNum
}
// 合理化最大空闲连接数
if poolConfig.MaxIdleNum == 0 {
poolConfig.MaxIdleNum = poolConfig.InitConnNum
} else if poolConfig.MaxIdleNum > poolConfig.MaxConnNum {
poolConfig.MaxIdleNum = poolConfig.MaxConnNum
}
// 2初始化TcpPool对象
pool := TcpPool{
config: poolConfig,
openingConnNum: 0,
idleList: make(chan *IdleConn, poolConfig.MaxIdleNum),
addr: addr,
mu: sync.RWMutex{},
}
// 3初始化连接
// 根据InitConnNum的配置来创建
for i := 0; i < poolConfig.InitConnNum; i++ {
conn, err := pool.config.Factory.Factory(addr)
if err != nil {
// 通常意味着,连接池初始化失败
// 释放可能已经存在的连接
pool.Release()
return nil, err
}
// 连接创建成功
// 加入到空闲连接队列中
pool.idleList <- &IdleConn{
conn: conn,
putTime: time.Now(),
}
}
// 4返回
return &pool, nil
}
// TcpPool 实现 Pool 接口
func (pool *TcpPool) Get() (net.Conn, error) {
// 1锁定
pool.mu.Lock()
defer pool.mu.Unlock()
// 2获取空闲连接若没有则创建连接
for {
select {
// 获取空闲连接
case idleConn, ok := <-pool.idleList:
// 判断channel是否被关闭
if !ok {
return nil, errors.New("idle list closed")
}
// 判断连接是否超时
//pool.config.IdleTimeout, idleConn.putTime
if pool.config.IdleTimeout > 0 { // 设置了超时时间
// putTime + timeout 是否在 now 之前
if idleConn.putTime.Add(pool.config.IdleTimeout).Before(time.Now()) {
// 关闭连接,继续查找下一个连接
_ = pool.config.Factory.Close(idleConn.conn)
continue
}
}
// 判断连接是否可用
if err := pool.config.Factory.Ping(idleConn.conn); err != nil {
// ping 失败,连接不可用
// 关闭连接,继续查找
_ = pool.config.Factory.Close(idleConn.conn)
continue
}
// 找到了可用的空闲连接
log.Println("get conn from Idle")
// 使用的连接计数
pool.openingConnNum++
// 返回连接
return idleConn.conn, nil
// 创建连接
default:
// a判断是否还可以继续创建
// 基于开放的连接是否已经达到了连接池最大的连接数
if pool.openingConnNum >= pool.config.MaxConnNum {
return nil, errors.New("max opening connection")
// 另一种方案,就是阻塞
//continue
}
// b创建连接
conn, err := pool.config.Factory.Factory(pool.addr)
if err != nil {
return nil, err
}
// c正确创建了可用的连接
log.Println("get conn from Factory")
// 使用的连接计数
pool.openingConnNum++
// 返回连接
return conn, nil
}
}
}
func (pool *TcpPool) Put(conn net.Conn) error {
// 1锁
pool.mu.Lock()
defer pool.mu.Unlock()
// 2做一些校验
if conn == nil {
return errors.New("connection is not exists")
}
// 判断空闲连接列表是否存在
if pool.idleList == nil {
// 关闭连接
_ = pool.config.Factory.Close(conn)
return errors.New("idle list is not exists")
}
// 3放回连接
select {
// 放回连接
case pool.idleList <- &IdleConn{
conn: conn,
putTime: time.Now(),
}:
// 只要可以发送成功,任务完成
// 更新开放的连接数量
pool.openingConnNum--
return nil
// 关闭连接
default:
_ = pool.config.Factory.Close(conn)
return nil
}
}
func (*TcpPool) Release() error {
log.Println("release all connections")
return nil
}
func (*TcpPool) Len() int {
return 0
}

@ -1,8 +1,16 @@
package netProgram
import (
"bytes"
"context"
"encoding/gob"
"encoding/json"
"errors"
"io"
"log"
"math/rand"
"net"
"sync"
"time"
)
@ -44,6 +52,33 @@ func TcpServer() {
}
}
// 基本读写操作
func TcpServerRW() {
// A. 基于某个地址建立监听
// 服务端地址
address := ":5678" // Any IP or version
listener, err := net.Listen(tcp, address)
if err != nil {
log.Fatalln(err)
}
// 关闭监听
defer listener.Close()
log.Printf("%s server is listening on %s\n", tcp, listener.Addr())
// B. 接受连接请求
// 循环接受
for {
// 阻塞接受
conn, err := listener.Accept()
if err != nil {
log.Println(err)
}
// 处理连接,读写
go HandleConn(conn)
}
}
// 服务端
func TcpBacklogServer() {
// A. 基于某个地址建立监听
@ -74,3 +109,490 @@ func TcpBacklogServer() {
}(conn)
}
}
// 处理每个连接
func HandleConn(conn net.Conn) {
// 日志连接的远程地址client addr
log.Printf("accept from %s\n", conn.RemoteAddr())
// A.保证连接关闭
defer conn.Close()
// B.向客户端发送数据SerWrite
wn, err := conn.Write([]byte("send some data from server" + "\n"))
if err != nil {
log.Println(err)
}
log.Printf("server write len is %d\n", wn)
// C.从客户端接收数据SerRead
buf := make([]byte, 1024)
rn, err := conn.Read(buf)
if err != nil {
log.Println(err)
}
log.Println("received from client data is:", string(buf[:rn]))
}
// 基本读写操作
func TcpW() {
// A. 基于某个地址建立监听
// 服务端地址
address := ":5678" // Any IP or version
listener, err := net.Listen(tcp, address)
if err != nil {
log.Fatalln(err)
}
// 关闭监听
defer listener.Close()
log.Printf("%s server is listening on %s\n", tcp, listener.Addr())
// B. 接受连接请求
// 循环接受
for {
// 阻塞接受
conn, err := listener.Accept()
if err != nil {
log.Println(err)
}
// 处理连接,读写
go HandleConnW(conn)
}
}
// 处理每个连接
func HandleConnW(conn net.Conn) {
// 日志连接的远程地址client addr
log.Printf("accept from %s\n", conn.RemoteAddr())
// A.保证连接关闭
defer conn.Close()
// 1. 严谨的判断是否写入成功
//data := []byte("send some data from server" + "\n")
//wn, err := conn.SerWrite(data)
//if err != nil {
// log.Println(err)
//}
//// 若要严谨的判断是否写入成功,需要:
//if err == nil && wn == len(data) {
// log.Println("write success")
//}
//log.Printf("server write len is %d\n", wn)
// 2. 写操作会被阻塞
//for i := 0; i < 300000; i++ {
// data := []byte("send some data from server" + "\n")
// wn, err := conn.SerWrite(data)
// if err != nil {
// log.Fatalln(err)
// }
// log.Printf("%d, server write len is %d\n", i, wn)
//}
// 不执行任何写操作
//time.Sleep(5 * time.Second)
// 写入一次
data := []byte("send some data from server" + "\n")
wn, err := conn.Write(data)
if err != nil {
log.Println(err)
}
// 若要严谨的判断是否写入成功,需要:
if err == nil && wn == len(data) {
log.Println("write success")
}
log.Printf("server write len is %d\n", wn)
}
// 并发的读和写操作,全双工
func TcpServerRWConcurrency() {
// A. 基于某个地址建立监听
// 服务端地址
address := ":5678" // Any IP or version
listener, err := net.Listen(tcp, address)
if err != nil {
log.Fatalln(err)
}
// 关闭监听
defer listener.Close()
log.Printf("%s server is listening on %s\n", tcp, listener.Addr())
// B. 接受连接请求
// 循环接受
for {
// 阻塞接受
conn, err := listener.Accept()
if err != nil {
log.Println(err)
}
// 处理连接,读写
go HandleConnConcurrency(conn)
}
}
// 处理每个连接
func HandleConnConcurrency(conn net.Conn) {
// 日志连接的远程地址client addr
log.Printf("accept from %s\n", conn.RemoteAddr())
// A.保证连接关闭
defer conn.Close()
wg := sync.WaitGroup{}
// 并发的写
wg.Add(1)
go SerWrite(conn, &wg, "abcd")
wg.Add(1)
go SerWrite(conn, &wg, "efgh")
wg.Add(1)
go SerWrite(conn, &wg, "ijkl")
// 并发的读
wg.Add(1)
go SerRead(conn, &wg)
wg.Wait()
}
func SerWrite(conn net.Conn, wg *sync.WaitGroup, data string) {
defer wg.Done()
// B.向客户端发送数据SerWrite
for {
wn, err := conn.Write([]byte(data + "\n"))
if err != nil {
log.Println(err)
}
log.Printf("server write len is %d\n", wn)
time.Sleep(1 * time.Second)
}
}
func SerRead(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for {
// C.从客户端接收数据SerRead
buf := make([]byte, 1024)
rn, err := conn.Read(buf)
if err != nil {
log.Println(err)
}
log.Println("received from client data is:", string(buf[:rn]))
}
}
// 格式化传输
func TcpServerFormat() {
// A. 基于某个地址建立监听
// 服务端地址
address := ":5678" // Any IP or version
listener, err := net.Listen(tcp, address)
if err != nil {
log.Fatalln(err)
}
// 关闭监听
defer listener.Close()
log.Printf("%s server is listening on %s\n", tcp, listener.Addr())
// B. 接受连接请求
// 循环接受
for {
// 阻塞接受
conn, err := listener.Accept()
if err != nil {
log.Println(err)
}
// 处理连接,读写
go HandleConnFormat(conn)
}
}
func HandleConnFormat(conn net.Conn) {
// 日志连接的远程地址client addr
log.Printf("accept from %s\n", conn.RemoteAddr())
// A.保证连接关闭
defer conn.Close()
wg := sync.WaitGroup{}
wg.Add(1)
// 发送端,
go SerWriteFormat(conn, &wg)
wg.Wait()
}
func SerWriteFormat(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for {
// 向客户端发送数据
// 数据编码后发送
// 创建需要传递的数据
// 自定义的消息结构类型
type Message struct {
ID uint `json:"id,omitempty"`
Code string `json:"code,omitempty"`
Content string `json:"content,omitempty"`
}
message := Message{
ID: uint(rand.Int()),
Code: "SERVER-STANDARD",
Content: "message from server",
}
// 编码后数据的展示
var buf bytes.Buffer
encoderData := json.NewEncoder(&buf)
//encoderData := gob.NewEncoder(&buf)
if err := encoderData.Encode(message); err != nil {
log.Println(err)
continue
}
log.Println(buf.String())
// 1, JSON, 文本编码
//// 创建编码器
//encoder := json.NewEncoder(conn)
//// 利用编码器进行编码
//// encode 成功后会写入到conn已经完成了conn.Write()
//if err := encoder.Encode(message); err != nil {
// log.Println(err)
// continue
//}
//log.Println("message was send")
// 2, GOB, 二进制编码
// 创建编码器
encoder := gob.NewEncoder(conn)
// 利用编码器进行编码
// encode 成功后会写入到conn已经完成了conn.Write()
if err := encoder.Encode(message); err != nil {
log.Println(err)
continue
}
log.Println("message was send")
time.Sleep(1 * time.Second)
}
}
// 短连接编程示例
func TcpServerSort() {
// A. 基于某个地址建立监听
// 服务端地址
address := ":5678" // Any IP or version
listener, err := net.Listen(tcp, address)
if err != nil {
log.Fatalln(err)
}
// 关闭监听
defer listener.Close()
log.Printf("%s server is listening on %s\n", tcp, listener.Addr())
// B. 接受连接请求
// 循环接受
for {
// 阻塞接受
conn, err := listener.Accept()
if err != nil {
log.Println(err)
}
// 处理连接,读写
go HandleConnSort(conn)
}
}
func HandleConnSort(conn net.Conn) {
// 日志连接的远程地址client addr
log.Printf("accept from %s\n", conn.RemoteAddr())
// A.保证连接关闭
defer conn.Close()
wg := sync.WaitGroup{}
wg.Add(1)
// 发送端,
go SerWriteSort(conn, &wg)
wg.Wait()
}
func SerWriteSort(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
// 创建需要传递的数据
// 自定义的消息结构类型
type Message struct {
ID uint `json:"id,omitempty"`
Code string `json:"code,omitempty"`
Content string `json:"content,omitempty"`
}
message := Message{
ID: uint(rand.Int()),
Code: "SERVER-STANDARD",
Content: "message from server",
}
// GOB, 二进制编码
// 创建编码器
encoder := gob.NewEncoder(conn)
// 利用编码器进行编码
// encode 成功后会写入到conn已经完成了conn.Write()
if err := encoder.Encode(message); err != nil {
log.Println(err)
return
}
log.Println("message was send")
log.Println("link will be close")
return
}
// 心跳检测
func TcpServerHB() {
// A. 基于某个地址建立监听
// 服务端地址
address := ":5678" // Any IP or version
listener, err := net.Listen(tcp, address)
if err != nil {
log.Fatalln(err)
}
// 关闭监听
defer listener.Close()
log.Printf("%s server is listening on %s\n", tcp, listener.Addr())
// B. 接受连接请求
// 循环接受
for {
// 阻塞接受
conn, err := listener.Accept()
if err != nil {
log.Println(err)
}
// 处理连接,读写
go HandleConnHB(conn)
}
}
func HandleConnHB(conn net.Conn) {
// 日志连接的远程地址client addr
log.Printf("accept from %s\n", conn.RemoteAddr())
// A.保证连接关闭
defer func() {
conn.Close()
log.Println("connection be closed")
}()
wg := sync.WaitGroup{}
// 独立的goroutine在连接建立后周期发送ping
wg.Add(1)
// 发送ping
go SerPing(conn, &wg)
wg.Wait()
}
func SerPing(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
// 启动接收pong
ctx, cancel := context.WithCancel(context.Background())
go SerReadPong(conn, ctx)
// ping失败的次数
const maxPingNum = 3
pingErrCounter := 0
//周期性的发送
//利用 time.Ticker
ticker := time.NewTicker(2 * time.Second)
for t := range ticker.C {
pingMsg := MessageHB{
ID: uint(rand.Int()),
Code: "PING-SERVER",
Time: t,
}
// GOB, 二进制编码
encoder := gob.NewEncoder(conn)
// encode 成功后会写入到conn已经完成了conn.Write()
if err := encoder.Encode(pingMsg); err != nil {
log.Println(err)
// 连接有问题的情况
// 累加错误计数器
pingErrCounter++
// 判断是否到达上限
if pingErrCounter == maxPingNum {
// 心跳失败
// 终止pong的处理
cancel()
return
}
}
log.Printf("ping send to %s, ping id is %d\n", conn.RemoteAddr(), pingMsg.ID)
}
}
func SerReadPong(conn net.Conn, ctx context.Context) {
for {
// 处理Ping结束
select {
case <-ctx.Done():
return
default:
message := MessageHB{}
// GOB解码
decoder := gob.NewDecoder(conn)
// 解码操作从conn中读取内容成功会将解码后的结果赋值到message变量
err := decoder.Decode(&message)
// 错误 io.EOF 时,表示连接被给关闭
if err != nil && errors.Is(err, io.EOF) {
log.Println(err)
break
}
// 判断是为为 pong 类型消息
if message.Code == "PONG-CLIENT" {
log.Printf("receive pong from %s, %s\n", conn.RemoteAddr(), message.Content)
}
}
}
}
// 测试连接池服务端
func TcpServerPool() {
// A. 基于某个地址建立监听
// 服务端地址
address := ":5678" // Any IP or version
listener, err := net.Listen(tcp, address)
if err != nil {
log.Fatalln(err)
}
// 关闭监听
defer listener.Close()
log.Printf("%s server is listening on %s\n", tcp, listener.Addr())
// B. 接受连接请求
// 循环接受
for {
// 阻塞接受
conn, err := listener.Accept()
if err != nil {
log.Println(err)
}
// 处理连接,读写
go HandleConnPool(conn)
}
}
func HandleConnPool(conn net.Conn) {
// 日志连接的远程地址client addr
log.Printf("accept from %s\n", conn.RemoteAddr())
// A.保证连接关闭
defer func() {
conn.Close()
log.Println("connection be closed")
}()
select {}
}

@ -21,3 +21,59 @@ func TestTcpBacklogServer(t *testing.T) {
func TestTcpBacklogClient(t *testing.T) {
TcpBacklogClient()
}
func TestTcpServerRW(t *testing.T) {
TcpServerRW()
}
func TestTcpClientRW(t *testing.T) {
TcpClientRW()
}
// Wirte方法测试
func TestTcpW(t *testing.T) {
TcpW()
}
func TestTcpWClient(t *testing.T) {
TcpWClient()
}
// 并发读写的测试
func TestTcpServerRWConcurrency(t *testing.T) {
TcpServerRWConcurrency()
}
func TestTcpClientRWConcurrency(t *testing.T) {
TcpClientRWConcurrency()
}
// 格式化消息的测试
func TestTcpServerFormat(t *testing.T) {
TcpServerFormat()
}
func TestTcpClientFormat(t *testing.T) {
TcpClientFormat()
}
// 短连接测试
func TestTcpServerSort(t *testing.T) {
TcpServerSort()
}
func TestTcpClientSort(t *testing.T) {
TcpClientSort()
}
// 长连接下的HeartBeat
func TestTcpServerHB(t *testing.T) {
TcpServerHB()
}
func TestTcpClientHB(t *testing.T) {
TcpClientHB()
}
// 连接池测试
func TestTcpServerPool(t *testing.T) {
TcpServerPool()
}
func TestTcpClientPool(t *testing.T) {
TcpClientPool()
}

Loading…
Cancel
Save