hub.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package udp
  2. import (
  3. "net"
  4. "sync"
  5. "github.com/v2ray/v2ray-core/common/alloc"
  6. "github.com/v2ray/v2ray-core/common/log"
  7. v2net "github.com/v2ray/v2ray-core/common/net"
  8. "github.com/v2ray/v2ray-core/proxy"
  9. "github.com/v2ray/v2ray-core/transport/internet/internal"
  10. )
  11. type UDPPayloadHandler func(*alloc.Buffer, *proxy.SessionInfo)
  12. type UDPHub struct {
  13. sync.RWMutex
  14. conn *net.UDPConn
  15. option ListenOption
  16. accepting bool
  17. }
  18. type ListenOption struct {
  19. Callback UDPPayloadHandler
  20. ReceiveOriginalDest bool
  21. }
  22. func ListenUDP(address v2net.Address, port v2net.Port, option ListenOption) (*UDPHub, error) {
  23. udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
  24. IP: address.IP(),
  25. Port: int(port),
  26. })
  27. if err != nil {
  28. return nil, err
  29. }
  30. if option.ReceiveOriginalDest {
  31. fd, err := internal.GetSysFd(udpConn)
  32. if err != nil {
  33. log.Warning("UDP|Listener: Failed to get fd: ", err)
  34. return nil, err
  35. }
  36. err = SetOriginalDestOptions(fd)
  37. if err != nil {
  38. log.Warning("UDP|Listener: Failed to set socket options: ", err)
  39. return nil, err
  40. }
  41. }
  42. hub := &UDPHub{
  43. conn: udpConn,
  44. option: option,
  45. }
  46. go hub.start()
  47. return hub, nil
  48. }
  49. func (this *UDPHub) Close() {
  50. this.Lock()
  51. defer this.Unlock()
  52. this.accepting = false
  53. this.conn.Close()
  54. }
  55. func (this *UDPHub) WriteTo(payload []byte, dest v2net.Destination) (int, error) {
  56. return this.conn.WriteToUDP(payload, &net.UDPAddr{
  57. IP: dest.Address().IP(),
  58. Port: int(dest.Port()),
  59. })
  60. }
  61. func (this *UDPHub) start() {
  62. this.Lock()
  63. this.accepting = true
  64. this.Unlock()
  65. oobBytes := make([]byte, 256)
  66. for this.Running() {
  67. buffer := alloc.NewBuffer()
  68. nBytes, noob, _, addr, err := this.conn.ReadMsgUDP(buffer.Value, oobBytes)
  69. if err != nil {
  70. buffer.Release()
  71. continue
  72. }
  73. buffer.Slice(0, nBytes)
  74. session := new(proxy.SessionInfo)
  75. session.Source = v2net.UDPDestination(v2net.IPAddress(addr.IP), v2net.Port(addr.Port))
  76. if this.option.ReceiveOriginalDest && noob > 0 {
  77. session.Destination = RetrieveOriginalDest(oobBytes[:noob])
  78. }
  79. go this.option.Callback(buffer, session)
  80. }
  81. }
  82. func (this *UDPHub) Running() bool {
  83. this.RLock()
  84. defer this.RUnlock()
  85. return this.accepting
  86. }
  87. // Connection return the net.Conn underneath this hub.
  88. // Private: Visible for testing only
  89. func (this *UDPHub) Connection() net.Conn {
  90. return this.conn
  91. }