You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

306 lines
6.5 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package netProgram
import (
"errors"
"log"
"net"
"sync"
"time"
)
// 连接池接口
type Pool interface {
// 获取连接
Get() (net.Conn, error)
// 放回连接, 不是关闭
Put(net.Conn) error
// 释放池, 关闭全部连接
Release() error
// 有效连接的长度
Len() int
}
// 连接池配置结构
type PoolConfig struct {
//初始连接数, 池初始化时的连接数
InitConnNum int
//最大连接数, 池中最多支持多少连接
MaxConnNum int
//最大空闲连接数, 池中最多有多少可用的连接
MaxIdleNum int
//空闲连接超时时间, 多久后空闲连接会被释放
IdleTimeout time.Duration
// 建立连接的超时时间
// DialTimeout time.Duration
// 连接工厂
Factory ConnFactory
}
// 空闲连接类型(管理的连接)
type IdleConn struct {
// 连接本身
conn net.Conn
// 放入池子的时间,用于判断是否空间超时
putTime time.Time
}
// 连接池结构
type TcpPool struct {
// 配置信息
config PoolConfig
// 运行时信息
// 使用的连接数量
openingConnNum int
// 空闲连接链表
idleList chan *IdleConn
// 连接地址
addr string
// 并发安全锁
mu sync.RWMutex
}
// 连接工厂接口
type ConnFactory interface {
// 生产连接
Factory(addr string) (net.Conn, error)
// 关闭连接
Close(net.Conn) error
// Ping
Ping(net.Conn) error
}
// Tcp连接工厂类型
type TcpConnFactory struct{}
// 产生连接方法
func (*TcpConnFactory) Factory(addr string) (net.Conn, error) {
// 校验参数的合理性
if addr == "" {
return nil, errors.New("addr is empty")
}
// 建立连接
conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
if err != nil {
return nil, err
}
// return
return conn, nil
}
// 关闭连接
func (*TcpConnFactory) Close(conn net.Conn) error {
return conn.Close()
}
func (*TcpConnFactory) Ping(conn net.Conn) error {
return nil
}
const (
defaultMaxConnNum = 10
defaultInitConnNum = 1
)
// 创建TcpPool对象
func NewTcpPool(addr string, poolConfig PoolConfig) (*TcpPool, error) {
// 1校验参数
if addr == "" {
return nil, errors.New("addr is empty")
}
// 校验工厂的存在
if poolConfig.Factory == nil {
return nil, errors.New("factory is not exists")
}
// 最大连接数
if poolConfig.MaxConnNum == 0 {
//a,return错误
//return nil, errors.New("max conn num is zero")
//b,人为修改一个合理的
poolConfig.MaxConnNum = defaultMaxConnNum
}
// 初始化连接数
if poolConfig.InitConnNum == 0 {
poolConfig.InitConnNum = defaultInitConnNum
} else if poolConfig.InitConnNum > poolConfig.MaxConnNum {
poolConfig.InitConnNum = poolConfig.MaxConnNum
}
// 合理化最大空闲连接数
if poolConfig.MaxIdleNum == 0 {
poolConfig.MaxIdleNum = poolConfig.InitConnNum
} else if poolConfig.MaxIdleNum > poolConfig.MaxConnNum {
poolConfig.MaxIdleNum = poolConfig.MaxConnNum
}
// 2初始化TcpPool对象
pool := TcpPool{
config: poolConfig,
openingConnNum: 0,
idleList: make(chan *IdleConn, poolConfig.MaxIdleNum),
addr: addr,
mu: sync.RWMutex{},
}
// 3初始化连接
// 根据InitConnNum的配置来创建
for i := 0; i < poolConfig.InitConnNum; i++ {
conn, err := pool.config.Factory.Factory(addr)
if err != nil {
// 通常意味着,连接池初始化失败
// 释放可能已经存在的连接
pool.Release()
return nil, err
}
// 连接创建成功
// 加入到空闲连接队列中
pool.idleList <- &IdleConn{
conn: conn,
putTime: time.Now(),
}
}
// 4返回
return &pool, nil
}
// TcpPool 实现 Pool 接口
func (pool *TcpPool) Get() (net.Conn, error) {
// 1锁定
pool.mu.Lock()
defer pool.mu.Unlock()
// 2获取空闲连接若没有则创建连接
for {
select {
// 获取空闲连接
case idleConn, ok := <-pool.idleList:
// 判断channel是否被关闭
if !ok {
return nil, errors.New("idle list closed")
}
// 判断连接是否超时
//pool.config.IdleTimeout, idleConn.putTime
if pool.config.IdleTimeout > 0 { // 设置了超时时间
// putTime + timeout 是否在 now 之前
if idleConn.putTime.Add(pool.config.IdleTimeout).Before(time.Now()) {
// 关闭连接,继续查找下一个连接
_ = pool.config.Factory.Close(idleConn.conn)
continue
}
}
// 判断连接是否可用
if err := pool.config.Factory.Ping(idleConn.conn); err != nil {
// ping 失败,连接不可用
// 关闭连接,继续查找
_ = pool.config.Factory.Close(idleConn.conn)
continue
}
// 找到了可用的空闲连接
log.Println("get conn from Idle")
// 使用的连接计数
pool.openingConnNum++
// 返回连接
return idleConn.conn, nil
// 创建连接
default:
// a判断是否还可以继续创建
// 基于开放的连接是否已经达到了连接池最大的连接数
if pool.openingConnNum >= pool.config.MaxConnNum {
return nil, errors.New("max opening connection")
// 另一种方案,就是阻塞
//continue
}
// b创建连接
conn, err := pool.config.Factory.Factory(pool.addr)
if err != nil {
return nil, err
}
// c正确创建了可用的连接
log.Println("get conn from Factory")
// 使用的连接计数
pool.openingConnNum++
// 返回连接
return conn, nil
}
}
}
func (pool *TcpPool) Put(conn net.Conn) error {
// 1锁
pool.mu.Lock()
defer pool.mu.Unlock()
// 2做一些校验
if conn == nil {
return errors.New("connection is not exists")
}
// 判断空闲连接列表是否存在
if pool.idleList == nil {
// 关闭连接
_ = pool.config.Factory.Close(conn)
return errors.New("idle list is not exists")
}
// 3放回连接
select {
// 放回连接
case pool.idleList <- &IdleConn{
conn: conn,
putTime: time.Now(),
}:
// 只要可以发送成功,任务完成
// 更新开放的连接数量
pool.openingConnNum--
return nil
// 关闭连接
default:
_ = pool.config.Factory.Close(conn)
return nil
}
}
// 释放连接池
func (pool *TcpPool) Release() error {
// 1并发安全锁
pool.mu.Lock()
defer pool.mu.Unlock()
// 2确定连接池是否被释放
if pool.idleList == nil {
return nil
}
// 3关闭IdleList
close(pool.idleList)
// 4释放全部空闲连接
// 继续接收已关闭channel中的元素
for idleConn := range pool.idleList {
// 关闭连接
_ = pool.config.Factory.Close(idleConn.conn)
}
return nil
}
// 获取连接池长度
// 当前的可用连接数
func (pool *TcpPool) Len() int {
return len(pool.idleList)
}