From 9c961d5911410f902c970112355b0a27439161a0 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Fri, 20 Sep 2024 12:21:22 -0400 Subject: [PATCH] core: amend primary election (part three) * node => current-primary retry via pub-addr, if differnt * with refactoring; logs * part three, prev. commit: e3503fc67c38 Signed-off-by: Alex Aizman --- ais/htrun.go | 8 +++-- ais/kalive.go | 5 ++- ais/vote.go | 90 ++++++++++++++++++++++++++++++++------------------- 3 files changed, 63 insertions(+), 40 deletions(-) diff --git a/ais/htrun.go b/ais/htrun.go index f23333f476..dd87a9d92d 100644 --- a/ais/htrun.go +++ b/ais/htrun.go @@ -1429,9 +1429,11 @@ func (h *htrun) reqHealth(si *meta.Snode, tout time.Duration, q url.Values, smap freeCR(res) if err != nil && len(retry) > 0 { - // [NOTE] - // about to remove the node from the cluster map - not checking IsErrDNSLookup and similar - // (ie., not trying to narrow down - compare w/ slow-keepalive) + // [NOTE] retrying when: + // - about to remove the node from the cluster map, or + // - about to elect new primary + // not checking `IsErrDNSLookup` and similar - ie., not trying to narrow down + // (compare w/ slow-keepalive) if si.PubNet.Hostname != si.ControlNet.Hostname { cargs.req.Base = si.URL(cmn.NetPublic) nlog.Warningln("retrying via pub addr:", cargs.req.Base) diff --git a/ais/kalive.go b/ais/kalive.go index 37ae7abd2d..f0e3dd2443 100644 --- a/ais/kalive.go +++ b/ais/kalive.go @@ -423,9 +423,8 @@ func (pkr *palive) retry(si *meta.Snode, ticker *time.Ticker, timeout time.Durat started = mono.NanoTime() smap = pkr.p.owner.smap.get() ) - // retry upon (intermittent) network failure - // (compare with slowKalive) - _, status, err := pkr.p.reqHealth(si, timeout, nil, smap, 1 /*retry via pub-addr*/) + // retry via pub-addr if different (compare with slowKalive) + _, status, err := pkr.p.reqHealth(si, timeout, nil, smap, 1 /*retry*/) if err == nil { now := mono.NanoTime() pkr.statsT.Add(stats.KeepAliveLatency, now-started) diff --git a/ais/vote.go b/ais/vote.go index c762dfdf5f..f08f74a370 100644 --- a/ais/vote.go +++ b/ais/vote.go @@ -114,6 +114,11 @@ func (p *proxy) voteHandler(w http.ResponseWriter, r *http.Request) { // PUT /v1/vote/init (via sendElectionRequest) func (p *proxy) httpelect(w http.ResponseWriter, r *http.Request) { + const tag = "[httpelect]" + var ( + pname = p.String() + pnameC = pname + ":" + ) if _, err := p.parseURL(w, r, apc.URLPathVoteInit.L, 0, false); err != nil { return } @@ -123,15 +128,16 @@ func (p *proxy) httpelect(w http.ResponseWriter, r *http.Request) { } newSmap := msg.Request.Smap if err := newSmap.validate(); err != nil { - p.writeErrf(w, r, "%s: invalid %s in the Vote Request, err: %v", p.si, newSmap, err) + p.writeErrf(w, r, "%s %s: invalid %s in the Vote Request, err: %v", tag, pname, newSmap, err) return } smap := p.owner.smap.get() caller := r.Header.Get(apc.HdrCallerName) - nlog.Infof("[vote] receive %s from %q (local: %s)", newSmap.StringEx(), caller, smap.StringEx()) + + nlog.Infoln(tag, pnameC, "receive", newSmap.StringEx(), "from", caller, "local [", smap.StringEx(), "]") if !newSmap.isPresent(p.si) { - p.writeErrf(w, r, "%s: not present in the Vote Request, %s", p.si, newSmap) + p.writeErrf(w, r, "%s %s: not present in the Vote Request, %s", tag, pname, newSmap) return } debug.Assert(!newSmap.isPrimary(p.si)) @@ -145,13 +151,13 @@ func (p *proxy) httpelect(w http.ResponseWriter, r *http.Request) { } } if err != nil { - p.writeErr(w, r, cmn.NewErrFailedTo(p, "synchronize", newSmap, err)) + p.writeErr(w, r, cmn.NewErrFailedTo(p, "synchronize", newSmap.StringEx(), err)) return } } smap = p.owner.smap.get() - psi, err := smap.HrwProxy(smap.Primary.ID()) + psi, err := smap.HrwProxy(smap.Primary.ID() /*skip*/) if err != nil { p.writeErr(w, r, err) return @@ -159,11 +165,11 @@ func (p *proxy) httpelect(w http.ResponseWriter, r *http.Request) { // proceed with election iff: if psi.ID() != p.SID() { - nlog.Warningf("%s: not next in line %s", p, psi) + nlog.Warningln(tag, pnameC, "not the next in line [ vs", psi.StringEx(), "]") return } if !p.ClusterStarted() { - nlog.Warningf("%s: not ready yet to be elected - starting up", p) + nlog.Warningln(tag, pnameC, "not ready yet to be elected - starting up") w.WriteHeader(http.StatusServiceUnavailable) return } @@ -177,20 +183,25 @@ func (p *proxy) httpelect(w http.ResponseWriter, r *http.Request) { // include resulting Smap in the response vr.Smap = p.owner.smap.get() - // xaction (minimal and, unlike target xactions, not visible via API (TODO)) - go p.startElection(vr) + // run xaction + // minimal and, unlike all the rest (target) xactions, not visible via API (TODO) + go p.startElection(vr, false /*cloned*/) } // Election Functions -func (p *proxy) startElection(vr *VoteRecord) { - if p.owner.smap.get().isPrimary(p.si) { - nlog.Infof("%s: already in primary state", p) +func (p *proxy) startElection(vr *VoteRecord, cloned bool) { + smap := p.owner.smap.get() + if smap.isPrimary(p.si) { + nlog.Infoln(p.String(), "already in primary state [", cloned, "]") return } + if !cloned { + vr.Smap = smap + } rns := xreg.RenewElection() if rns.Err != nil { - nlog.Errorf("%s: %+v %v", p, vr, rns.Err) + nlog.Errorf("FATAL: %s failed to start election: %+v %v", p, vr, rns.Err) // (unlikely) debug.AssertNoErr(rns.Err) return } @@ -200,30 +211,40 @@ func (p *proxy) startElection(vr *VoteRecord) { xctn := rns.Entry.Get() xele, ok := xctn.(*xs.Election) debug.Assert(ok) + nlog.Infoln(xele.Name()) + p.elect(vr, xele) xele.Finish() } +const retryCurPrimary = 2 + func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) { var ( smap *smapX err error curPrimary = vr.Smap.Primary + curName = curPrimary.StringEx() + pname = p.String() + pnameC = pname + ":" config = cmn.GCO.Get() timeout = config.Timeout.CplaneOperation.D() / 2 ) - // 1. ping the current primary (not using apc.QparamAskPrimary as it might be transitioning) - for i := range 2 { + // 1. ping the current primary (not using `apc.QparamAskPrimary` as the latter might be transitioning) + for i := range retryCurPrimary { if i > 0 { runtime.Gosched() } smap = p.owner.smap.get() if smap.version() > vr.Smap.version() { - nlog.Warningf("%s: %s updated from %s, moving back to idle", p, smap, vr.Smap) + nlog.Warningln(pnameC, "[", smap.StringEx(), "version greater than Vote Req's", vr.Smap.StringEx(), + "] - moving back to idle") return } - _, _, err = p.reqHealth(curPrimary, timeout, nil /*ask primary*/, smap) + + // retry via pub-addr if different (compare with palive.retry) + _, _, err = p.reqHealth(curPrimary, timeout, nil /*ask primary*/, smap, 1 /*retry*/) if err == nil { break } @@ -232,29 +253,31 @@ func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) { if err == nil { // move back to idle query := url.Values{apc.QparamAskPrimary: []string{"true"}} - _, _, err = p.reqHealth(curPrimary, timeout, query /*ask primary*/, smap) + + // retry via pub-addr if different (compare with palive.retry) + _, _, err = p.reqHealth(curPrimary, timeout, query /*ask primary*/, smap, 1 /*retry*/) + if err == nil { - nlog.Infof("%s: current primary %s is up, moving back to idle", p, curPrimary) + nlog.Infoln(pnameC, "the current primary", curName, "is up, moving back to idle") } else { - errV := fmt.Errorf("%s: current primary(?) %s responds but does not consider itself primary", - p, curPrimary.StringEx()) + errV := fmt.Errorf("%s: current primary(?) %s responds but does not consider itself primary", pname, curName) xele.AddErr(errV, 0) } return } - nlog.Infof("%s: primary %s is confirmed down: [%v] - moving to election state phase 1 (prepare)", - p, curPrimary.StringEx(), err) + + nlog.Warningln(pnameC, "primary", curName, "is confirmed down: [", err, "] moving to election state phase 1 (prepare)") // 2. election phase 1 elected, votingErrors := p.electPhase1(vr) if !elected { - errV := fmt.Errorf("%s: election phase 1 (prepare) failed: primary still %s w/ status unknown", - p, curPrimary.StringEx()) + errV := fmt.Errorf("%s: election phase 1 (prepare) failed: primary still %s w/ status unknown", pname, curName) xele.AddErr(errV, 0) smap = p.owner.smap.get() if smap.version() > vr.Smap.version() { - nlog.Warningf("%s: %s updated from %s, moving back to idle", p, smap, vr.Smap) + nlog.Warningln(pnameC, "[", smap.StringEx(), "version greater than vote-record's", vr.Smap.StringEx(), + "] - moving back to idle") return } @@ -262,11 +285,10 @@ func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) { svm, _, slowp := p.bcastMaxVer(smap, nil, nil) if svm.Smap != nil && !slowp { if svm.Smap.UUID == smap.UUID && svm.Smap.version() > smap.version() && svm.Smap.validate() == nil { - nlog.Warningf("%s: upgrading local %s to cluster max-ver %s", - p, smap.StringEx(), svm.Smap.StringEx()) + nlog.Warningln(pnameC, "upgrading local", smap.StringEx(), "to cluster max-ver", svm.Smap.StringEx()) + if svm.Smap.Primary.ID() != smap.Primary.ID() { - nlog.Warningf("%s: new primary %s is already elected ...", - p, svm.Smap.Primary.StringEx()) + nlog.Warningln(pnameC, "new primary", svm.Smap.Primary.StringEx(), "is already elected ...") } errV := p.owner.smap.synchronize(p.si, svm.Smap, nil /*ms payload*/, p.smapUpdatedCB) if errV != nil { @@ -279,17 +301,17 @@ func (p *proxy) elect(vr *VoteRecord, xele *xs.Election) { } // 3. election phase 2 - nlog.Infoln(p.String()+":", "moving to election state phase 2 (commit)") + nlog.Infoln(pnameC, "moving to election state phase 2 (commit)") confirmationErrors := p.electPhase2(vr) for sid := range confirmationErrors { if !votingErrors.Contains(sid) { - errV := fmt.Errorf("%s: error confirming the election: %s was healthy when voting", p, sid) + errV := fmt.Errorf("%s: error confirming the election: %s was healthy when voting", pname, sid) xele.AddErr(errV, 0) } } // 4. become! - nlog.Infof("%s: becoming primary", p) + nlog.Infoln(pnameC, "becoming primary") p.becomeNewPrimary(vr.Primary /*proxyIDToRemove*/) } @@ -461,7 +483,7 @@ func (h *htrun) onPrimaryDown(self *proxy, callerID string) { Initiator: h.si.ID(), } vr.Smap = clone - self.startElection(vr) + self.startElection(vr, true /*cloned*/) return }