编程学习网 > 数据库 > 一次 golang fasthttp 踩坑经验
2020
03-16

一次 golang fasthttp 踩坑经验

一个简单的系统,结构如下:


我们的服务 A 接受外部的 http 请求,然后通过 golang 的 fasthttp 将请求转发给服务 B,流程非常简单。线上运行一段时间之后,发现服务 B 完全不再接收任何请求,查看服务 A 的日志,发现大量的如下错误

从错误原因看是因为连接被占满导致的。进入服务 A 的容器中(服务 A 和服务 B 都是通过 docker 启动的),通过 netstat -anlp 查看,发现有大量的 tpc 连接,处于 ESTABLISH。我们采用的是长连接的方式,此时心里非常疑惑:1. fasthttp 是能够复用连接的,为什么还会有如此多的 TCP 连接,2.为什么这些连接不能够使用了,出现上述异常,原因是什么?

从 fasthttpclient 源码出发,我们调用请求转发的时候是用的是 f.Client.DoTimeout(req, resp, f.ExecTimeout),其中 f.Client 是一个 fasthttp.HostClient,f.ExecTimeout 设置的是 5s。

追查代码,直到 client.go 中的这个方法

func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) { if req == nil { panic("BUG: req cannot be nil")
    } if resp == nil { panic("BUG: resp cannot be nil")
    }

    atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix)) // Free up resources occupied by response before sending the request, // so the GC may reclaim these resources (e.g. response body). resp.Reset() // If we detected a redirect to another schema if req.schemaUpdate {
        c.IsTLS = bytes.Equal(req.URI().Scheme(), strHTTPS)
        c.Addr = addMissingPort(string(req.Host()), c.IsTLS)
        c.addrIdx = 0 c.addrs = nil req.schemaUpdate = false req.SetConnectionClose()
    }

    cc, err := c.acquireConn() if err != nil { return false, err
    }
    conn := cc.c

    resp.parseNetConn(conn) if c.WriteTimeout > 0 { // Set Deadline every time, since golang has fixed the performance issue // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details currentTime := time.Now() if err = conn.SetWriteDeadline(currentTime.Add(c.WriteTimeout)); err != nil {
            c.closeConn(cc) return true, err
        }
    }

    resetConnection := false if c.MaxConnDuration > 0 && time.Since(cc.createdTime) > c.MaxConnDuration && !req.ConnectionClose() {
        req.SetConnectionClose()
        resetConnection = true }

    userAgentOld := req.Header.UserAgent() if len(userAgentOld) == 0 {
        req.Header.userAgent = c.getClientName()
    }
    bw := c.acquireWriter(conn)
    err = req.Write(bw) if resetConnection {
        req.Header.ResetConnectionClose()
    } if err == nil {
        err = bw.Flush()
    } if err != nil {
        c.releaseWriter(bw)
        c.closeConn(cc) return true, err
    }
    c.releaseWriter(bw) if c.ReadTimeout > 0 { // Set Deadline every time, since golang has fixed the performance issue // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details currentTime := time.Now() if err = conn.SetReadDeadline(currentTime.Add(c.ReadTimeout)); err != nil {
            c.closeConn(cc) return true, err
        }
    } if !req.Header.IsGet() && req.Header.IsHead() {
        resp.SkipBody = true } if c.DisableHeaderNamesNormalizing {
        resp.Header.DisableNormalizing()
    }

    br := c.acquireReader(conn) if err = resp.ReadLimitBody(br, c.MaxResponseBodySize); err != nil {
        c.releaseReader(br)
        c.closeConn(cc) // Don't retry in case of ErrBodyTooLarge since we will just get the same again. retry := err != ErrBodyTooLarge return retry, err
    }
    c.releaseReader(br) if resetConnection || req.ConnectionClose() || resp.ConnectionClose() {
        c.closeConn(cc)
    } else {
        c.releaseConn(cc)
    } return false, err
}

请注意 c.acquireConn()这个方法,这个方法即从连接池中获取连接,如果没有可用连接,则创建新的连接,该方法实现如下

func (c *HostClient) acquireConn() (*clientConn, error) { var cc *clientConn
    createConn := false startCleaner := false var n int c.connsLock.Lock()
    n = len(c.conns) if n == 0 {
        maxConns := c.MaxConns if maxConns <= 0 {
            maxConns = DefaultMaxConnsPerHost
        } if c.connsCount < maxConns {
            c.connsCount++
            createConn = true if !c.connsCleanerRun {
                startCleaner = true c.connsCleanerRun = true }
        }
    } else {
        n--
        cc = c.conns[n]
        c.conns[n] = nil c.conns = c.conns[:n]
    }
    c.connsLock.Unlock() if cc != nil { return cc, nil } if !createConn { return nil, ErrNoFreeConns
    } if startCleaner { go c.connsCleaner()
    }

    conn, err := c.dialHostHard() if err != nil {
        c.decConnsCount() return nil, err
    }
    cc = acquireClientConn(conn) return cc, nil }

