| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 | package wsimport (	"bufio"	"io"	"net"	"sync"	"time"	"github.com/gorilla/websocket"	"v2ray.com/core/common/errors"	"v2ray.com/core/common/log")type wsconn struct {	wsc         *websocket.Conn	readBuffer  *bufio.Reader	connClosing bool	reusable    bool	rlock       *sync.Mutex	wlock       *sync.Mutex	config      *Config}func (ws *wsconn) Read(b []byte) (n int, err error) {	ws.rlock.Lock()	n, err = ws.read(b)	ws.rlock.Unlock()	return n, err}func (ws *wsconn) read(b []byte) (n int, err error) {	if ws.connClosing {		return 0, io.EOF	}	n, err = ws.readNext(b)	return n, err}func (ws *wsconn) getNewReadBuffer() error {	_, r, err := ws.wsc.NextReader()	if err != nil {		log.Warning("WS transport: ws connection NewFrameReader return ", err)		ws.connClosing = true		ws.Close()		return err	}	ws.readBuffer = bufio.NewReader(r)	return nil}func (ws *wsconn) readNext(b []byte) (n int, err error) {	if ws.readBuffer == nil {		err = ws.getNewReadBuffer()		if err != nil {			return 0, err		}	}	n, err = ws.readBuffer.Read(b)	if err == nil {		return n, err	}	if errors.Cause(err) == io.EOF {		ws.readBuffer = nil		if n == 0 {			return ws.readNext(b)		}		return n, nil	}	return n, err}func (ws *wsconn) Write(b []byte) (n int, err error) {	ws.wlock.Lock()	if ws.connClosing {		return 0, io.ErrClosedPipe	}	n, err = ws.write(b)	ws.wlock.Unlock()	return n, err}func (ws *wsconn) write(b []byte) (n int, err error) {	wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)	if err != nil {		log.Warning("WS transport: ws connection NewFrameReader return ", err)		ws.connClosing = true		ws.Close()		return 0, err	}	n, err = wr.Write(b)	if err != nil {		return 0, err	}	err = wr.Close()	if err != nil {		return 0, err	}	return n, err}func (ws *wsconn) Close() error {	ws.connClosing = true	ws.wlock.Lock()	ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))	ws.wlock.Unlock()	err := ws.wsc.Close()	return err}func (ws *wsconn) LocalAddr() net.Addr {	return ws.wsc.LocalAddr()}func (ws *wsconn) RemoteAddr() net.Addr {	return ws.wsc.RemoteAddr()}func (ws *wsconn) SetDeadline(t time.Time) error {	return func() error {		errr := ws.SetReadDeadline(t)		errw := ws.SetWriteDeadline(t)		if errr == nil || errw == nil {			return nil		}		if errr != nil {			return errr		}		return errw	}()}func (ws *wsconn) SetReadDeadline(t time.Time) error {	return ws.wsc.SetReadDeadline(t)}func (ws *wsconn) SetWriteDeadline(t time.Time) error {	return ws.wsc.SetWriteDeadline(t)}func (ws *wsconn) setup() {	ws.connClosing = false	/*		https://godoc.org/github.com/gorilla/websocket#Conn.NextReader		https://godoc.org/github.com/gorilla/websocket#Conn.NextWriter		Both Read and write access are both exclusive.		And in both case it will need a lock.	*/	ws.rlock = &sync.Mutex{}	ws.wlock = &sync.Mutex{}	ws.pingPong()}func (ws *wsconn) Reusable() bool {	return ws.reusable && !ws.connClosing}func (ws *wsconn) SetReusable(reusable bool) {	if !ws.config.ConnectionReuse.IsEnabled() {		return	}	ws.reusable = reusable}func (ws *wsconn) pingPong() {	pongRcv := make(chan int, 0)	ws.wsc.SetPongHandler(func(data string) error {		pongRcv <- 0		return nil	})	go func() {		for !ws.connClosing {			ws.wlock.Lock()			ws.wsc.WriteMessage(websocket.PingMessage, nil)			ws.wlock.Unlock()			tick := time.After(time.Second * 3)			select {			case <-pongRcv:			case <-tick:				if !ws.connClosing {					log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())				}				ws.Close()			}			<-time.After(time.Second * 27)		}		return	}()}
 |