From 28e20dfe80a22d334e80712aadaefb749c35aeb3 Mon Sep 17 00:00:00 2001 From: Julien Letessier Date: Thu, 15 Jan 2026 10:07:19 +0100 Subject: [PATCH] Fix transaction queuing and pubsub count parsing --- src/Handler.cpp | 19 ++----------------- src/Subscribe.cpp | 21 +++++++++++++++++---- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/src/Handler.cpp b/src/Handler.cpp index 7d7d9a6..5fa65d4 100644 --- a/src/Handler.cpp +++ b/src/Handler.cpp @@ -543,22 +543,6 @@ bool Handler::preHandleRequest(Request* req, const String& key) FuncCallTimer(); auto c = req->connection(); if (c && c->inTransaction()) { - switch (req->type()) { - case Command::Select: - case Command::Psubscribe: - case Command::Subscribe: - { - ResponsePtr res = ResponseAlloc::create(); - char buf[128]; - snprintf(buf, sizeof(buf), "forbid command \"%s\" in transaction", - req->cmd()); - res->setErr(buf); - handleResponse(nullptr, req, res); - return true; - } - default: - break; - } return false; } if (req->type() == Command::Msetnx && mProxy->isSplitMultiKey()) { @@ -910,7 +894,8 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res) res->adjustForLeader(req); } req->setResponse(res); - if (c->send(this, req, res)) { + // Always trigger write event for error responses to ensure they're sent immediately + if (c->send(this, req, res) || res->isError()) { addPostEvent(c, Multiplexor::WriteEvent); } long elapsed = Util::elapsedUSec() - req->createTime(); diff --git a/src/Subscribe.cpp b/src/Subscribe.cpp index 4fe4c9c..a653bb8 100644 --- a/src/Subscribe.cpp +++ b/src/Subscribe.cpp @@ -41,21 +41,34 @@ SubscribeParser::Status SubscribeParser::parse(const Segment& body, int& chs) st = String; } if (chs < 0) { + // Parse count from the end of the response + // Format: *3\r\n$N\r\nsubscribe\r\n$M\r\n\r\n:\r\n + // The count is always the last integer after a colon int len = body.length(); - int tailLen = len < 64 ? len : 64; + // Look at the tail, but use a larger buffer for long channel names + int tailLen = len < 256 ? len : 256; Segment tmp(body); tmp.rewind(); if (len > tailLen) { tmp.use(len - tailLen); } - char buf[64 + 1]; + char buf[256 + 1]; int n = tmp.dump(buf, tailLen); buf[n] = '\0'; + // Search backwards for colon followed by digits const char* p = buf + n; while (p > buf) { if (*--p == ':') { - chs = atoi(p + 1); - break; + // Found colon, parse the number after it + const char* numStart = p + 1; + // Skip whitespace + while (*numStart == ' ' || *numStart == '\t') { + numStart++; + } + if (*numStart >= '0' && *numStart <= '9') { + chs = atoi(numStart); + break; + } } } }