From f3dc252967fbfdbc72a23dd016fae6f0c286b75a Mon Sep 17 00:00:00 2001 From: nadoo <287492+nadoo@users.noreply.github.com> Date: Thu, 29 Jul 2021 20:05:20 +0800 Subject: [PATCH] udp,unix,tproxy: improve udp server --- go.mod | 2 +- go.sum | 4 +- proxy/tproxy/tproxy_linux.go | 60 ++++++++++------------- proxy/udp/udp.go | 92 +++++++++++++++++++++--------------- proxy/unix/server.go | 86 ++++++++++++++++++++------------- 5 files changed, 136 insertions(+), 108 deletions(-) diff --git a/go.mod b/go.mod index df27c59..b0faf1a 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/u-root/uio v0.0.0-20210528151154-e40b768296a7 // indirect github.com/xtaci/kcp-go/v5 v5.6.1 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 - golang.org/x/net v0.0.0-20210716203947-853a461950ff // indirect + golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985 // indirect golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c ) diff --git a/go.sum b/go.sum index 18d606c..01cba5e 100644 --- a/go.sum +++ b/go.sum @@ -129,8 +129,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds= -golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985 h1:4CSI6oo7cOjJKajidEljs9h+uP0rRZBPPPhcCbj5mw8= +golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/proxy/tproxy/tproxy_linux.go b/proxy/tproxy/tproxy_linux.go index 28afeac..13a5cd0 100644 --- a/proxy/tproxy/tproxy_linux.go +++ b/proxy/tproxy/tproxy_linux.go @@ -86,35 +86,20 @@ func (s *TProxy) ListenAndServeUDP() { continue } - s.handleMsg(srcAddr, dstAddr, buf[:n]) - } -} + var session *Session + sessionKey := srcAddr.String() -// handleMsg handles an udp message. -func (s *TProxy) handleMsg(srcAddr, dstAddr *net.UDPAddr, data []byte) { - var session *Session - sessionKey := srcAddr.String() - - v, ok := nm.Load(sessionKey) - if ok && v != nil { - session = v.(*Session) - session.msgCh <- data - - select { - case <-session.finCh: - nm.Delete(session.key) - close(session.msgCh) - close(session.finCh) - default: - return + v, ok := nm.Load(sessionKey) + if !ok || v == nil { + session = newSession(sessionKey, srcAddr, dstAddr) + nm.Store(sessionKey, session) + go s.serveSession(session) + } else { + session = v.(*Session) } + + session.msgCh <- buf[:n] } - - session = newSession(sessionKey, srcAddr, dstAddr) - nm.Store(sessionKey, session) - - go s.serveSession(session) - session.msgCh <- data } // serveSession serves a udp session. @@ -133,20 +118,27 @@ func (s *TProxy) serveSession(session *Session) { } defer srcPC.Close() - log.F("[tproxyu] %s <-> %s via %s", session.src, session.dst, dialer.Addr()) - go func() { proxy.RelayUDP(srcPC, session.src, dstPC, 2*time.Minute) - session.finCh <- struct{}{} + nm.Delete(session.key) + close(session.finCh) }() - for data := range session.msgCh { - _, err = dstPC.WriteTo(data, writeTo) - if err != nil { - log.F("[tproxyu] writeTo error: %v", err) + log.F("[tproxyu] %s <-> %s via %s", session.src, session.dst, dialer.Addr()) + + for { + select { + case p := <-session.msgCh: + _, err = dstPC.WriteTo(p, writeTo) + if err != nil { + log.F("[tproxyu] writeTo %s error: %v", writeTo, err) + } + pool.PutBuffer(p) + case <-session.finCh: + return } - pool.PutBuffer(data) } + } // Session is a udp session diff --git a/proxy/udp/udp.go b/proxy/udp/udp.go index 6630930..0791eef 100644 --- a/proxy/udp/udp.go +++ b/proxy/udp/udp.go @@ -8,6 +8,7 @@ import ( "time" "github.com/nadoo/glider/log" + "github.com/nadoo/glider/pool" "github.com/nadoo/glider/proxy" ) @@ -23,6 +24,8 @@ func init() { proxy.RegisterServer("udp", NewUDPServer) } +var nm sync.Map + // NewUDP returns a udp struct. func NewUDP(s string, d proxy.Dialer, p proxy.Proxy) (*UDP, error) { u, err := url.Parse(s) @@ -61,58 +64,73 @@ func (s *UDP) ListenAndServe() { log.F("[udp] listening UDP on %s", s.addr) - var nm sync.Map - buf := make([]byte, proxy.UDPBufSize) - for { - n, lraddr, err := c.ReadFrom(buf) + buf := pool.GetBuffer(proxy.UDPBufSize) + n, srcAddr, err := c.ReadFrom(buf) if err != nil { log.F("[udp] read error: %v", err) continue } - var session *natEntry - v, ok := nm.Load(lraddr.String()) - if !ok && v == nil { - // we know we are creating an udp tunnel, so the dial addr is meaningless, - // we use lraddr here to help the unix client to identify the source socket. - pc, dialer, writeTo, err := s.proxy.DialUDP("udp", lraddr.String()) - if err != nil { - log.F("[udp] remote dial error: %v", err) - continue - } - - session = newNatEntry(pc, writeTo) - nm.Store(lraddr.String(), session) - - go func(c, pc net.PacketConn, lraddr net.Addr) { - proxy.RelayUDP(c, lraddr, pc, 2*time.Minute) - pc.Close() - nm.Delete(lraddr.String()) - }(c, pc, lraddr) - - log.F("[udp] %s <-> %s", lraddr, dialer.Addr()) + var session *Session + sessionKey := srcAddr.String() + v, ok := nm.Load(sessionKey) + if !ok || v == nil { + session = newSession(sessionKey, srcAddr, c) + nm.Store(sessionKey, session) + go s.serveSession(session) } else { - session = v.(*natEntry) - } - - _, err = session.WriteTo(buf[:n], session.writeTo) - if err != nil { - log.F("[udp] writeTo %s error: %v", session.writeTo, err) - continue + session = v.(*Session) } + session.msgCh <- buf[:n] } } -type natEntry struct { - net.PacketConn - writeTo net.Addr +func (s *UDP) serveSession(session *Session) { + // we know we are creating an udp tunnel, so the dial addr is meaningless, + // we use srcAddr here to help the unix client to identify the source socket. + dstPC, dialer, writeTo, err := s.proxy.DialUDP("udp", session.src.String()) + if err != nil { + log.F("[udp] remote dial error: %v", err) + return + } + defer dstPC.Close() + + go func() { + proxy.RelayUDP(session.srcPC, session.src, dstPC, 2*time.Minute) + nm.Delete(session.key) + close(session.finCh) + }() + + log.F("[udp] %s <-> %s", session.src, dialer.Addr()) + + for { + select { + case p := <-session.msgCh: + _, err = dstPC.WriteTo(p, writeTo) + if err != nil { + log.F("[udp] writeTo %s error: %v", writeTo, err) + } + pool.PutBuffer(p) + case <-session.finCh: + return + } + } } -func newNatEntry(pc net.PacketConn, writeTo net.Addr) *natEntry { - return &natEntry{PacketConn: pc, writeTo: writeTo} +// Session is a udp session +type Session struct { + key string + src net.Addr + srcPC net.PacketConn + msgCh chan []byte + finCh chan struct{} +} + +func newSession(key string, src net.Addr, srcPC net.PacketConn) *Session { + return &Session{key, src, srcPC, make(chan []byte, 32), make(chan struct{})} } // Serve serves a connection. diff --git a/proxy/unix/server.go b/proxy/unix/server.go index 42d13e5..58fb6b1 100644 --- a/proxy/unix/server.go +++ b/proxy/unix/server.go @@ -8,6 +8,7 @@ import ( "time" "github.com/nadoo/glider/log" + "github.com/nadoo/glider/pool" "github.com/nadoo/glider/proxy" ) @@ -15,6 +16,8 @@ func init() { proxy.RegisterServer("unix", NewUnixServer) } +var nm sync.Map + // NewUnixServer returns a unix domain socket server. func NewUnixServer(s string, p proxy.Proxy) (proxy.Server, error) { schemes := strings.SplitN(s, ",", 2) @@ -102,54 +105,69 @@ func (s *Unix) ListenAndServeUDP() { log.F("[unix] ListenPacket on %s", s.addru) - var nm sync.Map - buf := make([]byte, proxy.UDPBufSize) - for { - n, lraddr, err := c.ReadFrom(buf) + buf := pool.GetBuffer(proxy.UDPBufSize) + n, srcAddr, err := c.ReadFrom(buf) if err != nil { log.F("[unix] read error: %v", err) continue } - var session *natEntry - v, ok := nm.Load(lraddr.String()) - if !ok && v == nil { - pc, dialer, writeTo, err := s.proxy.DialUDP("udp", "") - if err != nil { - log.F("[unix] remote dial error: %v", err) - continue - } - - session = newNatEntry(pc, writeTo) - nm.Store(lraddr.String(), session) - - go func(c, pc net.PacketConn, lraddr net.Addr) { - proxy.RelayUDP(c, lraddr, pc, 2*time.Minute) - pc.Close() - nm.Delete(lraddr.String()) - }(c, pc, lraddr) - - log.F("[unix] %s <-> %s", lraddr, dialer.Addr()) + var session *Session + sessionKey := srcAddr.String() + v, ok := nm.Load(sessionKey) + if !ok || v == nil { + session = newSession(sessionKey, srcAddr, c) + nm.Store(sessionKey, session) + go s.serveSession(session) } else { - session = v.(*natEntry) + session = v.(*Session) } - _, err = session.WriteTo(buf[:n], session.writeTo) - if err != nil { - log.F("[unix] writeTo %s error: %v", session.writeTo, err) - continue - } + session.msgCh <- buf[:n] + } +} +func (s *Unix) serveSession(session *Session) { + dstPC, dialer, writeTo, err := s.proxy.DialUDP("udp", "") + if err != nil { + log.F("[unix] remote dial error: %v", err) + return + } + defer dstPC.Close() + + go func() { + proxy.RelayUDP(session.srcPC, session.src, dstPC, 2*time.Minute) + nm.Delete(session.key) + close(session.finCh) + }() + + log.F("[unix] %s <-> %s", session.src, dialer.Addr()) + + for { + select { + case p := <-session.msgCh: + _, err = dstPC.WriteTo(p, writeTo) + if err != nil { + log.F("[unix] writeTo %s error: %v", writeTo, err) + } + pool.PutBuffer(p) + case <-session.finCh: + return + } } } -type natEntry struct { - net.PacketConn - writeTo net.Addr +// Session is a udp session +type Session struct { + key string + src net.Addr + srcPC net.PacketConn + msgCh chan []byte + finCh chan struct{} } -func newNatEntry(pc net.PacketConn, writeTo net.Addr) *natEntry { - return &natEntry{PacketConn: pc, writeTo: writeTo} +func newSession(key string, src net.Addr, srcPC net.PacketConn) *Session { + return &Session{key, src, srcPC, make(chan []byte, 32), make(chan struct{})} }