outbound.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package commander
  2. import (
  3. "context"
  4. "net"
  5. "sync"
  6. "v2ray.com/core/common/signal"
  7. "v2ray.com/core/transport/ray"
  8. )
  9. type OutboundListener struct {
  10. buffer chan net.Conn
  11. }
  12. func (l *OutboundListener) add(conn net.Conn) {
  13. select {
  14. case l.buffer <- conn:
  15. default:
  16. conn.Close()
  17. }
  18. }
  19. func (l *OutboundListener) Accept() (net.Conn, error) {
  20. c, open := <-l.buffer
  21. if !open {
  22. return nil, newError("listener closed")
  23. }
  24. return c, nil
  25. }
  26. func (l *OutboundListener) Close() error {
  27. close(l.buffer)
  28. return nil
  29. }
  30. func (l *OutboundListener) Addr() net.Addr {
  31. return &net.TCPAddr{
  32. IP: net.IP{0, 0, 0, 0},
  33. Port: 0,
  34. }
  35. }
  36. type CommanderOutbound struct {
  37. tag string
  38. listener *OutboundListener
  39. access sync.RWMutex
  40. closed bool
  41. }
  42. func (co *CommanderOutbound) Dispatch(ctx context.Context, r ray.OutboundRay) {
  43. co.access.RLock()
  44. if co.closed {
  45. r.OutboundInput().CloseError()
  46. r.OutboundOutput().CloseError()
  47. co.access.RUnlock()
  48. return
  49. }
  50. closeSignal := signal.NewNotifier()
  51. c := ray.NewConnection(r.OutboundInput(), r.OutboundOutput(), ray.ConnCloseSignal(closeSignal))
  52. co.listener.add(c)
  53. co.access.RUnlock()
  54. <-closeSignal.Wait()
  55. return
  56. }
  57. func (co *CommanderOutbound) Tag() string {
  58. return co.tag
  59. }
  60. func (co *CommanderOutbound) Start() error {
  61. co.access.Lock()
  62. co.closed = false
  63. co.access.Unlock()
  64. return nil
  65. }
  66. func (co *CommanderOutbound) Close() error {
  67. co.access.Lock()
  68. co.closed = true
  69. co.listener.Close()
  70. co.access.Unlock()
  71. return nil
  72. }