| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 | 
							- package outbound
 
- import (
 
- 	"sync"
 
- 	"github.com/v2ray/v2ray-core/app"
 
- 	"github.com/v2ray/v2ray-core/common/alloc"
 
- 	v2io "github.com/v2ray/v2ray-core/common/io"
 
- 	"github.com/v2ray/v2ray-core/common/log"
 
- 	v2net "github.com/v2ray/v2ray-core/common/net"
 
- 	"github.com/v2ray/v2ray-core/common/protocol"
 
- 	"github.com/v2ray/v2ray-core/common/protocol/raw"
 
- 	"github.com/v2ray/v2ray-core/proxy"
 
- 	"github.com/v2ray/v2ray-core/proxy/internal"
 
- 	vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io"
 
- 	"github.com/v2ray/v2ray-core/transport/hub"
 
- 	"github.com/v2ray/v2ray-core/transport/ray"
 
- )
 
- type VMessOutboundHandler struct {
 
- 	receiverManager *ReceiverManager
 
- }
 
- func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error {
 
- 	defer ray.OutboundInput().Release()
 
- 	defer ray.OutboundOutput().Close()
 
- 	destination, vNextUser := this.receiverManager.PickReceiver()
 
- 	command := protocol.RequestCommandTCP
 
- 	if target.IsUDP() {
 
- 		command = protocol.RequestCommandUDP
 
- 	}
 
- 	request := &protocol.RequestHeader{
 
- 		Version: raw.Version,
 
- 		User:    vNextUser,
 
- 		Command: command,
 
- 		Address: target.Address(),
 
- 		Port:    target.Port(),
 
- 		Option:  protocol.RequestOptionChunkStream,
 
- 	}
 
- 	conn, err := hub.Dial(destination)
 
- 	if err != nil {
 
- 		log.Error("Failed to open ", destination, ": ", err)
 
- 		return err
 
- 	}
 
- 	log.Info("VMessOut: Tunneling request to ", request.Address, " via ", destination)
 
- 	defer conn.Close()
 
- 	if request.Option.IsChunkStream() {
 
- 		conn.SetReusable(true)
 
- 	}
 
- 	input := ray.OutboundInput()
 
- 	output := ray.OutboundOutput()
 
- 	var requestFinish, responseFinish sync.Mutex
 
- 	requestFinish.Lock()
 
- 	responseFinish.Lock()
 
- 	session := raw.NewClientSession(protocol.DefaultIDHash)
 
- 	go this.handleRequest(session, conn, request, payload, input, &requestFinish)
 
- 	go this.handleResponse(session, conn, request, destination, output, &responseFinish)
 
- 	requestFinish.Lock()
 
- 	responseFinish.Lock()
 
- 	return nil
 
- }
 
- func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn *hub.Connection, request *protocol.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) {
 
- 	defer finish.Unlock()
 
- 	writer := v2io.NewBufferedWriter(conn)
 
- 	defer writer.Release()
 
- 	session.EncodeRequestHeader(request, writer)
 
- 	bodyWriter := session.EncodeRequestBody(writer)
 
- 	var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
 
- 	if request.Option.IsChunkStream() {
 
- 		streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
 
- 	}
 
- 	streamWriter.Write(payload)
 
- 	writer.SetCached(false)
 
- 	err := v2io.Pipe(input, streamWriter)
 
- 	if err != vmessio.ErrorStreamCompleted {
 
- 		conn.SetReusable(false)
 
- 	}
 
- 	if request.Option.IsChunkStream() {
 
- 		streamWriter.Write(alloc.NewSmallBuffer().Clear())
 
- 	}
 
- 	streamWriter.Release()
 
- 	return
 
- }
 
- func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn *hub.Connection, request *protocol.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) {
 
- 	defer finish.Unlock()
 
- 	reader := v2io.NewBufferedReader(conn)
 
- 	defer reader.Release()
 
- 	header, err := session.DecodeResponseHeader(reader)
 
- 	if err != nil {
 
- 		log.Warning("VMessOut: Failed to read response: ", err)
 
- 		return
 
- 	}
 
- 	go this.handleCommand(dest, header.Command)
 
- 	reader.SetCached(false)
 
- 	decryptReader := session.DecodeResponseBody(reader)
 
- 	var bodyReader v2io.Reader
 
- 	if request.Option.IsChunkStream() {
 
- 		bodyReader = vmessio.NewAuthChunkReader(decryptReader)
 
- 	} else {
 
- 		bodyReader = v2io.NewAdaptiveReader(decryptReader)
 
- 	}
 
- 	err = v2io.Pipe(bodyReader, output)
 
- 	if err != vmessio.ErrorStreamCompleted {
 
- 		conn.SetReusable(false)
 
- 	}
 
- 	bodyReader.Release()
 
- 	return
 
- }
 
- func init() {
 
- 	internal.MustRegisterOutboundHandlerCreator("vmess",
 
- 		func(space app.Space, rawConfig interface{}) (proxy.OutboundHandler, error) {
 
- 			vOutConfig := rawConfig.(*Config)
 
- 			return &VMessOutboundHandler{
 
- 				receiverManager: NewReceiverManager(vOutConfig.Receivers),
 
- 			}, nil
 
- 		})
 
- }
 
 
  |