Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
server: serve all traffic on a single port
non-TLS case:

net.Listen -> cmux.New -> pgwire.Match -> pgwire.Server.ServeConn
              |
              -  -> cmux.HTTP2 -> http2.(*Server).ServeConn
              -  -> cmux.Any -> http.(*Server).Serve

TLS case:

net.Listen -> cmux.New -> pgwire.Match -> pgwire.Server.ServeConn
              |
              -  -> cmux.Any -> tls.NewListener -> http.(*Server).Serve

Impact on benchmarks is negligible:
```
name                   old time/op    new time/op    delta
Bank_Cockroach-4          376µs ± 7%     392µs ±13%    ~           (p=0.280 n=10+10)
Select1_Cockroach-4      57.8µs ±10%    52.5µs ± 4%  -9.03%          (p=0.002 n=9+9)
Select2_Cockroach-4       968µs ±11%     873µs ± 3%  -9.78%        (p=0.002 n=10+10)
Insert1_Cockroach-4       486µs ± 7%     500µs ± 9%    ~            (p=0.278 n=9+10)
Insert10_Cockroach-4      849µs ± 6%     874µs ± 9%    ~           (p=0.247 n=10+10)
Insert100_Cockroach-4    4.24ms ±12%    3.98ms ± 5%  -6.14%         (p=0.004 n=10+9)
Update1_Cockroach-4       861µs ± 8%     837µs ±10%    ~            (p=0.211 n=10+9)
Update10_Cockroach-4     1.75ms ±14%    1.60ms ± 8%  -8.55%        (p=0.023 n=10+10)
Update100_Cockroach-4    8.47ms ±12%    8.89ms ±19%    ~           (p=0.796 n=10+10)
Delete1_Cockroach-4       956µs ± 8%     943µs ± 7%    ~             (p=0.541 n=8+9)
Delete10_Cockroach-4     2.54ms ± 8%    2.42ms ± 4%  -4.88%        (p=0.019 n=10+10)
Delete100_Cockroach-4    19.4ms ± 4%    18.1ms ± 3%  -6.82%        (p=0.000 n=10+10)
Scan1_Cockroach-4         263µs ±11%     245µs ± 5%  -6.86%        (p=0.002 n=10+10)
Scan10_Cockroach-4        311µs ± 8%     290µs ± 5%  -6.91%        (p=0.004 n=10+10)
Scan100_Cockroach-4       669µs ± 6%     627µs ± 5%  -6.34%        (p=0.000 n=10+10)

name                   old alloc/op   new alloc/op   delta
Bank_Cockroach-4         45.9kB ± 0%    45.9kB ± 0%    ~             (p=0.867 n=8+7)
Select1_Cockroach-4      1.92kB ± 0%    1.92kB ± 0%    ~            (p=0.146 n=10+8)
Select2_Cockroach-4      72.6kB ± 0%    72.5kB ± 0%    ~           (p=0.225 n=10+10)
Insert1_Cockroach-4      23.4kB ± 0%    23.4kB ± 0%    ~            (p=0.062 n=9+10)
Insert10_Cockroach-4     74.7kB ± 0%    74.7kB ± 0%    ~            (p=0.412 n=9+10)
Insert100_Cockroach-4     586kB ± 0%     586kB ± 0%    ~             (p=0.450 n=9+9)
Update1_Cockroach-4      34.4kB ± 0%    34.3kB ± 0%    ~           (p=0.971 n=10+10)
Update10_Cockroach-4      123kB ± 0%     123kB ± 0%    ~            (p=0.371 n=8+10)
Update100_Cockroach-4     961kB ± 0%     961kB ± 0%    ~            (p=0.400 n=10+9)
Delete1_Cockroach-4      34.4kB ± 0%    34.4kB ± 0%    ~           (p=0.393 n=10+10)
Delete10_Cockroach-4      134kB ± 0%     134kB ± 0%    ~           (p=0.928 n=10+10)
Delete100_Cockroach-4    1.09MB ± 0%    1.09MB ± 0%    ~             (p=0.606 n=8+9)
Scan1_Cockroach-4        9.84kB ± 0%    9.85kB ± 0%    ~            (p=0.458 n=10+9)
Scan10_Cockroach-4       15.5kB ± 0%    15.5kB ± 0%  -0.13%        (p=0.033 n=10+10)
Scan100_Cockroach-4      60.5kB ± 0%    60.5kB ± 0%    ~           (p=0.446 n=10+10)

name                   old allocs/op  new allocs/op  delta
Bank_Cockroach-4            887 ± 0%       887 ± 0%    ~             (p=0.853 n=8+7)
Select1_Cockroach-4        39.0 ± 0%      39.0 ± 0%    ~     (all samples are equal)
Select2_Cockroach-4       2.10k ± 0%     2.10k ± 0%    ~           (p=1.000 n=10+10)
Insert1_Cockroach-4         323 ± 0%       323 ± 0%  -0.12%         (p=0.046 n=8+10)
Insert10_Cockroach-4        807 ± 0%       807 ± 0%  -0.05%         (p=0.046 n=8+10)
Insert100_Cockroach-4     5.36k ± 0%     5.36k ± 0%    ~             (p=0.550 n=9+9)
Update1_Cockroach-4         564 ± 0%       564 ± 0%    ~            (p=0.912 n=6+10)
Update10_Cockroach-4      1.40k ± 0%     1.40k ± 0%  +0.04%         (p=0.023 n=8+10)
Update100_Cockroach-4     9.11k ± 0%     9.11k ± 0%    ~            (p=0.572 n=10+9)
Delete1_Cockroach-4         570 ± 0%       569 ± 0%    ~           (p=0.513 n=10+10)
Delete10_Cockroach-4      1.75k ± 0%     1.75k ± 0%    ~            (p=1.000 n=10+7)
Delete100_Cockroach-4     12.9k ± 0%     12.9k ± 0%    ~             (p=0.798 n=8+9)
Scan1_Cockroach-4           181 ± 0%       181 ± 0%    ~     (all samples are equal)
Scan10_Cockroach-4          284 ± 0%       283 ± 0%  -0.18%        (p=0.033 n=10+10)
Scan100_Cockroach-4       1.19k ± 0%     1.19k ± 0%    ~           (p=1.000 n=10+10)

```
  • Loading branch information
