From 7485ebbb5ae515073b803fac3958189f285210f1 Mon Sep 17 00:00:00 2001 From: Julien Letessier Date: Thu, 15 Jan 2026 09:12:00 +0100 Subject: [PATCH] Reset pubsub parser per message Handle pubsub message replies immediately and reset the parser to avoid response reuse. Adds pubsub_parser_reset test and runs it in the harness. --- src/ConnectConnection.cpp | 4 ++++ test/pubsub_parser_reset.py | 47 +++++++++++++++++++++++++++++++++++++ test/run.sh | 4 +++- 3 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 test/pubsub_parser_reset.py diff --git a/src/ConnectConnection.cpp b/src/ConnectConnection.cpp index d783772..b5665c6 100644 --- a/src/ConnectConnection.cpp +++ b/src/ConnectConnection.cpp @@ -191,6 +191,10 @@ void ConnectConnection::handleResponse(Handler* h) mAcceptConnection->append(req); if (mAcceptConnection->inPendSub()) { mParser.reset(); +mParser.reset(); +h->handleResponse(this, req, res); +mSentRequests.pop_front(); +return; return; } mSentRequests.push_front(req); diff --git a/test/pubsub_parser_reset.py b/test/pubsub_parser_reset.py new file mode 100644 index 0000000..b58aa4c --- /dev/null +++ b/test/pubsub_parser_reset.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +# +# Verify pubsub parser state does not reuse old messages +# + +import argparse +import sys +import redis + + +def run_test(host, port): + c1 = redis.StrictRedis(host=host, port=port) + c2 = redis.StrictRedis(host=host, port=port) + + ps = c1.pubsub() + ps.subscribe("ch_reset") + msg = ps.get_message(timeout=1.0) + if not msg or msg.get("type") != "subscribe": + print("FAIL: subscribe confirmation missing:", msg) + return False + + c2.publish("ch_reset", "first") + msg = ps.get_message(timeout=1.0) + if not msg or msg.get("type") != "message": + print("FAIL: missing first message:", msg) + return False + + ps.psubscribe("ch_reset*") + msg = ps.get_message(timeout=1.0) + if not msg or msg.get("type") != "psubscribe": + print("FAIL: expected psubscribe confirmation, got:", msg) + return False + + return True + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(conflict_handler='resolve', description="Pubsub parser reset test") + parser.add_argument("-h", "--host", default="127.0.0.1") + parser.add_argument("-p", "--port", type=int, default=7617) + args = parser.parse_args() + + if run_test(args.host, args.port): + print("PASS: pubsub parser reset") + sys.exit(0) + print("FAIL: pubsub parser reset") + sys.exit(1) diff --git a/test/run.sh b/test/run.sh index 8094d32..f2586df 100755 --- a/test/run.sh +++ b/test/run.sh @@ -61,13 +61,15 @@ PUBSUB_MINIMAL_EXIT=0 PUBSUB_EXIT=0 PUBSUB_MESSAGE_EXIT=0 PUBSUB_ORDER_EXIT=0 +PUBSUB_RESET_EXIT=0 uv run python3 test/basic.py || BASIC_EXIT=$? uv run python3 test/pubsub_minimal.py -p 7617 || PUBSUB_REDIS_EXIT=$? uv run python3 test/pubsub_minimal.py -p 6379 || PUBSUB_MINIMAL_EXIT=$? uv run python3 test/pubsub.py || PUBSUB_EXIT=$? uv run python3 test/pubsub_subscription_order.py -p 7617 || PUBSUB_ORDER_EXIT=$? +uv run python3 test/pubsub_parser_reset.py -p 7617 || PUBSUB_RESET_EXIT=$? uv run python3 test/pubsub_message_response.py -p 7617 || PUBSUB_MESSAGE_EXIT=$? -TEST_EXIT=$((BASIC_EXIT + PUBSUB_REDIS_EXIT + PUBSUB_MINIMAL_EXIT + PUBSUB_EXIT + PUBSUB_MESSAGE_EXIT + PUBSUB_ORDER_EXIT)) +TEST_EXIT=$((BASIC_EXIT + PUBSUB_REDIS_EXIT + PUBSUB_MINIMAL_EXIT + PUBSUB_EXIT + PUBSUB_MESSAGE_EXIT + PUBSUB_ORDER_EXIT + PUBSUB_RESET_EXIT)) exit $TEST_EXIT