pipe.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package pipe
  2. import (
  3. "context"
  4. "v2ray.com/core/common/signal"
  5. "v2ray.com/core/common/signal/done"
  6. "v2ray.com/core/features/policy"
  7. )
  8. // Option for creating new Pipes.
  9. type Option func(*pipe)
  10. // WithoutSizeLimit returns an Option for Pipe to have no size limit.
  11. func WithoutSizeLimit() Option {
  12. return func(p *pipe) {
  13. p.limit = -1
  14. }
  15. }
  16. // WithSizeLimit returns an Option for Pipe to have the given size limit.
  17. func WithSizeLimit(limit int32) Option {
  18. return func(p *pipe) {
  19. p.limit = limit
  20. }
  21. }
  22. // DiscardOverflow returns an Option for Pipe to discard writes if full.
  23. func DiscardOverflow() Option {
  24. return func(p *pipe) {
  25. p.discardOverflow = true
  26. }
  27. }
  28. // OptionsFromContext returns a list of Options from context.
  29. func OptionsFromContext(ctx context.Context) []Option {
  30. var opt []Option
  31. bp := policy.BufferPolicyFromContext(ctx)
  32. if bp.PerConnection >= 0 {
  33. opt = append(opt, WithSizeLimit(bp.PerConnection))
  34. } else {
  35. opt = append(opt, WithoutSizeLimit())
  36. }
  37. return opt
  38. }
  39. // New creates a new Reader and Writer that connects to each other.
  40. func New(opts ...Option) (*Reader, *Writer) {
  41. p := &pipe{
  42. limit: -1,
  43. readSignal: signal.NewNotifier(),
  44. writeSignal: signal.NewNotifier(),
  45. done: done.New(),
  46. }
  47. for _, opt := range opts {
  48. opt(p)
  49. }
  50. return &Reader{
  51. pipe: p,
  52. }, &Writer{
  53. pipe: p,
  54. }
  55. }
  // closeError is implemented by any value that exposes a CloseError method.
  type closeError interface {
  	CloseError()
  }

  // CloseError calls v.CloseError() when v's dynamic type provides that
  // method (as the doc for this package states Reader and Writer do). For any
  // other value, including a nil interface, it does nothing.
  func CloseError(v interface{}) {
  	switch target := v.(type) {
  	case closeError:
  		target.CloseError()
  	}
  }