direct.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package ray
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "time"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/buf"
  9. "v2ray.com/core/common/platform"
  10. "v2ray.com/core/common/signal"
  11. )
  12. type Option func(*Stream)
  13. type addInt64 interface {
  14. Add(int64) int64
  15. }
  16. func WithStatCounter(c addInt64) Option {
  17. return func(s *Stream) {
  18. s.onDataSize = append(s.onDataSize, func(delta uint64) {
  19. c.Add(int64(delta))
  20. })
  21. }
  22. }
  23. // New creates a new Ray for direct traffic transport.
  24. func New(ctx context.Context, opts ...Option) Ray {
  25. return &directRay{
  26. Input: NewStream(ctx, opts...),
  27. Output: NewStream(ctx, opts...),
  28. }
  29. }
  30. type directRay struct {
  31. Input *Stream
  32. Output *Stream
  33. }
  34. func (v *directRay) OutboundInput() InputStream {
  35. return v.Input
  36. }
  37. func (v *directRay) OutboundOutput() OutputStream {
  38. return v.Output
  39. }
  40. func (v *directRay) InboundInput() OutputStream {
  41. return v.Input
  42. }
  43. func (v *directRay) InboundOutput() InputStream {
  44. return v.Output
  45. }
  46. var streamSizeLimit uint64 = 10 * 1024 * 1024
  47. func init() {
  48. const raySizeEnvKey = "v2ray.ray.buffer.size"
  49. size := platform.EnvFlag{
  50. Name: raySizeEnvKey,
  51. AltName: platform.NormalizeEnvName(raySizeEnvKey),
  52. }.GetValueAsInt(10)
  53. streamSizeLimit = uint64(size) * 1024 * 1024
  54. }
  55. // Stream is a sequential container for data in bytes.
  56. type Stream struct {
  57. access sync.RWMutex
  58. data buf.MultiBuffer
  59. size uint64
  60. ctx context.Context
  61. readSignal *signal.Notifier
  62. writeSignal *signal.Notifier
  63. onDataSize []func(uint64)
  64. close bool
  65. err bool
  66. }
  67. // NewStream creates a new Stream.
  68. func NewStream(ctx context.Context, opts ...Option) *Stream {
  69. s := &Stream{
  70. ctx: ctx,
  71. readSignal: signal.NewNotifier(),
  72. writeSignal: signal.NewNotifier(),
  73. size: 0,
  74. }
  75. for _, opt := range opts {
  76. opt(s)
  77. }
  78. return s
  79. }
  80. func (s *Stream) getData() (buf.MultiBuffer, error) {
  81. s.access.Lock()
  82. defer s.access.Unlock()
  83. if s.data != nil {
  84. mb := s.data
  85. s.data = nil
  86. s.size = 0
  87. return mb, nil
  88. }
  89. if s.err {
  90. return nil, io.ErrClosedPipe
  91. }
  92. if s.close {
  93. return nil, io.EOF
  94. }
  95. return nil, nil
  96. }
  97. // Peek fills in the given buffer with data from head of the Stream.
  98. func (s *Stream) Peek(b *buf.Buffer) {
  99. s.access.RLock()
  100. defer s.access.RUnlock()
  101. common.Must(b.Reset(func(data []byte) (int, error) {
  102. return s.data.Copy(data), nil
  103. }))
  104. }
  105. // ReadMultiBuffer reads data from the Stream.
  106. func (s *Stream) ReadMultiBuffer() (buf.MultiBuffer, error) {
  107. for {
  108. mb, err := s.getData()
  109. if err != nil {
  110. return nil, err
  111. }
  112. if mb != nil {
  113. s.readSignal.Signal()
  114. return mb, nil
  115. }
  116. select {
  117. case <-s.ctx.Done():
  118. return nil, s.ctx.Err()
  119. case <-s.writeSignal.Wait():
  120. }
  121. }
  122. }
  123. // ReadTimeout reads from the Stream with a specified timeout.
  124. func (s *Stream) ReadTimeout(timeout time.Duration) (buf.MultiBuffer, error) {
  125. for {
  126. mb, err := s.getData()
  127. if err != nil {
  128. return nil, err
  129. }
  130. if mb != nil {
  131. s.readSignal.Signal()
  132. return mb, nil
  133. }
  134. select {
  135. case <-s.ctx.Done():
  136. return nil, s.ctx.Err()
  137. case <-time.After(timeout):
  138. return nil, buf.ErrReadTimeout
  139. case <-s.writeSignal.Wait():
  140. }
  141. }
  142. }
  143. // Size returns the number of bytes hold in the Stream.
  144. func (s *Stream) Size() uint64 {
  145. s.access.RLock()
  146. defer s.access.RUnlock()
  147. return s.size
  148. }
  149. // waitForStreamSize waits until the Stream has room for more data, or any error happens.
  150. func (s *Stream) waitForStreamSize() error {
  151. if streamSizeLimit == 0 {
  152. return nil
  153. }
  154. for s.Size() >= streamSizeLimit {
  155. select {
  156. case <-s.ctx.Done():
  157. return s.ctx.Err()
  158. case <-s.readSignal.Wait():
  159. if s.err || s.close {
  160. return io.ErrClosedPipe
  161. }
  162. }
  163. }
  164. return nil
  165. }
  166. // WriteMultiBuffer writes more data into the Stream.
  167. func (s *Stream) WriteMultiBuffer(data buf.MultiBuffer) error {
  168. if data.IsEmpty() {
  169. return nil
  170. }
  171. if err := s.waitForStreamSize(); err != nil {
  172. data.Release()
  173. return err
  174. }
  175. s.access.Lock()
  176. defer s.access.Unlock()
  177. if s.err || s.close {
  178. data.Release()
  179. return io.ErrClosedPipe
  180. }
  181. if s.data == nil {
  182. s.data = buf.NewMultiBufferCap(128)
  183. }
  184. dataSize := uint64(data.Len())
  185. for _, f := range s.onDataSize {
  186. f(dataSize)
  187. }
  188. s.data.AppendMulti(data)
  189. s.size += dataSize
  190. s.writeSignal.Signal()
  191. return nil
  192. }
  193. // Close closes the stream for writing. Read() still works until EOF.
  194. func (s *Stream) Close() error {
  195. s.access.Lock()
  196. s.close = true
  197. s.readSignal.Signal()
  198. s.writeSignal.Signal()
  199. s.access.Unlock()
  200. return nil
  201. }
  202. // CloseError closes the Stream with error. Read() will return an error afterwards.
  203. func (s *Stream) CloseError() {
  204. s.access.Lock()
  205. s.err = true
  206. if s.data != nil {
  207. s.data.Release()
  208. s.data = nil
  209. s.size = 0
  210. }
  211. s.access.Unlock()
  212. s.readSignal.Signal()
  213. s.writeSignal.Signal()
  214. }