direct.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package ray
  2. import (
  3. "errors"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/v2ray/v2ray-core/common/alloc"
  8. )
  9. const (
  10. bufferSize = 128
  11. )
  12. var (
  13. ErrIOTimeout = errors.New("IO Timeout")
  14. )
  15. // NewRay creates a new Ray for direct traffic transport.
  16. func NewRay() Ray {
  17. return &directRay{
  18. Input: NewStream(),
  19. Output: NewStream(),
  20. }
  21. }
  22. type directRay struct {
  23. Input *Stream
  24. Output *Stream
  25. }
  26. func (this *directRay) OutboundInput() InputStream {
  27. return this.Input
  28. }
  29. func (this *directRay) OutboundOutput() OutputStream {
  30. return this.Output
  31. }
  32. func (this *directRay) InboundInput() OutputStream {
  33. return this.Input
  34. }
  35. func (this *directRay) InboundOutput() InputStream {
  36. return this.Output
  37. }
  38. type Stream struct {
  39. access sync.RWMutex
  40. closed bool
  41. buffer chan *alloc.Buffer
  42. }
  43. func NewStream() *Stream {
  44. return &Stream{
  45. buffer: make(chan *alloc.Buffer, bufferSize),
  46. }
  47. }
  48. func (this *Stream) Read() (*alloc.Buffer, error) {
  49. if this.buffer == nil {
  50. return nil, io.EOF
  51. }
  52. this.access.RLock()
  53. if this.buffer == nil {
  54. this.access.RUnlock()
  55. return nil, io.EOF
  56. }
  57. channel := this.buffer
  58. this.access.RUnlock()
  59. result, open := <-channel
  60. if !open {
  61. return nil, io.EOF
  62. }
  63. return result, nil
  64. }
  65. func (this *Stream) Write(data *alloc.Buffer) error {
  66. if this.closed {
  67. return io.EOF
  68. }
  69. for {
  70. err := this.TryWriteOnce(data)
  71. if err != ErrIOTimeout {
  72. return err
  73. }
  74. }
  75. }
  76. func (this *Stream) TryWriteOnce(data *alloc.Buffer) error {
  77. this.access.RLock()
  78. defer this.access.RUnlock()
  79. if this.closed {
  80. return io.EOF
  81. }
  82. select {
  83. case this.buffer <- data:
  84. return nil
  85. case <-time.After(2 * time.Second):
  86. return ErrIOTimeout
  87. }
  88. }
  89. func (this *Stream) Close() {
  90. if this.closed {
  91. return
  92. }
  93. this.access.Lock()
  94. defer this.access.Unlock()
  95. if this.closed {
  96. return
  97. }
  98. this.closed = true
  99. close(this.buffer)
  100. }
  101. func (this *Stream) Release() {
  102. if this.buffer == nil {
  103. return
  104. }
  105. this.Close()
  106. this.access.Lock()
  107. defer this.access.Unlock()
  108. if this.buffer == nil {
  109. return
  110. }
  111. for data := range this.buffer {
  112. data.Release()
  113. }
  114. this.buffer = nil
  115. }