hub.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package udp
  2. import (
  3. "v2ray.com/core/common/buf"
  4. "v2ray.com/core/common/net"
  5. )
  6. // Payload represents a single UDP payload.
  7. type Payload struct {
  8. Content *buf.Buffer
  9. Source net.Destination
  10. OriginalDestination net.Destination
  11. }
  12. type HubOption func(h *Hub)
  13. func HubCapacity(cap int) HubOption {
  14. return func(h *Hub) {
  15. h.capacity = cap
  16. }
  17. }
  18. func HubReceiveOriginalDestination(r bool) HubOption {
  19. return func(h *Hub) {
  20. h.recvOrigDest = r
  21. }
  22. }
  23. type Hub struct {
  24. conn *net.UDPConn
  25. cache chan *Payload
  26. capacity int
  27. recvOrigDest bool
  28. }
  29. func ListenUDP(address net.Address, port net.Port, options ...HubOption) (*Hub, error) {
  30. udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
  31. IP: address.IP(),
  32. Port: int(port),
  33. })
  34. if err != nil {
  35. return nil, err
  36. }
  37. newError("listening UDP on ", address, ":", port).WriteToLog()
  38. hub := &Hub{
  39. conn: udpConn,
  40. capacity: 256,
  41. recvOrigDest: false,
  42. }
  43. for _, opt := range options {
  44. opt(hub)
  45. }
  46. hub.cache = make(chan *Payload, hub.capacity)
  47. if hub.recvOrigDest {
  48. rawConn, err := udpConn.SyscallConn()
  49. if err != nil {
  50. return nil, newError("failed to get fd").Base(err)
  51. }
  52. err = rawConn.Control(func(fd uintptr) {
  53. if err := SetOriginalDestOptions(int(fd)); err != nil {
  54. newError("failed to set socket options").Base(err).WriteToLog()
  55. }
  56. })
  57. if err != nil {
  58. return nil, newError("failed to control socket").Base(err)
  59. }
  60. }
  61. go hub.start()
  62. return hub, nil
  63. }
  64. // Close implements net.Listener.
  65. func (h *Hub) Close() error {
  66. h.conn.Close()
  67. return nil
  68. }
  69. func (h *Hub) WriteTo(payload []byte, dest net.Destination) (int, error) {
  70. return h.conn.WriteToUDP(payload, &net.UDPAddr{
  71. IP: dest.Address.IP(),
  72. Port: int(dest.Port),
  73. })
  74. }
  75. func (h *Hub) start() {
  76. c := h.cache
  77. defer close(c)
  78. oobBytes := make([]byte, 256)
  79. for {
  80. buffer := buf.New()
  81. var noob int
  82. var addr *net.UDPAddr
  83. err := buffer.Reset(func(b []byte) (int, error) {
  84. n, nb, _, a, e := ReadUDPMsg(h.conn, b, oobBytes)
  85. noob = nb
  86. addr = a
  87. return n, e
  88. })
  89. if err != nil {
  90. newError("failed to read UDP msg").Base(err).WriteToLog()
  91. buffer.Release()
  92. break
  93. }
  94. if buffer.IsEmpty() {
  95. buffer.Release()
  96. continue
  97. }
  98. payload := &Payload{
  99. Content: buffer,
  100. Source: net.UDPDestination(net.IPAddress(addr.IP), net.Port(addr.Port)),
  101. }
  102. if h.recvOrigDest && noob > 0 {
  103. payload.OriginalDestination = RetrieveOriginalDest(oobBytes[:noob])
  104. if payload.OriginalDestination.IsValid() {
  105. newError("UDP original destination: ", payload.OriginalDestination).AtDebug().WriteToLog()
  106. } else {
  107. newError("failed to read UDP original destination").WriteToLog()
  108. }
  109. }
  110. select {
  111. case c <- payload:
  112. default:
  113. buffer.Release()
  114. payload.Content = nil
  115. }
  116. }
  117. }
  118. // Addr implements net.Listener.
  119. func (h *Hub) Addr() net.Addr {
  120. return h.conn.LocalAddr()
  121. }
  122. func (h *Hub) Receive() <-chan *Payload {
  123. return h.cache
  124. }