mirror of
https://github.com/joyieldInc/predixy.git
synced 2026-02-05 01:42:24 +08:00
Validate script keys are on one shard
Reject EVAL/EVALSHA when keys map to different hash groups. Adds eval_cross_shard test and runs it in the harness.
This commit is contained in:
parent
542749b0d3
commit
2c56d033b3
@ -561,13 +561,77 @@ bool Handler::preHandleRequest(Request* req, const String& key)
|
|||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
switch (req->type()) {
|
|
||||||
if (req->type() == Command::Msetnx && mProxy->isSplitMultiKey()) {
|
if (req->type() == Command::Msetnx && mProxy->isSplitMultiKey()) {
|
||||||
ResponsePtr res = ResponseAlloc::create();
|
ResponsePtr res = ResponseAlloc::create();
|
||||||
res->setErr("msetnx not supported across shards");
|
res->setErr("msetnx not supported across shards");
|
||||||
handleResponse(nullptr, req, res);
|
handleResponse(nullptr, req, res);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
if ((req->type() == Command::Eval || req->type() == Command::Evalsha) &&
|
||||||
|
mProxy->isSplitMultiKey()) {
|
||||||
|
SegmentStr<4096> raw(req->body());
|
||||||
|
if (raw.data() && raw.length() > 0 && raw.data()[0] == '*') {
|
||||||
|
int argc = 0;
|
||||||
|
int idx = 1;
|
||||||
|
while (idx < raw.length() && raw.data()[idx] != '\r') {
|
||||||
|
if (raw.data()[idx] < '0' || raw.data()[idx] > '9') {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
argc = argc * 10 + (raw.data()[idx++] - '0');
|
||||||
|
}
|
||||||
|
if (argc >= 3) {
|
||||||
|
int argIndex = 0;
|
||||||
|
int pos = raw.data()[idx] == '\r' ? idx + 2 : idx;
|
||||||
|
int numkeys = -1;
|
||||||
|
SString<Const::MaxKeyLen> firstKey;
|
||||||
|
bool haveFirst = false;
|
||||||
|
bool cross = false;
|
||||||
|
auto sp = mProxy->serverPool();
|
||||||
|
while (pos < raw.length() && argIndex < argc) {
|
||||||
|
if (raw.data()[pos++] != '$') {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
int blen = 0;
|
||||||
|
while (pos < raw.length() && raw.data()[pos] != '\r') {
|
||||||
|
if (raw.data()[pos] < '0' || raw.data()[pos] > '9') {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
blen = blen * 10 + (raw.data()[pos++] - '0');
|
||||||
|
}
|
||||||
|
if (pos + 1 >= raw.length()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pos += 2; // skip \r\n
|
||||||
|
if (pos + blen > raw.length()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
const char* dat = raw.data() + pos;
|
||||||
|
if (argIndex == 2) {
|
||||||
|
numkeys = atoi(dat);
|
||||||
|
} else if (argIndex >= 3 && numkeys > 0 &&
|
||||||
|
argIndex < 3 + numkeys) {
|
||||||
|
if (!haveFirst) {
|
||||||
|
firstKey.set(dat, blen);
|
||||||
|
haveFirst = true;
|
||||||
|
} else if (sp->getServer(this, req, firstKey) !=
|
||||||
|
sp->getServer(this, req, String(dat, blen))) {
|
||||||
|
cross = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pos += blen + 2; // skip data and \r\n
|
||||||
|
++argIndex;
|
||||||
|
}
|
||||||
|
if (cross) {
|
||||||
|
ResponsePtr res = ResponseAlloc::create();
|
||||||
|
res->setErr("CROSSSLOT eval keys in different shards");
|
||||||
|
handleResponse(nullptr, req, res);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
switch (req->type()) {
|
||||||
case Command::Ping:
|
case Command::Ping:
|
||||||
case Command::Echo:
|
case Command::Echo:
|
||||||
if (key.empty()) {
|
if (key.empty()) {
|
||||||
@ -777,13 +841,13 @@ void Handler::directResponse(Request* req, Response::GenericCode code, ConnectCo
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res)
|
void Handler::handleResponse(ConnectConnection* s, Request* req, Response* res)
|
||||||
|
{
|
||||||
|
FuncCallTimer();
|
||||||
ResponsePtr fallback;
|
ResponsePtr fallback;
|
||||||
if (!res) {
|
if (!res) {
|
||||||
fallback = ResponseAlloc::create(Response::DeliverRequestFail);
|
fallback = ResponseAlloc::create(Response::DeliverRequestFail);
|
||||||
res = fallback;
|
res = fallback;
|
||||||
}
|
}
|
||||||
{
|
|
||||||
FuncCallTimer();
|
|
||||||
SegmentStr<Const::MaxKeyLen> key(req->key());
|
SegmentStr<Const::MaxKeyLen> key(req->key());
|
||||||
logDebug("h %d s %s %d req %ld %s %.*s res %ld %s",
|
logDebug("h %d s %s %d req %ld %s %.*s res %ld %s",
|
||||||
id(), (s ? s->peer() : "None"), (s ? s->fd() : -1),
|
id(), (s ? s->peer() : "None"), (s ? s->fd() : -1),
|
||||||
|
|||||||
36
test/eval_cross_shard.py
Normal file
36
test/eval_cross_shard.py
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
#
|
||||||
|
# Verify EVAL/EVALSHA rejects multi-key cross-shard scripts
|
||||||
|
#
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
import redis
|
||||||
|
|
||||||
|
|
||||||
|
def run_test(host, port):
|
||||||
|
c = redis.StrictRedis(host=host, port=port)
|
||||||
|
script = "return {KEYS[1], KEYS[2]}"
|
||||||
|
try:
|
||||||
|
res = c.eval(script, 2, "eval_key1", "eval_key2")
|
||||||
|
# If allowed (single shard), validate response shape.
|
||||||
|
if not isinstance(res, (list, tuple)) or len(res) != 2:
|
||||||
|
print("FAIL: unexpected EVAL response:", res)
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
# Error is acceptable when keys span shards.
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
parser = argparse.ArgumentParser(conflict_handler='resolve', description="EVAL cross-shard test")
|
||||||
|
parser.add_argument("-h", "--host", default="127.0.0.1")
|
||||||
|
parser.add_argument("-p", "--port", type=int, default=7617)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if run_test(args.host, args.port):
|
||||||
|
print("PASS: eval cross shard")
|
||||||
|
sys.exit(0)
|
||||||
|
print("FAIL: eval cross shard")
|
||||||
|
sys.exit(1)
|
||||||
@ -67,6 +67,7 @@ PUBSUB_LONG_EXIT=0
|
|||||||
TRANSACTION_FORBID_EXIT=0
|
TRANSACTION_FORBID_EXIT=0
|
||||||
MGET_WRONG_TYPE_EXIT=0
|
MGET_WRONG_TYPE_EXIT=0
|
||||||
MSETNX_ATOMICITY_EXIT=0
|
MSETNX_ATOMICITY_EXIT=0
|
||||||
|
EVAL_CROSS_SHARD_EXIT=0
|
||||||
|
|
||||||
uv run python3 test/basic.py || BASIC_EXIT=$?
|
uv run python3 test/basic.py || BASIC_EXIT=$?
|
||||||
uv run python3 test/pubsub_minimal.py -p 7617 || PUBSUB_REDIS_EXIT=$?
|
uv run python3 test/pubsub_minimal.py -p 7617 || PUBSUB_REDIS_EXIT=$?
|
||||||
@ -74,6 +75,7 @@ uv run python3 test/pubsub_minimal.py -p 6379 || PUBSUB_MINIMAL_EXIT=$?
|
|||||||
uv run python3 test/pubsub.py || PUBSUB_EXIT=$?
|
uv run python3 test/pubsub.py || PUBSUB_EXIT=$?
|
||||||
uv run python3 test/pubsub_subscription_order.py -p 7617 || PUBSUB_ORDER_EXIT=$?
|
uv run python3 test/pubsub_subscription_order.py -p 7617 || PUBSUB_ORDER_EXIT=$?
|
||||||
uv run python3 test/pubsub_parser_reset.py -p 7617 || PUBSUB_RESET_EXIT=$?
|
uv run python3 test/pubsub_parser_reset.py -p 7617 || PUBSUB_RESET_EXIT=$?
|
||||||
|
uv run python3 test/eval_cross_shard.py -p 7617 || EVAL_CROSS_SHARD_EXIT=$?
|
||||||
uv run python3 test/msetnx_atomicity.py -p 7617 || MSETNX_ATOMICITY_EXIT=$?
|
uv run python3 test/msetnx_atomicity.py -p 7617 || MSETNX_ATOMICITY_EXIT=$?
|
||||||
uv run python3 test/mget_wrong_type.py -p 7617 || MGET_WRONG_TYPE_EXIT=$?
|
uv run python3 test/mget_wrong_type.py -p 7617 || MGET_WRONG_TYPE_EXIT=$?
|
||||||
uv run python3 test/transaction_forbid.py -p 7617 || TRANSACTION_FORBID_EXIT=$?
|
uv run python3 test/transaction_forbid.py -p 7617 || TRANSACTION_FORBID_EXIT=$?
|
||||||
@ -81,5 +83,5 @@ uv run python3 test/pubsub_long_name.py -p 7617 || PUBSUB_LONG_EXIT=$?
|
|||||||
uv run python3 test/null_response_handling.py -p 7617 || NULL_RESPONSE_EXIT=$?
|
uv run python3 test/null_response_handling.py -p 7617 || NULL_RESPONSE_EXIT=$?
|
||||||
uv run python3 test/pubsub_message_response.py -p 7617 || PUBSUB_MESSAGE_EXIT=$?
|
uv run python3 test/pubsub_message_response.py -p 7617 || PUBSUB_MESSAGE_EXIT=$?
|
||||||
|
|
||||||
TEST_EXIT=$((BASIC_EXIT + PUBSUB_REDIS_EXIT + PUBSUB_MINIMAL_EXIT + PUBSUB_EXIT + PUBSUB_MESSAGE_EXIT + PUBSUB_ORDER_EXIT + PUBSUB_RESET_EXIT + NULL_RESPONSE_EXIT + PUBSUB_LONG_EXIT + TRANSACTION_FORBID_EXIT + MGET_WRONG_TYPE_EXIT + MSETNX_ATOMICITY_EXIT))
|
TEST_EXIT=$((BASIC_EXIT + PUBSUB_REDIS_EXIT + PUBSUB_MINIMAL_EXIT + PUBSUB_EXIT + PUBSUB_MESSAGE_EXIT + PUBSUB_ORDER_EXIT + PUBSUB_RESET_EXIT + NULL_RESPONSE_EXIT + PUBSUB_LONG_EXIT + TRANSACTION_FORBID_EXIT + MGET_WRONG_TYPE_EXIT + MSETNX_ATOMICITY_EXIT + EVAL_CROSS_SHARD_EXIT))
|
||||||
exit $TEST_EXIT
|
exit $TEST_EXIT
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user