dispatcher_test.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package udp_test
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. "v2ray.com/core/common/buf"
  8. "v2ray.com/core/common/net"
  9. "v2ray.com/core/common/vio"
  10. "v2ray.com/core/features/routing"
  11. . "v2ray.com/core/transport/internet/udp"
  12. "v2ray.com/core/transport/pipe"
  13. . "v2ray.com/ext/assert"
  14. )
  15. type TestDispatcher struct {
  16. OnDispatch func(ctx context.Context, dest net.Destination) (*vio.Link, error)
  17. }
  18. func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) {
  19. return d.OnDispatch(ctx, dest)
  20. }
  21. func (d *TestDispatcher) Start() error {
  22. return nil
  23. }
  24. func (d *TestDispatcher) Close() error {
  25. return nil
  26. }
  27. func (*TestDispatcher) Type() interface{} {
  28. return routing.DispatcherType()
  29. }
  30. func TestSameDestinationDispatching(t *testing.T) {
  31. assert := With(t)
  32. ctx, cancel := context.WithCancel(context.Background())
  33. uplinkReader, uplinkWriter := pipe.New(pipe.WithSizeLimit(1024))
  34. downlinkReader, downlinkWriter := pipe.New(pipe.WithSizeLimit(1024))
  35. go func() {
  36. for {
  37. data, err := uplinkReader.ReadMultiBuffer()
  38. if err != nil {
  39. break
  40. }
  41. err = downlinkWriter.WriteMultiBuffer(data)
  42. assert(err, IsNil)
  43. }
  44. }()
  45. var count uint32
  46. td := &TestDispatcher{
  47. OnDispatch: func(ctx context.Context, dest net.Destination) (*vio.Link, error) {
  48. atomic.AddUint32(&count, 1)
  49. return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
  50. },
  51. }
  52. dest := net.UDPDestination(net.LocalHostIP, 53)
  53. b := buf.New()
  54. b.WriteBytes('a', 'b', 'c', 'd')
  55. var msgCount uint32
  56. dispatcher := NewDispatcher(td, func(ctx context.Context, payload *buf.Buffer) {
  57. atomic.AddUint32(&msgCount, 1)
  58. })
  59. dispatcher.Dispatch(ctx, dest, b)
  60. for i := 0; i < 5; i++ {
  61. dispatcher.Dispatch(ctx, dest, b)
  62. }
  63. time.Sleep(time.Second)
  64. cancel()
  65. assert(count, Equals, uint32(1))
  66. assert(atomic.LoadUint32(&msgCount), Equals, uint32(6))
  67. }