You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

306 lines
6.5 KiB

1 year ago
package netProgram
import (
"errors"
"log"
"net"
"sync"
"time"
)
// 连接池接口
type Pool interface {
// 获取连接
Get() (net.Conn, error)
// 放回连接, 不是关闭
Put(net.Conn) error
// 释放池, 关闭全部连接
Release() error
// 有效连接的长度
Len() int
}
// 连接池配置结构
type PoolConfig struct {
//初始连接数, 池初始化时的连接数
InitConnNum int
//最大连接数, 池中最多支持多少连接
MaxConnNum int
//最大空闲连接数, 池中最多有多少可用的连接
MaxIdleNum int
//空闲连接超时时间, 多久后空闲连接会被释放
IdleTimeout time.Duration
// 建立连接的超时时间
// DialTimeout time.Duration
// 连接工厂
Factory ConnFactory
}
// 空闲连接类型(管理的连接)
type IdleConn struct {
// 连接本身
conn net.Conn
// 放入池子的时间,用于判断是否空间超时
putTime time.Time
}
// 连接池结构
type TcpPool struct {
// 配置信息
config PoolConfig
// 运行时信息
// 使用的连接数量
openingConnNum int
// 空闲连接链表
idleList chan *IdleConn
// 连接地址
addr string
// 并发安全锁
mu sync.RWMutex
}
// 连接工厂接口
type ConnFactory interface {
// 生产连接
Factory(addr string) (net.Conn, error)
// 关闭连接
Close(net.Conn) error
// Ping
Ping(net.Conn) error
}
// Tcp连接工厂类型
type TcpConnFactory struct{}
// 产生连接方法
func (*TcpConnFactory) Factory(addr string) (net.Conn, error) {
// 校验参数的合理性
if addr == "" {
return nil, errors.New("addr is empty")
}
// 建立连接
conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
if err != nil {
return nil, err
}
// return
return conn, nil
}
// 关闭连接
func (*TcpConnFactory) Close(conn net.Conn) error {
return conn.Close()
}
func (*TcpConnFactory) Ping(conn net.Conn) error {
return nil
}
const (
defaultMaxConnNum = 10
defaultInitConnNum = 1
)
// 创建TcpPool对象
func NewTcpPool(addr string, poolConfig PoolConfig) (*TcpPool, error) {
// 1校验参数
if addr == "" {
return nil, errors.New("addr is empty")
}
// 校验工厂的存在
if poolConfig.Factory == nil {
return nil, errors.New("factory is not exists")
}
// 最大连接数
if poolConfig.MaxConnNum == 0 {
//a,return错误
//return nil, errors.New("max conn num is zero")
//b,人为修改一个合理的
poolConfig.MaxConnNum = defaultMaxConnNum
}
// 初始化连接数
if poolConfig.InitConnNum == 0 {
poolConfig.InitConnNum = defaultInitConnNum
} else if poolConfig.InitConnNum > poolConfig.MaxConnNum {
poolConfig.InitConnNum = poolConfig.MaxConnNum
}
// 合理化最大空闲连接数
if poolConfig.MaxIdleNum == 0 {
poolConfig.MaxIdleNum = poolConfig.InitConnNum
} else if poolConfig.MaxIdleNum > poolConfig.MaxConnNum {
poolConfig.MaxIdleNum = poolConfig.MaxConnNum
}
// 2初始化TcpPool对象
pool := TcpPool{
config: poolConfig,
openingConnNum: 0,
idleList: make(chan *IdleConn, poolConfig.MaxIdleNum),
addr: addr,
mu: sync.RWMutex{},
}
// 3初始化连接
// 根据InitConnNum的配置来创建
for i := 0; i < poolConfig.InitConnNum; i++ {
conn, err := pool.config.Factory.Factory(addr)
if err != nil {
// 通常意味着,连接池初始化失败
// 释放可能已经存在的连接
pool.Release()
return nil, err
}
// 连接创建成功
// 加入到空闲连接队列中
pool.idleList <- &IdleConn{
conn: conn,
putTime: time.Now(),
}
}
// 4返回
return &pool, nil
}
// TcpPool 实现 Pool 接口
func (pool *TcpPool) Get() (net.Conn, error) {
// 1锁定
pool.mu.Lock()
defer pool.mu.Unlock()
// 2获取空闲连接若没有则创建连接
for {
select {
// 获取空闲连接
case idleConn, ok := <-pool.idleList:
// 判断channel是否被关闭
if !ok {
return nil, errors.New("idle list closed")
}
// 判断连接是否超时
//pool.config.IdleTimeout, idleConn.putTime
if pool.config.IdleTimeout > 0 { // 设置了超时时间
// putTime + timeout 是否在 now 之前
if idleConn.putTime.Add(pool.config.IdleTimeout).Before(time.Now()) {
// 关闭连接,继续查找下一个连接
_ = pool.config.Factory.Close(idleConn.conn)
continue
}
}
// 判断连接是否可用
if err := pool.config.Factory.Ping(idleConn.conn); err != nil {
// ping 失败,连接不可用
// 关闭连接,继续查找
_ = pool.config.Factory.Close(idleConn.conn)
continue
}
// 找到了可用的空闲连接
log.Println("get conn from Idle")
// 使用的连接计数
pool.openingConnNum++
// 返回连接
return idleConn.conn, nil
// 创建连接
default:
// a判断是否还可以继续创建
// 基于开放的连接是否已经达到了连接池最大的连接数
if pool.openingConnNum >= pool.config.MaxConnNum {
return nil, errors.New("max opening connection")
// 另一种方案,就是阻塞
//continue
}
// b创建连接
conn, err := pool.config.Factory.Factory(pool.addr)
if err != nil {
return nil, err
}
// c正确创建了可用的连接
log.Println("get conn from Factory")
// 使用的连接计数
pool.openingConnNum++
// 返回连接
return conn, nil
}
}
}
func (pool *TcpPool) Put(conn net.Conn) error {
// 1锁
pool.mu.Lock()
defer pool.mu.Unlock()
// 2做一些校验
if conn == nil {
return errors.New("connection is not exists")
}
// 判断空闲连接列表是否存在
if pool.idleList == nil {
// 关闭连接
_ = pool.config.Factory.Close(conn)
return errors.New("idle list is not exists")
}
// 3放回连接
select {
// 放回连接
case pool.idleList <- &IdleConn{
conn: conn,
putTime: time.Now(),
}:
// 只要可以发送成功,任务完成
// 更新开放的连接数量
pool.openingConnNum--
return nil
// 关闭连接
default:
_ = pool.config.Factory.Close(conn)
return nil
}
}
1 year ago
// 释放连接池
func (pool *TcpPool) Release() error {
// 1并发安全锁
pool.mu.Lock()
defer pool.mu.Unlock()
// 2确定连接池是否被释放
if pool.idleList == nil {
return nil
}
// 3关闭IdleList
close(pool.idleList)
// 4释放全部空闲连接
// 继续接收已关闭channel中的元素
for idleConn := range pool.idleList {
// 关闭连接
_ = pool.config.Factory.Close(idleConn.conn)
}
1 year ago
return nil
}
1 year ago
// 获取连接池长度
// 当前的可用连接数
func (pool *TcpPool) Len() int {
return len(pool.idleList)
1 year ago
}