io.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package dns
  2. import (
  3. "sync"
  4. "golang.org/x/net/dns/dnsmessage"
  5. "v2ray.com/core/common"
  6. "v2ray.com/core/common/buf"
  7. )
  8. func PackMessage(msg *dnsmessage.Message) (*buf.Buffer, error) {
  9. buffer := buf.New()
  10. rawBytes := buffer.Extend(buf.Size)
  11. packed, err := msg.AppendPack(rawBytes[:0])
  12. if err != nil {
  13. buffer.Release()
  14. return nil, err
  15. }
  16. buffer.Resize(0, int32(len(packed)))
  17. return buffer, nil
  18. }
  19. type MessageReader interface {
  20. ReadMessage() (*buf.Buffer, error)
  21. }
  22. type UDPReader struct {
  23. buf.Reader
  24. access sync.Mutex
  25. cache buf.MultiBuffer
  26. }
  27. func (r *UDPReader) readCache() *buf.Buffer {
  28. r.access.Lock()
  29. defer r.access.Unlock()
  30. mb, b := buf.SplitFirst(r.cache)
  31. r.cache = mb
  32. return b
  33. }
  34. func (r *UDPReader) refill() error {
  35. mb, err := r.Reader.ReadMultiBuffer()
  36. if err != nil {
  37. return err
  38. }
  39. r.access.Lock()
  40. r.cache = mb
  41. r.access.Unlock()
  42. return nil
  43. }
  44. // ReadMessage implements MessageReader.
  45. func (r *UDPReader) ReadMessage() (*buf.Buffer, error) {
  46. for {
  47. b := r.readCache()
  48. if b != nil {
  49. return b, nil
  50. }
  51. if err := r.refill(); err != nil {
  52. return nil, err
  53. }
  54. }
  55. }
  56. // Close implements common.Closable.
  57. func (r *UDPReader) Close() error {
  58. defer func() {
  59. r.access.Lock()
  60. buf.ReleaseMulti(r.cache)
  61. r.cache = nil
  62. r.access.Unlock()
  63. }()
  64. return common.Close(r.Reader)
  65. }