direct.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package ray
  2. import (
  3. "io"
  4. "sync"
  5. "github.com/v2ray/v2ray-core/common/alloc"
  6. )
  7. const (
  8. bufferSize = 128
  9. )
  10. // NewRay creates a new Ray for direct traffic transport.
  11. func NewRay() Ray {
  12. return &directRay{
  13. Input: NewStream(),
  14. Output: NewStream(),
  15. }
  16. }
  17. type directRay struct {
  18. Input *Stream
  19. Output *Stream
  20. }
  21. func (this *directRay) OutboundInput() InputStream {
  22. return this.Input
  23. }
  24. func (this *directRay) OutboundOutput() OutputStream {
  25. return this.Output
  26. }
  27. func (this *directRay) InboundInput() OutputStream {
  28. return this.Input
  29. }
  30. func (this *directRay) InboundOutput() InputStream {
  31. return this.Output
  32. }
  33. type Stream struct {
  34. access sync.RWMutex
  35. closed bool
  36. buffer chan *alloc.Buffer
  37. }
  38. func NewStream() *Stream {
  39. return &Stream{
  40. buffer: make(chan *alloc.Buffer, bufferSize),
  41. }
  42. }
  43. func (this *Stream) Read() (*alloc.Buffer, error) {
  44. if this.buffer == nil {
  45. return nil, io.EOF
  46. }
  47. this.access.RLock()
  48. defer this.access.RUnlock()
  49. if this.buffer == nil {
  50. return nil, io.EOF
  51. }
  52. result, open := <-this.buffer
  53. if !open {
  54. return nil, io.EOF
  55. }
  56. return result, nil
  57. }
  58. func (this *Stream) Write(data *alloc.Buffer) error {
  59. if this.closed {
  60. return io.EOF
  61. }
  62. if this.buffer == nil {
  63. return io.EOF
  64. }
  65. this.access.RLock()
  66. defer this.access.RUnlock()
  67. if this.buffer == nil {
  68. return io.EOF
  69. }
  70. this.buffer <- data
  71. return nil
  72. }
  73. func (this *Stream) Close() {
  74. if this.closed {
  75. return
  76. }
  77. this.access.RLock()
  78. defer this.access.RUnlock()
  79. if this.closed {
  80. return
  81. }
  82. this.closed = true
  83. close(this.buffer)
  84. }
  85. func (this *Stream) Release() {
  86. if this.buffer == nil {
  87. return
  88. }
  89. this.Close()
  90. this.access.Lock()
  91. defer this.access.Unlock()
  92. if this.buffer == nil {
  93. return
  94. }
  95. for data := range this.buffer {
  96. data.Release()
  97. }
  98. this.buffer = nil
  99. }