Skip to content

Commit

Permalink
core: amend primary election (part three)
Browse files Browse the repository at this point in the history
* node => current-primary retry via pub-addr, if differnt
* with refactoring; logs
* part three, prev. commit: e3503fc

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 20, 2024
1 parent c2cebdf commit 9c961d5
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 40 deletions.
8 changes: 5 additions & 3 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,9 +1429,11 @@ func (h *htrun) reqHealth(si *meta.Snode, tout time.Duration, q url.Values, smap
freeCR(res)

if err != nil && len(retry) > 0 {
// [NOTE]
// about to remove the node from the cluster map - not checking IsErrDNSLookup and similar
// (ie., not trying to narrow down - compare w/ slow-keepalive)
// [NOTE] retrying when:
// - about to remove the node from the cluster map, or
// - about to elect new primary
// not checking `IsErrDNSLookup` and similar - ie., not trying to narrow down
// (compare w/ slow-keepalive)
if si.PubNet.Hostname != si.ControlNet.Hostname {
cargs.req.Base = si.URL(cmn.NetPublic)
nlog.Warningln("retrying via pub addr:", cargs.req.Base)
Expand Down
5 changes: 2 additions & 3 deletions ais/kalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,8 @@ func (pkr *palive) retry(si *meta.Snode, ticker *time.Ticker, timeout time.Durat
started = mono.NanoTime()
smap = pkr.p.owner.smap.get()
)
// retry upon (intermittent) network failure
// (compare with slowKalive)
_, status, err := pkr.p.reqHealth(si, timeout, nil, smap, 1 /*retry via pub-addr*/)
// retry via pub-addr if different (compare with slowKalive)
_, status, err := pkr.p.reqHealth(si, timeout, nil, smap, 1 /*retry*/)
if err == nil {
now := mono.NanoTime()
pkr.statsT.Add(stats.KeepAliveLatency, now-started)
Expand Down
90 changes: 56 additions & 34 deletions ais/vote.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func (p *proxy) voteHandler(w http.ResponseWriter, r *http.Request) {

// PUT /v1/vote/init (via sendElectionRequest)
func (p *proxy) httpelect(w http.ResponseWriter, r *http.Request) {
const tag = "[httpelect]"
var (
pname = p.String()
pnameC = pname + ":"
)
if _, err := p.parseURL(w, r, apc.URLPathVoteInit.L, 0, false); err != nil {
return
}
Expand All @@ -123,15 +128,16 @@ func (p *proxy) httpelect(w http.ResponseWriter, r *http.Request) {
}
newSmap := msg.Request.Smap
if err := newSmap.validate(); err != nil {
p.writeErrf(w, r, "%s: invalid %s in the Vote Request, err: %v", p.si, newSmap, err)
p.writeErrf(w, r, "%s %s: invalid %s in the Vote Request, err: %v", tag, pname, newSmap, err)
return
}
smap := p.owner.smap.get()
caller := r.Header.Get(apc.HdrCallerName)
nlog.Infof("[vote] receive %s from %q (local: %s)", newSmap.StringEx(), caller, smap.StringEx())

nlog.Infoln(tag, pnameC, "receive", newSmap.StringEx(), "from", caller, "local [", smap.StringEx(), "]")

if !newSmap.isPresent(p.si) {
p.writeErrf(w, r, "%s: not present in the Vote Request, %s", p.si, newSmap)
p.writeErrf(w, r, "%s %s: not present in the Vote Request, %s", tag, pname, newSmap)
return
}
debug.Assert(!newSmap.isPrimary(p.si))
Expand All @@ -145,25 +151,25 @@ func (p *proxy) httpelect(w http.ResponseWriter, r *http.Request) {
}
}
if err != nil {
p.writeErr(w, r, cmn.NewErrFailedTo(p, "synchronize", newSmap, err))
p.writeErr(w, r, cmn.NewErrFailedTo(p, "synchronize", newSmap.StringEx(), err))
return
}
}

smap = p.owner.smap.get()
psi, err := smap.HrwProxy(smap.Primary.ID())
psi, err := smap.HrwProxy(smap.Primary.ID() /*skip*/)
if err != nil {
p.writeErr(w, r, err)
return
}

// proceed with election iff:
if psi.ID() != p.SID() {
nlog.Warningf("%s: not next in line %s", p, psi)
nlog.Warningln(tag, pnameC, "not the next in line [ vs", psi.StringEx(), "]")
return
}
if !p.ClusterStarted() {
nlog.Warningf("%s: not ready yet to be elected - starting up", p)
nlog.Warningln(tag, pnameC, "not ready yet to be elected - starting up")
w.WriteHeader(http.StatusServiceUnavailable)
return
}
Expand All @@ -177,20 +183,25 @@ func (p *proxy) httpelect(w http.ResponseWriter, r *http.Request) {
// include resulting Smap in the response
vr.Smap = p.owner.smap.get()

// xaction (minimal and, unlike target xactions, not visible via API (TODO))
go p.startElection(vr)
// run xaction
// minimal and, unlike all the rest (target) xactions, not visible via API (TODO)
go p.startElection(vr, false /*cloned*/)
}

// Election Functions

func (p *proxy) startElection(vr *VoteRecord) {
if p.owner.smap.get().isPrimary(p.si) {
nlog.Infof("%s: already in primary state", p)
func (p *proxy) startElection(vr *VoteRecord, cloned bool) {
smap := p.owner.smap.get()
if smap.isPrimary(p.si) {
nlog.Infoln(p.String(), "already in primary state [", cloned, "]")
return
}
if !cloned {
vr.Smap = smap
}
rns := xreg.RenewElection()
if rns.Err != nil {
nlog.Errorf("%s: %+v %v", p, vr, rns.Err)
nlog.Errorf("FATAL: %s failed to start election: %+v %v", p, vr, rns.Err) // (unlikely)
debug.AssertNoErr(rns.Err)
return
}
Expand All @@ -200,30 +211,40 @@ func (p *proxy) startElection(vr *VoteRecord) {
xctn := rns.Entry.Get()
xele, ok := xctn.(*xs.Election)
debug.Assert(ok)

nlog.Infoln(xele.Name())

p.elect(vr, xele)
xele.Finish()
}

const retryCurPrimary = 2

func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) {
var (
smap *smapX
err error
curPrimary = vr.Smap.Primary
curName = curPrimary.StringEx()
pname = p.String()
pnameC = pname + ":"
config = cmn.GCO.Get()
timeout = config.Timeout.CplaneOperation.D() / 2
)
// 1. ping the current primary (not using apc.QparamAskPrimary as it might be transitioning)
for i := range 2 {
// 1. ping the current primary (not using `apc.QparamAskPrimary` as the latter might be transitioning)
for i := range retryCurPrimary {
if i > 0 {
runtime.Gosched()
}
smap = p.owner.smap.get()
if smap.version() > vr.Smap.version() {
nlog.Warningf("%s: %s updated from %s, moving back to idle", p, smap, vr.Smap)
nlog.Warningln(pnameC, "[", smap.StringEx(), "version greater than Vote Req's", vr.Smap.StringEx(),
"] - moving back to idle")
return
}
_, _, err = p.reqHealth(curPrimary, timeout, nil /*ask primary*/, smap)

// retry via pub-addr if different (compare with palive.retry)
_, _, err = p.reqHealth(curPrimary, timeout, nil /*ask primary*/, smap, 1 /*retry*/)
if err == nil {
break
}
Expand All @@ -232,41 +253,42 @@ func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) {
if err == nil {
// move back to idle
query := url.Values{apc.QparamAskPrimary: []string{"true"}}
_, _, err = p.reqHealth(curPrimary, timeout, query /*ask primary*/, smap)

// retry via pub-addr if different (compare with palive.retry)
_, _, err = p.reqHealth(curPrimary, timeout, query /*ask primary*/, smap, 1 /*retry*/)

if err == nil {
nlog.Infof("%s: current primary %s is up, moving back to idle", p, curPrimary)
nlog.Infoln(pnameC, "the current primary", curName, "is up, moving back to idle")
} else {
errV := fmt.Errorf("%s: current primary(?) %s responds but does not consider itself primary",
p, curPrimary.StringEx())
errV := fmt.Errorf("%s: current primary(?) %s responds but does not consider itself primary", pname, curName)
xele.AddErr(errV, 0)
}
return
}
nlog.Infof("%s: primary %s is confirmed down: [%v] - moving to election state phase 1 (prepare)",
p, curPrimary.StringEx(), err)

nlog.Warningln(pnameC, "primary", curName, "is confirmed down: [", err, "] moving to election state phase 1 (prepare)")

// 2. election phase 1
elected, votingErrors := p.electPhase1(vr)
if !elected {
errV := fmt.Errorf("%s: election phase 1 (prepare) failed: primary still %s w/ status unknown",
p, curPrimary.StringEx())
errV := fmt.Errorf("%s: election phase 1 (prepare) failed: primary still %s w/ status unknown", pname, curName)
xele.AddErr(errV, 0)

smap = p.owner.smap.get()
if smap.version() > vr.Smap.version() {
nlog.Warningf("%s: %s updated from %s, moving back to idle", p, smap, vr.Smap)
nlog.Warningln(pnameC, "[", smap.StringEx(), "version greater than vote-record's", vr.Smap.StringEx(),
"] - moving back to idle")
return
}

// best-effort
svm, _, slowp := p.bcastMaxVer(smap, nil, nil)
if svm.Smap != nil && !slowp {
if svm.Smap.UUID == smap.UUID && svm.Smap.version() > smap.version() && svm.Smap.validate() == nil {
nlog.Warningf("%s: upgrading local %s to cluster max-ver %s",
p, smap.StringEx(), svm.Smap.StringEx())
nlog.Warningln(pnameC, "upgrading local", smap.StringEx(), "to cluster max-ver", svm.Smap.StringEx())

if svm.Smap.Primary.ID() != smap.Primary.ID() {
nlog.Warningf("%s: new primary %s is already elected ...",
p, svm.Smap.Primary.StringEx())
nlog.Warningln(pnameC, "new primary", svm.Smap.Primary.StringEx(), "is already elected ...")
}
errV := p.owner.smap.synchronize(p.si, svm.Smap, nil /*ms payload*/, p.smapUpdatedCB)
if errV != nil {
Expand All @@ -279,17 +301,17 @@ func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) {
}

// 3. election phase 2
nlog.Infoln(p.String()+":", "moving to election state phase 2 (commit)")
nlog.Infoln(pnameC, "moving to election state phase 2 (commit)")
confirmationErrors := p.electPhase2(vr)
for sid := range confirmationErrors {
if !votingErrors.Contains(sid) {
errV := fmt.Errorf("%s: error confirming the election: %s was healthy when voting", p, sid)
errV := fmt.Errorf("%s: error confirming the election: %s was healthy when voting", pname, sid)
xele.AddErr(errV, 0)
}
}

// 4. become!
nlog.Infof("%s: becoming primary", p)
nlog.Infoln(pnameC, "becoming primary")
p.becomeNewPrimary(vr.Primary /*proxyIDToRemove*/)
}

Expand Down Expand Up @@ -461,7 +483,7 @@ func (h *htrun) onPrimaryDown(self *proxy, callerID string) {
Initiator: h.si.ID(),
}
vr.Smap = clone
self.startElection(vr)
self.startElection(vr, true /*cloned*/)
return
}

Expand Down

0 comments on commit 9c961d5

Please sign in to comment.