Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix loadbalancer logic #356

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 31 additions & 28 deletions pkg/agent/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/sirupsen/logrus"
"github.com/inetaf/tcpproxy"
"github.com/xiaods/k8e/pkg/version"
"inet.af/tcpproxy"
"github.com/sirupsen/logrus"
)

// server tracks the connections to a server, so that they can be closed when the server is removed.
type server struct {
// This mutex protects access to the connections map. All direct access to the map should be protected by it.
mutex sync.Mutex
address string
healthCheck func() bool
connections map[net.Conn]struct{}
}

Expand All @@ -31,7 +35,9 @@ type serverConn struct {
// actually balance connections, but instead fails over to a new server only
// when a connection attempt to the currently selected server fails.
type LoadBalancer struct {
mutex sync.Mutex
// This mutex protects access to servers map and randomServers list.
// All direct access to the servers map/list should be protected by it.
mutex sync.RWMutex
proxy *tcpproxy.Proxy

serviceName string
Expand Down Expand Up @@ -123,26 +129,9 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo
}
logrus.Infof("Running load balancer %s %s -> %v [default: %s]", serviceName, lb.localAddress, lb.ServerAddresses, lb.defaultServerAddress)

return lb, nil
}
go lb.runHealthChecks(ctx)

func (lb *LoadBalancer) SetDefault(serverAddress string) {
lb.mutex.Lock()
defer lb.mutex.Unlock()

_, hasOriginalServer := sortServers(lb.ServerAddresses, lb.defaultServerAddress)
// if the old default server is not currently in use, remove it from the server map
if server := lb.servers[lb.defaultServerAddress]; server != nil && !hasOriginalServer {
defer server.closeAll()
delete(lb.servers, lb.defaultServerAddress)
}
// if the new default server doesn't have an entry in the map, add one
if _, ok := lb.servers[serverAddress]; !ok {
lb.servers[serverAddress] = &server{connections: make(map[net.Conn]struct{})}
}

lb.defaultServerAddress = serverAddress
logrus.Infof("Updated load balancer %s default server address -> %s", lb.serviceName, serverAddress)
return lb, nil
}

func (lb *LoadBalancer) Update(serverAddresses []string) {
Expand All @@ -166,28 +155,38 @@ func (lb *LoadBalancer) LoadBalancerServerURL() string {
return lb.localServerURL
}

func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string) (net.Conn, error) {
func (lb *LoadBalancer) dialContext(ctx context.Context, network, _ string) (net.Conn, error) {
lb.mutex.RLock()
defer lb.mutex.RUnlock()

var allChecksFailed bool
startIndex := lb.nextServerIndex
for {
targetServer := lb.currentServerAddress

server := lb.servers[targetServer]
if server == nil || targetServer == "" {
logrus.Debugf("Nil server for load balancer %s: %s", lb.serviceName, targetServer)
} else {
} else if allChecksFailed || server.healthCheck() {
dialTime := time.Now()
conn, err := server.dialContext(ctx, network, targetServer)
if err == nil {
return conn, nil
}
logrus.Debugf("Dial error from load balancer %s: %s", lb.serviceName, err)
logrus.Debugf("Dial error from load balancer %s after %s: %s", lb.serviceName, time.Now().Sub(dialTime), err)
// Don't close connections to the failed server if we're retrying with health checks ignored.
// We don't want to disrupt active connections if it is unlikely they will have anywhere to go.
if !allChecksFailed {
defer server.closeAll()
}
}

newServer, err := lb.nextServer(targetServer)
if err != nil {
return nil, err
}
if targetServer != newServer {
logrus.Debugf("Failed over to new server for load balancer %s: %s", lb.serviceName, newServer)
logrus.Debugf("Failed over to new server for load balancer %s: %s -> %s", lb.serviceName, targetServer, newServer)
}
if ctx.Err() != nil {
return nil, ctx.Err()
Expand All @@ -198,7 +197,11 @@ func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string
startIndex = maxIndex
}
if lb.nextServerIndex == startIndex {
return nil, errors.New("all servers failed")
if allChecksFailed {
return nil, errors.New("all servers failed")
}
logrus.Debugf("Health checks for all servers in load balancer %s have failed: retrying with health checks ignored", lb.serviceName)
allChecksFailed = true
}
}
}
Expand All @@ -215,4 +218,4 @@ func ResetLoadBalancer(dataDir, serviceName string) error {
logrus.Warn(err)
}
return nil
}
}
5 changes: 3 additions & 2 deletions pkg/agent/loadbalancer/loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/xiaods/k8e/pkg/cli/cmds"
"github.com/sirupsen/logrus"
)

func init() {
Expand Down Expand Up @@ -127,6 +127,7 @@ func Test_UnitFailOver(t *testing.T) {
conn2, err := net.Dial("tcp", localAddress)
if err != nil {
t.Fatalf("net.Dial failed: %v", err)

}
result2, err := ping(conn2)
if err != nil {
Expand Down Expand Up @@ -170,4 +171,4 @@ func Test_UnitFailFast(t *testing.T) {
case <-timeout:
t.Fatal("Test timed out")
}
}
}
7 changes: 3 additions & 4 deletions pkg/cli/cmds/init_linux.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
//go:build linux && cgo
// +build linux,cgo

package cmds

import (
"os"

"github.com/containerd/containerd/sys"
"github.com/containerd/containerd/pkg/userns"
"github.com/pkg/errors"
"github.com/rootless-containers/rootlesskit/pkg/parent/cgrouputil"
)

// EvacuateCgroup2 will handle evacuating the root cgroup in order to enable subtree_control,
// if running as pid 1 without rootless support.
func EvacuateCgroup2() error {
if os.Getpid() == 1 && !sys.RunningInUserNS() {
if os.Getpid() == 1 && !userns.RunningInUserNS() {
// The root cgroup has to be empty to enable subtree_control, so evacuate it by placing
// ourselves in the init cgroup.
if err := cgrouputil.EvacuateCgroup2("init"); err != nil {
return errors.Wrap(err, "failed to evacuate root cgroup")
}
}
return nil
}
}
Loading