pipe.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package pipe
  2. import (
  3. "context"
  4. "v2ray.com/core"
  5. "v2ray.com/core/common/signal"
  6. )
  7. type Option func(*pipe)
  8. func WithoutSizeLimit() Option {
  9. return func(p *pipe) {
  10. p.limit = -1
  11. }
  12. }
  13. func WithSizeLimit(limit int32) Option {
  14. return func(p *pipe) {
  15. p.limit = limit
  16. }
  17. }
  18. func OptionsFromContext(ctx context.Context) []Option {
  19. var opt []Option
  20. bp := core.BufferPolicyFromContext(ctx)
  21. if bp.PerConnection >= 0 {
  22. opt = append(opt, WithSizeLimit(bp.PerConnection))
  23. } else {
  24. opt = append(opt, WithoutSizeLimit())
  25. }
  26. return opt
  27. }
  28. // New creates a new Reader and Writer that connects to each other.
  29. func New(opts ...Option) (*Reader, *Writer) {
  30. p := &pipe{
  31. limit: -1,
  32. readSignal: signal.NewNotifier(),
  33. writeSignal: signal.NewNotifier(),
  34. }
  35. for _, opt := range opts {
  36. opt(p)
  37. }
  38. return &Reader{
  39. pipe: p,
  40. }, &Writer{
  41. pipe: p,
  42. }
  43. }
  // closeError is satisfied by any type that has a CloseError() method.
  type closeError interface {
  	CloseError()
  }

  // CloseError calls v.CloseError() when v's dynamic type provides that method,
  // and does nothing for every other value (including a nil interface).
  // NOTE(review): Reader and Writer are presumably the intended implementers;
  // their methods are not visible in this part of the file.
  func CloseError(v interface{}) {
  	closer, ok := v.(closeError)
  	if !ok {
  		return
  	}
  	closer.CloseError()
  }