Fix SCAN command cursor overflow with __uint128_t and optimize code reusability

- Use __uint128_t to handle large cursor values from Kvrocks (up to 64-bit)
- Prevent integer overflow when encoding cursor with Group ID
- Add Util::uint128ToString() utility function to eliminate code duplication
- Refactor Handler.cpp and Request.cpp to use the new utility function
- Add debug logging for cursor transformation tracking

This fix resolves SCAN cursor infinite loop issue when using Predixy with Kvrocks in Cluster mode.
This commit is contained in:
pt3ot8pl933h 2025-10-30 18:34:09 +08:00
parent ca1630a6b4
commit b059744c5f
4 changed files with 94 additions and 10 deletions

View File

@ -649,10 +649,24 @@ bool Handler::preHandleRequest(Request* req, const String& key)
case Command::Scan: case Command::Scan:
{ {
auto sp = mProxy->serverPool(); auto sp = mProxy->serverPool();
unsigned long cursor = atol(key.data()); // Use 128-bit integer to handle large cursor values
int groupIdx = cursor & Const::ServGroupMask; __uint128_t cursor = 0;
const char* cursorStr = key.data();
// Manually parse string to 128-bit integer
while (*cursorStr >= '0' && *cursorStr <= '9') {
cursor = cursor * 10 + (*cursorStr - '0');
cursorStr++;
}
// Debug log: cursor received from client
char cursorBuf[64];
Util::uint128ToString(cursor, cursorBuf);
int groupIdx = (unsigned long)(cursor & Const::ServGroupMask);
auto g = sp->getGroup(groupIdx); auto g = sp->getGroup(groupIdx);
if (!g) { if (!g) {
logDebug("h %d SCAN cursor from client: %s (invalid group %d)",
id(), cursorBuf, groupIdx);
directResponse(req, Response::InvalidScanCursor); directResponse(req, Response::InvalidScanCursor);
return true; return true;
} }
@ -661,12 +675,26 @@ bool Handler::preHandleRequest(Request* req, const String& key)
serv = g->getMaster(); serv = g->getMaster();
} }
if (!serv) { if (!serv) {
logDebug("h %d SCAN cursor from client: %s (no server available)",
id(), cursorBuf);
directResponse(req, Response::ScanEnd); directResponse(req, Response::ScanEnd);
return true; return true;
} }
if (ConnectConnection* s = getConnectConnection(req, serv)) { if (ConnectConnection* s = getConnectConnection(req, serv)) {
if (cursor != 0) { if (cursor != 0) {
req->adjustScanCursor(cursor >> Const::ServGroupBits); __uint128_t actualCursor = cursor >> Const::ServGroupBits;
// Debug log: cursor to be sent to backend server
char actualCursorBuf[64];
Util::uint128ToString(actualCursor, actualCursorBuf);
logDebug("h %d SCAN cursor from client: %s, group: %d, cursor to server: %s",
id(), cursorBuf, groupIdx, actualCursorBuf);
req->adjustScanCursor(actualCursor);
} else {
logDebug("h %d SCAN cursor from client: 0, group: %d, cursor to server: 0",
id(), groupIdx);
} }
handleRequest(req, s); handleRequest(req, s);
} else { } else {
@ -814,14 +842,41 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res)
} else if (req->type() == Command::Scan && s && res->type() == Reply::Array) { } else if (req->type() == Command::Scan && s && res->type() == Reply::Array) {
SegmentStr<64> str(res->body()); SegmentStr<64> str(res->body());
if (const char* p = strchr(str.data() + sizeof("*2\r\n$"), '\n')) { if (const char* p = strchr(str.data() + sizeof("*2\r\n$"), '\n')) {
long cursor = atol(p + 1); // Use 128-bit integer to handle large cursor values (Kvrocks may return cursor close to 64-bit limit)
__uint128_t cursor = 0;
const char* cursorStr = p + 1;
const char* cursorStart = cursorStr;
while (*cursorStr >= '0' && *cursorStr <= '9') {
cursor = cursor * 10 + (*cursorStr - '0');
cursorStr++;
}
// Debug log: cursor received from backend server
char serverCursorBuf[64];
int serverCursorLen = cursorStr - cursorStart;
if (serverCursorLen > 0 && serverCursorLen < 64) {
memcpy(serverCursorBuf, cursorStart, serverCursorLen);
serverCursorBuf[serverCursorLen] = '\0';
} else {
strcpy(serverCursorBuf, "0");
}
auto g = s->server()->group(); auto g = s->server()->group();
int currentGroupId = g ? g->id() : -1;
if (cursor != 0 || (g = sp->getGroup(g->id() + 1)) != nullptr) { if (cursor != 0 || (g = sp->getGroup(g->id() + 1)) != nullptr) {
// Use 128-bit integer for left shift, will not overflow
cursor <<= Const::ServGroupBits; cursor <<= Const::ServGroupBits;
cursor |= g->id(); cursor |= g->id();
if ((p = strchr(p, '*')) != nullptr) { if ((p = strchr(p, '*')) != nullptr) {
char buf[32]; // Convert 128-bit integer to string
int n = snprintf(buf, sizeof(buf), "%ld", cursor); char buf[64]; // 128-bit needs at most 39 decimal digits
int n = Util::uint128ToString(cursor, buf);
// Debug log: cursor to be sent to client
logDebug("h %d SCAN cursor from server: %s, group: %d, cursor to client: %s",
id(), serverCursorBuf, g->id(), buf);
res->head().fset(nullptr, res->head().fset(nullptr,
"*2\r\n" "*2\r\n"
"$%d\r\n" "$%d\r\n"
@ -829,6 +884,10 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res)
n, buf); n, buf);
res->body().cut(p - str.data()); res->body().cut(p - str.data());
} }
} else {
// Scan completed, return 0 to client
logDebug("h %d SCAN cursor from server: %s, group: %d, cursor to client: 0 (scan complete)",
id(), serverCursorBuf, currentGroupId);
} }
} }
} }

