From 98ce20b29515198c7b2a0b317f6ea0d76cd4231b Mon Sep 17 00:00:00 2001 From: nadoo <287492+nadoo@users.noreply.github.com> Date: Thu, 26 Nov 2020 19:21:27 +0800 Subject: [PATCH] check: stop checking when protocol not supported --- go.mod | 4 +- go.sum | 8 ++-- proxy/dialer.go | 4 ++ proxy/tcp/tcp.go | 27 +------------- proxy/udp/udp.go | 4 +- rule/check.go | 96 +++++++++++++----------------------------------- rule/forward.go | 5 ++- rule/group.go | 41 +++++++++++++-------- rule/proxy.go | 2 +- 9 files changed, 69 insertions(+), 122 deletions(-) diff --git a/go.mod b/go.mod index 3bfacb6..6f358a0 100644 --- a/go.mod +++ b/go.mod @@ -14,9 +14,9 @@ require ( github.com/nadoo/ipset v0.3.0 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/xtaci/kcp-go/v5 v5.6.1 - golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9 + golang.org/x/crypto v0.0.0-20201124201722-c8d3bf9c5392 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect - golang.org/x/tools v0.0.0-20201124005743-911501bfb504 // indirect + golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b // indirect gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect ) diff --git a/go.sum b/go.sum index 90abcce..5a72ba5 100644 --- a/go.sum +++ b/go.sum @@ -87,8 +87,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9 h1:phUcVbl53swtrUN8kQEXFhUxPlIlWyBfKmidCu7P95o= -golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20201124201722-c8d3bf9c5392 h1:xYJJ3S178yv++9zXV/hnr29plCAGO9vAFG9dorqaFQc= +golang.org/x/crypto v0.0.0-20201124201722-c8d3bf9c5392/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -141,8 +141,8 @@ golang.org/x/tools v0.0.0-20200425043458-8463f397d07c/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200808161706-5bf02b21f123/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174 h1:0rx0F4EjJNbxTuzWe0KjKcIzs+3VEb/Mrs/d1ciNz1c= golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.0.0-20201124005743-911501bfb504 h1:jOKV2ysikH1GANB7t2LotmhyvkkPvl7HQoEXkV6slJA= -golang.org/x/tools v0.0.0-20201124005743-911501bfb504/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b h1:Lq5JUTFhiybGVf28jB6QRpqd13/JPOaCnET17PVzYJE= +golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/proxy/dialer.go b/proxy/dialer.go index a30d6ca..912c68a 100644 --- a/proxy/dialer.go +++ b/proxy/dialer.go @@ -6,6 +6,10 @@ import ( "strings" ) +var ( + ErrNotSupported = errors.New("not supported") +) + // Dialer is used to create connection. type Dialer interface { TCPDialer diff --git a/proxy/tcp/tcp.go b/proxy/tcp/tcp.go index 236abbb..b851482 100644 --- a/proxy/tcp/tcp.go +++ b/proxy/tcp/tcp.go @@ -1,7 +1,6 @@ package tcp import ( - "errors" "net" "net/url" "strings" @@ -12,10 +11,9 @@ import ( // TCP struct. type TCP struct { + addr string dialer proxy.Dialer proxy proxy.Proxy - server proxy.Server - addr string } func init() { @@ -47,19 +45,6 @@ func NewTCPDialer(s string, d proxy.Dialer) (proxy.Dialer, error) { // NewTCPServer returns a tcp transport layer before the real server. func NewTCPServer(s string, p proxy.Proxy) (proxy.Server, error) { - // transport := strings.Split(s, ",") - - // prepare transport listener - // TODO: check here - // if len(transport) < 2 { - // return nil, errors.New("[tcp] malformd listener:" + s) - // } - - // t.server, err = proxy.ServerFromURL(transport[1], p) - // if err != nil { - // return nil, err - // } - return NewTCP(s, nil, p) } @@ -87,14 +72,6 @@ func (s *TCP) ListenAndServe() { // Serve serves a connection. func (s *TCP) Serve(c net.Conn) { - // we know the internal server will close the connection after serve - // defer c.Close() - - if s.server != nil { - s.server.Serve(c) - return - } - defer c.Close() if c, ok := c.(*net.TCPConn); ok { @@ -135,5 +112,5 @@ func (s *TCP) Dial(network, addr string) (net.Conn, error) { // DialUDP connects to the given address via the proxy. func (s *TCP) DialUDP(network, addr string) (net.PacketConn, net.Addr, error) { - return nil, nil, errors.New("tcp client does not support udp now") + return nil, nil, proxy.ErrNotSupported } diff --git a/proxy/udp/udp.go b/proxy/udp/udp.go index 52d5a1a..c6216d0 100644 --- a/proxy/udp/udp.go +++ b/proxy/udp/udp.go @@ -1,6 +1,7 @@ package udp import ( + "fmt" "net" "net/url" "sync" @@ -15,7 +16,6 @@ type UDP struct { addr string dialer proxy.Dialer proxy proxy.Proxy - server proxy.Server } func init() { @@ -120,7 +120,7 @@ func (s *UDP) Addr() string { // Dial connects to the address addr on the network net via the proxy. func (s *UDP) Dial(network, addr string) (net.Conn, error) { - return s.dialer.Dial("udp", s.addr) + return nil, fmt.Errorf("can not dial tcp via udp dialer: %w", proxy.ErrNotSupported) } // DialUDP connects to the given address via the proxy. diff --git a/rule/check.go b/rule/check.go index f5d045c..bea5211 100644 --- a/rule/check.go +++ b/rule/check.go @@ -2,18 +2,20 @@ package rule import ( "bytes" + "errors" + "fmt" "io" "os" "os/exec" "time" - "github.com/nadoo/glider/log" "github.com/nadoo/glider/pool" + "github.com/nadoo/glider/proxy" ) // Checker is a forwarder health checker. type Checker interface { - Check(fwdr *Forwarder) (healthy bool) + Check(dialer proxy.Dialer) (elap time.Duration, err error) } type tcpChecker struct { @@ -25,34 +27,16 @@ func newTcpChecker(addr string, timeout time.Duration) *tcpChecker { return &tcpChecker{addr, timeout} } -func (c *tcpChecker) Check(fwdr *Forwarder) bool { +// Check implements the Checker interface. +func (c *tcpChecker) Check(dialer proxy.Dialer) (time.Duration, error) { startTime := time.Now() - - rc, err := fwdr.Dial("tcp", c.addr) + rc, err := dialer.Dial("tcp", c.addr) if err != nil { - log.F("[check] tcp:%s(%d), FAILED. error in dial: %s", fwdr.Addr(), fwdr.Priority(), err) - fwdr.Disable() - return false + return 0, err } defer rc.Close() - if c.timeout > 0 { - rc.SetDeadline(time.Now().Add(c.timeout)) - } - - elapsed := time.Since(startTime) - fwdr.SetLatency(int64(elapsed)) - - if elapsed > c.timeout { - log.F("[check] tcp:%s(%d), FAILED. check timeout: %s", fwdr.Addr(), fwdr.Priority(), elapsed) - fwdr.Disable() - return false - } - - log.F("[check] tcp:%s(%d), SUCCESS. elapsed: %s", fwdr.Addr(), fwdr.Priority(), elapsed) - fwdr.Enable() - - return true + return time.Since(startTime), nil } type httpChecker struct { @@ -66,13 +50,12 @@ func newHttpChecker(addr, uri, expect string, timeout time.Duration) *httpChecke return &httpChecker{addr, uri, expect, timeout} } -func (c *httpChecker) Check(fwdr *Forwarder) bool { +// Check implements the Checker interface. +func (c *httpChecker) Check(dialer proxy.Dialer) (time.Duration, error) { startTime := time.Now() - rc, err := fwdr.Dial("tcp", c.addr) + rc, err := dialer.Dial("tcp", c.addr) if err != nil { - log.F("[check] %s(%d) -> http://%s, FAILED. error in dial: %s", fwdr.Addr(), fwdr.Priority(), c.addr, err) - fwdr.Disable() - return false + return 0, err } defer rc.Close() @@ -80,11 +63,9 @@ func (c *httpChecker) Check(fwdr *Forwarder) bool { rc.SetDeadline(time.Now().Add(c.timeout)) } - _, err = io.WriteString(rc, "GET "+c.uri+" HTTP/1.1\r\nHost:"+c.addr+"\r\nConnection: close"+"\r\n\r\n") - if err != nil { - log.F("[check] %s(%d) -> http://%s, FAILED. error in write: %s", fwdr.Addr(), fwdr.Priority(), c.addr, err) - fwdr.Disable() - return false + if _, err = io.WriteString(rc, + "GET "+c.uri+" HTTP/1.1\r\nHost:"+c.addr+"\r\nConnection: close"+"\r\n\r\n"); err != nil { + return 0, err } r := pool.GetBufReader(rc) @@ -92,55 +73,30 @@ func (c *httpChecker) Check(fwdr *Forwarder) bool { line, _, err := r.ReadLine() if err != nil { - log.F("[check] %s(%d) -> http://%s, FAILED. error in read: %s", fwdr.Addr(), fwdr.Priority(), c.addr, err) - fwdr.Disable() - return false + return 0, err } if !bytes.Contains(line, []byte(c.expect)) { - log.F("[check] %s(%d) -> http://%s, FAILED. expect: %s, server response: %s", fwdr.Addr(), fwdr.Priority(), c.addr, c.expect, line) - fwdr.Disable() - return false + return 0, fmt.Errorf("expect: %s, got: %s", c.expect, line) } elapsed := time.Since(startTime) - fwdr.SetLatency(int64(elapsed)) - if elapsed > c.timeout { - log.F("[check] %s(%d) -> http://%s, FAILED. check timeout: %s", fwdr.Addr(), fwdr.Priority(), c.addr, elapsed) - fwdr.Disable() - return false + return elapsed, errors.New("timeout") } - log.F("[check] %s(%d) -> http://%s, SUCCESS. elapsed: %s", fwdr.Addr(), fwdr.Priority(), c.addr, elapsed) - fwdr.Enable() - - return true + return elapsed, nil } -type fileChecker struct { - path string -} +type fileChecker struct{ path string } -func newFileChecker(path string) *fileChecker { - return &fileChecker{path} -} +func newFileChecker(path string) *fileChecker { return &fileChecker{path} } -func (c *fileChecker) Check(fwdr *Forwarder) bool { +// Check implements the Checker interface. +func (c *fileChecker) Check(dialer proxy.Dialer) (time.Duration, error) { cmd := exec.Command(c.path) cmd.Stdout = os.Stdout cmd.Env = os.Environ() - cmd.Env = append(cmd.Env, "FORWARDER_ADDR="+fwdr.Addr()) - - err := cmd.Run() - if err != nil { - log.F("[check] file:%s(%d), FAILED. err: %s", fwdr.Addr(), fwdr.Priority(), err) - fwdr.Disable() - return false - } - - log.F("[check] file:%s(%d), SUCCESS.", fwdr.Addr(), fwdr.Priority()) - fwdr.Enable() - - return true + cmd.Env = append(cmd.Env, "FORWARDER_ADDR="+dialer.Addr()) + return 0, cmd.Run() } diff --git a/rule/forward.go b/rule/forward.go index 76273cf..73d7b4a 100644 --- a/rule/forward.go +++ b/rule/forward.go @@ -119,9 +119,10 @@ func (f *Forwarder) IncFailures() { return } - log.F("[forwarder] %s recorded %d failures, maxfailures: %d", f.addr, failures, f.MaxFailures()) + // log.F("[forwarder] %s(%d) recorded %d failures, maxfailures: %d", f.addr, f.Priority(), failures, f.MaxFailures()) - if failures >= f.MaxFailures() && f.Enabled() { + if failures == f.MaxFailures() && f.Enabled() { + log.F("[forwarder] %s(%d) reaches maxfailures: %d", f.addr, f.Priority(), f.MaxFailures()) f.Disable() } } diff --git a/rule/group.go b/rule/group.go index 3b40b5b..eaab333 100644 --- a/rule/group.go +++ b/rule/group.go @@ -1,6 +1,7 @@ package rule import ( + "errors" "hash/fnv" "net" "net/url" @@ -176,7 +177,6 @@ func (p *FwdrGroup) onStatusChanged(fwdr *Forwarder) { // Check runs the forwarder checks. func (p *FwdrGroup) Check() { if len(p.fwdrs) == 1 { - p.config.MaxFailures = 0 log.F("[group] only 1 forwarder found, disable health checking") return } @@ -212,17 +212,18 @@ func (p *FwdrGroup) Check() { case "file": checker = newFileChecker(u.Host + u.Path) default: - p.config.MaxFailures = 0 log.F("[group] invalid check config `%s`, disable health checking", p.config.Check) return } + log.F("[group] using check config: %s", p.config.Check) + for i := 0; i < len(p.fwdrs); i++ { go p.check(p.fwdrs[i], checker) } } -func (p *FwdrGroup) check(f *Forwarder, checker Checker) { +func (p *FwdrGroup) check(fwdr *Forwarder, checker Checker) { wait := uint8(0) intval := time.Duration(p.config.CheckInterval) * time.Second @@ -230,27 +231,35 @@ func (p *FwdrGroup) check(f *Forwarder, checker Checker) { time.Sleep(intval * time.Duration(wait)) // check all forwarders at least one time - if wait > 0 && (f.Priority() < p.Priority()) { + if wait > 0 && (fwdr.Priority() < p.Priority()) { continue } - if f.Enabled() && p.config.CheckDisabledOnly { + if fwdr.Enabled() && p.config.CheckDisabledOnly { continue } - if checker.Check(f) { - wait = 1 - continue + elapsed, err := checker.Check(fwdr) + if err != nil { + if errors.Is(err, proxy.ErrNotSupported) { + fwdr.SetMaxFailures(0) + log.F("[check] %s(%d), %s, stop checking", fwdr.Addr(), fwdr.Priority(), err) + break + } + + wait++ + if wait > 16 { + wait = 16 + } + + log.F("[check] %s(%d), FAILED. error: %s", fwdr.Addr(), fwdr.Priority(), err) + fwdr.Disable() } - if wait == 0 { - wait = 1 - } - - wait *= 2 - if wait > 16 { - wait = 16 - } + wait = 1 + fwdr.Enable() + fwdr.SetLatency(int64(elapsed)) + log.F("[check] %s(%d), SUCCESS. elapsed: %s", fwdr.Addr(), fwdr.Priority(), elapsed) } } diff --git a/rule/proxy.go b/rule/proxy.go index 50699a1..9bb1d55 100644 --- a/rule/proxy.go +++ b/rule/proxy.go @@ -45,7 +45,7 @@ func NewProxy(mainForwarders []string, mainStrategy *Strategy, rules []*Config) if len(mainForwarders) > 0 { direct := NewFwdrGroup("", nil, mainStrategy) for _, f := range rd.main.fwdrs { - host := strings.Split(f.addr, ":")[0] + host, _, _ := net.SplitHostPort(f.addr) if ip := net.ParseIP(host); ip == nil { rd.domainMap.Store(strings.ToLower(host), direct) }