direct.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package ray
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "time"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/buf"
  9. "v2ray.com/core/common/platform"
  10. "v2ray.com/core/common/signal"
  11. )
  12. type Option func(*directRay)
  13. type addInt64 interface {
  14. Add(int64) int64
  15. }
  16. func WithUplinkStatCounter(c addInt64) Option {
  17. return func(s *directRay) {
  18. s.Input.onDataSize = append(s.Input.onDataSize, func(delta uint64) {
  19. c.Add(int64(delta))
  20. })
  21. }
  22. }
  23. func WithDownlinkStatCounter(c addInt64) Option {
  24. return func(s *directRay) {
  25. s.Output.onDataSize = append(s.Output.onDataSize, func(delta uint64) {
  26. c.Add(int64(delta))
  27. })
  28. }
  29. }
  30. // New creates a new Ray for direct traffic transport.
  31. func New(ctx context.Context, opts ...Option) Ray {
  32. r := &directRay{
  33. Input: NewStream(ctx),
  34. Output: NewStream(ctx),
  35. }
  36. for _, opt := range opts {
  37. opt(r)
  38. }
  39. return r
  40. }
  41. type directRay struct {
  42. Input *Stream
  43. Output *Stream
  44. }
  45. func (v *directRay) OutboundInput() InputStream {
  46. return v.Input
  47. }
  48. func (v *directRay) OutboundOutput() OutputStream {
  49. return v.Output
  50. }
  51. func (v *directRay) InboundInput() OutputStream {
  52. return v.Input
  53. }
  54. func (v *directRay) InboundOutput() InputStream {
  55. return v.Output
  56. }
  57. var streamSizeLimit uint64 = 10 * 1024 * 1024
  58. func init() {
  59. const raySizeEnvKey = "v2ray.ray.buffer.size"
  60. size := platform.EnvFlag{
  61. Name: raySizeEnvKey,
  62. AltName: platform.NormalizeEnvName(raySizeEnvKey),
  63. }.GetValueAsInt(10)
  64. streamSizeLimit = uint64(size) * 1024 * 1024
  65. }
  66. // Stream is a sequential container for data in bytes.
  67. type Stream struct {
  68. access sync.RWMutex
  69. data buf.MultiBuffer
  70. size uint64
  71. ctx context.Context
  72. readSignal *signal.Notifier
  73. writeSignal *signal.Notifier
  74. onDataSize []func(uint64)
  75. close bool
  76. err bool
  77. }
  78. // NewStream creates a new Stream.
  79. func NewStream(ctx context.Context) *Stream {
  80. s := &Stream{
  81. ctx: ctx,
  82. readSignal: signal.NewNotifier(),
  83. writeSignal: signal.NewNotifier(),
  84. size: 0,
  85. }
  86. return s
  87. }
  88. func (s *Stream) getData() (buf.MultiBuffer, error) {
  89. s.access.Lock()
  90. defer s.access.Unlock()
  91. if s.data != nil {
  92. mb := s.data
  93. s.data = nil
  94. s.size = 0
  95. return mb, nil
  96. }
  97. if s.err {
  98. return nil, io.ErrClosedPipe
  99. }
  100. if s.close {
  101. return nil, io.EOF
  102. }
  103. return nil, nil
  104. }
  105. // Peek fills in the given buffer with data from head of the Stream.
  106. func (s *Stream) Peek(b *buf.Buffer) {
  107. s.access.RLock()
  108. defer s.access.RUnlock()
  109. common.Must(b.Reset(func(data []byte) (int, error) {
  110. return s.data.Copy(data), nil
  111. }))
  112. }
  113. // ReadMultiBuffer reads data from the Stream.
  114. func (s *Stream) ReadMultiBuffer() (buf.MultiBuffer, error) {
  115. for {
  116. mb, err := s.getData()
  117. if err != nil {
  118. return nil, err
  119. }
  120. if mb != nil {
  121. s.readSignal.Signal()
  122. return mb, nil
  123. }
  124. select {
  125. case <-s.ctx.Done():
  126. return nil, s.ctx.Err()
  127. case <-s.writeSignal.Wait():
  128. }
  129. }
  130. }
  131. // ReadTimeout reads from the Stream with a specified timeout.
  132. func (s *Stream) ReadTimeout(timeout time.Duration) (buf.MultiBuffer, error) {
  133. for {
  134. mb, err := s.getData()
  135. if err != nil {
  136. return nil, err
  137. }
  138. if mb != nil {
  139. s.readSignal.Signal()
  140. return mb, nil
  141. }
  142. select {
  143. case <-s.ctx.Done():
  144. return nil, s.ctx.Err()
  145. case <-time.After(timeout):
  146. return nil, buf.ErrReadTimeout
  147. case <-s.writeSignal.Wait():
  148. }
  149. }
  150. }
  151. // Size returns the number of bytes hold in the Stream.
  152. func (s *Stream) Size() uint64 {
  153. s.access.RLock()
  154. defer s.access.RUnlock()
  155. return s.size
  156. }
  157. // waitForStreamSize waits until the Stream has room for more data, or any error happens.
  158. func (s *Stream) waitForStreamSize() error {
  159. if streamSizeLimit == 0 {
  160. return nil
  161. }
  162. for s.Size() >= streamSizeLimit {
  163. select {
  164. case <-s.ctx.Done():
  165. return s.ctx.Err()
  166. case <-s.readSignal.Wait():
  167. if s.err || s.close {
  168. return io.ErrClosedPipe
  169. }
  170. }
  171. }
  172. return nil
  173. }
  174. // WriteMultiBuffer writes more data into the Stream.
  175. func (s *Stream) WriteMultiBuffer(data buf.MultiBuffer) error {
  176. if data.IsEmpty() {
  177. return nil
  178. }
  179. if err := s.waitForStreamSize(); err != nil {
  180. data.Release()
  181. return err
  182. }
  183. s.access.Lock()
  184. defer s.access.Unlock()
  185. if s.err || s.close {
  186. data.Release()
  187. return io.ErrClosedPipe
  188. }
  189. if s.data == nil {
  190. s.data = buf.NewMultiBufferCap(128)
  191. }
  192. dataSize := uint64(data.Len())
  193. for _, f := range s.onDataSize {
  194. f(dataSize)
  195. }
  196. s.data.AppendMulti(data)
  197. s.size += dataSize
  198. s.writeSignal.Signal()
  199. return nil
  200. }
  201. // Close closes the stream for writing. Read() still works until EOF.
  202. func (s *Stream) Close() error {
  203. s.access.Lock()
  204. s.close = true
  205. s.readSignal.Signal()
  206. s.writeSignal.Signal()
  207. s.access.Unlock()
  208. return nil
  209. }
  210. // CloseError closes the Stream with error. Read() will return an error afterwards.
  211. func (s *Stream) CloseError() {
  212. s.access.Lock()
  213. s.err = true
  214. if s.data != nil {
  215. s.data.Release()
  216. s.data = nil
  217. s.size = 0
  218. }
  219. s.access.Unlock()
  220. s.readSignal.Signal()
  221. s.writeSignal.Signal()
  222. }