hub.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package udp
  2. import (
  3. "net"
  4. "sync"
  5. "v2ray.com/core/common/alloc"
  6. "v2ray.com/core/common/log"
  7. v2net "v2ray.com/core/common/net"
  8. "v2ray.com/core/proxy"
  9. "v2ray.com/core/transport/internet/internal"
  10. )
  11. type UDPPayloadHandler func(*alloc.Buffer, *proxy.SessionInfo)
  12. type UDPHub struct {
  13. sync.RWMutex
  14. conn *net.UDPConn
  15. option ListenOption
  16. accepting bool
  17. pool *alloc.BufferPool
  18. }
  19. type ListenOption struct {
  20. Callback UDPPayloadHandler
  21. ReceiveOriginalDest bool
  22. }
  23. func ListenUDP(address v2net.Address, port v2net.Port, option ListenOption) (*UDPHub, error) {
  24. udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
  25. IP: address.IP(),
  26. Port: int(port),
  27. })
  28. if err != nil {
  29. return nil, err
  30. }
  31. if option.ReceiveOriginalDest {
  32. fd, err := internal.GetSysFd(udpConn)
  33. if err != nil {
  34. log.Warning("UDP|Listener: Failed to get fd: ", err)
  35. return nil, err
  36. }
  37. err = SetOriginalDestOptions(fd)
  38. if err != nil {
  39. log.Warning("UDP|Listener: Failed to set socket options: ", err)
  40. return nil, err
  41. }
  42. }
  43. hub := &UDPHub{
  44. conn: udpConn,
  45. option: option,
  46. pool: alloc.NewBufferPool(2048, 64),
  47. }
  48. go hub.start()
  49. return hub, nil
  50. }
  51. func (this *UDPHub) Close() {
  52. this.Lock()
  53. defer this.Unlock()
  54. this.accepting = false
  55. this.conn.Close()
  56. }
  57. func (this *UDPHub) WriteTo(payload []byte, dest v2net.Destination) (int, error) {
  58. return this.conn.WriteToUDP(payload, &net.UDPAddr{
  59. IP: dest.Address.IP(),
  60. Port: int(dest.Port),
  61. })
  62. }
  63. func (this *UDPHub) start() {
  64. this.Lock()
  65. this.accepting = true
  66. this.Unlock()
  67. oobBytes := make([]byte, 256)
  68. for this.Running() {
  69. buffer := this.pool.Allocate()
  70. nBytes, noob, _, addr, err := ReadUDPMsg(this.conn, buffer.Value, oobBytes)
  71. if err != nil {
  72. log.Info("UDP|Hub: Failed to read UDP msg: ", err)
  73. buffer.Release()
  74. continue
  75. }
  76. buffer.Slice(0, nBytes)
  77. session := new(proxy.SessionInfo)
  78. session.Source = v2net.UDPDestination(v2net.IPAddress(addr.IP), v2net.Port(addr.Port))
  79. if this.option.ReceiveOriginalDest && noob > 0 {
  80. session.Destination = RetrieveOriginalDest(oobBytes[:noob])
  81. }
  82. go this.option.Callback(buffer, session)
  83. }
  84. }
  85. func (this *UDPHub) Running() bool {
  86. this.RLock()
  87. defer this.RUnlock()
  88. return this.accepting
  89. }
  90. // Connection return the net.Conn underneath this hub.
  91. // Private: Visible for testing only
  92. func (this *UDPHub) Connection() net.Conn {
  93. return this.conn
  94. }
  95. func (this *UDPHub) Addr() net.Addr {
  96. return this.conn.LocalAddr()
  97. }