From b4c89eada97f846a1d2ecbe21a365a778de09db2 Mon Sep 17 00:00:00 2001 From: Julien Letessier Date: Thu, 15 Jan 2026 17:00:02 +0100 Subject: [PATCH] Guard stats/latency access in info command --- src/ConnectConnectionPool.cpp | 69 +++++++++++++++- src/ConnectConnectionPool.h | 12 +++ src/Handler.cpp | 148 +++++++++++++++++++++++++--------- src/Handler.h | 19 ++++- test/info_concurrent.py | 62 ++++++++++++++ test/run.sh | 1 + 6 files changed, 266 insertions(+), 45 deletions(-) create mode 100644 test/info_concurrent.py diff --git a/src/ConnectConnectionPool.cpp b/src/ConnectConnectionPool.cpp index e4ea3e1..cedac49 100644 --- a/src/ConnectConnectionPool.cpp +++ b/src/ConnectConnectionPool.cpp @@ -22,6 +22,60 @@ ConnectConnectionPool::~ConnectConnectionPool() { } +void ConnectConnectionPool::incrRequests() +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + ++mStats.requests; +} + +void ConnectConnectionPool::incrResponses() +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + ++mStats.responses; +} + +void ConnectConnectionPool::addSendBytes(long num) +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + mStats.sendBytes += num; +} + +void ConnectConnectionPool::addRecvBytes(long num) +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + mStats.recvBytes += num; +} + +void ConnectConnectionPool::addLatency(size_t i, long elapsed) +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + mLatencyMonitors[i].add(elapsed); +} + +void ConnectConnectionPool::addLatency(size_t i, long elapsed, int idx) +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + mLatencyMonitors[i].add(elapsed, idx); +} + +ServerStats ConnectConnectionPool::snapshotStats() const +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + return mStats; +} + +LatencyMonitor ConnectConnectionPool::snapshotLatency(size_t i) const +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + return mLatencyMonitors[i]; +} + +size_t ConnectConnectionPool::latencyMonitorCount() const +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + return mLatencyMonitors.size(); +} + ConnectConnection* ConnectConnectionPool::getShareConnection(int db) { 
FuncCallTimer(); @@ -35,7 +89,10 @@ ConnectConnection* ConnectConnectionPool::getShareConnection(int db) if (!c) { c = ConnectConnectionAlloc::create(mServ, true); c->setDb(db); - ++mStats.connections; + { + std::lock_guard<std::mutex> lck(mStatsMtx); + ++mStats.connections; + } mShareConns[db] = c; needInit = true; logNotice("h %d create server connection %s %d", @@ -76,7 +133,10 @@ ConnectConnection* ConnectConnectionPool::getPrivateConnection(int db) } c = ConnectConnectionAlloc::create(mServ, false); c->setDb(db); - ++mStats.connections; + { + std::lock_guard<std::mutex> lck(mStatsMtx); + ++mStats.connections; + } needInit = true; logNotice("h %d create private server connection %s %d", mHandler->id(), c->peer(), c->fd()); @@ -133,7 +193,10 @@ bool ConnectConnectionPool::init(ConnectConnection* c) mHandler->id(), c->peer(), c->fd()); return false; } - ++mStats.connect; + { + std::lock_guard<std::mutex> lck(mStatsMtx); + ++mStats.connect; + } if (!c->connect()) { logWarn("h %d s %s %d connect fail", mHandler->id(), c->peer(), c->fd()); diff --git a/src/ConnectConnectionPool.h b/src/ConnectConnectionPool.h index 022fe28..caa18f7 100644 --- a/src/ConnectConnectionPool.h +++ b/src/ConnectConnectionPool.h @@ -7,6 +7,7 @@ #ifndef _PREDIXY_CONNECT_CONNECTION_POOL_H_ #define _PREDIXY_CONNECT_CONNECTION_POOL_H_ +#include <mutex> #include <vector> #include "ConnectConnection.h" #include "Server.h" @@ -41,6 +42,15 @@ public: { return --mPendRequests; } + void incrRequests(); + void incrResponses(); + void addSendBytes(long num); + void addRecvBytes(long num); + void addLatency(size_t i, long elapsed); + void addLatency(size_t i, long elapsed, int idx); + ServerStats snapshotStats() const; + LatencyMonitor snapshotLatency(size_t i) const; + size_t latencyMonitorCount() const; ServerStats& stats() { return mStats; @@ -59,6 +69,7 @@ public: } void resetStats() { + std::lock_guard<std::mutex> lck(mStatsMtx); mStats.reset(); for (auto& m : mLatencyMonitors) { m.reset(); } @@ -74,6 +85,7 @@ private: std::vector<ConnectConnectionList> mPrivateConns; ServerStats 
mStats; std::vector<LatencyMonitor> mLatencyMonitors; + mutable std::mutex mStatsMtx; }; diff --git a/src/Handler.cpp b/src/Handler.cpp index f12dff4..f75105d 100644 --- a/src/Handler.cpp +++ b/src/Handler.cpp @@ -81,6 +81,24 @@ void Handler::stop() mStop.store(true); } +HandlerStats Handler::snapshotStats() const +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + return mStats; +} + +LatencyMonitor Handler::snapshotLatency(size_t i) const +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + return mLatencyMonitors[i]; +} + +size_t Handler::latencyMonitorCount() const +{ + std::lock_guard<std::mutex> lck(mStatsMtx); + return mLatencyMonitors.size(); +} + void Handler::refreshServerPool() { FuncCallTimer(); @@ -221,7 +239,10 @@ void Handler::postAcceptConnectionEvent() mAcceptConns.remove(c); c->unref(); c->close(); + { + std::lock_guard<std::mutex> lck(mStatsMtx); --mStats.clientConnections; + } } mPostAcceptConns.pop_front(); } @@ -296,7 +317,10 @@ void Handler::handleListenEvent(ListenSocket* s, int evts) socklen_t len = sizeof(addr); int fd = s->accept((sockaddr*)&addr, &len); if (fd >= 0) { + { + std::lock_guard<std::mutex> lck(mStatsMtx); ++mStats.accept; + } addAcceptSocket(fd, (sockaddr*)&addr, len); } else { break; } } @@ -355,7 +379,10 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len) if (mEventLoop->addSocket(c)) { c->setLastActiveTime(Util::elapsedUSec()); mAcceptConns.push_back(c); + { + std::lock_guard<std::mutex> lck(mStatsMtx); ++mStats.clientConnections; + } } else { fail = true; } @@ -381,7 +408,10 @@ void Handler::handleAcceptConnectionEvent(AcceptConnection* c, int evts) if (c->lastActiveTime() < 0) { c->setLastActiveTime(Util::elapsedUSec()); mAcceptConns.push_back(c); + { + std::lock_guard<std::mutex> lck(mStatsMtx); ++mStats.clientConnections; + } } if (evts & Multiplexor::ErrorEvent) { c->setStatus(AcceptConnection::EventError); @@ -457,7 +487,7 @@ ConnectConnection* Handler::getConnectConnection(Request* req, Server* serv) p = new ConnectConnectionPool(this, serv, serv->pool()->dbNum()); mConnPool[sid] = p; } - 
p->stats().requests++; + p->incrRequests(); int db = 0; auto c = req->connection(); if (c) { @@ -505,7 +535,10 @@ void Handler::handleRequest(Request* req) if (c && (c->isBlockRequest() || c->isCloseASAP())) { return; } + { + std::lock_guard<std::mutex> lck(mStatsMtx); ++mStats.requests; + } req->setDelivered(); SegmentStr<64> key(req->key()); logDebug("h %d c %s %d handle req %ld %s %.*s", @@ -793,8 +826,11 @@ void Handler::handleRequest(Request* req, ConnectConnection* s) } s->send(this, req); addPostEvent(s, Multiplexor::WriteEvent); - mStats.requests++; - mConnPool[s->server()->id()]->stats().requests++; + { + std::lock_guard<std::mutex> lck(mStatsMtx); + ++mStats.requests; + } + mConnPool[s->server()->id()]->incrRequests(); if (s->isShared()) { mConnPool[s->server()->id()]->incrPendRequests(); } @@ -837,9 +873,12 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res) id(), (s ? s->peer() : "None"), (s ? s->fd() : -1), req->id(), req->cmd(), key.length(), key.data(), res->id(), res->typeStr()); - mStats.responses++; + { + std::lock_guard<std::mutex> lck(mStatsMtx); + ++mStats.responses; + } if (s) { - mConnPool[s->server()->id()]->stats().responses++; + mConnPool[s->server()->id()]->incrResponses(); if (s->isShared()) { mConnPool[s->server()->id()]->decrPendRequests(); } @@ -899,16 +938,24 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res) addPostEvent(c, Multiplexor::WriteEvent); } long elapsed = Util::elapsedUSec() - req->createTime(); - if (auto cp = s ? mConnPool[s->server()->id()] : nullptr) { - for (auto i : mProxy->latencyMonitorSet().cmdIndex(req->type())) { - int idx = mLatencyMonitors[i].add(elapsed); - if (idx >= 0) { - cp->latencyMonitors()[i].add(elapsed, idx); + int cmdType = static_cast<int>(req->type()); + if (cmdType >= 0 && cmdType < Command::AvailableCommands) { + if (auto cp = s ? 
mConnPool[s->server()->id()] : nullptr) { + for (auto i : mProxy->latencyMonitorSet().cmdIndex(req->type())) { + int idx = -1; + { + std::lock_guard<std::mutex> lck(mStatsMtx); + idx = mLatencyMonitors[i].add(elapsed); + } + if (idx >= 0) { + cp->addLatency(i, elapsed, idx); + } + } + } else { + for (auto i : mProxy->latencyMonitorSet().cmdIndex(req->type())) { + std::lock_guard<std::mutex> lck(mStatsMtx); + mLatencyMonitors[i].add(elapsed); } - } - } else { - for (auto i : mProxy->latencyMonitorSet().cmdIndex(req->type())) { - mLatencyMonitors[i].add(elapsed); } } logInfo("RESP h %d c %s %d req %ld %s %.*s s %s %d res %ld %s t %ld", @@ -1032,12 +1079,12 @@ void Handler::infoRequest(Request* req, const String& key) } if (Scope(all, empty, "Stats")) { - HandlerStats st(mStats); + HandlerStats st = snapshotStats(); for (auto h : mProxy->handlers()) { if (h == this) { continue; } - st += h->mStats; + st += h->snapshotStats(); } buf = buf->fappend("Accept:%ld\n", st.accept); buf = buf->fappend("ClientConnections:%ld\n", st.clientConnections); @@ -1057,7 +1104,7 @@ void Handler::infoRequest(Request* req, const String& key) ServerStats st; for (auto h : mProxy->handlers()) { if (auto cp = h->getConnectConnectionPool(serv->id())) { - st += cp->stats(); + st += cp->snapshotStats(); } } buf = buf->fappend("Server:%s\n", serv->addr().data()); @@ -1079,15 +1126,21 @@ void Handler::infoRequest(Request* req, const String& key) if (Scope(all, empty, "LatencyMonitor")) { LatencyMonitor lm; - for (size_t i = 0; i < mLatencyMonitors.size(); ++i) { - lm = mLatencyMonitors[i]; + size_t count = latencyMonitorCount(); + for (size_t i = 0; i < count; ++i) { + lm = snapshotLatency(i); for (auto h : mProxy->handlers()) { if (h == this) { continue; } - lm += h->mLatencyMonitors[i]; + if (i < h->latencyMonitorCount()) { + lm += h->snapshotLatency(i); + } } - buf = buf->fappend("LatencyMonitorName:%s\n", lm.name().data()); + const char* lmName = lm.name().data(); + int lmNameLen = lm.name().length(); + buf = 
buf->fappend("LatencyMonitorName:%.*s\n", + lmNameLen, lmName ? lmName : ""); buf = lm.output(buf); buf = buf->fappend("\n"); } @@ -1123,14 +1176,19 @@ void Handler::infoLatencyRequest(Request* req) } BufferPtr buf = body.fset(nullptr, "# LatencyMonitor\n"); - LatencyMonitor lm = mLatencyMonitors[i]; + LatencyMonitor lm = snapshotLatency(i); for (auto h : mProxy->handlers()) { if (h == this) { continue; } - lm += h->mLatencyMonitors[i]; + if (i < h->latencyMonitorCount()) { + lm += h->snapshotLatency(i); + } } - buf = buf->fappend("LatencyMonitorName:%s\n", lm.name().data()); + const char* lmName = lm.name().data(); + int lmNameLen = lm.name().length(); + buf = buf->fappend("LatencyMonitorName:%.*s\n", + lmNameLen, lmName ? lmName : ""); buf = lm.output(buf); buf = buf->fappend("\n"); @@ -1138,15 +1196,19 @@ void Handler::infoLatencyRequest(Request* req) auto sp = mProxy->serverPool(); int servCursor = 0; while (Server* serv = sp->iter(servCursor)) { - lm = mLatencyMonitors[i]; + lm = snapshotLatency(i); lm.reset(); for (auto h : mProxy->handlers()) { if (auto cp = h->getConnectConnectionPool(serv->id())) { - lm += cp->latencyMonitors()[i]; + if (i < cp->latencyMonitorCount()) { + lm += cp->snapshotLatency(i); + } } } - buf = buf->fappend("ServerLatencyMonitorName:%s %s\n", - serv->addr().data(), lm.name().data()); + const char* lmName = lm.name().data(); + int lmNameLen = lm.name().length(); + buf = buf->fappend("ServerLatencyMonitorName:%s %.*s\n", + serv->addr().data(), lmNameLen, lmName ? 
lmName : ""); buf = lm.output(buf); buf = buf->fappend("\n"); } @@ -1197,11 +1259,13 @@ void Handler::infoServerLatencyRequest(Request* req) handleResponse(nullptr, req, res); return; } - LatencyMonitor lm = mLatencyMonitors[i]; + LatencyMonitor lm = snapshotLatency(i); lm.reset(); for (auto h : mProxy->handlers()) { if (auto cp = h->getConnectConnectionPool(serv->id())) { - lm += cp->latencyMonitors()[i]; + if (i < cp->latencyMonitorCount()) { + lm += cp->snapshotLatency(i); + } } } buf = buf->fappend("ServerLatencyMonitorName:%s %s\n", @@ -1209,16 +1273,21 @@ void Handler::infoServerLatencyRequest(Request* req) buf = lm.output(buf); buf = buf->fappend("\n"); } else { - for (size_t i = 0; i < mLatencyMonitors.size(); ++i) { - LatencyMonitor lm = mLatencyMonitors[i]; + size_t count = latencyMonitorCount(); + for (size_t i = 0; i < count; ++i) { + LatencyMonitor lm = snapshotLatency(i); lm.reset(); for (auto h : mProxy->handlers()) { if (auto cp = h->getConnectConnectionPool(serv->id())) { - lm += cp->latencyMonitors()[i]; + if (i < cp->latencyMonitorCount()) { + lm += cp->snapshotLatency(i); + } } } - buf = buf->fappend("ServerLatencyMonitorName:%s %s\n", - serv->addr().data(), lm.name().data()); + const char* lmName = lm.name().data(); + int lmNameLen = lm.name().length(); + buf = buf->fappend("ServerLatencyMonitorName:%s %.*s\n", + serv->addr().data(), lmNameLen, lmName ? 
lmName : ""); buf = lm.output(buf); buf = buf->fappend("\n"); } } @@ -1236,9 +1305,12 @@ void Handler::resetStats() { - mStats.reset(); - for (auto& m : mLatencyMonitors) { - m.reset(); + { + std::lock_guard<std::mutex> lck(mStatsMtx); + mStats.reset(); + for (auto& m : mLatencyMonitors) { + m.reset(); + } } for (auto cp : mConnPool) { if (cp) { diff --git a/src/Handler.h b/src/Handler.h index 208d39b..8b13e62 100644 --- a/src/Handler.h +++ b/src/Handler.h @@ -7,6 +7,7 @@ #ifndef _PREDIXY_HANDLER_H_ #define _PREDIXY_HANDLER_H_ +#include <mutex> #include <vector> #include "Predixy.h" #include "Multiplexor.h" @@ -52,6 +53,9 @@ public: { return mLatencyMonitors; } + HandlerStats snapshotStats() const; + LatencyMonitor snapshotLatency(size_t i) const; + size_t latencyMonitorCount() const; ConnectConnectionPool* getConnectConnectionPool(int id) const { return id < (int)mConnPool.size() ? mConnPool[id] : nullptr; } @@ -65,13 +69,19 @@ } void addServerReadStats(Server* serv, int num) { - mStats.recvServerBytes += num; - mConnPool[serv->id()]->stats().recvBytes += num; + { + std::lock_guard<std::mutex> lck(mStatsMtx); + mStats.recvServerBytes += num; + } + mConnPool[serv->id()]->addRecvBytes(num); } void addServerWriteStats(Server* serv, int num) { - mStats.sendServerBytes += num; - mConnPool[serv->id()]->stats().sendBytes += num; + { + std::lock_guard<std::mutex> lck(mStatsMtx); + mStats.sendServerBytes += num; + } + mConnPool[serv->id()]->addSendBytes(num); } IDUnique& idUnique() { @@ -126,6 +136,7 @@ private: long mStatsVer; HandlerStats mStats; std::vector<LatencyMonitor> mLatencyMonitors; + mutable std::mutex mStatsMtx; IDUnique mIDUnique; unsigned int mRandSeed; }; diff --git a/test/info_concurrent.py b/test/info_concurrent.py new file mode 100644 index 0000000..2d84122 --- /dev/null +++ b/test/info_concurrent.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +# +# Exercise INFO while other commands are running to catch race regressions. 
+# + +import threading +import time +import redis +from test_util import parse_args, get_host_port, exit_with_result + + +def run_load(client, stop_event, errors): + i = 0 + while not stop_event.is_set(): + try: + key = "info_concurrent:%d" % i + client.set(key, "v") + client.get(key) + i += 1 + except Exception as exc: + errors.append(("load", str(exc))) + return + + +def run_info(client, stop_event, errors): + while not stop_event.is_set(): + try: + client.info() + except Exception as exc: + errors.append(("info", str(exc))) + return + + +def run_test(host, port): + client = redis.StrictRedis(host=host, port=port) + stop_event = threading.Event() + errors = [] + + threads = [ + threading.Thread(target=run_load, args=(client, stop_event, errors)), + threading.Thread(target=run_load, args=(client, stop_event, errors)), + threading.Thread(target=run_info, args=(client, stop_event, errors)), + ] + for t in threads: + t.start() + + time.sleep(1.5) + stop_event.set() + for t in threads: + t.join() + + if errors: + print("FAIL: concurrent INFO errors", errors[0]) + return False + return True + + +if __name__ == "__main__": + args = parse_args("INFO concurrent test") + host, port = get_host_port(args) + success = run_test(host, port) + exit_with_result(success, "info concurrent", "info concurrent") diff --git a/test/run.sh b/test/run.sh index 6d918c5..bf458be 100755 --- a/test/run.sh +++ b/test/run.sh @@ -152,6 +152,7 @@ TESTS=( "test/string_to_int.py" "test/handler_stop_atomic.py" "test/logger_stop_atomic.py" + "test/info_concurrent.py" "test/pubsub_long_name.py" "test/pubsub_large_message.py" "test/transaction_forbid.py"