From bd40b073886f69c15eed1954f6a87a63547a9048 Mon Sep 17 00:00:00 2001 From: nadoo <287492+nadoo@users.noreply.github.com> Date: Wed, 26 Feb 2025 21:34:43 +0800 Subject: [PATCH] chore: update dependencies and optimize some code style --- dns/message.go | 9 +- go.mod | 4 +- go.sum | 8 +- pkg/smux/LICENSE | 21 - pkg/smux/frame.go | 33 +- pkg/smux/mux.go | 24 +- pkg/smux/mux_test.go | 86 --- pkg/smux/session.go | 118 ++++- pkg/smux/session_test.go | 1090 -------------------------------------- pkg/smux/shaper.go | 30 ++ pkg/smux/shaper_test.go | 50 -- pkg/smux/stream.go | 172 ++++-- proxy/http/client.go | 3 +- proxy/tproxy/server.go | 2 + proxy/tproxy/tproxy.go | 2 + proxy/vsock/client.go | 2 + proxy/vsock/server.go | 2 + proxy/vsock/socket.go | 3 +- proxy/vsock/vsock.go | 2 + service/dhcpd/cilent.go | 2 + service/dhcpd/dhcpd.go | 2 + service/dhcpd/pool.go | 2 + service/dhcpd/reply.go | 2 + 23 files changed, 321 insertions(+), 1348 deletions(-) delete mode 100644 pkg/smux/LICENSE delete mode 100644 pkg/smux/mux_test.go delete mode 100644 pkg/smux/session_test.go delete mode 100644 pkg/smux/shaper_test.go diff --git a/dns/message.go b/dns/message.go index 682ff3f..adbbab1 100644 --- a/dns/message.go +++ b/dns/message.go @@ -210,10 +210,11 @@ func (h *Header) SetAncount(ancount int) { h.ANCOUNT = uint16(ancount) } -func (h *Header) setFlag(QR uint16, Opcode uint16, AA uint16, - TC uint16, RD uint16, RA uint16, RCODE uint16) { - h.Bits = QR<<15 + Opcode<<11 + AA<<10 + TC<<9 + RD<<8 + RA<<7 + RCODE -} +// Not used now, but keep it for future use. +// func (h *Header) setFlag(QR uint16, Opcode uint16, AA uint16, +// TC uint16, RD uint16, RA uint16, RCODE uint16) { +// h.Bits = QR<<15 + Opcode<<11 + AA<<10 + TC<<9 + RD<<8 + RA<<7 + RCODE +// } // MarshalTo marshals header struct to []byte and write to w. func (h *Header) MarshalTo(w io.Writer) (int, error) { diff --git a/go.mod b/go.mod index d1838b6..c11078c 100644 --- a/go.mod +++ b/go.mod @@ -11,13 +11,13 @@ require ( github.com/nadoo/conflag v0.3.1 github.com/nadoo/ipset v0.5.0 github.com/xtaci/kcp-go/v5 v5.6.18 - golang.org/x/crypto v0.34.0 + golang.org/x/crypto v0.35.0 golang.org/x/sys v0.30.0 ) require ( github.com/ebfe/rc2 v0.0.0-20131011165748-24b9757f5521 // indirect - github.com/klauspost/cpuid/v2 v2.2.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/klauspost/reedsolomon v1.12.4 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/go.sum b/go.sum index 8b61ce9..928f208 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,8 @@ github.com/insomniacslk/dhcp v0.0.0-20250109001534-8abf58130905 h1:q3OEI9RaN/wwc github.com/insomniacslk/dhcp v0.0.0-20250109001534-8abf58130905/go.mod h1:VvGYjkZoJyKqlmT1yzakUs4mfKMNB0XdODP0+rdml6k= github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= -github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= -github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/klauspost/reedsolomon v1.12.4 h1:5aDr3ZGoJbgu/8+j45KtUJxzYm8k08JGtB9Wx1VQ4OA= github.com/klauspost/reedsolomon v1.12.4/go.mod h1:d3CzOMOt0JXGIFZm1StgkyF14EYr3xneR2rNWo7NcMU= github.com/mdlayher/packet v1.1.2 h1:3Up1NG6LZrsgDVn6X4L9Ge/iyRyxFEFD9o6Pr3Q1nQY= @@ -73,8 +73,8 @@ github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.34.0 h1:+/C6tk6rf/+t5DhUketUbD1aNGqiSX3j15Z6xuIDlBA= -golang.org/x/crypto v0.34.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= +golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= +golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= diff --git a/pkg/smux/LICENSE b/pkg/smux/LICENSE deleted file mode 100644 index eed41ac..0000000 --- a/pkg/smux/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2016-2017 Daniel Fu - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/pkg/smux/frame.go b/pkg/smux/frame.go index 467a058..902b655 100644 --- a/pkg/smux/frame.go +++ b/pkg/smux/frame.go @@ -1,3 +1,25 @@ +// MIT License +// +// Copyright (c) 2016-2017 xtaci +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + package smux import ( @@ -38,16 +60,18 @@ const ( // Frame defines a packet from or to be multiplexed into a single connection type Frame struct { - ver byte - cmd byte - sid uint32 - data []byte + ver byte // version + cmd byte // command + sid uint32 // stream id + data []byte // payload } +// newFrame creates a new frame with given version, command and stream id func newFrame(version byte, cmd byte, sid uint32) Frame { return Frame{ver: version, cmd: cmd, sid: sid} } +// rawHeader is a byte array representation of Frame header type rawHeader [headerSize]byte func (h rawHeader) Version() byte { @@ -71,6 +95,7 @@ func (h rawHeader) String() string { h.Version(), h.Cmd(), h.StreamID(), h.Length()) } +// updHeader is a byte array representation of cmdUPD type updHeader [szCmdUPD]byte func (h updHeader) Consumed() uint32 { diff --git a/pkg/smux/mux.go b/pkg/smux/mux.go index c0b8ab8..39815c7 100644 --- a/pkg/smux/mux.go +++ b/pkg/smux/mux.go @@ -1,7 +1,25 @@ -// Package smux is a multiplexing library for Golang. +// MIT License // -// It relies on an underlying connection to provide reliability and ordering, such as TCP or KCP, -// and provides stream-oriented multiplexing over a single channel. +// Copyright (c) 2016-2017 xtaci +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + package smux import ( diff --git a/pkg/smux/mux_test.go b/pkg/smux/mux_test.go deleted file mode 100644 index dc9c1c1..0000000 --- a/pkg/smux/mux_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package smux - -import ( - "bytes" - "testing" -) - -type buffer struct { - bytes.Buffer -} - -func (b *buffer) Close() error { - b.Buffer.Reset() - return nil -} - -func TestConfig(t *testing.T) { - VerifyConfig(DefaultConfig()) - - config := DefaultConfig() - config.KeepAliveInterval = 0 - err := VerifyConfig(config) - t.Log(err) - if err == nil { - t.Fatal(err) - } - - config = DefaultConfig() - config.KeepAliveInterval = 10 - config.KeepAliveTimeout = 5 - err = VerifyConfig(config) - t.Log(err) - if err == nil { - t.Fatal(err) - } - - config = DefaultConfig() - config.MaxFrameSize = 0 - err = VerifyConfig(config) - t.Log(err) - if err == nil { - t.Fatal(err) - } - - config = DefaultConfig() - config.MaxFrameSize = 65536 - err = VerifyConfig(config) - t.Log(err) - if err == nil { - t.Fatal(err) - } - - config = DefaultConfig() - config.MaxReceiveBuffer = 0 - err = VerifyConfig(config) - t.Log(err) - if err == nil { - t.Fatal(err) - } - - config = DefaultConfig() - config.MaxStreamBuffer = 0 - err = VerifyConfig(config) - t.Log(err) - if err == nil { - t.Fatal(err) - } - - config = DefaultConfig() - config.MaxStreamBuffer = 100 - config.MaxReceiveBuffer = 99 - err = VerifyConfig(config) - t.Log(err) - if err == nil { - t.Fatal(err) - } - - var bts buffer - if _, err := Server(&bts, config); err == nil { - t.Fatal("server started with wrong config") - } - - if _, err := Client(&bts, config); err == nil { - t.Fatal("client started with wrong config") - } -} diff --git a/pkg/smux/session.go b/pkg/smux/session.go index 7d76401..e47bfcb 100644 --- a/pkg/smux/session.go +++ b/pkg/smux/session.go @@ -1,3 +1,25 @@ +// MIT License +// +// Copyright (c) 2016-2017 xtaci +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + package smux import ( @@ -6,6 +28,7 @@ import ( "errors" "io" "net" + "runtime" "sync" "sync/atomic" "time" @@ -16,25 +39,38 @@ import ( const ( defaultAcceptBacklog = 1024 maxShaperSize = 1024 - openCloseTimeout = 30 * time.Second // stream open/close timeout + openCloseTimeout = 30 * time.Second // Timeout for opening/closing streams ) -// define frame class +// CLASSID represents the class of a frame type CLASSID int const ( - CLSCTRL CLASSID = iota + CLSCTRL CLASSID = iota // prioritized control signal CLSDATA ) +// timeoutError representing timeouts for operations such as accept, read and write +// +// To better cooperate with the standard library, timeoutError should implement the standard library's `net.Error`. +// +// For example, using smux to implement net.Listener and work with http.Server, the keep-alive connection (*smux.Stream) will be unexpectedly closed. +// For more details, see https://github.com/xtaci/smux/pull/99. +type timeoutError struct{} + +func (timeoutError) Error() string { return "timeout" } +func (timeoutError) Temporary() bool { return true } +func (timeoutError) Timeout() bool { return true } + var ( - ErrInvalidProtocol = errors.New("invalid protocol") - ErrConsumed = errors.New("peer consumed more than sent") - ErrGoAway = errors.New("stream id overflows, should start a new connection") - ErrTimeout = errors.New("timeout") - ErrWouldBlock = errors.New("operation would block on IO") + ErrInvalidProtocol = errors.New("invalid protocol") + ErrConsumed = errors.New("peer consumed more than sent") + ErrGoAway = errors.New("stream id overflows, should start a new connection") + ErrTimeout net.Error = &timeoutError{} + ErrWouldBlock = errors.New("operation would block on IO") ) +// writeRequest represents a request to write a frame type writeRequest struct { class CLASSID frame Frame @@ -42,6 +78,7 @@ type writeRequest struct { result chan writeResult } +// writeResult represents the result of a write request type writeResult struct { n int err error @@ -58,7 +95,7 @@ type Session struct { bucket int32 // token bucket bucketNotify chan struct{} // used for waiting for tokens - streams map[uint32]*Stream // all streams in this session + streams map[uint32]*stream // all streams in this session streamLock sync.Mutex // locks streams die chan struct{} // flag session has died @@ -77,7 +114,7 @@ type Session struct { chProtoError chan struct{} protoErrorOnce sync.Once - chAccepts chan *Stream + chAccepts chan *stream dataReady int32 // flag data has arrived @@ -85,7 +122,7 @@ type Session struct { deadline atomic.Value - requestID uint32 // write request monotonic increasing + requestID uint32 // Monotonic increasing write request ID shaper chan writeRequest // a shaper for writing writes chan writeRequest } @@ -95,8 +132,8 @@ func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { s.die = make(chan struct{}) s.conn = conn s.config = config - s.streams = make(map[uint32]*Stream) - s.chAccepts = make(chan *Stream, defaultAcceptBacklog) + s.streams = make(map[uint32]*stream) + s.chAccepts = make(chan *stream, defaultAcceptBacklog) s.bucket = int32(config.MaxReceiveBuffer) s.bucketNotify = make(chan struct{}, 1) s.shaper = make(chan writeRequest) @@ -144,7 +181,7 @@ func (s *Session) OpenStream() (*Stream, error) { stream := newStream(sid, s.config.MaxFrameSize, s) - if _, err := s.writeFrame(newFrame(byte(s.config.Version), cmdSYN, sid)); err != nil { + if _, err := s.writeControlFrame(newFrame(byte(s.config.Version), cmdSYN, sid)); err != nil { return nil, err } @@ -159,7 +196,14 @@ func (s *Session) OpenStream() (*Stream, error) { return nil, io.ErrClosedPipe default: s.streams[sid] = stream - return stream, nil + wrapper := &Stream{stream: stream} + // NOTE(x): disabled finalizer for issue #997 + /* + runtime.SetFinalizer(wrapper, func(s *Stream) { + s.Close() + }) + */ + return wrapper, nil } } @@ -180,7 +224,11 @@ func (s *Session) AcceptStream() (*Stream, error) { select { case stream := <-s.chAccepts: - return stream, nil + wrapper := &Stream{stream: stream} + runtime.SetFinalizer(wrapper, func(s *Stream) { + s.Close() + }) + return wrapper, nil case <-deadline: return nil, ErrTimeout case <-s.chSocketReadError: @@ -302,12 +350,15 @@ func (s *Session) RemoteAddr() net.Addr { // notify the session that a stream has closed func (s *Session) streamClosed(sid uint32) { s.streamLock.Lock() - if n := s.streams[sid].recycleTokens(); n > 0 { // return remaining tokens to the bucket - if atomic.AddInt32(&s.bucket, int32(n)) > 0 { - s.notifyBucket() + if stream, ok := s.streams[sid]; ok { + n := stream.recycleTokens() + if n > 0 { // return remaining tokens to the bucket + if atomic.AddInt32(&s.bucket, int32(n)) > 0 { + s.notifyBucket() + } } + delete(s.streams, sid) } - delete(s.streams, sid) s.streamLock.Unlock() } @@ -342,7 +393,7 @@ func (s *Session) recvLoop() { sid := hdr.StreamID() switch hdr.Cmd() { case cmdNOP: - case cmdSYN: + case cmdSYN: // stream opening s.streamLock.Lock() if _, ok := s.streams[sid]; !ok { stream := newStream(sid, s.config.MaxFrameSize, s) @@ -353,22 +404,26 @@ func (s *Session) recvLoop() { } } s.streamLock.Unlock() - case cmdFIN: + case cmdFIN: // stream closing s.streamLock.Lock() if stream, ok := s.streams[sid]; ok { stream.fin() stream.notifyReadEvent() } s.streamLock.Unlock() - case cmdPSH: + case cmdPSH: // data frame if hdr.Length() > 0 { newbuf := pool.GetBuffer(int(hdr.Length())) if written, err := io.ReadFull(s.conn, newbuf); err == nil { s.streamLock.Lock() if stream, ok := s.streams[sid]; ok { stream.pushBytes(newbuf) + // a stream used some token atomic.AddInt32(&s.bucket, -int32(written)) stream.notifyReadEvent() + } else { + // data directed to a missing/closed stream, recycle the buffer immediately. + pool.PutBuffer(newbuf) } s.streamLock.Unlock() } else { @@ -376,7 +431,7 @@ func (s *Session) recvLoop() { return } } - case cmdUPD: + case cmdUPD: // a window update signal if _, err := io.ReadFull(s.conn, updHdr[:]); err == nil { s.streamLock.Lock() if stream, ok := s.streams[sid]; ok { @@ -398,6 +453,7 @@ func (s *Session) recvLoop() { } } +// keepalive sends NOP frame to peer to keep the connection alive, and detect dead peers func (s *Session) keepalive() { tickerPing := time.NewTicker(s.config.KeepAliveInterval) tickerTimeout := time.NewTicker(s.config.KeepAliveTimeout) @@ -423,7 +479,8 @@ func (s *Session) keepalive() { } } -// shaper shapes the sending sequence among streams +// shaperLoop implements a priority queue for write requests, +// some control messages are prioritized over data messages func (s *Session) shaperLoop() { var reqs shaperHeap var next writeRequest @@ -464,6 +521,7 @@ func (s *Session) shaperLoop() { } } +// sendLoop sends frames to the underlying connection func (s *Session) sendLoop() { var buf []byte var n int @@ -491,6 +549,7 @@ func (s *Session) sendLoop() { binary.LittleEndian.PutUint16(buf[2:], uint16(len(request.frame.data))) binary.LittleEndian.PutUint32(buf[4:], request.frame.sid) + // support for scatter-gather I/O if len(vec) > 0 { vec[0] = buf[:headerSize] vec[1] = request.frame.data @@ -522,10 +581,13 @@ func (s *Session) sendLoop() { } } -// writeFrame writes the frame to the underlying connection +// writeControlFrame writes the control frame to the underlying connection // and returns the number of bytes written if successful -func (s *Session) writeFrame(f Frame) (n int, err error) { - return s.writeFrameInternal(f, time.After(openCloseTimeout), CLSCTRL) +func (s *Session) writeControlFrame(f Frame) (n int, err error) { + timer := time.NewTimer(openCloseTimeout) + defer timer.Stop() + + return s.writeFrameInternal(f, timer.C, CLSCTRL) } // internal writeFrame version to support deadline used in keepalive diff --git a/pkg/smux/session_test.go b/pkg/smux/session_test.go deleted file mode 100644 index b32a1c9..0000000 --- a/pkg/smux/session_test.go +++ /dev/null @@ -1,1090 +0,0 @@ -package smux - -import ( - "bytes" - crand "crypto/rand" - "encoding/binary" - "fmt" - "io" - "log" - "math/rand" - "net" - "net/http" - _ "net/http/pprof" - "strings" - "sync" - "testing" - "time" -) - -func init() { - go func() { - log.Println(http.ListenAndServe("0.0.0.0:6060", nil)) - }() -} - -// setupServer starts new server listening on a random localhost port and -// returns address of the server, function to stop the server, new client -// connection to this server or an error. -func setupServer(tb testing.TB) (addr string, stopfunc func(), client net.Conn, err error) { - ln, err := net.Listen("tcp", "localhost:0") - if err != nil { - return "", nil, nil, err - } - go func() { - conn, err := ln.Accept() - if err != nil { - return - } - go handleConnection(conn) - }() - addr = ln.Addr().String() - conn, err := net.Dial("tcp", addr) - if err != nil { - ln.Close() - return "", nil, nil, err - } - return ln.Addr().String(), func() { ln.Close() }, conn, nil -} - -func handleConnection(conn net.Conn) { - session, _ := Server(conn, nil) - for { - if stream, err := session.AcceptStream(); err == nil { - go func(s io.ReadWriteCloser) { - buf := make([]byte, 65536) - for { - n, err := s.Read(buf) - if err != nil { - return - } - s.Write(buf[:n]) - } - }(stream) - } else { - return - } - } -} - -// setupServer starts new server listening on a random localhost port and -// returns address of the server, function to stop the server, new client -// connection to this server or an error. -func setupServerV2(tb testing.TB) (addr string, stopfunc func(), client net.Conn, err error) { - ln, err := net.Listen("tcp", "localhost:0") - if err != nil { - return "", nil, nil, err - } - go func() { - conn, err := ln.Accept() - if err != nil { - return - } - go handleConnectionV2(conn) - }() - addr = ln.Addr().String() - conn, err := net.Dial("tcp", addr) - if err != nil { - ln.Close() - return "", nil, nil, err - } - return ln.Addr().String(), func() { ln.Close() }, conn, nil -} - -func handleConnectionV2(conn net.Conn) { - config := DefaultConfig() - config.Version = 2 - session, _ := Server(conn, config) - for { - if stream, err := session.AcceptStream(); err == nil { - go func(s io.ReadWriteCloser) { - buf := make([]byte, 65536) - for { - n, err := s.Read(buf) - if err != nil { - return - } - s.Write(buf[:n]) - } - }(stream) - } else { - return - } - } -} - -func TestEcho(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - stream, _ := session.OpenStream() - const N = 100 - buf := make([]byte, 10) - var sent string - var received string - for i := 0; i < N; i++ { - msg := fmt.Sprintf("hello%v", i) - stream.Write([]byte(msg)) - sent += msg - if n, err := stream.Read(buf); err != nil { - t.Fatal(err) - } else { - received += string(buf[:n]) - } - } - if sent != received { - t.Fatal("data mimatch") - } - session.Close() -} - -func TestWriteTo(t *testing.T) { - const N = 1 << 20 - // server - ln, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatal(err) - } - defer ln.Close() - - go func() { - conn, err := ln.Accept() - if err != nil { - return - } - session, _ := Server(conn, nil) - for { - if stream, err := session.AcceptStream(); err == nil { - go func(s io.ReadWriteCloser) { - numBytes := 0 - buf := make([]byte, 65536) - for { - n, err := s.Read(buf) - if err != nil { - return - } - s.Write(buf[:n]) - numBytes += n - - if numBytes == N { - s.Close() - return - } - } - }(stream) - } else { - return - } - } - }() - - addr := ln.Addr().String() - conn, err := net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } - defer conn.Close() - - // client - session, _ := Client(conn, nil) - stream, _ := session.OpenStream() - sndbuf := make([]byte, N) - for i := range sndbuf { - sndbuf[i] = byte(rand.Int()) - } - - go stream.Write(sndbuf) - - var rcvbuf bytes.Buffer - nw, ew := stream.WriteTo(&rcvbuf) - if ew != io.EOF { - t.Fatal(ew) - } - - if nw != N { - t.Fatal("WriteTo nw mismatch", nw) - } - - if !bytes.Equal(sndbuf, rcvbuf.Bytes()) { - t.Fatal("mismatched echo bytes") - } -} - -func TestWriteToV2(t *testing.T) { - config := DefaultConfig() - config.Version = 2 - const N = 1 << 20 - // server - ln, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatal(err) - } - defer ln.Close() - - go func() { - conn, err := ln.Accept() - if err != nil { - return - } - session, _ := Server(conn, config) - for { - if stream, err := session.AcceptStream(); err == nil { - go func(s io.ReadWriteCloser) { - numBytes := 0 - buf := make([]byte, 65536) - for { - n, err := s.Read(buf) - if err != nil { - return - } - s.Write(buf[:n]) - numBytes += n - - if numBytes == N { - s.Close() - return - } - } - }(stream) - } else { - return - } - } - }() - - addr := ln.Addr().String() - conn, err := net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } - defer conn.Close() - - // client - session, _ := Client(conn, config) - stream, _ := session.OpenStream() - sndbuf := make([]byte, N) - for i := range sndbuf { - sndbuf[i] = byte(rand.Int()) - } - - go stream.Write(sndbuf) - - var rcvbuf bytes.Buffer - nw, ew := stream.WriteTo(&rcvbuf) - if ew != io.EOF { - t.Fatal(ew) - } - - if nw != N { - t.Fatal("WriteTo nw mismatch", nw) - } - - if !bytes.Equal(sndbuf, rcvbuf.Bytes()) { - t.Fatal("mismatched echo bytes") - } -} - -func TestGetDieCh(t *testing.T) { - cs, ss, err := getSmuxStreamPair() - if err != nil { - t.Fatal(err) - } - defer ss.Close() - dieCh := ss.GetDieCh() - go func() { - select { - case <-dieCh: - case <-time.Tick(time.Second): - t.Fatal("wait die chan timeout") - } - }() - cs.Close() -} - -func TestSpeed(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - stream, _ := session.OpenStream() - t.Log(stream.LocalAddr(), stream.RemoteAddr()) - - start := time.Now() - var wg sync.WaitGroup - wg.Add(1) - go func() { - buf := make([]byte, 1024*1024) - nrecv := 0 - for { - n, err := stream.Read(buf) - if err != nil { - t.Error(err) - break - } else { - nrecv += n - if nrecv == 4096*4096 { - break - } - } - } - stream.Close() - t.Log("time for 16MB rtt", time.Since(start)) - wg.Done() - }() - msg := make([]byte, 8192) - for i := 0; i < 2048; i++ { - stream.Write(msg) - } - wg.Wait() - session.Close() -} - -func TestParallel(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - - par := 1000 - messages := 100 - var wg sync.WaitGroup - wg.Add(par) - for i := 0; i < par; i++ { - stream, _ := session.OpenStream() - go func(s *Stream) { - buf := make([]byte, 20) - for j := 0; j < messages; j++ { - msg := fmt.Sprintf("hello%v", j) - s.Write([]byte(msg)) - if _, err := s.Read(buf); err != nil { - break - } - } - s.Close() - wg.Done() - }(stream) - } - t.Log("created", session.NumStreams(), "streams") - wg.Wait() - session.Close() -} - -func TestParallelV2(t *testing.T) { - config := DefaultConfig() - config.Version = 2 - _, stop, cli, err := setupServerV2(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, config) - - par := 1000 - messages := 100 - var wg sync.WaitGroup - wg.Add(par) - for i := 0; i < par; i++ { - stream, _ := session.OpenStream() - go func(s *Stream) { - buf := make([]byte, 20) - for j := 0; j < messages; j++ { - msg := fmt.Sprintf("hello%v", j) - s.Write([]byte(msg)) - if _, err := s.Read(buf); err != nil { - break - } - } - s.Close() - wg.Done() - }(stream) - } - t.Log("created", session.NumStreams(), "streams") - wg.Wait() - session.Close() -} - -func TestCloseThenOpen(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - session.Close() - if _, err := session.OpenStream(); err == nil { - t.Fatal("opened after close") - } -} - -func TestSessionDoubleClose(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - session.Close() - if err := session.Close(); err == nil { - t.Fatal("session double close doesn't return error") - } -} - -func TestStreamDoubleClose(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - stream, _ := session.OpenStream() - stream.Close() - if err := stream.Close(); err == nil { - t.Fatal("stream double close doesn't return error") - } - session.Close() -} - -func TestConcurrentClose(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - numStreams := 100 - streams := make([]*Stream, 0, numStreams) - var wg sync.WaitGroup - wg.Add(numStreams) - for i := 0; i < 100; i++ { - stream, _ := session.OpenStream() - streams = append(streams, stream) - } - for _, s := range streams { - stream := s - go func() { - stream.Close() - wg.Done() - }() - } - session.Close() - wg.Wait() -} - -func TestTinyReadBuffer(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - stream, _ := session.OpenStream() - const N = 100 - tinybuf := make([]byte, 6) - var sent string - var received string - for i := 0; i < N; i++ { - msg := fmt.Sprintf("hello%v", i) - sent += msg - nsent, err := stream.Write([]byte(msg)) - if err != nil { - t.Fatal("cannot write") - } - nrecv := 0 - for nrecv < nsent { - if n, err := stream.Read(tinybuf); err == nil { - nrecv += n - received += string(tinybuf[:n]) - } else { - t.Fatal("cannot read with tiny buffer") - } - } - } - - if sent != received { - t.Fatal("data mimatch") - } - session.Close() -} - -func TestIsClose(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - session.Close() - if !session.IsClosed() { - t.Fatal("still open after close") - } -} - -func TestKeepAliveTimeout(t *testing.T) { - ln, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatal(err) - } - defer ln.Close() - go func() { - ln.Accept() - }() - - cli, err := net.Dial("tcp", ln.Addr().String()) - if err != nil { - t.Fatal(err) - } - defer cli.Close() - - config := DefaultConfig() - config.KeepAliveInterval = time.Second - config.KeepAliveTimeout = 2 * time.Second - session, _ := Client(cli, config) - time.Sleep(3 * time.Second) - if !session.IsClosed() { - t.Fatal("keepalive-timeout failed") - } -} - -type blockWriteConn struct { - net.Conn -} - -func (c *blockWriteConn) Write(b []byte) (n int, err error) { - forever := time.Hour * 24 - time.Sleep(forever) - return c.Conn.Write(b) -} - -func TestKeepAliveBlockWriteTimeout(t *testing.T) { - ln, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatal(err) - } - defer ln.Close() - go func() { - ln.Accept() - }() - - cli, err := net.Dial("tcp", ln.Addr().String()) - if err != nil { - t.Fatal(err) - } - defer cli.Close() - //when writeFrame block, keepalive in old version never timeout - blockWriteCli := &blockWriteConn{cli} - - config := DefaultConfig() - config.KeepAliveInterval = time.Second - config.KeepAliveTimeout = 2 * time.Second - session, _ := Client(blockWriteCli, config) - time.Sleep(3 * time.Second) - if !session.IsClosed() { - t.Fatal("keepalive-timeout failed") - } -} - -func TestServerEcho(t *testing.T) { - ln, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatal(err) - } - defer ln.Close() - go func() { - err := func() error { - conn, err := ln.Accept() - if err != nil { - return err - } - defer conn.Close() - session, err := Server(conn, nil) - if err != nil { - return err - } - defer session.Close() - buf := make([]byte, 10) - stream, err := session.OpenStream() - if err != nil { - return err - } - defer stream.Close() - for i := 0; i < 100; i++ { - msg := fmt.Sprintf("hello%v", i) - stream.Write([]byte(msg)) - n, err := stream.Read(buf) - if err != nil { - return err - } - if got := string(buf[:n]); got != msg { - return fmt.Errorf("got: %q, want: %q", got, msg) - } - } - return nil - }() - if err != nil { - t.Error(err) - } - }() - - cli, err := net.Dial("tcp", ln.Addr().String()) - if err != nil { - t.Fatal(err) - } - defer cli.Close() - if session, err := Client(cli, nil); err == nil { - if stream, err := session.AcceptStream(); err == nil { - buf := make([]byte, 65536) - for { - n, err := stream.Read(buf) - if err != nil { - break - } - stream.Write(buf[:n]) - } - } else { - t.Fatal(err) - } - } else { - t.Fatal(err) - } -} - -func TestSendWithoutRecv(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - stream, _ := session.OpenStream() - const N = 100 - for i := 0; i < N; i++ { - msg := fmt.Sprintf("hello%v", i) - stream.Write([]byte(msg)) - } - buf := make([]byte, 1) - if _, err := stream.Read(buf); err != nil { - t.Fatal(err) - } - stream.Close() -} - -func TestWriteAfterClose(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - stream, _ := session.OpenStream() - stream.Close() - if _, err := stream.Write([]byte("write after close")); err == nil { - t.Fatal("write after close failed") - } -} - -func TestReadStreamAfterSessionClose(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - stream, _ := session.OpenStream() - session.Close() - buf := make([]byte, 10) - if _, err := stream.Read(buf); err != nil { - t.Log(err) - } else { - t.Fatal("read stream after session close succeeded") - } -} - -func TestWriteStreamAfterConnectionClose(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - stream, _ := session.OpenStream() - session.conn.Close() - if _, err := stream.Write([]byte("write after connection close")); err == nil { - t.Fatal("write after connection close failed") - } -} - -func TestNumStreamAfterClose(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - if _, err := session.OpenStream(); err == nil { - if session.NumStreams() != 1 { - t.Fatal("wrong number of streams after opened") - } - session.Close() - if session.NumStreams() != 0 { - t.Fatal("wrong number of streams after session closed") - } - } else { - t.Fatal(err) - } - cli.Close() -} - -func TestRandomFrame(t *testing.T) { - addr, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - // pure random - session, _ := Client(cli, nil) - for i := 0; i < 100; i++ { - rnd := make([]byte, rand.Uint32()%1024) - io.ReadFull(crand.Reader, rnd) - session.conn.Write(rnd) - } - cli.Close() - - // double syn - cli, err = net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } - session, _ = Client(cli, nil) - for i := 0; i < 100; i++ { - f := newFrame(1, cmdSYN, 1000) - session.writeFrame(f) - } - cli.Close() - - // random cmds - cli, err = net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } - allcmds := []byte{cmdSYN, cmdFIN, cmdPSH, cmdNOP} - session, _ = Client(cli, nil) - for i := 0; i < 100; i++ { - f := newFrame(1, allcmds[rand.Int()%len(allcmds)], rand.Uint32()) - session.writeFrame(f) - } - cli.Close() - - // random cmds & sids - cli, err = net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } - session, _ = Client(cli, nil) - for i := 0; i < 100; i++ { - f := newFrame(1, byte(rand.Uint32()), rand.Uint32()) - session.writeFrame(f) - } - cli.Close() - - // random version - cli, err = net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } - session, _ = Client(cli, nil) - for i := 0; i < 100; i++ { - f := newFrame(1, byte(rand.Uint32()), rand.Uint32()) - f.ver = byte(rand.Uint32()) - session.writeFrame(f) - } - cli.Close() - - // incorrect size - cli, err = net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } - session, _ = Client(cli, nil) - - f := newFrame(1, byte(rand.Uint32()), rand.Uint32()) - rnd := make([]byte, rand.Uint32()%1024) - io.ReadFull(crand.Reader, rnd) - f.data = rnd - - buf := make([]byte, headerSize+len(f.data)) - buf[0] = f.ver - buf[1] = f.cmd - binary.LittleEndian.PutUint16(buf[2:], uint16(len(rnd)+1)) /// incorrect size - binary.LittleEndian.PutUint32(buf[4:], f.sid) - copy(buf[headerSize:], f.data) - - session.conn.Write(buf) - cli.Close() - - // writeFrame after die - cli, err = net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } - session, _ = Client(cli, nil) - //close first - session.Close() - for i := 0; i < 100; i++ { - f := newFrame(1, byte(rand.Uint32()), rand.Uint32()) - session.writeFrame(f) - } -} - -func TestWriteFrameInternal(t *testing.T) { - addr, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - // pure random - session, _ := Client(cli, nil) - for i := 0; i < 100; i++ { - rnd := make([]byte, rand.Uint32()%1024) - io.ReadFull(crand.Reader, rnd) - session.conn.Write(rnd) - } - cli.Close() - - // writeFrame after die - cli, err = net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } - session, _ = Client(cli, nil) - //close first - session.Close() - for i := 0; i < 100; i++ { - f := newFrame(1, byte(rand.Uint32()), rand.Uint32()) - session.writeFrameInternal(f, time.After(session.config.KeepAliveTimeout), CLSDATA) - } - - // random cmds - cli, err = net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } - allcmds := []byte{cmdSYN, cmdFIN, cmdPSH, cmdNOP} - session, _ = Client(cli, nil) - for i := 0; i < 100; i++ { - f := newFrame(1, allcmds[rand.Int()%len(allcmds)], rand.Uint32()) - session.writeFrameInternal(f, time.After(session.config.KeepAliveTimeout), CLSDATA) - } - //deadline occur - { - c := make(chan time.Time) - close(c) - f := newFrame(1, allcmds[rand.Int()%len(allcmds)], rand.Uint32()) - _, err := session.writeFrameInternal(f, c, CLSDATA) - if !strings.Contains(err.Error(), "timeout") { - t.Fatal("write frame with deadline failed", err) - } - } - cli.Close() - - { - cli, err = net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } - config := DefaultConfig() - config.KeepAliveInterval = time.Second - config.KeepAliveTimeout = 2 * time.Second - session, _ = Client(&blockWriteConn{cli}, config) - f := newFrame(1, byte(rand.Uint32()), rand.Uint32()) - c := make(chan time.Time) - go func() { - //die first, deadline second, better for coverage - time.Sleep(time.Second) - session.Close() - time.Sleep(time.Second) - close(c) - }() - _, err = session.writeFrameInternal(f, c, CLSDATA) - if !strings.Contains(err.Error(), "closed pipe") { - t.Fatal("write frame with to closed conn failed", err) - } - } -} - -func TestReadDeadline(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - stream, _ := session.OpenStream() - const N = 100 - buf := make([]byte, 10) - var readErr error - for i := 0; i < N; i++ { - stream.SetReadDeadline(time.Now().Add(-1 * time.Minute)) - if _, readErr = stream.Read(buf); readErr != nil { - break - } - } - if readErr != nil { - if !strings.Contains(readErr.Error(), "timeout") { - t.Fatalf("Wrong error: %v", readErr) - } - } else { - t.Fatal("No error when reading with past deadline") - } - session.Close() -} - -func TestWriteDeadline(t *testing.T) { - _, stop, cli, err := setupServer(t) - if err != nil { - t.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - stream, _ := session.OpenStream() - buf := make([]byte, 10) - var writeErr error - for { - stream.SetWriteDeadline(time.Now().Add(-1 * time.Minute)) - if _, writeErr = stream.Write(buf); writeErr != nil { - if !strings.Contains(writeErr.Error(), "timeout") { - t.Fatalf("Wrong error: %v", writeErr) - } - break - } - } - session.Close() -} - -func BenchmarkAcceptClose(b *testing.B) { - _, stop, cli, err := setupServer(b) - if err != nil { - b.Fatal(err) - } - defer stop() - session, _ := Client(cli, nil) - for i := 0; i < b.N; i++ { - if stream, err := session.OpenStream(); err == nil { - stream.Close() - } else { - b.Fatal(err) - } - } -} -func BenchmarkConnSmux(b *testing.B) { - cs, ss, err := getSmuxStreamPair() - if err != nil { - b.Fatal(err) - } - defer cs.Close() - defer ss.Close() - bench(b, cs, ss) -} - -func BenchmarkConnTCP(b *testing.B) { - cs, ss, err := getTCPConnectionPair() - if err != nil { - b.Fatal(err) - } - defer cs.Close() - defer ss.Close() - bench(b, cs, ss) -} - -func getSmuxStreamPair() (*Stream, *Stream, error) { - c1, c2, err := getTCPConnectionPair() - if err != nil { - return nil, nil, err - } - - s, err := Server(c2, nil) - if err != nil { - return nil, nil, err - } - c, err := Client(c1, nil) - if err != nil { - return nil, nil, err - } - var ss *Stream - done := make(chan error) - go func() { - var rerr error - ss, rerr = s.AcceptStream() - done <- rerr - close(done) - }() - cs, err := c.OpenStream() - if err != nil { - return nil, nil, err - } - err = <-done - if err != nil { - return nil, nil, err - } - - return cs, ss, nil -} - -func getTCPConnectionPair() (net.Conn, net.Conn, error) { - lst, err := net.Listen("tcp", "localhost:0") - if err != nil { - return nil, nil, err - } - defer lst.Close() - - var conn0 net.Conn - var err0 error - done := make(chan struct{}) - go func() { - conn0, err0 = lst.Accept() - close(done) - }() - - conn1, err := net.Dial("tcp", lst.Addr().String()) - if err != nil { - return nil, nil, err - } - - <-done - if err0 != nil { - return nil, nil, err0 - } - return conn0, conn1, nil -} - -func bench(b *testing.B, rd io.Reader, wr io.Writer) { - buf := make([]byte, 128*1024) - buf2 := make([]byte, 128*1024) - b.SetBytes(128 * 1024) - b.ResetTimer() - b.ReportAllocs() - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - count := 0 - for { - n, _ := rd.Read(buf2) - count += n - if count == 128*1024*b.N { - return - } - } - }() - for i := 0; i < b.N; i++ { - wr.Write(buf) - } - wg.Wait() -} diff --git a/pkg/smux/shaper.go b/pkg/smux/shaper.go index 8d52ef7..27ea4e4 100644 --- a/pkg/smux/shaper.go +++ b/pkg/smux/shaper.go @@ -1,12 +1,42 @@ +// MIT License +// +// Copyright (c) 2016-2017 xtaci +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + package smux +// _itimediff returns the time difference between two uint32 values. +// The result is a signed 32-bit integer representing the difference between 'later' and 'earlier'. func _itimediff(later, earlier uint32) int32 { return (int32)(later - earlier) } +// shaperHeap is a min-heap of writeRequest. +// It orders writeRequests by class first, then by sequence number within the same class. type shaperHeap []writeRequest func (h shaperHeap) Len() int { return len(h) } + +// Less determines the ordering of elements in the heap. +// Requests are ordered by their class first. If two requests have the same class, +// they are ordered by their sequence numbers. func (h shaperHeap) Less(i, j int) bool { if h[i].class != h[j].class { return h[i].class < h[j].class diff --git a/pkg/smux/shaper_test.go b/pkg/smux/shaper_test.go deleted file mode 100644 index b02e317..0000000 --- a/pkg/smux/shaper_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package smux - -import ( - "container/heap" - "testing" -) - -func TestShaper(t *testing.T) { - w1 := writeRequest{seq: 1} - w2 := writeRequest{seq: 2} - w3 := writeRequest{seq: 3} - w4 := writeRequest{seq: 4} - w5 := writeRequest{seq: 5} - - var reqs shaperHeap - heap.Push(&reqs, w5) - heap.Push(&reqs, w4) - heap.Push(&reqs, w3) - heap.Push(&reqs, w2) - heap.Push(&reqs, w1) - - for len(reqs) > 0 { - w := heap.Pop(&reqs).(writeRequest) - t.Log("sid:", w.frame.sid, "seq:", w.seq) - } -} - -func TestShaper2(t *testing.T) { - w1 := writeRequest{class: CLSDATA, seq: 1} // stream 0 - w2 := writeRequest{class: CLSDATA, seq: 2} - w3 := writeRequest{class: CLSDATA, seq: 3} - w4 := writeRequest{class: CLSDATA, seq: 4} - w5 := writeRequest{class: CLSDATA, seq: 5} - w6 := writeRequest{class: CLSCTRL, seq: 6, frame: Frame{sid: 10}} // ctrl 1 - w7 := writeRequest{class: CLSCTRL, seq: 7, frame: Frame{sid: 11}} // ctrl 2 - - var reqs shaperHeap - heap.Push(&reqs, w6) - heap.Push(&reqs, w5) - heap.Push(&reqs, w4) - heap.Push(&reqs, w3) - heap.Push(&reqs, w2) - heap.Push(&reqs, w1) - heap.Push(&reqs, w7) - - for len(reqs) > 0 { - w := heap.Pop(&reqs).(writeRequest) - t.Log("sid:", w.frame.sid, "seq:", w.seq) - } -} diff --git a/pkg/smux/stream.go b/pkg/smux/stream.go index b463323..6b1f043 100644 --- a/pkg/smux/stream.go +++ b/pkg/smux/stream.go @@ -1,3 +1,25 @@ +// MIT License +// +// Copyright (c) 2016-2017 xtaci +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + package smux import ( @@ -11,36 +33,41 @@ import ( "github.com/nadoo/glider/pkg/pool" ) -// Stream implements net.Conn +// wrapper for GC type Stream struct { - id uint32 + *stream +} + +// Stream implements net.Conn +type stream struct { + id uint32 // Stream identifier sess *Session - buffers [][]byte - heads [][]byte // slice heads kept for recycle + buffers [][]byte // the sequential buffers of stream + heads [][]byte // slice heads of the buffers above, kept for recycle - bufferLock sync.Mutex - frameSize int + bufferLock sync.Mutex // Mutex to protect access to buffers + frameSize int // Maximum frame size for the stream // notify a read event chReadEvent chan struct{} // flag the stream has closed die chan struct{} - dieOnce sync.Once + dieOnce sync.Once // Ensures die channel is closed only once // FIN command chFinEvent chan struct{} - finEventOnce sync.Once + finEventOnce sync.Once // Ensures chFinEvent is closed only once // deadlines readDeadline atomic.Value writeDeadline atomic.Value // per stream sliding window control - numRead uint32 // number of consumed bytes + numRead uint32 // count num of bytes read numWritten uint32 // count num of bytes written - incr uint32 // counting for sending + incr uint32 // bytes sent since last window update // UPD command peerConsumed uint32 // num of bytes the peer has consumed @@ -48,9 +75,9 @@ type Stream struct { chUpdate chan struct{} // notify of remote data consuming and window update } -// newStream initiates a Stream struct -func newStream(id uint32, frameSize int, sess *Session) *Stream { - s := new(Stream) +// newStream initializes and returns a new Stream. +func newStream(id uint32, frameSize int, sess *Session) *stream { + s := new(stream) s.id = id s.chReadEvent = make(chan struct{}, 1) s.chUpdate = make(chan struct{}, 1) @@ -59,16 +86,17 @@ func newStream(id uint32, frameSize int, sess *Session) *Stream { s.die = make(chan struct{}) s.chFinEvent = make(chan struct{}) s.peerWindow = initialPeerWindow // set to initial window size + return s } -// ID returns the unique stream ID. -func (s *Stream) ID() uint32 { +// ID returns the stream's unique identifier. +func (s *stream) ID() uint32 { return s.id } -// Read implements net.Conn -func (s *Stream) Read(b []byte) (n int, err error) { +// Read reads data from the stream into the provided buffer. +func (s *stream) Read(b []byte) (n int, err error) { for { n, err = s.tryRead(b) if err == ErrWouldBlock { @@ -81,8 +109,8 @@ func (s *Stream) Read(b []byte) (n int, err error) { } } -// tryRead is the nonblocking version of Read -func (s *Stream) tryRead(b []byte) (n int, err error) { +// tryRead attempts to read data from the stream without blocking. +func (s *stream) tryRead(b []byte) (n int, err error) { if s.sess.config.Version == 2 { return s.tryReadv2(b) } @@ -91,6 +119,7 @@ func (s *Stream) tryRead(b []byte) (n int, err error) { return 0, nil } + // A critical section to copy data from buffers to s.bufferLock.Lock() if len(s.buffers) > 0 { n = copy(b, s.buffers[0]) @@ -118,7 +147,8 @@ func (s *Stream) tryRead(b []byte) (n int, err error) { } } -func (s *Stream) tryReadv2(b []byte) (n int, err error) { +// tryReadv2 is the non-blocking version of Read for version 2 streams. +func (s *stream) tryReadv2(b []byte) (n int, err error) { if len(b) == 0 { return 0, nil } @@ -139,20 +169,25 @@ func (s *Stream) tryReadv2(b []byte) (n int, err error) { // in an ideal environment: // if more than half of buffer has consumed, send read ack to peer - // based on round-trip time of ACK, continuous flowing data - // won't slow down because of waiting for ACK, as long as the - // consumer keeps on reading data - // s.numRead == n also notify window at the first read + // based on round-trip time of ACK, continous flowing data + // won't slow down due to waiting for ACK, as long as the + // consumer keeps on reading data. + // + // s.numRead == n implies that it's the initial reading s.numRead += uint32(n) s.incr += uint32(n) + + // for initial reading, send window update if s.incr >= uint32(s.sess.config.MaxStreamBuffer/2) || s.numRead == uint32(n) { notifyConsumed = s.numRead - s.incr = 0 + s.incr = 0 // reset couting for next window update } s.bufferLock.Unlock() if n > 0 { s.sess.returnTokens(n) + + // send window update if necessary if notifyConsumed > 0 { err := s.sendWindowUpdate(notifyConsumed) return n, err @@ -170,7 +205,12 @@ func (s *Stream) tryReadv2(b []byte) (n int, err error) { } // WriteTo implements io.WriteTo -func (s *Stream) WriteTo(w io.Writer) (n int64, err error) { +// WriteTo writes data to w until there's no more data to write or when an error occurs. +// The return value n is the number of bytes written. Any error encountered during the write is also returned. +// WriteTo calls Write in a loop until there is no more data to write or when an error occurs. +// If the underlying stream is a v2 stream, it will send window update to peer when necessary. +// If the underlying stream is a v1 stream, it will not send window update to peer. +func (s *stream) WriteTo(w io.Writer) (n int64, err error) { if s.sess.config.Version == 2 { return s.writeTov2(w) } @@ -187,6 +227,7 @@ func (s *Stream) WriteTo(w io.Writer) (n int64, err error) { if buf != nil { nw, ew := w.Write(buf) + // NOTE: WriteTo is a reader, so we need to return tokens here s.sess.returnTokens(len(buf)) pool.PutBuffer(buf) if nw > 0 { @@ -202,7 +243,8 @@ func (s *Stream) WriteTo(w io.Writer) (n int64, err error) { } } -func (s *Stream) writeTov2(w io.Writer) (n int64, err error) { +// check comments in WriteTo +func (s *stream) writeTov2(w io.Writer) (n int64, err error) { for { var notifyConsumed uint32 var buf []byte @@ -222,6 +264,7 @@ func (s *Stream) writeTov2(w io.Writer) (n int64, err error) { if buf != nil { nw, ew := w.Write(buf) + // NOTE: WriteTo is a reader, so we need to return tokens here s.sess.returnTokens(len(buf)) pool.PutBuffer(buf) if nw > 0 { @@ -243,7 +286,8 @@ func (s *Stream) writeTov2(w io.Writer) (n int64, err error) { } } -func (s *Stream) sendWindowUpdate(consumed uint32) error { +// sendWindowUpdate sends a window update frame to the peer. +func (s *stream) sendWindowUpdate(consumed uint32) error { var timer *time.Timer var deadline <-chan time.Time if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() { @@ -257,11 +301,12 @@ func (s *Stream) sendWindowUpdate(consumed uint32) error { binary.LittleEndian.PutUint32(hdr[:], consumed) binary.LittleEndian.PutUint32(hdr[4:], uint32(s.sess.config.MaxStreamBuffer)) frame.data = hdr[:] - _, err := s.sess.writeFrameInternal(frame, deadline, CLSDATA) + _, err := s.sess.writeFrameInternal(frame, deadline, CLSCTRL) return err } -func (s *Stream) waitRead() error { +// waitRead blocks until a read event occurs or a deadline is reached. +func (s *stream) waitRead() error { var timer *time.Timer var deadline <-chan time.Time if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() { @@ -271,10 +316,10 @@ func (s *Stream) waitRead() error { } select { - case <-s.chReadEvent: + case <-s.chReadEvent: // notify some data has arrived, or closed return nil case <-s.chFinEvent: - // BUG(xtaci): Fix for https://github.com/xtaci/smux/issues/82 + // BUGFIX(xtaci): Fix for https://github.com/xtaci/smux/issues/82 s.bufferLock.Lock() defer s.bufferLock.Unlock() if len(s.buffers) > 0 { @@ -297,7 +342,7 @@ func (s *Stream) waitRead() error { // // Note that the behavior when multiple goroutines write concurrently is not deterministic, // frames may interleave in random way. -func (s *Stream) Write(b []byte) (n int, err error) { +func (s *stream) Write(b []byte) (n int, err error) { if s.sess.config.Version == 2 { return s.writeV2(b) } @@ -311,6 +356,8 @@ func (s *Stream) Write(b []byte) (n int, err error) { // check if stream has closed select { + case <-s.chFinEvent: // passive closing + return 0, io.EOF case <-s.die: return 0, io.ErrClosedPipe default: @@ -338,7 +385,8 @@ func (s *Stream) Write(b []byte) (n int, err error) { return sent, nil } -func (s *Stream) writeV2(b []byte) (n int, err error) { +// writeV2 writes data to the stream for version 2 streams. +func (s *stream) writeV2(b []byte) (n int, err error) { // check empty input if len(b) == 0 { return 0, nil @@ -346,6 +394,8 @@ func (s *Stream) writeV2(b []byte) (n int, err error) { // check if stream has closed select { + case <-s.chFinEvent: + return 0, io.EOF case <-s.die: return 0, io.ErrClosedPipe default: @@ -372,14 +422,18 @@ func (s *Stream) writeV2(b []byte) (n int, err error) { // even if uint32 overflow, this math still works: // eg1: uint32(0) - uint32(math.MaxUint32) = 1 // eg2: int32(uint32(0) - uint32(1)) = -1 - // security check for misbehavior + // + // basicially, you can take it as a MODULAR ARITHMETIC inflight := int32(atomic.LoadUint32(&s.numWritten) - atomic.LoadUint32(&s.peerConsumed)) - if inflight < 0 { + if inflight < 0 { // security check for malformed data return 0, ErrConsumed } + // make sure you understand 'win' is calculated in modular arithmetic(2^32(4GB)) win := int32(atomic.LoadUint32(&s.peerWindow)) - inflight + if win > 0 { + // determine how many bytes to send if win > int32(len(b)) { bts = b b = nil @@ -388,13 +442,17 @@ func (s *Stream) writeV2(b []byte) (n int, err error) { b = b[win:] } + // frame split and transmit for len(bts) > 0 { + // splitting frame sz := len(bts) if sz > s.frameSize { sz = s.frameSize } frame.data = bts[:sz] bts = bts[sz:] + + // transmit of frame n, err := s.sess.writeFrameInternal(frame, deadline, CLSDATA) atomic.AddUint32(&s.numWritten, uint32(sz)) sent += n @@ -404,12 +462,12 @@ func (s *Stream) writeV2(b []byte) (n int, err error) { } } - // if there is any data remaining to be sent + // if there is any data left to be sent, // wait until stream closes, window changes or deadline reached - // this blocking behavior will inform upper layer to do flow control + // this blocking behavior will back propagate flow control to upper layer. if len(b) > 0 { select { - case <-s.chFinEvent: // if fin arrived, future window update is impossible + case <-s.chFinEvent: return 0, io.EOF case <-s.die: return sent, io.ErrClosedPipe @@ -417,7 +475,7 @@ func (s *Stream) writeV2(b []byte) (n int, err error) { return sent, ErrTimeout case <-s.sess.chSocketWriteError: return sent, s.sess.socketWriteError.Load().(error) - case <-s.chUpdate: + case <-s.chUpdate: // notify of remote data consuming and window update continue } } else { @@ -427,7 +485,7 @@ func (s *Stream) writeV2(b []byte) (n int, err error) { } // Close implements net.Conn -func (s *Stream) Close() error { +func (s *stream) Close() error { var once bool var err error s.dieOnce.Do(func() { @@ -436,7 +494,13 @@ func (s *Stream) Close() error { }) if once { - _, err = s.sess.writeFrame(newFrame(byte(s.sess.config.Version), cmdFIN, s.id)) + // send FIN in order + f := newFrame(byte(s.sess.config.Version), cmdFIN, s.id) + + timer := time.NewTimer(openCloseTimeout) + defer timer.Stop() + + _, err = s.sess.writeFrameInternal(f, timer.C, CLSDATA) s.sess.streamClosed(s.id) return err } else { @@ -446,14 +510,14 @@ func (s *Stream) Close() error { // GetDieCh returns a readonly chan which can be readable // when the stream is to be closed. -func (s *Stream) GetDieCh() <-chan struct{} { +func (s *stream) GetDieCh() <-chan struct{} { return s.die } // SetReadDeadline sets the read deadline as defined by // net.Conn.SetReadDeadline. // A zero time value disables the deadline. -func (s *Stream) SetReadDeadline(t time.Time) error { +func (s *stream) SetReadDeadline(t time.Time) error { s.readDeadline.Store(t) s.notifyReadEvent() return nil @@ -462,7 +526,7 @@ func (s *Stream) SetReadDeadline(t time.Time) error { // SetWriteDeadline sets the write deadline as defined by // net.Conn.SetWriteDeadline. // A zero time value disables the deadline. -func (s *Stream) SetWriteDeadline(t time.Time) error { +func (s *stream) SetWriteDeadline(t time.Time) error { s.writeDeadline.Store(t) return nil } @@ -470,7 +534,7 @@ func (s *Stream) SetWriteDeadline(t time.Time) error { // SetDeadline sets both read and write deadlines as defined by // net.Conn.SetDeadline. // A zero time value disables the deadlines. -func (s *Stream) SetDeadline(t time.Time) error { +func (s *stream) SetDeadline(t time.Time) error { if err := s.SetReadDeadline(t); err != nil { return err } @@ -481,10 +545,10 @@ func (s *Stream) SetDeadline(t time.Time) error { } // session closes -func (s *Stream) sessionClose() { s.dieOnce.Do(func() { close(s.die) }) } +func (s *stream) sessionClose() { s.dieOnce.Do(func() { close(s.die) }) } // LocalAddr satisfies net.Conn interface -func (s *Stream) LocalAddr() net.Addr { +func (s *stream) LocalAddr() net.Addr { if ts, ok := s.sess.conn.(interface { LocalAddr() net.Addr }); ok { @@ -494,7 +558,7 @@ func (s *Stream) LocalAddr() net.Addr { } // RemoteAddr satisfies net.Conn interface -func (s *Stream) RemoteAddr() net.Addr { +func (s *stream) RemoteAddr() net.Addr { if ts, ok := s.sess.conn.(interface { RemoteAddr() net.Addr }); ok { @@ -504,7 +568,7 @@ func (s *Stream) RemoteAddr() net.Addr { } // pushBytes append buf to buffers -func (s *Stream) pushBytes(buf []byte) (written int, err error) { +func (s *stream) pushBytes(buf []byte) (written int, err error) { s.bufferLock.Lock() s.buffers = append(s.buffers, buf) s.heads = append(s.heads, buf) @@ -513,7 +577,7 @@ func (s *Stream) pushBytes(buf []byte) (written int, err error) { } // recycleTokens transform remaining bytes to tokens(will truncate buffer) -func (s *Stream) recycleTokens() (n int) { +func (s *stream) recycleTokens() (n int) { s.bufferLock.Lock() for k := range s.buffers { n += len(s.buffers[k]) @@ -526,7 +590,7 @@ func (s *Stream) recycleTokens() (n int) { } // notify read event -func (s *Stream) notifyReadEvent() { +func (s *stream) notifyReadEvent() { select { case s.chReadEvent <- struct{}{}: default: @@ -534,7 +598,7 @@ func (s *Stream) notifyReadEvent() { } // update command -func (s *Stream) update(consumed uint32, window uint32) { +func (s *stream) update(consumed uint32, window uint32) { atomic.StoreUint32(&s.peerConsumed, consumed) atomic.StoreUint32(&s.peerWindow, window) select { @@ -544,7 +608,7 @@ func (s *Stream) update(consumed uint32, window uint32) { } // mark this stream has been closed in protocol -func (s *Stream) fin() { +func (s *stream) fin() { s.finEventOnce.Do(func() { close(s.chFinEvent) }) diff --git a/proxy/http/client.go b/proxy/http/client.go index b2f3f7c..6562ccc 100644 --- a/proxy/http/client.go +++ b/proxy/http/client.go @@ -33,6 +33,8 @@ func (s *HTTP) Dial(network, addr string) (net.Conn, error) { } buf := pool.GetBytesBuffer() + defer pool.PutBytesBuffer(buf) + buf.WriteString("CONNECT " + addr + " HTTP/1.1\r\n") buf.WriteString("Host: " + addr + "\r\n") buf.WriteString("Proxy-Connection: Keep-Alive\r\n") @@ -45,7 +47,6 @@ func (s *HTTP) Dial(network, addr string) (net.Conn, error) { // header ended buf.WriteString("\r\n") _, err = rc.Write(buf.Bytes()) - pool.PutBytesBuffer(buf) if err != nil { return nil, err } diff --git a/proxy/tproxy/server.go b/proxy/tproxy/server.go index b14b443..2f88d3f 100644 --- a/proxy/tproxy/server.go +++ b/proxy/tproxy/server.go @@ -1,3 +1,5 @@ +//go:build linux + package tproxy import ( diff --git a/proxy/tproxy/tproxy.go b/proxy/tproxy/tproxy.go index fda1fa5..bfa86d4 100644 --- a/proxy/tproxy/tproxy.go +++ b/proxy/tproxy/tproxy.go @@ -1,3 +1,5 @@ +//go:build linux + package tproxy import ( diff --git a/proxy/vsock/client.go b/proxy/vsock/client.go index 191a514..26bccef 100644 --- a/proxy/vsock/client.go +++ b/proxy/vsock/client.go @@ -1,3 +1,5 @@ +//go:build linux + package vsock import ( diff --git a/proxy/vsock/server.go b/proxy/vsock/server.go index bd608e3..011dc07 100644 --- a/proxy/vsock/server.go +++ b/proxy/vsock/server.go @@ -1,3 +1,5 @@ +//go:build linux + package vsock import ( diff --git a/proxy/vsock/socket.go b/proxy/vsock/socket.go index 0eae0d1..fbfbc61 100644 --- a/proxy/vsock/socket.go +++ b/proxy/vsock/socket.go @@ -1,6 +1,7 @@ +//go:build linux + // Source code from: // https://github.com/linuxkit/virtsock/tree/master/pkg/vsock - package vsock import ( diff --git a/proxy/vsock/vsock.go b/proxy/vsock/vsock.go index b03b440..2fa8c70 100644 --- a/proxy/vsock/vsock.go +++ b/proxy/vsock/vsock.go @@ -1,3 +1,5 @@ +//go:build linux + package vsock import ( diff --git a/service/dhcpd/cilent.go b/service/dhcpd/cilent.go index 3b8ba01..7101be7 100644 --- a/service/dhcpd/cilent.go +++ b/service/dhcpd/cilent.go @@ -1,3 +1,5 @@ +//go:build linux + package dhcpd import ( diff --git a/service/dhcpd/dhcpd.go b/service/dhcpd/dhcpd.go index 58d971f..a5c2715 100644 --- a/service/dhcpd/dhcpd.go +++ b/service/dhcpd/dhcpd.go @@ -1,3 +1,5 @@ +//go:build linux + package dhcpd import ( diff --git a/service/dhcpd/pool.go b/service/dhcpd/pool.go index 4cf94a1..908182f 100644 --- a/service/dhcpd/pool.go +++ b/service/dhcpd/pool.go @@ -1,3 +1,5 @@ +//go:build linux + package dhcpd import ( diff --git a/service/dhcpd/reply.go b/service/dhcpd/reply.go index 39f6949..1b1ba39 100644 --- a/service/dhcpd/reply.go +++ b/service/dhcpd/reply.go @@ -1,3 +1,5 @@ +//go:build linux + package dhcpd import (