hub.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package udp
  2. import (
  3. "v2ray.com/core/common/buf"
  4. "v2ray.com/core/common/net"
  5. )
  6. // Payload represents a single UDP payload.
  7. type Payload struct {
  8. payload *buf.Buffer
  9. source net.Destination
  10. originalDest net.Destination
  11. }
  12. // PayloadHandler is function to handle Payload.
  13. type PayloadHandler func(payload *buf.Buffer, source net.Destination, originalDest net.Destination)
  14. type HubOption func(h *Hub)
  15. func HubCapacity(cap int) HubOption {
  16. return func(h *Hub) {
  17. h.capacity = cap
  18. }
  19. }
  20. func HubReceiveOriginalDestination(r bool) HubOption {
  21. return func(h *Hub) {
  22. h.recvOrigDest = r
  23. }
  24. }
  25. type Hub struct {
  26. conn *net.UDPConn
  27. callback PayloadHandler
  28. capacity int
  29. recvOrigDest bool
  30. }
  31. func ListenUDP(address net.Address, port net.Port, callback PayloadHandler, options ...HubOption) (*Hub, error) {
  32. udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
  33. IP: address.IP(),
  34. Port: int(port),
  35. })
  36. if err != nil {
  37. return nil, err
  38. }
  39. newError("listening UDP on ", address, ":", port).WriteToLog()
  40. hub := &Hub{
  41. conn: udpConn,
  42. capacity: 256,
  43. callback: callback,
  44. recvOrigDest: false,
  45. }
  46. for _, opt := range options {
  47. opt(hub)
  48. }
  49. if hub.recvOrigDest {
  50. rawConn, err := udpConn.SyscallConn()
  51. if err != nil {
  52. return nil, newError("failed to get fd").Base(err)
  53. }
  54. err = rawConn.Control(func(fd uintptr) {
  55. if err := SetOriginalDestOptions(int(fd)); err != nil {
  56. newError("failed to set socket options").Base(err).WriteToLog()
  57. }
  58. })
  59. if err != nil {
  60. return nil, newError("failed to control socket").Base(err)
  61. }
  62. }
  63. c := make(chan *Payload, hub.capacity)
  64. go hub.start(c)
  65. go hub.process(c)
  66. return hub, nil
  67. }
  68. // Close implements net.Listener.
  69. func (h *Hub) Close() error {
  70. h.conn.Close()
  71. return nil
  72. }
  73. func (h *Hub) WriteTo(payload []byte, dest net.Destination) (int, error) {
  74. return h.conn.WriteToUDP(payload, &net.UDPAddr{
  75. IP: dest.Address.IP(),
  76. Port: int(dest.Port),
  77. })
  78. }
  79. func (h *Hub) process(c <-chan *Payload) {
  80. for p := range c {
  81. h.callback(p.payload, p.source, p.originalDest)
  82. }
  83. }
  84. func (h *Hub) start(c chan<- *Payload) {
  85. defer close(c)
  86. oobBytes := make([]byte, 256)
  87. for {
  88. buffer := buf.New()
  89. var noob int
  90. var addr *net.UDPAddr
  91. err := buffer.AppendSupplier(func(b []byte) (int, error) {
  92. n, nb, _, a, e := ReadUDPMsg(h.conn, b, oobBytes)
  93. noob = nb
  94. addr = a
  95. return n, e
  96. })
  97. if err != nil {
  98. newError("failed to read UDP msg").Base(err).WriteToLog()
  99. buffer.Release()
  100. break
  101. }
  102. payload := &Payload{
  103. payload: buffer,
  104. }
  105. payload.source = net.UDPDestination(net.IPAddress(addr.IP), net.Port(addr.Port))
  106. if h.recvOrigDest && noob > 0 {
  107. payload.originalDest = RetrieveOriginalDest(oobBytes[:noob])
  108. if payload.originalDest.IsValid() {
  109. newError("UDP original destination: ", payload.originalDest).AtDebug().WriteToLog()
  110. } else {
  111. newError("failed to read UDP original destination").WriteToLog()
  112. }
  113. }
  114. select {
  115. case c <- payload:
  116. default:
  117. }
  118. }
  119. }
  120. // Addr implements net.Listener.
  121. func (h *Hub) Addr() net.Addr {
  122. return h.conn.LocalAddr()
  123. }