View File

@ -246,10 +246,10 @@ void Request::setSentinelSlaves(const String& master)
master.length(), master.length(), master.data()); master.length(), master.length(), master.data());
} }
void Request::adjustScanCursor(long cursor) void Request::adjustScanCursor(__uint128_t cursor)
{ {
char buf[32]; char buf[64]; // 128-bit integer needs at most 39 decimal digits
int n = snprintf(buf, sizeof(buf), "%ld", cursor); int n = Util::uint128ToString(cursor, buf);
if (mHead.empty()) { if (mHead.empty()) {
SegmentStr<64> str(mReq); SegmentStr<64> str(mReq);
const char* p = strchr(str.data(), '$'); const char* p = strchr(str.data(), '$');

View File

@ -63,7 +63,7 @@ public:
void setSentinels(const String& master); void setSentinels(const String& master);
void setSentinelGetMaster(const String& master); void setSentinelGetMaster(const String& master);
void setSentinelSlaves(const String& master); void setSentinelSlaves(const String& master);
void adjustScanCursor(long cursor); void adjustScanCursor(__uint128_t cursor);
void follow(Request* leader); void follow(Request* leader);
void setResponse(Response* res); void setResponse(Response* res);
bool send(Socket* s); bool send(Socket* s);

View File

@ -81,6 +81,31 @@ namespace Util
{ {
return duration_cast<microseconds>(steady_clock::now().time_since_epoch()).count(); return duration_cast<microseconds>(steady_clock::now().time_since_epoch()).count();
} }
// Convert 128-bit unsigned integer to string
// Returns the length of the converted string (excluding null terminator)
// The buffer must be at least 40 bytes (39 digits + null terminator for max __uint128_t value)
inline int uint128ToString(__uint128_t value, char* buf)
{
int n = 0;
if (value == 0) {
buf[n++] = '0';
} else {
char temp[64];
int tempLen = 0;
__uint128_t v = value;
while (v > 0) {
temp[tempLen++] = '0' + (v % 10);
v /= 10;
}
// Reverse digits (extracted from low to high)
for (int i = tempLen - 1; i >= 0; i--) {
buf[n++] = temp[i];
}
}
buf[n] = '\0';
return n;
}
}; };
#endif #endif