Compare commits

..

No commits in common. "master" and "1.0.4a" have entirely different histories.

33 changed files with 124 additions and 1085 deletions

View File

@ -1,94 +0,0 @@
## Custom Command
## CustomCommand {
## command { #command string, must be lowercase
## [Mode read|write|admin[|[keyAt2|keyAt3]] #default write, default key position is 1
## [MinArgs [2-]] #default 2, including command itself
## [MaxArgs [2-]] #default 2, must be MaxArgs >= MinArgs
## }...
## }
## Currently support maximum 16 custom commands
## Example:
#CustomCommand {
##------------------------------------------------------------------------
# custom.ttl {
# Mode keyAt2
# MinArgs 3
# MaxArgs 3
# }
#### custom.ttl miliseconds key
#### Mode = write|keyAt2, MinArgs/MaxArgs = 3 = command + miliseconds + key
##------------------------------------------------------------------------
## from redis source src/modules/hello.c
# hello.push.native {
# MinArgs 3
# MaxArgs 3
# }
#### hello.push.native key value
#### Mode = write, MinArgs/MaxArgs = 3 = command) + key + value
##------------------------------------------------------------------------
# hello.repl2 {
# }
#### hello.repl2 <list-key>
#### Mode = write, MinArgs/MaxArgs = 2 = command + list-key
##------------------------------------------------------------------------
# hello.toggle.case {
# }
#### hello.toggle.case key
#### Mode = write, MinArgs/MaxArgs = 2 = command + key
##------------------------------------------------------------------------
# hello.more.expire {
# MinArgs 3
# MaxArgs 3
# }
#### hello.more.expire key milliseconds
#### Mode = write, MinArgs/MaxArgs = 3 = command + key + milliseconds
##------------------------------------------------------------------------
# hello.zsumrange {
# MinArgs 4
# MaxArgs 4
# Mode read
# }
#### hello.zsumrange key startscore endscore
#### Mode = read, MinArgs/MaxArgs = 4 = command + key + startscore + endscore
##------------------------------------------------------------------------
# hello.lexrange {
# MinArgs 6
# MaxArgs 6
# Mode read
# }
#### hello.lexrange key min_lex max_lex min_age max_age
#### Mode = read, MinArgs/MaxArgs = 6 = command + key + min_lex + max_lex + min_age + max_age
##------------------------------------------------------------------------
# hello.hcopy {
# MinArgs 4
# MaxArgs 4
# }
#### hello.hcopy key srcfield dstfield
#### Mode = write, MinArgs/MaxArgs = 4 = command + key + srcfield) + dstfield
##------------------------------------------------------------------------
## from redis source src/modules/hellotype.c
# hellotype.insert {
# MinArgs 3
# MaxArgs 3
# }
#### hellotype.insert key value
#### Mode = write, MinArgs/MaxArgs = 3 = command + key + value
##------------------------------------------------------------------------
# hellotype.range {
# MinArgs 4
# MaxArgs 4
# Mode read
# }
#### hellotype.range key first count
#### Mode = read, MinArgs/MaxArgs = 4 = command + key + first + count
##------------------------------------------------------------------------
# hellotype.len {
# Mode read
# }
#### hellotype.len key
#### Mode = read, MinArgs/MaxArgs = 2 = command + key
##------------------------------------------------------------------------
#}

View File

@ -93,10 +93,6 @@ Include try.conf
# Include dc.conf # Include dc.conf
################################### COMMAND ####################################
## Custom command define, see command.conf
#Include command.conf
################################### LATENCY #################################### ################################### LATENCY ####################################
## Latency monitor define, see latency.conf ## Latency monitor define, see latency.conf
Include latency.conf Include latency.conf

View File

@ -1,71 +0,0 @@
## redis standalone server pool define
##StandaloneServerPool {
## [Password xxx] #default no
## [Databases number] #default 1
## Hash atol|crc16
## [HashTag "xx"] #default no
## Distribution modula|random
## [MasterReadPriority [0-100]] #default 50
## [StaticSlaveReadPriority [0-100]] #default 0
## [DynamicSlaveReadPriority [0-100]] #default 0
## RefreshMethod fixed|sentinel #
## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
## [ServerTimeout number[s|ms|us]] #default 0, server connection socket read/write timeout
## [ServerFailureLimit number] #default 10
## [ServerRetryTimeout number[s|ms|us]] #default 1
## [KeepAlive seconds] #default 0, server connection tcp keepalive
## Sentinels [sentinel-password] {
## + addr
## ...
## }
## Group xxx {
## [+ addr] #if RefreshMethod==fixed: the first addr is master in a group, then all addrs is slaves in this group
## ...
## }
##}
## Examples:
#StandaloneServerPool {
# Databases 16
# Hash crc16
# HashTag "{}"
# Distribution modula
# MasterReadPriority 60
# StaticSlaveReadPriority 50
# DynamicSlaveReadPriority 50
# RefreshMethod sentinel
# RefreshInterval 1
# ServerTimeout 1
# ServerFailureLimit 10
# ServerRetryTimeout 1
# KeepAlive 120
# Sentinels {
# + 10.2.2.2:7500
# + 10.2.2.3:7500
# + 10.2.2.4:7500
# }
# Group shard001 {
# }
# Group shard002 {
# }
#}
#StandaloneServerPool {
# Databases 16
# Hash crc16
# HashTag "{}"
# Distribution modula
# MasterReadPriority 60
# StaticSlaveReadPriority 50
# DynamicSlaveReadPriority 50
# RefreshMethod fixed
# ServerTimeout 1
# ServerFailureLimit 10
# ServerRetryTimeout 1
# KeepAlive 120
# Group shard001 {
# + 10.2.3.2:6379
# }
#}

View File

@ -151,9 +151,9 @@ predixy扩展了redis中AUTH命令的功能支持定义多个认证密码
Authority { Authority {
Auth [password] { Auth [password] {
Mode read|write|admin Mode read|write|admin
[KeyPrefix Prefix...] [KeyPredix Predix...]
[ReadKeyPrefix Prefix...] [ReadKeyPredix Predix...]
[WriteKeyPrefix Prefix...] [WriteKeyPredix Predix...]
}... }...
} }

View File

@ -29,7 +29,6 @@ public:
typedef AcceptConnection Value; typedef AcceptConnection Value;
typedef ListNode<AcceptConnection, SharePtr<AcceptConnection>> ListNodeType; typedef ListNode<AcceptConnection, SharePtr<AcceptConnection>> ListNodeType;
typedef DequeNode<AcceptConnection, SharePtr<AcceptConnection>> DequeNodeType; typedef DequeNode<AcceptConnection, SharePtr<AcceptConnection>> DequeNodeType;
typedef Alloc<AcceptConnection, Const::AcceptConnectionAllocCacheSize> Allocator;
public: public:
AcceptConnection(int fd, sockaddr* addr, socklen_t len); AcceptConnection(int fd, sockaddr* addr, socklen_t len);
~AcceptConnection(); ~AcceptConnection();
@ -98,6 +97,6 @@ private:
typedef List<AcceptConnection> AcceptConnectionList; typedef List<AcceptConnection> AcceptConnectionList;
typedef Deque<AcceptConnection> AcceptConnectionDeque; typedef Deque<AcceptConnection> AcceptConnectionDeque;
typedef AcceptConnection::Allocator AcceptConnectionAlloc; typedef Alloc<AcceptConnection, Const::AcceptConnectionAllocCacheSize> AcceptConnectionAlloc;
#endif #endif

View File

