mirror of
https://github.com/joyieldInc/predixy.git
synced 2025-12-24 22:46:41 +08:00
1.fix multi-keys request leader self reference
2.adjust alloc implement
This commit is contained in:
parent
bbbe798629
commit
7fda4b77c1
@ -29,6 +29,7 @@ public:
|
||||
typedef AcceptConnection Value;
|
||||
typedef ListNode<AcceptConnection, SharePtr<AcceptConnection>> ListNodeType;
|
||||
typedef DequeNode<AcceptConnection, SharePtr<AcceptConnection>> DequeNodeType;
|
||||
typedef Alloc<AcceptConnection, Const::AcceptConnectionAllocCacheSize> Allocator;
|
||||
public:
|
||||
AcceptConnection(int fd, sockaddr* addr, socklen_t len);
|
||||
~AcceptConnection();
|
||||
@ -97,6 +98,6 @@ private:
|
||||
|
||||
typedef List<AcceptConnection> AcceptConnectionList;
|
||||
typedef Deque<AcceptConnection> AcceptConnectionDeque;
|
||||
typedef Alloc<AcceptConnection, Const::AcceptConnectionAllocCacheSize> AcceptConnectionAlloc;
|
||||
typedef AcceptConnection::Allocator AcceptConnectionAlloc;
|
||||
|
||||
#endif
|
||||
|
||||
@ -76,7 +76,7 @@ public:
|
||||
}
|
||||
UsedMemory += allocSize<T>();
|
||||
if (MaxMemory == 0 || UsedMemory <= MaxMemory) {
|
||||
void* p = ::operator new(allocSize<T>());
|
||||
void* p = ::operator new(allocSize<T>(), std::nothrow);
|
||||
if (p) {
|
||||
try {
|
||||
obj = new (p) T(args...);
|
||||
@ -145,7 +145,7 @@ public:
|
||||
{
|
||||
int n = --mCnt;
|
||||
if (n == 0) {
|
||||
Alloc<T>::destroy(static_cast<T*>(this));
|
||||
T::Allocator::destroy(static_cast<T*>(this));
|
||||
} else if (n < 0) {
|
||||
logError("unref object %p with cnt %d", this, n);
|
||||
abort();
|
||||
|
||||
@ -23,6 +23,7 @@ class Buffer :
|
||||
public RefCntObj<Buffer>
|
||||
{
|
||||
public:
|
||||
typedef Alloc<Buffer, Const::BufferAllocCacheSize> Allocator;
|
||||
static const int MaxBufFmtAppendLen = 8192;
|
||||
public:
|
||||
Buffer& operator=(const Buffer&);
|
||||
@ -92,12 +93,13 @@ private:
|
||||
};
|
||||
|
||||
typedef List<Buffer> BufferList;
|
||||
typedef Buffer::Allocator BufferAlloc;
|
||||
|
||||
template<>
|
||||
inline int allocSize<Buffer>()
|
||||
{
|
||||
return Buffer::getSize() + sizeof(Buffer);
|
||||
}
|
||||
typedef Alloc<Buffer, Const::BufferAllocCacheSize> BufferAlloc;
|
||||
|
||||
struct BufferPos
|
||||
{
|
||||
|
||||
@ -32,8 +32,8 @@ namespace Const
|
||||
static const int MaxCmdLen = 32;
|
||||
static const int MaxKeyLen = 512;
|
||||
static const int BufferAllocCacheSize = 64;
|
||||
static const int RequestAllocCacheSize = 32;
|
||||
static const int ResponseAllocCacheSize = 32;
|
||||
static const int RequestAllocCacheSize = 128;
|
||||
static const int ResponseAllocCacheSize = 128;
|
||||
static const int AcceptConnectionAllocCacheSize = 32;
|
||||
static const int ConnectConnectionAllocCacheSize = 4;
|
||||
};
|
||||
|
||||
@ -23,6 +23,7 @@ public:
|
||||
typedef ConnectConnection Value;
|
||||
typedef ListNode<ConnectConnection> ListNodeType;
|
||||
typedef DequeNode<ConnectConnection> DequeNodeType;
|
||||
typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> Allocator;
|
||||
public:
|
||||
ConnectConnection(Server* s, bool shared);
|
||||
~ConnectConnection();
|
||||
@ -97,6 +98,6 @@ private:
|
||||
|
||||
typedef List<ConnectConnection> ConnectConnectionList;
|
||||
typedef Deque<ConnectConnection> ConnectConnectionDeque;
|
||||
typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> ConnectConnectionAlloc;
|
||||
typedef ConnectConnection::Allocator ConnectConnectionAlloc;
|
||||
|
||||
#endif
|
||||
|
||||
@ -27,7 +27,7 @@ BufferPtr Connection::getBuffer(Handler* h, bool allowNew)
|
||||
}
|
||||
}
|
||||
if (!mBuf || mBuf->full()) {
|
||||
BufferPtr buf = Alloc<Buffer>::create();
|
||||
BufferPtr buf = BufferAlloc::create();
|
||||
if (mBuf) {
|
||||
mBuf->concat(buf);
|
||||
}
|
||||
|
||||
@ -98,6 +98,7 @@ Request::Request(GenericCode code):
|
||||
|
||||
Request::~Request()
|
||||
{
|
||||
clear();
|
||||
}
|
||||
|
||||
void Request::clear()
|
||||
@ -155,13 +156,14 @@ void Request::set(const RequestParser& p, Request* leader)
|
||||
}
|
||||
mHead = r->mReq;
|
||||
mReq = p.request();
|
||||
mLeader = leader;
|
||||
if (leader == this) {
|
||||
if (mType == Command::Mset || mType == Command::Msetnx) {
|
||||
mFollowers = (p.argNum() - 1) >> 1;
|
||||
} else {
|
||||
mFollowers = p.argNum() - 1;
|
||||
}
|
||||
} else {
|
||||
mLeader = leader;
|
||||
}
|
||||
} else {
|
||||
mReq = p.request();
|
||||
@ -338,49 +340,49 @@ int Request::fill(IOVec* vecs, int len)
|
||||
void Request::setResponse(Response* res)
|
||||
{
|
||||
mDone = true;
|
||||
if (mLeader) {
|
||||
mLeader->mFollowersDone += 1;
|
||||
if (Request* ld = leader()) {
|
||||
ld->mFollowersDone += 1;
|
||||
switch (mType) {
|
||||
case Command::Mget:
|
||||
mRes = res;
|
||||
break;
|
||||
case Command::Mset:
|
||||
if (Response* leaderRes = mLeader->getResponse()) {
|
||||
if (Response* leaderRes = ld->getResponse()) {
|
||||
if (res->isError() && !leaderRes->isError()) {
|
||||
mLeader->mRes = res;
|
||||
ld->mRes = res;
|
||||
}
|
||||
} else {
|
||||
mLeader->mRes = res;
|
||||
ld->mRes = res;
|
||||
}
|
||||
break;
|
||||
case Command::Msetnx:
|
||||
if (Response* leaderRes = mLeader->getResponse()) {
|
||||
if (Response* leaderRes = ld->getResponse()) {
|
||||
if (!leaderRes->isError() &&
|
||||
(res->isError() || res->integer() == 0)) {
|
||||
mLeader->mRes = res;
|
||||
ld->mRes = res;
|
||||
}
|
||||
} else {
|
||||
mLeader->mRes = res;
|
||||
ld->mRes = res;
|
||||
}
|
||||
break;
|
||||
case Command::Touch:
|
||||
case Command::Exists:
|
||||
case Command::Del:
|
||||
case Command::Unlink:
|
||||
if (!mLeader->mRes) {
|
||||
mLeader->mRes = res;
|
||||
if (!ld->mRes) {
|
||||
ld->mRes = res;
|
||||
}
|
||||
if (mLeader->isDone()) {
|
||||
mLeader->mRes->set(mLeader->mRes->integer());
|
||||
if (ld->isDone()) {
|
||||
ld->mRes->set(ld->mRes->integer());
|
||||
}
|
||||
break;
|
||||
case Command::ScriptLoad:
|
||||
if (Response* leaderRes = mLeader->getResponse()) {
|
||||
if (Response* leaderRes = ld->getResponse()) {
|
||||
if (leaderRes->isString() && !res->isString()) {
|
||||
mLeader->mRes = res;
|
||||
ld->mRes = res;
|
||||
}
|
||||
} else {
|
||||
mLeader->mRes = res;
|
||||
ld->mRes = res;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@ -395,7 +397,7 @@ void Request::setResponse(Response* res)
|
||||
|
||||
bool Request::isDone() const
|
||||
{
|
||||
if (mLeader == this) {
|
||||
if (isLeader()) {
|
||||
switch (mType) {
|
||||
case Command::Mget:
|
||||
case Command::Psubscribe:
|
||||
|
||||
@ -25,6 +25,7 @@ class Request :
|
||||
public:
|
||||
typedef Request Value;
|
||||
typedef ListNode<Request, SharePtr<Request>, RequestListIndex::Size> ListNodeType;
|
||||
typedef Alloc<Request, Const::RequestAllocCacheSize> Allocator;
|
||||
static const int MaxRedirectLimit = 3;
|
||||
enum GenericCode
|
||||
{
|
||||
@ -119,11 +120,11 @@ public:
|
||||
}
|
||||
Request* leader() const
|
||||
{
|
||||
return mLeader;
|
||||
return isLeader() ? const_cast<Request*>(this) : (Request*)mLeader;
|
||||
}
|
||||
bool isLeader() const
|
||||
{
|
||||
return mLeader == this;
|
||||
return mFollowers > 0;
|
||||
}
|
||||
bool isDelivered() const
|
||||
{
|
||||
@ -181,6 +182,6 @@ private:
|
||||
|
||||
typedef List<Request, RequestListIndex::Recv> RecvRequestList;
|
||||
typedef List<Request, RequestListIndex::Send> SendRequestList;
|
||||
typedef Alloc<Request, Const::RequestAllocCacheSize> RequestAlloc;
|
||||
typedef Request::Allocator RequestAlloc;
|
||||
|
||||
#endif
|
||||
|
||||
@ -18,6 +18,7 @@ class Response :
|
||||
public:
|
||||
typedef Response Value;
|
||||
typedef ListNode<Response, SharePtr<Response>> ListNodeType;
|
||||
typedef Alloc<Response, Const::ResponseAllocCacheSize> Allocator;
|
||||
enum GenericCode
|
||||
{
|
||||
Pong,
|
||||
@ -137,6 +138,6 @@ private:
|
||||
};
|
||||
|
||||
typedef List<Response> ResponseList;
|
||||
typedef Alloc<Response, Const::ResponseAllocCacheSize> ResponseAlloc;
|
||||
typedef Response::Allocator ResponseAlloc;
|
||||
|
||||
#endif
|
||||
|
||||
Loading…
Reference in New Issue
Block a user