Skip to content

Commit

Permalink
Fix Blackhole implemention for e2e tests
Browse files Browse the repository at this point in the history
Based on Fu Wei's idea discussed in the [issue](etcd-io#17737),
we employ the blocking on L7 but without using external tools.

[Background]

A peer will
(a) receive traffic from its peers
(b) initiate connections to its peers (via stream and pipeline).

Thus, the current mechanism of only blocking peer traffic via the peer's
existing proxy is insufficient, since only scenario (a) is handled, and
scenario (b) is not blocked at all.

[Proposed solution]

We introduce an forward proxy for each peer, which will be proxying all
the connections initiated from a peer to its peers.

The modified architecture will look something like this:
```
A -- A's forward proxy ----- B's reverse proxy - B
     ^ newly introduced      ^ in the original codebase (renamed)
```

By adding this forward proxy, we can block all in and out traffic that
is initiated from a peer to others, without having to resort to external
tools, such as iptables.

[Testing]
- `make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionLeader$ go.etcd.io/etcd/tests/v3/e2e -v -count=1`
- `make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionFollower$ go.etcd.io/etcd/tests/v3/e2e -v -count=1`

[Issues]
- I run into `context deadline exceeded` sometimes
```
    etcd_mix_versions_test.go:175:
                Error Trace:    /Users/henrybear327/go/src/etcd/tests/e2e/etcd_mix_versions_test.go:175
                                                        /Users/henrybear327/go/src/etcd/tests/e2e/blackhole_test.go:75
                                                        /Users/henrybear327/go/src/etcd/tests/e2e/blackhole_test.go:31
                Error:          Received unexpected error:
                                [/Users/henrybear327/go/src/etcd/bin/etcdctl --endpoints=http://localhost:20006 put key-0 value-0] match not found.  Set EXPECT_DEBUG for more info Errs: [unexpected exit code [1] after running [/Users/henrybear327/go/src/etcd/bin/etcdctl --endpoints=http://localhost:20006 put key-0 value-0]], last lines:
                                {"level":"warn","ts":"2024-05-05T23:02:36.809726+0800","logger":"etcd-client","caller":"[email protected]/retry_interceptor.go:65","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0x140001ee960/localhost:20006","method":"/etcdserverpb.KV/Put","attempt":0,"error":"rpc error: code = DeadlineExceeded desc = context deadline exceeded"}
                                Error: context deadline exceeded
                                 (expected "OK", got []). Try EXPECT_DEBUG=TRUE
                Test:           TestBlackholeByMockingPartitionLeader
                Messages:       failed to put "key-0", error: [/Users/henrybear327/go/src/etcd/bin/etcdctl --endpoints=http://localhost:20006 put key-0 value-0] match not found.  Set EXPECT_DEBUG for more info Errs: [unexpected exit code [1] after running [/Users/henrybear327/go/src/etcd/bin/etcdctl --endpoints=http://localhost:20006 put key-0 value-0]], last lines:
                                {"level":"warn","ts":"2024-05-05T23:02:36.809726+0800","logger":"etcd-client","caller":"[email protected]/retry_interceptor.go:65","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0x140001ee960/localhost:20006","method":"/etcdserverpb.KV/Put","attempt":0,"error":"rpc error: code = DeadlineExceeded desc = context deadline exceeded"}
                                Error: context deadline exceeded
                                 (expected "OK", got []). Try EXPECT_DEBUG=TRUE
```

[References]
[1] issue etcd-io#17737
[2] PR (V1) https://github.com/henrybear327/etcd/tree/fix/e2e_blackhole
[3] PR (V2) etcd-io#17891

Signed-off-by: Chun-Hung Tseng <[email protected]>

It's verified that the blocking of traffic is complete, compared to
previous solutions [2][3].

[Implementation]

The main subtasks are
- set up an environment variable `E2E_TEST_FORWARD_PROXY_IP`
- implement forward proxy by extending the existing proxy server code
- implement enable/disable of the forward proxy in the e2e test

The result is that for every peer, we will have the arch like this
```
A -- A's forward proxy
     (connections initiated from A will be forwarded from this proxy)
 |   ^ covers case (b)
 |
 --- A's (currently existing) reverse proxy
     (advertised to other peers where the connection should come in from)
     ^ covers case (a)
```
  • Loading branch information
henrybear327 committed May 21, 2024
1 parent 35f09f7 commit 6a7fdd1
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 75 deletions.
14 changes: 13 additions & 1 deletion client/pkg/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
)
Expand All @@ -31,7 +33,17 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er
}

