pipe.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package pipe
  2. import (
  3. "context"
  4. "v2ray.com/core"
  5. "v2ray.com/core/common/signal"
  6. )
  7. // Option for creating new Pipes.
  8. type Option func(*pipe)
  9. func WithoutSizeLimit() Option {
  10. return func(p *pipe) {
  11. p.limit = -1
  12. }
  13. }
  14. func WithSizeLimit(limit int32) Option {
  15. return func(p *pipe) {
  16. p.limit = limit
  17. }
  18. }
  19. func OptionsFromContext(ctx context.Context) []Option {
  20. var opt []Option
  21. bp := core.BufferPolicyFromContext(ctx)
  22. if bp.PerConnection >= 0 {
  23. opt = append(opt, WithSizeLimit(bp.PerConnection))
  24. } else {
  25. opt = append(opt, WithoutSizeLimit())
  26. }
  27. return opt
  28. }
  29. // New creates a new Reader and Writer that connects to each other.
  30. func New(opts ...Option) (*Reader, *Writer) {
  31. p := &pipe{
  32. limit: -1,
  33. readSignal: signal.NewNotifier(),
  34. writeSignal: signal.NewNotifier(),
  35. }
  36. for _, opt := range opts {
  37. opt(p)
  38. }
  39. return &Reader{
  40. pipe: p,
  41. }, &Writer{
  42. pipe: p,
  43. }
  44. }
  45. type closeError interface {
  46. CloseError()
  47. }
  48. // CloseError invokes CloseError() method if the object is either Reader or Writer.
  49. func CloseError(v interface{}) {
  50. if c, ok := v.(closeError); ok {
  51. c.CloseError()
  52. }
  53. }