direct.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package ray
  2. import (
  3. "io"
  4. "v2ray.com/core/common/buf"
  5. )
  6. const (
  7. bufferSize = 512
  8. )
  9. // NewRay creates a new Ray for direct traffic transport.
  10. func NewRay() Ray {
  11. return &directRay{
  12. Input: NewStream(),
  13. Output: NewStream(),
  14. }
  15. }
  16. type directRay struct {
  17. Input *Stream
  18. Output *Stream
  19. }
  20. func (v *directRay) OutboundInput() InputStream {
  21. return v.Input
  22. }
  23. func (v *directRay) OutboundOutput() OutputStream {
  24. return v.Output
  25. }
  26. func (v *directRay) InboundInput() OutputStream {
  27. return v.Input
  28. }
  29. func (v *directRay) InboundOutput() InputStream {
  30. return v.Output
  31. }
  32. type Stream struct {
  33. buffer chan *buf.Buffer
  34. }
  35. func NewStream() *Stream {
  36. return &Stream{
  37. buffer: make(chan *buf.Buffer, bufferSize),
  38. }
  39. }
  40. func (v *Stream) Read() (*buf.Buffer, error) {
  41. buffer, open := <-v.buffer
  42. if !open {
  43. return nil, io.EOF
  44. }
  45. return buffer, nil
  46. }
  47. func (v *Stream) Write(data *buf.Buffer) (err error) {
  48. defer func() {
  49. if r := recover(); r != nil {
  50. err = io.ErrClosedPipe
  51. }
  52. }()
  53. v.buffer <- data
  54. return nil
  55. }
  56. func (v *Stream) Close() {
  57. defer swallowPanic()
  58. close(v.buffer)
  59. }
  60. func (v *Stream) Release() {
  61. defer swallowPanic()
  62. close(v.buffer)
  63. for b := range v.buffer {
  64. b.Release()
  65. }
  66. }
  67. func swallowPanic() {
  68. recover()
  69. }