pipe.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package pipe
  2. import (
  3. "context"
  4. "github.com/v2fly/v2ray-core/v5/common/signal"
  5. "github.com/v2fly/v2ray-core/v5/common/signal/done"
  6. "github.com/v2fly/v2ray-core/v5/features/policy"
  7. )
  8. // Option for creating new Pipes.
  9. type Option func(*pipeOption)
  10. // WithoutSizeLimit returns an Option for Pipe to have no size limit.
  11. func WithoutSizeLimit() Option {
  12. return func(opt *pipeOption) {
  13. opt.limit = -1
  14. }
  15. }
  16. // WithSizeLimit returns an Option for Pipe to have the given size limit.
  17. func WithSizeLimit(limit int32) Option {
  18. return func(opt *pipeOption) {
  19. opt.limit = limit
  20. }
  21. }
  22. // DiscardOverflow returns an Option for Pipe to discard writes if full.
  23. func DiscardOverflow() Option {
  24. return func(opt *pipeOption) {
  25. opt.discardOverflow = true
  26. }
  27. }
  28. // OptionsFromContext returns a list of Options from context.
  29. func OptionsFromContext(ctx context.Context) []Option {
  30. var opt []Option
  31. bp := policy.BufferPolicyFromContext(ctx)
  32. if bp.PerConnection >= 0 {
  33. opt = append(opt, WithSizeLimit(bp.PerConnection))
  34. } else {
  35. opt = append(opt, WithoutSizeLimit())
  36. }
  37. return opt
  38. }
  39. // New creates a new Reader and Writer that connects to each other.
  40. func New(opts ...Option) (*Reader, *Writer) {
  41. p := &pipe{
  42. readSignal: signal.NewNotifier(),
  43. writeSignal: signal.NewNotifier(),
  44. done: done.New(),
  45. option: pipeOption{
  46. limit: -1,
  47. },
  48. }
  49. for _, opt := range opts {
  50. opt(&(p.option))
  51. }
  52. return &Reader{
  53. pipe: p,
  54. }, &Writer{
  55. pipe: p,
  56. }
  57. }