| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package commander
- import (
- "context"
- "sync"
- "v2ray.com/core"
- "v2ray.com/core/common"
- "v2ray.com/core/common/net"
- "v2ray.com/core/common/signal"
- "v2ray.com/core/transport/pipe"
- )
- // OutboundListener is a net.Listener for listening gRPC connections.
- type OutboundListener struct {
- buffer chan net.Conn
- done *signal.Done
- }
- func (l *OutboundListener) add(conn net.Conn) {
- select {
- case l.buffer <- conn:
- case <-l.done.Wait():
- common.Ignore(conn.Close(), "We can do nothing if Close() returns error.")
- default:
- common.Ignore(conn.Close(), "We can do nothing if Close() returns error.")
- }
- }
- // Accept implements net.Listener.
- func (l *OutboundListener) Accept() (net.Conn, error) {
- select {
- case <-l.done.Wait():
- return nil, newError("listen closed")
- case c := <-l.buffer:
- return c, nil
- }
- }
- // Close implement net.Listener.
- func (l *OutboundListener) Close() error {
- common.Must(l.done.Close())
- L:
- for {
- select {
- case c := <-l.buffer:
- common.Ignore(c.Close(), "We can do nothing if errored.")
- default:
- break L
- }
- }
- return nil
- }
- // Addr implements net.Listener.
- func (l *OutboundListener) Addr() net.Addr {
- return &net.TCPAddr{
- IP: net.IP{0, 0, 0, 0},
- Port: 0,
- }
- }
- // Outbound is a core.OutboundHandler that handles gRPC connections.
- type Outbound struct {
- tag string
- listener *OutboundListener
- access sync.RWMutex
- closed bool
- }
- // Dispatch implements core.OutboundHandler.
- func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) {
- co.access.RLock()
- if co.closed {
- pipe.CloseError(link.Reader)
- pipe.CloseError(link.Writer)
- co.access.RUnlock()
- return
- }
- closeSignal := signal.NewNotifier()
- c := net.NewConnection(net.ConnectionInputMulti(link.Writer), net.ConnectionOutputMulti(link.Reader), net.ConnectionOnClose(closeSignal))
- co.listener.add(c)
- co.access.RUnlock()
- <-closeSignal.Wait()
- }
- // Tag implements core.OutboundHandler.
- func (co *Outbound) Tag() string {
- return co.tag
- }
- // Start implements common.Runnable.
- func (co *Outbound) Start() error {
- co.access.Lock()
- co.closed = false
- co.access.Unlock()
- return nil
- }
- // Close implements common.Closable.
- func (co *Outbound) Close() error {
- co.access.Lock()
- defer co.access.Unlock()
- co.closed = true
- return co.listener.Close()
- }
|