| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- package ws
- import (
- "bufio"
- "fmt"
- "io"
- "net"
- "sync"
- "time"
- "github.com/v2ray/v2ray-core/common/log"
- "github.com/gorilla/websocket"
- )
- type wsconn struct {
- wsc *websocket.Conn
- readBuffer *bufio.Reader
- connClosing bool
- reusable bool
- rlock *sync.Mutex
- wlock *sync.Mutex
- }
- func (ws *wsconn) Read(b []byte) (n int, err error) {
- ws.rlock.Lock()
- if ws.connClosing {
- return 0, io.EOF
- }
- getNewBuffer := func() error {
- _, r, err := ws.wsc.NextReader()
- if err != nil {
- log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
- ws.connClosing = true
- ws.Close()
- return err
- }
- ws.readBuffer = bufio.NewReader(r)
- return nil
- }
- /*It seems golang's support for recursive in anonymous func is yet to complete.
- func1:=func(){
- func1()
- }
- won't work, failed to compile for it can't find func1.
- Should following workaround panic,
- readNext could have been called before the actual defination was made,
- This is very unlikely.
- */
- readNext := func(b []byte) (n int, err error) { panic("Runtime unstable. Please report this bug to developer.") }
- readNext = func(b []byte) (n int, err error) {
- if ws.readBuffer == nil {
- err = getNewBuffer()
- if err != nil {
- return 0, err
- }
- }
- n, err = ws.readBuffer.Read(b)
- if err == nil {
- return n, err
- }
- if err == io.EOF {
- ws.readBuffer = nil
- if n == 0 {
- return readNext(b)
- }
- return n, nil
- }
- return n, err
- }
- n, err = readNext(b)
- ws.rlock.Unlock()
- return n, err
- }
- func (ws *wsconn) Write(b []byte) (n int, err error) {
- ws.wlock.Lock()
- /*
- process can crash as websocket report "concurrent write to websocket connection"
- even if lock is persent.
- This problem should have been resolved.
- */
- defer func() {
- if r := recover(); r != nil {
- fmt.Println("WS workaround: recover", r)
- ws.wlock.Unlock()
- }
- }()
- if ws.connClosing {
- return 0, io.EOF
- }
- writeWs := func(b []byte) (n int, err error) {
- wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
- if err != nil {
- log.Warning("WS transport: ws connection NewFrameReader return " + err.Error())
- ws.connClosing = true
- ws.Close()
- return 0, err
- }
- n, err = wr.Write(b)
- if err != nil {
- return 0, err
- }
- err = wr.Close()
- if err != nil {
- return 0, err
- }
- return n, err
- }
- n, err = writeWs(b)
- ws.wlock.Unlock()
- return n, err
- }
- func (ws *wsconn) Close() error {
- ws.connClosing = true
- ws.wlock.Lock()
- ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
- ws.wlock.Unlock()
- err := ws.wsc.Close()
- return err
- }
- func (ws *wsconn) LocalAddr() net.Addr {
- return ws.wsc.LocalAddr()
- }
- func (ws *wsconn) RemoteAddr() net.Addr {
- return ws.wsc.RemoteAddr()
- }
- func (ws *wsconn) SetDeadline(t time.Time) error {
- return func() error {
- errr := ws.SetReadDeadline(t)
- errw := ws.SetWriteDeadline(t)
- if errr == nil || errw == nil {
- return nil
- }
- if errr != nil {
- return errr
- }
- return errw
- }()
- }
- func (ws *wsconn) SetReadDeadline(t time.Time) error {
- return ws.wsc.SetReadDeadline(t)
- }
- func (ws *wsconn) SetWriteDeadline(t time.Time) error {
- return ws.wsc.SetWriteDeadline(t)
- }
- func (ws *wsconn) setup() {
- ws.connClosing = false
- ws.rlock = &sync.Mutex{}
- ws.wlock = &sync.Mutex{}
- ws.pingPong()
- }
- func (ws *wsconn) Reusable() bool {
- return ws.reusable && !ws.connClosing
- }
- func (ws *wsconn) SetReusable(reusable bool) {
- if !effectiveConfig.ConnectionReuse {
- return
- }
- ws.reusable = reusable
- }
- func (ws *wsconn) pingPong() {
- pongRcv := make(chan int, 0)
- ws.wsc.SetPongHandler(func(data string) error {
- pongRcv <- 0
- return nil
- })
- go func() {
- for !ws.connClosing {
- ws.wlock.Lock()
- ws.wsc.WriteMessage(websocket.PingMessage, nil)
- ws.wlock.Unlock()
- tick := time.NewTicker(time.Second * 30)
- select {
- case <-pongRcv:
- break
- case <-tick.C:
- if !ws.connClosing {
- log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
- }
- ws.Close()
- }
- <-tick.C
- tick.Stop()
- }
- return
- }()
- }
|