direct.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package ray
  2. import (
  3. "errors"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/v2ray/v2ray-core/common/alloc"
  8. )
  // bufferSize is the capacity, counted in queued buffers, of the channel that
  // NewStream allocates for every Stream.
  const bufferSize = 128
  // ErrIOTimeout is returned by Stream.TryWriteOnce when a buffer could not be
  // queued before the write timeout elapsed.
  var ErrIOTimeout = errors.New("IO Timeout")
  15. // NewRay creates a new Ray for direct traffic transport.
  16. func NewRay() Ray {
  17. return &directRay{
  18. Input: NewStream(),
  19. Output: NewStream(),
  20. }
  21. }
  22. type directRay struct {
  23. Input *Stream
  24. Output *Stream
  25. }
  26. func (this *directRay) OutboundInput() InputStream {
  27. return this.Input
  28. }
  29. func (this *directRay) OutboundOutput() OutputStream {
  30. return this.Output
  31. }
  32. func (this *directRay) InboundInput() OutputStream {
  33. return this.Input
  34. }
  35. func (this *directRay) InboundOutput() InputStream {
  36. return this.Output
  37. }
  38. type Stream struct {
  39. access sync.RWMutex
  40. closed bool
  41. buffer chan *alloc.Buffer
  42. }
  43. func NewStream() *Stream {
  44. return &Stream{
  45. buffer: make(chan *alloc.Buffer, bufferSize),
  46. }
  47. }
  48. func (this *Stream) Read() (*alloc.Buffer, error) {
  49. if this.buffer == nil {
  50. return nil, io.EOF
  51. }
  52. this.access.RLock()
  53. if this.buffer == nil {
  54. this.access.RUnlock()
  55. return nil, io.EOF
  56. }
  57. channel := this.buffer
  58. this.access.RUnlock()
  59. result, open := <-channel
  60. if !open {
  61. return nil, io.EOF
  62. }
  63. return result, nil
  64. }
  65. func (this *Stream) Write(data *alloc.Buffer) error {
  66. for !this.closed {
  67. err := this.TryWriteOnce(data)
  68. if err != ErrIOTimeout {
  69. return err
  70. }
  71. }
  72. return io.EOF
  73. }
  74. func (this *Stream) TryWriteOnce(data *alloc.Buffer) error {
  75. this.access.RLock()
  76. defer this.access.RUnlock()
  77. if this.closed {
  78. return io.EOF
  79. }
  80. select {
  81. case this.buffer <- data:
  82. return nil
  83. case <-time.After(2 * time.Second):
  84. return ErrIOTimeout
  85. }
  86. }
  87. func (this *Stream) Close() {
  88. if this.closed {
  89. return
  90. }
  91. this.access.Lock()
  92. defer this.access.Unlock()
  93. if this.closed {
  94. return
  95. }
  96. this.closed = true
  97. close(this.buffer)
  98. }
  99. func (this *Stream) Release() {
  100. if this.buffer == nil {
  101. return
  102. }
  103. this.Close()
  104. this.access.Lock()
  105. defer this.access.Unlock()
  106. if this.buffer == nil {
  107. return
  108. }
  109. for data := range this.buffer {
  110. data.Release()
  111. }
  112. this.buffer = nil
  113. }