Skip to content

Commit

Permalink
Fix load balancer failover logic
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaods committed Oct 21, 2024
1 parent ffe5f18 commit e8cade1
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 34 deletions.
59 changes: 31 additions & 28 deletions pkg/agent/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/sirupsen/logrus"
"github.com/inetaf/tcpproxy"
"github.com/xiaods/k8e/pkg/version"
"inet.af/tcpproxy"
"github.com/sirupsen/logrus"
)

// server tracks the connections to a server, so that they can be closed when the server is removed.
type server struct {
	// This mutex protects access to the connections map. All direct access to the map should be protected by it.
	mutex sync.Mutex
	// address is the backend server address this entry represents.
	address string
	// healthCheck reports whether the server should be considered dialable;
	// dialContext skips servers whose check returns false unless all checks
	// have failed. NOTE(review): the check's implementation is set elsewhere — confirm semantics there.
	healthCheck func() bool
	// connections is the set of currently open connections to this server,
	// kept so they can all be closed when the server is removed or fails.
	connections map[net.Conn]struct{}
}

Expand All @@ -31,7 +35,9 @@ type serverConn struct {
// actually balance connections, but instead fails over to a new server only
// when a connection attempt to the currently selected server fails.
type LoadBalancer struct {
mutex sync.Mutex
// This mutex protects access to servers map and randomServers list.
// All direct access to the servers map/list should be protected by it.
mutex sync.RWMutex
proxy *tcpproxy.Proxy

serviceName string
Expand Down Expand Up @@ -123,26 +129,9 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo
}
logrus.Infof("Running load balancer %s %s -> %v [default: %s]", serviceName, lb.localAddress, lb.ServerAddresses, lb.defaultServerAddress)

return lb, nil
}
go lb.runHealthChecks(ctx)

func (lb *LoadBalancer) SetDefault(serverAddress string) {
lb.mutex.Lock()
defer lb.mutex.Unlock()

_, hasOriginalServer := sortServers(lb.ServerAddresses, lb.defaultServerAddress)
// if the old default server is not currently in use, remove it from the server map
if server := lb.servers[lb.defaultServerAddress]; server != nil && !hasOriginalServer {
defer server.closeAll()
delete(lb.servers, lb.defaultServerAddress)
}
// if the new default server doesn't have an entry in the map, add one
if _, ok := lb.servers[serverAddress]; !ok {
lb.servers[serverAddress] = &server{connections: make(map[net.Conn]struct{})}
}

lb.defaultServerAddress = serverAddress
logrus.Infof("Updated load balancer %s default server address -> %s", lb.serviceName, serverAddress)
return lb, nil
}

func (lb *LoadBalancer) Update(serverAddresses []string) {
Expand All @@ -166,28 +155,38 @@ func (lb *LoadBalancer) LoadBalancerServerURL() string {
return lb.localServerURL
}

func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string) (net.Conn, error) {
func (lb *LoadBalancer) dialContext(ctx context.Context, network, _ string) (net.Conn, error) {
lb.mutex.RLock()
defer lb.mutex.RUnlock()

var allChecksFailed bool
startIndex := lb.nextServerIndex
for {
targetServer := lb.currentServerAddress

server := lb.servers[targetServer]
if server == nil || targetServer == "" {
logrus.Debugf("Nil server for load balancer %s: %s", lb.serviceName, targetServer)
} else {
} else if allChecksFailed || server.healthCheck() {
dialTime := time.Now()
conn, err := server.dialContext(ctx, network, targetServer)
if err == nil {
return conn, nil
}
logrus.Debugf("Dial error from load balancer %s: %s", lb.serviceName, err)
logrus.Debugf("Dial error from load balancer %s after %s: %s", lb.serviceName, time.Now().Sub(dialTime), err)
// Don't close connections to the failed server if we're retrying with health checks ignored.
// We don't want to disrupt active connections if it is unlikely they will have anywhere to go.
if !allChecksFailed {
defer server.closeAll()
}
}

newServer, err := lb.nextServer(targetServer)
if err != nil {
return nil, err
}
if targetServer != newServer {
logrus.Debugf("Failed over to new server for load balancer %s: %s", lb.serviceName, newServer)
logrus.Debugf("Failed over to new server for load balancer %s: %s -> %s", lb.serviceName, targetServer, newServer)
}
if ctx.Err() != nil {
return nil, ctx.Err()
Expand All @@ -198,7 +197,11 @@ func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string
startIndex = maxIndex
}
if lb.nextServerIndex == startIndex {
return nil, errors.New("all servers failed")
if allChecksFailed {
return nil, errors.New("all servers failed")
}
logrus.Debugf("Health checks for all servers in load balancer %s have failed: retrying with health checks ignored", lb.serviceName)
allChecksFailed = true
}
}
}
Expand All @@ -215,4 +218,4 @@ func ResetLoadBalancer(dataDir, serviceName string) error {
logrus.Warn(err)
}
return nil
}
}
5 changes: 3 additions & 2 deletions pkg/agent/loadbalancer/loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/xiaods/k8e/pkg/cli/cmds"
"github.com/sirupsen/logrus"
)

func init() {
Expand Down Expand Up @@ -127,6 +127,7 @@ func Test_UnitFailOver(t *testing.T) {
conn2, err := net.Dial("tcp", localAddress)
if err != nil {
t.Fatalf("net.Dial failed: %v", err)

}
result2, err := ping(conn2)
if err != nil {
Expand Down Expand Up @@ -170,4 +171,4 @@ func Test_UnitFailFast(t *testing.T) {
case <-timeout:
t.Fatal("Test timed out")
}
}
}
7 changes: 3 additions & 4 deletions pkg/cli/cmds/init_linux.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
//go:build linux && cgo
// +build linux,cgo

package cmds

import (
"os"

"github.com/containerd/containerd/sys"
"github.com/containerd/containerd/pkg/userns"
"github.com/pkg/errors"
"github.com/rootless-containers/rootlesskit/pkg/parent/cgrouputil"
)

// EvacuateCgroup2 will handle evacuating the root cgroup in order to enable subtree_control,
// if running as pid 1 without rootless support.
func EvacuateCgroup2() error {
if os.Getpid() == 1 && !sys.RunningInUserNS() {
if os.Getpid() == 1 && !userns.RunningInUserNS() {
// The root cgroup has to be empty to enable subtree_control, so evacuate it by placing
// ourselves in the init cgroup.
if err := cgrouputil.EvacuateCgroup2("init"); err != nil {
return errors.Wrap(err, "failed to evacuate root cgroup")
}
}
return nil
}
}

0 comments on commit e8cade1

Please sign in to comment.