t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Proxy: func(req *http.Request) (*url.URL, error) {
// according to the comment of http.ProxyFromEnvironment: if the
// proxy URL is "localhost" (with or without a port number),
// then a nil URL and nil error will be returned.
// Thus, we need to workaround this by manually setting an
// ENV named FORWARD_PROXY and parse the URL (which is a localhost in our case)
if forwardProxy, exists := os.LookupEnv("FORWARD_PROXY"); exists {
return url.Parse(forwardProxy)
}
return http.ProxyFromEnvironment(req)
},
DialContext: (&net.Dialer{
Timeout: dialtimeoutd,
// value taken from http.DefaultTransport
Expand Down
100 changes: 84 additions & 16 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package proxy

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -130,18 +132,21 @@ type Server interface {

// ServerConfig defines proxy server configuration.
type ServerConfig struct {
Logger *zap.Logger
From url.URL
To url.URL
TLSInfo transport.TLSInfo
DialTimeout time.Duration
BufferSize int
RetryInterval time.Duration
Logger *zap.Logger
From url.URL
To url.URL
TLSInfo transport.TLSInfo
DialTimeout time.Duration
BufferSize int
RetryInterval time.Duration
IsForwardProxy bool
}

type server struct {
lg *zap.Logger

isForwardProxy bool

from url.URL
fromPort int
to url.URL
Expand Down Expand Up @@ -194,6 +199,8 @@ func NewServer(cfg ServerConfig) Server {
s := &server{
lg: cfg.Logger,

isForwardProxy: cfg.IsForwardProxy,

from: cfg.From,
to: cfg.To,

Expand All @@ -216,10 +223,12 @@ func NewServer(cfg ServerConfig) Server {
if err == nil {
s.fromPort, _ = strconv.Atoi(fromPort)
}
var toPort string
_, toPort, err = net.SplitHostPort(cfg.To.Host)
if err == nil {
s.toPort, _ = strconv.Atoi(toPort)
if !s.isForwardProxy {
var toPort string
_, toPort, err = net.SplitHostPort(cfg.To.Host)
if err == nil {
s.toPort, _ = strconv.Atoi(toPort)
}
}

if s.dialTimeout == 0 {
Expand All @@ -239,8 +248,10 @@ func NewServer(cfg ServerConfig) Server {
if strings.HasPrefix(s.from.Scheme, "http") {
s.from.Scheme = "tcp"
}
if strings.HasPrefix(s.to.Scheme, "http") {
s.to.Scheme = "tcp"
if !s.isForwardProxy {
if strings.HasPrefix(s.to.Scheme, "http") {
s.to.Scheme = "tcp"
}
}

addr := fmt.Sprintf(":%d", s.fromPort)
Expand Down Expand Up @@ -273,7 +284,10 @@ func (s *server) From() string {
}

func (s *server) To() string {
return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host)
if !s.isForwardProxy {
return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host)
}
return ""
}

// TODO: implement packet reordering from multiple TCP connections
Expand Down Expand Up @@ -353,6 +367,44 @@ func (s *server) listenAndServe() {
continue
}

parseHeaderForDestination := func() *string {
// the first request should always contain a CONNECT header field
// since we set the transport to forward the traffic to the proxy
buf := make([]byte, s.bufferSize)
var data []byte
var nr1 int
if nr1, err = in.Read(buf); err != nil {
if err == io.EOF {
return nil
// why??
// panic("No data available for forward proxy to work on")
}
panic(err)
} else {
data = buf[:nr1]
}

// attempt to parse for the HOST from the CONNECT request
var req *http.Request
if req, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(data))); err != nil {
panic("Failed to parse header in forward proxy")
}

if req.Method == http.MethodConnect {
// make sure a reply is sent back to the client
connectResponse := &http.Response{
StatusCode: 200,
ProtoMajor: 1,
ProtoMinor: 1,
}
connectResponse.Write(in)

return &req.URL.Host
}

panic("Wrong header type to start the connection")
}

var out net.Conn
if !s.tlsInfo.Empty() {
var tp *http.Transport
Expand All @@ -370,9 +422,25 @@ func (s *server) listenAndServe() {
}
continue
}
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
if s.isForwardProxy {
if dest := parseHeaderForDestination(); dest == nil {
continue
} else {
out, err = tp.DialContext(ctx, "tcp", *dest)
}
} else {
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
}
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
if s.isForwardProxy {
if dest := parseHeaderForDestination(); dest == nil {
continue
} else {
out, err = net.Dial("tcp", *dest)
}
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
}
}
if err != nil {
select {
Expand Down
21 changes: 13 additions & 8 deletions tests/e2e/blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,20 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
require.NoError(t, epc.Close(), "failed to close etcd cluster")
}()

leaderId := epc.WaitLeader(t)
mockPartitionNodeIndex := leaderId
leaderID := epc.WaitLeader(t)
mockPartitionNodeIndex := leaderID
if !partitionLeader {
mockPartitionNodeIndex = (leaderId + 1) % (clusterSize)
mockPartitionNodeIndex = (leaderID + 1) % (clusterSize)
}
partitionedMember := epc.Procs[mockPartitionNodeIndex]
// Mock partition
proxy := partitionedMember.PeerProxy()
forwardProxy := partitionedMember.PeerForwardProxy()
reverseProxy := partitionedMember.PeerReverseProxy()
t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
forwardProxy.BlackholeTx()
forwardProxy.BlackholeRx()
reverseProxy.BlackholeTx()
reverseProxy.BlackholeRx()

t.Logf("Wait 5s for any open connections to expire")
time.Sleep(5 * time.Second)
Expand All @@ -79,8 +82,10 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
// Wait for some time to restore the network
time.Sleep(1 * time.Second)
t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.UnblackholeTx()
proxy.UnblackholeRx()
forwardProxy.UnblackholeTx()
forwardProxy.UnblackholeRx()
reverseProxy.UnblackholeTx()
reverseProxy.UnblackholeRx()

leaderEPC = epc.Procs[epc.WaitLeader(t)]
time.Sleep(5 * time.Second)
Expand Down
9 changes: 6 additions & 3 deletions tests/e2e/http_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,13 @@ func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCl

func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
member := clus.Procs[0]
proxy := member.PeerProxy()
forwardProxy := member.PeerForwardProxy()
reverseProxy := member.PeerReverseProxy()
t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
forwardProxy.BlackholeTx()
forwardProxy.BlackholeRx()
reverseProxy.BlackholeTx()
reverseProxy.BlackholeRx()
}

func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
Expand Down
32 changes: 25 additions & 7 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,13 @@ func (cfg *EtcdProcessClusterConfig) SetInitialOrDiscovery(serverCfg *EtcdServer
func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig {
var curls []string
var curl string
port := cfg.BasePort + 5*i
port := cfg.BasePort + 6*i
clientPort := port
peerPort := port + 1
peerPort := port + 1 // the port that the peer actually listens on
metricsPort := port + 2
peer2Port := port + 3
reverseProxyPort := port + 3 // the port that the peer advertises
clientHTTPPort := port + 4
forwardProxyPort := port + 5

if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
Expand All @@ -498,17 +499,33 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in

peerListenURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
var proxyCfg *proxy.ServerConfig
var forwardProxyCfg *proxy.ServerConfig
var reverseProxyCfg *proxy.ServerConfig
if cfg.PeerProxy {
if !cfg.IsPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
}
peerAdvertiseURL.Host = fmt.Sprintf("localhost:%d", peer2Port)
proxyCfg = &proxy.ServerConfig{

// setup reverse proxy
peerAdvertiseURL.Host = fmt.Sprintf("localhost:%d", reverseProxyPort)
reverseProxyCfg = &proxy.ServerConfig{
Logger: zap.NewNop(),
To: peerListenURL,
From: peerAdvertiseURL,
}

// setup forward proxy
forwardProxyURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", forwardProxyPort)}
forwardProxyCfg = &proxy.ServerConfig{
Logger: zap.NewNop(),
From: forwardProxyURL,
IsForwardProxy: true,
}

if cfg.EnvVars == nil {
cfg.EnvVars = make(map[string]string)
}
cfg.EnvVars["FORWARD_PROXY"] = fmt.Sprintf("http://127.0.0.1:%d", forwardProxyPort)
}

name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i)
Expand Down Expand Up @@ -630,7 +647,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
InitialToken: cfg.ServerConfig.InitialClusterToken,
GoFailPort: gofailPort,
GoFailClientTimeout: cfg.GoFailClientTimeout,
Proxy: proxyCfg,
ReverseProxy: reverseProxyCfg,
ForwardProxy: forwardProxyCfg,
LazyFSEnabled: cfg.LazyFSEnabled,
}
}
Expand Down
Loading

0 comments on commit 6a7fdd1

Please sign in to comment.