mirror of
https://github.com/joyieldInc/predixy.git
synced 2026-02-05 01:42:24 +08:00
Ensure pubsub confirmations precede messages
Pause message queuing while subscriptions are pending to preserve confirmation ordering. Adds pubsub_subscription_order test and includes it in the test runner.
This commit is contained in:
parent
4b127b8eed
commit
0243aa0fa6
@ -189,6 +189,10 @@ void ConnectConnection::handleResponse(Handler* h)
|
|||||||
res->set(mParser);
|
res->set(mParser);
|
||||||
req->setResponse(res);
|
req->setResponse(res);
|
||||||
mAcceptConnection->append(req);
|
mAcceptConnection->append(req);
|
||||||
|
if (mAcceptConnection->inPendSub()) {
|
||||||
|
mParser.reset();
|
||||||
|
return;
|
||||||
|
}
|
||||||
mSentRequests.push_front(req);
|
mSentRequests.push_front(req);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|||||||
55
test/pubsub_subscription_order.py
Normal file
55
test/pubsub_subscription_order.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
#
|
||||||
|
# Verify subscription confirmations arrive before messages
|
||||||
|
#
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
import redis
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
def run_test(host, port):
|
||||||
|
c1 = redis.StrictRedis(host=host, port=port)
|
||||||
|
c2 = redis.StrictRedis(host=host, port=port)
|
||||||
|
|
||||||
|
ps = c1.pubsub()
|
||||||
|
ps.subscribe("ch_order")
|
||||||
|
|
||||||
|
# Publish quickly after subscribe to stress ordering.
|
||||||
|
c2.publish("ch_order", "order_msg")
|
||||||
|
|
||||||
|
msgs = []
|
||||||
|
for _ in range(5):
|
||||||
|
msg = ps.get_message(timeout=0.5)
|
||||||
|
if msg:
|
||||||
|
msgs.append(msg)
|
||||||
|
if len(msgs) >= 2:
|
||||||
|
break
|
||||||
|
|
||||||
|
if not msgs:
|
||||||
|
print("FAIL: missing subscribe confirmation")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if msgs[0].get("type") != "subscribe":
|
||||||
|
print("FAIL: first message is not subscribe:", msgs[0])
|
||||||
|
return False
|
||||||
|
|
||||||
|
if len(msgs) > 1 and msgs[1].get("type") != "message":
|
||||||
|
print("FAIL: second message is not data message:", msgs[1])
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
parser = argparse.ArgumentParser(conflict_handler='resolve', description="Pubsub subscription order test")
|
||||||
|
parser.add_argument("-h", "--host", default="127.0.0.1")
|
||||||
|
parser.add_argument("-p", "--port", type=int, default=7617)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if run_test(args.host, args.port):
|
||||||
|
print("PASS: pubsub subscription order")
|
||||||
|
sys.exit(0)
|
||||||
|
print("FAIL: pubsub subscription order")
|
||||||
|
sys.exit(1)
|
||||||
@ -60,12 +60,14 @@ PUBSUB_REDIS_EXIT=0
|
|||||||
PUBSUB_MINIMAL_EXIT=0
|
PUBSUB_MINIMAL_EXIT=0
|
||||||
PUBSUB_EXIT=0
|
PUBSUB_EXIT=0
|
||||||
PUBSUB_MESSAGE_EXIT=0
|
PUBSUB_MESSAGE_EXIT=0
|
||||||
|
PUBSUB_ORDER_EXIT=0
|
||||||
|
|
||||||
uv run python3 test/basic.py || BASIC_EXIT=$?
|
uv run python3 test/basic.py || BASIC_EXIT=$?
|
||||||
uv run python3 test/pubsub_minimal.py -p 7617 || PUBSUB_REDIS_EXIT=$?
|
uv run python3 test/pubsub_minimal.py -p 7617 || PUBSUB_REDIS_EXIT=$?
|
||||||
uv run python3 test/pubsub_minimal.py -p 6379 || PUBSUB_MINIMAL_EXIT=$?
|
uv run python3 test/pubsub_minimal.py -p 6379 || PUBSUB_MINIMAL_EXIT=$?
|
||||||
uv run python3 test/pubsub.py || PUBSUB_EXIT=$?
|
uv run python3 test/pubsub.py || PUBSUB_EXIT=$?
|
||||||
|
uv run python3 test/pubsub_subscription_order.py -p 7617 || PUBSUB_ORDER_EXIT=$?
|
||||||
uv run python3 test/pubsub_message_response.py -p 7617 || PUBSUB_MESSAGE_EXIT=$?
|
uv run python3 test/pubsub_message_response.py -p 7617 || PUBSUB_MESSAGE_EXIT=$?
|
||||||
|
|
||||||
TEST_EXIT=$((BASIC_EXIT + PUBSUB_REDIS_EXIT + PUBSUB_MINIMAL_EXIT + PUBSUB_EXIT + PUBSUB_MESSAGE_EXIT))
|
TEST_EXIT=$((BASIC_EXIT + PUBSUB_REDIS_EXIT + PUBSUB_MINIMAL_EXIT + PUBSUB_EXIT + PUBSUB_MESSAGE_EXIT + PUBSUB_ORDER_EXIT))
|
||||||
exit $TEST_EXIT
|
exit $TEST_EXIT
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user