Browse Source

Add remote address to grpc transport layer conn (#783)

* Add remote address to grpc transport layer conn

* go fmt
maskedeken 4 years ago
parent
commit
fb9a5a136b

+ 1 - 1
transport/internet/grpc/dial.go

@@ -60,7 +60,7 @@ func dialgRPC(ctx context.Context, dest net.Destination, streamSettings *interne
 	if err != nil {
 		return nil, newError("Cannot dial grpc").Base(err)
 	}
-	return encoding.NewClientConn(gunService), nil
+	return encoding.NewGunConn(gunService, nil), nil
 }
 
 func getGrpcClient(dest net.Destination, dialOption grpc.DialOption) (*grpc.ClientConn, error) {

+ 0 - 70
transport/internet/grpc/encoding/clientConn.go

@@ -1,70 +0,0 @@
-// +build !confonly
-
-package encoding
-
-import (
-	"bytes"
-	"io"
-	"net"
-	"time"
-)
-
-type ClientConn struct {
-	client GunService_TunClient
-	reader io.Reader
-}
-
-func (*ClientConn) LocalAddr() net.Addr {
-	return nil
-}
-
-func (*ClientConn) RemoteAddr() net.Addr {
-	return nil
-}
-
-func (*ClientConn) SetDeadline(time.Time) error {
-	return nil
-}
-
-func (*ClientConn) SetReadDeadline(time.Time) error {
-	return nil
-}
-
-func (*ClientConn) SetWriteDeadline(time.Time) error {
-	return nil
-}
-
-func (s *ClientConn) Read(b []byte) (n int, err error) {
-	if s.reader == nil {
-		h, err := s.client.Recv()
-		if err != nil {
-			return 0, newError("unable to read from gun tunnel").Base(err)
-		}
-		s.reader = bytes.NewReader(h.Data)
-	}
-	n, err = s.reader.Read(b)
-	if err == io.EOF {
-		s.reader = nil
-		return n, nil
-	}
-	return n, err
-}
-
-func (s *ClientConn) Write(b []byte) (n int, err error) {
-	err = s.client.Send(&Hunk{Data: b})
-	if err != nil {
-		return 0, newError("Unable to send data over gun").Base(err)
-	}
-	return len(b), nil
-}
-
-func (s *ClientConn) Close() error {
-	return s.client.CloseSend()
-}
-
-func NewClientConn(client GunService_TunClient) *ClientConn {
-	return &ClientConn{
-		client: client,
-		reader: nil,
-	}
-}

+ 113 - 0
transport/internet/grpc/encoding/conn.go

@@ -0,0 +1,113 @@
+// +build !confonly
+
+package encoding
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"net"
+	"time"
+
+	"google.golang.org/grpc/peer"
+)
+
+// GunService is the abstract interface of GunService_TunClient and GunService_TunServer
+type GunService interface {
+	Context() context.Context
+	Send(*Hunk) error
+	Recv() (*Hunk, error)
+}
+
+// GunConn implements net.Conn for gun tunnel
+type GunConn struct {
+	service GunService
+	reader  io.Reader
+	over    context.CancelFunc
+	local   net.Addr
+	remote  net.Addr
+}
+
+// Read implements net.Conn.Read()
+func (c *GunConn) Read(b []byte) (n int, err error) {
+	if c.reader == nil {
+		h, err := c.service.Recv()
+		if err != nil {
+			return 0, newError("unable to read from gun tunnel").Base(err)
+		}
+		c.reader = bytes.NewReader(h.Data)
+	}
+	n, err = c.reader.Read(b)
+	if err == io.EOF {
+		c.reader = nil
+		return n, nil
+	}
+	return n, err
+}
+
+// Write implements net.Conn.Write()
+func (c *GunConn) Write(b []byte) (n int, err error) {
+	err = c.service.Send(&Hunk{Data: b})
+	if err != nil {
+		return 0, newError("Unable to send data over gun").Base(err)
+	}
+	return len(b), nil
+}
+
+// Close implements net.Conn.Close()
+func (c *GunConn) Close() error {
+	if c.over != nil {
+		c.over()
+	}
+	return nil
+}
+
+// LocalAddr implements net.Conn.LocalAddr()
+func (c *GunConn) LocalAddr() net.Addr {
+	return c.local
+}
+
+// RemoteAddr implements net.Conn.RemoteAddr()
+func (c *GunConn) RemoteAddr() net.Addr {
+	return c.remote
+}
+
+// SetDeadline implements net.Conn.SetDeadline()
+func (*GunConn) SetDeadline(time.Time) error {
+	return nil
+}
+
+// SetReadDeadline implements net.Conn.SetReadDeadline()
+func (*GunConn) SetReadDeadline(time.Time) error {
+	return nil
+}
+
+// SetWriteDeadline implements net.Conn.SetWriteDeadline()
+func (*GunConn) SetWriteDeadline(time.Time) error {
+	return nil
+}
+
+// NewGunConn creates GunConn which handles gun tunnel
+func NewGunConn(service GunService, over context.CancelFunc) *GunConn {
+	conn := &GunConn{
+		service: service,
+		reader:  nil,
+		over:    over,
+	}
+
+	conn.local = &net.TCPAddr{
+		IP:   []byte{0, 0, 0, 0},
+		Port: 0,
+	}
+	pr, ok := peer.FromContext(service.Context())
+	if ok {
+		conn.remote = pr.Addr
+	} else {
+		conn.remote = &net.TCPAddr{
+			IP:   []byte{0, 0, 0, 0},
+			Port: 0,
+		}
+	}
+
+	return conn
+}

+ 0 - 78
transport/internet/grpc/encoding/serverconn.go

@@ -1,78 +0,0 @@
-// +build !confonly
-
-package encoding
-
-import (
-	"bytes"
-	"context"
-	"io"
-	"net"
-	"time"
-)
-
-type ServerConn struct {
-	server GunService_TunServer
-	reader io.Reader
-	over   context.CancelFunc
-}
-
-func (s *ServerConn) Read(b []byte) (n int, err error) {
-	if s.reader == nil {
-		h, err := s.server.Recv()
-		if err != nil {
-			return 0, newError("unable to read from gun tunnel").Base(err)
-		}
-		s.reader = bytes.NewReader(h.Data)
-	}
-	n, err = s.reader.Read(b)
-	if err == io.EOF {
-		s.reader = nil
-		return n, nil
-	}
-	return n, err
-}
-
-func (s *ServerConn) Write(b []byte) (n int, err error) {
-	err = s.server.Send(&Hunk{Data: b})
-	if err != nil {
-		return 0, newError("Unable to send data over gun").Base(err)
-	}
-	return len(b), nil
-}
-
-func (s *ServerConn) Close() error {
-	s.over()
-	return nil
-}
-
-func (*ServerConn) LocalAddr() net.Addr {
-	return nil
-}
-
-func (*ServerConn) RemoteAddr() net.Addr {
-	newError("gun transport do not support get remote address").AtWarning().WriteToLog()
-	return &net.UnixAddr{
-		Name: "@placeholder",
-		Net:  "unix",
-	}
-}
-
-func (*ServerConn) SetDeadline(time.Time) error {
-	return nil
-}
-
-func (*ServerConn) SetReadDeadline(time.Time) error {
-	return nil
-}
-
-func (*ServerConn) SetWriteDeadline(time.Time) error {
-	return nil
-}
-
-func NewServerConn(server GunService_TunServer, over context.CancelFunc) *ServerConn {
-	return &ServerConn{
-		server: server,
-		reader: nil,
-		over:   over,
-	}
-}

+ 1 - 1
transport/internet/grpc/hub.go

@@ -29,7 +29,7 @@ type Listener struct {
 
 func (l Listener) Tun(server encoding.GunService_TunServer) error {
 	tunCtx, cancel := context.WithCancel(l.ctx)
-	l.handler(encoding.NewServerConn(server, cancel))
+	l.handler(encoding.NewGunConn(server, cancel))
 	<-tunCtx.Done()
 	return nil
 }