From b8ca3d67deddaec32b5d7abc2886afed0f8ee091 Mon Sep 17 00:00:00 2001 From: Iwan BK Date: Fri, 8 Apr 2022 08:26:09 +0700 Subject: [PATCH] notes on ClusterServerPool.cpp --- src/ClusterServerPool.cpp | 18 +++++++++++++++--- src/Handler.cpp | 6 +++--- src/Server.cpp | 3 +++ 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/ClusterServerPool.cpp b/src/ClusterServerPool.cpp index 35591d2..dcb5648 100644 --- a/src/ClusterServerPool.cpp +++ b/src/ClusterServerPool.cpp @@ -95,7 +95,7 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request } while (true) { ClusterNodesParser::Status st = p.parse(); - if (st == ClusterNodesParser::Node) { + if (st == ClusterNodesParser::Node) { // it is a node logDebug("redis cluster update parse node %s %s %s %s", p.nodeId().data(), p.addr().data(), @@ -112,8 +112,11 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request String addr(p.addr()); auto it = mServs.find(addr); Server* serv = it == mServs.end() ? nullptr : it->second; + // the raw address is not in our server list; fall back to: + // - the `myself` flag (use the server of this connection) + // - the address with everything from `@` onwards stripped if (!serv) { - if (strstr(p.flags().data(), "myself")) { + if (strstr(p.flags().data(), "myself")) { // this is the node the command was executed on serv = s->server(); } else if (const char* t = strchr(p.addr().data(), '@')) { addr = String(p.addr().data(), t - p.addr().data()); @@ -121,11 +124,15 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request serv = it == mServs.end() ?
nullptr : it->second; } } + // still not found in our server list, check further: + // - unknown role || `fail` flag || `handshake` flag -> ignore + // - number of servers already exceeds the pool capacity -> ignore it + // otherwise, create a new server object and add it to mServPool & mServs if (!serv) { const char* flags = p.flags().data(); if (p.role() == Server::Unknown || strstr(flags, "fail") || strstr(flags, "handshake")) { - logWarn("redis cluster nodes get node abnormal %s %s %s %s", + logNotice("redis cluster nodes get node abnormal %s %s %s %s", p.nodeId().data(), p.addr().data(), p.flags().data(), @@ -153,6 +160,9 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request serv->setRole(p.role()); serv->setName(p.nodeId()); serv->setMasterName(p.master()); + // if it is a master: + // - add its group if needed + // - add the slot if (p.role() == Server::Master) { ServerGroup* g = getGroup(p.nodeId()); if (!g) { @@ -189,6 +199,8 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request return; } } + // assign each server to its proper group + // there is no health check here for (auto serv : mServPool) { if (serv->updating()) { serv->setUpdating(false); diff --git a/src/Handler.cpp b/src/Handler.cpp index b45127b..6633a02 100644 --- a/src/Handler.cpp +++ b/src/Handler.cpp @@ -259,7 +259,7 @@ void Handler::postConnectConnectionEvent() case Socket::EventError: { Server* serv = s->server(); - serv->incrFail(); + serv->incrFail(); // increase fail counter only for EventError if (serv->fail()) { logNotice("server %s mark failure", serv->addr().data()); } @@ -1368,7 +1368,7 @@ void Handler::innerResponse(ConnectConnection* s, Request* req, Response* res) if (s && res->isPong()) { Server* serv = s->server(); if (serv->fail()) { - serv->setFail(false); + serv->setFail(false); // if the server responds to ping, clear its fail flag.
TODO: also handle other cases, such as loading logNotice("h %d s %s %d mark server alive", id(), s->peer(), s->fd()); } @@ -1382,7 +1382,7 @@ void Handler::innerResponse(ConnectConnection* s, Request* req, Response* res) id(), s->peer(), s->fd()); } break; - case Command::Readonly: + case Command::Readonly: // when the server is loading, predixy can detect it here if (!res->isOk()) { s->setStatus(ConnectConnection::LogicError); addPostEvent(s, Multiplexor::ErrorEvent); diff --git a/src/Server.cpp b/src/Server.cpp index 0da0e20..1004ea2 100644 --- a/src/Server.cpp +++ b/src/Server.cpp @@ -47,10 +47,13 @@ bool Server::activate() return AtomicCAS(mNextActivateTime, v, now + mPool->serverRetryTimeout()); } +// increase the fail counter of the server +// and mark it failed each time the counter reaches a multiple of the failure limit void Server::incrFail() { long cnt = ++mFailureCnt; if (cnt % mPool->serverFailureLimit() == 0) { + logError("[ibk]SET FAIL TRUE"); setFail(true); } }