mirror of
https://github.com/joyieldInc/predixy.git
synced 2026-02-05 01:42:24 +08:00
Fix transaction queuing and pubsub count parsing
This commit is contained in:
parent
8dd200a35e
commit
28e20dfe80
@ -543,22 +543,6 @@ bool Handler::preHandleRequest(Request* req, const String& key)
|
|||||||
FuncCallTimer();
|
FuncCallTimer();
|
||||||
auto c = req->connection();
|
auto c = req->connection();
|
||||||
if (c && c->inTransaction()) {
|
if (c && c->inTransaction()) {
|
||||||
switch (req->type()) {
|
|
||||||
case Command::Select:
|
|
||||||
case Command::Psubscribe:
|
|
||||||
case Command::Subscribe:
|
|
||||||
{
|
|
||||||
ResponsePtr res = ResponseAlloc::create();
|
|
||||||
char buf[128];
|
|
||||||
snprintf(buf, sizeof(buf), "forbid command \"%s\" in transaction",
|
|
||||||
req->cmd());
|
|
||||||
res->setErr(buf);
|
|
||||||
handleResponse(nullptr, req, res);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (req->type() == Command::Msetnx && mProxy->isSplitMultiKey()) {
|
if (req->type() == Command::Msetnx && mProxy->isSplitMultiKey()) {
|
||||||
@ -910,7 +894,8 @@ void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res)
|
|||||||
res->adjustForLeader(req);
|
res->adjustForLeader(req);
|
||||||
}
|
}
|
||||||
req->setResponse(res);
|
req->setResponse(res);
|
||||||
if (c->send(this, req, res)) {
|
// Always trigger write event for error responses to ensure they're sent immediately
|
||||||
|
if (c->send(this, req, res) || res->isError()) {
|
||||||
addPostEvent(c, Multiplexor::WriteEvent);
|
addPostEvent(c, Multiplexor::WriteEvent);
|
||||||
}
|
}
|
||||||
long elapsed = Util::elapsedUSec() - req->createTime();
|
long elapsed = Util::elapsedUSec() - req->createTime();
|
||||||
|
|||||||
@ -41,21 +41,34 @@ SubscribeParser::Status SubscribeParser::parse(const Segment& body, int& chs)
|
|||||||
st = String;
|
st = String;
|
||||||
}
|
}
|
||||||
if (chs < 0) {
|
if (chs < 0) {
|
||||||
|
// Parse count from the end of the response
|
||||||
|
// Format: *3\r\n$N\r\nsubscribe\r\n$M\r\n<channel>\r\n:<count>\r\n
|
||||||
|
// The count is always the last integer after a colon
|
||||||
int len = body.length();
|
int len = body.length();
|
||||||
int tailLen = len < 64 ? len : 64;
|
// Look at the tail, but use a larger buffer for long channel names
|
||||||
|
int tailLen = len < 256 ? len : 256;
|
||||||
Segment tmp(body);
|
Segment tmp(body);
|
||||||
tmp.rewind();
|
tmp.rewind();
|
||||||
if (len > tailLen) {
|
if (len > tailLen) {
|
||||||
tmp.use(len - tailLen);
|
tmp.use(len - tailLen);
|
||||||
}
|
}
|
||||||
char buf[64 + 1];
|
char buf[256 + 1];
|
||||||
int n = tmp.dump(buf, tailLen);
|
int n = tmp.dump(buf, tailLen);
|
||||||
buf[n] = '\0';
|
buf[n] = '\0';
|
||||||
|
// Search backwards for colon followed by digits
|
||||||
const char* p = buf + n;
|
const char* p = buf + n;
|
||||||
while (p > buf) {
|
while (p > buf) {
|
||||||
if (*--p == ':') {
|
if (*--p == ':') {
|
||||||
chs = atoi(p + 1);
|
// Found colon, parse the number after it
|
||||||
break;
|
const char* numStart = p + 1;
|
||||||
|
// Skip whitespace
|
||||||
|
while (*numStart == ' ' || *numStart == '\t') {
|
||||||
|
numStart++;
|
||||||
|
}
|
||||||
|
if (*numStart >= '0' && *numStart <= '9') {
|
||||||
|
chs = atoi(numStart);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user