dispatcher_test.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package udp_test
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. "v2ray.com/core/common/buf"
  8. v2net "v2ray.com/core/common/net"
  9. "v2ray.com/core/testing/assert"
  10. . "v2ray.com/core/transport/internet/udp"
  11. "v2ray.com/core/transport/ray"
  12. )
  13. type TestDispatcher struct {
  14. OnDispatch func(ctx context.Context, dest v2net.Destination) (ray.InboundRay, error)
  15. }
  16. func (d *TestDispatcher) Dispatch(ctx context.Context, dest v2net.Destination) (ray.InboundRay, error) {
  17. return d.OnDispatch(ctx, dest)
  18. }
  19. func TestSameDestinationDispatching(t *testing.T) {
  20. assert := assert.On(t)
  21. ctx, cancel := context.WithCancel(context.Background())
  22. link := ray.NewRay(ctx)
  23. go func() {
  24. for {
  25. data, err := link.OutboundInput().Read()
  26. if err != nil {
  27. break
  28. }
  29. err = link.OutboundOutput().Write(data)
  30. assert.Error(err).IsNil()
  31. }
  32. }()
  33. var count uint32
  34. td := &TestDispatcher{
  35. OnDispatch: func(ctx context.Context, dest v2net.Destination) (ray.InboundRay, error) {
  36. atomic.AddUint32(&count, 1)
  37. return link, nil
  38. },
  39. }
  40. dest := v2net.UDPDestination(v2net.LocalHostIP, 53)
  41. b := buf.New()
  42. b.AppendBytes('a', 'b', 'c', 'd')
  43. dispatcher := NewDispatcher(td)
  44. var msgCount uint32
  45. dispatcher.Dispatch(ctx, dest, b, func(payload *buf.Buffer) {
  46. atomic.AddUint32(&msgCount, 1)
  47. })
  48. for i := 0; i < 5; i++ {
  49. dispatcher.Dispatch(ctx, dest, b, func(payload *buf.Buffer) {})
  50. }
  51. time.Sleep(time.Second)
  52. cancel()
  53. assert.Uint32(count).Equals(1)
  54. assert.Uint32(msgCount).Equals(6)
  55. }