其中 ErrNoFreeConns 即为 errors.New("no free connections available to host"),该错误就是我们服务中出现的错误。那原因很明显就是因为!createConn,即无法创建新的连接,为什么无法创建新的连接,是因为连接数已经达到了 maxConns = DefaultMaxConnsPerHost = 512(默认值)。连接数达到最大值了,但是为什么连接没有回收也没有复用,从这块看,还是没有看出来。又仔细的查了一下业务代码,发现很多服务 A 到服务 B 的请求,都是因为超时了而结束的,即达到了 f.ExecTimeout = 5s。

又从头查看源码,终于发现了玄机。

func clientDoDeadline(req *Request, resp *Response, deadline time.Time, c clientDoer) error {
    timeout := -time.Since(deadline) if timeout <= 0 { return ErrTimeout
    } var ch chan error
    chv := errorChPool.Get() if chv == nil {
        chv = make(chan error, 1)
    }
    ch = chv.(chan error) // Make req and resp copies, since on timeout they no longer // may be accessed. reqCopy := AcquireRequest()
    req.copyToSkipBody(reqCopy)
    swapRequestBody(req, reqCopy)
    respCopy := AcquireResponse() if resp != nil { // Not calling resp.copyToSkipBody(respCopy) here to avoid // unexpected messing with headers respCopy.SkipBody = resp.SkipBody
    } // Note that the request continues execution on ErrTimeout until // client-specific ReadTimeout exceeds. This helps limiting load // on slow hosts by MaxConns* concurrent requests. // // Without this 'hack' the load on slow host could exceed MaxConns* // concurrent requests, since timed out requests on client side // usually continue execution on the host. var mu sync.Mutex var timedout bool //这个goroutine是用来处理连接以及发送请求的 go func() {
        errDo := c.Do(reqCopy, respCopy)
        mu.Lock()
        { if !timedout { if resp != nil {
                    respCopy.copyToSkipBody(resp)
                    swapResponseBody(resp, respCopy)
                }
                swapRequestBody(reqCopy, req)
                ch <- errDo
            }
        }
        mu.Unlock()

        ReleaseResponse(respCopy)
        ReleaseRequest(reqCopy)
    }() //这块内容是用来处理超时的 tc := AcquireTimer(timeout) var err error select { case err = <-ch: case <-tc.C:
        mu.Lock()
        {
            timedout = true err = ErrTimeout
        }
        mu.Unlock()
    }
    ReleaseTimer(tc) select { case <-ch: default:
    }
    errorChPool.Put(chv) return err
}

我们看到,请求的超时时间是如何处理的。当我的请求超时后,主流程直接返回了超时错误,而此时,goroutine 里面还在等待请求的返回,而偏偏 B 服务,由于一些情况会抛出异常,也就是没有对这个请求进行返回,从而导致这个链接一直未得到释放,终于解答了为什么有大量的连接一直被占有从而导致无连接可用的情况。

最后,当我心里还在腹诽为什么 fasthttp 这么优秀的框架会有这种问题,如果服务端抛异常(不对请求进行返回)就会把连接打满?又自己看了一下代码,原来,

// DoTimeout performs the given request and waits for response during
// the given timeout duration.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned during
// the given timeout.
//
// ErrNoFreeConns is returned if all HostClient.MaxConns connections
// to the host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
//
// Warning: DoTimeout does not terminate the request itself. The request will
// continue in the background and the response will be discarded.
// If requests take too long and the connection pool gets filled up please
// try setting a ReadTimeout.
func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
    return clientDoTimeout(req, resp, timeout, c)
}

人家这个方法的注释早就说明了,看最后一段注释,大意就是超时之后,请求依然会继续等待返回值,只是返回值会被丢弃,如果请求时间太长,会把连接池占满,正好是我们遇到的问题。为了解决,需要设置 ReadTimeout 字段,这个字段的我个人理解的意思就是当请求发出之后,达到 ReadTimeout 时间还没有得到返回值,客户端就会把连接断开(释放)。

以上就是这次经验之谈,切记,使用 fasthttp 的时候,加上 ReadTimeout 字段。

扫码二维码 获取免费视频学习资料

Python编程学习

查 看2022高级编程视频教程免费获取