@ -76,7 +76,7 @@ public:
} }
UsedMemory += allocSize<T>(); UsedMemory += allocSize<T>();
if (MaxMemory == 0 || UsedMemory <= MaxMemory) { if (MaxMemory == 0 || UsedMemory <= MaxMemory) {
void* p = ::operator new(allocSize<T>(), std::nothrow); void* p = ::operator new(allocSize<T>());
if (p) { if (p) {
try { try {
obj = new (p) T(args...); obj = new (p) T(args...);
@ -145,7 +145,7 @@ public:
{ {
int n = --mCnt; int n = --mCnt;
if (n == 0) { if (n == 0) {
T::Allocator::destroy(static_cast<T*>(this)); Alloc<T>::destroy(static_cast<T*>(this));
} else if (n < 0) { } else if (n < 0) {
logError("unref object %p with cnt %d", this, n); logError("unref object %p with cnt %d", this, n);
abort(); abort();

View File

@ -23,7 +23,6 @@ class Buffer :
public RefCntObj<Buffer> public RefCntObj<Buffer>
{ {
public: public:
typedef Alloc<Buffer, Const::BufferAllocCacheSize> Allocator;
static const int MaxBufFmtAppendLen = 8192; static const int MaxBufFmtAppendLen = 8192;
public: public:
Buffer& operator=(const Buffer&); Buffer& operator=(const Buffer&);
@ -93,13 +92,12 @@ private:
}; };
typedef List<Buffer> BufferList; typedef List<Buffer> BufferList;
typedef Buffer::Allocator BufferAlloc;
template<> template<>
inline int allocSize<Buffer>() inline int allocSize<Buffer>()
{ {
return Buffer::getSize() + sizeof(Buffer); return Buffer::getSize() + sizeof(Buffer);
} }
typedef Alloc<Buffer, Const::BufferAllocCacheSize> BufferAlloc;
struct BufferPos struct BufferPos
{ {

View File

@ -9,9 +9,8 @@
#include <map> #include <map>
#include "String.h" #include "String.h"
#include "Command.h" #include "Command.h"
#include "Conf.h"
Command Command::CmdPool[AvailableCommands] = { const Command Command::CmdPool[Sentinel] = {
{None, "", 0, MaxArgs, Read}, {None, "", 0, MaxArgs, Read},
{Ping, "ping", 1, 2, Read}, {Ping, "ping", 1, 2, Read},
{PingServ, "ping", 1, 2, Inner}, {PingServ, "ping", 1, 2, Inner},
@ -87,8 +86,8 @@ Command Command::CmdPool[AvailableCommands] = {
{Setrange, "setrange", 4, 4, Write}, {Setrange, "setrange", 4, 4, Write},
{Strlen, "strlen", 2, 2, Read}, {Strlen, "strlen", 2, 2, Read},
{Hdel, "hdel", 3, MaxArgs, Write}, {Hdel, "hdel", 3, MaxArgs, Write},
{Hexists, "hexists", 3, 3, Read}, {Hexists, "hexists", 3, 3, Write},
{Hget, "hget", 3, 3, Read}, {Hget, "hget", 3, 3, Write},
{Hgetall, "hgetall", 2, 2, Read}, {Hgetall, "hgetall", 2, 2, Read},
{Hincrby, "hincrby", 4, 4, Write}, {Hincrby, "hincrby", 4, 4, Write},
{Hincrbyfloat, "hincrbyfloat", 4, 4, Write}, {Hincrbyfloat, "hincrbyfloat", 4, 4, Write},
@ -97,7 +96,7 @@ Command Command::CmdPool[AvailableCommands] = {
{Hmget, "hmget", 3, MaxArgs, Read}, {Hmget, "hmget", 3, MaxArgs, Read},
{Hmset, "hmset", 4, MaxArgs, Write}, {Hmset, "hmset", 4, MaxArgs, Write},
{Hscan, "hscan", 3, 7, Read}, {Hscan, "hscan", 3, 7, Read},
{Hset, "hset", 4, MaxArgs, Write}, {Hset, "hset", 4, 4, Write},
{Hsetnx, "hsetnx", 4, 4, Write}, {Hsetnx, "hsetnx", 4, 4, Write},
{Hstrlen, "hstrlen", 3, 3, Read}, {Hstrlen, "hstrlen", 3, 3, Read},
{Hvals, "hvals", 2, 2, Read}, {Hvals, "hvals", 2, 2, Read},
@ -110,7 +109,7 @@ Command Command::CmdPool[AvailableCommands] = {
{Lpop, "lpop", 2, 2, Write}, {Lpop, "lpop", 2, 2, Write},
{Lpush, "lpush", 3, MaxArgs, Write}, {Lpush, "lpush", 3, MaxArgs, Write},
{Lpushx, "lpushx", 3, 3, Write}, {Lpushx, "lpushx", 3, 3, Write},
{Lrange, "lrange", 4, 4, Read}, {Lrange, "lrange", 4, 4, Write},
{Lrem, "lrem", 4, 4, Write}, {Lrem, "lrem", 4, 4, Write},
{Lset, "lset", 4, 4, Write}, {Lset, "lset", 4, 4, Write},
{Ltrim, "ltrim", 4, 4, Write}, {Ltrim, "ltrim", 4, 4, Write},
@ -139,8 +138,6 @@ Command Command::CmdPool[AvailableCommands] = {
{Zincrby, "zincrby", 4, 4, Write}, {Zincrby, "zincrby", 4, 4, Write},
{Zinterstore, "zinterstore", 4, MaxArgs, Write}, {Zinterstore, "zinterstore", 4, MaxArgs, Write},
{Zlexcount, "zlexcount", 4, 4, Read}, {Zlexcount, "zlexcount", 4, 4, Read},
{Zpopmax, "zpopmax", 2, 3, Write},
{Zpopmin, "zpopmin", 2, 3, Write},
{Zrange, "zrange", 4, 5, Read}, {Zrange, "zrange", 4, 5, Read},
{Zrangebylex, "zrangebylex", 4, 7, Read}, {Zrangebylex, "zrangebylex", 4, 7, Read},
{Zrangebyscore, "zrangebyscore", 4, 8, Read}, {Zrangebyscore, "zrangebyscore", 4, 8, Read},
@ -174,14 +171,11 @@ Command Command::CmdPool[AvailableCommands] = {
{SubMsg, "\000SubMsg", 0, 0, Admin} {SubMsg, "\000SubMsg", 0, 0, Admin}
}; };
int Command::Sentinel = Command::MaxCommands;
Command::CommandMap Command::CmdMap; Command::CommandMap Command::CmdMap;
void Command::init() void Command::init()
{ {
int type = 0; int type = 0;
for (auto j = 0; j < MaxCommands; j++) { for (auto& i : CmdPool) {
const auto& i = CmdPool[j];
if (i.type != type) { if (i.type != type) {
Throw(InitFail, "command %s unmatch the index in commands table", i.name); Throw(InitFail, "command %s unmatch the index in commands table", i.name);
} }
@ -193,19 +187,3 @@ void Command::init()
} }
} }
void Command::addCustomCommand(const CustomCommandConf& ccc) {
if (Sentinel >= AvailableCommands) {
Throw(InitFail, "too many custom commands(>%d)", MaxCustomCommands);
}
if (nullptr != find(ccc.name)) {
Throw(InitFail, "custom command %s is duplicated", ccc.name.c_str());
}
auto* p = &CmdPool[Sentinel];
p->name = ccc.name.c_str();
p->minArgs = ccc.minArgs;
p->maxArgs = ccc.maxArgs;
p->mode = ccc.mode;
p->type = (Command::Type)Sentinel++;
CmdMap[ccc.name] = p;
}

View File

@ -11,8 +11,6 @@
#include "Exception.h" #include "Exception.h"
#include "HashFunc.h" #include "HashFunc.h"
struct CustomCommandConf;
class Command class Command
{ {
public: public:
@ -156,8 +154,6 @@ public:
Zincrby, Zincrby,
Zinterstore, Zinterstore,
Zlexcount, Zlexcount,
Zpopmax,
Zpopmin,
Zrange, Zrange,
Zrangebylex, Zrangebylex,
Zrangebyscore, Zrangebyscore,
@ -193,9 +189,7 @@ public:
Unsubscribe, Unsubscribe,
SubMsg, SubMsg,
MaxCommands, Sentinel
MaxCustomCommands = 16,
AvailableCommands = MaxCommands + MaxCustomCommands,
}; };
enum Mode enum Mode
{ {
@ -255,11 +249,9 @@ public:
auto it = CmdMap.find(cmd); auto it = CmdMap.find(cmd);
return it == CmdMap.end() ? nullptr : it->second; return it == CmdMap.end() ? nullptr : it->second;
} }
static void addCustomCommand(const CustomCommandConf& pc);
static int Sentinel;
private: private:
static const int MaxArgs = 100000000; static const int MaxArgs = 100000000;
static Command CmdPool[]; static const Command CmdPool[Sentinel];
class H class H
{ {
public: public:

View File

@ -10,7 +10,7 @@
#include <limits.h> #include <limits.h>
#define _PREDIXY_NAME_ "predixy" #define _PREDIXY_NAME_ "predixy"
#define _PREDIXY_VERSION_ "1.0.5" #define _PREDIXY_VERSION_ "1.0.4"
namespace Const namespace Const
{ {
@ -32,8 +32,8 @@ namespace Const
static const int MaxCmdLen = 32; static const int MaxCmdLen = 32;
static const int MaxKeyLen = 512; static const int MaxKeyLen = 512;
static const int BufferAllocCacheSize = 64; static const int BufferAllocCacheSize = 64;
static const int RequestAllocCacheSize = 128; static const int RequestAllocCacheSize = 32;
static const int ResponseAllocCacheSize = 128; static const int ResponseAllocCacheSize = 32;
static const int AcceptConnectionAllocCacheSize = 32; static const int AcceptConnectionAllocCacheSize = 32;
static const int ConnectConnectionAllocCacheSize = 4; static const int ConnectConnectionAllocCacheSize = 4;
}; };

View File

@ -6,7 +6,6 @@
#include <ctype.h> #include <ctype.h>
#include <iostream> #include <iostream>
#include <sstream>
#include <fstream> #include <fstream>
#include "LogFileSink.h" #include "LogFileSink.h"
#include "ServerPool.h" #include "ServerPool.h"
@ -33,13 +32,6 @@ bool ServerConf::parse(ServerConf& s, const char* str)
return !s.addr.empty(); return !s.addr.empty();
} }
void CustomCommandConf::init(CustomCommandConf&c, const char* name) {
c.name = name;
c.minArgs = 2;
c.maxArgs = 2;
c.mode = Command::Write;
}
Conf::Conf(): Conf::Conf():
mBind("0.0.0.0:7617"), mBind("0.0.0.0:7617"),
mWorkerThreads(1), mWorkerThreads(1),
@ -125,9 +117,8 @@ void Conf::setGlobal(const ConfParser::Node* node)
{ {
const ConfParser::Node* authority = nullptr; const ConfParser::Node* authority = nullptr;
const ConfParser::Node* clusterServerPool = nullptr; const ConfParser::Node* clusterServerPool = nullptr;
const ConfParser::Node* standaloneServerPool = nullptr; const ConfParser::Node* sentinelServerPool = nullptr;
const ConfParser::Node* dataCenter = nullptr; const ConfParser::Node* dataCenter = nullptr;
std::vector<const ConfParser::Node*> latencyMonitors;
for (auto p = node; p; p = p->next) { for (auto p = node; p; p = p->next) {
if (setStr(mName, "Name", p)) { if (setStr(mName, "Name", p)) {
} else if (setStr(mBind, "Bind", p)) { } else if (setStr(mBind, "Bind", p)) {
@ -156,19 +147,16 @@ void Conf::setGlobal(const ConfParser::Node* node)
} else if (setInt(mLogSample[LogLevel::Warn], "LogWarnSample", p)) { } else if (setInt(mLogSample[LogLevel::Warn], "LogWarnSample", p)) {
} else if (setInt(mLogSample[LogLevel::Error], "LogErrorSample", p)) { } else if (setInt(mLogSample[LogLevel::Error], "LogErrorSample", p)) {
} else if (strcasecmp(p->key.c_str(), "LatencyMonitor") == 0) { } else if (strcasecmp(p->key.c_str(), "LatencyMonitor") == 0) {
latencyMonitors.push_back(p); mLatencyMonitors.push_back(LatencyMonitorConf{});
setLatencyMonitor(mLatencyMonitors.back(), p);
} else if (strcasecmp(p->key.c_str(), "Authority") == 0) { } else if (strcasecmp(p->key.c_str(), "Authority") == 0) {
authority = p; authority = p;
} else if (strcasecmp(p->key.c_str(), "ClusterServerPool") == 0) { } else if (strcasecmp(p->key.c_str(), "ClusterServerPool") == 0) {
clusterServerPool = p; clusterServerPool = p;
} else if (strcasecmp(p->key.c_str(), "SentinelServerPool") == 0) { } else if (strcasecmp(p->key.c_str(), "SentinelServerPool") == 0) {
standaloneServerPool = p; sentinelServerPool = p;
} else if (strcasecmp(p->key.c_str(), "StandaloneServerPool") == 0) {
standaloneServerPool = p;
} else if (strcasecmp(p->key.c_str(), "DataCenter") == 0) { } else if (strcasecmp(p->key.c_str(), "DataCenter") == 0) {
dataCenter = p; dataCenter = p;
} else if (strcasecmp(p->key.c_str(), "CustomCommand") == 0) {
setCustomCommand(p);
} else { } else {
Throw(UnknownKey, "%s:%d unknown key %s", p->file, p->line, p->key.c_str()); Throw(UnknownKey, "%s:%d unknown key %s", p->file, p->line, p->key.c_str());
} }
@ -176,27 +164,20 @@ void Conf::setGlobal(const ConfParser::Node* node)
if (authority) { if (authority) {
setAuthority(authority); setAuthority(authority);
} }
if (clusterServerPool && standaloneServerPool) { if (clusterServerPool && sentinelServerPool) {
Throw(LogicError, "Can't define ClusterServerPool/StandaloneServerPool at the same time"); Throw(LogicError, "Can't define ClusterServerPool and SentinelServerPool at the same time");
} else if (clusterServerPool) { } else if (clusterServerPool) {
setClusterServerPool(clusterServerPool); setClusterServerPool(clusterServerPool);
mServerPoolType = ServerPool::Cluster; mServerPoolType = ServerPool::Cluster;
} else if (standaloneServerPool) { } else if (sentinelServerPool) {
if (strcasecmp(standaloneServerPool->key.c_str(), "SentinelServerPool") == 0) { setSentinelServerPool(sentinelServerPool);
mStandaloneServerPool.refreshMethod = ServerPoolRefreshMethod::Sentinel; mServerPoolType = ServerPool::Sentinel;
}
setStandaloneServerPool(standaloneServerPool);
mServerPoolType = ServerPool::Standalone;
} else { } else {
Throw(LogicError, "Must define a server pool"); Throw(LogicError, "Must define a server pool");
} }
if (dataCenter) { if (dataCenter) {
setDataCenter(dataCenter); setDataCenter(dataCenter);
} }
for (auto& latencyMonitor : latencyMonitors) {
mLatencyMonitors.push_back(LatencyMonitorConf{});
setLatencyMonitor(mLatencyMonitors.back(), latencyMonitor);
}
} }
static void setKeyPrefix(std::vector<std::string>& dat, const std::string& v) static void setKeyPrefix(std::vector<std::string>& dat, const std::string& v)
@ -258,21 +239,28 @@ void Conf::setAuthority(const ConfParser::Node* node)
bool Conf::setServerPool(ServerPoolConf& sp, const ConfParser::Node* p) bool Conf::setServerPool(ServerPoolConf& sp, const ConfParser::Node* p)
{ {
bool ret = true;
if (setStr(sp.password, "Password", p)) { if (setStr(sp.password, "Password", p)) {
return true;
} else if (setInt(sp.masterReadPriority, "MasterReadPriority", p, 0, 100)) { } else if (setInt(sp.masterReadPriority, "MasterReadPriority", p, 0, 100)) {
return true;
} else if (setInt(sp.staticSlaveReadPriority, "StaticSlaveReadPriority", p, 0, 100)) { } else if (setInt(sp.staticSlaveReadPriority, "StaticSlaveReadPriority", p, 0, 100)) {
return true;
} else if (setInt(sp.dynamicSlaveReadPriority, "DynamicSlaveReadPriority", p, 0, 100)) { } else if (setInt(sp.dynamicSlaveReadPriority, "DynamicSlaveReadPriority", p, 0, 100)) {
return true;
} else if (setDuration(sp.refreshInterval, "RefreshInterval", p)) { } else if (setDuration(sp.refreshInterval, "RefreshInterval", p)) {
return true;
} else if (setDuration(sp.serverTimeout, "ServerTimeout", p)) { } else if (setDuration(sp.serverTimeout, "ServerTimeout", p)) {
return true;
} else if (setInt(sp.serverFailureLimit, "ServerFailureLimit", p, 1)) { } else if (setInt(sp.serverFailureLimit, "ServerFailureLimit", p, 1)) {
return true;
} else if (setDuration(sp.serverRetryTimeout, "ServerRetryTimeout", p)) { } else if (setDuration(sp.serverRetryTimeout, "ServerRetryTimeout", p)) {
return true;
} else if (setInt(sp.keepalive, "KeepAlive", p, 0)) { } else if (setInt(sp.keepalive, "KeepAlive", p, 0)) {
return true;
} else if (setInt(sp.databases, "Databases", p, 1, 128)) { } else if (setInt(sp.databases, "Databases", p, 1, 128)) {
} else { return true;
ret = false;
} }
return ret; return false;
} }
void Conf::setClusterServerPool(const ConfParser::Node* node) void Conf::setClusterServerPool(const ConfParser::Node* node)
@ -296,47 +284,41 @@ void Conf::setClusterServerPool(const ConfParser::Node* node)
} }
} }
void Conf::setStandaloneServerPool(const ConfParser::Node* node) void Conf::setSentinelServerPool(const ConfParser::Node* node)
{ {
if (!node->sub) { if (!node->sub) {
Throw(InvalidValue, "%s:%d StandaloneServerPool require scope value", node->file, node->line); Throw(InvalidValue, "%s:%d SentinelServerPool require scope value", node->file, node->line);
} }
mStandaloneServerPool.hashTag[0] = '\0'; mSentinelServerPool.hashTag[0] = '\0';
mStandaloneServerPool.hashTag[1] = '\0'; mSentinelServerPool.hashTag[1] = '\0';
for (auto p = node->sub; p; p = p->next) { for (auto p = node->sub; p; p = p->next) {
if (setServerPool(mStandaloneServerPool, p)) { if (setServerPool(mSentinelServerPool, p)) {
} else if (strcasecmp(p->key.c_str(), "RefreshMethod") == 0) {
try {
mStandaloneServerPool.refreshMethod = ServerPoolRefreshMethod::parse(p->val.c_str());
} catch (ServerPoolRefreshMethod::InvalidEnumValue& excp) {
Throw(InvalidValue, "%s:%d unknown RefreshMethod:%s", p->file, p->line, p->val.c_str());
}
} else if (strcasecmp(p->key.c_str(), "Distribution") == 0) { } else if (strcasecmp(p->key.c_str(), "Distribution") == 0) {
mStandaloneServerPool.dist = Distribution::parse(p->val.c_str()); mSentinelServerPool.dist = Distribution::parse(p->val.c_str());
if (mStandaloneServerPool.dist == Distribution::None) { if (mSentinelServerPool.dist == Distribution::None) {
Throw(InvalidValue, "%s:%d unknown Distribution", p->file, p->line); Throw(InvalidValue, "%s:%d unknown Distribution", p->file, p->line);
} }
} else if (strcasecmp(p->key.c_str(), "Hash") == 0) { } else if (strcasecmp(p->key.c_str(), "Hash") == 0) {
mStandaloneServerPool.hash = Hash::parse(p->val.c_str()); mSentinelServerPool.hash = Hash::parse(p->val.c_str());
if (mStandaloneServerPool.hash == Hash::None) { if (mSentinelServerPool.hash == Hash::None) {
Throw(InvalidValue, "%s:%d unknown Hash", p->file, p->line); Throw(InvalidValue, "%s:%d unknown Hash", p->file, p->line);
} }
} else if (strcasecmp(p->key.c_str(), "HashTag") == 0) { } else if (strcasecmp(p->key.c_str(), "HashTag") == 0) {
if (p->val.empty()) { if (p->val.empty()) {
mStandaloneServerPool.hashTag[0] = '\0'; mSentinelServerPool.hashTag[0] = '\0';
mStandaloneServerPool.hashTag[1] = '\0'; mSentinelServerPool.hashTag[1] = '\0';
} else if (p->val.size() == 2) { } else if (p->val.size() == 2) {
mStandaloneServerPool.hashTag[0] = p->val[0]; mSentinelServerPool.hashTag[0] = p->val[0];
mStandaloneServerPool.hashTag[1] = p->val[1]; mSentinelServerPool.hashTag[1] = p->val[1];
} else { } else {
Throw(InvalidValue, "%s:%d HashTag invalid", p->file, p->line); Throw(InvalidValue, "%s:%d HashTag invalid", p->file, p->line);
} }
} else if (setServers(mStandaloneServerPool.sentinels, "Sentinels", p)) { } else if (setServers(mSentinelServerPool.sentinels, "Sentinels", p)) {
mStandaloneServerPool.sentinelPassword = p->val; mSentinelServerPool.sentinelPassword = p->val;
} else if (strcasecmp(p->key.c_str(), "Group") == 0) { } else if (strcasecmp(p->key.c_str(), "Group") == 0) {
mStandaloneServerPool.groups.push_back(ServerGroupConf{p->val}); mSentinelServerPool.groups.push_back(ServerGroupConf{p->val});
if (p->sub) { if (p->sub) {
auto& g = mStandaloneServerPool.groups.back(); auto& g = mSentinelServerPool.groups.back();
setServers(g.servers, "Group", p); setServers(g.servers, "Group", p);
} }
} else { } else {
@ -344,31 +326,18 @@ void Conf::setStandaloneServerPool(const ConfParser::Node* node)
p->file, p->line, p->key.c_str()); p->file, p->line, p->key.c_str());
} }
} }
if (mStandaloneServerPool.groups.empty()) { if (mSentinelServerPool.sentinels.empty()) {
Throw(LogicError, "StandaloneServerPool no server group"); Throw(LogicError, "SentinelServerPool no sentinel server");
} }
if (mStandaloneServerPool.refreshMethod == ServerPoolRefreshMethod::None) { if (mSentinelServerPool.groups.empty()) {
Throw(LogicError, "StandaloneServerPool must define RefreshMethod"); Throw(LogicError, "SentinelServerPool no server group");
} else if (mStandaloneServerPool.refreshMethod == ServerPoolRefreshMethod::Sentinel) {
if (mStandaloneServerPool.sentinels.empty()) {
Throw(LogicError, "StandaloneServerPool with RefreshMethod(sentinel) but no sentinel servers");
} }
} else { if (mSentinelServerPool.groups.size() > 1) {
if (!mStandaloneServerPool.sentinels.empty()) { if (mSentinelServerPool.dist == Distribution::None) {
Throw(LogicError, "StandaloneServerPool with Sentinels but RefreshMethod is not sentinel"); Throw(LogicError, "SentinelServerPool must define Dsitribution in multi groups");
} }
for (auto& g : mStandaloneServerPool.groups) { if (mSentinelServerPool.hash == Hash::None) {
if (g.servers.empty()) { Throw(LogicError, "SentinelServerPool must define Hash in multi groups");
Throw(LogicError, "Group(%s) must add servers", g.name.c_str());
}
}
}
if (mStandaloneServerPool.groups.size() > 1) {
if (mStandaloneServerPool.dist == Distribution::None) {
Throw(LogicError, "StandaloneServerPool must define Dsitribution in multi groups");
}
if (mStandaloneServerPool.hash == Hash::None) {
Throw(LogicError, "StandaloneServerPool must define Hash in multi groups");
} }
} }
} }
@ -389,83 +358,6 @@ void Conf::setDataCenter(const ConfParser::Node* node)
} }
} }
void Conf::setCustomCommand(const ConfParser::Node* node)
{
if (!node->sub) {
Throw(InvalidValue, "%s:%d CustomCommand require scope value", node->file, node->line);
}
for (auto p = node->sub; p; p = p->next) {
mCustomCommands.push_back(CustomCommandConf{});
auto& cc = mCustomCommands.back();
CustomCommandConf::init(cc, p->key.c_str());
auto s = p->sub;
for (;s ; s = s->next) {
if (setInt(cc.minArgs, "MinArgs", s, 2)) {
} else if (setInt(cc.maxArgs, "MaxArgs", s, 2, 9999)) {
} else if (setCommandMode(cc.mode, "Mode", s)) {
} else {
Throw(UnknownKey, "%s:%d unknown key %s", s->file, s->line, s->key.c_str());
}
}
if (cc.maxArgs < cc.minArgs) {
Throw(InvalidValue, "%s:%d must be MaxArgs >= MinArgs", p->file, p->line);
}
}
for (const auto& cc : mCustomCommands) {
Command::addCustomCommand(cc);
}
}
bool Conf::setCommandMode(int& mode, const char* name, const ConfParser::Node* n, const int defaultMode)
{
if (strcasecmp(name, n->key.c_str()) != 0) {
return false;
}
if (n->val.size() == 0) {
mode = defaultMode;
} else {
mode = 0;
std::string mask;
std::istringstream is(n->val);
while (std::getline(is, mask, '|')) {
if ((strcasecmp(mask.c_str(), "write") == 0)) {
mode |= Command::Write;
} else if ((strcasecmp(mask.c_str(), "read") == 0)) {
mode |= Command::Read;
} else if ((strcasecmp(mask.c_str(), "admin") == 0)) {
mode |= Command::Admin;
} else if ((strcasecmp(mask.c_str(), "keyAt2") == 0)) {
mode |= Command::KeyAt2;
} else if ((strcasecmp(mask.c_str(), "keyAt3") == 0)) {
mode |= Command::KeyAt3;
} else {
Throw(InvalidValue, "%s:%d unknown mode %s", n->file, n->line, mask.c_str());
}
}
switch (mode & Command::KeyMask) {
case 0:
case Command::KeyAt2:
case Command::KeyAt3:
break;
default:
Throw(InvalidValue, "%s:%d %s require exclusive key pos", n->file, n->line, name);
}
switch (mode & Command::AuthMask) {
case 0:
mode |= Command::Write;
break;
case Command::Read:
case Command::Write:
case Command::Admin:
break;
default:
Throw(InvalidValue, "%s:%d %s require exclusive mode", n->file, n->line, name);
}
}
return true;
}
void Conf::setDC(DCConf& dc, const ConfParser::Node* node) void Conf::setDC(DCConf& dc, const ConfParser::Node* node)
{ {
if (!node->sub) { if (!node->sub) {

View File

@ -19,8 +19,6 @@
#include "Distribution.h" #include "Distribution.h"
#include "ConfParser.h" #include "ConfParser.h"
#include "Auth.h" #include "Auth.h"
#include "Command.h"
#include "Enums.h"
struct AuthConf struct AuthConf
{ {
@ -64,9 +62,8 @@ struct ClusterServerPoolConf : public ServerPoolConf
std::vector<ServerConf> servers; std::vector<ServerConf> servers;
}; };
struct StandaloneServerPoolConf : public ServerPoolConf struct SentinelServerPoolConf : public ServerPoolConf
{ {
ServerPoolRefreshMethod refreshMethod = ServerPoolRefreshMethod::None;
Distribution dist = Distribution::None; Distribution dist = Distribution::None;
Hash hash = Hash::None; Hash hash = Hash::None;
char hashTag[2]; char hashTag[2];
@ -92,20 +89,10 @@ struct DCConf
struct LatencyMonitorConf struct LatencyMonitorConf
{ {
std::string name; std::string name;
std::bitset<Command::AvailableCommands> cmds; std::bitset<Command::Sentinel> cmds;
std::vector<long> timeSpan;//us std::vector<long> timeSpan;//us
}; };
struct CustomCommandConf
{
std::string name;
int minArgs;
int maxArgs;
int mode;
static void init(CustomCommandConf &c, const char* name);
};
class Conf class Conf
{ {
public: public:
@ -177,9 +164,9 @@ public:
{ {
return mClusterServerPool; return mClusterServerPool;
} }
const StandaloneServerPoolConf& standaloneServerPool() const const SentinelServerPoolConf& sentinelServerPool() const
{ {
return mStandaloneServerPool; return mSentinelServerPool;
} }
const std::string& localDC() const const std::string& localDC() const
{ {
@ -200,7 +187,7 @@ private:
void setGlobal(const ConfParser::Node* node); void setGlobal(const ConfParser::Node* node);
void setAuthority(const ConfParser::Node* node); void setAuthority(const ConfParser::Node* node);
void setClusterServerPool(const ConfParser::Node* node); void setClusterServerPool(const ConfParser::Node* node);
void setStandaloneServerPool(const ConfParser::Node* node); void setSentinelServerPool(const ConfParser::Node* node);
void setDataCenter(const ConfParser::Node* node); void setDataCenter(const ConfParser::Node* node);
void check(); void check();
bool setServerPool(ServerPoolConf& sp, const ConfParser::Node* n); bool setServerPool(ServerPoolConf& sp, const ConfParser::Node* n);
@ -214,8 +201,6 @@ private:
void setDC(DCConf& dc, const ConfParser::Node* n); void setDC(DCConf& dc, const ConfParser::Node* n);
void setReadPolicy(ReadPolicyConf& c, const ConfParser::Node* n); void setReadPolicy(ReadPolicyConf& c, const ConfParser::Node* n);
void setLatencyMonitor(LatencyMonitorConf& m, const ConfParser::Node* n); void setLatencyMonitor(LatencyMonitorConf& m, const ConfParser::Node* n);
void setCustomCommand(const ConfParser::Node* n);
bool setCommandMode(int& mode, const char* name, const ConfParser::Node* n, const int defaultMode = Command::Write);
private: private:
std::string mName; std::string mName;
std::string mBind; std::string mBind;
@ -231,11 +216,10 @@ private:
std::vector<AuthConf> mAuthConfs; std::vector<AuthConf> mAuthConfs;
int mServerPoolType; int mServerPoolType;
ClusterServerPoolConf mClusterServerPool; ClusterServerPoolConf mClusterServerPool;
StandaloneServerPoolConf mStandaloneServerPool; SentinelServerPoolConf mSentinelServerPool;
std::vector<DCConf> mDCConfs; std::vector<DCConf> mDCConfs;
std::string mLocalDC; std::string mLocalDC;
std::vector<LatencyMonitorConf> mLatencyMonitors; std::vector<LatencyMonitorConf> mLatencyMonitors;
std::vector<CustomCommandConf> mCustomCommands;
}; };

View File

@ -283,13 +283,9 @@ Done:
case SValBody: case SValBody:
return KeyVal; return KeyVal;
case VValBody: case VValBody:
{
auto ret = KeyVal;
val.assign(line, pos, line.size() - pos); val.assign(line, pos, line.size() - pos);
if (val.back() == '{') { if (val.back() == '{') {
val.resize(val.size() - 1); val.resize(val.size() - 1);
ret = BeginScope;
}
int vsp = 0; int vsp = 0;
for (auto it = val.rbegin(); it != val.rend(); ++it) { for (auto it = val.rbegin(); it != val.rend(); ++it) {
if (isspace(*it)) { if (isspace(*it)) {
@ -297,7 +293,9 @@ Done:
} }
} }
val.resize(val.size() - vsp); val.resize(val.size() - vsp);
return ret; return BeginScope;
} else {
return KeyVal;
} }
case ScopeReady: case ScopeReady:
return KeyVal; return KeyVal;

View File

@ -23,7 +23,6 @@ public:
typedef ConnectConnection Value; typedef ConnectConnection Value;
typedef ListNode<ConnectConnection> ListNodeType; typedef ListNode<ConnectConnection> ListNodeType;
typedef DequeNode<ConnectConnection> DequeNodeType; typedef DequeNode<ConnectConnection> DequeNodeType;
typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> Allocator;
public: public:
ConnectConnection(Server* s, bool shared); ConnectConnection(Server* s, bool shared);
~ConnectConnection(); ~ConnectConnection();
@ -98,6 +97,6 @@ private:
typedef List<ConnectConnection> ConnectConnectionList; typedef List<ConnectConnection> ConnectConnectionList;
typedef Deque<ConnectConnection> ConnectConnectionDeque; typedef Deque<ConnectConnection> ConnectConnectionDeque;
typedef ConnectConnection::Allocator ConnectConnectionAlloc; typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> ConnectConnectionAlloc;
#endif #endif

View File

@ -27,7 +27,7 @@ BufferPtr Connection::getBuffer(Handler* h, bool allowNew)
} }
} }
if (!mBuf || mBuf->full()) { if (!mBuf || mBuf->full()) {
BufferPtr buf = BufferAlloc::create(); BufferPtr buf = Alloc<Buffer>::create();
if (mBuf) { if (mBuf) {
mBuf->concat(buf); mBuf->concat(buf);
} }

View File

@ -1,15 +0,0 @@
/*
* predixy - A high performance and full features proxy for redis.
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
* All rights reserved.
*/
#include "Enums.h"
const ServerPoolRefreshMethod::TypeName
ServerPoolRefreshMethod::sPairs[3] = {
{ServerPoolRefreshMethod::None, "none"},
{ServerPoolRefreshMethod::Fixed, "fixed"},
{ServerPoolRefreshMethod::Sentinel, "sentinel"},
};

View File

@ -1,74 +0,0 @@
/*
* predixy - A high performance and full features proxy for redis.
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
* All rights reserved.
*/
#ifndef _PREDIXY_ENUMS_H_
#define _PREDIXY_ENUMS_H_
#include <string.h>
#include <strings.h>
#include "Exception.h"
template<class T>
class EnumBase
{
public:
DefException(InvalidEnumValue);
struct TypeName
{
int type;
const char* name;
};
public:
EnumBase(int t):
mType(t)
{
}
int value() const
{
return mType;
}
bool operator==(const T& t) const
{
return t.value() == mType;
}
bool operator!=(const T& t) const
{
return t.value() != mType;
}
const char* name() const
{
return T::sPairs[mType].name;
}
static T parse(const char* str)
{
for (auto& i : T::sPairs) {
if (strcasecmp(i.name, str) == 0) {
return T(typename T::Type(i.type));
}
}
Throw(InvalidEnumValue, "invalid enum value:%s", str);
}
protected:
int mType;
};
class ServerPoolRefreshMethod : public EnumBase<ServerPoolRefreshMethod>
{
public:
enum Type
{
None,
Fixed,
Sentinel
};
static const TypeName sPairs[3];
ServerPoolRefreshMethod(Type t = None):
EnumBase<ServerPoolRefreshMethod>(t)
{
}
};
#endif

View File

@ -31,9 +31,12 @@ bool EpollMultiplexor::addSocket(Socket* s, int evts)
event.events |= (evts & ReadEvent) ? EPOLLIN : 0; event.events |= (evts & ReadEvent) ? EPOLLIN : 0;
event.events |= (evts & WriteEvent) ? EPOLLOUT : 0; event.events |= (evts & WriteEvent) ? EPOLLOUT : 0;
event.events |= EPOLLET; event.events |= EPOLLET;
//event.events |= EPOLLONESHOT;
event.data.ptr = s; event.data.ptr = s;
s->setEvent(evts);
int ret = epoll_ctl(mFd, EPOLL_CTL_ADD, s->fd(), &event); int ret = epoll_ctl(mFd, EPOLL_CTL_ADD, s->fd(), &event);
if (ret == 0) {
s->setEvent(evts);
}
return ret == 0; return ret == 0;
} }
@ -58,6 +61,7 @@ bool EpollMultiplexor::addEvent(Socket* s, int evts)
} }
if ((s->getEvent() | evts) != s->getEvent()) { if ((s->getEvent() | evts) != s->getEvent()) {
event.events |= EPOLLET; event.events |= EPOLLET;
//event.events |= EPOLLONESHOT;
int ret = epoll_ctl(mFd, EPOLL_CTL_MOD, s->fd(), &event); int ret = epoll_ctl(mFd, EPOLL_CTL_MOD, s->fd(), &event);
if (ret == 0) { if (ret == 0) {
s->setEvent(s->getEvent() | evts); s->setEvent(s->getEvent() | evts);

View File

@ -215,6 +215,7 @@ void Handler::postAcceptConnectionEvent()
auto cp = mConnPool[s->server()->id()]; auto cp = mConnPool[s->server()->id()];
s->setStatus(Connection::LogicError); s->setStatus(Connection::LogicError);
addPostEvent(s, Multiplexor::ErrorEvent); addPostEvent(s, Multiplexor::ErrorEvent);
cp->putPrivateConnection(s);
c->detachConnectConnection(); c->detachConnectConnection();
s->detachAcceptConnection(); s->detachAcceptConnection();
} }
@ -275,9 +276,6 @@ void Handler::postConnectConnectionEvent()
s->status(), s->statusStr()); s->status(), s->statusStr());
mEventLoop->delSocket(s); mEventLoop->delSocket(s);
s->close(this); s->close(this);
if (!s->isShared()) {
mConnPool[s->server()->id()]->putPrivateConnection(s);
}
if (c) { if (c) {
addPostEvent(c, Multiplexor::ErrorEvent); addPostEvent(c, Multiplexor::ErrorEvent);
s->detachAcceptConnection(); s->detachAcceptConnection();
@ -317,6 +315,7 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len)
AcceptConnection* c = nullptr; AcceptConnection* c = nullptr;
try { try {
c = AcceptConnectionAlloc::create(fd, addr, len); c = AcceptConnectionAlloc::create(fd, addr, len);
logNotice("h %d accept c %s %d", id(), c->peer(), fd);
} catch (ExceptionBase& e) { } catch (ExceptionBase& e) {
logWarn("h %d create connection for client %d fail %s", logWarn("h %d create connection for client %d fail %s",
id(), fd, e.what()); id(), fd, e.what());
@ -369,8 +368,6 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len)
logWarn("h %d destroy c %s %d with add to event loop fail:%s", logWarn("h %d destroy c %s %d with add to event loop fail:%s",
id(), c->peer(), c->fd(), StrError()); id(), c->peer(), c->fd(), StrError());
AcceptConnectionAlloc::destroy(c); AcceptConnectionAlloc::destroy(c);
} else {
logNotice("h %d accept c %s %d assign to h %d", id(), c->peer(), fd, dst->id());
} }
} }

View File

@ -67,5 +67,6 @@ int KqueueMultiplexor::wait(long usec, T* handler)
typedef KqueueMultiplexor Multiplexor; typedef KqueueMultiplexor Multiplexor;
#define _MULTIPLEXOR_ASYNC_ASSIGN_
#endif #endif

View File

@ -98,7 +98,7 @@ public:
Buffer* output(Buffer* buf) const; Buffer* output(Buffer* buf) const;
private: private:
String mName; String mName;
const std::bitset<Command::AvailableCommands>* mCmds; const std::bitset<Command::Sentinel>* mCmds;
std::vector<TimeSpan> mTimeSpan; std::vector<TimeSpan> mTimeSpan;
TimeSpan mLast; TimeSpan mLast;
}; };

View File

@ -72,7 +72,6 @@ objs = \
Buffer.o \ Buffer.o \
Command.o \ Command.o \
Distribution.o \ Distribution.o \
Enums.o \
Reply.o \ Reply.o \
ConfParser.o \ ConfParser.o \
Conf.o \ Conf.o \
@ -88,7 +87,7 @@ objs = \
ServerPool.o \ ServerPool.o \
ClusterNodesParser.o \ ClusterNodesParser.o \
ClusterServerPool.o \ ClusterServerPool.o \
StandaloneServerPool.o \ SentinelServerPool.o \
ConnectConnectionPool.o \ ConnectConnectionPool.o \
Handler.o \ Handler.o \
Proxy.o \ Proxy.o \

View File

@ -118,10 +118,10 @@ bool Proxy::init(int argc, char* argv[])
mServPool = p; mServPool = p;
} }
break; break;
case ServerPool::Standalone: case ServerPool::Sentinel:
{ {
StandaloneServerPool* p = new StandaloneServerPool(this); SentinelServerPool* p = new SentinelServerPool(this);
p->init(mConf->standaloneServerPool()); p->init(mConf->sentinelServerPool());
mServPool = p; mServPool = p;
} }
break; break;

View File

@ -13,7 +13,7 @@
#include "DC.h" #include "DC.h"
#include "ServerPool.h" #include "ServerPool.h"
#include "ClusterServerPool.h" #include "ClusterServerPool.h"
#include "StandaloneServerPool.h" #include "SentinelServerPool.h"
#include "LatencyMonitor.h" #include "LatencyMonitor.h"
class Proxy class Proxy
@ -51,15 +51,15 @@ public:
} }
bool isSplitMultiKey() const bool isSplitMultiKey() const
{ {
return mConf->standaloneServerPool().groups.size() != 1; return mConf->sentinelServerPool().groups.size() != 1;
} }
bool supportTransaction() const bool supportTransaction() const
{ {
return mConf->standaloneServerPool().groups.size() == 1; return mConf->sentinelServerPool().groups.size() == 1;
} }
bool supportSubscribe() const bool supportSubscribe() const
{ {
return mConf->standaloneServerPool().groups.size() == 1 || return mConf->sentinelServerPool().groups.size() == 1 ||
mConf->clusterServerPool().servers.size() > 0; mConf->clusterServerPool().servers.size() > 0;
} }
const std::vector<Handler*>& handlers() const const std::vector<Handler*>& handlers() const

View File

@ -98,12 +98,10 @@ Request::Request(GenericCode code):
Request::~Request() Request::~Request()
{ {
clear();
} }
void Request::clear() void Request::clear()
{ {
mConn = nullptr;
mRes = nullptr; mRes = nullptr;
mHead.clear(); mHead.clear();
mReq.clear(); mReq.clear();
@ -157,14 +155,13 @@ void Request::set(const RequestParser& p, Request* leader)
} }
mHead = r->mReq; mHead = r->mReq;
mReq = p.request(); mReq = p.request();
mLeader = leader;
if (leader == this) { if (leader == this) {
if (mType == Command::Mset || mType == Command::Msetnx) { if (mType == Command::Mset || mType == Command::Msetnx) {
mFollowers = (p.argNum() - 1) >> 1; mFollowers = (p.argNum() - 1) >> 1;
} else { } else {
mFollowers = p.argNum() - 1; mFollowers = p.argNum() - 1;
} }
} else {
mLeader = leader;
} }
} else { } else {
mReq = p.request(); mReq = p.request();
@ -290,11 +287,10 @@ void Request::adjustScanCursor(long cursor)
void Request::follow(Request* leader) void Request::follow(Request* leader)
{ {
leader->mFollowers += 1; ++mFollowers;
if (leader == this) { if (leader == this) {
return; return;
} }
mConn = leader->mConn;
mType = leader->mType; mType = leader->mType;
mHead = leader->mHead; mHead = leader->mHead;
mReq = leader->mReq; mReq = leader->mReq;
@ -342,49 +338,49 @@ int Request::fill(IOVec* vecs, int len)
void Request::setResponse(Response* res) void Request::setResponse(Response* res)
{ {
mDone = true; mDone = true;
if (Request* ld = leader()) { if (mLeader) {
ld->mFollowersDone += 1; mLeader->mFollowersDone += 1;
switch (mType) { switch (mType) {
case Command::Mget: case Command::Mget:
mRes = res; mRes = res;
break; break;
case Command::Mset: case Command::Mset:
if (Response* leaderRes = ld->getResponse()) { if (Response* leaderRes = mLeader->getResponse()) {
if (res->isError() && !leaderRes->isError()) { if (res->isError() && !leaderRes->isError()) {
ld->mRes = res; mLeader->mRes = res;
} }
} else { } else {
ld->mRes = res; mLeader->mRes = res;
} }
break; break;
case Command::Msetnx: case Command::Msetnx:
if (Response* leaderRes = ld->getResponse()) { if (Response* leaderRes = mLeader->getResponse()) {
if (!leaderRes->isError() && if (!leaderRes->isError() &&
(res->isError() || res->integer() == 0)) { (res->isError() || res->integer() == 0)) {
ld->mRes = res; mLeader->mRes = res;
} }
} else { } else {
ld->mRes = res; mLeader->mRes = res;
} }
break; break;
case Command::Touch: case Command::Touch:
case Command::Exists: case Command::Exists:
case Command::Del: case Command::Del:
case Command::Unlink: case Command::Unlink:
if (!ld->mRes) { if (!mLeader->mRes) {
ld->mRes = res; mLeader->mRes = res;
} }
if (ld->isDone()) { if (mLeader->isDone()) {
ld->mRes->set(ld->mRes->integer()); mLeader->mRes->set(mLeader->mRes->integer());
} }
break; break;
case Command::ScriptLoad: case Command::ScriptLoad:
if (Response* leaderRes = ld->getResponse()) { if (Response* leaderRes = mLeader->getResponse()) {
if (leaderRes->isString() && !res->isString()) { if (leaderRes->isString() && !res->isString()) {
ld->mRes = res; mLeader->mRes = res;
} }
} else { } else {
ld->mRes = res; mLeader->mRes = res;
} }
break; break;
default: default:
@ -399,7 +395,7 @@ void Request::setResponse(Response* res)
bool Request::isDone() const bool Request::isDone() const
{ {
if (isLeader()) { if (mLeader == this) {
switch (mType) { switch (mType) {
case Command::Mget: case Command::Mget:
case Command::Psubscribe: case Command::Psubscribe:

View File

@ -25,7 +25,6 @@ class Request :
public: public:
typedef Request Value; typedef Request Value;
typedef ListNode<Request, SharePtr<Request>, RequestListIndex::Size> ListNodeType; typedef ListNode<Request, SharePtr<Request>, RequestListIndex::Size> ListNodeType;
typedef Alloc<Request, Const::RequestAllocCacheSize> Allocator;
static const int MaxRedirectLimit = 3; static const int MaxRedirectLimit = 3;
enum GenericCode enum GenericCode
{ {
@ -120,11 +119,11 @@ public:
} }
Request* leader() const Request* leader() const
{ {
return isLeader() ? const_cast<Request*>(this) : (Request*)mLeader; return mLeader;
} }
bool isLeader() const bool isLeader() const
{ {
return mFollowers > 0; return mLeader == this;
} }
bool isDelivered() const bool isDelivered() const
{ {
@ -182,6 +181,6 @@ private:
typedef List<Request, RequestListIndex::Recv> RecvRequestList; typedef List<Request, RequestListIndex::Recv> RecvRequestList;
typedef List<Request, RequestListIndex::Send> SendRequestList; typedef List<Request, RequestListIndex::Send> SendRequestList;
typedef Request::Allocator RequestAlloc; typedef Alloc<Request, Const::RequestAllocCacheSize> RequestAlloc;
#endif #endif

View File

@ -18,7 +18,6 @@ class Response :
public: public:
typedef Response Value; typedef Response Value;
typedef ListNode<Response, SharePtr<Response>> ListNodeType; typedef ListNode<Response, SharePtr<Response>> ListNodeType;
typedef Alloc<Response, Const::ResponseAllocCacheSize> Allocator;
enum GenericCode enum GenericCode
{ {
Pong, Pong,
@ -138,6 +137,6 @@ private:
}; };
typedef List<Response> ResponseList; typedef List<Response> ResponseList;
typedef Response::Allocator ResponseAlloc; typedef Alloc<Response, Const::ResponseAllocCacheSize> ResponseAlloc;
#endif #endif

View File

@ -162,9 +162,6 @@ Server* ServerGroup::getReadServer(Handler* h, DC* localDC) const
continue; continue;
} }
DC* dc = s->dc(); DC* dc = s->dc();
if (!dc) {
continue;
}
int dcrp = localDC->getReadPriority(dc); int dcrp = localDC->getReadPriority(dc);
if (dcrp <= 0) { if (dcrp <= 0) {
continue; continue;
@ -224,7 +221,7 @@ Server* ServerGroup::getReadServer(Handler* h, DC* localDC) const
dc = sdc[0]; dc = sdc[0];
found = true; found = true;
} }
if (!found) { if (!found) {//dc maybe nullptr even we found
return nullptr; return nullptr;
} }
Server* deadServs[Const::MaxServInGroup]; Server* deadServs[Const::MaxServInGroup];

View File

@ -20,7 +20,7 @@ public:
{ {
Unknown, Unknown,
Cluster, Cluster,
Standalone Sentinel
}; };
static const int DefaultServerRetryTimeout = 10000000; static const int DefaultServerRetryTimeout = 10000000;
static const int DefaultRefreshInterval = 1000000; static const int DefaultRefreshInterval = 1000000;

View File

@ -102,7 +102,7 @@ void Socket::getFirstAddr(const char* addr, int type, int protocol, sockaddr* re
} else { } else {
std::string tmp; std::string tmp;
const char* host = addr; const char* host = addr;
const char* port = strrchr(addr, ':'); const char* port = strchr(addr, ':');
if (port) { if (port) {
tmp.append(addr, port - addr); tmp.append(addr, port - addr);
host = tmp.c_str(); host = tmp.c_str();
@ -162,7 +162,6 @@ bool Socket::setTcpKeepAlive(int interval)
if (ret != 0) { if (ret != 0) {
return false; return false;
} }
#ifdef __linux__
val = interval; val = interval;
ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)); ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val));
if (ret != 0) { if (ret != 0) {
@ -178,9 +177,6 @@ bool Socket::setTcpKeepAlive(int interval)
if (ret != 0) { if (ret != 0) {
return false; return false;
} }
#else
((void)interval); //Avoid unused var warning for non Linux systems
#endif
return true; return true;
} }

View File

@ -1,482 +0,0 @@
/*
* predixy - A high performance and full features proxy for redis.
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
* All rights reserved.
*/
#include <algorithm>
#include "Logger.h"
#include "ServerGroup.h"
#include "Handler.h"
#include "StandaloneServerPool.h"
StandaloneServerPool::StandaloneServerPool(Proxy* p):
ServerPoolTmpl(p, Standalone),
mDist(Distribution::Modula)
{
mSentinels.reserve(MaxSentinelNum);
mServPool.reserve(Const::MaxServNum);
mHashTag[0] = mHashTag[1] = '\0';
}
StandaloneServerPool::~StandaloneServerPool()
{
}
void StandaloneServerPool::init(const StandaloneServerPoolConf& conf)
{
ServerPool::init(conf);
mRefreshMethod = conf.refreshMethod;
mDist = conf.dist;
mHash = conf.hash;
mHashTag[0] = conf.hashTag[0];
mHashTag[1] = conf.hashTag[1];
int i = 0;
if (conf.refreshMethod == ServerPoolRefreshMethod::Sentinel) {
mSentinels.resize(conf.sentinels.size());
for (auto& sc : conf.sentinels) {
Server* s = new Server(this, sc.addr, true);
s->setRole(Server::Sentinel);
s->setPassword(sc.password.empty() ? conf.sentinelPassword:sc.password);
mSentinels[i++] = s;
mServs[s->addr()] = s;
}
}
mGroupPool.resize(conf.groups.size());
i = 0;
for (auto& gc : conf.groups) {
ServerGroup* g = new ServerGroup(this, gc.name);
mGroupPool[i++] = g;
auto role = Server::Master;
for (auto& sc : gc.servers) {
Server* s = new Server(this, sc.addr, true);
s->setPassword(sc.password.empty() ? conf.password : sc.password);
mServPool.push_back(s);
mServs[s->addr()] = s;
g->add(s);
s->setGroup(g);
switch (mRefreshMethod.value()) {
case ServerPoolRefreshMethod::Fixed:
s->setOnline(true);
s->setRole(role);
role = Server::Slave;
break;
default:
s->setOnline(false);
break;
}
}
}
}
Server* StandaloneServerPool::getServer(Handler* h, Request* req, const String& key) const
{
FuncCallTimer();
switch (req->type()) {
case Command::SentinelGetMaster:
case Command::SentinelSlaves:
case Command::SentinelSentinels:
if (mSentinels.empty()) {
return nullptr;
} else {
Server* s = randServer(h, mSentinels);
logDebug("sentinel server pool get server %s for sentinel command",
s->addr().data());
return s;
}
break;
case Command::Randomkey:
return randServer(h, mServPool);
default:
break;
}
if (mGroupPool.size() == 1) {
return mGroupPool[0]->getServer(h, req);
} else if (mGroupPool.size() > 1) {
switch (mDist) {
case Distribution::Modula:
{
long idx = mHash.hash(key.data(), key.length(), mHashTag);
idx %= mGroupPool.size();
return mGroupPool[idx]->getServer(h, req);
}
break;
case Distribution::Random:
{
int idx = h->rand() % mGroupPool.size();
return mGroupPool[idx]->getServer(h, req);
}
break;
default:
break;
}
}
return nullptr;
}
void StandaloneServerPool::refreshRequest(Handler* h)
{
logDebug("h %d update standalone server pool", h->id());
switch (mRefreshMethod.value()) {
case ServerPoolRefreshMethod::Sentinel:
for (auto g : mGroupPool) {
RequestPtr req = RequestAlloc::create();
req->setSentinels(g->name());
req->setData(g);
h->handleRequest(req);
req = RequestAlloc::create();
req->setSentinelGetMaster(g->name());
req->setData(g);
h->handleRequest(req);
req = RequestAlloc::create();
req->setSentinelSlaves(g->name());
req->setData(g);
h->handleRequest(req);
}
break;
default:
break;
}
}
void StandaloneServerPool::handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res)
{
switch (req->type()) {
case Command::SentinelSentinels:
handleSentinels(h, s, req, res);
break;
case Command::SentinelGetMaster:
handleGetMaster(h, s, req, res);
break;
case Command::SentinelSlaves:
handleSlaves(h, s, req, res);
break;
default:
break;
}
}
class AddrParser
{
public:
enum Status {
Ok,
Error,
Done
};
public:
AddrParser(const Segment& res):
mRes(res),
mState(Idle),
mCnt(0),
mArgLen(0),
mIp(false),
mPort(false)
{
mRes.rewind();
}
int count() const {return mCnt;}
Status parse(SString<Const::MaxAddrLen>& addr);
private:
enum State {
Idle,
Count,
CountLF,
Arg,
ArgLen,
ArgLenLF,
SubArrayLen,
Body,
BodyLF,
Invalid,
Finished
};
private:
Segment mRes;
State mState;
int mCnt;
int mArgLen;
bool mIp;
bool mPort;
SString<4> mKey;
};
AddrParser::Status AddrParser::parse(SString<Const::MaxAddrLen>& addr)
{
const char* dat;
int len;
addr.clear();
while (mRes.get(dat, len) && mState != Invalid) {
for (int i = 0; i < len && mState != Invalid; ++i) {
char ch = dat[i];
switch (mState) {
case Idle:
mState = ch == '*' ? Count : Invalid;
break;
case Count:
if (ch >= '0' && ch <= '9') {
mCnt = mCnt * 10 + (ch - '0');
} else if (ch == '\r') {
if (mCnt == 0) {
mState = Finished;
return Done;
} else if (mCnt < 0) {
mState = Invalid;
return Error;
}
mState = CountLF;
} else {
mState = Invalid;
}
break;
case CountLF:
mState = ch == '\n' ? Arg : Invalid;
break;
case Arg:
if (ch == '$') {
mState = ArgLen;
mArgLen = 0;
} else if (ch == '*') {
mState = SubArrayLen;
} else {
mState = Invalid;
}
break;
case ArgLen:
if (ch >= '0' && ch <= '9') {
mArgLen = mArgLen * 10 + (ch - '0');
} else if (ch == '\r') {
mState = ArgLenLF;
} else {
mState = Invalid;
}
break;
case ArgLenLF:
mState = ch == '\n' ? Body : Invalid;
break;
case SubArrayLen:
if (ch == '\n') {
mState = Arg;
}
break;
case Body:
if (ch == '\r') {
mState = BodyLF;
if (mPort) {
mPort = false;
mRes.use(i + 1);
return Ok;
} else if (mIp) {
mIp = false;
addr.append(':');
} else if (mArgLen == 2 && strcmp(mKey.data(), "ip") == 0) {
mIp = true;
} else if (mArgLen == 4 && strcmp(mKey.data(), "port") == 0) {
mPort = true;
}
break;
}
if (mIp || mPort) {
addr.append(ch);
} else if (mArgLen == 2 || mArgLen == 4) {
mKey.append(ch);
}
break;
case BodyLF:
mKey.clear();
mState = ch == '\n' ? Arg : Invalid;
break;
default:
break;
}
}
mRes.use(len);
}
return mState != Invalid ? Done : Error;
}
static bool hasValidPort(const String& addr)
{
const char* p = addr.data() + addr.length();
for (int i = 0; i < addr.length(); ++i) {
if (*(--p) == ':') {
int port = atoi(p + 1);
return port > 0 && port < 65536;
}
}
return false;
}
void StandaloneServerPool::handleSentinels(Handler* h, ConnectConnection* s, Request* req, Response* res)
{
if (!res || !res->isArray()) {
return;
}
AddrParser parser(res->body());
SString<Const::MaxAddrLen> addr;
while (true) {
auto st = parser.parse(addr);
if (st == AddrParser::Ok) {
logDebug("sentinel server pool parse sentinel %s", addr.data());
if (!hasValidPort(addr)) {
logNotice("sentinel server pool parse sentienl %s invalid",
addr.data());
continue;
}
auto it = mServs.find(addr);
Server* serv = it == mServs.end() ? nullptr : it->second;
if (!serv) {
if (mSentinels.size() == mSentinels.capacity()) {
logWarn("too many sentinels %d, will ignore new sentinel %s",
(int)mSentinels.size(), addr.data());
continue;
}
serv = new Server(this, addr, false);
serv->setRole(Server::Sentinel);
serv->setPassword(password());
mSentinels.push_back(serv);
mServs[serv->addr()] = serv;
logNotice("h %d create new sentinel %s",
h->id(), addr.data());
}
serv->setOnline(true);
} else if (st == AddrParser::Done) {
break;
} else {
logError("sentinel server pool parse sentinel sentinels error");
break;
}
}
}
void StandaloneServerPool::handleGetMaster(Handler* h, ConnectConnection* s, Request* req, Response* res)
{
if (!res || !res->isArray()) {
return;
}
ServerGroup* g = (ServerGroup*)req->data();
if (!g) {
return;
}
SegmentStr<Const::MaxAddrLen + 32> str(res->body());
if (!str.complete()) {
return;
}
if (strncmp(str.data(), "*2\r\n$", 5) != 0) {
return;
}
SString<Const::MaxAddrLen> addr;
const char* p = str.data() + 5;
int len = atoi(p);
if (len <= 0) {
return;
}
p = strchr(p, '\r') + 2;
if (!addr.append(p, len)) {
return;
}
if (!addr.append(':')) {
return;
}
p += len + 3;
len = atoi(p);
if (len <= 0) {
return;
}
p = strchr(p, '\r') + 2;
if (!addr.append(p, len)) {
return;
}
logDebug("sentinel server pool group %s get master %s",
g->name().data(), addr.data());
auto it = mServs.find(addr);
Server* serv = it == mServs.end() ? nullptr : it->second;
if (serv) {
serv->setOnline(true);
serv->setRole(Server::Master);
auto old = serv->group();
if (old) {
if (old != g) {
old->remove(serv);
g->add(serv);
serv->setGroup(g);
}
} else {
g->add(serv);
serv->setGroup(g);
}
} else {
if (mServPool.size() == mServPool.capacity()) {
logWarn("too many servers %d, will ignore new master server %s",
(int)mServPool.size(), addr.data());
return;
}
serv = new Server(this, addr, false);
serv->setRole(Server::Master);
serv->setPassword(password());
mServPool.push_back(serv);
g->add(serv);
serv->setGroup(g);
mServs[serv->addr()] = serv;
logNotice("sentinel server pool group %s create master server %s %s",
g->name().data(), addr.data(), serv->dcName().data());
}
}
void StandaloneServerPool::handleSlaves(Handler* h, ConnectConnection* s, Request* req, Response* res)
{
if (!res || !res->isArray()) {
return;
}
ServerGroup* g = (ServerGroup*)req->data();
if (!g) {
return;
}
AddrParser parser(res->body());
SString<Const::MaxAddrLen> addr;
while (true) {
auto st = parser.parse(addr);
if (st == AddrParser::Ok) {
logDebug("sentinel server pool group %s parse slave %s",
g->name().data(), addr.data());
auto it = mServs.find(addr);
Server* serv = it == mServs.end() ? nullptr : it->second;
if (serv) {
serv->setOnline(true);
serv->setRole(Server::Slave);
auto old = serv->group();
if (old) {
if (old != g) {
old->remove(serv);
g->add(serv);
serv->setGroup(g);
}
} else {
g->add(serv);
serv->setGroup(g);
}
} else {
if (mServPool.size() == mServPool.capacity()) {
logWarn("too many servers %d, will ignore new slave server %s",
(int)mServPool.size(), addr.data());
return;
}
serv = new Server(this, addr, false);
serv->setRole(Server::Slave);
serv->setPassword(password());
mServPool.push_back(serv);
g->add(serv);
serv->setGroup(g);
mServs[serv->addr()] = serv;
logNotice("sentinel server pool group %s create slave server %s %s",
g->name().data(), addr.data(), serv->dcName().data());
}
} else if (st == AddrParser::Done) {
break;
} else {
logError("sentinel server pool group %s parse sentinel sentinels error",
g->name().data());
break;
}
}
}

View File

@ -1,43 +0,0 @@
/*
* predixy - A high performance and full features proxy for redis.
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
* All rights reserved.
*/
#ifndef _PREDIXY_STANDALONE_SERVER_POOL_H_
#define _PREDIXY_STANDALONE_SERVER_POOL_H_
#include <map>
#include "Predixy.h"
#include "ServerPool.h"
class StandaloneServerPool : public ServerPoolTmpl<StandaloneServerPool>
{
public:
static const int MaxSentinelNum = 64;
public:
StandaloneServerPool(Proxy* p);
~StandaloneServerPool();
void init(const StandaloneServerPoolConf& conf);
Server* getServer(Handler* h, Request* req, const String& key) const;
Server* iter(int& cursor) const
{
return ServerPool::iter(mServPool, cursor);
}
void refreshRequest(Handler* h);
void handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res);
private:
void handleSentinels(Handler* h, ConnectConnection* s, Request* req, Response* res);
void handleGetMaster(Handler* h, ConnectConnection* s, Request* req, Response* res);
void handleSlaves(Handler* h, ConnectConnection* s, Request* req, Response* res);
friend class ServerPoolTmpl<StandaloneServerPool>;
private:
ServerPoolRefreshMethod mRefreshMethod;
std::vector<Server*> mSentinels;
std::vector<Server*> mServPool;
Distribution mDist;
Hash mHash;
char mHashTag[2];
};
#endif

View File

@ -391,7 +391,7 @@ Cases = [
[('scard', '{k}2'), 3], [('scard', '{k}2'), 3],
]), ]),
('zset', [ ('zset', [
[('del', 'k', '{k}2', '{k}3', '{k}4', '{k}5', '{k}6'), ], [('del', 'k', '{k}2', '{k}3', '{k}4'), ],
[('zadd', 'k', 10, 'apple'), 1], [('zadd', 'k', 10, 'apple'), 1],
[('zcard', 'k'), 1], [('zcard', 'k'), 1],
[('zincrby', 'k', 2, 'apple'), '12'], [('zincrby', 'k', 2, 'apple'), '12'],
@ -438,12 +438,6 @@ Cases = [
[('zunionstore', '{k}3', 2, 'k', '{k}2'), 4], [('zunionstore', '{k}3', 2, 'k', '{k}2'), 4],
[('zunionstore', '{k}3', 2, 'k', '{k}2', 'AGGREGATE', 'MAX'), 4], [('zunionstore', '{k}3', 2, 'k', '{k}2', 'AGGREGATE', 'MAX'), 4],
[('zunionstore', '{k}3', 2, 'k', '{k}2', 'WEIGHTS', 0.5, 1.2, 'AGGREGATE', 'MAX'), 4], [('zunionstore', '{k}3', 2, 'k', '{k}2', 'WEIGHTS', 0.5, 1.2, 'AGGREGATE', 'MAX'), 4],
[('zadd', '{k}5', 0, 'apple', 9, 'banana', 1, 'pear', 3, 'orange', 4, 'cat'), 5],
[('zpopmax', '{k}5'), ['banana', '9']],
[('zpopmax', '{k}5', 3), ['cat', '4', 'orange', '3', 'pear', '1']],
[('zadd', '{k}6', 0, 'apple', 9, 'banana', 1, 'pear', 3, 'orange', 4, 'cat'), 5],
[('zpopmin', '{k}6'), ['apple', '0']],
[('zpopmin', '{k}6', 3), ['pear', '1', 'orange', '3', 'cat', '4']],
]), ]),
('hyperloglog', [ ('hyperloglog', [
[('del', 'k', '{k}2', '{k}3'), ], [('del', 'k', '{k}2', '{k}3'), ],