tcp_hub.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package internet
  2. import (
  3. "context"
  4. "net"
  5. "time"
  6. v2net "v2ray.com/core/common/net"
  7. )
  8. var (
  9. transportListenerCache = make(map[TransportProtocol]ListenFunc)
  10. )
  11. func RegisterTransportListener(protocol TransportProtocol, listener ListenFunc) error {
  12. if _, found := transportListenerCache[protocol]; found {
  13. return newError(protocol, " listener already registered.").AtError()
  14. }
  15. transportListenerCache[protocol] = listener
  16. return nil
  17. }
  18. type AddConnection func(context.Context, Connection) bool
  19. type ListenFunc func(ctx context.Context, address v2net.Address, port v2net.Port, addConn AddConnection) (Listener, error)
  // Listener is the handle a ListenFunc (and therefore ListenTCP) returns to
  // its caller.
  type Listener interface {
  	// Addr returns the net.Addr associated with the listener
  	// (presumably the local address it is bound to — confirm with implementations).
  	Addr() net.Addr

  	// Close releases the listener, reporting any error encountered
  	// (presumably it stops accepting new connections — confirm with implementations).
  	Close() error
  }
  24. func ListenTCP(ctx context.Context, address v2net.Address, port v2net.Port, conns chan<- Connection) (Listener, error) {
  25. settings := StreamSettingsFromContext(ctx)
  26. protocol := settings.GetEffectiveProtocol()
  27. transportSettings, err := settings.GetEffectiveTransportSettings()
  28. if err != nil {
  29. return nil, err
  30. }
  31. ctx = ContextWithTransportSettings(ctx, transportSettings)
  32. if settings != nil && settings.HasSecuritySettings() {
  33. securitySettings, err := settings.GetEffectiveSecuritySettings()
  34. if err != nil {
  35. return nil, err
  36. }
  37. ctx = ContextWithSecuritySettings(ctx, securitySettings)
  38. }
  39. listenFunc := transportListenerCache[protocol]
  40. if listenFunc == nil {
  41. return nil, newError(protocol, " listener not registered.").AtError()
  42. }
  43. listener, err := listenFunc(ctx, address, port, func(ctx context.Context, conn Connection) bool {
  44. select {
  45. case <-ctx.Done():
  46. conn.Close()
  47. return false
  48. case conns <- conn:
  49. return true
  50. default:
  51. select {
  52. case <-ctx.Done():
  53. conn.Close()
  54. return false
  55. case conns <- conn:
  56. return true
  57. case <-time.After(time.Second * 5):
  58. conn.Close()
  59. return false
  60. }
  61. }
  62. })
  63. if err != nil {
  64. return nil, newError("failed to listen on address: ", address, ":", port).Base(err)
  65. }
  66. return listener, nil
  67. }