// dispatcher_test.go (1.6 KB)

  1. package udp_test
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. "v2ray.com/core/common/buf"
  8. "v2ray.com/core/common/net"
  9. . "v2ray.com/core/transport/internet/udp"
  10. "v2ray.com/core/transport/ray"
  11. . "v2ray.com/ext/assert"
  12. )
  13. type TestDispatcher struct {
  14. OnDispatch func(ctx context.Context, dest net.Destination) (ray.InboundRay, error)
  15. }
  16. func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) {
  17. return d.OnDispatch(ctx, dest)
  18. }
  19. func (d *TestDispatcher) Start() error {
  20. return nil
  21. }
  22. func (d *TestDispatcher) Close() error {
  23. return nil
  24. }
  25. func TestSameDestinationDispatching(t *testing.T) {
  26. assert := With(t)
  27. ctx, cancel := context.WithCancel(context.Background())
  28. link := ray.NewRay(ctx)
  29. go func() {
  30. for {
  31. data, err := link.OutboundInput().ReadMultiBuffer()
  32. if err != nil {
  33. break
  34. }
  35. err = link.OutboundOutput().WriteMultiBuffer(data)
  36. assert(err, IsNil)
  37. }
  38. }()
  39. var count uint32
  40. td := &TestDispatcher{
  41. OnDispatch: func(ctx context.Context, dest net.Destination) (ray.InboundRay, error) {
  42. atomic.AddUint32(&count, 1)
  43. return link, nil
  44. },
  45. }
  46. dest := net.UDPDestination(net.LocalHostIP, 53)
  47. b := buf.New()
  48. b.AppendBytes('a', 'b', 'c', 'd')
  49. dispatcher := NewDispatcher(td)
  50. var msgCount uint32
  51. dispatcher.Dispatch(ctx, dest, b, func(payload *buf.Buffer) {
  52. atomic.AddUint32(&msgCount, 1)
  53. })
  54. for i := 0; i < 5; i++ {
  55. dispatcher.Dispatch(ctx, dest, b, func(payload *buf.Buffer) {})
  56. }
  57. time.Sleep(time.Second)
  58. cancel()
  59. assert(count, Equals, uint32(1))
  60. assert(msgCount, Equals, uint32(6))
  61. }