From 7e800555d77419eb7db43ebf058124b00575bee8 Mon Sep 17 00:00:00 2001 From: nadoo <287492+nadoo@users.noreply.github.com> Date: Thu, 9 Mar 2023 18:36:30 +0800 Subject: [PATCH] chore(deps): update smux pkg --- .github/workflows/build.yml | 2 +- go.mod | 2 +- go.sum | 4 +-- pkg/smux/session.go | 59 ++++++++++++++++++++++++++----------- pkg/smux/session_test.go | 8 ++--- pkg/smux/shaper.go | 16 ++++++---- pkg/smux/shaper_test.go | 42 ++++++++++++++++++-------- pkg/smux/stream.go | 20 +++++++++---- 8 files changed, 104 insertions(+), 49 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index df0cbf3..e017e27 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -117,7 +117,7 @@ jobs: type=semver,pattern={{major}}.{{minor}} - name: Docker - Build and push - uses: docker/build-push-action@v3 + uses: docker/build-push-action@v4 with: context: . file: .Dockerfile diff --git a/go.mod b/go.mod index 222335f..b85a9a8 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/dgryski/go-camellia v0.0.0-20191119043421-69a8a13fb23d github.com/dgryski/go-idea v0.0.0-20170306091226-d2fb45a411fb github.com/dgryski/go-rc2 v0.0.0-20150621095337-8a9021637152 - github.com/insomniacslk/dhcp v0.0.0-20230301142404-3e45eea5edd7 + github.com/insomniacslk/dhcp v0.0.0-20230307103557-e252950ab961 github.com/nadoo/conflag v0.3.1 github.com/nadoo/ipset v0.5.0 github.com/xtaci/kcp-go/v5 v5.6.2 diff --git a/go.sum b/go.sum index f6e01a6..56bbe12 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= -github.com/insomniacslk/dhcp v0.0.0-20230301142404-3e45eea5edd7 h1:Fg8rHYs8luh8kCSAHDUIQCNMkn74Gvr1o5YPZdNRgY0= -github.com/insomniacslk/dhcp v0.0.0-20230301142404-3e45eea5edd7/go.mod h1:I9wtoXVkcRwQJ+U9nhxzZytbnT1xjn2DzUjxQ8Qegpc= +github.com/insomniacslk/dhcp v0.0.0-20230307103557-e252950ab961 h1:x/YtdDlmypenG1te/FfH6LVM+3krhXk5CFV8VYNNX5M= +github.com/insomniacslk/dhcp v0.0.0-20230307103557-e252950ab961/go.mod h1:IKrnDWs3/Mqq5n0lI+RxA2sB7MvN/vbMBP3ehXg65UI= github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/josharian/native v1.0.1-0.20221213033349-c1e37c09b531/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= diff --git a/pkg/smux/session.go b/pkg/smux/session.go index de2132e..7d76401 100644 --- a/pkg/smux/session.go +++ b/pkg/smux/session.go @@ -4,10 +4,8 @@ import ( "container/heap" "encoding/binary" "errors" - "fmt" "io" "net" - "os" "sync" "sync/atomic" "time" @@ -17,20 +15,30 @@ import ( const ( defaultAcceptBacklog = 1024 + maxShaperSize = 1024 + openCloseTimeout = 30 * time.Second // stream open/close timeout +) + +// define frame class +type CLASSID int + +const ( + CLSCTRL CLASSID = iota + CLSDATA ) var ( ErrInvalidProtocol = errors.New("invalid protocol") ErrConsumed = errors.New("peer consumed more than sent") ErrGoAway = errors.New("stream id overflows, should start a new connection") - // ErrTimeout = errors.New("timeout") - ErrTimeout = fmt.Errorf("smux: %w", os.ErrDeadlineExceeded) - ErrWouldBlock = errors.New("operation would block on IO") + ErrTimeout = errors.New("timeout") + ErrWouldBlock = errors.New("operation would block on IO") ) type writeRequest struct { - prio uint32 + class CLASSID frame Frame + seq uint32 result chan writeResult } @@ -39,10 +47,6 @@ type writeResult struct { err error } -type buffersWriter interface { - WriteBuffers(v [][]byte) (n int, err error) -} - // Session defines a multiplexed connection for streams type Session struct { conn io.ReadWriteCloser @@ -81,8 +85,9 @@ type Session struct { deadline atomic.Value - shaper chan writeRequest // a shaper for writing - writes chan writeRequest + requestID uint32 // write request monotonic increasing + shaper chan writeRequest // a shaper for writing + writes chan writeRequest } func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { @@ -401,7 +406,7 @@ func (s *Session) keepalive() { for { select { case <-tickerPing.C: - s.writeFrameInternal(newFrame(byte(s.config.Version), cmdNOP, 0), tickerPing.C, 0) + s.writeFrameInternal(newFrame(byte(s.config.Version), cmdNOP, 0), tickerPing.C, CLSCTRL) s.notifyBucket() // force a signal to the recvLoop case <-tickerTimeout.C: if !atomic.CompareAndSwapInt32(&s.dataReady, 1, 0) { @@ -423,8 +428,10 @@ func (s *Session) shaperLoop() { var reqs shaperHeap var next writeRequest var chWrite chan writeRequest + var chShaper chan writeRequest for { + // chWrite is not available until it has packet to send if len(reqs) > 0 { chWrite = s.writes next = heap.Pop(&reqs).(writeRequest) @@ -432,10 +439,22 @@ func (s *Session) shaperLoop() { chWrite = nil } + // control heap size, chShaper is not available until packets are less than maximum allowed + if len(reqs) >= maxShaperSize { + chShaper = nil + } else { + chShaper = s.shaper + } + + // assertion on non nil + if chShaper == nil && chWrite == nil { + panic("both channel are nil") + } + select { case <-s.die: return - case r := <-s.shaper: + case r := <-chShaper: if chWrite != nil { // next is valid, reshape heap.Push(&reqs, next) } @@ -451,7 +470,10 @@ func (s *Session) sendLoop() { var err error var vec [][]byte // vector for writeBuffers - bw, ok := s.conn.(buffersWriter) + bw, ok := s.conn.(interface { + WriteBuffers(v [][]byte) (n int, err error) + }) + if ok { buf = make([]byte, headerSize) vec = make([][]byte, 2) @@ -503,14 +525,15 @@ func (s *Session) sendLoop() { // writeFrame writes the frame to the underlying connection // and returns the number of bytes written if successful func (s *Session) writeFrame(f Frame) (n int, err error) { - return s.writeFrameInternal(f, nil, 0) + return s.writeFrameInternal(f, time.After(openCloseTimeout), CLSCTRL) } // internal writeFrame version to support deadline used in keepalive -func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, prio uint32) (int, error) { +func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, class CLASSID) (int, error) { req := writeRequest{ - prio: prio, + class: class, frame: f, + seq: atomic.AddUint32(&s.requestID, 1), result: make(chan writeResult, 1), } select { diff --git a/pkg/smux/session_test.go b/pkg/smux/session_test.go index 3479570..1c49ae7 100644 --- a/pkg/smux/session_test.go +++ b/pkg/smux/session_test.go @@ -867,7 +867,7 @@ func TestWriteFrameInternal(t *testing.T) { session.Close() for i := 0; i < 100; i++ { f := newFrame(1, byte(rand.Uint32()), rand.Uint32()) - session.writeFrameInternal(f, time.After(session.config.KeepAliveTimeout), 0) + session.writeFrameInternal(f, time.After(session.config.KeepAliveTimeout), CLSDATA) } // random cmds @@ -879,14 +879,14 @@ func TestWriteFrameInternal(t *testing.T) { session, _ = Client(cli, nil) for i := 0; i < 100; i++ { f := newFrame(1, allcmds[rand.Int()%len(allcmds)], rand.Uint32()) - session.writeFrameInternal(f, time.After(session.config.KeepAliveTimeout), 0) + session.writeFrameInternal(f, time.After(session.config.KeepAliveTimeout), CLSDATA) } //deadline occur { c := make(chan time.Time) close(c) f := newFrame(1, allcmds[rand.Int()%len(allcmds)], rand.Uint32()) - _, err := session.writeFrameInternal(f, c, 0) + _, err := session.writeFrameInternal(f, c, CLSDATA) if !strings.Contains(err.Error(), "timeout") { t.Fatal("write frame with deadline failed", err) } @@ -911,7 +911,7 @@ func TestWriteFrameInternal(t *testing.T) { time.Sleep(time.Second) close(c) }() - _, err = session.writeFrameInternal(f, c, 0) + _, err = session.writeFrameInternal(f, c, CLSDATA) if !strings.Contains(err.Error(), "closed pipe") { t.Fatal("write frame with to closed conn failed", err) } diff --git a/pkg/smux/shaper.go b/pkg/smux/shaper.go index cc38aed..8d52ef7 100644 --- a/pkg/smux/shaper.go +++ b/pkg/smux/shaper.go @@ -6,12 +6,18 @@ func _itimediff(later, earlier uint32) int32 { type shaperHeap []writeRequest -func (h shaperHeap) Len() int { return len(h) } -func (h shaperHeap) Less(i, j int) bool { return _itimediff(h[j].prio, h[i].prio) > 0 } -func (h shaperHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h *shaperHeap) Push(x any) { *h = append(*h, x.(writeRequest)) } +func (h shaperHeap) Len() int { return len(h) } +func (h shaperHeap) Less(i, j int) bool { + if h[i].class != h[j].class { + return h[i].class < h[j].class + } + return _itimediff(h[j].seq, h[i].seq) > 0 +} -func (h *shaperHeap) Pop() any { +func (h shaperHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *shaperHeap) Push(x interface{}) { *h = append(*h, x.(writeRequest)) } + +func (h *shaperHeap) Pop() interface{} { old := *h n := len(old) x := old[n-1] diff --git a/pkg/smux/shaper_test.go b/pkg/smux/shaper_test.go index 572f4ae..b02e317 100644 --- a/pkg/smux/shaper_test.go +++ b/pkg/smux/shaper_test.go @@ -6,11 +6,11 @@ import ( ) func TestShaper(t *testing.T) { - w1 := writeRequest{prio: 10} - w2 := writeRequest{prio: 10} - w3 := writeRequest{prio: 20} - w4 := writeRequest{prio: 100} - w5 := writeRequest{prio: (1 << 32) - 1} + w1 := writeRequest{seq: 1} + w2 := writeRequest{seq: 2} + w3 := writeRequest{seq: 3} + w4 := writeRequest{seq: 4} + w5 := writeRequest{seq: 5} var reqs shaperHeap heap.Push(&reqs, w5) @@ -19,14 +19,32 @@ func TestShaper(t *testing.T) { heap.Push(&reqs, w2) heap.Push(&reqs, w1) - var lastPrio = reqs[0].prio for len(reqs) > 0 { w := heap.Pop(&reqs).(writeRequest) - if int32(w.prio-lastPrio) < 0 { - t.Fatal("incorrect shaper priority") - } - - t.Log("prio:", w.prio) - lastPrio = w.prio + t.Log("sid:", w.frame.sid, "seq:", w.seq) + } +} + +func TestShaper2(t *testing.T) { + w1 := writeRequest{class: CLSDATA, seq: 1} // stream 0 + w2 := writeRequest{class: CLSDATA, seq: 2} + w3 := writeRequest{class: CLSDATA, seq: 3} + w4 := writeRequest{class: CLSDATA, seq: 4} + w5 := writeRequest{class: CLSDATA, seq: 5} + w6 := writeRequest{class: CLSCTRL, seq: 6, frame: Frame{sid: 10}} // ctrl 1 + w7 := writeRequest{class: CLSCTRL, seq: 7, frame: Frame{sid: 11}} // ctrl 2 + + var reqs shaperHeap + heap.Push(&reqs, w6) + heap.Push(&reqs, w5) + heap.Push(&reqs, w4) + heap.Push(&reqs, w3) + heap.Push(&reqs, w2) + heap.Push(&reqs, w1) + heap.Push(&reqs, w7) + + for len(reqs) > 0 { + w := heap.Pop(&reqs).(writeRequest) + t.Log("sid:", w.frame.sid, "seq:", w.seq) } } diff --git a/pkg/smux/stream.go b/pkg/smux/stream.go index 231f741..02fa74a 100644 --- a/pkg/smux/stream.go +++ b/pkg/smux/stream.go @@ -139,7 +139,7 @@ func (s *Stream) tryReadv2(b []byte) (n int, err error) { // in an ideal environment: // if more than half of buffer has consumed, send read ack to peer - // based on round-trip time of ACK, continuous flowing data + // based on round-trip time of ACK, continous flowing data // won't slow down because of waiting for ACK, as long as the // consumer keeps on reading data // s.numRead == n also notify window at the first read @@ -156,8 +156,9 @@ func (s *Stream) tryReadv2(b []byte) (n int, err error) { if notifyConsumed > 0 { err := s.sendWindowUpdate(notifyConsumed) return n, err + } else { + return n, nil } - return n, nil } select { @@ -256,7 +257,7 @@ func (s *Stream) sendWindowUpdate(consumed uint32) error { binary.LittleEndian.PutUint32(hdr[:], consumed) binary.LittleEndian.PutUint32(hdr[4:], uint32(s.sess.config.MaxStreamBuffer)) frame.data = hdr[:] - _, err := s.sess.writeFrameInternal(frame, deadline, 0) + _, err := s.sess.writeFrameInternal(frame, deadline, CLSDATA) return err } @@ -273,6 +274,12 @@ func (s *Stream) waitRead() error { case <-s.chReadEvent: return nil case <-s.chFinEvent: + // BUG(xtaci): Fix for https://github.com/xtaci/smux/issues/82 + s.bufferLock.Lock() + defer s.bufferLock.Unlock() + if len(s.buffers) > 0 { + return nil + } return io.EOF case <-s.sess.chSocketReadError: return s.sess.socketReadError.Load().(error) @@ -320,7 +327,7 @@ func (s *Stream) Write(b []byte) (n int, err error) { } frame.data = bts[:sz] bts = bts[sz:] - n, err := s.sess.writeFrameInternal(frame, deadline, s.numWritten) + n, err := s.sess.writeFrameInternal(frame, deadline, CLSDATA) s.numWritten++ sent += n if err != nil { @@ -388,7 +395,7 @@ func (s *Stream) writeV2(b []byte) (n int, err error) { } frame.data = bts[:sz] bts = bts[sz:] - n, err := s.sess.writeFrameInternal(frame, deadline, atomic.LoadUint32(&s.numWritten)) + n, err := s.sess.writeFrameInternal(frame, deadline, CLSDATA) atomic.AddUint32(&s.numWritten, uint32(sz)) sent += n if err != nil { @@ -432,8 +439,9 @@ func (s *Stream) Close() error { _, err = s.sess.writeFrame(newFrame(byte(s.sess.config.Version), cmdFIN, s.id)) s.sess.streamClosed(s.id) return err + } else { + return io.ErrClosedPipe } - return io.ErrClosedPipe } // GetDieCh returns a readonly chan which can be readable