glider/proxy/anytls/stream.go
2026-06-23 23:17:09 +08:00

97 lines
1.9 KiB
Go

package anytls
import (
"errors"
"io"
"net"
"sync"
"time"
)
// errStreamClosed is returned by Write after Close has been called on the
// stream.
var errStreamClosed = errors.New("stream closed")
// stream is one logical connection carried by a session and identified by id.
// Inbound payload chunks are queued on in by push and drained by Read;
// outbound data is sent as frames through s.
type stream struct {
	id uint32   // identifier placed in every frame this stream sends
	s  *session // owning session: frame writes, addresses and deadlines go through it

	in      chan []byte // inbound payload chunks; closed by closeRead
	readBuf []byte      // unread remainder of the chunk most recently taken from in

	closeIn, closeOut sync.Once // one-shot guards: closing in / sending the FIN frame

	mu     sync.Mutex // protects closed
	closed bool       // set by Close, checked by Write
}
// newStream builds the stream with the given id on session s. The inbound
// channel is buffered, so push can queue several chunks before it has to wait
// for Read to catch up.
func newStream(id uint32, s *session) *stream {
	const inboundBacklog = 32 // capacity of the inbound chunk queue

	st := new(stream)
	st.id = id
	st.s = s
	st.in = make(chan []byte, inboundBacklog)
	return st
}
// Read implements io.Reader. It copies buffered inbound payload bytes into p,
// waiting on the inbound channel when nothing is buffered, and returns io.EOF
// once that channel has been closed and every queued chunk has been consumed.
//
// A zero-length p returns (0, nil) immediately rather than waiting for data.
//
// Read keeps leftover bytes in st.readBuf without any locking, so it must not
// be called from more than one goroutine at a time.
func (st *stream) Read(p []byte) (int, error) {
	// Nothing can be copied into an empty buffer; do not block for it.
	if len(p) == 0 {
		return 0, nil
	}

	// Empty chunks are skipped so that a successful Read always makes progress.
	for len(st.readBuf) == 0 {
		b, ok := <-st.in
		if !ok {
			return 0, io.EOF
		}
		st.readBuf = b
	}

	n := copy(p, st.readBuf)
	st.readBuf = st.readBuf[n:]
	if len(st.readBuf) == 0 {
		// Drop the reference so the drained chunk's backing array can be
		// collected while the reader is idle.
		st.readBuf = nil
	}
	return n, nil
}
// Write implements io.Writer. It sends p to the peer as a sequence of PSH
// frames, each carrying at most maxFrameData bytes, and returns the number of
// bytes handed to the session before the first error, if any.
//
// It returns errStreamClosed without sending anything when Close has already
// been called. The closed flag is sampled once, before the first frame.
func (st *stream) Write(p []byte) (int, error) {
	st.mu.Lock()
	if st.closed {
		st.mu.Unlock()
		return 0, errStreamClosed
	}
	st.mu.Unlock()

	total := len(p)
	for off := 0; off < total; {
		// Size of the next frame payload: whatever is left, capped per frame.
		chunk := total - off
		if chunk > maxFrameData {
			chunk = maxFrameData
		}

		f := frame{command: cmdPSH, streamID: st.id, data: p[off : off+chunk]}
		if err := st.s.writeFrame(f); err != nil {
			// off counts only the bytes of frames that were written successfully.
			return off, err
		}
		off += chunk
	}
	return total, nil
}
// Close marks the stream closed for writing, sends a FIN frame for it and
// asks the session to remove the stream. Only the first call does any work;
// later calls are no-ops. Close always returns nil.
//
// NOTE(review): Close does not itself close the inbound channel, so whether a
// Read blocked in another goroutine is released here depends on what
// removeStream does — confirm against the session code.
func (st *stream) Close() error {
	st.mu.Lock()
	wasClosed := st.closed
	st.closed = true
	st.mu.Unlock()

	if wasClosed {
		return nil
	}

	sendFIN := func() {
		// Best effort: the stream is torn down locally even if the FIN frame
		// cannot be written, so the error is intentionally dropped.
		_ = st.s.writeFrame(frame{command: cmdFIN, streamID: st.id})
	}
	st.closeOut.Do(sendFIN)
	st.s.removeStream(st.id)
	return nil
}
// closeRead closes the inbound channel, at most once. After the chunks that
// are still queued have been consumed, Read reports io.EOF.
//
// push sends on the same channel, and sending on a closed channel panics, so
// callers must make sure push is not invoked concurrently with or after
// closeRead.
func (st *stream) closeRead() {
	shutInbound := func() {
		close(st.in)
	}
	st.closeIn.Do(shutInbound)
}
// push queues a private copy of data for Read. It blocks while the inbound
// queue is full and gives up, dropping the chunk, once the session's done
// channel becomes readable.
func (st *stream) push(data []byte) {
	// Copy so the queued chunk does not alias the caller's slice (presumably
	// the caller reuses its buffer — verify against the session read loop).
	owned := make([]byte, len(data))
	copy(owned, data)

	select {
	case <-st.s.done:
	case st.in <- owned:
	}
}
// LocalAddr returns the local address of the session's underlying connection.
func (st *stream) LocalAddr() net.Addr { return st.s.conn.LocalAddr() }
// RemoteAddr returns the remote address of the session's underlying
// connection.
func (st *stream) RemoteAddr() net.Addr { return st.s.conn.RemoteAddr() }
// SetDeadline forwards t to SetDeadline on the session's underlying
// connection.
//
// NOTE(review): the deadline is set on the session connection rather than on
// per-stream state; if the session carries several streams over that
// connection this would affect all of them — confirm this is intended.
func (st *stream) SetDeadline(t time.Time) error { return st.s.conn.SetDeadline(t) }
// SetReadDeadline forwards t to SetReadDeadline on the session's underlying
// connection. It does not interact with the inbound channel that Read waits
// on.
//
// NOTE(review): the deadline is set on the session connection rather than on
// per-stream state; if the session carries several streams over that
// connection this would affect all of them — confirm this is intended.
func (st *stream) SetReadDeadline(t time.Time) error { return st.s.conn.SetReadDeadline(t) }
// SetWriteDeadline forwards t to SetWriteDeadline on the session's underlying
// connection.
//
// NOTE(review): the deadline is set on the session connection rather than on
// per-stream state; if the session carries several streams over that
// connection this would affect all of them — confirm this is intended.
func (st *stream) SetWriteDeadline(t time.Time) error { return st.s.conn.SetWriteDeadline(t) }