direct.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package ray
  2. import (
  3. "errors"
  4. "io"
  5. "time"
  6. "v2ray.com/core/common/buf"
  7. )
  // bufferSize is the capacity, counted in buffers, of the channel that
  // NewStream allocates for each Stream.
  const bufferSize = 512

  // ErrReadTimeout is returned by Stream.ReadTimeout when neither data nor a
  // close/error signal arrives before the timeout expires.
  var ErrReadTimeout = errors.New("Ray: timeout.")
  12. // NewRay creates a new Ray for direct traffic transport.
  13. func NewRay() Ray {
  14. return &directRay{
  15. Input: NewStream(),
  16. Output: NewStream(),
  17. }
  18. }
  19. type directRay struct {
  20. Input *Stream
  21. Output *Stream
  22. }
  23. func (v *directRay) OutboundInput() InputStream {
  24. return v.Input
  25. }
  26. func (v *directRay) OutboundOutput() OutputStream {
  27. return v.Output
  28. }
  29. func (v *directRay) InboundInput() OutputStream {
  30. return v.Input
  31. }
  32. func (v *directRay) InboundOutput() InputStream {
  33. return v.Output
  34. }
  35. func (v *directRay) AddInspector(inspector Inspector) {
  36. if inspector == nil {
  37. return
  38. }
  39. v.Input.inspector.AddInspector(inspector)
  40. v.Output.inspector.AddInspector(inspector)
  41. }
  42. type Stream struct {
  43. buffer chan *buf.Buffer
  44. close chan bool
  45. err chan bool
  46. inspector *InspectorChain
  47. }
  48. func NewStream() *Stream {
  49. return &Stream{
  50. buffer: make(chan *buf.Buffer, bufferSize),
  51. close: make(chan bool),
  52. err: make(chan bool),
  53. inspector: &InspectorChain{},
  54. }
  55. }
  56. func (v *Stream) Read() (*buf.Buffer, error) {
  57. select {
  58. case <-v.err:
  59. return nil, io.ErrClosedPipe
  60. case b := <-v.buffer:
  61. return b, nil
  62. default:
  63. select {
  64. case b := <-v.buffer:
  65. return b, nil
  66. case <-v.close:
  67. return nil, io.EOF
  68. case <-v.err:
  69. return nil, io.ErrClosedPipe
  70. }
  71. }
  72. }
  73. func (v *Stream) ReadTimeout(timeout time.Duration) (*buf.Buffer, error) {
  74. select {
  75. case <-v.err:
  76. return nil, io.ErrClosedPipe
  77. case b := <-v.buffer:
  78. return b, nil
  79. default:
  80. select {
  81. case b := <-v.buffer:
  82. return b, nil
  83. case <-v.close:
  84. return nil, io.EOF
  85. case <-v.err:
  86. return nil, io.ErrClosedPipe
  87. case <-time.After(timeout):
  88. return nil, ErrReadTimeout
  89. }
  90. }
  91. }
  92. func (v *Stream) Write(data *buf.Buffer) (err error) {
  93. if data.IsEmpty() {
  94. return
  95. }
  96. select {
  97. case <-v.err:
  98. return io.ErrClosedPipe
  99. case <-v.close:
  100. return io.ErrClosedPipe
  101. default:
  102. select {
  103. case <-v.err:
  104. return io.ErrClosedPipe
  105. case <-v.close:
  106. return io.ErrClosedPipe
  107. case v.buffer <- data:
  108. v.inspector.Input(data)
  109. return nil
  110. }
  111. }
  112. }
  113. func (v *Stream) Close() {
  114. defer swallowPanic()
  115. close(v.close)
  116. }
  117. func (v *Stream) CloseError() {
  118. defer swallowPanic()
  119. close(v.err)
  120. n := len(v.buffer)
  121. for i := 0; i < n; i++ {
  122. select {
  123. case b := <-v.buffer:
  124. b.Release()
  125. default:
  126. return
  127. }
  128. }
  129. }
  130. func (v *Stream) Release() {}
  // swallowPanic recovers from an in-flight panic and discards its value. It
  // only has an effect when it is itself the deferred call, as in
  // `defer swallowPanic()`.
  func swallowPanic() {
  	_ = recover()
  }