From b059744c5f46a0303a9621590572ce7d8230f0c5 Mon Sep 17 00:00:00 2001 From: pt3ot8pl933h Date: Thu, 30 Oct 2025 18:34:09 +0800 Subject: [PATCH] Fix SCAN command cursor overflow with __uint128_t and optimize code reusability - Use __uint128_t to handle large cursor values from Kvrocks (up to 64-bit) - Prevent integer overflow when encoding cursor with Group ID - Add Util::uint128ToString() utility function to eliminate code duplication - Refactor Handler.cpp and Request.cpp to use the new utility function - Add debug logging for cursor transformation tracking This fix resolves SCAN cursor infinite loop issue when using Predixy with Kvrocks in Cluster mode. --- src/Handler.cpp | 71 ++++++++++++++++++++++++++++++++++++++++++++----- src/Request.cpp | 6 ++--- src/Request.h | 2 +- src/Util.h | 25 +++++++++++++++++ 4 files changed, 94 insertions(+), 10 deletions(-) diff --git a/src/Handler.cpp b/src/Handler.cpp index b45127b..55949bf 100644 --- a/src/Handler.cpp +++ b/src/Handler.cpp @@ -649,10 +649,24 @@ bool Handler::preHandleRequest(Request* req, const String& key) case Command::Scan: { auto sp = mProxy->serverPool(); - unsigned long cursor = atol(key.data()); - int groupIdx = cursor & Const::ServGroupMask; + // Use 128-bit integer to handle large cursor values + __uint128_t cursor = 0; + const char* cursorStr = key.data(); + // Manually parse string to 128-bit integer + while (*cursorStr >= '0' && *cursorStr <= '9') { + cursor = cursor * 10 + (*cursorStr - '0'); + cursorStr++; + } + + // Debug log: cursor received from client + char cursorBuf[64]; + Util::uint128ToString(cursor, cursorBuf); + + int groupIdx = (unsigned long)(cursor & Const::ServGroupMask); auto g = sp->getGroup(groupIdx); if (!g) { + logDebug("h %d SCAN cursor from client: %s (invalid group %d)", + id(), cursorBuf, groupIdx); directResponse(req, Response::InvalidScanCursor); return true; } @@ -661,12 +675,26 @@ bool Handler::preHandleRequest(Request* req, const String& key) serv = g->getMaster(); } if (!serv) { + logDebug("h %d SCAN cursor from client: %s (no server available)", + id(), cursorBuf); directResponse(req, Response::ScanEnd); return true; } if (ConnectConnection* s = getConnectConnection(req, serv)) { if (cursor != 0) { - req->adjustScanCursor(cursor >> Const::ServGroupBits); + __uint128_t actualCursor = cursor >> Const::ServGroupBits; + + // Debug log: cursor to be sent to backend server + char actualCursorBuf[64]; + Util::uint128ToString(actualCursor, actualCursorBuf); + + logDebug("h %d SCAN cursor from client: %s, group: %d, cursor to server: %s", + id(), cursorBuf, groupIdx, actualCursorBuf); + + req->adjustScanCursor(actualCursor); + } else { + logDebug("h %d SCAN cursor from client: 0, group: %d, cursor to server: 0", + id(), groupIdx); } handleRequest(req, s); } else { @@ -814,14 +842,41 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res) } else if (req->type() == Command::Scan && s && res->type() == Reply::Array) { SegmentStr<64> str(res->body()); if (const char* p = strchr(str.data() + sizeof("*2\r\n$"), '\n')) { - long cursor = atol(p + 1); + // Use 128-bit integer to handle large cursor values (Kvrocks may return cursor close to 64-bit limit) + __uint128_t cursor = 0; + const char* cursorStr = p + 1; + const char* cursorStart = cursorStr; + while (*cursorStr >= '0' && *cursorStr <= '9') { + cursor = cursor * 10 + (*cursorStr - '0'); + cursorStr++; + } + + // Debug log: cursor received from backend server + char serverCursorBuf[64]; + int serverCursorLen = cursorStr - cursorStart; + if (serverCursorLen > 0 && serverCursorLen < 64) { + memcpy(serverCursorBuf, cursorStart, serverCursorLen); + serverCursorBuf[serverCursorLen] = '\0'; + } else { + strcpy(serverCursorBuf, "0"); + } + auto g = s->server()->group(); + int currentGroupId = g ? g->id() : -1; + if (cursor != 0 || (g = sp->getGroup(g->id() + 1)) != nullptr) { + // Use 128-bit integer for left shift, will not overflow cursor <<= Const::ServGroupBits; cursor |= g->id(); if ((p = strchr(p, '*')) != nullptr) { - char buf[32]; - int n = snprintf(buf, sizeof(buf), "%ld", cursor); + // Convert 128-bit integer to string + char buf[64]; // 128-bit needs at most 39 decimal digits + int n = Util::uint128ToString(cursor, buf); + + // Debug log: cursor to be sent to client + logDebug("h %d SCAN cursor from server: %s, group: %d, cursor to client: %s", + id(), serverCursorBuf, g->id(), buf); + res->head().fset(nullptr, "*2\r\n" "$%d\r\n" @@ -829,6 +884,10 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res) n, buf); res->body().cut(p - str.data()); } + } else { + // Scan completed, return 0 to client + logDebug("h %d SCAN cursor from server: %s, group: %d, cursor to client: 0 (scan complete)", + id(), serverCursorBuf, currentGroupId); } } } diff --git a/src/Request.cpp b/src/Request.cpp index 1734c3c..8e9942a 100644 --- a/src/Request.cpp +++ b/src/Request.cpp @@ -246,10 +246,10 @@ void Request::setSentinelSlaves(const String& master) master.length(), master.length(), master.data()); } -void Request::adjustScanCursor(long cursor) +void Request::adjustScanCursor(__uint128_t cursor) { - char buf[32]; - int n = snprintf(buf, sizeof(buf), "%ld", cursor); + char buf[64]; // 128-bit integer needs at most 39 decimal digits + int n = Util::uint128ToString(cursor, buf); if (mHead.empty()) { SegmentStr<64> str(mReq); const char* p = strchr(str.data(), '$'); diff --git a/src/Request.h b/src/Request.h index 1f04102..86348dd 100644 --- a/src/Request.h +++ b/src/Request.h @@ -63,7 +63,7 @@ public: void setSentinels(const String& master); void setSentinelGetMaster(const String& master); void setSentinelSlaves(const String& master); - void adjustScanCursor(long cursor); + void adjustScanCursor(__uint128_t cursor); void follow(Request* leader); void setResponse(Response* res); bool send(Socket* s); diff --git a/src/Util.h b/src/Util.h index 164c996..a8cba8a 100644 --- a/src/Util.h +++ b/src/Util.h @@ -81,6 +81,31 @@ namespace Util { return duration_cast(steady_clock::now().time_since_epoch()).count(); } + + // Convert 128-bit unsigned integer to string + // Returns the length of the converted string (excluding null terminator) + // The buffer must be at least 40 bytes (39 digits + null terminator for max __uint128_t value) + inline int uint128ToString(__uint128_t value, char* buf) + { + int n = 0; + if (value == 0) { + buf[n++] = '0'; + } else { + char temp[64]; + int tempLen = 0; + __uint128_t v = value; + while (v > 0) { + temp[tempLen++] = '0' + (v % 10); + v /= 10; + } + // Reverse digits (extracted from low to high) + for (int i = tempLen - 1; i >= 0; i--) { + buf[n++] = temp[i]; + } + } + buf[n] = '\0'; + return n; + } }; #endif