tamird committed Feb 24, 2016
commit a2483e1cf144cbdd03970e1541157cb2a38b0f05
109 changes: 81 additions & 28 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package server

import (
"compress/gzip"
"crypto/tls"
"encoding/json"
"io"
"net"
Expand All @@ -29,7 +30,10 @@ import (
"sync"
"time"

"golang.org/x/net/http2"

assetfs "github.com/elazarl/go-bindata-assetfs"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"

snappy "github.com/cockroachdb/c-snappy"
Expand Down Expand Up @@ -78,7 +82,7 @@ type Server struct {
db *client.DB
kvDB *kv.DBServer
sqlServer sql.Server
pgServer *pgwire.Server
pgServer pgwire.Server
node *Node
recorder *status.NodeStatusRecorder
admin *adminServer
Expand Down Expand Up @@ -182,11 +186,7 @@ func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {
return nil, err
}

s.pgServer = pgwire.NewServer(&pgwire.Context{
Context: &s.ctx.Context,
Executor: s.sqlServer.Executor,
Stopper: stopper,
})
s.pgServer = pgwire.MakeServer(&s.ctx.Context, s.sqlServer.Executor)

// TODO(bdarnell): make StoreConfig configurable.
nCtx := storage.StoreContext{
Expand Down Expand Up @@ -229,18 +229,82 @@ func (s *Server) Start() error {
return err
}

unresolvedAddr := util.NewUnresolvedAddr("tcp", s.ctx.Addr)
ln, err := util.ListenAndServe(s.stopper, s, unresolvedAddr, tlsConfig)
// The following code is a specialization of util/net.go's ListenAndServe
// which adds pgwire support. A single port is used to serve all protocols
// (pg, http, h2) via the following construction:
//
// non-TLS case:
// net.Listen -> cmux.New -> pgwire.Match -> pgwire.Server.ServeConn
// |
// - -> cmux.HTTP2 -> http2.(*Server).ServeConn
// - -> cmux.Any -> http.(*Server).Serve
//
// TLS case:
// net.Listen -> cmux.New -> pgwire.Match -> pgwire.Server.ServeConn
// |
// - -> cmux.Any -> tls.NewListener -> http.(*Server).Serve
//
// Note that the difference between the TLS and non-TLS cases exists due to
// Go's lack of an h2c (HTTP2 Clear Text) implementation. See inline comments
// in util.ListenAndServe for an explanation of how h2c is implemented there
// and here.

ln, err := net.Listen("tcp", s.ctx.Addr)
if err != nil {
return err
}

if err := officializeAddr(unresolvedAddr, ln.Addr()); err != nil {
unresolvedAddr, err := officialAddr(s.ctx.Addr, ln.Addr())
if err != nil {
return err
}
s.ctx.Addr = unresolvedAddr.String()

s.rpcContext.SetLocalServer(s.rpc, unresolvedAddr.String())
s.stopper.RunWorker(func() {
<-s.stopper.ShouldDrain()
if err := ln.Close(); err != nil {
log.Fatal(err)
}
})

m := cmux.New(ln)
pgL := m.Match(pgwire.Match)

var serveConn func(net.Listener, func(net.Conn)) error
if tlsConfig != nil {
anyL := m.Match(cmux.Any())
serveConn = util.ServeHandler(s.stopper, s, tls.NewListener(anyL, tlsConfig), tlsConfig)
} else {
h2L := m.Match(cmux.HTTP2())
anyL := m.Match(cmux.Any())

var h2 http2.Server
serveConnOpts := &http2.ServeConnOpts{
Handler: s,
}
serveH2 := func(conn net.Conn) {
h2.ServeConn(conn, serveConnOpts)
}

serveConn = util.ServeHandler(s.stopper, s, anyL, tlsConfig)

s.stopper.RunWorker(func() {
util.FatalIfUnexpected(serveConn(h2L, serveH2))
})
}

s.stopper.RunWorker(func() {
util.FatalIfUnexpected(serveConn(pgL, func(conn net.Conn) {
if err := s.pgServer.ServeConn(conn); err != nil && !util.IsClosedConnection(err) {
log.Error(err)
}
}))
})

s.stopper.RunWorker(func() {
util.FatalIfUnexpected(m.Serve())
})

s.rpcContext.SetLocalServer(s.rpc, s.ctx.Addr)

s.gossip.Start(s.grpc, unresolvedAddr)

Expand All @@ -267,18 +331,9 @@ func (s *Server) Start() error {

s.status = newStatusServer(s.db, s.gossip, s.registry, s.ctx)

log.Infof("starting %s server at %s", s.ctx.HTTPRequestScheme(), unresolvedAddr)
log.Infof("starting %s/postgres server at %s", s.ctx.HTTPRequestScheme(), unresolvedAddr)
s.initHTTP()

pgAddr := util.NewUnresolvedAddr("tcp", s.ctx.PGAddr)
if err := s.pgServer.Start(pgAddr); err != nil {
return err
}
if err := officializeAddr(pgAddr, s.pgServer.Addr()); err != nil {
return err
}
s.ctx.PGAddr = pgAddr.String()

return nil
}

Expand Down Expand Up @@ -466,15 +521,15 @@ func (w *snappyResponseWriter) Close() {
}
}

func officializeAddr(unresolvedAddr *util.UnresolvedAddr, resolvedAddr net.Addr) error {
unresolvedHost, unresolvedPort, err := net.SplitHostPort(unresolvedAddr.String())
func officialAddr(unresolvedAddr string, resolvedAddr net.Addr) (*util.UnresolvedAddr, error) {
unresolvedHost, unresolvedPort, err := net.SplitHostPort(unresolvedAddr)
if err != nil {
return err
return nil, err
}

resolvedHost, resolvedPort, err := net.SplitHostPort(resolvedAddr.String())
if err != nil {
return err
return nil, err
}

var host string
Expand All @@ -499,7 +554,5 @@ func officializeAddr(unresolvedAddr *util.UnresolvedAddr, resolvedAddr net.Addr)
port = resolvedPort
}

unresolvedAddr.AddressField = net.JoinHostPort(host, port)

return nil
return util.NewUnresolvedAddr(resolvedAddr.Network(), net.JoinHostPort(host, port)), nil
}
9 changes: 3 additions & 6 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,15 +586,12 @@ func TestSystemConfigGossip(t *testing.T) {
}

func checkOfficialize(t *testing.T, network, oldAddrString, newAddrString, expAddrString string) {
unresolvedAddr := util.NewUnresolvedAddr(network, oldAddrString)
resolvedAddr := util.NewUnresolvedAddr(network, newAddrString)

if err := officializeAddr(unresolvedAddr, resolvedAddr); err != nil {
if unresolvedAddr, err := officialAddr(oldAddrString, resolvedAddr); err != nil {
t.Fatal(err)
}

if retAddrString := unresolvedAddr.String(); retAddrString != expAddrString {
t.Errorf("officializeAddr(%s, %s) was %s; expected %s", oldAddrString, newAddrString, retAddrString, expAddrString)
} else if retAddrString := unresolvedAddr.String(); retAddrString != expAddrString {
t.Errorf("officialAddr(%s, %s) was %s; expected %s", oldAddrString, newAddrString, retAddrString, expAddrString)
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (ts *TestServer) ServingPort() (string, error) {

// PGAddr returns the Postgres-protocol endpoint's address.
func (ts *TestServer) PGAddr() string {
return ts.pgServer.Addr().String()
return ts.ServingAddr()
}

// PGPort returns the port portion of the Postgres-protocol endpoint's address.
Expand Down
30 changes: 0 additions & 30 deletions sql/pgwire/context.go

This file was deleted.

120 changes: 29 additions & 91 deletions sql/pgwire/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"crypto/tls"
"io"
"net"
"sync"

"github.com/cockroachdb/cockroach/base"
"github.com/cockroachdb/cockroach/security"
"github.com/cockroachdb/cockroach/sql"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/metric"
Expand All @@ -44,11 +45,9 @@ var (

// Server implements the server side of the PostgreSQL wire protocol.
type Server struct {
context *Context
listener net.Listener
mu sync.Mutex // Mutex protects the fields below
conns map[net.Conn]struct{}
closing bool
context *base.Context
executor *sql.Executor

registry *metric.Registry
metrics *serverMetrics
}
Expand All @@ -59,103 +58,42 @@ type serverMetrics struct {
conns *metric.Counter
}

// NewServer creates a Server.
func NewServer(context *Context) *Server {
// MakeServer creates a Server.
func MakeServer(context *base.Context, executor *sql.Executor) Server {
// Create a registry to hold pgwire stats.
reg := metric.NewRegistry()
metrics := &serverMetrics{
conns: reg.Counter("conns"),
bytesInCount: reg.Counter("bytesin"),
bytesOutCount: reg.Counter("bytesout"),
}

return &Server{
return Server{
context: context,
conns: make(map[net.Conn]struct{}),
metrics: metrics,
executor: executor,
registry: reg,
metrics: &serverMetrics{
conns: reg.Counter("conns"),
bytesInCount: reg.Counter("bytesin"),
bytesOutCount: reg.Counter("bytesout"),
},
}
}

// Start a server on the given address.
func (s *Server) Start(addr net.Addr) error {
ln, err := net.Listen(addr.Network(), addr.String())
// Match returns true if rd appears to be a Postgres connection.
func Match(rd io.Reader) bool {
var buf readBuffer
_, err := buf.readUntypedMsg(rd)
if err != nil {
return err
}
s.listener = ln

s.context.Stopper.RunWorker(func() {
s.serve(ln)
})

s.context.Stopper.RunWorker(func() {
<-s.context.Stopper.ShouldStop()
s.close()
})
log.Infof("starting postgres server at %s", ln.Addr())
return nil
}

// Addr returns this Server's address.
func (s *Server) Addr() net.Addr {
return s.listener.Addr()
}

// serve connections on this listener until it is closed.
func (s *Server) serve(ln net.Listener) {
for {
conn, err := ln.Accept()
if err != nil {
if !s.isClosing() {
log.Error(err)
}
return
}

s.mu.Lock()
s.conns[conn] = struct{}{}
s.mu.Unlock()
s.metrics.conns.Inc(1)

go func() {
defer func() {
s.mu.Lock()
delete(s.conns, conn)
s.mu.Unlock()
s.metrics.conns.Dec(1)
conn.Close()
}()

if err := s.serveConn(conn); err != nil {
if err != io.EOF && !s.isClosing() {
log.Error(err)
}
}
}()
return false
}
}

func (s *Server) isClosing() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.closing
}

// close this server, and all client connections.
func (s *Server) close() {
s.listener.Close()
s.mu.Lock()
defer s.mu.Unlock()
s.closing = true
for conn := range s.conns {
conn.Close()
version, err := buf.getInt32()
if err != nil {
return false
}
return version == version30 || version == versionSSL
}

// serveConn serves a single connection, driving the handshake process
// ServeConn serves a single connection, driving the handshake process
// and delegating to the appropriate connection type.
func (s *Server) serveConn(conn net.Conn) error {
func (s *Server) ServeConn(conn net.Conn) error {
s.metrics.conns.Inc(1)
defer s.metrics.conns.Dec(1)

var buf readBuffer
n, err := buf.readUntypedMsg(conn)
if err != nil {
Expand Down Expand Up @@ -201,7 +139,7 @@ func (s *Server) serveConn(conn net.Conn) error {
}

if version == version30 {
v3conn := makeV3Conn(conn, s.context.Executor, s.metrics)
v3conn := makeV3Conn(conn, s.executor, s.metrics)
// This is better than always flushing on error.
defer func() {
if err := v3conn.wr.Flush(); err != nil {
Expand Down
Loading