pipe.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package pipe
  2. import (
  3. "context"
  4. "v2ray.com/core"
  5. "v2ray.com/core/common/signal"
  6. "v2ray.com/core/common/signal/done"
  7. )
  8. // Option for creating new Pipes.
  9. type Option func(*pipe)
  10. func WithoutSizeLimit() Option {
  11. return func(p *pipe) {
  12. p.limit = -1
  13. }
  14. }
  15. func WithSizeLimit(limit int32) Option {
  16. return func(p *pipe) {
  17. p.limit = limit
  18. }
  19. }
  20. func OptionsFromContext(ctx context.Context) []Option {
  21. var opt []Option
  22. bp := core.BufferPolicyFromContext(ctx)
  23. if bp.PerConnection >= 0 {
  24. opt = append(opt, WithSizeLimit(bp.PerConnection))
  25. } else {
  26. opt = append(opt, WithoutSizeLimit())
  27. }
  28. return opt
  29. }
  30. // New creates a new Reader and Writer that connects to each other.
  31. func New(opts ...Option) (*Reader, *Writer) {
  32. p := &pipe{
  33. limit: -1,
  34. readSignal: signal.NewNotifier(),
  35. writeSignal: signal.NewNotifier(),
  36. done: done.New(),
  37. }
  38. for _, opt := range opts {
  39. opt(p)
  40. }
  41. return &Reader{
  42. pipe: p,
  43. }, &Writer{
  44. pipe: p,
  45. }
  46. }
  47. type closeError interface {
  48. CloseError()
  49. }
  50. // CloseError invokes CloseError() method if the object is either Reader or Writer.
  51. func CloseError(v interface{}) {
  52. if c, ok := v.(closeError); ok {
  53. c.CloseError()
  54. }
  55. }