outbound.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package commander
  2. import (
  3. "context"
  4. "net"
  5. "sync"
  6. "v2ray.com/core/common"
  7. "v2ray.com/core/common/signal"
  8. "v2ray.com/core/transport/ray"
  9. )
  10. type OutboundListener struct {
  11. buffer chan net.Conn
  12. done *signal.Done
  13. }
  14. func (l *OutboundListener) add(conn net.Conn) {
  15. select {
  16. case l.buffer <- conn:
  17. case <-l.done.C():
  18. conn.Close()
  19. default:
  20. conn.Close()
  21. }
  22. }
  23. // Accept implements net.Listener.
  24. func (l *OutboundListener) Accept() (net.Conn, error) {
  25. select {
  26. case <-l.done.C():
  27. return nil, newError("listen closed")
  28. case c := <-l.buffer:
  29. return c, nil
  30. }
  31. }
  32. // Close implement net.Listener.
  33. func (l *OutboundListener) Close() error {
  34. common.Must(l.done.Close())
  35. L:
  36. for {
  37. select {
  38. case c := <-l.buffer:
  39. c.Close()
  40. default:
  41. break L
  42. }
  43. }
  44. return nil
  45. }
  46. // Addr implements net.Listener.
  47. func (l *OutboundListener) Addr() net.Addr {
  48. return &net.TCPAddr{
  49. IP: net.IP{0, 0, 0, 0},
  50. Port: 0,
  51. }
  52. }
  53. // Outbound is a core.OutboundHandler that handles gRPC connections.
  54. type Outbound struct {
  55. tag string
  56. listener *OutboundListener
  57. access sync.RWMutex
  58. closed bool
  59. }
  60. // Dispatch implements core.OutboundHandler.
  61. func (co *Outbound) Dispatch(ctx context.Context, r ray.OutboundRay) {
  62. co.access.RLock()
  63. if co.closed {
  64. r.OutboundInput().CloseError()
  65. r.OutboundOutput().CloseError()
  66. co.access.RUnlock()
  67. return
  68. }
  69. closeSignal := signal.NewNotifier()
  70. c := ray.NewConnection(r.OutboundInput(), r.OutboundOutput(), ray.ConnCloseSignal(closeSignal))
  71. co.listener.add(c)
  72. co.access.RUnlock()
  73. <-closeSignal.Wait()
  74. }
  75. // Tag implements core.OutboundHandler.
  76. func (co *Outbound) Tag() string {
  77. return co.tag
  78. }
  79. // Start implements common.Runnable.
  80. func (co *Outbound) Start() error {
  81. co.access.Lock()
  82. co.closed = false
  83. co.access.Unlock()
  84. return nil
  85. }
  86. // Close implements common.Closable.
  87. func (co *Outbound) Close() error {
  88. co.access.Lock()
  89. defer co.access.Unlock()
  90. co.closed = true
  91. return co.listener.Close()
  92. }