diff --git a/rule/forward.go b/rule/forward.go index b76d1f4..5633d9f 100644 --- a/rule/forward.go +++ b/rule/forward.go @@ -25,6 +25,7 @@ type Forwarder struct { failures uint32 latency int64 intface string // local interface or ip address + cntConn int handlers []StatusHandler } @@ -113,6 +114,9 @@ func (f *Forwarder) Dial(network, addr string) (c net.Conn, err error) { if err != nil { f.IncFailures() } + if IsLeastConn == true { + c = NewConn(c, f) + } return c, err } diff --git a/rule/group.go b/rule/group.go index 1edd699..27ceccc 100644 --- a/rule/group.go +++ b/rule/group.go @@ -81,6 +81,10 @@ func newFwdrGroup(name string, fwdrs []*Forwarder, c *Strategy) *FwdrGroup { case "dh": p.next = p.scheduleDH log.F("[strategy] %s: %d forwarders forward in destination hashing mode.", name, count) + case "lc": + IsLeastConn = true + p.next = p.scheduleLC + log.F("[strategy] %s: %d forwarders forward in least connection mode.", name, count) default: p.next = p.scheduleRR log.F("[strategy] %s: not supported forward mode '%s', use round robin mode for %d forwarders.", name, c.Strategy, count) @@ -300,3 +304,13 @@ func (p *FwdrGroup) scheduleDH(dstAddr string) *Forwarder { fnv1a.Write([]byte(dstAddr)) return p.avail[fnv1a.Sum32()%uint32(len(p.avail))] } +// least connection +func (p *FwdrGroup) scheduleLC(dstAddr string) *Forwarder { + lcfwdr := p.avail[0] + for _, f := range p.avail { + if f.cntConn < lcfwdr.cntConn { + lcfwdr = f + } + } + return lcfwdr +} diff --git a/rule/proxy.go b/rule/proxy.go index b131f02..4dced0b 100644 --- a/rule/proxy.go +++ b/rule/proxy.go @@ -17,7 +17,7 @@ type Proxy struct { ipMap sync.Map cidrMap sync.Map } - +var IsLeastConn bool //is use leastConn strategy // NewProxy returns a new rule proxy. func NewProxy(mainForwarders []string, mainStrategy *Strategy, rules []*Config) *Proxy { rd := &Proxy{main: NewFwdrGroup("main", mainForwarders, mainStrategy)} diff --git a/rule/umconn.go b/rule/umconn.go new file mode 100644 index 0000000..2dadb7d --- /dev/null +++ b/rule/umconn.go @@ -0,0 +1,90 @@ +package rule + +import ( + "net" + "time" +) + +type Conn struct { + conn net.Conn + forwarder *Forwarder +} +func NewConn( c net.Conn,forwarder *Forwarder) *Conn { + forwarder.cntConn++ + return &Conn{ + conn: c, + forwarder: forwarder, + } + +} +// Read reads data from the connection. +// Read can be made to time out and return an error after a fixed +// time limit; see SetDeadline and SetReadDeadline. +func (c *Conn) Read(b []byte) (n int, err error) { + return c.conn.Read(b) +} + +// Write writes data to the connection. +// Write can be made to time out and return an error after a fixed +// time limit; see SetDeadline and SetWriteDeadline. +func (c *Conn) Write(b []byte) (n int, err error) { + return c.conn.Write(b) +} + +// Close closes the connection. +// Any blocked Read or Write operations will be unblocked and return errors. +func (c *Conn) Close() error { + c.forwarder.cntConn-- + return c.conn.Close() +} + +// LocalAddr returns the local network address. +func (c *Conn) LocalAddr() net.Addr { + return c.conn.LocalAddr() +} + +// RemoteAddr returns the remote network address. +func (c *Conn) RemoteAddr() net.Addr { + return c.conn.RemoteAddr() +} + +// SetDeadline sets the read and write deadlines associated +// with the connection. It is equivalent to calling both +// SetReadDeadline and SetWriteDeadline. +// +// A deadline is an absolute time after which I/O operations +// fail instead of blocking. The deadline applies to all future +// and pending I/O, not just the immediately following call to +// Read or Write. After a deadline has been exceeded, the +// connection can be refreshed by setting a deadline in the future. +// +// If the deadline is exceeded a call to Read or Write or to other +// I/O methods will return an error that wraps os.ErrDeadlineExceeded. +// This can be tested using errors.Is(err, os.ErrDeadlineExceeded). +// The error's Timeout method will return true, but note that there +// are other possible errors for which the Timeout method will +// return true even if the deadline has not been exceeded. +// +// An idle timeout can be implemented by repeatedly extending +// the deadline after successful Read or Write calls. +// +// A zero value for t means I/O operations will not time out. +func (c *Conn) SetDeadline(t time.Time) error { + return c.conn.SetDeadline(t) +} + +// SetReadDeadline sets the deadline for future Read calls +// and any currently-blocked Read call. +// A zero value for t means Read will not time out. +func (c *Conn) SetReadDeadline(t time.Time) error { + return c.conn.SetReadDeadline(t) +} + +// SetWriteDeadline sets the deadline for future Write calls +// and any currently-blocked Write call. +// Even if write times out, it may return n > 0, indicating that +// some of the data was successfully written. +// A zero value for t means Write will not time out. +func (c *Conn) SetWriteDeadline(t time.Time) error { + return c.conn.SetWriteDeadline(t) +}