// Package grpcpool provides a pool of grpc clients package getcdv3 import ( "context" "errors" "sync" "time" "google.golang.org/grpc" ) var ( // ErrClosed is the error when the client pool is closed ErrClosed = errors.New("grpc pool: client pool is closed") // ErrTimeout is the error when the client pool timed out ErrTimeout = errors.New("grpc pool: client pool timed out") // ErrAlreadyClosed is the error when the client conn was already closed ErrAlreadyClosed = errors.New("grpc pool: the connection was already closed") // ErrFullPool is the error when the pool is already full ErrFullPool = errors.New("grpc pool: closing a ClientConn into a full pool") ) // Factory is a function type creating a grpc client type Factory func(schema, etcdaddr, servicename string) (*grpc.ClientConn, error) // FactoryWithContext is a function type creating a grpc client // that accepts the context parameter that could be passed from // Get or NewWithContext method. type FactoryWithContext func(context.Context) (*grpc.ClientConn, error) // Pool is the grpc client pool type Pool struct { clients chan ClientConn factory FactoryWithContext idleTimeout time.Duration maxLifeDuration time.Duration mu sync.RWMutex } // ClientConn is the wrapper for a grpc client conn type ClientConn struct { *grpc.ClientConn pool *Pool timeUsed time.Time timeInitiated time.Time unhealthy bool } // New creates a new clients pool with the given initial and maximum capacity, // and the timeout for the idle clients. Returns an error if the initial // clients could not be created func New(factory Factory, schema, etcdaddr, servicename string, init, capacity int, idleTimeout time.Duration, maxLifeDuration ...time.Duration) (*Pool, error) { return NewWithContext(context.Background(), func(ctx context.Context) (*grpc.ClientConn, error) { return factory(schema, etcdaddr, servicename) }, init, capacity, idleTimeout, maxLifeDuration...) } // NewWithContext creates a new clients pool with the given initial and maximum // capacity, and the timeout for the idle clients. The context parameter would // be passed to the factory method during initialization. Returns an error if the // initial clients could not be created. func NewWithContext(ctx context.Context, factory FactoryWithContext, init, capacity int, idleTimeout time.Duration, maxLifeDuration ...time.Duration) (*Pool, error) { if capacity <= 0 { capacity = 1 } if init < 0 { init = 0 } if init > capacity { init = capacity } p := &Pool{ clients: make(chan ClientConn, capacity), factory: factory, idleTimeout: idleTimeout, } if len(maxLifeDuration) > 0 { p.maxLifeDuration = maxLifeDuration[0] } for i := 0; i < init; i++ { c, err := factory(ctx) if err != nil { return nil, err } p.clients <- ClientConn{ ClientConn: c, pool: p, timeUsed: time.Now(), timeInitiated: time.Now(), } } // Fill the rest of the pool with empty clients for i := 0; i < capacity-init; i++ { p.clients <- ClientConn{ pool: p, } } return p, nil } func (p *Pool) getClients() chan ClientConn { p.mu.RLock() defer p.mu.RUnlock() return p.clients } // Close empties the pool calling Close on all its clients. // You can call Close while there are outstanding clients. // The pool channel is then closed, and Get will not be allowed anymore func (p *Pool) Close() { p.mu.Lock() clients := p.clients p.clients = nil p.mu.Unlock() if clients == nil { return } close(clients) for client := range clients { if client.ClientConn == nil { continue } client.ClientConn.Close() } } // IsClosed returns true if the client pool is closed. func (p *Pool) IsClosed() bool { return p == nil || p.getClients() == nil } // Get will return the next available client. If capacity // has not been reached, it will create a new one using the factory. Otherwise, // it will wait till the next client becomes available or a timeout. // A timeout of 0 is an indefinite wait func (p *Pool) Get(ctx context.Context) (*ClientConn, error) { clients := p.getClients() if clients == nil { return nil, ErrClosed } wrapper := ClientConn{ pool: p, } select { case wrapper = <-clients: // All good case <-ctx.Done(): return nil, ErrTimeout // it would better returns ctx.Err() } // If the wrapper was idle too long, close the connection and create a new // one. It's safe to assume that there isn't any newer client as the client // we fetched is the first in the channel idleTimeout := p.idleTimeout if wrapper.ClientConn != nil && idleTimeout > 0 && wrapper.timeUsed.Add(idleTimeout).Before(time.Now()) { wrapper.ClientConn.Close() wrapper.ClientConn = nil } var err error if wrapper.ClientConn == nil { wrapper.ClientConn, err = p.factory(ctx) if err != nil { // If there was an error, we want to put back a placeholder // client in the channel clients <- ClientConn{ pool: p, } } // This is a new connection, reset its initiated time wrapper.timeInitiated = time.Now() } return &wrapper, err } // Unhealthy marks the client conn as unhealthy, so that the connection // gets reset when closed func (c *ClientConn) Unhealthy() { c.unhealthy = true } // Close returns a ClientConn to the pool. It is safe to call multiple time, // but will return an error after first time func (c *ClientConn) Close() error { if c == nil { return nil } if c.ClientConn == nil { return ErrAlreadyClosed } if c.pool.IsClosed() { return ErrClosed } // If the wrapper connection has become too old, we want to recycle it. To // clarify the logic: if the sum of the initialization time and the max // duration is before Now(), it means the initialization is so old adding // the maximum duration couldn't put in the future. This sum therefore // corresponds to the cut-off point: if it's in the future we still have // time, if it's in the past it's too old maxDuration := c.pool.maxLifeDuration if maxDuration > 0 && c.timeInitiated.Add(maxDuration).Before(time.Now()) { c.Unhealthy() } // We're cloning the wrapper so we can set ClientConn to nil in the one // used by the user wrapper := ClientConn{ pool: c.pool, ClientConn: c.ClientConn, timeUsed: time.Now(), } if c.unhealthy { wrapper.ClientConn.Close() wrapper.ClientConn = nil } else { wrapper.timeInitiated = c.timeInitiated } select { case c.pool.clients <- wrapper: // All good default: return ErrFullPool } c.ClientConn = nil // Mark as closed return nil } // Capacity returns the capacity func (p *Pool) Capacity() int { if p.IsClosed() { return 0 } return cap(p.clients) } // Available returns the number of currently unused clients func (p *Pool) Available() int { if p.IsClosed() { return 0 } return len(p.clients) }