Guard stats/latency access in info command

Julien Letessier 2026-01-15 17:00:02 +01:00
parent abe75e40a2
commit b4c89eada9
6 changed files with 266 additions and 45 deletions
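The changes below all apply the same pattern: every write to the per-handler and per-pool stats takes a short-lived std::lock_guard on a new mStatsMtx, and the INFO path reads through accessors that copy the data out under that same lock instead of touching another thread's live structures. A minimal sketch of the pattern, with illustrative names (Counters, StatsOwner) rather than the actual predixy types:

#include <mutex>

// Sketch only: a counter struct guarded by a mutable mutex so that
// const readers can still lock it and copy the current values out.
struct Counters
{
    long requests = 0;
    long responses = 0;
};

class StatsOwner
{
public:
    void incrRequests()
    {
        std::lock_guard<std::mutex> lck(mMtx);  // writer: short critical section
        ++mCounters.requests;
    }

    Counters snapshot() const
    {
        std::lock_guard<std::mutex> lck(mMtx);  // reader: copy out under the lock
        return mCounters;                       // caller aggregates the copy lock-free
    }

private:
    Counters mCounters;
    mutable std::mutex mMtx;                    // mutable: lockable from const methods
};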

View File

@@ -22,6 +22,60 @@ ConnectConnectionPool::~ConnectConnectionPool()
{
}
void ConnectConnectionPool::incrRequests()
{
std::lock_guard<std::mutex> lck(mStatsMtx);
++mStats.requests;
}
void ConnectConnectionPool::incrResponses()
{
std::lock_guard<std::mutex> lck(mStatsMtx);
++mStats.responses;
}
void ConnectConnectionPool::addSendBytes(long num)
{
std::lock_guard<std::mutex> lck(mStatsMtx);
mStats.sendBytes += num;
}
void ConnectConnectionPool::addRecvBytes(long num)
{
std::lock_guard<std::mutex> lck(mStatsMtx);
mStats.recvBytes += num;
}
void ConnectConnectionPool::addLatency(size_t i, long elapsed)
{
std::lock_guard<std::mutex> lck(mStatsMtx);
mLatencyMonitors[i].add(elapsed);
}
void ConnectConnectionPool::addLatency(size_t i, long elapsed, int idx)
{
std::lock_guard<std::mutex> lck(mStatsMtx);
mLatencyMonitors[i].add(elapsed, idx);
}
ServerStats ConnectConnectionPool::snapshotStats() const
{
std::lock_guard<std::mutex> lck(mStatsMtx);
return mStats;
}
LatencyMonitor ConnectConnectionPool::snapshotLatency(size_t i) const
{
std::lock_guard<std::mutex> lck(mStatsMtx);
return mLatencyMonitors[i];
}
size_t ConnectConnectionPool::latencyMonitorCount() const
{
std::lock_guard<std::mutex> lck(mStatsMtx);
return mLatencyMonitors.size();
}
ConnectConnection* ConnectConnectionPool::getShareConnection(int db)
{
FuncCallTimer();
@@ -35,7 +89,10 @@ ConnectConnection* ConnectConnectionPool::getShareConnection(int db)
if (!c) {
c = ConnectConnectionAlloc::create(mServ, true);
c->setDb(db);
{
std::lock_guard<std::mutex> lck(mStatsMtx);
++mStats.connections;
}
mShareConns[db] = c;
needInit = true;
logNotice("h %d create server connection %s %d",
@@ -76,7 +133,10 @@ ConnectConnection* ConnectConnectionPool::getPrivateConnection(int db)
}
c = ConnectConnectionAlloc::create(mServ, false);
c->setDb(db);
{
std::lock_guard<std::mutex> lck(mStatsMtx);
++mStats.connections;
}
needInit = true;
logNotice("h %d create private server connection %s %d",
mHandler->id(), c->peer(), c->fd());
@@ -133,7 +193,10 @@ bool ConnectConnectionPool::init(ConnectConnection* c)
mHandler->id(), c->peer(), c->fd());
return false;
}
{
std::lock_guard<std::mutex> lck(mStatsMtx);
++mStats.connect;
}
if (!c->connect()) {
logWarn("h %d s %s %d connect fail",
mHandler->id(), c->peer(), c->fd());

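In getShareConnection(), getPrivateConnection() and init() above, each counter bump sits in its own braced block, so the lock is dropped before the surrounding code goes on to connect() or write a log line. The shape, as a sketch with stand-in names:

#include <mutex>

namespace {
std::mutex statsMtx;       // stand-in for mStatsMtx
long connections = 0;      // stand-in for mStats.connections
}

void onCreateConnection()  // illustrative, not the real member function
{
    {
        std::lock_guard<std::mutex> lck(statsMtx);  // lock only around the counter
        ++connections;
    }                                               // released here
    // connect() and logging run outside the critical section, so a slow
    // connect or log write never blocks an INFO snapshot on this mutex.
}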
View File

@@ -7,6 +7,7 @@
#ifndef _PREDIXY_CONNECT_CONNECTION_POOL_H_
#define _PREDIXY_CONNECT_CONNECTION_POOL_H_
#include <mutex>
#include <vector>
#include "ConnectConnection.h"
#include "Server.h"
@@ -41,6 +42,15 @@ public:
{
return --mPendRequests;
}
void incrRequests();
void incrResponses();
void addSendBytes(long num);
void addRecvBytes(long num);
void addLatency(size_t i, long elapsed);
void addLatency(size_t i, long elapsed, int idx);
ServerStats snapshotStats() const;
LatencyMonitor snapshotLatency(size_t i) const;
size_t latencyMonitorCount() const;
ServerStats& stats()
{
return mStats;
@@ -59,6 +69,7 @@ public:
}
void resetStats()
{
std::lock_guard<std::mutex> lck(mStatsMtx);
mStats.reset();
for (auto& m : mLatencyMonitors) {
m.reset();
@@ -74,6 +85,7 @@ private:
std::vector<ConnectConnectionList> mPrivateConns;
ServerStats mStats;
std::vector<LatencyMonitor> mLatencyMonitors;
mutable std::mutex mStatsMtx;
};

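Because snapshotStats() and snapshotLatency() return copies, the INFO handler can merge data from many pools while holding at most one lock at a time, and only for the duration of a copy. A sketch of that aggregation shape; Pool stands for ConnectConnectionPool, Stats for ServerStats, and operator+= is assumed from the existing st += cp->stats() usage this commit replaces:

#include <vector>

// Sketch: merge copies taken under each pool's own lock, then format the
// total without any pool lock held.
template <typename Pool, typename Stats>
Stats aggregateStats(const std::vector<Pool*>& pools)
{
    Stats total{};
    for (Pool* cp : pools) {
        if (cp) {
            total += cp->snapshotStats();  // locks briefly inside snapshotStats()
        }
    }
    return total;
}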
View File

@@ -81,6 +81,24 @@ void Handler::stop()
mStop.store(true);
}
HandlerStats Handler::snapshotStats() const
{
std::lock_guard<std::mutex> lck(mStatsMtx);
return mStats;
}
LatencyMonitor Handler::snapshotLatency(size_t i) const
{
std::lock_guard<std::mutex> lck(mStatsMtx);
return mLatencyMonitors[i];
}
size_t Handler::latencyMonitorCount() const
{
std::lock_guard<std::mutex> lck(mStatsMtx);
return mLatencyMonitors.size();
}
void Handler::refreshServerPool()
{
FuncCallTimer();
@@ -221,8 +239,11 @@ void Handler::postAcceptConnectionEvent()
mAcceptConns.remove(c);
c->unref();
c->close();
{
std::lock_guard<std::mutex> lck(mStatsMtx);
--mStats.clientConnections;
}
}
mPostAcceptConns.pop_front();
}
}
@@ -296,7 +317,10 @@ void Handler::handleListenEvent(ListenSocket* s, int evts)
socklen_t len = sizeof(addr);
int fd = s->accept((sockaddr*)&addr, &len);
if (fd >= 0) {
{
std::lock_guard<std::mutex> lck(mStatsMtx);
++mStats.accept;
}
addAcceptSocket(fd, (sockaddr*)&addr, len);
} else {
break;
@@ -355,7 +379,10 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len)
if (mEventLoop->addSocket(c)) {
c->setLastActiveTime(Util::elapsedUSec());
mAcceptConns.push_back(c);
{
std::lock_guard<std::mutex> lck(mStatsMtx);
++mStats.clientConnections;
}
} else {
fail = true;
}
@@ -381,8 +408,11 @@ void Handler::handleAcceptConnectionEvent(AcceptConnection* c, int evts)
if (c->lastActiveTime() < 0) {
c->setLastActiveTime(Util::elapsedUSec());
mAcceptConns.push_back(c);
{
std::lock_guard<std::mutex> lck(mStatsMtx);
++mStats.clientConnections;
}
}
if (evts & Multiplexor::ErrorEvent) {
c->setStatus(AcceptConnection::EventError);
}
@@ -457,7 +487,7 @@ ConnectConnection* Handler::getConnectConnection(Request* req, Server* serv)
p = new ConnectConnectionPool(this, serv, serv->pool()->dbNum());
mConnPool[sid] = p;
}
p->stats().requests++;
p->incrRequests();
int db = 0;
auto c = req->connection();
if (c) {
@@ -505,7 +535,10 @@ void Handler::handleRequest(Request* req)
if (c && (c->isBlockRequest() || c->isCloseASAP())) {
return;
}
{
std::lock_guard<std::mutex> lck(mStatsMtx);
++mStats.requests;
}
req->setDelivered();
SegmentStr<Const::MaxKeyLen> key(req->key());
logDebug("h %d c %s %d handle req %ld %s %.*s",
@@ -793,8 +826,11 @@ void Handler::handleRequest(Request* req, ConnectConnection* s)
}
s->send(this, req);
addPostEvent(s, Multiplexor::WriteEvent);
mStats.requests++;
mConnPool[s->server()->id()]->stats().requests++;
{
std::lock_guard<std::mutex> lck(mStatsMtx);
++mStats.requests;
}
mConnPool[s->server()->id()]->incrRequests();
if (s->isShared()) {
mConnPool[s->server()->id()]->incrPendRequests();
}
@@ -837,9 +873,12 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res)
id(), (s ? s->peer() : "None"), (s ? s->fd() : -1),
req->id(), req->cmd(), key.length(), key.data(),
res->id(), res->typeStr());
mStats.responses++;
{
std::lock_guard<std::mutex> lck(mStatsMtx);
++mStats.responses;
}
if (s) {
mConnPool[s->server()->id()]->stats().responses++;
mConnPool[s->server()->id()]->incrResponses();
if (s->isShared()) {
mConnPool[s->server()->id()]->decrPendRequests();
}
@@ -899,18 +938,26 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res)
addPostEvent(c, Multiplexor::WriteEvent);
}
long elapsed = Util::elapsedUSec() - req->createTime();
int cmdType = static_cast<int>(req->type());
if (cmdType >= 0 && cmdType < Command::AvailableCommands) {
if (auto cp = s ? mConnPool[s->server()->id()] : nullptr) {
for (auto i : mProxy->latencyMonitorSet().cmdIndex(req->type())) {
int idx = mLatencyMonitors[i].add(elapsed);
int idx = -1;
{
std::lock_guard<std::mutex> lck(mStatsMtx);
idx = mLatencyMonitors[i].add(elapsed);
}
if (idx >= 0) {
cp->latencyMonitors()[i].add(elapsed, idx);
cp->addLatency(i, elapsed, idx);
}
}
} else {
for (auto i : mProxy->latencyMonitorSet().cmdIndex(req->type())) {
std::lock_guard<std::mutex> lck(mStatsMtx);
mLatencyMonitors[i].add(elapsed);
}
}
}
logInfo("RESP h %d c %s %d req %ld %s %.*s s %s %d res %ld %s t %ld",
id(), c->peer(), c->fd(),
req->id(), req->cmd(), key.length(), key.data(),
@@ -1032,12 +1079,12 @@ void Handler::infoRequest(Request* req, const String& key)
}
if (Scope(all, empty, "Stats")) {
HandlerStats st(mStats);
HandlerStats st = snapshotStats();
for (auto h : mProxy->handlers()) {
if (h == this) {
continue;
}
st += h->mStats;
st += h->snapshotStats();
}
buf = buf->fappend("Accept:%ld\n", st.accept);
buf = buf->fappend("ClientConnections:%ld\n", st.clientConnections);
@@ -1057,7 +1104,7 @@ void Handler::infoRequest(Request* req, const String& key)
ServerStats st;
for (auto h : mProxy->handlers()) {
if (auto cp = h->getConnectConnectionPool(serv->id())) {
st += cp->stats();
st += cp->snapshotStats();
}
}
buf = buf->fappend("Server:%s\n", serv->addr().data());
@@ -1079,15 +1126,21 @@ void Handler::infoRequest(Request* req, const String& key)
if (Scope(all, empty, "LatencyMonitor")) {
LatencyMonitor lm;
for (size_t i = 0; i < mLatencyMonitors.size(); ++i) {
lm = mLatencyMonitors[i];
size_t count = latencyMonitorCount();
for (size_t i = 0; i < count; ++i) {
lm = snapshotLatency(i);
for (auto h : mProxy->handlers()) {
if (h == this) {
continue;
}
lm += h->mLatencyMonitors[i];
if (i < h->latencyMonitorCount()) {
lm += h->snapshotLatency(i);
}
buf = buf->fappend("LatencyMonitorName:%s\n", lm.name().data());
}
const char* lmName = lm.name().data();
int lmNameLen = lm.name().length();
buf = buf->fappend("LatencyMonitorName:%.*s\n",
lmNameLen, lmName ? lmName : "");
buf = lm.output(buf);
buf = buf->fappend("\n");
}
@@ -1123,14 +1176,19 @@ void Handler::infoLatencyRequest(Request* req)
}
BufferPtr buf = body.fset(nullptr, "# LatencyMonitor\n");
LatencyMonitor lm = mLatencyMonitors[i];
LatencyMonitor lm = snapshotLatency(i);
for (auto h : mProxy->handlers()) {
if (h == this) {
continue;
}
lm += h->mLatencyMonitors[i];
if (i < h->latencyMonitorCount()) {
lm += h->snapshotLatency(i);
}
buf = buf->fappend("LatencyMonitorName:%s\n", lm.name().data());
}
const char* lmName = lm.name().data();
int lmNameLen = lm.name().length();
buf = buf->fappend("LatencyMonitorName:%.*s\n",
lmNameLen, lmName ? lmName : "");
buf = lm.output(buf);
buf = buf->fappend("\n");
@@ -1138,15 +1196,19 @@ void Handler::infoLatencyRequest(Request* req)
auto sp = mProxy->serverPool();
int servCursor = 0;
while (Server* serv = sp->iter(servCursor)) {
lm = mLatencyMonitors[i];
lm = snapshotLatency(i);
lm.reset();
for (auto h : mProxy->handlers()) {
if (auto cp = h->getConnectConnectionPool(serv->id())) {
lm += cp->latencyMonitors()[i];
if (i < cp->latencyMonitorCount()) {
lm += cp->snapshotLatency(i);
}
}
buf = buf->fappend("ServerLatencyMonitorName:%s %s\n",
serv->addr().data(), lm.name().data());
}
const char* lmName = lm.name().data();
int lmNameLen = lm.name().length();
buf = buf->fappend("ServerLatencyMonitorName:%s %.*s\n",
serv->addr().data(), lmNameLen, lmName ? lmName : "");
buf = lm.output(buf);
buf = buf->fappend("\n");
}
@@ -1197,11 +1259,13 @@ void Handler::infoServerLatencyRequest(Request* req)
handleResponse(nullptr, req, res);
return;
}
LatencyMonitor lm = mLatencyMonitors[i];
LatencyMonitor lm = snapshotLatency(i);
lm.reset();
for (auto h : mProxy->handlers()) {
if (auto cp = h->getConnectConnectionPool(serv->id())) {
lm += cp->latencyMonitors()[i];
if (i < cp->latencyMonitorCount()) {
lm += cp->snapshotLatency(i);
}
}
}
buf = buf->fappend("ServerLatencyMonitorName:%s %s\n",
@@ -1209,16 +1273,21 @@ void Handler::infoServerLatencyRequest(Request* req)
buf = lm.output(buf);
buf = buf->fappend("\n");
} else {
for (size_t i = 0; i < mLatencyMonitors.size(); ++i) {
LatencyMonitor lm = mLatencyMonitors[i];
size_t count = latencyMonitorCount();
for (size_t i = 0; i < count; ++i) {
LatencyMonitor lm = snapshotLatency(i);
lm.reset();
for (auto h : mProxy->handlers()) {
if (auto cp = h->getConnectConnectionPool(serv->id())) {
lm += cp->latencyMonitors()[i];
if (i < cp->latencyMonitorCount()) {
lm += cp->snapshotLatency(i);
}
}
buf = buf->fappend("ServerLatencyMonitorName:%s %s\n",
serv->addr().data(), lm.name().data());
}
const char* lmName = lm.name().data();
int lmNameLen = lm.name().length();
buf = buf->fappend("ServerLatencyMonitorName:%s %.*s\n",
serv->addr().data(), lmNameLen, lmName ? lmName : "");
buf = lm.output(buf);
buf = buf->fappend("\n");
}
@@ -1236,10 +1305,13 @@ void Handler::infoServerLatencyRequest(Request* req)
void Handler::resetStats()
{
{
std::lock_guard<std::mutex> lck(mStatsMtx);
mStats.reset();
for (auto& m : mLatencyMonitors) {
m.reset();
}
}
for (auto cp : mConnPool) {
if (cp) {
cp->resetStats();

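The INFO aggregation above now reads other handlers' and pools' monitors only through their locked accessors, and guards every index with a bounds check (i < h->latencyMonitorCount(), i < cp->latencyMonitorCount()) rather than assuming all monitor vectors have the same size. A sketch of that defensive merge, with illustrative types (Monitor stands in for LatencyMonitor):

#include <cstddef>
#include <mutex>
#include <vector>

struct Monitor
{
    void operator+=(const Monitor&) {}  // stand-in for LatencyMonitor::operator+=
};

// A peer (another handler or a pool) that only exposes its monitors by copy.
struct Peer
{
    size_t latencyMonitorCount() const
    {
        std::lock_guard<std::mutex> lck(mMtx);
        return mMonitors.size();
    }
    Monitor snapshotLatency(size_t i) const
    {
        std::lock_guard<std::mutex> lck(mMtx);
        return mMonitors[i];
    }
    std::vector<Monitor> mMonitors;
    mutable std::mutex mMtx;
};

void mergeFrom(Monitor& lm, const Peer& peer, size_t i)
{
    if (i < peer.latencyMonitorCount()) {  // peer may expose fewer monitors
        lm += peer.snapshotLatency(i);     // copy taken under the peer's lock
    }
}

The count and the element are fetched under separate lock acquisitions, which is harmless as long as the monitor vectors are sized once at startup, as they appear to be in this code.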
View File

@@ -7,6 +7,7 @@
#ifndef _PREDIXY_HANDLER_H_
#define _PREDIXY_HANDLER_H_
#include <mutex>
#include <vector>
#include "Predixy.h"
#include "Multiplexor.h"
@@ -52,6 +53,9 @@ public:
{
return mLatencyMonitors;
}
HandlerStats snapshotStats() const;
LatencyMonitor snapshotLatency(size_t i) const;
size_t latencyMonitorCount() const;
ConnectConnectionPool* getConnectConnectionPool(int id) const
{
return id < (int)mConnPool.size() ? mConnPool[id] : nullptr;
@@ -65,13 +69,19 @@ public:
}
void addServerReadStats(Server* serv, int num)
{
{
std::lock_guard<std::mutex> lck(mStatsMtx);
mStats.recvServerBytes += num;
mConnPool[serv->id()]->stats().recvBytes += num;
}
mConnPool[serv->id()]->addRecvBytes(num);
}
void addServerWriteStats(Server* serv, int num)
{
{
std::lock_guard<std::mutex> lck(mStatsMtx);
mStats.sendServerBytes += num;
mConnPool[serv->id()]->stats().sendBytes += num;
}
mConnPool[serv->id()]->addSendBytes(num);
}
IDUnique& idUnique()
{
@@ -126,6 +136,7 @@ private:
long mStatsVer;
HandlerStats mStats;
std::vector<LatencyMonitor> mLatencyMonitors;
mutable std::mutex mStatsMtx;
IDUnique mIDUnique;
unsigned int mRandSeed;
};

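addServerReadStats() and addServerWriteStats() above follow the same locking discipline as the latency path in handleResponse(): the handler's own counter is updated inside a braced block under mStatsMtx, that lock is released, and only then is the pool's locked method called, so the handler and pool mutexes are never held at the same time and no lock ordering has to be maintained between them. A compact sketch with stand-in types:

#include <mutex>

struct PoolLike            // stands in for ConnectConnectionPool
{
    void addRecvBytes(long num)
    {
        std::lock_guard<std::mutex> lck(mMtx);  // pool takes its own lock
        mRecvBytes += num;
    }
    long mRecvBytes = 0;
    std::mutex mMtx;
};

struct HandlerLike         // stands in for Handler
{
    void addServerReadStats(PoolLike& pool, long num)
    {
        {
            std::lock_guard<std::mutex> lck(mMtx);  // handler lock, tightly scoped
            mRecvServerBytes += num;
        }                                           // released before calling the pool
        pool.addRecvBytes(num);
    }
    long mRecvServerBytes = 0;
    std::mutex mMtx;
};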
test/info_concurrent.py (new file, 62 lines)
View File

@@ -0,0 +1,62 @@
#!/usr/bin/env python3
#
# Exercise INFO while other commands are running to catch race regressions.
#

import threading
import time

import redis

from test_util import parse_args, get_host_port, exit_with_result


def run_load(client, stop_event, errors):
    i = 0
    while not stop_event.is_set():
        try:
            key = "info_concurrent:%d" % i
            client.set(key, "v")
            client.get(key)
            i += 1
        except Exception as exc:
            errors.append(("load", str(exc)))
            return


def run_info(client, stop_event, errors):
    while not stop_event.is_set():
        try:
            client.info()
        except Exception as exc:
            errors.append(("info", str(exc)))
            return


def run_test(host, port):
    client = redis.StrictRedis(host=host, port=port)
    stop_event = threading.Event()
    errors = []
    threads = [
        threading.Thread(target=run_load, args=(client, stop_event, errors)),
        threading.Thread(target=run_load, args=(client, stop_event, errors)),
        threading.Thread(target=run_info, args=(client, stop_event, errors)),
    ]
    for t in threads:
        t.start()
    time.sleep(1.5)
    stop_event.set()
    for t in threads:
        t.join()
    if errors:
        print("FAIL: concurrent INFO errors", errors[0])
        return False
    return True


if __name__ == "__main__":
    args = parse_args("INFO concurrent test")
    host, port = get_host_port(args)
    success = run_test(host, port)
    exit_with_result(success, "info concurrent", "info concurrent")

View File

@@ -152,6 +152,7 @@ TESTS=(
"test/string_to_int.py"
"test/handler_stop_atomic.py"
"test/logger_stop_atomic.py"
"test/info_concurrent.py"
"test/pubsub_long_name.py"
"test/pubsub_large_message.py"
"test/transaction_forbid.py"