mirror of
https://github.com/joyieldInc/predixy.git
synced 2025-12-24 22:46:41 +08:00
Compare commits
No commits in common. "master" and "1.0.4" have entirely different histories.
@ -6,10 +6,10 @@
|
|||||||
## [StaticSlaveReadPriority [0-100]] #default 0
|
## [StaticSlaveReadPriority [0-100]] #default 0
|
||||||
## [DynamicSlaveReadPriority [0-100]] #default 0
|
## [DynamicSlaveReadPriority [0-100]] #default 0
|
||||||
## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
|
## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
|
||||||
## [ServerTimeout number[s|ms|us]] #default 0, server connection socket read/write timeout
|
## [ServerTimeout number[s|ms|us]] #default 1, server connection socket read/write timeout
|
||||||
## [ServerFailureLimit number] #default 10
|
## [ServerFailureLimit number] #default 10
|
||||||
## [ServerRetryTimeout number[s|ms|us]] #default 1
|
## [ServerRetryTimeout number[s|ms|us]] #default 1
|
||||||
## [KeepAlive seconds] #default 0, server connection tcp keepalive
|
## [KeepAlive seconds] #default 120, server connection tcp keepalive
|
||||||
|
|
||||||
## Servers {
|
## Servers {
|
||||||
## + addr
|
## + addr
|
||||||
|
|||||||
@ -1,94 +0,0 @@
|
|||||||
## Custom Command
|
|
||||||
## CustomCommand {
|
|
||||||
## command { #command string, must be lowercase
|
|
||||||
## [Mode read|write|admin[|[keyAt2|keyAt3]] #default write, default key position is 1
|
|
||||||
## [MinArgs [2-]] #default 2, including command itself
|
|
||||||
## [MaxArgs [2-]] #default 2, must be MaxArgs >= MinArgs
|
|
||||||
## }...
|
|
||||||
## }
|
|
||||||
|
|
||||||
## Currently support maximum 16 custom commands
|
|
||||||
|
|
||||||
## Example:
|
|
||||||
#CustomCommand {
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
# custom.ttl {
|
|
||||||
# Mode keyAt2
|
|
||||||
# MinArgs 3
|
|
||||||
# MaxArgs 3
|
|
||||||
# }
|
|
||||||
#### custom.ttl miliseconds key
|
|
||||||
#### Mode = write|keyAt2, MinArgs/MaxArgs = 3 = command + miliseconds + key
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
## from redis source src/modules/hello.c
|
|
||||||
# hello.push.native {
|
|
||||||
# MinArgs 3
|
|
||||||
# MaxArgs 3
|
|
||||||
# }
|
|
||||||
#### hello.push.native key value
|
|
||||||
#### Mode = write, MinArgs/MaxArgs = 3 = command) + key + value
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
# hello.repl2 {
|
|
||||||
# }
|
|
||||||
#### hello.repl2 <list-key>
|
|
||||||
#### Mode = write, MinArgs/MaxArgs = 2 = command + list-key
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
# hello.toggle.case {
|
|
||||||
# }
|
|
||||||
#### hello.toggle.case key
|
|
||||||
#### Mode = write, MinArgs/MaxArgs = 2 = command + key
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
# hello.more.expire {
|
|
||||||
# MinArgs 3
|
|
||||||
# MaxArgs 3
|
|
||||||
# }
|
|
||||||
#### hello.more.expire key milliseconds
|
|
||||||
#### Mode = write, MinArgs/MaxArgs = 3 = command + key + milliseconds
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
# hello.zsumrange {
|
|
||||||
# MinArgs 4
|
|
||||||
# MaxArgs 4
|
|
||||||
# Mode read
|
|
||||||
# }
|
|
||||||
#### hello.zsumrange key startscore endscore
|
|
||||||
#### Mode = read, MinArgs/MaxArgs = 4 = command + key + startscore + endscore
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
# hello.lexrange {
|
|
||||||
# MinArgs 6
|
|
||||||
# MaxArgs 6
|
|
||||||
# Mode read
|
|
||||||
# }
|
|
||||||
#### hello.lexrange key min_lex max_lex min_age max_age
|
|
||||||
#### Mode = read, MinArgs/MaxArgs = 6 = command + key + min_lex + max_lex + min_age + max_age
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
# hello.hcopy {
|
|
||||||
# MinArgs 4
|
|
||||||
# MaxArgs 4
|
|
||||||
# }
|
|
||||||
#### hello.hcopy key srcfield dstfield
|
|
||||||
#### Mode = write, MinArgs/MaxArgs = 4 = command + key + srcfield) + dstfield
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
## from redis source src/modules/hellotype.c
|
|
||||||
# hellotype.insert {
|
|
||||||
# MinArgs 3
|
|
||||||
# MaxArgs 3
|
|
||||||
# }
|
|
||||||
#### hellotype.insert key value
|
|
||||||
#### Mode = write, MinArgs/MaxArgs = 3 = command + key + value
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
# hellotype.range {
|
|
||||||
# MinArgs 4
|
|
||||||
# MaxArgs 4
|
|
||||||
# Mode read
|
|
||||||
# }
|
|
||||||
#### hellotype.range key first count
|
|
||||||
#### Mode = read, MinArgs/MaxArgs = 4 = command + key + first + count
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
# hellotype.len {
|
|
||||||
# Mode read
|
|
||||||
# }
|
|
||||||
#### hellotype.len key
|
|
||||||
#### Mode = read, MinArgs/MaxArgs = 2 = command + key
|
|
||||||
##------------------------------------------------------------------------
|
|
||||||
#}
|
|
||||||
|
|
||||||
@ -93,10 +93,6 @@ Include try.conf
|
|||||||
# Include dc.conf
|
# Include dc.conf
|
||||||
|
|
||||||
|
|
||||||
################################### COMMAND ####################################
|
|
||||||
## Custom command define, see command.conf
|
|
||||||
#Include command.conf
|
|
||||||
|
|
||||||
################################### LATENCY ####################################
|
################################### LATENCY ####################################
|
||||||
## Latency monitor define, see latency.conf
|
## Latency monitor define, see latency.conf
|
||||||
Include latency.conf
|
Include latency.conf
|
||||||
|
|||||||
@ -10,10 +10,10 @@
|
|||||||
## [StaticSlaveReadPriority [0-100]] #default 0
|
## [StaticSlaveReadPriority [0-100]] #default 0
|
||||||
## [DynamicSlaveReadPriority [0-100]] #default 0
|
## [DynamicSlaveReadPriority [0-100]] #default 0
|
||||||
## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
|
## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
|
||||||
## [ServerTimeout number[s|ms|us]] #default 0, server connection socket read/write timeout
|
## [ServerTimeout number[s|ms|us]] #default 1, server connection socket read/write timeout
|
||||||
## [ServerFailureLimit number] #default 10
|
## [ServerFailureLimit number] #default 10
|
||||||
## [ServerRetryTimeout number[s|ms|us]] #default 1
|
## [ServerRetryTimeout number[s|ms|us]] #default 1
|
||||||
## [KeepAlive seconds] #default 0, server connection tcp keepalive
|
## [KeepAlive seconds] #default 120, server connection tcp keepalive
|
||||||
## Sentinels {
|
## Sentinels {
|
||||||
## + addr
|
## + addr
|
||||||
## ...
|
## ...
|
||||||
|
|||||||
@ -1,71 +0,0 @@
|
|||||||
## redis standalone server pool define
|
|
||||||
|
|
||||||
##StandaloneServerPool {
|
|
||||||
## [Password xxx] #default no
|
|
||||||
## [Databases number] #default 1
|
|
||||||
## Hash atol|crc16
|
|
||||||
## [HashTag "xx"] #default no
|
|
||||||
## Distribution modula|random
|
|
||||||
## [MasterReadPriority [0-100]] #default 50
|
|
||||||
## [StaticSlaveReadPriority [0-100]] #default 0
|
|
||||||
## [DynamicSlaveReadPriority [0-100]] #default 0
|
|
||||||
## RefreshMethod fixed|sentinel #
|
|
||||||
## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
|
|
||||||
## [ServerTimeout number[s|ms|us]] #default 0, server connection socket read/write timeout
|
|
||||||
## [ServerFailureLimit number] #default 10
|
|
||||||
## [ServerRetryTimeout number[s|ms|us]] #default 1
|
|
||||||
## [KeepAlive seconds] #default 0, server connection tcp keepalive
|
|
||||||
## Sentinels [sentinel-password] {
|
|
||||||
## + addr
|
|
||||||
## ...
|
|
||||||
## }
|
|
||||||
## Group xxx {
|
|
||||||
## [+ addr] #if RefreshMethod==fixed: the first addr is master in a group, then all addrs is slaves in this group
|
|
||||||
## ...
|
|
||||||
## }
|
|
||||||
##}
|
|
||||||
|
|
||||||
|
|
||||||
## Examples:
|
|
||||||
#StandaloneServerPool {
|
|
||||||
# Databases 16
|
|
||||||
# Hash crc16
|
|
||||||
# HashTag "{}"
|
|
||||||
# Distribution modula
|
|
||||||
# MasterReadPriority 60
|
|
||||||
# StaticSlaveReadPriority 50
|
|
||||||
# DynamicSlaveReadPriority 50
|
|
||||||
# RefreshMethod sentinel
|
|
||||||
# RefreshInterval 1
|
|
||||||
# ServerTimeout 1
|
|
||||||
# ServerFailureLimit 10
|
|
||||||
# ServerRetryTimeout 1
|
|
||||||
# KeepAlive 120
|
|
||||||
# Sentinels {
|
|
||||||
# + 10.2.2.2:7500
|
|
||||||
# + 10.2.2.3:7500
|
|
||||||
# + 10.2.2.4:7500
|
|
||||||
# }
|
|
||||||
# Group shard001 {
|
|
||||||
# }
|
|
||||||
# Group shard002 {
|
|
||||||
# }
|
|
||||||
#}
|
|
||||||
|
|
||||||
#StandaloneServerPool {
|
|
||||||
# Databases 16
|
|
||||||
# Hash crc16
|
|
||||||
# HashTag "{}"
|
|
||||||
# Distribution modula
|
|
||||||
# MasterReadPriority 60
|
|
||||||
# StaticSlaveReadPriority 50
|
|
||||||
# DynamicSlaveReadPriority 50
|
|
||||||
# RefreshMethod fixed
|
|
||||||
# ServerTimeout 1
|
|
||||||
# ServerFailureLimit 10
|
|
||||||
# ServerRetryTimeout 1
|
|
||||||
# KeepAlive 120
|
|
||||||
# Group shard001 {
|
|
||||||
# + 10.2.3.2:6379
|
|
||||||
# }
|
|
||||||
#}
|
|
||||||
@ -151,9 +151,9 @@ predixy扩展了redis中AUTH命令的功能,支持定义多个认证密码,
|
|||||||
Authority {
|
Authority {
|
||||||
Auth [password] {
|
Auth [password] {
|
||||||
Mode read|write|admin
|
Mode read|write|admin
|
||||||
[KeyPrefix Prefix...]
|
[KeyPredix Predix...]
|
||||||
[ReadKeyPrefix Prefix...]
|
[ReadKeyPredix Predix...]
|
||||||
[WriteKeyPrefix Prefix...]
|
[WriteKeyPredix Predix...]
|
||||||
}...
|
}...
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -222,11 +222,9 @@ predixy支持Redis Sentinel和Redis Cluster来使用redis,一个配置里这
|
|||||||
[MasterReadPriority [0-100]]
|
[MasterReadPriority [0-100]]
|
||||||
[StaticSlaveReadPriority [0-100]]
|
[StaticSlaveReadPriority [0-100]]
|
||||||
[DynamicSlaveReadPriority [0-100]]
|
[DynamicSlaveReadPriority [0-100]]
|
||||||
[RefreshInterval number[s|ms|us]]
|
[RefreshInterval seconds]
|
||||||
[ServerTimeout number[s|ms|us]]
|
|
||||||
[ServerFailureLimit number]
|
[ServerFailureLimit number]
|
||||||
[ServerRetryTimeout number[s|ms|us]]
|
[ServerRetryTimeout seconds]
|
||||||
[KeepAlive seconds]
|
|
||||||
Sentinels {
|
Sentinels {
|
||||||
+ addr
|
+ addr
|
||||||
...
|
...
|
||||||
@ -244,14 +242,12 @@ predixy支持Redis Sentinel和Redis Cluster来使用redis,一个配置里这
|
|||||||
+ Hash: 指定对key算哈希的方法,当前只支持atol和crc16
|
+ Hash: 指定对key算哈希的方法,当前只支持atol和crc16
|
||||||
+ HashTag: 指定哈希标签,不指定的话为{}
|
+ HashTag: 指定哈希标签,不指定的话为{}
|
||||||
+ Distribution: 指定分布key的方法,当前只支持modula和random
|
+ Distribution: 指定分布key的方法,当前只支持modula和random
|
||||||
+ MasterReadPriority: 读写分离功能,从redis master节点执行读请求的优先级,为0则禁止读redis master,不指定的话为50
|
+ MasterReadPriority: 读写分离功能,从redis master节点执行读请求的优先级,为0则禁止读redis master
|
||||||
+ StaticSlaveReadPriority: 读写分离功能,从静态redis slave节点执行读请求的优先级,所谓静态节点,是指在本配置文件中显示列出的redis节点,不指定的话为0
|
+ StaticSlaveReadPriority: 读写分离功能,从静态redis slave节点执行读请求的优先级,所谓静态节点,是指在本配置文件中显示列出的redis节点
|
||||||
+ DynamicSlaveReadPolicy: 功能见上,所谓动态节点是指在本配置文件中没有列出,但是通过redis sentinel动态发现的节点,不指定的话为0
|
+ DynamicSlaveReadPolicy: 功能见上,所谓动态节点是指在本配置文件中没有列出,但是通过redis sentinel动态发现的节点
|
||||||
+ RefreshInterval: predixy会周期性的请求redis sentinel以获取最新的集群信息,该参数以秒为单位指定刷新周期,不指定的话为1秒
|
+ RefreshInterval: predixy会周期性的请求redis sentinel以获取最新的集群信息,该参数以秒为单位指定刷新周期
|
||||||
+ ServerTimeout: 请求在predixy中最长的处理/等待时间,如果超过该时间redis还没有响应的话,那么predixy会关闭同redis的连接,并给客户端一个错误响应,对于blpop这种阻塞式命令,该选项不起作用,为0则禁止此功能,即如果redis不返回就一直等待,不指定的话为0
|
+ ServerFailureLimit: 一个redis实例出现多少次才错误以后将其标记为失效
|
||||||
+ ServerFailureLimit: 一个redis实例出现多少次才错误以后将其标记为失效,不指定的话为10
|
+ ServerRetryTimeout: 一个redis实例失效后多久后去检查其是否恢复正常
|
||||||
+ ServerRetryTimeout: 一个redis实例失效后多久后去检查其是否恢复正常,不指定的话为1秒
|
|
||||||
+ KeepAlive: predixy与redis的连接tcp keepalive时间,为0则禁止此功能,不指定的话为0
|
|
||||||
+ Sentinels: 里面定义redis sentinel实例的地址
|
+ Sentinels: 里面定义redis sentinel实例的地址
|
||||||
+ Group: 定义一个redis组,Group的名字应该和redis sentinel里面的名字一致,Group里可以显示列出redis的地址,列出的话就是上面提到的静态节点
|
+ Group: 定义一个redis组,Group的名字应该和redis sentinel里面的名字一致,Group里可以显示列出redis的地址,列出的话就是上面提到的静态节点
|
||||||
|
|
||||||
@ -266,10 +262,8 @@ predixy支持Redis Sentinel和Redis Cluster来使用redis,一个配置里这
|
|||||||
StaticSlaveReadPriority 50
|
StaticSlaveReadPriority 50
|
||||||
DynamicSlaveReadPriority 50
|
DynamicSlaveReadPriority 50
|
||||||
RefreshInterval 1
|
RefreshInterval 1
|
||||||
ServerTimeout 1
|
|
||||||
ServerFailureLimit 10
|
ServerFailureLimit 10
|
||||||
ServerRetryTimeout 1
|
ServerRetryTimeout 1
|
||||||
KeepAlive 120
|
|
||||||
Sentinels {
|
Sentinels {
|
||||||
+ 10.2.2.2:7500
|
+ 10.2.2.2:7500
|
||||||
+ 10.2.2.3:7500
|
+ 10.2.2.3:7500
|
||||||
@ -293,10 +287,8 @@ predixy支持Redis Sentinel和Redis Cluster来使用redis,一个配置里这
|
|||||||
[StaticSlaveReadPriority [0-100]]
|
[StaticSlaveReadPriority [0-100]]
|
||||||
[DynamicSlaveReadPriority [0-100]]
|
[DynamicSlaveReadPriority [0-100]]
|
||||||
[RefreshInterval seconds]
|
[RefreshInterval seconds]
|
||||||
[ServerTimeout number[s|ms|us]]
|
|
||||||
[ServerFailureLimit number]
|
[ServerFailureLimit number]
|
||||||
[ServerRetryTimeout number[s|ms|us]]
|
[ServerRetryTimeout seconds]
|
||||||
[KeepAlive seconds]
|
|
||||||
Servers {
|
Servers {
|
||||||
+ addr
|
+ addr
|
||||||
...
|
...
|
||||||
@ -316,10 +308,8 @@ predixy支持Redis Sentinel和Redis Cluster来使用redis,一个配置里这
|
|||||||
StaticSlaveReadPriority 50
|
StaticSlaveReadPriority 50
|
||||||
DynamicSlaveReadPriority 50
|
DynamicSlaveReadPriority 50
|
||||||
RefreshInterval 1
|
RefreshInterval 1
|
||||||
ServerTimeout 1
|
|
||||||
ServerFailureLimit 10
|
ServerFailureLimit 10
|
||||||
ServerRetryTimeout 1
|
ServerRetryTimeout 1
|
||||||
KeepAlive 120
|
|
||||||
Servers {
|
Servers {
|
||||||
+ 192.168.2.107:2211
|
+ 192.168.2.107:2211
|
||||||
+ 192.168.2.107:2212
|
+ 192.168.2.107:2212
|
||||||
|
|||||||
@ -29,7 +29,6 @@ public:
|
|||||||
typedef AcceptConnection Value;
|
typedef AcceptConnection Value;
|
||||||
typedef ListNode<AcceptConnection, SharePtr<AcceptConnection>> ListNodeType;
|
typedef ListNode<AcceptConnection, SharePtr<AcceptConnection>> ListNodeType;
|
||||||
typedef DequeNode<AcceptConnection, SharePtr<AcceptConnection>> DequeNodeType;
|
typedef DequeNode<AcceptConnection, SharePtr<AcceptConnection>> DequeNodeType;
|
||||||
typedef Alloc<AcceptConnection, Const::AcceptConnectionAllocCacheSize> Allocator;
|
|
||||||
public:
|
public:
|
||||||
AcceptConnection(int fd, sockaddr* addr, socklen_t len);
|
AcceptConnection(int fd, sockaddr* addr, socklen_t len);
|
||||||
~AcceptConnection();
|
~AcceptConnection();
|
||||||
@ -98,6 +97,6 @@ private:
|
|||||||
|
|
||||||
typedef List<AcceptConnection> AcceptConnectionList;
|
typedef List<AcceptConnection> AcceptConnectionList;
|
||||||
typedef Deque<AcceptConnection> AcceptConnectionDeque;
|
typedef Deque<AcceptConnection> AcceptConnectionDeque;
|
||||||
typedef AcceptConnection::Allocator AcceptConnectionAlloc;
|
typedef Alloc<AcceptConnection, Const::AcceptConnectionAllocCacheSize> AcceptConnectionAlloc;
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -76,7 +76,7 @@ public:
|
|||||||
}
|
}
|
||||||
UsedMemory += allocSize<T>();
|
UsedMemory += allocSize<T>();
|
||||||
if (MaxMemory == 0 || UsedMemory <= MaxMemory) {
|
if (MaxMemory == 0 || UsedMemory <= MaxMemory) {
|
||||||
void* p = ::operator new(allocSize<T>(), std::nothrow);
|
void* p = ::operator new(allocSize<T>());
|
||||||
if (p) {
|
if (p) {
|
||||||
try {
|
try {
|
||||||
obj = new (p) T(args...);
|
obj = new (p) T(args...);
|
||||||
@ -145,7 +145,7 @@ public:
|
|||||||
{
|
{
|
||||||
int n = --mCnt;
|
int n = --mCnt;
|
||||||
if (n == 0) {
|
if (n == 0) {
|
||||||
T::Allocator::destroy(static_cast<T*>(this));
|
Alloc<T>::destroy(static_cast<T*>(this));
|
||||||
} else if (n < 0) {
|
} else if (n < 0) {
|
||||||
logError("unref object %p with cnt %d", this, n);
|
logError("unref object %p with cnt %d", this, n);
|
||||||
abort();
|
abort();
|
||||||
|
|||||||
@ -23,7 +23,6 @@ class Buffer :
|
|||||||
public RefCntObj<Buffer>
|
public RefCntObj<Buffer>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
typedef Alloc<Buffer, Const::BufferAllocCacheSize> Allocator;
|
|
||||||
static const int MaxBufFmtAppendLen = 8192;
|
static const int MaxBufFmtAppendLen = 8192;
|
||||||
public:
|
public:
|
||||||
Buffer& operator=(const Buffer&);
|
Buffer& operator=(const Buffer&);
|
||||||
@ -93,13 +92,12 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
typedef List<Buffer> BufferList;
|
typedef List<Buffer> BufferList;
|
||||||
typedef Buffer::Allocator BufferAlloc;
|
|
||||||
|
|
||||||
template<>
|
template<>
|
||||||
inline int allocSize<Buffer>()
|
inline int allocSize<Buffer>()
|
||||||
{
|
{
|
||||||
return Buffer::getSize() + sizeof(Buffer);
|
return Buffer::getSize() + sizeof(Buffer);
|
||||||
}
|
}
|
||||||
|
typedef Alloc<Buffer, Const::BufferAllocCacheSize> BufferAlloc;
|
||||||
|
|
||||||
struct BufferPos
|
struct BufferPos
|
||||||
{
|
{
|
||||||
|
|||||||
@ -9,9 +9,8 @@
|
|||||||
#include <map>
|
#include <map>
|
||||||
#include "String.h"
|
#include "String.h"
|
||||||
#include "Command.h"
|
#include "Command.h"
|
||||||
#include "Conf.h"
|
|
||||||
|
|
||||||
Command Command::CmdPool[AvailableCommands] = {
|
const Command Command::CmdPool[Sentinel] = {
|
||||||
{None, "", 0, MaxArgs, Read},
|
{None, "", 0, MaxArgs, Read},
|
||||||
{Ping, "ping", 1, 2, Read},
|
{Ping, "ping", 1, 2, Read},
|
||||||
{PingServ, "ping", 1, 2, Inner},
|
{PingServ, "ping", 1, 2, Inner},
|
||||||
@ -87,8 +86,8 @@ Command Command::CmdPool[AvailableCommands] = {
|
|||||||
{Setrange, "setrange", 4, 4, Write},
|
{Setrange, "setrange", 4, 4, Write},
|
||||||
{Strlen, "strlen", 2, 2, Read},
|
{Strlen, "strlen", 2, 2, Read},
|
||||||
{Hdel, "hdel", 3, MaxArgs, Write},
|
{Hdel, "hdel", 3, MaxArgs, Write},
|
||||||
{Hexists, "hexists", 3, 3, Read},
|
{Hexists, "hexists", 3, 3, Write},
|
||||||
{Hget, "hget", 3, 3, Read},
|
{Hget, "hget", 3, 3, Write},
|
||||||
{Hgetall, "hgetall", 2, 2, Read},
|
{Hgetall, "hgetall", 2, 2, Read},
|
||||||
{Hincrby, "hincrby", 4, 4, Write},
|
{Hincrby, "hincrby", 4, 4, Write},
|
||||||
{Hincrbyfloat, "hincrbyfloat", 4, 4, Write},
|
{Hincrbyfloat, "hincrbyfloat", 4, 4, Write},
|
||||||
@ -97,7 +96,7 @@ Command Command::CmdPool[AvailableCommands] = {
|
|||||||
{Hmget, "hmget", 3, MaxArgs, Read},
|
{Hmget, "hmget", 3, MaxArgs, Read},
|
||||||
{Hmset, "hmset", 4, MaxArgs, Write},
|
{Hmset, "hmset", 4, MaxArgs, Write},
|
||||||
{Hscan, "hscan", 3, 7, Read},
|
{Hscan, "hscan", 3, 7, Read},
|
||||||
{Hset, "hset", 4, MaxArgs, Write},
|
{Hset, "hset", 4, 4, Write},
|
||||||
{Hsetnx, "hsetnx", 4, 4, Write},
|
{Hsetnx, "hsetnx", 4, 4, Write},
|
||||||
{Hstrlen, "hstrlen", 3, 3, Read},
|
{Hstrlen, "hstrlen", 3, 3, Read},
|
||||||
{Hvals, "hvals", 2, 2, Read},
|
{Hvals, "hvals", 2, 2, Read},
|
||||||
@ -110,7 +109,7 @@ Command Command::CmdPool[AvailableCommands] = {
|
|||||||
{Lpop, "lpop", 2, 2, Write},
|
{Lpop, "lpop", 2, 2, Write},
|
||||||
{Lpush, "lpush", 3, MaxArgs, Write},
|
{Lpush, "lpush", 3, MaxArgs, Write},
|
||||||
{Lpushx, "lpushx", 3, 3, Write},
|
{Lpushx, "lpushx", 3, 3, Write},
|
||||||
{Lrange, "lrange", 4, 4, Read},
|
{Lrange, "lrange", 4, 4, Write},
|
||||||
{Lrem, "lrem", 4, 4, Write},
|
{Lrem, "lrem", 4, 4, Write},
|
||||||
{Lset, "lset", 4, 4, Write},
|
{Lset, "lset", 4, 4, Write},
|
||||||
{Ltrim, "ltrim", 4, 4, Write},
|
{Ltrim, "ltrim", 4, 4, Write},
|
||||||
@ -139,8 +138,6 @@ Command Command::CmdPool[AvailableCommands] = {
|
|||||||
{Zincrby, "zincrby", 4, 4, Write},
|
{Zincrby, "zincrby", 4, 4, Write},
|
||||||
{Zinterstore, "zinterstore", 4, MaxArgs, Write},
|
{Zinterstore, "zinterstore", 4, MaxArgs, Write},
|
||||||
{Zlexcount, "zlexcount", 4, 4, Read},
|
{Zlexcount, "zlexcount", 4, 4, Read},
|
||||||
{Zpopmax, "zpopmax", 2, 3, Write},
|
|
||||||
{Zpopmin, "zpopmin", 2, 3, Write},
|
|
||||||
{Zrange, "zrange", 4, 5, Read},
|
{Zrange, "zrange", 4, 5, Read},
|
||||||
{Zrangebylex, "zrangebylex", 4, 7, Read},
|
{Zrangebylex, "zrangebylex", 4, 7, Read},
|
||||||
{Zrangebyscore, "zrangebyscore", 4, 8, Read},
|
{Zrangebyscore, "zrangebyscore", 4, 8, Read},
|
||||||
@ -174,14 +171,11 @@ Command Command::CmdPool[AvailableCommands] = {
|
|||||||
{SubMsg, "\000SubMsg", 0, 0, Admin}
|
{SubMsg, "\000SubMsg", 0, 0, Admin}
|
||||||
};
|
};
|
||||||
|
|
||||||
int Command::Sentinel = Command::MaxCommands;
|
|
||||||
Command::CommandMap Command::CmdMap;
|
Command::CommandMap Command::CmdMap;
|
||||||
|
|
||||||
void Command::init()
|
void Command::init()
|
||||||
{
|
{
|
||||||
int type = 0;
|
int type = 0;
|
||||||
for (auto j = 0; j < MaxCommands; j++) {
|
for (auto& i : CmdPool) {
|
||||||
const auto& i = CmdPool[j];
|
|
||||||
if (i.type != type) {
|
if (i.type != type) {
|
||||||
Throw(InitFail, "command %s unmatch the index in commands table", i.name);
|
Throw(InitFail, "command %s unmatch the index in commands table", i.name);
|
||||||
}
|
}
|
||||||
@ -193,19 +187,3 @@ void Command::init()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Command::addCustomCommand(const CustomCommandConf& ccc) {
|
|
||||||
if (Sentinel >= AvailableCommands) {
|
|
||||||
Throw(InitFail, "too many custom commands(>%d)", MaxCustomCommands);
|
|
||||||
}
|
|
||||||
if (nullptr != find(ccc.name)) {
|
|
||||||
Throw(InitFail, "custom command %s is duplicated", ccc.name.c_str());
|
|
||||||
}
|
|
||||||
auto* p = &CmdPool[Sentinel];
|
|
||||||
p->name = ccc.name.c_str();
|
|
||||||
p->minArgs = ccc.minArgs;
|
|
||||||
p->maxArgs = ccc.maxArgs;
|
|
||||||
p->mode = ccc.mode;
|
|
||||||
p->type = (Command::Type)Sentinel++;
|
|
||||||
CmdMap[ccc.name] = p;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|||||||
@ -11,8 +11,6 @@
|
|||||||
#include "Exception.h"
|
#include "Exception.h"
|
||||||
#include "HashFunc.h"
|
#include "HashFunc.h"
|
||||||
|
|
||||||
struct CustomCommandConf;
|
|
||||||
|
|
||||||
class Command
|
class Command
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -156,8 +154,6 @@ public:
|
|||||||
Zincrby,
|
Zincrby,
|
||||||
Zinterstore,
|
Zinterstore,
|
||||||
Zlexcount,
|
Zlexcount,
|
||||||
Zpopmax,
|
|
||||||
Zpopmin,
|
|
||||||
Zrange,
|
Zrange,
|
||||||
Zrangebylex,
|
Zrangebylex,
|
||||||
Zrangebyscore,
|
Zrangebyscore,
|
||||||
@ -193,9 +189,7 @@ public:
|
|||||||
Unsubscribe,
|
Unsubscribe,
|
||||||
SubMsg,
|
SubMsg,
|
||||||
|
|
||||||
MaxCommands,
|
Sentinel
|
||||||
MaxCustomCommands = 16,
|
|
||||||
AvailableCommands = MaxCommands + MaxCustomCommands,
|
|
||||||
};
|
};
|
||||||
enum Mode
|
enum Mode
|
||||||
{
|
{
|
||||||
@ -255,11 +249,9 @@ public:
|
|||||||
auto it = CmdMap.find(cmd);
|
auto it = CmdMap.find(cmd);
|
||||||
return it == CmdMap.end() ? nullptr : it->second;
|
return it == CmdMap.end() ? nullptr : it->second;
|
||||||
}
|
}
|
||||||
static void addCustomCommand(const CustomCommandConf& pc);
|
|
||||||
static int Sentinel;
|
|
||||||
private:
|
private:
|
||||||
static const int MaxArgs = 100000000;
|
static const int MaxArgs = 100000000;
|
||||||
static Command CmdPool[];
|
static const Command CmdPool[Sentinel];
|
||||||
class H
|
class H
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|||||||
@ -10,7 +10,7 @@
|
|||||||
#include <limits.h>
|
#include <limits.h>
|
||||||
|
|
||||||
#define _PREDIXY_NAME_ "predixy"
|
#define _PREDIXY_NAME_ "predixy"
|
||||||
#define _PREDIXY_VERSION_ "1.0.5"
|
#define _PREDIXY_VERSION_ "1.0.4"
|
||||||
|
|
||||||
namespace Const
|
namespace Const
|
||||||
{
|
{
|
||||||
@ -32,8 +32,8 @@ namespace Const
|
|||||||
static const int MaxCmdLen = 32;
|
static const int MaxCmdLen = 32;
|
||||||
static const int MaxKeyLen = 512;
|
static const int MaxKeyLen = 512;
|
||||||
static const int BufferAllocCacheSize = 64;
|
static const int BufferAllocCacheSize = 64;
|
||||||
static const int RequestAllocCacheSize = 128;
|
static const int RequestAllocCacheSize = 32;
|
||||||
static const int ResponseAllocCacheSize = 128;
|
static const int ResponseAllocCacheSize = 32;
|
||||||
static const int AcceptConnectionAllocCacheSize = 32;
|
static const int AcceptConnectionAllocCacheSize = 32;
|
||||||
static const int ConnectConnectionAllocCacheSize = 4;
|
static const int ConnectConnectionAllocCacheSize = 4;
|
||||||
};
|
};
|
||||||
|
|||||||
200
src/Conf.cpp
200
src/Conf.cpp
@ -6,7 +6,6 @@
|
|||||||
|
|
||||||
#include <ctype.h>
|
#include <ctype.h>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <sstream>
|
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include "LogFileSink.h"
|
#include "LogFileSink.h"
|
||||||
#include "ServerPool.h"
|
#include "ServerPool.h"
|
||||||
@ -33,13 +32,6 @@ bool ServerConf::parse(ServerConf& s, const char* str)
|
|||||||
return !s.addr.empty();
|
return !s.addr.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
void CustomCommandConf::init(CustomCommandConf&c, const char* name) {
|
|
||||||
c.name = name;
|
|
||||||
c.minArgs = 2;
|
|
||||||
c.maxArgs = 2;
|
|
||||||
c.mode = Command::Write;
|
|
||||||
}
|
|
||||||
|
|
||||||
Conf::Conf():
|
Conf::Conf():
|
||||||
mBind("0.0.0.0:7617"),
|
mBind("0.0.0.0:7617"),
|
||||||
mWorkerThreads(1),
|
mWorkerThreads(1),
|
||||||
@ -125,9 +117,8 @@ void Conf::setGlobal(const ConfParser::Node* node)
|
|||||||
{
|
{
|
||||||
const ConfParser::Node* authority = nullptr;
|
const ConfParser::Node* authority = nullptr;
|
||||||
const ConfParser::Node* clusterServerPool = nullptr;
|
const ConfParser::Node* clusterServerPool = nullptr;
|
||||||
const ConfParser::Node* standaloneServerPool = nullptr;
|
const ConfParser::Node* sentinelServerPool = nullptr;
|
||||||
const ConfParser::Node* dataCenter = nullptr;
|
const ConfParser::Node* dataCenter = nullptr;
|
||||||
std::vector<const ConfParser::Node*> latencyMonitors;
|
|
||||||
for (auto p = node; p; p = p->next) {
|
for (auto p = node; p; p = p->next) {
|
||||||
if (setStr(mName, "Name", p)) {
|
if (setStr(mName, "Name", p)) {
|
||||||
} else if (setStr(mBind, "Bind", p)) {
|
} else if (setStr(mBind, "Bind", p)) {
|
||||||
@ -156,19 +147,16 @@ void Conf::setGlobal(const ConfParser::Node* node)
|
|||||||
} else if (setInt(mLogSample[LogLevel::Warn], "LogWarnSample", p)) {
|
} else if (setInt(mLogSample[LogLevel::Warn], "LogWarnSample", p)) {
|
||||||
} else if (setInt(mLogSample[LogLevel::Error], "LogErrorSample", p)) {
|
} else if (setInt(mLogSample[LogLevel::Error], "LogErrorSample", p)) {
|
||||||
} else if (strcasecmp(p->key.c_str(), "LatencyMonitor") == 0) {
|
} else if (strcasecmp(p->key.c_str(), "LatencyMonitor") == 0) {
|
||||||
latencyMonitors.push_back(p);
|
mLatencyMonitors.push_back(LatencyMonitorConf{});
|
||||||
|
setLatencyMonitor(mLatencyMonitors.back(), p);
|
||||||
} else if (strcasecmp(p->key.c_str(), "Authority") == 0) {
|
} else if (strcasecmp(p->key.c_str(), "Authority") == 0) {
|
||||||
authority = p;
|
authority = p;
|
||||||
} else if (strcasecmp(p->key.c_str(), "ClusterServerPool") == 0) {
|
} else if (strcasecmp(p->key.c_str(), "ClusterServerPool") == 0) {
|
||||||
clusterServerPool = p;
|
clusterServerPool = p;
|
||||||
} else if (strcasecmp(p->key.c_str(), "SentinelServerPool") == 0) {
|
} else if (strcasecmp(p->key.c_str(), "SentinelServerPool") == 0) {
|
||||||
standaloneServerPool = p;
|
sentinelServerPool = p;
|
||||||
} else if (strcasecmp(p->key.c_str(), "StandaloneServerPool") == 0) {
|
|
||||||
standaloneServerPool = p;
|
|
||||||
} else if (strcasecmp(p->key.c_str(), "DataCenter") == 0) {
|
} else if (strcasecmp(p->key.c_str(), "DataCenter") == 0) {
|
||||||
dataCenter = p;
|
dataCenter = p;
|
||||||
} else if (strcasecmp(p->key.c_str(), "CustomCommand") == 0) {
|
|
||||||
setCustomCommand(p);
|
|
||||||
} else {
|
} else {
|
||||||
Throw(UnknownKey, "%s:%d unknown key %s", p->file, p->line, p->key.c_str());
|
Throw(UnknownKey, "%s:%d unknown key %s", p->file, p->line, p->key.c_str());
|
||||||
}
|
}
|
||||||
@ -176,27 +164,20 @@ void Conf::setGlobal(const ConfParser::Node* node)
|
|||||||
if (authority) {
|
if (authority) {
|
||||||
setAuthority(authority);
|
setAuthority(authority);
|
||||||
}
|
}
|
||||||
if (clusterServerPool && standaloneServerPool) {
|
if (clusterServerPool && sentinelServerPool) {
|
||||||
Throw(LogicError, "Can't define ClusterServerPool/StandaloneServerPool at the same time");
|
Throw(LogicError, "Can't define ClusterServerPool and SentinelServerPool at the same time");
|
||||||
} else if (clusterServerPool) {
|
} else if (clusterServerPool) {
|
||||||
setClusterServerPool(clusterServerPool);
|
setClusterServerPool(clusterServerPool);
|
||||||
mServerPoolType = ServerPool::Cluster;
|
mServerPoolType = ServerPool::Cluster;
|
||||||
} else if (standaloneServerPool) {
|
} else if (sentinelServerPool) {
|
||||||
if (strcasecmp(standaloneServerPool->key.c_str(), "SentinelServerPool") == 0) {
|
setSentinelServerPool(sentinelServerPool);
|
||||||
mStandaloneServerPool.refreshMethod = ServerPoolRefreshMethod::Sentinel;
|
mServerPoolType = ServerPool::Sentinel;
|
||||||
}
|
|
||||||
setStandaloneServerPool(standaloneServerPool);
|
|
||||||
mServerPoolType = ServerPool::Standalone;
|
|
||||||
} else {
|
} else {
|
||||||
Throw(LogicError, "Must define a server pool");
|
Throw(LogicError, "Must define a server pool");
|
||||||
}
|
}
|
||||||
if (dataCenter) {
|
if (dataCenter) {
|
||||||
setDataCenter(dataCenter);
|
setDataCenter(dataCenter);
|
||||||
}
|
}
|
||||||
for (auto& latencyMonitor : latencyMonitors) {
|
|
||||||
mLatencyMonitors.push_back(LatencyMonitorConf{});
|
|
||||||
setLatencyMonitor(mLatencyMonitors.back(), latencyMonitor);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setKeyPrefix(std::vector<std::string>& dat, const std::string& v)
|
static void setKeyPrefix(std::vector<std::string>& dat, const std::string& v)
|
||||||
@ -258,21 +239,28 @@ void Conf::setAuthority(const ConfParser::Node* node)
|
|||||||
|
|
||||||
bool Conf::setServerPool(ServerPoolConf& sp, const ConfParser::Node* p)
|
bool Conf::setServerPool(ServerPoolConf& sp, const ConfParser::Node* p)
|
||||||
{
|
{
|
||||||
bool ret = true;
|
|
||||||
if (setStr(sp.password, "Password", p)) {
|
if (setStr(sp.password, "Password", p)) {
|
||||||
|
return true;
|
||||||
} else if (setInt(sp.masterReadPriority, "MasterReadPriority", p, 0, 100)) {
|
} else if (setInt(sp.masterReadPriority, "MasterReadPriority", p, 0, 100)) {
|
||||||
|
return true;
|
||||||
} else if (setInt(sp.staticSlaveReadPriority, "StaticSlaveReadPriority", p, 0, 100)) {
|
} else if (setInt(sp.staticSlaveReadPriority, "StaticSlaveReadPriority", p, 0, 100)) {
|
||||||
|
return true;
|
||||||
} else if (setInt(sp.dynamicSlaveReadPriority, "DynamicSlaveReadPriority", p, 0, 100)) {
|
} else if (setInt(sp.dynamicSlaveReadPriority, "DynamicSlaveReadPriority", p, 0, 100)) {
|
||||||
|
return true;
|
||||||
} else if (setDuration(sp.refreshInterval, "RefreshInterval", p)) {
|
} else if (setDuration(sp.refreshInterval, "RefreshInterval", p)) {
|
||||||
|
return true;
|
||||||
} else if (setDuration(sp.serverTimeout, "ServerTimeout", p)) {
|
} else if (setDuration(sp.serverTimeout, "ServerTimeout", p)) {
|
||||||
|
return true;
|
||||||
} else if (setInt(sp.serverFailureLimit, "ServerFailureLimit", p, 1)) {
|
} else if (setInt(sp.serverFailureLimit, "ServerFailureLimit", p, 1)) {
|
||||||
|
return true;
|
||||||
} else if (setDuration(sp.serverRetryTimeout, "ServerRetryTimeout", p)) {
|
} else if (setDuration(sp.serverRetryTimeout, "ServerRetryTimeout", p)) {
|
||||||
|
return true;
|
||||||
} else if (setInt(sp.keepalive, "KeepAlive", p, 0)) {
|
} else if (setInt(sp.keepalive, "KeepAlive", p, 0)) {
|
||||||
|
return true;
|
||||||
} else if (setInt(sp.databases, "Databases", p, 1, 128)) {
|
} else if (setInt(sp.databases, "Databases", p, 1, 128)) {
|
||||||
} else {
|
return true;
|
||||||
ret = false;
|
|
||||||
}
|
}
|
||||||
return ret;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Conf::setClusterServerPool(const ConfParser::Node* node)
|
void Conf::setClusterServerPool(const ConfParser::Node* node)
|
||||||
@ -296,47 +284,41 @@ void Conf::setClusterServerPool(const ConfParser::Node* node)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Conf::setStandaloneServerPool(const ConfParser::Node* node)
|
void Conf::setSentinelServerPool(const ConfParser::Node* node)
|
||||||
{
|
{
|
||||||
if (!node->sub) {
|
if (!node->sub) {
|
||||||
Throw(InvalidValue, "%s:%d StandaloneServerPool require scope value", node->file, node->line);
|
Throw(InvalidValue, "%s:%d SentinelServerPool require scope value", node->file, node->line);
|
||||||
}
|
}
|
||||||
mStandaloneServerPool.hashTag[0] = '\0';
|
mSentinelServerPool.hashTag[0] = '\0';
|
||||||
mStandaloneServerPool.hashTag[1] = '\0';
|
mSentinelServerPool.hashTag[1] = '\0';
|
||||||
for (auto p = node->sub; p; p = p->next) {
|
for (auto p = node->sub; p; p = p->next) {
|
||||||
if (setServerPool(mStandaloneServerPool, p)) {
|
if (setServerPool(mSentinelServerPool, p)) {
|
||||||
} else if (strcasecmp(p->key.c_str(), "RefreshMethod") == 0) {
|
|
||||||
try {
|
|
||||||
mStandaloneServerPool.refreshMethod = ServerPoolRefreshMethod::parse(p->val.c_str());
|
|
||||||
} catch (ServerPoolRefreshMethod::InvalidEnumValue& excp) {
|
|
||||||
Throw(InvalidValue, "%s:%d unknown RefreshMethod:%s", p->file, p->line, p->val.c_str());
|
|
||||||
}
|
|
||||||
} else if (strcasecmp(p->key.c_str(), "Distribution") == 0) {
|
} else if (strcasecmp(p->key.c_str(), "Distribution") == 0) {
|
||||||
mStandaloneServerPool.dist = Distribution::parse(p->val.c_str());
|
mSentinelServerPool.dist = Distribution::parse(p->val.c_str());
|
||||||
if (mStandaloneServerPool.dist == Distribution::None) {
|
if (mSentinelServerPool.dist == Distribution::None) {
|
||||||
Throw(InvalidValue, "%s:%d unknown Distribution", p->file, p->line);
|
Throw(InvalidValue, "%s:%d unknown Distribution", p->file, p->line);
|
||||||
}
|
}
|
||||||
} else if (strcasecmp(p->key.c_str(), "Hash") == 0) {
|
} else if (strcasecmp(p->key.c_str(), "Hash") == 0) {
|
||||||
mStandaloneServerPool.hash = Hash::parse(p->val.c_str());
|
mSentinelServerPool.hash = Hash::parse(p->val.c_str());
|
||||||
if (mStandaloneServerPool.hash == Hash::None) {
|
if (mSentinelServerPool.hash == Hash::None) {
|
||||||
Throw(InvalidValue, "%s:%d unknown Hash", p->file, p->line);
|
Throw(InvalidValue, "%s:%d unknown Hash", p->file, p->line);
|
||||||
}
|
}
|
||||||
} else if (strcasecmp(p->key.c_str(), "HashTag") == 0) {
|
} else if (strcasecmp(p->key.c_str(), "HashTag") == 0) {
|
||||||
if (p->val.empty()) {
|
if (p->val.empty()) {
|
||||||
mStandaloneServerPool.hashTag[0] = '\0';
|
mSentinelServerPool.hashTag[0] = '\0';
|
||||||
mStandaloneServerPool.hashTag[1] = '\0';
|
mSentinelServerPool.hashTag[1] = '\0';
|
||||||
} else if (p->val.size() == 2) {
|
} else if (p->val.size() == 2) {
|
||||||
mStandaloneServerPool.hashTag[0] = p->val[0];
|
mSentinelServerPool.hashTag[0] = p->val[0];
|
||||||
mStandaloneServerPool.hashTag[1] = p->val[1];
|
mSentinelServerPool.hashTag[1] = p->val[1];
|
||||||
} else {
|
} else {
|
||||||
Throw(InvalidValue, "%s:%d HashTag invalid", p->file, p->line);
|
Throw(InvalidValue, "%s:%d HashTag invalid", p->file, p->line);
|
||||||
}
|
}
|
||||||
} else if (setServers(mStandaloneServerPool.sentinels, "Sentinels", p)) {
|
} else if (setServers(mSentinelServerPool.sentinels, "Sentinels", p)) {
|
||||||
mStandaloneServerPool.sentinelPassword = p->val;
|
mSentinelServerPool.sentinelPassword = p->val;
|
||||||
} else if (strcasecmp(p->key.c_str(), "Group") == 0) {
|
} else if (strcasecmp(p->key.c_str(), "Group") == 0) {
|
||||||
mStandaloneServerPool.groups.push_back(ServerGroupConf{p->val});
|
mSentinelServerPool.groups.push_back(ServerGroupConf{p->val});
|
||||||
if (p->sub) {
|
if (p->sub) {
|
||||||
auto& g = mStandaloneServerPool.groups.back();
|
auto& g = mSentinelServerPool.groups.back();
|
||||||
setServers(g.servers, "Group", p);
|
setServers(g.servers, "Group", p);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -344,31 +326,18 @@ void Conf::setStandaloneServerPool(const ConfParser::Node* node)
|
|||||||
p->file, p->line, p->key.c_str());
|
p->file, p->line, p->key.c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (mStandaloneServerPool.groups.empty()) {
|
if (mSentinelServerPool.sentinels.empty()) {
|
||||||
Throw(LogicError, "StandaloneServerPool no server group");
|
Throw(LogicError, "SentinelServerPool no sentinel server");
|
||||||
}
|
}
|
||||||
if (mStandaloneServerPool.refreshMethod == ServerPoolRefreshMethod::None) {
|
if (mSentinelServerPool.groups.empty()) {
|
||||||
Throw(LogicError, "StandaloneServerPool must define RefreshMethod");
|
Throw(LogicError, "SentinelServerPool no server group");
|
||||||
} else if (mStandaloneServerPool.refreshMethod == ServerPoolRefreshMethod::Sentinel) {
|
|
||||||
if (mStandaloneServerPool.sentinels.empty()) {
|
|
||||||
Throw(LogicError, "StandaloneServerPool with RefreshMethod(sentinel) but no sentinel servers");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (!mStandaloneServerPool.sentinels.empty()) {
|
|
||||||
Throw(LogicError, "StandaloneServerPool with Sentinels but RefreshMethod is not sentinel");
|
|
||||||
}
|
|
||||||
for (auto& g : mStandaloneServerPool.groups) {
|
|
||||||
if (g.servers.empty()) {
|
|
||||||
Throw(LogicError, "Group(%s) must add servers", g.name.c_str());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (mStandaloneServerPool.groups.size() > 1) {
|
if (mSentinelServerPool.groups.size() > 1) {
|
||||||
if (mStandaloneServerPool.dist == Distribution::None) {
|
if (mSentinelServerPool.dist == Distribution::None) {
|
||||||
Throw(LogicError, "StandaloneServerPool must define Dsitribution in multi groups");
|
Throw(LogicError, "SentinelServerPool must define Dsitribution in multi groups");
|
||||||
}
|
}
|
||||||
if (mStandaloneServerPool.hash == Hash::None) {
|
if (mSentinelServerPool.hash == Hash::None) {
|
||||||
Throw(LogicError, "StandaloneServerPool must define Hash in multi groups");
|
Throw(LogicError, "SentinelServerPool must define Hash in multi groups");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -389,83 +358,6 @@ void Conf::setDataCenter(const ConfParser::Node* node)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Conf::setCustomCommand(const ConfParser::Node* node)
|
|
||||||
{
|
|
||||||
if (!node->sub) {
|
|
||||||
Throw(InvalidValue, "%s:%d CustomCommand require scope value", node->file, node->line);
|
|
||||||
}
|
|
||||||
for (auto p = node->sub; p; p = p->next) {
|
|
||||||
mCustomCommands.push_back(CustomCommandConf{});
|
|
||||||
auto& cc = mCustomCommands.back();
|
|
||||||
CustomCommandConf::init(cc, p->key.c_str());
|
|
||||||
auto s = p->sub;
|
|
||||||
for (;s ; s = s->next) {
|
|
||||||
if (setInt(cc.minArgs, "MinArgs", s, 2)) {
|
|
||||||
} else if (setInt(cc.maxArgs, "MaxArgs", s, 2, 9999)) {
|
|
||||||
} else if (setCommandMode(cc.mode, "Mode", s)) {
|
|
||||||
} else {
|
|
||||||
Throw(UnknownKey, "%s:%d unknown key %s", s->file, s->line, s->key.c_str());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (cc.maxArgs < cc.minArgs) {
|
|
||||||
Throw(InvalidValue, "%s:%d must be MaxArgs >= MinArgs", p->file, p->line);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (const auto& cc : mCustomCommands) {
|
|
||||||
Command::addCustomCommand(cc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Conf::setCommandMode(int& mode, const char* name, const ConfParser::Node* n, const int defaultMode)
|
|
||||||
{
|
|
||||||
if (strcasecmp(name, n->key.c_str()) != 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (n->val.size() == 0) {
|
|
||||||
mode = defaultMode;
|
|
||||||
} else {
|
|
||||||
mode = 0;
|
|
||||||
std::string mask;
|
|
||||||
std::istringstream is(n->val);
|
|
||||||
while (std::getline(is, mask, '|')) {
|
|
||||||
if ((strcasecmp(mask.c_str(), "write") == 0)) {
|
|
||||||
mode |= Command::Write;
|
|
||||||
} else if ((strcasecmp(mask.c_str(), "read") == 0)) {
|
|
||||||
mode |= Command::Read;
|
|
||||||
} else if ((strcasecmp(mask.c_str(), "admin") == 0)) {
|
|
||||||
mode |= Command::Admin;
|
|
||||||
} else if ((strcasecmp(mask.c_str(), "keyAt2") == 0)) {
|
|
||||||
mode |= Command::KeyAt2;
|
|
||||||
} else if ((strcasecmp(mask.c_str(), "keyAt3") == 0)) {
|
|
||||||
mode |= Command::KeyAt3;
|
|
||||||
} else {
|
|
||||||
Throw(InvalidValue, "%s:%d unknown mode %s", n->file, n->line, mask.c_str());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
switch (mode & Command::KeyMask) {
|
|
||||||
case 0:
|
|
||||||
case Command::KeyAt2:
|
|
||||||
case Command::KeyAt3:
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
Throw(InvalidValue, "%s:%d %s require exclusive key pos", n->file, n->line, name);
|
|
||||||
}
|
|
||||||
switch (mode & Command::AuthMask) {
|
|
||||||
case 0:
|
|
||||||
mode |= Command::Write;
|
|
||||||
break;
|
|
||||||
case Command::Read:
|
|
||||||
case Command::Write:
|
|
||||||
case Command::Admin:
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
Throw(InvalidValue, "%s:%d %s require exclusive mode", n->file, n->line, name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void Conf::setDC(DCConf& dc, const ConfParser::Node* node)
|
void Conf::setDC(DCConf& dc, const ConfParser::Node* node)
|
||||||
{
|
{
|
||||||
if (!node->sub) {
|
if (!node->sub) {
|
||||||
|
|||||||
32
src/Conf.h
32
src/Conf.h
@ -19,8 +19,6 @@
|
|||||||
#include "Distribution.h"
|
#include "Distribution.h"
|
||||||
#include "ConfParser.h"
|
#include "ConfParser.h"
|
||||||
#include "Auth.h"
|
#include "Auth.h"
|
||||||
#include "Command.h"
|
|
||||||
#include "Enums.h"
|
|
||||||
|
|
||||||
struct AuthConf
|
struct AuthConf
|
||||||
{
|
{
|
||||||
@ -52,10 +50,10 @@ struct ServerPoolConf
|
|||||||
int staticSlaveReadPriority = 0;
|
int staticSlaveReadPriority = 0;
|
||||||
int dynamicSlaveReadPriority = 0;
|
int dynamicSlaveReadPriority = 0;
|
||||||
long refreshInterval = 1000000; //us
|
long refreshInterval = 1000000; //us
|
||||||
long serverTimeout = 0; //us
|
long serverTimeout = 1000000; //us
|
||||||
int serverFailureLimit = 10;
|
int serverFailureLimit = 10;
|
||||||
long serverRetryTimeout = 1000000; //us
|
long serverRetryTimeout = 1000000; //us
|
||||||
int keepalive = 0; //seconds
|
int keepalive = 120; //seconds
|
||||||
int databases = 1;
|
int databases = 1;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -64,9 +62,8 @@ struct ClusterServerPoolConf : public ServerPoolConf
|
|||||||
std::vector<ServerConf> servers;
|
std::vector<ServerConf> servers;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct StandaloneServerPoolConf : public ServerPoolConf
|
struct SentinelServerPoolConf : public ServerPoolConf
|
||||||
{
|
{
|
||||||
ServerPoolRefreshMethod refreshMethod = ServerPoolRefreshMethod::None;
|
|
||||||
Distribution dist = Distribution::None;
|
Distribution dist = Distribution::None;
|
||||||
Hash hash = Hash::None;
|
Hash hash = Hash::None;
|
||||||
char hashTag[2];
|
char hashTag[2];
|
||||||
@ -92,20 +89,10 @@ struct DCConf
|
|||||||
struct LatencyMonitorConf
|
struct LatencyMonitorConf
|
||||||
{
|
{
|
||||||
std::string name;
|
std::string name;
|
||||||
std::bitset<Command::AvailableCommands> cmds;
|
std::bitset<Command::Sentinel> cmds;
|
||||||
std::vector<long> timeSpan;//us
|
std::vector<long> timeSpan;//us
|
||||||
};
|
};
|
||||||
|
|
||||||
struct CustomCommandConf
|
|
||||||
{
|
|
||||||
std::string name;
|
|
||||||
int minArgs;
|
|
||||||
int maxArgs;
|
|
||||||
int mode;
|
|
||||||
|
|
||||||
static void init(CustomCommandConf &c, const char* name);
|
|
||||||
};
|
|
||||||
|
|
||||||
class Conf
|
class Conf
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -177,9 +164,9 @@ public:
|
|||||||
{
|
{
|
||||||
return mClusterServerPool;
|
return mClusterServerPool;
|
||||||
}
|
}
|
||||||
const StandaloneServerPoolConf& standaloneServerPool() const
|
const SentinelServerPoolConf& sentinelServerPool() const
|
||||||
{
|
{
|
||||||
return mStandaloneServerPool;
|
return mSentinelServerPool;
|
||||||
}
|
}
|
||||||
const std::string& localDC() const
|
const std::string& localDC() const
|
||||||
{
|
{
|
||||||
@ -200,7 +187,7 @@ private:
|
|||||||
void setGlobal(const ConfParser::Node* node);
|
void setGlobal(const ConfParser::Node* node);
|
||||||
void setAuthority(const ConfParser::Node* node);
|
void setAuthority(const ConfParser::Node* node);
|
||||||
void setClusterServerPool(const ConfParser::Node* node);
|
void setClusterServerPool(const ConfParser::Node* node);
|
||||||
void setStandaloneServerPool(const ConfParser::Node* node);
|
void setSentinelServerPool(const ConfParser::Node* node);
|
||||||
void setDataCenter(const ConfParser::Node* node);
|
void setDataCenter(const ConfParser::Node* node);
|
||||||
void check();
|
void check();
|
||||||
bool setServerPool(ServerPoolConf& sp, const ConfParser::Node* n);
|
bool setServerPool(ServerPoolConf& sp, const ConfParser::Node* n);
|
||||||
@ -214,8 +201,6 @@ private:
|
|||||||
void setDC(DCConf& dc, const ConfParser::Node* n);
|
void setDC(DCConf& dc, const ConfParser::Node* n);
|
||||||
void setReadPolicy(ReadPolicyConf& c, const ConfParser::Node* n);
|
void setReadPolicy(ReadPolicyConf& c, const ConfParser::Node* n);
|
||||||
void setLatencyMonitor(LatencyMonitorConf& m, const ConfParser::Node* n);
|
void setLatencyMonitor(LatencyMonitorConf& m, const ConfParser::Node* n);
|
||||||
void setCustomCommand(const ConfParser::Node* n);
|
|
||||||
bool setCommandMode(int& mode, const char* name, const ConfParser::Node* n, const int defaultMode = Command::Write);
|
|
||||||
private:
|
private:
|
||||||
std::string mName;
|
std::string mName;
|
||||||
std::string mBind;
|
std::string mBind;
|
||||||
@ -231,11 +216,10 @@ private:
|
|||||||
std::vector<AuthConf> mAuthConfs;
|
std::vector<AuthConf> mAuthConfs;
|
||||||
int mServerPoolType;
|
int mServerPoolType;
|
||||||
ClusterServerPoolConf mClusterServerPool;
|
ClusterServerPoolConf mClusterServerPool;
|
||||||
StandaloneServerPoolConf mStandaloneServerPool;
|
SentinelServerPoolConf mSentinelServerPool;
|
||||||
std::vector<DCConf> mDCConfs;
|
std::vector<DCConf> mDCConfs;
|
||||||
std::string mLocalDC;
|
std::string mLocalDC;
|
||||||
std::vector<LatencyMonitorConf> mLatencyMonitors;
|
std::vector<LatencyMonitorConf> mLatencyMonitors;
|
||||||
std::vector<CustomCommandConf> mCustomCommands;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -283,22 +283,20 @@ Done:
|
|||||||
case SValBody:
|
case SValBody:
|
||||||
return KeyVal;
|
return KeyVal;
|
||||||
case VValBody:
|
case VValBody:
|
||||||
{
|
|
||||||
auto ret = KeyVal;
|
|
||||||
val.assign(line, pos, line.size() - pos);
|
val.assign(line, pos, line.size() - pos);
|
||||||
if (val.back() == '{') {
|
if (val.back() == '{') {
|
||||||
val.resize(val.size() - 1);
|
val.resize(val.size() - 1);
|
||||||
ret = BeginScope;
|
int vsp = 0;
|
||||||
}
|
for (auto it = val.rbegin(); it != val.rend(); ++it) {
|
||||||
int vsp = 0;
|
if (isspace(*it)) {
|
||||||
for (auto it = val.rbegin(); it != val.rend(); ++it) {
|
++vsp;
|
||||||
if (isspace(*it)) {
|
}
|
||||||
++vsp;
|
|
||||||
}
|
}
|
||||||
|
val.resize(val.size() - vsp);
|
||||||
|
return BeginScope;
|
||||||
|
} else {
|
||||||
|
return KeyVal;
|
||||||
}
|
}
|
||||||
val.resize(val.size() - vsp);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
case ScopeReady:
|
case ScopeReady:
|
||||||
return KeyVal;
|
return KeyVal;
|
||||||
case ScopeBody:
|
case ScopeBody:
|
||||||
|
|||||||
@ -23,7 +23,6 @@ public:
|
|||||||
typedef ConnectConnection Value;
|
typedef ConnectConnection Value;
|
||||||
typedef ListNode<ConnectConnection> ListNodeType;
|
typedef ListNode<ConnectConnection> ListNodeType;
|
||||||
typedef DequeNode<ConnectConnection> DequeNodeType;
|
typedef DequeNode<ConnectConnection> DequeNodeType;
|
||||||
typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> Allocator;
|
|
||||||
public:
|
public:
|
||||||
ConnectConnection(Server* s, bool shared);
|
ConnectConnection(Server* s, bool shared);
|
||||||
~ConnectConnection();
|
~ConnectConnection();
|
||||||
@ -98,6 +97,6 @@ private:
|
|||||||
|
|
||||||
typedef List<ConnectConnection> ConnectConnectionList;
|
typedef List<ConnectConnection> ConnectConnectionList;
|
||||||
typedef Deque<ConnectConnection> ConnectConnectionDeque;
|
typedef Deque<ConnectConnection> ConnectConnectionDeque;
|
||||||
typedef ConnectConnection::Allocator ConnectConnectionAlloc;
|
typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> ConnectConnectionAlloc;
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -27,7 +27,7 @@ BufferPtr Connection::getBuffer(Handler* h, bool allowNew)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!mBuf || mBuf->full()) {
|
if (!mBuf || mBuf->full()) {
|
||||||
BufferPtr buf = BufferAlloc::create();
|
BufferPtr buf = Alloc<Buffer>::create();
|
||||||
if (mBuf) {
|
if (mBuf) {
|
||||||
mBuf->concat(buf);
|
mBuf->concat(buf);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,15 +0,0 @@
|
|||||||
/*
|
|
||||||
* predixy - A high performance and full features proxy for redis.
|
|
||||||
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
|
|
||||||
* All rights reserved.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "Enums.h"
|
|
||||||
|
|
||||||
const ServerPoolRefreshMethod::TypeName
|
|
||||||
ServerPoolRefreshMethod::sPairs[3] = {
|
|
||||||
{ServerPoolRefreshMethod::None, "none"},
|
|
||||||
{ServerPoolRefreshMethod::Fixed, "fixed"},
|
|
||||||
{ServerPoolRefreshMethod::Sentinel, "sentinel"},
|
|
||||||
};
|
|
||||||
|
|
||||||
74
src/Enums.h
74
src/Enums.h
@ -1,74 +0,0 @@
|
|||||||
/*
|
|
||||||
* predixy - A high performance and full features proxy for redis.
|
|
||||||
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
|
|
||||||
* All rights reserved.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _PREDIXY_ENUMS_H_
|
|
||||||
#define _PREDIXY_ENUMS_H_
|
|
||||||
|
|
||||||
#include <string.h>
|
|
||||||
#include <strings.h>
|
|
||||||
#include "Exception.h"
|
|
||||||
|
|
||||||
template<class T>
|
|
||||||
class EnumBase
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
DefException(InvalidEnumValue);
|
|
||||||
struct TypeName
|
|
||||||
{
|
|
||||||
int type;
|
|
||||||
const char* name;
|
|
||||||
};
|
|
||||||
public:
|
|
||||||
EnumBase(int t):
|
|
||||||
mType(t)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
int value() const
|
|
||||||
{
|
|
||||||
return mType;
|
|
||||||
}
|
|
||||||
bool operator==(const T& t) const
|
|
||||||
{
|
|
||||||
return t.value() == mType;
|
|
||||||
}
|
|
||||||
bool operator!=(const T& t) const
|
|
||||||
{
|
|
||||||
return t.value() != mType;
|
|
||||||
}
|
|
||||||
const char* name() const
|
|
||||||
{
|
|
||||||
return T::sPairs[mType].name;
|
|
||||||
}
|
|
||||||
static T parse(const char* str)
|
|
||||||
{
|
|
||||||
for (auto& i : T::sPairs) {
|
|
||||||
if (strcasecmp(i.name, str) == 0) {
|
|
||||||
return T(typename T::Type(i.type));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Throw(InvalidEnumValue, "invalid enum value:%s", str);
|
|
||||||
}
|
|
||||||
protected:
|
|
||||||
int mType;
|
|
||||||
};
|
|
||||||
|
|
||||||
class ServerPoolRefreshMethod : public EnumBase<ServerPoolRefreshMethod>
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
enum Type
|
|
||||||
{
|
|
||||||
None,
|
|
||||||
Fixed,
|
|
||||||
Sentinel
|
|
||||||
};
|
|
||||||
static const TypeName sPairs[3];
|
|
||||||
ServerPoolRefreshMethod(Type t = None):
|
|
||||||
EnumBase<ServerPoolRefreshMethod>(t)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
#endif
|
|
||||||
@ -31,9 +31,12 @@ bool EpollMultiplexor::addSocket(Socket* s, int evts)
|
|||||||
event.events |= (evts & ReadEvent) ? EPOLLIN : 0;
|
event.events |= (evts & ReadEvent) ? EPOLLIN : 0;
|
||||||
event.events |= (evts & WriteEvent) ? EPOLLOUT : 0;
|
event.events |= (evts & WriteEvent) ? EPOLLOUT : 0;
|
||||||
event.events |= EPOLLET;
|
event.events |= EPOLLET;
|
||||||
|
//event.events |= EPOLLONESHOT;
|
||||||
event.data.ptr = s;
|
event.data.ptr = s;
|
||||||
s->setEvent(evts);
|
|
||||||
int ret = epoll_ctl(mFd, EPOLL_CTL_ADD, s->fd(), &event);
|
int ret = epoll_ctl(mFd, EPOLL_CTL_ADD, s->fd(), &event);
|
||||||
|
if (ret == 0) {
|
||||||
|
s->setEvent(evts);
|
||||||
|
}
|
||||||
return ret == 0;
|
return ret == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,6 +61,7 @@ bool EpollMultiplexor::addEvent(Socket* s, int evts)
|
|||||||
}
|
}
|
||||||
if ((s->getEvent() | evts) != s->getEvent()) {
|
if ((s->getEvent() | evts) != s->getEvent()) {
|
||||||
event.events |= EPOLLET;
|
event.events |= EPOLLET;
|
||||||
|
//event.events |= EPOLLONESHOT;
|
||||||
int ret = epoll_ctl(mFd, EPOLL_CTL_MOD, s->fd(), &event);
|
int ret = epoll_ctl(mFd, EPOLL_CTL_MOD, s->fd(), &event);
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
s->setEvent(s->getEvent() | evts);
|
s->setEvent(s->getEvent() | evts);
|
||||||
|
|||||||
@ -215,6 +215,7 @@ void Handler::postAcceptConnectionEvent()
|
|||||||
auto cp = mConnPool[s->server()->id()];
|
auto cp = mConnPool[s->server()->id()];
|
||||||
s->setStatus(Connection::LogicError);
|
s->setStatus(Connection::LogicError);
|
||||||
addPostEvent(s, Multiplexor::ErrorEvent);
|
addPostEvent(s, Multiplexor::ErrorEvent);
|
||||||
|
cp->putPrivateConnection(s);
|
||||||
c->detachConnectConnection();
|
c->detachConnectConnection();
|
||||||
s->detachAcceptConnection();
|
s->detachAcceptConnection();
|
||||||
}
|
}
|
||||||
@ -275,9 +276,6 @@ void Handler::postConnectConnectionEvent()
|
|||||||
s->status(), s->statusStr());
|
s->status(), s->statusStr());
|
||||||
mEventLoop->delSocket(s);
|
mEventLoop->delSocket(s);
|
||||||
s->close(this);
|
s->close(this);
|
||||||
if (!s->isShared()) {
|
|
||||||
mConnPool[s->server()->id()]->putPrivateConnection(s);
|
|
||||||
}
|
|
||||||
if (c) {
|
if (c) {
|
||||||
addPostEvent(c, Multiplexor::ErrorEvent);
|
addPostEvent(c, Multiplexor::ErrorEvent);
|
||||||
s->detachAcceptConnection();
|
s->detachAcceptConnection();
|
||||||
@ -317,6 +315,7 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len)
|
|||||||
AcceptConnection* c = nullptr;
|
AcceptConnection* c = nullptr;
|
||||||
try {
|
try {
|
||||||
c = AcceptConnectionAlloc::create(fd, addr, len);
|
c = AcceptConnectionAlloc::create(fd, addr, len);
|
||||||
|
logNotice("h %d accept c %s %d", id(), c->peer(), fd);
|
||||||
} catch (ExceptionBase& e) {
|
} catch (ExceptionBase& e) {
|
||||||
logWarn("h %d create connection for client %d fail %s",
|
logWarn("h %d create connection for client %d fail %s",
|
||||||
id(), fd, e.what());
|
id(), fd, e.what());
|
||||||
@ -369,8 +368,6 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len)
|
|||||||
logWarn("h %d destroy c %s %d with add to event loop fail:%s",
|
logWarn("h %d destroy c %s %d with add to event loop fail:%s",
|
||||||
id(), c->peer(), c->fd(), StrError());
|
id(), c->peer(), c->fd(), StrError());
|
||||||
AcceptConnectionAlloc::destroy(c);
|
AcceptConnectionAlloc::destroy(c);
|
||||||
} else {
|
|
||||||
logNotice("h %d accept c %s %d assign to h %d", id(), c->peer(), fd, dst->id());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -67,5 +67,6 @@ int KqueueMultiplexor::wait(long usec, T* handler)
|
|||||||
|
|
||||||
|
|
||||||
typedef KqueueMultiplexor Multiplexor;
|
typedef KqueueMultiplexor Multiplexor;
|
||||||
|
#define _MULTIPLEXOR_ASYNC_ASSIGN_
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -98,7 +98,7 @@ public:
|
|||||||
Buffer* output(Buffer* buf) const;
|
Buffer* output(Buffer* buf) const;
|
||||||
private:
|
private:
|
||||||
String mName;
|
String mName;
|
||||||
const std::bitset<Command::AvailableCommands>* mCmds;
|
const std::bitset<Command::Sentinel>* mCmds;
|
||||||
std::vector<TimeSpan> mTimeSpan;
|
std::vector<TimeSpan> mTimeSpan;
|
||||||
TimeSpan mLast;
|
TimeSpan mLast;
|
||||||
};
|
};
|
||||||
|
|||||||
@ -72,7 +72,6 @@ objs = \
|
|||||||
Buffer.o \
|
Buffer.o \
|
||||||
Command.o \
|
Command.o \
|
||||||
Distribution.o \
|
Distribution.o \
|
||||||
Enums.o \
|
|
||||||
Reply.o \
|
Reply.o \
|
||||||
ConfParser.o \
|
ConfParser.o \
|
||||||
Conf.o \
|
Conf.o \
|
||||||
@ -88,7 +87,7 @@ objs = \
|
|||||||
ServerPool.o \
|
ServerPool.o \
|
||||||
ClusterNodesParser.o \
|
ClusterNodesParser.o \
|
||||||
ClusterServerPool.o \
|
ClusterServerPool.o \
|
||||||
StandaloneServerPool.o \
|
SentinelServerPool.o \
|
||||||
ConnectConnectionPool.o \
|
ConnectConnectionPool.o \
|
||||||
Handler.o \
|
Handler.o \
|
||||||
Proxy.o \
|
Proxy.o \
|
||||||
|
|||||||
@ -118,10 +118,10 @@ bool Proxy::init(int argc, char* argv[])
|
|||||||
mServPool = p;
|
mServPool = p;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case ServerPool::Standalone:
|
case ServerPool::Sentinel:
|
||||||
{
|
{
|
||||||
StandaloneServerPool* p = new StandaloneServerPool(this);
|
SentinelServerPool* p = new SentinelServerPool(this);
|
||||||
p->init(mConf->standaloneServerPool());
|
p->init(mConf->sentinelServerPool());
|
||||||
mServPool = p;
|
mServPool = p;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|||||||
@ -13,7 +13,7 @@
|
|||||||
#include "DC.h"
|
#include "DC.h"
|
||||||
#include "ServerPool.h"
|
#include "ServerPool.h"
|
||||||
#include "ClusterServerPool.h"
|
#include "ClusterServerPool.h"
|
||||||
#include "StandaloneServerPool.h"
|
#include "SentinelServerPool.h"
|
||||||
#include "LatencyMonitor.h"
|
#include "LatencyMonitor.h"
|
||||||
|
|
||||||
class Proxy
|
class Proxy
|
||||||
@ -51,15 +51,15 @@ public:
|
|||||||
}
|
}
|
||||||
bool isSplitMultiKey() const
|
bool isSplitMultiKey() const
|
||||||
{
|
{
|
||||||
return mConf->standaloneServerPool().groups.size() != 1;
|
return mConf->sentinelServerPool().groups.size() != 1;
|
||||||
}
|
}
|
||||||
bool supportTransaction() const
|
bool supportTransaction() const
|
||||||
{
|
{
|
||||||
return mConf->standaloneServerPool().groups.size() == 1;
|
return mConf->sentinelServerPool().groups.size() == 1;
|
||||||
}
|
}
|
||||||
bool supportSubscribe() const
|
bool supportSubscribe() const
|
||||||
{
|
{
|
||||||
return mConf->standaloneServerPool().groups.size() == 1 ||
|
return mConf->sentinelServerPool().groups.size() == 1 ||
|
||||||
mConf->clusterServerPool().servers.size() > 0;
|
mConf->clusterServerPool().servers.size() > 0;
|
||||||
}
|
}
|
||||||
const std::vector<Handler*>& handlers() const
|
const std::vector<Handler*>& handlers() const
|
||||||
|
|||||||
@ -98,12 +98,10 @@ Request::Request(GenericCode code):
|
|||||||
|
|
||||||
Request::~Request()
|
Request::~Request()
|
||||||
{
|
{
|
||||||
clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Request::clear()
|
void Request::clear()
|
||||||
{
|
{
|
||||||
mConn = nullptr;
|
|
||||||
mRes = nullptr;
|
mRes = nullptr;
|
||||||
mHead.clear();
|
mHead.clear();
|
||||||
mReq.clear();
|
mReq.clear();
|
||||||
@ -157,14 +155,13 @@ void Request::set(const RequestParser& p, Request* leader)
|
|||||||
}
|
}
|
||||||
mHead = r->mReq;
|
mHead = r->mReq;
|
||||||
mReq = p.request();
|
mReq = p.request();
|
||||||
|
mLeader = leader;
|
||||||
if (leader == this) {
|
if (leader == this) {
|
||||||
if (mType == Command::Mset || mType == Command::Msetnx) {
|
if (mType == Command::Mset || mType == Command::Msetnx) {
|
||||||
mFollowers = (p.argNum() - 1) >> 1;
|
mFollowers = (p.argNum() - 1) >> 1;
|
||||||
} else {
|
} else {
|
||||||
mFollowers = p.argNum() - 1;
|
mFollowers = p.argNum() - 1;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
mLeader = leader;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
mReq = p.request();
|
mReq = p.request();
|
||||||
@ -290,11 +287,10 @@ void Request::adjustScanCursor(long cursor)
|
|||||||
|
|
||||||
void Request::follow(Request* leader)
|
void Request::follow(Request* leader)
|
||||||
{
|
{
|
||||||
leader->mFollowers += 1;
|
++mFollowers;
|
||||||
if (leader == this) {
|
if (leader == this) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
mConn = leader->mConn;
|
|
||||||
mType = leader->mType;
|
mType = leader->mType;
|
||||||
mHead = leader->mHead;
|
mHead = leader->mHead;
|
||||||
mReq = leader->mReq;
|
mReq = leader->mReq;
|
||||||
@ -342,49 +338,49 @@ int Request::fill(IOVec* vecs, int len)
|
|||||||
void Request::setResponse(Response* res)
|
void Request::setResponse(Response* res)
|
||||||
{
|
{
|
||||||
mDone = true;
|
mDone = true;
|
||||||
if (Request* ld = leader()) {
|
if (mLeader) {
|
||||||
ld->mFollowersDone += 1;
|
mLeader->mFollowersDone += 1;
|
||||||
switch (mType) {
|
switch (mType) {
|
||||||
case Command::Mget:
|
case Command::Mget:
|
||||||
mRes = res;
|
mRes = res;
|
||||||
break;
|
break;
|
||||||
case Command::Mset:
|
case Command::Mset:
|
||||||
if (Response* leaderRes = ld->getResponse()) {
|
if (Response* leaderRes = mLeader->getResponse()) {
|
||||||
if (res->isError() && !leaderRes->isError()) {
|
if (res->isError() && !leaderRes->isError()) {
|
||||||
ld->mRes = res;
|
mLeader->mRes = res;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ld->mRes = res;
|
mLeader->mRes = res;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Command::Msetnx:
|
case Command::Msetnx:
|
||||||
if (Response* leaderRes = ld->getResponse()) {
|
if (Response* leaderRes = mLeader->getResponse()) {
|
||||||
if (!leaderRes->isError() &&
|
if (!leaderRes->isError() &&
|
||||||
(res->isError() || res->integer() == 0)) {
|
(res->isError() || res->integer() == 0)) {
|
||||||
ld->mRes = res;
|
mLeader->mRes = res;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ld->mRes = res;
|
mLeader->mRes = res;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Command::Touch:
|
case Command::Touch:
|
||||||
case Command::Exists:
|
case Command::Exists:
|
||||||
case Command::Del:
|
case Command::Del:
|
||||||
case Command::Unlink:
|
case Command::Unlink:
|
||||||
if (!ld->mRes) {
|
if (!mLeader->mRes) {
|
||||||
ld->mRes = res;
|
mLeader->mRes = res;
|
||||||
}
|
}
|
||||||
if (ld->isDone()) {
|
if (mLeader->isDone()) {
|
||||||
ld->mRes->set(ld->mRes->integer());
|
mLeader->mRes->set(mLeader->mRes->integer());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Command::ScriptLoad:
|
case Command::ScriptLoad:
|
||||||
if (Response* leaderRes = ld->getResponse()) {
|
if (Response* leaderRes = mLeader->getResponse()) {
|
||||||
if (leaderRes->isString() && !res->isString()) {
|
if (leaderRes->isString() && !res->isString()) {
|
||||||
ld->mRes = res;
|
mLeader->mRes = res;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ld->mRes = res;
|
mLeader->mRes = res;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
@ -399,7 +395,7 @@ void Request::setResponse(Response* res)
|
|||||||
|
|
||||||
bool Request::isDone() const
|
bool Request::isDone() const
|
||||||
{
|
{
|
||||||
if (isLeader()) {
|
if (mLeader == this) {
|
||||||
switch (mType) {
|
switch (mType) {
|
||||||
case Command::Mget:
|
case Command::Mget:
|
||||||
case Command::Psubscribe:
|
case Command::Psubscribe:
|
||||||
|
|||||||
@ -25,7 +25,6 @@ class Request :
|
|||||||
public:
|
public:
|
||||||
typedef Request Value;
|
typedef Request Value;
|
||||||
typedef ListNode<Request, SharePtr<Request>, RequestListIndex::Size> ListNodeType;
|
typedef ListNode<Request, SharePtr<Request>, RequestListIndex::Size> ListNodeType;
|
||||||
typedef Alloc<Request, Const::RequestAllocCacheSize> Allocator;
|
|
||||||
static const int MaxRedirectLimit = 3;
|
static const int MaxRedirectLimit = 3;
|
||||||
enum GenericCode
|
enum GenericCode
|
||||||
{
|
{
|
||||||
@ -120,11 +119,11 @@ public:
|
|||||||
}
|
}
|
||||||
Request* leader() const
|
Request* leader() const
|
||||||
{
|
{
|
||||||
return isLeader() ? const_cast<Request*>(this) : (Request*)mLeader;
|
return mLeader;
|
||||||
}
|
}
|
||||||
bool isLeader() const
|
bool isLeader() const
|
||||||
{
|
{
|
||||||
return mFollowers > 0;
|
return mLeader == this;
|
||||||
}
|
}
|
||||||
bool isDelivered() const
|
bool isDelivered() const
|
||||||
{
|
{
|
||||||
@ -182,6 +181,6 @@ private:
|
|||||||
|
|
||||||
typedef List<Request, RequestListIndex::Recv> RecvRequestList;
|
typedef List<Request, RequestListIndex::Recv> RecvRequestList;
|
||||||
typedef List<Request, RequestListIndex::Send> SendRequestList;
|
typedef List<Request, RequestListIndex::Send> SendRequestList;
|
||||||
typedef Request::Allocator RequestAlloc;
|
typedef Alloc<Request, Const::RequestAllocCacheSize> RequestAlloc;
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -18,7 +18,6 @@ class Response :
|
|||||||
public:
|
public:
|
||||||
typedef Response Value;
|
typedef Response Value;
|
||||||
typedef ListNode<Response, SharePtr<Response>> ListNodeType;
|
typedef ListNode<Response, SharePtr<Response>> ListNodeType;
|
||||||
typedef Alloc<Response, Const::ResponseAllocCacheSize> Allocator;
|
|
||||||
enum GenericCode
|
enum GenericCode
|
||||||
{
|
{
|
||||||
Pong,
|
Pong,
|
||||||
@ -138,6 +137,6 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
typedef List<Response> ResponseList;
|
typedef List<Response> ResponseList;
|
||||||
typedef Response::Allocator ResponseAlloc;
|
typedef Alloc<Response, Const::ResponseAllocCacheSize> ResponseAlloc;
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -162,9 +162,6 @@ Server* ServerGroup::getReadServer(Handler* h, DC* localDC) const
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
DC* dc = s->dc();
|
DC* dc = s->dc();
|
||||||
if (!dc) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
int dcrp = localDC->getReadPriority(dc);
|
int dcrp = localDC->getReadPriority(dc);
|
||||||
if (dcrp <= 0) {
|
if (dcrp <= 0) {
|
||||||
continue;
|
continue;
|
||||||
@ -224,7 +221,7 @@ Server* ServerGroup::getReadServer(Handler* h, DC* localDC) const
|
|||||||
dc = sdc[0];
|
dc = sdc[0];
|
||||||
found = true;
|
found = true;
|
||||||
}
|
}
|
||||||
if (!found) {
|
if (!found) {//dc maybe nullptr even we found
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
Server* deadServs[Const::MaxServInGroup];
|
Server* deadServs[Const::MaxServInGroup];
|
||||||
|
|||||||
@ -20,7 +20,7 @@ public:
|
|||||||
{
|
{
|
||||||
Unknown,
|
Unknown,
|
||||||
Cluster,
|
Cluster,
|
||||||
Standalone
|
Sentinel
|
||||||
};
|
};
|
||||||
static const int DefaultServerRetryTimeout = 10000000;
|
static const int DefaultServerRetryTimeout = 10000000;
|
||||||
static const int DefaultRefreshInterval = 1000000;
|
static const int DefaultRefreshInterval = 1000000;
|
||||||
|
|||||||
@ -102,7 +102,7 @@ void Socket::getFirstAddr(const char* addr, int type, int protocol, sockaddr* re
|
|||||||
} else {
|
} else {
|
||||||
std::string tmp;
|
std::string tmp;
|
||||||
const char* host = addr;
|
const char* host = addr;
|
||||||
const char* port = strrchr(addr, ':');
|
const char* port = strchr(addr, ':');
|
||||||
if (port) {
|
if (port) {
|
||||||
tmp.append(addr, port - addr);
|
tmp.append(addr, port - addr);
|
||||||
host = tmp.c_str();
|
host = tmp.c_str();
|
||||||
@ -162,7 +162,6 @@ bool Socket::setTcpKeepAlive(int interval)
|
|||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
#ifdef __linux__
|
|
||||||
val = interval;
|
val = interval;
|
||||||
ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val));
|
ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val));
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
@ -178,9 +177,6 @@ bool Socket::setTcpKeepAlive(int interval)
|
|||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
#else
|
|
||||||
((void)interval); //Avoid unused var warning for non Linux systems
|
|
||||||
#endif
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,482 +0,0 @@
|
|||||||
/*
|
|
||||||
* predixy - A high performance and full features proxy for redis.
|
|
||||||
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
|
|
||||||
* All rights reserved.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include <algorithm>
|
|
||||||
#include "Logger.h"
|
|
||||||
#include "ServerGroup.h"
|
|
||||||
#include "Handler.h"
|
|
||||||
#include "StandaloneServerPool.h"
|
|
||||||
|
|
||||||
StandaloneServerPool::StandaloneServerPool(Proxy* p):
|
|
||||||
ServerPoolTmpl(p, Standalone),
|
|
||||||
mDist(Distribution::Modula)
|
|
||||||
{
|
|
||||||
mSentinels.reserve(MaxSentinelNum);
|
|
||||||
mServPool.reserve(Const::MaxServNum);
|
|
||||||
mHashTag[0] = mHashTag[1] = '\0';
|
|
||||||
}
|
|
||||||
|
|
||||||
StandaloneServerPool::~StandaloneServerPool()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void StandaloneServerPool::init(const StandaloneServerPoolConf& conf)
|
|
||||||
{
|
|
||||||
ServerPool::init(conf);
|
|
||||||
mRefreshMethod = conf.refreshMethod;
|
|
||||||
mDist = conf.dist;
|
|
||||||
mHash = conf.hash;
|
|
||||||
mHashTag[0] = conf.hashTag[0];
|
|
||||||
mHashTag[1] = conf.hashTag[1];
|
|
||||||
int i = 0;
|
|
||||||
if (conf.refreshMethod == ServerPoolRefreshMethod::Sentinel) {
|
|
||||||
mSentinels.resize(conf.sentinels.size());
|
|
||||||
for (auto& sc : conf.sentinels) {
|
|
||||||
Server* s = new Server(this, sc.addr, true);
|
|
||||||
s->setRole(Server::Sentinel);
|
|
||||||
s->setPassword(sc.password.empty() ? conf.sentinelPassword:sc.password);
|
|
||||||
mSentinels[i++] = s;
|
|
||||||
mServs[s->addr()] = s;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
mGroupPool.resize(conf.groups.size());
|
|
||||||
i = 0;
|
|
||||||
for (auto& gc : conf.groups) {
|
|
||||||
ServerGroup* g = new ServerGroup(this, gc.name);
|
|
||||||
mGroupPool[i++] = g;
|
|
||||||
auto role = Server::Master;
|
|
||||||
for (auto& sc : gc.servers) {
|
|
||||||
Server* s = new Server(this, sc.addr, true);
|
|
||||||
s->setPassword(sc.password.empty() ? conf.password : sc.password);
|
|
||||||
mServPool.push_back(s);
|
|
||||||
mServs[s->addr()] = s;
|
|
||||||
g->add(s);
|
|
||||||
s->setGroup(g);
|
|
||||||
switch (mRefreshMethod.value()) {
|
|
||||||
case ServerPoolRefreshMethod::Fixed:
|
|
||||||
s->setOnline(true);
|
|
||||||
s->setRole(role);
|
|
||||||
role = Server::Slave;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
s->setOnline(false);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Server* StandaloneServerPool::getServer(Handler* h, Request* req, const String& key) const
|
|
||||||
{
|
|
||||||
FuncCallTimer();
|
|
||||||
switch (req->type()) {
|
|
||||||
case Command::SentinelGetMaster:
|
|
||||||
case Command::SentinelSlaves:
|
|
||||||
case Command::SentinelSentinels:
|
|
||||||
if (mSentinels.empty()) {
|
|
||||||
return nullptr;
|
|
||||||
} else {
|
|
||||||
Server* s = randServer(h, mSentinels);
|
|
||||||
logDebug("sentinel server pool get server %s for sentinel command",
|
|
||||||
s->addr().data());
|
|
||||||
return s;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case Command::Randomkey:
|
|
||||||
return randServer(h, mServPool);
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (mGroupPool.size() == 1) {
|
|
||||||
return mGroupPool[0]->getServer(h, req);
|
|
||||||
} else if (mGroupPool.size() > 1) {
|
|
||||||
switch (mDist) {
|
|
||||||
case Distribution::Modula:
|
|
||||||
{
|
|
||||||
long idx = mHash.hash(key.data(), key.length(), mHashTag);
|
|
||||||
idx %= mGroupPool.size();
|
|
||||||
return mGroupPool[idx]->getServer(h, req);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case Distribution::Random:
|
|
||||||
{
|
|
||||||
int idx = h->rand() % mGroupPool.size();
|
|
||||||
return mGroupPool[idx]->getServer(h, req);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
void StandaloneServerPool::refreshRequest(Handler* h)
|
|
||||||
{
|
|
||||||
logDebug("h %d update standalone server pool", h->id());
|
|
||||||
switch (mRefreshMethod.value()) {
|
|
||||||
case ServerPoolRefreshMethod::Sentinel:
|
|
||||||
for (auto g : mGroupPool) {
|
|
||||||
RequestPtr req = RequestAlloc::create();
|
|
||||||
req->setSentinels(g->name());
|
|
||||||
req->setData(g);
|
|
||||||
h->handleRequest(req);
|
|
||||||
req = RequestAlloc::create();
|
|
||||||
req->setSentinelGetMaster(g->name());
|
|
||||||
req->setData(g);
|
|
||||||
h->handleRequest(req);
|
|
||||||
req = RequestAlloc::create();
|
|
||||||
req->setSentinelSlaves(g->name());
|
|
||||||
req->setData(g);
|
|
||||||
h->handleRequest(req);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void StandaloneServerPool::handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res)
|
|
||||||
{
|
|
||||||
switch (req->type()) {
|
|
||||||
case Command::SentinelSentinels:
|
|
||||||
handleSentinels(h, s, req, res);
|
|
||||||
break;
|
|
||||||
case Command::SentinelGetMaster:
|
|
||||||
handleGetMaster(h, s, req, res);
|
|
||||||
break;
|
|
||||||
case Command::SentinelSlaves:
|
|
||||||
handleSlaves(h, s, req, res);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class AddrParser
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
enum Status {
|
|
||||||
Ok,
|
|
||||||
Error,
|
|
||||||
Done
|
|
||||||
};
|
|
||||||
public:
|
|
||||||
AddrParser(const Segment& res):
|
|
||||||
mRes(res),
|
|
||||||
mState(Idle),
|
|
||||||
mCnt(0),
|
|
||||||
mArgLen(0),
|
|
||||||
mIp(false),
|
|
||||||
mPort(false)
|
|
||||||
{
|
|
||||||
mRes.rewind();
|
|
||||||
}
|
|
||||||
int count() const {return mCnt;}
|
|
||||||
Status parse(SString<Const::MaxAddrLen>& addr);
|
|
||||||
private:
|
|
||||||
enum State {
|
|
||||||
Idle,
|
|
||||||
Count,
|
|
||||||
CountLF,
|
|
||||||
Arg,
|
|
||||||
ArgLen,
|
|
||||||
ArgLenLF,
|
|
||||||
SubArrayLen,
|
|
||||||
Body,
|
|
||||||
BodyLF,
|
|
||||||
Invalid,
|
|
||||||
Finished
|
|
||||||
};
|
|
||||||
private:
|
|
||||||
Segment mRes;
|
|
||||||
State mState;
|
|
||||||
int mCnt;
|
|
||||||
int mArgLen;
|
|
||||||
bool mIp;
|
|
||||||
bool mPort;
|
|
||||||
SString<4> mKey;
|
|
||||||
};
|
|
||||||
|
|
||||||
AddrParser::Status AddrParser::parse(SString<Const::MaxAddrLen>& addr)
|
|
||||||
{
|
|
||||||
const char* dat;
|
|
||||||
int len;
|
|
||||||
addr.clear();
|
|
||||||
while (mRes.get(dat, len) && mState != Invalid) {
|
|
||||||
for (int i = 0; i < len && mState != Invalid; ++i) {
|
|
||||||
char ch = dat[i];
|
|
||||||
switch (mState) {
|
|
||||||
case Idle:
|
|
||||||
mState = ch == '*' ? Count : Invalid;
|
|
||||||
break;
|
|
||||||
case Count:
|
|
||||||
if (ch >= '0' && ch <= '9') {
|
|
||||||
mCnt = mCnt * 10 + (ch - '0');
|
|
||||||
} else if (ch == '\r') {
|
|
||||||
if (mCnt == 0) {
|
|
||||||
mState = Finished;
|
|
||||||
return Done;
|
|
||||||
} else if (mCnt < 0) {
|
|
||||||
mState = Invalid;
|
|
||||||
return Error;
|
|
||||||
}
|
|
||||||
mState = CountLF;
|
|
||||||
} else {
|
|
||||||
mState = Invalid;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case CountLF:
|
|
||||||
mState = ch == '\n' ? Arg : Invalid;
|
|
||||||
break;
|
|
||||||
case Arg:
|
|
||||||
if (ch == '$') {
|
|
||||||
mState = ArgLen;
|
|
||||||
mArgLen = 0;
|
|
||||||
} else if (ch == '*') {
|
|
||||||
mState = SubArrayLen;
|
|
||||||
} else {
|
|
||||||
mState = Invalid;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case ArgLen:
|
|
||||||
if (ch >= '0' && ch <= '9') {
|
|
||||||
mArgLen = mArgLen * 10 + (ch - '0');
|
|
||||||
} else if (ch == '\r') {
|
|
||||||
mState = ArgLenLF;
|
|
||||||
} else {
|
|
||||||
mState = Invalid;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case ArgLenLF:
|
|
||||||
mState = ch == '\n' ? Body : Invalid;
|
|
||||||
break;
|
|
||||||
case SubArrayLen:
|
|
||||||
if (ch == '\n') {
|
|
||||||
mState = Arg;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case Body:
|
|
||||||
if (ch == '\r') {
|
|
||||||
mState = BodyLF;
|
|
||||||
if (mPort) {
|
|
||||||
mPort = false;
|
|
||||||
mRes.use(i + 1);
|
|
||||||
return Ok;
|
|
||||||
} else if (mIp) {
|
|
||||||
mIp = false;
|
|
||||||
addr.append(':');
|
|
||||||
} else if (mArgLen == 2 && strcmp(mKey.data(), "ip") == 0) {
|
|
||||||
mIp = true;
|
|
||||||
} else if (mArgLen == 4 && strcmp(mKey.data(), "port") == 0) {
|
|
||||||
mPort = true;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (mIp || mPort) {
|
|
||||||
addr.append(ch);
|
|
||||||
} else if (mArgLen == 2 || mArgLen == 4) {
|
|
||||||
mKey.append(ch);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case BodyLF:
|
|
||||||
mKey.clear();
|
|
||||||
mState = ch == '\n' ? Arg : Invalid;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
mRes.use(len);
|
|
||||||
}
|
|
||||||
return mState != Invalid ? Done : Error;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool hasValidPort(const String& addr)
|
|
||||||
{
|
|
||||||
const char* p = addr.data() + addr.length();
|
|
||||||
for (int i = 0; i < addr.length(); ++i) {
|
|
||||||
if (*(--p) == ':') {
|
|
||||||
int port = atoi(p + 1);
|
|
||||||
return port > 0 && port < 65536;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
void StandaloneServerPool::handleSentinels(Handler* h, ConnectConnection* s, Request* req, Response* res)
|
|
||||||
{
|
|
||||||
if (!res || !res->isArray()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
AddrParser parser(res->body());
|
|
||||||
SString<Const::MaxAddrLen> addr;
|
|
||||||
while (true) {
|
|
||||||
auto st = parser.parse(addr);
|
|
||||||
if (st == AddrParser::Ok) {
|
|
||||||
logDebug("sentinel server pool parse sentinel %s", addr.data());
|
|
||||||
if (!hasValidPort(addr)) {
|
|
||||||
logNotice("sentinel server pool parse sentienl %s invalid",
|
|
||||||
addr.data());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
auto it = mServs.find(addr);
|
|
||||||
Server* serv = it == mServs.end() ? nullptr : it->second;
|
|
||||||
if (!serv) {
|
|
||||||
if (mSentinels.size() == mSentinels.capacity()) {
|
|
||||||
logWarn("too many sentinels %d, will ignore new sentinel %s",
|
|
||||||
(int)mSentinels.size(), addr.data());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
serv = new Server(this, addr, false);
|
|
||||||
serv->setRole(Server::Sentinel);
|
|
||||||
serv->setPassword(password());
|
|
||||||
mSentinels.push_back(serv);
|
|
||||||
mServs[serv->addr()] = serv;
|
|
||||||
logNotice("h %d create new sentinel %s",
|
|
||||||
h->id(), addr.data());
|
|
||||||
}
|
|
||||||
serv->setOnline(true);
|
|
||||||
} else if (st == AddrParser::Done) {
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
logError("sentinel server pool parse sentinel sentinels error");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void StandaloneServerPool::handleGetMaster(Handler* h, ConnectConnection* s, Request* req, Response* res)
|
|
||||||
{
|
|
||||||
if (!res || !res->isArray()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ServerGroup* g = (ServerGroup*)req->data();
|
|
||||||
if (!g) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SegmentStr<Const::MaxAddrLen + 32> str(res->body());
|
|
||||||
if (!str.complete()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (strncmp(str.data(), "*2\r\n$", 5) != 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SString<Const::MaxAddrLen> addr;
|
|
||||||
const char* p = str.data() + 5;
|
|
||||||
int len = atoi(p);
|
|
||||||
if (len <= 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
p = strchr(p, '\r') + 2;
|
|
||||||
if (!addr.append(p, len)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (!addr.append(':')) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
p += len + 3;
|
|
||||||
len = atoi(p);
|
|
||||||
if (len <= 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
p = strchr(p, '\r') + 2;
|
|
||||||
if (!addr.append(p, len)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
logDebug("sentinel server pool group %s get master %s",
|
|
||||||
g->name().data(), addr.data());
|
|
||||||
auto it = mServs.find(addr);
|
|
||||||
Server* serv = it == mServs.end() ? nullptr : it->second;
|
|
||||||
if (serv) {
|
|
||||||
serv->setOnline(true);
|
|
||||||
serv->setRole(Server::Master);
|
|
||||||
auto old = serv->group();
|
|
||||||
if (old) {
|
|
||||||
if (old != g) {
|
|
||||||
old->remove(serv);
|
|
||||||
g->add(serv);
|
|
||||||
serv->setGroup(g);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
g->add(serv);
|
|
||||||
serv->setGroup(g);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (mServPool.size() == mServPool.capacity()) {
|
|
||||||
logWarn("too many servers %d, will ignore new master server %s",
|
|
||||||
(int)mServPool.size(), addr.data());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
serv = new Server(this, addr, false);
|
|
||||||
serv->setRole(Server::Master);
|
|
||||||
serv->setPassword(password());
|
|
||||||
mServPool.push_back(serv);
|
|
||||||
g->add(serv);
|
|
||||||
serv->setGroup(g);
|
|
||||||
mServs[serv->addr()] = serv;
|
|
||||||
logNotice("sentinel server pool group %s create master server %s %s",
|
|
||||||
g->name().data(), addr.data(), serv->dcName().data());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void StandaloneServerPool::handleSlaves(Handler* h, ConnectConnection* s, Request* req, Response* res)
|
|
||||||
{
|
|
||||||
if (!res || !res->isArray()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ServerGroup* g = (ServerGroup*)req->data();
|
|
||||||
if (!g) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
AddrParser parser(res->body());
|
|
||||||
SString<Const::MaxAddrLen> addr;
|
|
||||||
while (true) {
|
|
||||||
auto st = parser.parse(addr);
|
|
||||||
if (st == AddrParser::Ok) {
|
|
||||||
logDebug("sentinel server pool group %s parse slave %s",
|
|
||||||
g->name().data(), addr.data());
|
|
||||||
auto it = mServs.find(addr);
|
|
||||||
Server* serv = it == mServs.end() ? nullptr : it->second;
|
|
||||||
if (serv) {
|
|
||||||
serv->setOnline(true);
|
|
||||||
serv->setRole(Server::Slave);
|
|
||||||
auto old = serv->group();
|
|
||||||
if (old) {
|
|
||||||
if (old != g) {
|
|
||||||
old->remove(serv);
|
|
||||||
g->add(serv);
|
|
||||||
serv->setGroup(g);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
g->add(serv);
|
|
||||||
serv->setGroup(g);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (mServPool.size() == mServPool.capacity()) {
|
|
||||||
logWarn("too many servers %d, will ignore new slave server %s",
|
|
||||||
(int)mServPool.size(), addr.data());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
serv = new Server(this, addr, false);
|
|
||||||
serv->setRole(Server::Slave);
|
|
||||||
serv->setPassword(password());
|
|
||||||
mServPool.push_back(serv);
|
|
||||||
g->add(serv);
|
|
||||||
serv->setGroup(g);
|
|
||||||
mServs[serv->addr()] = serv;
|
|
||||||
logNotice("sentinel server pool group %s create slave server %s %s",
|
|
||||||
g->name().data(), addr.data(), serv->dcName().data());
|
|
||||||
}
|
|
||||||
} else if (st == AddrParser::Done) {
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
logError("sentinel server pool group %s parse sentinel sentinels error",
|
|
||||||
g->name().data());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,43 +0,0 @@
|
|||||||
/*
|
|
||||||
* predixy - A high performance and full features proxy for redis.
|
|
||||||
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
|
|
||||||
* All rights reserved.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _PREDIXY_STANDALONE_SERVER_POOL_H_
|
|
||||||
#define _PREDIXY_STANDALONE_SERVER_POOL_H_
|
|
||||||
|
|
||||||
#include <map>
|
|
||||||
#include "Predixy.h"
|
|
||||||
#include "ServerPool.h"
|
|
||||||
|
|
||||||
class StandaloneServerPool : public ServerPoolTmpl<StandaloneServerPool>
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
static const int MaxSentinelNum = 64;
|
|
||||||
public:
|
|
||||||
StandaloneServerPool(Proxy* p);
|
|
||||||
~StandaloneServerPool();
|
|
||||||
void init(const StandaloneServerPoolConf& conf);
|
|
||||||
Server* getServer(Handler* h, Request* req, const String& key) const;
|
|
||||||
Server* iter(int& cursor) const
|
|
||||||
{
|
|
||||||
return ServerPool::iter(mServPool, cursor);
|
|
||||||
}
|
|
||||||
void refreshRequest(Handler* h);
|
|
||||||
void handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res);
|
|
||||||
private:
|
|
||||||
void handleSentinels(Handler* h, ConnectConnection* s, Request* req, Response* res);
|
|
||||||
void handleGetMaster(Handler* h, ConnectConnection* s, Request* req, Response* res);
|
|
||||||
void handleSlaves(Handler* h, ConnectConnection* s, Request* req, Response* res);
|
|
||||||
friend class ServerPoolTmpl<StandaloneServerPool>;
|
|
||||||
private:
|
|
||||||
ServerPoolRefreshMethod mRefreshMethod;
|
|
||||||
std::vector<Server*> mSentinels;
|
|
||||||
std::vector<Server*> mServPool;
|
|
||||||
Distribution mDist;
|
|
||||||
Hash mHash;
|
|
||||||
char mHashTag[2];
|
|
||||||
};
|
|
||||||
|
|
||||||
#endif
|
|
||||||
@ -391,7 +391,7 @@ Cases = [
|
|||||||
[('scard', '{k}2'), 3],
|
[('scard', '{k}2'), 3],
|
||||||
]),
|
]),
|
||||||
('zset', [
|
('zset', [
|
||||||
[('del', 'k', '{k}2', '{k}3', '{k}4', '{k}5', '{k}6'), ],
|
[('del', 'k', '{k}2', '{k}3', '{k}4'), ],
|
||||||
[('zadd', 'k', 10, 'apple'), 1],
|
[('zadd', 'k', 10, 'apple'), 1],
|
||||||
[('zcard', 'k'), 1],
|
[('zcard', 'k'), 1],
|
||||||
[('zincrby', 'k', 2, 'apple'), '12'],
|
[('zincrby', 'k', 2, 'apple'), '12'],
|
||||||
@ -438,12 +438,6 @@ Cases = [
|
|||||||
[('zunionstore', '{k}3', 2, 'k', '{k}2'), 4],
|
[('zunionstore', '{k}3', 2, 'k', '{k}2'), 4],
|
||||||
[('zunionstore', '{k}3', 2, 'k', '{k}2', 'AGGREGATE', 'MAX'), 4],
|
[('zunionstore', '{k}3', 2, 'k', '{k}2', 'AGGREGATE', 'MAX'), 4],
|
||||||
[('zunionstore', '{k}3', 2, 'k', '{k}2', 'WEIGHTS', 0.5, 1.2, 'AGGREGATE', 'MAX'), 4],
|
[('zunionstore', '{k}3', 2, 'k', '{k}2', 'WEIGHTS', 0.5, 1.2, 'AGGREGATE', 'MAX'), 4],
|
||||||
[('zadd', '{k}5', 0, 'apple', 9, 'banana', 1, 'pear', 3, 'orange', 4, 'cat'), 5],
|
|
||||||
[('zpopmax', '{k}5'), ['banana', '9']],
|
|
||||||
[('zpopmax', '{k}5', 3), ['cat', '4', 'orange', '3', 'pear', '1']],
|
|
||||||
[('zadd', '{k}6', 0, 'apple', 9, 'banana', 1, 'pear', 3, 'orange', 4, 'cat'), 5],
|
|
||||||
[('zpopmin', '{k}6'), ['apple', '0']],
|
|
||||||
[('zpopmin', '{k}6', 3), ['pear', '1', 'orange', '3', 'cat', '4']],
|
|
||||||
]),
|
]),
|
||||||
('hyperloglog', [
|
('hyperloglog', [
|
||||||
[('del', 'k', '{k}2', '{k}3'), ],
|
[('del', 'k', '{k}2', '{k}3'), ],
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user