Compare commits

...

46 Commits

Author SHA1 Message Date
fortrue
ca1630a6b4
Merge pull request #185 from joyieldInc/fix/issue124
avoid to use leader's connection for follow request
2024-01-31 11:40:02 +08:00
fortrue
9b85bd0e38 avoid to use leader's connection for follow request 2024-01-25 22:28:45 +08:00
fortrue
c15f54e274
Merge pull request #140 from shuaiming/patch-1
Update config_CN.md
2021-10-01 18:39:58 +08:00
shuaiming
fbb5a8acb3
Update config_CN.md
Fix typo
2021-09-17 18:14:11 +08:00
fortrue
393ff81e69
Merge pull request #101 from joyieldInc/FixPrivateConnLeak
fix private connection leak
2020-02-01 15:24:14 +08:00
fortrue
ace6ed2941 fix private connection leak 2019-06-15 09:03:54 +08:00
fortrue
dacf3fb30c
Merge pull request #73 from leenr/master
Add support for `zpopmax` and `zpopmin` redis commands (server 5.0.0+)
2019-02-09 08:45:57 +08:00
Vladimir Solomatin
ebf7bd2d82
Add support for zpopmax and zpopmin redis commands (server 5.0.0+) 2019-02-07 20:18:08 +03:00
fortrue
ca9cde0487
Merge pull request #71 from crierr/variadic-hset
Fix HSET to support multiple fields
2019-01-26 14:17:38 +08:00
SeungJin Oh
df3aa1aa92 Fix HSET to support multiple fields
- From Redis 4.0.0 HSET is recommended for multiple fields and HMSET is deprecated
2019-01-25 16:58:06 +09:00
fortrue
4d9166443b release 1.0.5 2018-09-22 19:52:50 +08:00
fortrue
57de3c2a03
Merge pull request #48 from joyieldInc/StandaloneServerPool
support redis standalone backend
2018-09-22 12:17:25 +08:00
fortrue
63596e951d fix Command Mode mistake 2018-08-19 17:21:17 +08:00
fortrue
fb1ac64251 support redis standalone backend 2018-07-08 17:28:25 +08:00
fortrue
0635178c2e
Merge pull request #42 from mosquito/master
[fix] ipv6 address parsing
2018-07-05 08:15:04 -05:00
Dmitry Orlov
a2a5d78fbd [fix] ipv6 address parsing 2018-07-05 13:01:06 +03:00
fortrue
36152f570d
Merge pull request #40 from joyieldInc/issue32
Issue32
2018-07-04 23:02:37 -05:00
fortrue
20bfcb657e fix async assign client race condition, #32 #33 #35 #36 2018-07-05 11:54:12 +08:00
fortrue
34cc6d151e fix for script load command follow policy 2018-06-12 16:29:12 +08:00
fortrue
7fda4b77c1 1.fix multi-keys request leader self reference
2.adjust alloc implement
2018-05-16 09:56:43 +08:00
fortrue
bbbe798629 fix crash because server without DC, but config defined DC, see #32 2018-04-29 19:34:32 +08:00
fortrue
28edb79c61 1.fix compile error in MAC OS X for TCP_KEEPIDLE
2.change the version to 1.0.5-pre
2018-04-23 14:00:27 +08:00
fortrue
a856f2607f fix string format output bug 2018-04-23 11:45:46 +08:00
fortrue
4b8f578a2f
Merge pull request #28 from yoonian/master
Add support for custom commands
2018-04-16 15:24:01 +08:00
Yoon
bf635bbfab Remove type in Conf 2018-04-13 17:22:14 +09:00
Yoon
a13bb36159 Add a fix(by @fortrue) 2018-04-13 15:44:49 +09:00
Yoon
a559ac0d4f Support multiple Latency Monitors
- Fix a dangling pointer bug
2018-04-13 14:12:11 +09:00
Yoon
9d1d1af5bf Remove config order restriction
- Delay setLatencyMonitor to load custom commands config first
- Fix custom command container bug
2018-04-12 15:05:02 +09:00
Yoon
dcdc9a1ba0 Change config description style 2018-04-12 11:59:15 +09:00
Yoon
95f4580ccf Add custom command config checking
- add config sample(including Read mode)
- apply @fortue's suggestion to check exclusive mode and key postion
2018-04-10 17:07:47 +09:00
Yoon
aa013991f7 Add custom command exclusive mode checking 2018-04-10 10:32:59 +09:00
Yoonian
75228f3272 Add support for custom commands
- Support maximum 16 commands
- Add a sample configuration(conf/command.conf)
2018-04-09 21:28:16 +09:00
fortrue
ce210cdee3 1.modify default ServerTimeout and KeepAlive to 0
2.improve doc
2018-03-28 20:27:04 +08:00
fortrue
a14f11cec9 release 1.0.4 2018-03-26 15:16:44 +08:00
fortrue
0891036e8c 1.wait for response when connecting to redis
2.remove unused code
3.fix sentinel.conf typing
2018-03-26 14:59:20 +08:00
fortrue
4eda710da4 add wechat contact 2018-03-26 10:12:24 +08:00
fortrue
9afc183a1b
Merge pull request #24 from markus9203902/patch-1
Description for new confing available from commit fixing issue 21
2018-03-20 08:27:04 +08:00
markus9203902
8be48e16e5
Upd wiki page to comply with changes from 86183a4
Added new commands description, KeepAlive and ServerTimeout.
2018-03-19 17:08:15 +03:00
fortrue
ec36627f91 Issue #23
change signal SIGHUP action from abort process to ignore
log signal value when process aborted by signal
2018-03-16 12:28:07 +08:00
fortrue
37b3a07b9f fix config document addr 2018-02-28 19:25:32 +08:00
fortrue
86183a4a97 support redis server connection keepalive and timeout
fix Issue #21
2018-02-23 16:47:03 +08:00
fortrue
1644a9bf1b
fix table typing 2018-01-25 17:02:07 +08:00
fortrue
55cc0dccfd add benchmark with redis 2018-01-25 16:57:18 +08:00
fortrue
b9a468dc97 fix backtrace bug 2018-01-19 16:43:28 +08:00
fortrue
3a5239dd18
Merge pull request #17 from markus9203902/master
Added description of cluster.conf parameters usage
2018-01-14 08:54:12 +08:00
Markus Snow
7ad5f0f794 Added description of cluster.conf parameters usage 2018-01-13 18:52:48 +03:00
59 changed files with 1436 additions and 167 deletions

View File

@ -131,3 +131,5 @@ Copyright (C) 2017 Joyield, Inc. <joyield.com#gmail.com>
All rights reserved. All rights reserved.
License under BSD 3-clause "New" or "Revised" License License under BSD 3-clause "New" or "Revised" License
WeChat:cppfan ![wechat](https://github.com/joyieldInc/predixy/blob/master/doc/wechat-cppfan.jpeg)

View File

@ -138,3 +138,4 @@ All rights reserved.
License under BSD 3-clause "New" or "Revised" License License under BSD 3-clause "New" or "Revised" License
WeChat:cppfan ![WeChat](https://github.com/joyieldInc/predixy/blob/master/doc/wechat-cppfan.jpeg)

View File

@ -5,9 +5,12 @@
## [MasterReadPriority [0-100]] #default 50 ## [MasterReadPriority [0-100]] #default 50
## [StaticSlaveReadPriority [0-100]] #default 0 ## [StaticSlaveReadPriority [0-100]] #default 0
## [DynamicSlaveReadPriority [0-100]] #default 0 ## [DynamicSlaveReadPriority [0-100]] #default 0
## [RefreshInterval seconds] #default 1 ## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
## [ServerTimeout number[s|ms|us]] #default 0, server connection socket read/write timeout
## [ServerFailureLimit number] #default 10 ## [ServerFailureLimit number] #default 10
## [ServerRetryTimeout seconds] #default 1 ## [ServerRetryTimeout number[s|ms|us]] #default 1
## [KeepAlive seconds] #default 0, server connection tcp keepalive
## Servers { ## Servers {
## + addr ## + addr
## ... ## ...
@ -21,8 +24,10 @@
# StaticSlaveReadPriority 50 # StaticSlaveReadPriority 50
# DynamicSlaveReadPriority 50 # DynamicSlaveReadPriority 50
# RefreshInterval 1 # RefreshInterval 1
# ServerTimeout 1
# ServerFailureLimit 10 # ServerFailureLimit 10
# ServerRetryTimeout 1 # ServerRetryTimeout 1
# KeepAlive 120
# Servers { # Servers {
# + 192.168.2.107:2211 # + 192.168.2.107:2211
# + 192.168.2.107:2212 # + 192.168.2.107:2212

94
conf/command.conf Normal file
View File

@ -0,0 +1,94 @@
## Custom Command
## CustomCommand {
## command { #command string, must be lowercase
## [Mode read|write|admin[|keyAt2|keyAt3]] #default write, default key position is 1
## [MinArgs [2-]] #default 2, including command itself
## [MaxArgs [2-]] #default 2, must be MaxArgs >= MinArgs
## }...
## }
## Currently supports a maximum of 16 custom commands
## Example:
#CustomCommand {
##------------------------------------------------------------------------
# custom.ttl {
# Mode keyAt2
# MinArgs 3
# MaxArgs 3
# }
#### custom.ttl milliseconds key
#### Mode = write|keyAt2, MinArgs/MaxArgs = 3 = command + milliseconds + key
##------------------------------------------------------------------------
## from redis source src/modules/hello.c
# hello.push.native {
# MinArgs 3
# MaxArgs 3
# }
#### hello.push.native key value
#### Mode = write, MinArgs/MaxArgs = 3 = command + key + value
##------------------------------------------------------------------------
# hello.repl2 {
# }
#### hello.repl2 <list-key>
#### Mode = write, MinArgs/MaxArgs = 2 = command + list-key
##------------------------------------------------------------------------
# hello.toggle.case {
# }
#### hello.toggle.case key
#### Mode = write, MinArgs/MaxArgs = 2 = command + key
##------------------------------------------------------------------------
# hello.more.expire {
# MinArgs 3
# MaxArgs 3
# }
#### hello.more.expire key milliseconds
#### Mode = write, MinArgs/MaxArgs = 3 = command + key + milliseconds
##------------------------------------------------------------------------
# hello.zsumrange {
# MinArgs 4
# MaxArgs 4
# Mode read
# }
#### hello.zsumrange key startscore endscore
#### Mode = read, MinArgs/MaxArgs = 4 = command + key + startscore + endscore
##------------------------------------------------------------------------
# hello.lexrange {
# MinArgs 6
# MaxArgs 6
# Mode read
# }
#### hello.lexrange key min_lex max_lex min_age max_age
#### Mode = read, MinArgs/MaxArgs = 6 = command + key + min_lex + max_lex + min_age + max_age
##------------------------------------------------------------------------
# hello.hcopy {
# MinArgs 4
# MaxArgs 4
# }
#### hello.hcopy key srcfield dstfield
#### Mode = write, MinArgs/MaxArgs = 4 = command + key + srcfield + dstfield
##------------------------------------------------------------------------
## from redis source src/modules/hellotype.c
# hellotype.insert {
# MinArgs 3
# MaxArgs 3
# }
#### hellotype.insert key value
#### Mode = write, MinArgs/MaxArgs = 3 = command + key + value
##------------------------------------------------------------------------
# hellotype.range {
# MinArgs 4
# MaxArgs 4
# Mode read
# }
#### hellotype.range key first count
#### Mode = read, MinArgs/MaxArgs = 4 = command + key + first + count
##------------------------------------------------------------------------
# hellotype.len {
# Mode read
# }
#### hellotype.len key
#### Mode = read, MinArgs/MaxArgs = 2 = command + key
##------------------------------------------------------------------------
#}
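## Worked example of the argument counting above, using a hypothetical
## command (my.getset is invented for illustration, it is not a real command):
#CustomCommand {
#    my.getset {        # invoked as: my.getset key value
#        Mode write     # routed as a write; key at default position 1
#        MinArgs 3      # command itself + key + value
#        MaxArgs 3
#    }
#}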

View File

@ -93,6 +93,10 @@ Include try.conf
# Include dc.conf # Include dc.conf
################################### COMMAND ####################################
## Custom command define, see command.conf
#Include command.conf
################################### LATENCY #################################### ################################### LATENCY ####################################
## Latency monitor define, see latency.conf ## Latency monitor define, see latency.conf
Include latency.conf Include latency.conf

View File

@ -9,9 +9,11 @@
## [MasterReadPriority [0-100]] #default 50 ## [MasterReadPriority [0-100]] #default 50
## [StaticSlaveReadPriority [0-100]] #default 0 ## [StaticSlaveReadPriority [0-100]] #default 0
## [DynamicSlaveReadPriority [0-100]] #default 0 ## [DynamicSlaveReadPriority [0-100]] #default 0
## [RefreshInterval seconds] #default 1 ## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
## [ServerTimeout number[s|ms|us]] #default 0, server connection socket read/write timeout
## [ServerFailureLimit number] #default 10 ## [ServerFailureLimit number] #default 10
## [ServerRetryTimeout seconds] #default 1 ## [ServerRetryTimeout number[s|ms|us]] #default 1
## [KeepAlive seconds] #default 0, server connection tcp keepalive
## Sentinels { ## Sentinels {
## + addr ## + addr
## ... ## ...
@ -33,12 +35,14 @@
# StaticSlaveReadPriority 50 # StaticSlaveReadPriority 50
# DynamicSlaveReadPriority 50 # DynamicSlaveReadPriority 50
# RefreshInterval 1 # RefreshInterval 1
# ServerTimeout 1
# ServerFailureLimit 10 # ServerFailureLimit 10
# ServerRetryTimeout 1 # ServerRetryTimeout 1
# KeepAlive 120
# Sentinels { # Sentinels {
# + 10.2.2.2 # + 10.2.2.2:7500
# + 10.2.2.3 # + 10.2.2.3:7500
# + 10.2.2.4 # + 10.2.2.4:7500
# } # }
# Group shard001 { # Group shard001 {
# } # }

71
conf/standalone.conf Normal file
View File

@ -0,0 +1,71 @@
## redis standalone server pool define
##StandaloneServerPool {
## [Password xxx] #default no
## [Databases number] #default 1
## Hash atol|crc16
## [HashTag "xx"] #default no
## Distribution modula|random
## [MasterReadPriority [0-100]] #default 50
## [StaticSlaveReadPriority [0-100]] #default 0
## [DynamicSlaveReadPriority [0-100]] #default 0
## RefreshMethod fixed|sentinel #
## [RefreshInterval number[s|ms|us]] #default 1, means 1 second
## [ServerTimeout number[s|ms|us]] #default 0, server connection socket read/write timeout
## [ServerFailureLimit number] #default 10
## [ServerRetryTimeout number[s|ms|us]] #default 1
## [KeepAlive seconds] #default 0, server connection tcp keepalive
## Sentinels [sentinel-password] {
## + addr
## ...
## }
## Group xxx {
## [+ addr] #if RefreshMethod==fixed: the first addr is the master in a group, the remaining addrs are slaves in this group
## ...
## }
##}
## Examples:
#StandaloneServerPool {
# Databases 16
# Hash crc16
# HashTag "{}"
# Distribution modula
# MasterReadPriority 60
# StaticSlaveReadPriority 50
# DynamicSlaveReadPriority 50
# RefreshMethod sentinel
# RefreshInterval 1
# ServerTimeout 1
# ServerFailureLimit 10
# ServerRetryTimeout 1
# KeepAlive 120
# Sentinels {
# + 10.2.2.2:7500
# + 10.2.2.3:7500
# + 10.2.2.4:7500
# }
# Group shard001 {
# }
# Group shard002 {
# }
#}
#StandaloneServerPool {
# Databases 16
# Hash crc16
# HashTag "{}"
# Distribution modula
# MasterReadPriority 60
# StaticSlaveReadPriority 50
# DynamicSlaveReadPriority 50
# RefreshMethod fixed
# ServerTimeout 1
# ServerFailureLimit 10
# ServerRetryTimeout 1
# KeepAlive 120
# Group shard001 {
# + 10.2.3.2:6379
# }
#}
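## A sketch of a fixed-refresh group with an explicit slave (addresses are
## illustrative): with RefreshMethod fixed, the first addr in a Group is taken
## as the master and the remaining addrs as its slaves.
#StandaloneServerPool {
#    RefreshMethod fixed
#    Group shard001 {
#        + 10.2.3.2:6379    # master
#        + 10.2.3.3:6379    # slave
#    }
#}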

View File

@ -0,0 +1,52 @@
## Configuration example
cluster.conf
ClusterServerPool {
MasterReadPriority 60
StaticSlaveReadPriority 50
DynamicSlaveReadPriority 50
RefreshInterval 300ms
ServerTimeout 300ms #added in commit 86183a4
ServerFailureLimit 2
ServerRetryTimeout 500ms
KeepAlive 120s #added in commit 86183a4
Servers {
+ 192.168.2.107:2211
+ 192.168.2.108:2212
+ 192.168.2.109:2213
}
}
## Configuration parameters description
**MasterReadPriority, StaticSlaveReadPriority** and **DynamicSlaveReadPriority** - these parameters work together only within predixy and have _nothing_ to do with Redis configuration directives like slave-priority. Since predixy works with both Redis Sentinel and Redis Cluster deployments, two combinations are possible: MasterReadPriority with StaticSlaveReadPriority, or MasterReadPriority with DynamicSlaveReadPriority.
If your Redis deployment relies on node auto-discovery via Sentinel or the other cluster nodes, the DynamicSlaveReadPriority option applies; if you plan to add nodes to **Servers {...}** in the predixy config manually, StaticSlaveReadPriority applies.
In other words, predixy can automatically discover newly added Redis nodes by polling the existing **Servers {...}** entries and route queries to them, eliminating the need to manually edit cluster.conf and restart predixy.
These three parameters tell predixy in what way and in what proportion it should route queries to the available nodes.
Some use cases are shown below (see also the sketch after the table):
| Master/SlaveReadPriority | Master | Slave1 | Slave2 | Fail-over notes |
| ------------- | ------------- | ------------- | ------------- | ------------- |
| 60/50 | all requests | 0 requests | 0 requests | Master dead: read requests go to the slaves until a master (possibly a new one) is alive |
| 60/0 | all requests | 0 requests | 0 requests | Master dead: all requests fail |
| 50/50 | all write requests, 33.33% read requests | 33.33% read requests | 33.33% read requests | - |
| 0/50 | all write requests, 0 read requests | 50% read requests | 50% read requests | All slaves dead: all read requests fail |
| 10/50 | all write requests, 0 read requests | 50% read requests | 50% read requests | All slaves dead: read requests go to the master |
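For instance, a minimal sketch of the table's 0/50 row (all writes go to the master, reads are spread across the slaves only, and reads fail if every slave dies; addresses are illustrative):
ClusterServerPool {
    MasterReadPriority 0
    DynamicSlaveReadPriority 50
    Servers {
        + 192.168.2.107:2211
        + 192.168.2.108:2212
        + 192.168.2.109:2213
    }
}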
**RefreshInterval** - seconds, milliseconds or microseconds [s|ms|us]; tells predixy how often to poll node info and, when Cluster is used, the allocated hash slots
**ServerFailureLimit** - the number of failed queries to a node after which predixy stops forwarding queries to that node
**ServerTimeout** - seconds, milliseconds or microseconds [s|ms|us]; the timeout for nearly all commands except BRPOP, BLPOP, BRPOPLPUSH, transactions and PUB/SUB. It is the read/write timeout on the Redis server connection socket, so that predixy does not wait too long for a Redis response. When the timeout is reached, predixy closes the connection between itself and Redis and returns an error.
**Available from commit 86183a4.**
**KeepAlive** - seconds; applied to the redis connection (socket). It covers all commands not covered by **ServerTimeout**, i.e. it also applies to blocking commands like BRPOP, BLPOP, BRPOPLPUSH. When the keepalive timeout is reached, predixy closes the connection between itself and Redis and returns an error.
**Available from commit 86183a4.**
**ServerRetryTimeout** - seconds, milliseconds or microseconds [s|ms|us]; tells predixy how often to check failed nodes and decide whether they are still failed or are alive again and can serve queries
**Servers** - a line-by-line list of static Redis nodes in socket form, like `+ IP:PORT`

View File

@ -0,0 +1,101 @@
Intuitively, putting an extra layer in front of any service is bound to affect its performance. But what exactly does it affect? When evaluating the performance of a service, the two most important metrics are throughput and latency. Throughput is the number of requests the server side can handle per unit of time; latency is the time a client waits from sending a request to receiving the response. The first thing that comes to mind about an intermediate layer is that it adds processing time, which increases latency, and so we naturally assume throughput drops as well. From a single user's point of view that is indeed true: if each request takes longer to complete, then the number of requests I can complete per unit of time must fall. But from the server's point of view, even though each individual request takes longer to process, must total throughput necessarily fall?
Next we run a series of tests against redis with the bundled redis-benchmark tool: the set and get commands, sent one at a time and in batches (pipelined), connecting directly to redis and through the redis proxy [predixy](https://github.com/joyieldInc/predixy). Combined, that gives eight cases. redis-benchmark and redis are single-threaded; predixy supports multiple threads, but we run it with just one. All three programs run on the same machine:
|Item|Detail|
|---|---|
|CPU|AMD Ryzen 7 1700X Eight-Core Processor 3.775GHz|
|Memory|16GB DDR4 3000|
|OS|x86_64 GNU/Linux 4.10.0-42-generic #46~16.04.1-Ubuntu|
|redis|version 3.2.9, port 7200|
|predixy|version 1.0.2, port 7600|
The eight test commands:
|Test|Command line|
|--|--|
|redis set|redis-benchmark -h xxx -p 7200 -t set -r 3000 -n 40000000|
|predixy set|redis-benchmark -h xxx -p 7600 -t set -r 3000 -n 40000000|
|redis get|redis-benchmark -h xxx -p 7200 -t get -r 3000 -n 40000000|
|predixy get|redis-benchmark -h xxx -p 7600 -t get -r 3000 -n 40000000|
|redis pipelined set|redis-benchmark -h xxx -p 7200 -t set -r 3000 -n 180000000 -P 20|
|predixy pipelined set|redis-benchmark -h xxx -p 7600 -t set -r 3000 -n 180000000 -P 20|
|redis pipelined get|redis-benchmark -h xxx -p 7200 -t get -r 3000 -n 420000000 -P 20|
|predixy pipelined get|redis-benchmark -h xxx -p 7600 -t get -r 3000 -n 220000000 -P 20|
All eight commands use redis-benchmark's default of 50 concurrent connections, 2-byte values and 3000 keys; the pipelined tests send 20 requests at a time. The commands were run in sequence with 2-minute gaps, and each test took about 4 minutes to complete. The overall result is shown below:
![overview](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/overview.png)
Dazzling, isn't it? The left vertical axis is CPU usage, the right one is throughput. 'redis used' is redis's total CPU usage, 'redis user' its user-mode share, 'redis sys' its kernel-mode share, and likewise for predixy. Don't worry about telling the curves apart; the numbers are listed one by one below. The chart shows eight humps, corresponding in order to the eight test commands above.
1 redis set test
![redis_set](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/redis_set.png)
2 predixy set test
![predixy_set](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/predixy_set.png)
3 redis get test
![redis_get](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/redis_get.png)
4 predixy get test
![predixy_get](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/predixy_get.png)
5 redis pipelined set test
![redis_pipeline_set](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/redis_pipeline_set.png)
6 predixy pipelined set test
![predixy_pipeline_set](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/predixy_pipeline_set.png)
7 redis pipelined get test
![redis_pipeline_get](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/redis_pipeline_get.png)
8 predixy pipelined get test
![predixy_pipeline_get](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/predixy_pipeline_get.png)
The charts are still hard to read, so here is a summary table:
|Test \ Metric|redis used|redis user|redis sys|predixy used|predixy user|predixy sys|redis qps|predixy qps|
|--|--|--|--|--|--|--|--|--|
|redis set|0.990|0.247|0.744|0|0|0|167000|3|
|predixy set|0.475|0.313|0.162|0.986|0.252|0.734|174000|174000|
|redis get|0.922|0.180|0.742|0|0|0|163000|3|
|predixy get|0.298|0.195|0.104|0.988|0.247|0.741|172000|172000|
|redis pipelined set|1.006|0.796|0.21|0|0|0|782000|3|
|predixy pipelined set|0.998|0.940|0.058|0.796|0.539|0.256|724000|724000|
|redis pipelined get|1|0.688|0.312|0|0|0|1708000|3|
|predixy pipelined get|0.596|0.582|0.014|0.999|0.637|0.362|935000|935000|
If the first four results surprise you, don't assume you misread them or that the tests are flawed: the result is correct. It answers the question we raised at the start: adding a proxy does not necessarily reduce the overall throughput of the service. A benchmark is not a real application, but most redis usage looks just like it: accept requests concurrently from many clients, process them, and reply.
Why this result? Seeing it, we certainly want to know the cause, since it does not match our intuition. Let's start from the test data. In test 1, redis's CPU usage is nearly 1; for a single-threaded program this means the CPU is saturated, performance has hit its ceiling and cannot rise further, yet redis's throughput is only 167000. In test 2 redis's throughput is higher, while redis's CPU usage is clearly lower than in test 1: only 0.475. Why does throughput rise while CPU usage falls? Comparing the two tests carefully: in test 1 most of redis's CPU time is spent in kernel mode, as much as 0.744, with only 0.247 in user mode. Kernel-mode CPU time is not exactly wasted effort, but it does nothing to improve the program's own performance; only user-mode CPU time can do that. Compared with test 1, redis's user-mode CPU usage in test 2 rises to 0.313 while its kernel-mode share drops sharply to 0.162. That explains why test 2's throughput beats test 1's. Of course we should keep digging: why does redis's CPU profile change once the predixy layer is inserted? Because redis then receives batches of commands over a single connection, what redis calls a pipeline. predixy exploits exactly this feature: there is only one connection between predixy and redis, and in most cases predixy batches the client requests it has received and sends them to redis over that connection. This greatly reduces the overhead redis spends on network IO, overhead that is almost entirely kernel-mode.
Comparing tests 1 and 2, introducing predixy not only raised throughput directly, it brought a second benefit: redis's CPU usage fell below half. If that remaining CPU were put to use, throughput could rise even higher, whereas without the predixy layer, test 1 shows redis's CPU already maxed out and throughput unable to grow.
Tests 3 and 4 tell the same story as tests 1 and 2. Had we run only these four tests, it would seem that introducing a proxy simply helps throughput. As analyzed above, predixy improves throughput by batching. So what happens when the client's own usage pattern is to send commands in batches?
Hence the last four tests. Their immediate impression is stunning: throughput jumps severalfold, even tenfold. In fact, it is precisely because redis is so strong in pipelined mode that predixy is able to improve throughput in the single-command case at all. And in pipelined mode, the results show, predixy does lower the service's throughput.
For pipelined set, redis's CPU is saturated both with a direct connection and through predixy. Although predixy raises redis's user-mode CPU from 0.796 to 0.940, throughput falls rather than rises, from 782000 to 724000, a drop of about 7.4%. Why does throughput fall even though user-mode CPU utilization rises? Explaining that would require analyzing redis's own implementation, which we will not explore in detail here. A rough explanation: with redis's CPU saturated, different workloads yield different user/kernel-mode splits; one particular split maximizes throughput, and moving the user-mode share above or below that point lowers throughput.
Now pipelined get: a direct connection to redis reaches 1708000, while going through predixy yields only 935000, a 45% drop, as steep as the top personal income tax bracket. Is the flattering image of predixy we just built about to collapse? Not so fast; look at the other metrics. With a direct connection redis's CPU is saturated, but through predixy redis uses only 0.596 of a CPU, meaning forty percent of redis's CPU is still waiting to be squeezed.
Since pipelined get through predixy leaves redis short of its full power, what happens if we let it loose? We add two more tests. A single predixy lowers throughput in the pipelined case, but it leaves redis with headroom, so we increase predixy's processing capacity: we switch predixy to three threads and rerun tests 6 and 8.
The overall results of the two tests are shown below.
![mt_overview](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/mt_overview.png)
Three-thread predixy pipelined set:
![mt_predixy_set](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/mt_predixy_pipeline_set.png)
Three-thread predixy pipelined get:
![mt_predixy_get](https://github.com/joyieldInc/predixy/blob/master/doc/bench/redis/mt_predixy_pipeline_get.png)
|Test \ Metric|redis used|redis user|redis sys|predixy used|predixy user|predixy sys|redis qps|predixy qps|
|--|--|--|--|--|--|--|--|--|
|predixy pipeline set|1.01|0.93|0.07|1.37|0.97|0.41|762000|762000|
|predixy pipeline get|0.93|0.85|0.08|2.57|1.85|0.72|1718000|1718000|
In the single-thread predixy pipelined set test, both predixy's and redis's CPUs were already saturated and we assumed throughput had peaked, yet the three-thread predixy pipelined set test raises it further, from 724000 to 762000, only 2.5% short of the direct connection's 782000. The main difference between the multi-threaded and single-threaded runs is the number of connections: a single-threaded predixy keeps one connection to redis, while the three-thread one keeps three.
As for the three-thread predixy pipelined get test, throughput soars just as we expected, from 935000 to 1718000, surpassing the direct connection's 1708000.
Finally, a summary. Our test scenario is simple, plain set and get with the default 2-byte values, while real redis workloads are far more complex. Still, the numbers support some conclusions. Introducing a proxy does not necessarily reduce a service's throughput; depending on the load, a proxy can actually raise overall throughput, and if we do not begrudge the resources the proxy itself consumes, introducing one is almost always a good choice. Following the analysis above, the simplest practical rule: look at your redis CPU usage, and if too much of it is spent in kernel mode, consider introducing a proxy.

(12 binary image files added: the benchmark charts referenced above; not shown.)

View File

@ -151,9 +151,9 @@ predixy extends the AUTH command of redis, supporting multiple auth passwords
Authority { Authority {
Auth [password] { Auth [password] {
Mode read|write|admin Mode read|write|admin
[KeyPredix Predix...] [KeyPrefix Prefix...]
[ReadKeyPredix Predix...] [ReadKeyPrefix Prefix...]
[WriteKeyPredix Predix...] [WriteKeyPrefix Prefix...]
}... }...
} }
@ -222,9 +222,11 @@ predixy supports both Redis Sentinel and Redis Cluster for accessing redis; in one config…
[MasterReadPriority [0-100]] [MasterReadPriority [0-100]]
[StaticSlaveReadPriority [0-100]] [StaticSlaveReadPriority [0-100]]
[DynamicSlaveReadPriority [0-100]] [DynamicSlaveReadPriority [0-100]]
[RefreshInterval seconds] [RefreshInterval number[s|ms|us]]
[ServerTimeout number[s|ms|us]]
[ServerFailureLimit number] [ServerFailureLimit number]
[ServerRetryTimeout seconds] [ServerRetryTimeout number[s|ms|us]]
[KeepAlive seconds]
Sentinels { Sentinels {
+ addr + addr
... ...
@ -242,12 +244,14 @@ predixy supports both Redis Sentinel and Redis Cluster for accessing redis; in one config…
+ Hash: specifies how keys are hashed; currently only atol and crc16 are supported + Hash: specifies how keys are hashed; currently only atol and crc16 are supported
+ HashTag: specifies the hash tag; {} if unspecified + HashTag: specifies the hash tag; {} if unspecified
+ Distribution: specifies how keys are distributed; currently only modula and random are supported + Distribution: specifies how keys are distributed; currently only modula and random are supported
+ MasterReadPriority: read/write split, the priority for serving read requests from the redis master node; 0 forbids reading the redis master + MasterReadPriority: read/write split, the priority for serving read requests from the redis master node; 0 forbids reading the redis master; 50 if unspecified
+ StaticSlaveReadPriority: read/write split, the priority for serving read requests from static redis slave nodes; static nodes are the redis nodes explicitly listed in this config file + StaticSlaveReadPriority: read/write split, the priority for serving read requests from static redis slave nodes; static nodes are the redis nodes explicitly listed in this config file; 0 if unspecified
+ DynamicSlaveReadPolicy: as above, but dynamic nodes are those not listed in this config file and discovered dynamically through redis sentinel + DynamicSlaveReadPolicy: as above, but dynamic nodes are those not listed in this config file and discovered dynamically through redis sentinel; 0 if unspecified
+ RefreshInterval: predixy periodically queries redis sentinel for the latest cluster info; this parameter gives the refresh period in seconds + RefreshInterval: predixy periodically queries redis sentinel for the latest cluster info; this parameter gives the refresh period in seconds; 1 second if unspecified
+ ServerFailureLimit: how many errors a redis instance may produce before it is marked as failed + ServerTimeout: the maximum processing/waiting time of a request inside predixy; if redis has not responded in time, predixy closes the connection to redis and returns an error to the client; ineffective for blocking commands like blpop; 0 disables the feature, i.e. wait forever if redis does not reply; 0 if unspecified
+ ServerRetryTimeout: after a redis instance fails, how long to wait before checking whether it has recovered + ServerFailureLimit: how many errors a redis instance may produce before it is marked as failed; 10 if unspecified
+ ServerRetryTimeout: after a redis instance fails, how long to wait before checking whether it has recovered; 1 second if unspecified
+ KeepAlive: tcp keepalive time for predixy's connections to redis; 0 disables the feature; 0 if unspecified
+ Sentinels: defines the addresses of the redis sentinel instances + Sentinels: defines the addresses of the redis sentinel instances
+ Group: defines a redis group; the Group name should match the name in redis sentinel; a Group may explicitly list redis addresses, and listed ones are the static nodes mentioned above + Group: defines a redis group; the Group name should match the name in redis sentinel; a Group may explicitly list redis addresses, and listed ones are the static nodes mentioned above
@ -262,12 +266,14 @@ predixy supports both Redis Sentinel and Redis Cluster for accessing redis; in one config…
StaticSlaveReadPriority 50 StaticSlaveReadPriority 50
DynamicSlaveReadPriority 50 DynamicSlaveReadPriority 50
RefreshInterval 1 RefreshInterval 1
ServerTimeout 1
ServerFailureLimit 10 ServerFailureLimit 10
ServerRetryTimeout 1 ServerRetryTimeout 1
KeepAlive 120
Sentinels { Sentinels {
+ 10.2.2.2 + 10.2.2.2:7500
+ 10.2.2.3 + 10.2.2.3:7500
+ 10.2.2.4 + 10.2.2.4:7500
} }
Group shard001 { Group shard001 {
} }
@ -275,7 +281,7 @@ predixy supports both Redis Sentinel and Redis Cluster for accessing redis; in one config…
} }
} }
This Redis Sentinel cluster definition specifies three redis sentinel instances, 10.2.2.2, 10.2.2.3 and 10.2.2.4, and defines two redis groups, shard001 and shard002. No static redis nodes are given. None of the redis instances has password auth enabled, and each has 16 dbs. predixy hashes keys with crc16 and distributes them across shard001 and shard002 by modula, i.e. modulo. Because MasterReadPriority (60) is greater than DynamicSlaveReadPriority (50), all read requests go to the redis master nodes. With RefreshInterval 1, predixy queries redis sentinel every second to refresh cluster info. Once a redis instance accumulates 10 failures it is marked as failed and is then checked every 1 second for recovery. This Redis Sentinel cluster definition specifies three redis sentinel instances, 10.2.2.2:7500, 10.2.2.3:7500 and 10.2.2.4:7500, and defines two redis groups, shard001 and shard002. No static redis nodes are given. None of the redis instances has password auth enabled, and each has 16 dbs. predixy hashes keys with crc16 and distributes them across shard001 and shard002 by modula, i.e. modulo. Because MasterReadPriority (60) is greater than DynamicSlaveReadPriority (50), all read requests go to the redis master nodes. With RefreshInterval 1, predixy queries redis sentinel every second to refresh cluster info. Once a redis instance accumulates 10 failures it is marked as failed and is then checked every 1 second for recovery.
### Redis Cluster form ### Redis Cluster form
@ -287,8 +293,10 @@ predixy supports both Redis Sentinel and Redis Cluster for accessing redis; in one config…
[StaticSlaveReadPriority [0-100]] [StaticSlaveReadPriority [0-100]]
[DynamicSlaveReadPriority [0-100]] [DynamicSlaveReadPriority [0-100]]
[RefreshInterval seconds] [RefreshInterval seconds]
[ServerTimeout number[s|ms|us]]
[ServerFailureLimit number] [ServerFailureLimit number]
[ServerRetryTimeout seconds] [ServerRetryTimeout number[s|ms|us]]
[KeepAlive seconds]
Servers { Servers {
+ addr + addr
... ...
@ -308,8 +316,10 @@ predixy支持Redis Sentinel和Redis Cluster来使用redis一个配置里这
StaticSlaveReadPriority 50 StaticSlaveReadPriority 50
DynamicSlaveReadPriority 50 DynamicSlaveReadPriority 50
RefreshInterval 1 RefreshInterval 1
ServerTimeout 1
ServerFailureLimit 10 ServerFailureLimit 10
ServerRetryTimeout 1 ServerRetryTimeout 1
KeepAlive 120
Servers { Servers {
+ 192.168.2.107:2211 + 192.168.2.107:2211
+ 192.168.2.107:2212 + 192.168.2.107:2212

BIN
doc/wechat-cppfan.jpeg Normal file (binary image added, 37 KiB; not shown)

View File

@ -29,6 +29,7 @@ public:
typedef AcceptConnection Value; typedef AcceptConnection Value;
typedef ListNode<AcceptConnection, SharePtr<AcceptConnection>> ListNodeType; typedef ListNode<AcceptConnection, SharePtr<AcceptConnection>> ListNodeType;
typedef DequeNode<AcceptConnection, SharePtr<AcceptConnection>> DequeNodeType; typedef DequeNode<AcceptConnection, SharePtr<AcceptConnection>> DequeNodeType;
typedef Alloc<AcceptConnection, Const::AcceptConnectionAllocCacheSize> Allocator;
public: public:
AcceptConnection(int fd, sockaddr* addr, socklen_t len); AcceptConnection(int fd, sockaddr* addr, socklen_t len);
~AcceptConnection(); ~AcceptConnection();
@ -97,6 +98,6 @@ private:
typedef List<AcceptConnection> AcceptConnectionList; typedef List<AcceptConnection> AcceptConnectionList;
typedef Deque<AcceptConnection> AcceptConnectionDeque; typedef Deque<AcceptConnection> AcceptConnectionDeque;
typedef Alloc<AcceptConnection, Const::AcceptConnectionAllocCacheSize> AcceptConnectionAlloc; typedef AcceptConnection::Allocator AcceptConnectionAlloc;
#endif #endif

View File

@ -76,7 +76,7 @@ public:
} }
UsedMemory += allocSize<T>(); UsedMemory += allocSize<T>();
if (MaxMemory == 0 || UsedMemory <= MaxMemory) { if (MaxMemory == 0 || UsedMemory <= MaxMemory) {
void* p = ::operator new(allocSize<T>()); void* p = ::operator new(allocSize<T>(), std::nothrow);
if (p) { if (p) {
try { try {
obj = new (p) T(args...); obj = new (p) T(args...);
@ -145,7 +145,7 @@ public:
{ {
int n = --mCnt; int n = --mCnt;
if (n == 0) { if (n == 0) {
Alloc<T>::destroy(static_cast<T*>(this)); T::Allocator::destroy(static_cast<T*>(this));
} else if (n < 0) { } else if (n < 0) {
logError("unref object %p with cnt %d", this, n); logError("unref object %p with cnt %d", this, n);
abort(); abort();

View File

@ -12,10 +12,10 @@
#if _PREDIXY_BACKTRACE_ #if _PREDIXY_BACKTRACE_
#include <execinfo.h> #include <execinfo.h>
inline void traceInfo() inline void traceInfo(int sig)
{ {
#define Size 128 #define Size 128
logError("predixy backtrace"); logError("predixy backtrace(%d)", sig);
void* buf[Size]; void* buf[Size];
int num = ::backtrace(buf, Size); int num = ::backtrace(buf, Size);
int fd = -1; int fd = -1;
@ -32,9 +32,9 @@ inline void traceInfo()
#else #else
inline void traceInfo() inline void traceInfo(int sig)
{ {
logError("predixy backtrace, but current system unspport backtrace"); logError("predixy backtrace(%d), but current system unspport backtrace", sig);
} }
#endif #endif

View File

@ -23,6 +23,7 @@ class Buffer :
public RefCntObj<Buffer> public RefCntObj<Buffer>
{ {
public: public:
typedef Alloc<Buffer, Const::BufferAllocCacheSize> Allocator;
static const int MaxBufFmtAppendLen = 8192; static const int MaxBufFmtAppendLen = 8192;
public: public:
Buffer& operator=(const Buffer&); Buffer& operator=(const Buffer&);
@ -92,12 +93,13 @@ private:
}; };
typedef List<Buffer> BufferList; typedef List<Buffer> BufferList;
typedef Buffer::Allocator BufferAlloc;
template<> template<>
inline int allocSize<Buffer>() inline int allocSize<Buffer>()
{ {
return Buffer::getSize() + sizeof(Buffer); return Buffer::getSize() + sizeof(Buffer);
} }
typedef Alloc<Buffer, Const::BufferAllocCacheSize> BufferAlloc;
struct BufferPos struct BufferPos
{ {

View File

@ -9,8 +9,9 @@
#include <map> #include <map>
#include "String.h" #include "String.h"
#include "Command.h" #include "Command.h"
#include "Conf.h"
const Command Command::CmdPool[Sentinel] = { Command Command::CmdPool[AvailableCommands] = {
{None, "", 0, MaxArgs, Read}, {None, "", 0, MaxArgs, Read},
{Ping, "ping", 1, 2, Read}, {Ping, "ping", 1, 2, Read},
{PingServ, "ping", 1, 2, Inner}, {PingServ, "ping", 1, 2, Inner},
@ -86,8 +87,8 @@ const Command Command::CmdPool[Sentinel] = {
{Setrange, "setrange", 4, 4, Write}, {Setrange, "setrange", 4, 4, Write},
{Strlen, "strlen", 2, 2, Read}, {Strlen, "strlen", 2, 2, Read},
{Hdel, "hdel", 3, MaxArgs, Write}, {Hdel, "hdel", 3, MaxArgs, Write},
{Hexists, "hexists", 3, 3, Write}, {Hexists, "hexists", 3, 3, Read},
{Hget, "hget", 3, 3, Write}, {Hget, "hget", 3, 3, Read},
{Hgetall, "hgetall", 2, 2, Read}, {Hgetall, "hgetall", 2, 2, Read},
{Hincrby, "hincrby", 4, 4, Write}, {Hincrby, "hincrby", 4, 4, Write},
{Hincrbyfloat, "hincrbyfloat", 4, 4, Write}, {Hincrbyfloat, "hincrbyfloat", 4, 4, Write},
@ -96,7 +97,7 @@ const Command Command::CmdPool[Sentinel] = {
{Hmget, "hmget", 3, MaxArgs, Read}, {Hmget, "hmget", 3, MaxArgs, Read},
{Hmset, "hmset", 4, MaxArgs, Write}, {Hmset, "hmset", 4, MaxArgs, Write},
{Hscan, "hscan", 3, 7, Read}, {Hscan, "hscan", 3, 7, Read},
{Hset, "hset", 4, 4, Write}, {Hset, "hset", 4, MaxArgs, Write},
{Hsetnx, "hsetnx", 4, 4, Write}, {Hsetnx, "hsetnx", 4, 4, Write},
{Hstrlen, "hstrlen", 3, 3, Read}, {Hstrlen, "hstrlen", 3, 3, Read},
{Hvals, "hvals", 2, 2, Read}, {Hvals, "hvals", 2, 2, Read},
@ -109,7 +110,7 @@ const Command Command::CmdPool[Sentinel] = {
{Lpop, "lpop", 2, 2, Write}, {Lpop, "lpop", 2, 2, Write},
{Lpush, "lpush", 3, MaxArgs, Write}, {Lpush, "lpush", 3, MaxArgs, Write},
{Lpushx, "lpushx", 3, 3, Write}, {Lpushx, "lpushx", 3, 3, Write},
{Lrange, "lrange", 4, 4, Write}, {Lrange, "lrange", 4, 4, Read},
{Lrem, "lrem", 4, 4, Write}, {Lrem, "lrem", 4, 4, Write},
{Lset, "lset", 4, 4, Write}, {Lset, "lset", 4, 4, Write},
{Ltrim, "ltrim", 4, 4, Write}, {Ltrim, "ltrim", 4, 4, Write},
@ -138,6 +139,8 @@ const Command Command::CmdPool[Sentinel] = {
{Zincrby, "zincrby", 4, 4, Write}, {Zincrby, "zincrby", 4, 4, Write},
{Zinterstore, "zinterstore", 4, MaxArgs, Write}, {Zinterstore, "zinterstore", 4, MaxArgs, Write},
{Zlexcount, "zlexcount", 4, 4, Read}, {Zlexcount, "zlexcount", 4, 4, Read},
{Zpopmax, "zpopmax", 2, 3, Write},
{Zpopmin, "zpopmin", 2, 3, Write},
{Zrange, "zrange", 4, 5, Read}, {Zrange, "zrange", 4, 5, Read},
{Zrangebylex, "zrangebylex", 4, 7, Read}, {Zrangebylex, "zrangebylex", 4, 7, Read},
{Zrangebyscore, "zrangebyscore", 4, 8, Read}, {Zrangebyscore, "zrangebyscore", 4, 8, Read},
@ -171,11 +174,14 @@ const Command Command::CmdPool[Sentinel] = {
{SubMsg, "\000SubMsg", 0, 0, Admin} {SubMsg, "\000SubMsg", 0, 0, Admin}
}; };
int Command::Sentinel = Command::MaxCommands;
Command::CommandMap Command::CmdMap; Command::CommandMap Command::CmdMap;
void Command::init() void Command::init()
{ {
int type = 0; int type = 0;
for (auto& i : CmdPool) { for (auto j = 0; j < MaxCommands; j++) {
const auto& i = CmdPool[j];
if (i.type != type) { if (i.type != type) {
Throw(InitFail, "command %s unmatch the index in commands table", i.name); Throw(InitFail, "command %s unmatch the index in commands table", i.name);
} }
@ -187,3 +193,19 @@ void Command::init()
} }
} }
void Command::addCustomCommand(const CustomCommandConf& ccc) {
if (Sentinel >= AvailableCommands) {
Throw(InitFail, "too many custom commands(>%d)", MaxCustomCommands);
}
if (nullptr != find(ccc.name)) {
Throw(InitFail, "custom command %s is duplicated", ccc.name.c_str());
}
auto* p = &CmdPool[Sentinel];
p->name = ccc.name.c_str();
p->minArgs = ccc.minArgs;
p->maxArgs = ccc.maxArgs;
p->mode = ccc.mode;
p->type = (Command::Type)Sentinel++;
CmdMap[ccc.name] = p;
}

View File

@ -11,6 +11,8 @@
#include "Exception.h" #include "Exception.h"
#include "HashFunc.h" #include "HashFunc.h"
struct CustomCommandConf;
class Command class Command
{ {
public: public:
@ -154,6 +156,8 @@ public:
Zincrby, Zincrby,
Zinterstore, Zinterstore,
Zlexcount, Zlexcount,
Zpopmax,
Zpopmin,
Zrange, Zrange,
Zrangebylex, Zrangebylex,
Zrangebyscore, Zrangebyscore,
@ -189,7 +193,9 @@ public:
Unsubscribe, Unsubscribe,
SubMsg, SubMsg,
Sentinel MaxCommands,
MaxCustomCommands = 16,
AvailableCommands = MaxCommands + MaxCustomCommands,
}; };
enum Mode enum Mode
{ {
@ -249,9 +255,11 @@ public:
auto it = CmdMap.find(cmd); auto it = CmdMap.find(cmd);
return it == CmdMap.end() ? nullptr : it->second; return it == CmdMap.end() ? nullptr : it->second;
} }
static void addCustomCommand(const CustomCommandConf& pc);
static int Sentinel;
private: private:
static const int MaxArgs = 100000000; static const int MaxArgs = 100000000;
static const Command CmdPool[Sentinel]; static Command CmdPool[];
class H class H
{ {
public: public:

View File

@ -10,7 +10,7 @@
#include <limits.h> #include <limits.h>
#define _PREDIXY_NAME_ "predixy" #define _PREDIXY_NAME_ "predixy"
#define _PREDIXY_VERSION_ "1.0.3" #define _PREDIXY_VERSION_ "1.0.5"
namespace Const namespace Const
{ {
@ -32,8 +32,8 @@ namespace Const
static const int MaxCmdLen = 32; static const int MaxCmdLen = 32;
static const int MaxKeyLen = 512; static const int MaxKeyLen = 512;
static const int BufferAllocCacheSize = 64; static const int BufferAllocCacheSize = 64;
static const int RequestAllocCacheSize = 32; static const int RequestAllocCacheSize = 128;
static const int ResponseAllocCacheSize = 32; static const int ResponseAllocCacheSize = 128;
static const int AcceptConnectionAllocCacheSize = 32; static const int AcceptConnectionAllocCacheSize = 32;
static const int ConnectConnectionAllocCacheSize = 4; static const int ConnectConnectionAllocCacheSize = 4;
}; };

View File

@ -6,6 +6,7 @@
#include <ctype.h> #include <ctype.h>
#include <iostream> #include <iostream>
#include <sstream>
#include <fstream> #include <fstream>
#include "LogFileSink.h" #include "LogFileSink.h"
#include "ServerPool.h" #include "ServerPool.h"
@ -32,6 +33,13 @@ bool ServerConf::parse(ServerConf& s, const char* str)
return !s.addr.empty(); return !s.addr.empty();
} }
void CustomCommandConf::init(CustomCommandConf&c, const char* name) {
c.name = name;
c.minArgs = 2;
c.maxArgs = 2;
c.mode = Command::Write;
}
Conf::Conf(): Conf::Conf():
mBind("0.0.0.0:7617"), mBind("0.0.0.0:7617"),
mWorkerThreads(1), mWorkerThreads(1),
@ -49,8 +57,6 @@ Conf::Conf():
mLogSample[LogLevel::Notice] = 1; mLogSample[LogLevel::Notice] = 1;
mLogSample[LogLevel::Warn] = 1; mLogSample[LogLevel::Warn] = 1;
mLogSample[LogLevel::Error] = 1; mLogSample[LogLevel::Error] = 1;
mSentinelServerPool.refreshInterval = 1;
mClusterServerPool.refreshInterval = 1;
} }
Conf::~Conf() Conf::~Conf()
@ -119,8 +125,9 @@ void Conf::setGlobal(const ConfParser::Node* node)
{ {
const ConfParser::Node* authority = nullptr; const ConfParser::Node* authority = nullptr;
const ConfParser::Node* clusterServerPool = nullptr; const ConfParser::Node* clusterServerPool = nullptr;
const ConfParser::Node* sentinelServerPool = nullptr; const ConfParser::Node* standaloneServerPool = nullptr;
const ConfParser::Node* dataCenter = nullptr; const ConfParser::Node* dataCenter = nullptr;
std::vector<const ConfParser::Node*> latencyMonitors;
for (auto p = node; p; p = p->next) { for (auto p = node; p; p = p->next) {
if (setStr(mName, "Name", p)) { if (setStr(mName, "Name", p)) {
} else if (setStr(mBind, "Bind", p)) { } else if (setStr(mBind, "Bind", p)) {
@ -149,16 +156,19 @@ void Conf::setGlobal(const ConfParser::Node* node)
} else if (setInt(mLogSample[LogLevel::Warn], "LogWarnSample", p)) { } else if (setInt(mLogSample[LogLevel::Warn], "LogWarnSample", p)) {
} else if (setInt(mLogSample[LogLevel::Error], "LogErrorSample", p)) { } else if (setInt(mLogSample[LogLevel::Error], "LogErrorSample", p)) {
} else if (strcasecmp(p->key.c_str(), "LatencyMonitor") == 0) { } else if (strcasecmp(p->key.c_str(), "LatencyMonitor") == 0) {
mLatencyMonitors.push_back(LatencyMonitorConf{}); latencyMonitors.push_back(p);
setLatencyMonitor(mLatencyMonitors.back(), p);
} else if (strcasecmp(p->key.c_str(), "Authority") == 0) { } else if (strcasecmp(p->key.c_str(), "Authority") == 0) {
authority = p; authority = p;
} else if (strcasecmp(p->key.c_str(), "ClusterServerPool") == 0) { } else if (strcasecmp(p->key.c_str(), "ClusterServerPool") == 0) {
clusterServerPool = p; clusterServerPool = p;
} else if (strcasecmp(p->key.c_str(), "SentinelServerPool") == 0) { } else if (strcasecmp(p->key.c_str(), "SentinelServerPool") == 0) {
sentinelServerPool = p; standaloneServerPool = p;
} else if (strcasecmp(p->key.c_str(), "StandaloneServerPool") == 0) {
standaloneServerPool = p;
} else if (strcasecmp(p->key.c_str(), "DataCenter") == 0) { } else if (strcasecmp(p->key.c_str(), "DataCenter") == 0) {
dataCenter = p; dataCenter = p;
} else if (strcasecmp(p->key.c_str(), "CustomCommand") == 0) {
setCustomCommand(p);
} else { } else {
Throw(UnknownKey, "%s:%d unknown key %s", p->file, p->line, p->key.c_str()); Throw(UnknownKey, "%s:%d unknown key %s", p->file, p->line, p->key.c_str());
} }
@ -166,20 +176,27 @@ void Conf::setGlobal(const ConfParser::Node* node)
if (authority) { if (authority) {
setAuthority(authority); setAuthority(authority);
} }
if (clusterServerPool && sentinelServerPool) { if (clusterServerPool && standaloneServerPool) {
Throw(LogicError, "Can't define ClusterServerPool and SentinelServerPool at the same time"); Throw(LogicError, "Can't define ClusterServerPool/StandaloneServerPool at the same time");
} else if (clusterServerPool) { } else if (clusterServerPool) {
setClusterServerPool(clusterServerPool); setClusterServerPool(clusterServerPool);
mServerPoolType = ServerPool::Cluster; mServerPoolType = ServerPool::Cluster;
} else if (sentinelServerPool) { } else if (standaloneServerPool) {
setSentinelServerPool(sentinelServerPool); if (strcasecmp(standaloneServerPool->key.c_str(), "SentinelServerPool") == 0) {
mServerPoolType = ServerPool::Sentinel; mStandaloneServerPool.refreshMethod = ServerPoolRefreshMethod::Sentinel;
}
setStandaloneServerPool(standaloneServerPool);
mServerPoolType = ServerPool::Standalone;
} else { } else {
Throw(LogicError, "Must define a server pool"); Throw(LogicError, "Must define a server pool");
} }
if (dataCenter) { if (dataCenter) {
setDataCenter(dataCenter); setDataCenter(dataCenter);
} }
for (auto& latencyMonitor : latencyMonitors) {
mLatencyMonitors.push_back(LatencyMonitorConf{});
setLatencyMonitor(mLatencyMonitors.back(), latencyMonitor);
}
} }
static void setKeyPrefix(std::vector<std::string>& dat, const std::string& v) static void setKeyPrefix(std::vector<std::string>& dat, const std::string& v)
@ -241,24 +258,21 @@ void Conf::setAuthority(const ConfParser::Node* node)
bool Conf::setServerPool(ServerPoolConf& sp, const ConfParser::Node* p) bool Conf::setServerPool(ServerPoolConf& sp, const ConfParser::Node* p)
{ {
bool ret = true;
if (setStr(sp.password, "Password", p)) { if (setStr(sp.password, "Password", p)) {
return true;
} else if (setInt(sp.masterReadPriority, "MasterReadPriority", p, 0, 100)) { } else if (setInt(sp.masterReadPriority, "MasterReadPriority", p, 0, 100)) {
return true;
} else if (setInt(sp.staticSlaveReadPriority, "StaticSlaveReadPriority", p, 0, 100)) { } else if (setInt(sp.staticSlaveReadPriority, "StaticSlaveReadPriority", p, 0, 100)) {
return true;
} else if (setInt(sp.dynamicSlaveReadPriority, "DynamicSlaveReadPriority", p, 0, 100)) { } else if (setInt(sp.dynamicSlaveReadPriority, "DynamicSlaveReadPriority", p, 0, 100)) {
return true; } else if (setDuration(sp.refreshInterval, "RefreshInterval", p)) {
} else if (setInt(sp.refreshInterval, "RefreshInterval", p, 1)) { } else if (setDuration(sp.serverTimeout, "ServerTimeout", p)) {
return true;
} else if (setInt(sp.serverFailureLimit, "ServerFailureLimit", p, 1)) { } else if (setInt(sp.serverFailureLimit, "ServerFailureLimit", p, 1)) {
return true; } else if (setDuration(sp.serverRetryTimeout, "ServerRetryTimeout", p)) {
} else if (setInt(sp.serverRetryTimeout, "ServerRetryTimeout", p, 1)) { } else if (setInt(sp.keepalive, "KeepAlive", p, 0)) {
return true;
} else if (setInt(sp.databases, "Databases", p, 1, 128)) { } else if (setInt(sp.databases, "Databases", p, 1, 128)) {
return true; } else {
ret = false;
} }
return false; return ret;
} }
void Conf::setClusterServerPool(const ConfParser::Node* node) void Conf::setClusterServerPool(const ConfParser::Node* node)
@ -282,41 +296,47 @@ void Conf::setClusterServerPool(const ConfParser::Node* node)
} }
} }
void Conf::setSentinelServerPool(const ConfParser::Node* node) void Conf::setStandaloneServerPool(const ConfParser::Node* node)
{ {
if (!node->sub) { if (!node->sub) {
Throw(InvalidValue, "%s:%d SentinelServerPool require scope value", node->file, node->line); Throw(InvalidValue, "%s:%d StandaloneServerPool require scope value", node->file, node->line);
} }
mSentinelServerPool.hashTag[0] = '\0'; mStandaloneServerPool.hashTag[0] = '\0';
mSentinelServerPool.hashTag[1] = '\0'; mStandaloneServerPool.hashTag[1] = '\0';
for (auto p = node->sub; p; p = p->next) { for (auto p = node->sub; p; p = p->next) {
if (setServerPool(mSentinelServerPool, p)) { if (setServerPool(mStandaloneServerPool, p)) {
} else if (strcasecmp(p->key.c_str(), "RefreshMethod") == 0) {
try {
mStandaloneServerPool.refreshMethod = ServerPoolRefreshMethod::parse(p->val.c_str());
} catch (ServerPoolRefreshMethod::InvalidEnumValue& excp) {
Throw(InvalidValue, "%s:%d unknown RefreshMethod:%s", p->file, p->line, p->val.c_str());
}
} else if (strcasecmp(p->key.c_str(), "Distribution") == 0) { } else if (strcasecmp(p->key.c_str(), "Distribution") == 0) {
mSentinelServerPool.dist = Distribution::parse(p->val.c_str()); mStandaloneServerPool.dist = Distribution::parse(p->val.c_str());
if (mSentinelServerPool.dist == Distribution::None) { if (mStandaloneServerPool.dist == Distribution::None) {
Throw(InvalidValue, "%s:%d unknown Distribution", p->file, p->line); Throw(InvalidValue, "%s:%d unknown Distribution", p->file, p->line);
} }
} else if (strcasecmp(p->key.c_str(), "Hash") == 0) { } else if (strcasecmp(p->key.c_str(), "Hash") == 0) {
mSentinelServerPool.hash = Hash::parse(p->val.c_str()); mStandaloneServerPool.hash = Hash::parse(p->val.c_str());
if (mSentinelServerPool.hash == Hash::None) { if (mStandaloneServerPool.hash == Hash::None) {
Throw(InvalidValue, "%s:%d unknown Hash", p->file, p->line); Throw(InvalidValue, "%s:%d unknown Hash", p->file, p->line);
} }
} else if (strcasecmp(p->key.c_str(), "HashTag") == 0) { } else if (strcasecmp(p->key.c_str(), "HashTag") == 0) {
if (p->val.empty()) { if (p->val.empty()) {
mSentinelServerPool.hashTag[0] = '\0'; mStandaloneServerPool.hashTag[0] = '\0';
mSentinelServerPool.hashTag[1] = '\0'; mStandaloneServerPool.hashTag[1] = '\0';
} else if (p->val.size() == 2) { } else if (p->val.size() == 2) {
mSentinelServerPool.hashTag[0] = p->val[0]; mStandaloneServerPool.hashTag[0] = p->val[0];
mSentinelServerPool.hashTag[1] = p->val[1]; mStandaloneServerPool.hashTag[1] = p->val[1];
} else { } else {
Throw(InvalidValue, "%s:%d HashTag invalid", p->file, p->line); Throw(InvalidValue, "%s:%d HashTag invalid", p->file, p->line);
} }
} else if (setServers(mSentinelServerPool.sentinels, "Sentinels", p)) { } else if (setServers(mStandaloneServerPool.sentinels, "Sentinels", p)) {
mSentinelServerPool.sentinelPassword = p->val; mStandaloneServerPool.sentinelPassword = p->val;
} else if (strcasecmp(p->key.c_str(), "Group") == 0) { } else if (strcasecmp(p->key.c_str(), "Group") == 0) {
mSentinelServerPool.groups.push_back(ServerGroupConf{p->val}); mStandaloneServerPool.groups.push_back(ServerGroupConf{p->val});
if (p->sub) { if (p->sub) {
auto& g = mSentinelServerPool.groups.back(); auto& g = mStandaloneServerPool.groups.back();
setServers(g.servers, "Group", p); setServers(g.servers, "Group", p);
} }
} else { } else {
@ -324,18 +344,31 @@ void Conf::setSentinelServerPool(const ConfParser::Node* node)
p->file, p->line, p->key.c_str()); p->file, p->line, p->key.c_str());
} }
} }
if (mSentinelServerPool.sentinels.empty()) { if (mStandaloneServerPool.groups.empty()) {
Throw(LogicError, "SentinelServerPool no sentinel server"); Throw(LogicError, "StandaloneServerPool no server group");
} }
if (mSentinelServerPool.groups.empty()) { if (mStandaloneServerPool.refreshMethod == ServerPoolRefreshMethod::None) {
Throw(LogicError, "SentinelServerPool no server group"); Throw(LogicError, "StandaloneServerPool must define RefreshMethod");
} } else if (mStandaloneServerPool.refreshMethod == ServerPoolRefreshMethod::Sentinel) {
if (mSentinelServerPool.groups.size() > 1) { if (mStandaloneServerPool.sentinels.empty()) {
if (mSentinelServerPool.dist == Distribution::None) { Throw(LogicError, "StandaloneServerPool with RefreshMethod(sentinel) but no sentinel servers");
Throw(LogicError, "SentinelServerPool must define Dsitribution in multi groups");
} }
if (mSentinelServerPool.hash == Hash::None) { } else {
Throw(LogicError, "SentinelServerPool must define Hash in multi groups"); if (!mStandaloneServerPool.sentinels.empty()) {
Throw(LogicError, "StandaloneServerPool with Sentinels but RefreshMethod is not sentinel");
}
for (auto& g : mStandaloneServerPool.groups) {
if (g.servers.empty()) {
Throw(LogicError, "Group(%s) must add servers", g.name.c_str());
}
}
}
if (mStandaloneServerPool.groups.size() > 1) {
if (mStandaloneServerPool.dist == Distribution::None) {
Throw(LogicError, "StandaloneServerPool must define Dsitribution in multi groups");
}
if (mStandaloneServerPool.hash == Hash::None) {
Throw(LogicError, "StandaloneServerPool must define Hash in multi groups");
} }
} }
} }
@ -356,6 +389,83 @@ void Conf::setDataCenter(const ConfParser::Node* node)
} }
} }
void Conf::setCustomCommand(const ConfParser::Node* node)
{
if (!node->sub) {
Throw(InvalidValue, "%s:%d CustomCommand require scope value", node->file, node->line);
}
for (auto p = node->sub; p; p = p->next) {
mCustomCommands.push_back(CustomCommandConf{});
auto& cc = mCustomCommands.back();
CustomCommandConf::init(cc, p->key.c_str());
auto s = p->sub;
for (;s ; s = s->next) {
if (setInt(cc.minArgs, "MinArgs", s, 2)) {
} else if (setInt(cc.maxArgs, "MaxArgs", s, 2, 9999)) {
} else if (setCommandMode(cc.mode, "Mode", s)) {
} else {
Throw(UnknownKey, "%s:%d unknown key %s", s->file, s->line, s->key.c_str());
}
}
if (cc.maxArgs < cc.minArgs) {
Throw(InvalidValue, "%s:%d must be MaxArgs >= MinArgs", p->file, p->line);
}
}
for (const auto& cc : mCustomCommands) {
Command::addCustomCommand(cc);
}
}
bool Conf::setCommandMode(int& mode, const char* name, const ConfParser::Node* n, const int defaultMode)
{
if (strcasecmp(name, n->key.c_str()) != 0) {
return false;
}
if (n->val.size() == 0) {
mode = defaultMode;
} else {
mode = 0;
std::string mask;
std::istringstream is(n->val);
while (std::getline(is, mask, '|')) {
if ((strcasecmp(mask.c_str(), "write") == 0)) {
mode |= Command::Write;
} else if ((strcasecmp(mask.c_str(), "read") == 0)) {
mode |= Command::Read;
} else if ((strcasecmp(mask.c_str(), "admin") == 0)) {
mode |= Command::Admin;
} else if ((strcasecmp(mask.c_str(), "keyAt2") == 0)) {
mode |= Command::KeyAt2;
} else if ((strcasecmp(mask.c_str(), "keyAt3") == 0)) {
mode |= Command::KeyAt3;
} else {
Throw(InvalidValue, "%s:%d unknown mode %s", n->file, n->line, mask.c_str());
}
}
switch (mode & Command::KeyMask) {
case 0:
case Command::KeyAt2:
case Command::KeyAt3:
break;
default:
Throw(InvalidValue, "%s:%d %s require exclusive key pos", n->file, n->line, name);
}
switch (mode & Command::AuthMask) {
case 0:
mode |= Command::Write;
break;
case Command::Read:
case Command::Write:
case Command::Admin:
break;
default:
Throw(InvalidValue, "%s:%d %s require exclusive mode", n->file, n->line, name);
}
}
return true;
}
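// Illustration of the Mode grammar parsed above (a sketch, not from the repo):
//   Mode read|keyAt2   -> mode = Command::Read | Command::KeyAt2
//   Mode admin         -> mode = Command::Admin
//   Mode               -> empty value falls back to defaultMode (Command::Write)
//   Mode read|write    -> throws InvalidValue: auth modes are mutually exclusive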
void Conf::setDC(DCConf& dc, const ConfParser::Node* node) void Conf::setDC(DCConf& dc, const ConfParser::Node* node)
{ {
if (!node->sub) { if (!node->sub) {
@ -568,6 +678,27 @@ bool Conf::parseMemory(long& m, const char* str)
return m >= 0; return m >= 0;
} }
bool Conf::parseDuration(long& v, const char* str)
{
char u[4];
int c = sscanf(str, "%ld%3s", &v, u);
if (c == 2 && v > 0) {
if (strcasecmp(u, "s") == 0) {
v *= 1000000;
} else if (strcasecmp(u, "m") == 0 || strcasecmp(u, "ms") == 0) {
v *= 1000;
} else if (strcasecmp(u, "u") == 0 || strcasecmp(u, "us") == 0) {
} else {
return false;
}
} else if (c == 1) {
v *= 1000000;
} else {
return false;
}
return v >= 0;
}
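// Sketch of the duration grammar accepted above (illustrative values):
//   "1"     -> bare number is seconds   -> 1000000 us
//   "300ms" -> "m"/"ms" is milliseconds -> 300000 us
//   "500us" -> "u"/"us" is microseconds -> stored as-is
//   "2x"    -> unknown unit             -> parseDuration returns false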
bool Conf::setMemory(long& m, const char* name, const ConfParser::Node* n) bool Conf::setMemory(long& m, const char* name, const ConfParser::Node* n)
{ {
if (strcasecmp(name, n->key.c_str()) != 0) { if (strcasecmp(name, n->key.c_str()) != 0) {
@ -580,6 +711,18 @@ bool Conf::setMemory(long& m, const char* name, const ConfParser::Node* n)
return true; return true;
} }
bool Conf::setDuration(long& v, const char* name, const ConfParser::Node* n)
{
if (strcasecmp(name, n->key.c_str()) != 0) {
return false;
}
if (!parseDuration(v, n->val.c_str())) {
Throw(InvalidValue, "%s:%d %s invalid duration value \"%s\"",
n->file, n->line, name, n->val.c_str());
}
return true;
}
bool Conf::setServers(std::vector<ServerConf>& servs, const char* name, const ConfParser::Node* p) bool Conf::setServers(std::vector<ServerConf>& servs, const char* name, const ConfParser::Node* p)
{ {
if (strcasecmp(p->key.c_str(), name) != 0) { if (strcasecmp(p->key.c_str(), name) != 0) {

View File

@@ -19,6 +19,8 @@
 #include "Distribution.h"
 #include "ConfParser.h"
 #include "Auth.h"
+#include "Command.h"
+#include "Enums.h"

 struct AuthConf
 {
@@ -49,9 +51,11 @@ struct ServerPoolConf
     int masterReadPriority = 50;
     int staticSlaveReadPriority = 0;
     int dynamicSlaveReadPriority = 0;
-    int refreshInterval = 1;            //seconds
+    long refreshInterval = 1000000;     //us
+    long serverTimeout = 0;             //us
     int serverFailureLimit = 10;
-    int serverRetryTimeout = 1;         //seconds
+    long serverRetryTimeout = 1000000;  //us
+    int keepalive = 0;                  //seconds
     int databases = 1;
 };
@@ -60,8 +64,9 @@ struct ClusterServerPoolConf : public ServerPoolConf
     std::vector<ServerConf> servers;
 };

-struct SentinelServerPoolConf : public ServerPoolConf
+struct StandaloneServerPoolConf : public ServerPoolConf
 {
+    ServerPoolRefreshMethod refreshMethod = ServerPoolRefreshMethod::None;
     Distribution dist = Distribution::None;
     Hash hash = Hash::None;
     char hashTag[2];
@@ -87,10 +92,20 @@ struct DCConf
 struct LatencyMonitorConf
 {
     std::string name;
-    std::bitset<Command::Sentinel> cmds;
+    std::bitset<Command::AvailableCommands> cmds;
     std::vector<long> timeSpan; //us
 };

+struct CustomCommandConf
+{
+    std::string name;
+    int minArgs;
+    int maxArgs;
+    int mode;
+    static void init(CustomCommandConf &c, const char* name);
+};
+
 class Conf
 {
 public:
@@ -162,9 +177,9 @@ public:
     {
         return mClusterServerPool;
     }
-    const SentinelServerPoolConf& sentinelServerPool() const
+    const StandaloneServerPoolConf& standaloneServerPool() const
     {
-        return mSentinelServerPool;
+        return mStandaloneServerPool;
     }
     const std::string& localDC() const
     {
@@ -180,11 +195,12 @@ public:
     }
 public:
     static bool parseMemory(long& m, const char* str);
+    static bool parseDuration(long& v, const char* str);
 private:
     void setGlobal(const ConfParser::Node* node);
     void setAuthority(const ConfParser::Node* node);
     void setClusterServerPool(const ConfParser::Node* node);
-    void setSentinelServerPool(const ConfParser::Node* node);
+    void setStandaloneServerPool(const ConfParser::Node* node);
     void setDataCenter(const ConfParser::Node* node);
     void check();
     bool setServerPool(ServerPoolConf& sp, const ConfParser::Node* n);
@@ -193,10 +209,13 @@ private:
     bool setLong(long& attr, const char* name, const ConfParser::Node* n, long lower = LONG_MIN, long upper = LONG_MAX);
     bool setBool(bool& attr, const char* name, const ConfParser::Node* n);
     bool setMemory(long& mem, const char* name, const ConfParser::Node* n);
+    bool setDuration(long& v, const char* name, const ConfParser::Node* n);
     bool setServers(std::vector<ServerConf>& servs, const char* name, const ConfParser::Node* n);
     void setDC(DCConf& dc, const ConfParser::Node* n);
     void setReadPolicy(ReadPolicyConf& c, const ConfParser::Node* n);
     void setLatencyMonitor(LatencyMonitorConf& m, const ConfParser::Node* n);
+    void setCustomCommand(const ConfParser::Node* n);
+    bool setCommandMode(int& mode, const char* name, const ConfParser::Node* n, const int defaultMode = Command::Write);
 private:
     std::string mName;
     std::string mBind;
@@ -212,10 +231,11 @@ private:
     std::vector<AuthConf> mAuthConfs;
     int mServerPoolType;
     ClusterServerPoolConf mClusterServerPool;
-    SentinelServerPoolConf mSentinelServerPool;
+    StandaloneServerPoolConf mStandaloneServerPool;
     std::vector<DCConf> mDCConfs;
     std::string mLocalDC;
     std::vector<LatencyMonitorConf> mLatencyMonitors;
+    std::vector<CustomCommandConf> mCustomCommands;
 };
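The timeout fields above move from whole seconds to microseconds, with a new parseDuration/setDuration pair to read them from the config. A minimal sketch of what a microsecond duration parser with unit suffixes could look like (illustrative only; the accepted suffixes and the bare-number unit are assumptions, not taken from this diff):

    #include <cstdlib>
    #include <strings.h>

    // Illustrative sketch matching bool parseDuration(long& v, const char* str);
    // the suffix set ("us"/"ms"/"s") and bare-number unit are assumed.
    static bool parseDurationSketch(long& v, const char* str)
    {
        char* end = nullptr;
        long n = strtol(str, &end, 10);
        if (end == str || n < 0) {
            return false;
        }
        if (*end == '\0' || strcasecmp(end, "us") == 0) {
            v = n;                  // assumed: bare numbers are microseconds
        } else if (strcasecmp(end, "ms") == 0) {
            v = n * 1000;
        } else if (strcasecmp(end, "s") == 0) {
            v = n * 1000000;
        } else {
            return false;
        }
        return true;
    }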


@@ -283,20 +283,22 @@ Done:
         case SValBody:
             return KeyVal;
         case VValBody:
+        {
+            auto ret = KeyVal;
             val.assign(line, pos, line.size() - pos);
             if (val.back() == '{') {
                 val.resize(val.size() - 1);
-                int vsp = 0;
-                for (auto it = val.rbegin(); it != val.rend(); ++it) {
-                    if (isspace(*it)) {
-                        ++vsp;
-                    }
-                }
-                val.resize(val.size() - vsp);
-                return BeginScope;
-            } else {
-                return KeyVal;
+                ret = BeginScope;
             }
+            int vsp = 0;
+            for (auto it = val.rbegin(); it != val.rend(); ++it) {
+                if (isspace(*it)) {
+                    ++vsp;
+                }
+            }
+            val.resize(val.size() - vsp);
+            return ret;
+        }
         case ScopeReady:
             return KeyVal;
         case ScopeBody:


@@ -23,6 +23,7 @@ public:
     typedef ConnectConnection Value;
     typedef ListNode<ConnectConnection> ListNodeType;
     typedef DequeNode<ConnectConnection> DequeNodeType;
+    typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> Allocator;
 public:
     ConnectConnection(Server* s, bool shared);
     ~ConnectConnection();
@@ -73,6 +74,11 @@
     {
         return mSendRequests.size() + mSentRequests.size();
     }
+    Request* frontRequest() const
+    {
+        return !mSentRequests.empty() ? mSentRequests.front() :
+               (!mSendRequests.empty() ? mSendRequests.front() : nullptr);
+    }
 private:
     void parse(Handler* h, Buffer* buf, int pos);
     void handleResponse(Handler* h);
@@ -92,6 +98,6 @@ private:
 typedef List<ConnectConnection> ConnectConnectionList;
 typedef Deque<ConnectConnection> ConnectConnectionDeque;
-typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> ConnectConnectionAlloc;
+typedef ConnectConnection::Allocator ConnectConnectionAlloc;

 #endif


@@ -30,12 +30,14 @@ ConnectConnection* ConnectConnectionPool::getShareConnection(int db)
                 mHandler->id(), db, (int)mShareConns.size());
         return nullptr;
     }
+    bool needInit = false;
     ConnectConnection* c = mShareConns[db];
     if (!c) {
         c = ConnectConnectionAlloc::create(mServ, true);
         c->setDb(db);
         ++mStats.connections;
         mShareConns[db] = c;
+        needInit = true;
         logNotice("h %d create server connection %s %d",
                 mHandler->id(), c->peer(), c->fd());
     } else if (c->fd() < 0) {
@@ -43,15 +45,17 @@ ConnectConnection* ConnectConnectionPool::getShareConnection(int db)
             return nullptr;
         }
         c->reopen();
+        needInit = true;
         logNotice("h %d reopen server connection %s %d",
                 mHandler->id(), c->peer(), c->fd());
-    } else {
-        return c;
     }
-    if (!init(c)) {
+    if (needInit && !init(c)) {
         c->close(mHandler);
         return nullptr;
     }
+    if (mServ->fail()) {
+        return nullptr;
+    }
     return c;
 }
@@ -91,7 +95,7 @@ ConnectConnection* ConnectConnectionPool::getPrivateConnection(int db)
             ccl.push_back(c);
             return nullptr;
         }
-        return c;
+        return mServ->fail() ? nullptr : c;
     }
 }
@@ -118,6 +122,11 @@ bool ConnectConnectionPool::init(ConnectConnection* c)
         logWarn("h %d s %s %d settcpnodelay fail %s",
                 mHandler->id(), c->peer(), c->fd(), StrError());
     }
+    auto sp = mHandler->proxy()->serverPool();
+    if (sp->keepalive() > 0 && !c->setTcpKeepAlive(sp->keepalive())) {
+        logWarn("h %d s %s %d settcpkeepalive(%d) fail %s",
+                mHandler->id(), c->peer(), c->fd(), sp->keepalive(), StrError());
+    }
     auto m = mHandler->eventLoop();
     if (!m->addSocket(c, Multiplexor::ReadEvent|Multiplexor::WriteEvent)) {
         logWarn("h %d s %s %d add to eventloop fail",
@@ -141,7 +150,6 @@ bool ConnectConnectionPool::init(ConnectConnection* c)
         logDebug("h %d s %s %d auth req %ld",
                 mHandler->id(), c->peer(), c->fd(), req->id());
     }
-    auto sp = mHandler->proxy()->serverPool();
     if (sp->type() == ServerPool::Cluster) {
         RequestPtr req = RequestAlloc::create(Request::Readonly);
         mHandler->handleRequest(req, c);


@@ -27,7 +27,7 @@ BufferPtr Connection::getBuffer(Handler* h, bool allowNew)
         }
     }
     if (!mBuf || mBuf->full()) {
-        BufferPtr buf = Alloc<Buffer>::create();
+        BufferPtr buf = BufferAlloc::create();
         if (mBuf) {
             mBuf->concat(buf);
         }


@@ -25,7 +25,8 @@ public:
     enum StatusEnum
     {
         ParseError = Socket::CustomStatus,
-        LogicError
+        LogicError,
+        TimeoutError
     };
 public:
     Connection();


@@ -86,6 +86,11 @@ public:
     {
         return node(obj)->next(Idx);
     }
+    bool exist(T* obj) const
+    {
+        auto n = node(obj);
+        return n->prev(Idx) != nullptr || n->next(Idx) != nullptr || n == mHead;
+    }
     void push_back(T* obj)
     {
         N* p = static_cast<N*>(obj);
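Because the list is intrusive, exist() can test membership from the node's own links: a detached node has null prev/next pointers, so an object is in the list iff it links to a neighbor or is the single element sitting at the head. The Handler change further down relies on it exactly as a double-insertion guard:

    // From the Handler diff below: only park a shared connection once.
    if (s->isShared() && !mWaitConnectConns.exist(s)) {
        mWaitConnectConns.push_back(s);
    }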

src/Enums.cpp (new file)

@@ -0,0 +1,15 @@
/*
* predixy - A high performance and full features proxy for redis.
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
* All rights reserved.
*/
#include "Enums.h"
const ServerPoolRefreshMethod::TypeName
ServerPoolRefreshMethod::sPairs[3] = {
{ServerPoolRefreshMethod::None, "none"},
{ServerPoolRefreshMethod::Fixed, "fixed"},
{ServerPoolRefreshMethod::Sentinel, "sentinel"},
};

src/Enums.h (new file)

@@ -0,0 +1,74 @@
/*
* predixy - A high performance and full features proxy for redis.
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
* All rights reserved.
*/
#ifndef _PREDIXY_ENUMS_H_
#define _PREDIXY_ENUMS_H_
#include <string.h>
#include <strings.h>
#include "Exception.h"
template<class T>
class EnumBase
{
public:
DefException(InvalidEnumValue);
struct TypeName
{
int type;
const char* name;
};
public:
EnumBase(int t):
mType(t)
{
}
int value() const
{
return mType;
}
bool operator==(const T& t) const
{
return t.value() == mType;
}
bool operator!=(const T& t) const
{
return t.value() != mType;
}
const char* name() const
{
return T::sPairs[mType].name;
}
static T parse(const char* str)
{
for (auto& i : T::sPairs) {
if (strcasecmp(i.name, str) == 0) {
return T(typename T::Type(i.type));
}
}
Throw(InvalidEnumValue, "invalid enum value:%s", str);
}
protected:
int mType;
};
class ServerPoolRefreshMethod : public EnumBase<ServerPoolRefreshMethod>
{
public:
enum Type
{
None,
Fixed,
Sentinel
};
static const TypeName sPairs[3];
ServerPoolRefreshMethod(Type t = None):
EnumBase<ServerPoolRefreshMethod>(t)
{
}
};
#endif
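A quick usage sketch of the enum wrapper (illustrative, assuming only the header above): parse() maps a token onto the enum case-insensitively and throws InvalidEnumValue on unknown input, while name() maps back to the canonical spelling for logging.

    #include "Enums.h"

    // Illustrative only: round-trip a config token through the wrapper.
    static void refreshMethodExample()
    {
        ServerPoolRefreshMethod m = ServerPoolRefreshMethod::parse("Sentinel");
        if (m == ServerPoolRefreshMethod(ServerPoolRefreshMethod::Sentinel)) {
            // would refresh topology by querying redis sentinel
        }
        const char* token = m.name(); // "sentinel"
        (void)token;
    }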


@@ -31,12 +31,9 @@ bool EpollMultiplexor::addSocket(Socket* s, int evts)
     event.events |= (evts & ReadEvent) ? EPOLLIN : 0;
     event.events |= (evts & WriteEvent) ? EPOLLOUT : 0;
     event.events |= EPOLLET;
-    //event.events |= EPOLLONESHOT;
     event.data.ptr = s;
+    s->setEvent(evts);
     int ret = epoll_ctl(mFd, EPOLL_CTL_ADD, s->fd(), &event);
-    if (ret == 0) {
-        s->setEvent(evts);
-    }
     return ret == 0;
 }
@@ -61,7 +58,6 @@ bool EpollMultiplexor::addEvent(Socket* s, int evts)
     }
     if ((s->getEvent() | evts) != s->getEvent()) {
         event.events |= EPOLLET;
-        //event.events |= EPOLLONESHOT;
         int ret = epoll_ctl(mFd, EPOLL_CTL_MOD, s->fd(), &event);
         if (ret == 0) {
             s->setEvent(s->getEvent() | evts);


@@ -62,6 +62,13 @@ void Handler::run()
         }
         refreshServerPool();
         checkConnectionPool();
+        timeout = mProxy->serverPool()->serverTimeout();
+        if (timeout > 0) {
+            int num = checkServerTimeout(timeout);
+            if (num > 0) {
+                postEvent();
+            }
+        }
         if (mStatsVer < mProxy->statsVer()) {
             resetStats();
         }
@@ -103,6 +110,29 @@ void Handler::checkConnectionPool()
     }
 }

+int Handler::checkServerTimeout(long timeout)
+{
+    int num = 0;
+    auto now = Util::elapsedUSec();
+    auto n = mWaitConnectConns.front();
+    while (n) {
+        auto s = n;
+        n = mWaitConnectConns.next(n);
+        if (auto req = s->frontRequest()) {
+            long elapsed = now - req->createTime();
+            if (elapsed >= timeout) {
+                s->setStatus(Connection::TimeoutError);
+                addPostEvent(s, Multiplexor::ErrorEvent);
+                mWaitConnectConns.remove(s);
+                ++num;
+            }
+        } else {
+            mWaitConnectConns.remove(s);
+        }
+    }
+    return num;
+}
+
 void Handler::handleEvent(Socket* s, int evts)
 {
     FuncCallTimer();
@@ -185,7 +215,6 @@ void Handler::postAcceptConnectionEvent()
             auto cp = mConnPool[s->server()->id()];
             s->setStatus(Connection::LogicError);
             addPostEvent(s, Multiplexor::ErrorEvent);
-            cp->putPrivateConnection(s);
             c->detachConnectConnection();
             s->detachAcceptConnection();
         }
@@ -216,6 +245,10 @@ void Handler::postConnectConnectionEvent()
             }
             if (!ret) {
                 s->setStatus(Multiplexor::ErrorEvent);
+            } else {
+                if (s->isShared() && !mWaitConnectConns.exist(s)) {
+                    mWaitConnectConns.push_back(s);
+                }
             }
         }
     }
@@ -242,6 +275,9 @@ void Handler::postConnectConnectionEvent()
                     s->status(), s->statusStr());
             mEventLoop->delSocket(s);
             s->close(this);
+            if (!s->isShared()) {
+                mConnPool[s->server()->id()]->putPrivateConnection(s);
+            }
             if (c) {
                 addPostEvent(c, Multiplexor::ErrorEvent);
                 s->detachAcceptConnection();
@@ -281,7 +317,6 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len)
     AcceptConnection* c = nullptr;
     try {
         c = AcceptConnectionAlloc::create(fd, addr, len);
-        logNotice("h %d accept c %s %d", id(), c->peer(), fd);
     } catch (ExceptionBase& e) {
         logWarn("h %d create connection for client %d fail %s",
                 id(), fd, e.what());
@@ -334,6 +369,8 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len)
         logWarn("h %d destroy c %s %d with add to event loop fail:%s",
                 id(), c->peer(), c->fd(), StrError());
         AcceptConnectionAlloc::destroy(c);
+    } else {
+        logNotice("h %d accept c %s %d assign to h %d", id(), c->peer(), fd, dst->id());
     }
 }
@@ -384,6 +421,8 @@ void Handler::handleConnectConnectionEvent(ConnectConnection* s, int evts)
     if (s->good() && (evts & Multiplexor::WriteEvent)) {
         if (s->isConnecting()) {
             s->setConnected();
+            logDebug("h %d s %s %d connected",
+                    id(), s->peer(), s->fd());
         }
         addPostEvent(s, Multiplexor::WriteEvent);
     }
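Taken together these hunks implement the new server-side timeout: a shared connection is parked on mWaitConnectConns once a request has been written to it, and every pass of the event loop ages the connection's oldest in-flight request via frontRequest() (added in ConnectConnection.h above). A comment-style reading of the flow, not additional code:

    // 1. postConnectConnectionEvent(): after a successful write,
    //        mWaitConnectConns.push_back(s);        // park the shared conn
    // 2. Handler::run(): if serverPool()->serverTimeout() > 0,
    //        checkServerTimeout(timeout);           // every loop pass
    // 3. checkServerTimeout(): for each parked conn with a pending request,
    //        now - frontRequest()->createTime() >= timeout
    //        -> setStatus(Connection::TimeoutError) and post an ErrorEvent.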


@@ -96,6 +96,7 @@ private:
     void refreshServerPool();
     void checkConnectionPool();
     int checkClientTimeout(long timeout);
+    int checkServerTimeout(long timeout);
     void innerResponse(ConnectConnection* c, Request* req, Response* res);
     void infoRequest(Request* req, const String& key);
     void infoLatencyRequest(Request* req);


@@ -67,6 +67,5 @@ int KqueueMultiplexor::wait(long usec, T* handler)
 typedef KqueueMultiplexor Multiplexor;
-#define _MULTIPLEXOR_ASYNC_ASSIGN_
 #endif


@@ -98,7 +98,7 @@ public:
     Buffer* output(Buffer* buf) const;
 private:
     String mName;
-    const std::bitset<Command::Sentinel>* mCmds;
+    const std::bitset<Command::AvailableCommands>* mCmds;
     std::vector<TimeSpan> mTimeSpan;
     TimeSpan mLast;
 };


@@ -72,6 +72,7 @@ objs = \
     Buffer.o \
     Command.o \
     Distribution.o \
+    Enums.o \
     Reply.o \
     ConfParser.o \
     Conf.o \
@@ -87,7 +88,7 @@ objs = \
     ServerPool.o \
     ClusterNodesParser.o \
     ClusterServerPool.o \
-    SentinelServerPool.o \
+    StandaloneServerPool.o \
     ConnectConnectionPool.o \
     Handler.o \
     Proxy.o \


@@ -26,10 +26,10 @@ static bool Stop = false;

 static void abortHandler(int sig)
 {
-    Abort = true;
-    if (sig == SIGABRT) {
-        traceInfo();
+    if (!Abort) {
+        traceInfo(sig);
     }
+    Abort = true;
     if (!Running) {
         abort();
     }
@@ -65,6 +65,7 @@ Proxy::~Proxy()

 bool Proxy::init(int argc, char* argv[])
 {
+    signal(SIGHUP, SIG_IGN);
     signal(SIGPIPE, SIG_IGN);
     signal(SIGFPE, abortHandler);
     signal(SIGILL, abortHandler);
@@ -72,7 +73,6 @@ bool Proxy::init(int argc, char* argv[])
     signal(SIGABRT, abortHandler);
     signal(SIGBUS, abortHandler);
     signal(SIGQUIT, abortHandler);
-    signal(SIGHUP, abortHandler);
     signal(SIGINT, stopHandler);
     signal(SIGTERM, stopHandler);
@@ -118,10 +118,10 @@ bool Proxy::init(int argc, char* argv[])
             mServPool = p;
         }
         break;
-    case ServerPool::Sentinel:
+    case ServerPool::Standalone:
         {
-            SentinelServerPool* p = new SentinelServerPool(this);
-            p->init(mConf->sentinelServerPool());
+            StandaloneServerPool* p = new StandaloneServerPool(this);
+            p->init(mConf->standaloneServerPool());
             mServPool = p;
         }
         break;


@@ -13,7 +13,7 @@
 #include "DC.h"
 #include "ServerPool.h"
 #include "ClusterServerPool.h"
-#include "SentinelServerPool.h"
+#include "StandaloneServerPool.h"
 #include "LatencyMonitor.h"

 class Proxy
@@ -51,15 +51,15 @@ public:
     }
     bool isSplitMultiKey() const
     {
-        return mConf->sentinelServerPool().groups.size() != 1;
+        return mConf->standaloneServerPool().groups.size() != 1;
     }
     bool supportTransaction() const
     {
-        return mConf->sentinelServerPool().groups.size() == 1;
+        return mConf->standaloneServerPool().groups.size() == 1;
     }
     bool supportSubscribe() const
     {
-        return mConf->sentinelServerPool().groups.size() == 1 ||
+        return mConf->standaloneServerPool().groups.size() == 1 ||
                mConf->clusterServerPool().servers.size() > 0;
     }
     const std::vector<Handler*>& handlers() const


@@ -98,10 +98,12 @@ Request::Request(GenericCode code):
 Request::~Request()
 {
+    clear();
 }

 void Request::clear()
 {
+    mConn = nullptr;
     mRes = nullptr;
     mHead.clear();
     mReq.clear();
@@ -155,13 +157,14 @@ void Request::set(const RequestParser& p, Request* leader)
         }
         mHead = r->mReq;
         mReq = p.request();
-        mLeader = leader;
         if (leader == this) {
             if (mType == Command::Mset || mType == Command::Msetnx) {
                 mFollowers = (p.argNum() - 1) >> 1;
             } else {
                 mFollowers = p.argNum() - 1;
             }
+        } else {
+            mLeader = leader;
         }
     } else {
         mReq = p.request();
@@ -287,10 +290,11 @@ void Request::adjustScanCursor(long cursor)
 void Request::follow(Request* leader)
 {
-    ++mFollowers;
+    leader->mFollowers += 1;
     if (leader == this) {
         return;
     }
+    mConn = leader->mConn;
     mType = leader->mType;
     mHead = leader->mHead;
     mReq = leader->mReq;
@@ -338,49 +342,49 @@ int Request::fill(IOVec* vecs, int len)
 void Request::setResponse(Response* res)
 {
     mDone = true;
-    if (mLeader) {
-        mLeader->mFollowersDone += 1;
+    if (Request* ld = leader()) {
+        ld->mFollowersDone += 1;
         switch (mType) {
         case Command::Mget:
             mRes = res;
             break;
         case Command::Mset:
-            if (Response* leaderRes = mLeader->getResponse()) {
+            if (Response* leaderRes = ld->getResponse()) {
                 if (res->isError() && !leaderRes->isError()) {
-                    mLeader->mRes = res;
+                    ld->mRes = res;
                 }
             } else {
-                mLeader->mRes = res;
+                ld->mRes = res;
             }
             break;
         case Command::Msetnx:
-            if (Response* leaderRes = mLeader->getResponse()) {
+            if (Response* leaderRes = ld->getResponse()) {
                 if (!leaderRes->isError() &&
                     (res->isError() || res->integer() == 0)) {
-                    mLeader->mRes = res;
+                    ld->mRes = res;
                 }
             } else {
-                mLeader->mRes = res;
+                ld->mRes = res;
             }
             break;
         case Command::Touch:
         case Command::Exists:
         case Command::Del:
         case Command::Unlink:
-            if (!mLeader->mRes) {
-                mLeader->mRes = res;
+            if (!ld->mRes) {
+                ld->mRes = res;
             }
-            if (mLeader->isDone()) {
-                mLeader->mRes->set(mLeader->mRes->integer());
+            if (ld->isDone()) {
+                ld->mRes->set(ld->mRes->integer());
             }
             break;
         case Command::ScriptLoad:
-            if (Response* leaderRes = mLeader->getResponse()) {
+            if (Response* leaderRes = ld->getResponse()) {
                 if (leaderRes->isString() && !res->isString()) {
-                    mLeader->mRes = res;
+                    ld->mRes = res;
                 }
             } else {
-                mLeader->mRes = res;
+                ld->mRes = res;
             }
             break;
         default:
@@ -395,7 +399,7 @@ void Request::setResponse(Response* res)
 bool Request::isDone() const
 {
-    if (mLeader == this) {
+    if (isLeader()) {
         switch (mType) {
         case Command::Mget:
         case Command::Psubscribe:


@@ -25,6 +25,7 @@ class Request :
 public:
     typedef Request Value;
     typedef ListNode<Request, SharePtr<Request>, RequestListIndex::Size> ListNodeType;
+    typedef Alloc<Request, Const::RequestAllocCacheSize> Allocator;
     static const int MaxRedirectLimit = 3;
     enum GenericCode
     {
@@ -119,11 +120,11 @@ public:
     }
     Request* leader() const
     {
-        return mLeader;
+        return isLeader() ? const_cast<Request*>(this) : (Request*)mLeader;
     }
     bool isLeader() const
     {
-        return mLeader == this;
+        return mFollowers > 0;
     }
     bool isDelivered() const
     {
@@ -181,6 +182,6 @@ private:
 typedef List<Request, RequestListIndex::Recv> RecvRequestList;
 typedef List<Request, RequestListIndex::Send> SendRequestList;
-typedef Alloc<Request, Const::RequestAllocCacheSize> RequestAlloc;
+typedef Request::Allocator RequestAlloc;

 #endif
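The leader bookkeeping changes shape here: a leader no longer stores a self-pointer. A short gloss of the new accessors, derived from the code above:

    // isLeader(): true iff at least one follower was attached (mFollowers > 0);
    //             previously it tested mLeader == this.
    // leader():   a leader returns itself, a follower returns its mLeader,
    //             and a plain single-key request returns nullptr - which is
    //             why Request::setResponse now guards with
    //             if (Request* ld = leader()) { ... }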


@@ -18,6 +18,7 @@ class Response :
 public:
     typedef Response Value;
     typedef ListNode<Response, SharePtr<Response>> ListNodeType;
+    typedef Alloc<Response, Const::ResponseAllocCacheSize> Allocator;
     enum GenericCode
     {
         Pong,
@@ -137,6 +138,6 @@ private:
 };

 typedef List<Response> ResponseList;
-typedef Alloc<Response, Const::ResponseAllocCacheSize> ResponseAlloc;
+typedef Response::Allocator ResponseAlloc;

 #endif


@@ -162,6 +162,9 @@ Server* ServerGroup::getReadServer(Handler* h, DC* localDC) const
             continue;
         }
         DC* dc = s->dc();
+        if (!dc) {
+            continue;
+        }
         int dcrp = localDC->getReadPriority(dc);
         if (dcrp <= 0) {
             continue;
@@ -221,7 +224,7 @@ Server* ServerGroup::getReadServer(Handler* h, DC* localDC) const
         dc = sdc[0];
         found = true;
     }
-    if (!found) {//dc maybe nullptr even we found
+    if (!found) {
         return nullptr;
     }
     Server* deadServs[Const::MaxServInGroup];


@@ -18,9 +18,11 @@ void ServerPool::init(const ServerPoolConf& conf)
     mMasterReadPriority = conf.masterReadPriority;
     mStaticSlaveReadPriority = conf.staticSlaveReadPriority;
     mDynamicSlaveReadPriority = conf.dynamicSlaveReadPriority;
-    mRefreshInterval = conf.refreshInterval * 1000000;
+    mRefreshInterval = conf.refreshInterval;
+    mServerTimeout = conf.serverTimeout;
     mServerFailureLimit = conf.serverFailureLimit;
-    mServerRetryTimeout = conf.serverRetryTimeout * 1000000;
+    mServerRetryTimeout = conf.serverRetryTimeout;
+    mKeepAlive = conf.keepalive;
     mDbNum = conf.databases;
 }


@@ -20,7 +20,7 @@ public:
     {
         Unknown,
         Cluster,
-        Sentinel
+        Standalone
     };
     static const int DefaultServerRetryTimeout = 10000000;
     static const int DefaultRefreshInterval = 1000000;
@@ -56,6 +56,10 @@
     {
         return mRefreshInterval;
     }
+    long serverTimeout() const
+    {
+        return mServerTimeout;
+    }
     int serverFailureLimit() const
     {
         return mServerFailureLimit;
@@ -64,6 +68,10 @@
     {
         return mServerRetryTimeout;
     }
+    int keepalive() const
+    {
+        return mKeepAlive;
+    }
     int dbNum() const
     {
         return mDbNum;
@@ -133,8 +141,10 @@ private:
     int mStaticSlaveReadPriority;
     int mDynamicSlaveReadPriority;
     long mRefreshInterval;
+    long mServerTimeout;
     int mServerFailureLimit;
     long mServerRetryTimeout;
+    int mKeepAlive;
     int mDbNum;
 };
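For orientation, the two new knobs would surface in the pool section of the config roughly as follows. This is a hypothetical fragment: the key names are inferred from the new accessors and conf fields, and are not confirmed by this diff.

    # hypothetical config fragment
    StandaloneServerPool {
        RefreshMethod sentinel
        ServerTimeout 100ms   # fail a pending server request after 100ms (0 = off)
        KeepAlive 120s        # TCP keepalive idle time on server connections
    }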


@@ -102,7 +102,7 @@ void Socket::getFirstAddr(const char* addr, int type, int protocol, sockaddr* re
     } else {
         std::string tmp;
         const char* host = addr;
-        const char* port = strchr(addr, ':');
+        const char* port = strrchr(addr, ':');
         if (port) {
             tmp.append(addr, port - addr);
             host = tmp.c_str();
@@ -155,6 +155,35 @@ bool Socket::setTcpNoDelay(bool val)
     return ret == 0;
 }

+bool Socket::setTcpKeepAlive(int interval)
+{
+    int val = 1;
+    int ret = setsockopt(mFd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val));
+    if (ret != 0) {
+        return false;
+    }
+#ifdef __linux__
+    val = interval;
+    ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val));
+    if (ret != 0) {
+        return false;
+    }
+    val = interval / 3;
+    ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val));
+    if (ret != 0) {
+        return false;
+    }
+    val = 3;
+    ret = setsockopt(mFd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val));
+    if (ret != 0) {
+        return false;
+    }
+#else
+    ((void)interval); //Avoid unused var warning for non Linux systems
+#endif
+    return true;
+}
+
 int Socket::read(void* buf, int cnt)
 {
     FuncCallTimer();
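The strchr-to-strrchr swap is the ipv6 fix: the port is now split off at the last colon, so colons inside the address no longer truncate the host. A small standalone illustration (inputs assumed, not from the diff):

    #include <cstring>
    #include <string>

    // Illustrative only: split "host:port" at the *last* ':'.
    static void splitAddrExample()
    {
        const char* addr = "::1:7617";
        const char* port = strrchr(addr, ':');   // points at ":7617"
        std::string host(addr, port - addr);     // "::1" (strchr would give "")
        (void)host;
    }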


@@ -46,7 +46,7 @@
         EventError,
         ExceptError,
-        CustomStatus
+        CustomStatus = 100
     };
 public:
     Socket(int fd = -1);
@@ -59,6 +59,7 @@
     void close();
     bool setNonBlock(bool val = true);
     bool setTcpNoDelay(bool val = true);
+    bool setTcpKeepAlive(int interval);
     int read(void* buf, int cnt);
     int write(const void* buf, int cnt);
     int writev(const struct iovec* vecs, int cnt);


src/StandaloneServerPool.cpp (new file)

@@ -0,0 +1,482 @@
/*
* predixy - A high performance and full features proxy for redis.
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
* All rights reserved.
*/
#include <algorithm>
#include "Logger.h"
#include "ServerGroup.h"
#include "Handler.h"
#include "StandaloneServerPool.h"
StandaloneServerPool::StandaloneServerPool(Proxy* p):
ServerPoolTmpl(p, Standalone),
mDist(Distribution::Modula)
{
mSentinels.reserve(MaxSentinelNum);
mServPool.reserve(Const::MaxServNum);
mHashTag[0] = mHashTag[1] = '\0';
}
StandaloneServerPool::~StandaloneServerPool()
{
}
void StandaloneServerPool::init(const StandaloneServerPoolConf& conf)
{
ServerPool::init(conf);
mRefreshMethod = conf.refreshMethod;
mDist = conf.dist;
mHash = conf.hash;
mHashTag[0] = conf.hashTag[0];
mHashTag[1] = conf.hashTag[1];
int i = 0;
if (conf.refreshMethod == ServerPoolRefreshMethod::Sentinel) {
mSentinels.resize(conf.sentinels.size());
for (auto& sc : conf.sentinels) {
Server* s = new Server(this, sc.addr, true);
s->setRole(Server::Sentinel);
s->setPassword(sc.password.empty() ? conf.sentinelPassword:sc.password);
mSentinels[i++] = s;
mServs[s->addr()] = s;
}
}
mGroupPool.resize(conf.groups.size());
i = 0;
for (auto& gc : conf.groups) {
ServerGroup* g = new ServerGroup(this, gc.name);
mGroupPool[i++] = g;
auto role = Server::Master;
for (auto& sc : gc.servers) {
Server* s = new Server(this, sc.addr, true);
s->setPassword(sc.password.empty() ? conf.password : sc.password);
mServPool.push_back(s);
mServs[s->addr()] = s;
g->add(s);
s->setGroup(g);
switch (mRefreshMethod.value()) {
case ServerPoolRefreshMethod::Fixed:
s->setOnline(true);
s->setRole(role);
role = Server::Slave;
break;
default:
s->setOnline(false);
break;
}
}
}
}
Server* StandaloneServerPool::getServer(Handler* h, Request* req, const String& key) const
{
FuncCallTimer();
switch (req->type()) {
case Command::SentinelGetMaster:
case Command::SentinelSlaves:
case Command::SentinelSentinels:
if (mSentinels.empty()) {
return nullptr;
} else {
Server* s = randServer(h, mSentinels);
logDebug("sentinel server pool get server %s for sentinel command",
s->addr().data());
return s;
}
break;
case Command::Randomkey:
return randServer(h, mServPool);
default:
break;
}
if (mGroupPool.size() == 1) {
return mGroupPool[0]->getServer(h, req);
} else if (mGroupPool.size() > 1) {
switch (mDist) {
case Distribution::Modula:
{
long idx = mHash.hash(key.data(), key.length(), mHashTag);
idx %= mGroupPool.size();
return mGroupPool[idx]->getServer(h, req);
}
break;
case Distribution::Random:
{
int idx = h->rand() % mGroupPool.size();
return mGroupPool[idx]->getServer(h, req);
}
break;
default:
break;
}
}
return nullptr;
}
void StandaloneServerPool::refreshRequest(Handler* h)
{
logDebug("h %d update standalone server pool", h->id());
switch (mRefreshMethod.value()) {
case ServerPoolRefreshMethod::Sentinel:
for (auto g : mGroupPool) {
RequestPtr req = RequestAlloc::create();
req->setSentinels(g->name());
req->setData(g);
h->handleRequest(req);
req = RequestAlloc::create();
req->setSentinelGetMaster(g->name());
req->setData(g);
h->handleRequest(req);
req = RequestAlloc::create();
req->setSentinelSlaves(g->name());
req->setData(g);
h->handleRequest(req);
}
break;
default:
break;
}
}
void StandaloneServerPool::handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res)
{
switch (req->type()) {
case Command::SentinelSentinels:
handleSentinels(h, s, req, res);
break;
case Command::SentinelGetMaster:
handleGetMaster(h, s, req, res);
break;
case Command::SentinelSlaves:
handleSlaves(h, s, req, res);
break;
default:
break;
}
}
class AddrParser
{
public:
enum Status {
Ok,
Error,
Done
};
public:
AddrParser(const Segment& res):
mRes(res),
mState(Idle),
mCnt(0),
mArgLen(0),
mIp(false),
mPort(false)
{
mRes.rewind();
}
int count() const {return mCnt;}
Status parse(SString<Const::MaxAddrLen>& addr);
private:
enum State {
Idle,
Count,
CountLF,
Arg,
ArgLen,
ArgLenLF,
SubArrayLen,
Body,
BodyLF,
Invalid,
Finished
};
private:
Segment mRes;
State mState;
int mCnt;
int mArgLen;
bool mIp;
bool mPort;
SString<4> mKey;
};
AddrParser::Status AddrParser::parse(SString<Const::MaxAddrLen>& addr)
{
const char* dat;
int len;
addr.clear();
while (mRes.get(dat, len) && mState != Invalid) {
for (int i = 0; i < len && mState != Invalid; ++i) {
char ch = dat[i];
switch (mState) {
case Idle:
mState = ch == '*' ? Count : Invalid;
break;
case Count:
if (ch >= '0' && ch <= '9') {
mCnt = mCnt * 10 + (ch - '0');
} else if (ch == '\r') {
if (mCnt == 0) {
mState = Finished;
return Done;
} else if (mCnt < 0) {
mState = Invalid;
return Error;
}
mState = CountLF;
} else {
mState = Invalid;
}
break;
case CountLF:
mState = ch == '\n' ? Arg : Invalid;
break;
case Arg:
if (ch == '$') {
mState = ArgLen;
mArgLen = 0;
} else if (ch == '*') {
mState = SubArrayLen;
} else {
mState = Invalid;
}
break;
case ArgLen:
if (ch >= '0' && ch <= '9') {
mArgLen = mArgLen * 10 + (ch - '0');
} else if (ch == '\r') {
mState = ArgLenLF;
} else {
mState = Invalid;
}
break;
case ArgLenLF:
mState = ch == '\n' ? Body : Invalid;
break;
case SubArrayLen:
if (ch == '\n') {
mState = Arg;
}
break;
case Body:
if (ch == '\r') {
mState = BodyLF;
if (mPort) {
mPort = false;
mRes.use(i + 1);
return Ok;
} else if (mIp) {
mIp = false;
addr.append(':');
} else if (mArgLen == 2 && strcmp(mKey.data(), "ip") == 0) {
mIp = true;
} else if (mArgLen == 4 && strcmp(mKey.data(), "port") == 0) {
mPort = true;
}
break;
}
if (mIp || mPort) {
addr.append(ch);
} else if (mArgLen == 2 || mArgLen == 4) {
mKey.append(ch);
}
break;
case BodyLF:
mKey.clear();
mState = ch == '\n' ? Arg : Invalid;
break;
default:
break;
}
}
mRes.use(len);
}
return mState != Invalid ? Done : Error;
}
static bool hasValidPort(const String& addr)
{
const char* p = addr.data() + addr.length();
for (int i = 0; i < addr.length(); ++i) {
if (*(--p) == ':') {
int port = atoi(p + 1);
return port > 0 && port < 65536;
}
}
return false;
}
void StandaloneServerPool::handleSentinels(Handler* h, ConnectConnection* s, Request* req, Response* res)
{
if (!res || !res->isArray()) {
return;
}
AddrParser parser(res->body());
SString<Const::MaxAddrLen> addr;
while (true) {
auto st = parser.parse(addr);
if (st == AddrParser::Ok) {
logDebug("sentinel server pool parse sentinel %s", addr.data());
if (!hasValidPort(addr)) {
logNotice("sentinel server pool parse sentienl %s invalid",
addr.data());
continue;
}
auto it = mServs.find(addr);
Server* serv = it == mServs.end() ? nullptr : it->second;
if (!serv) {
if (mSentinels.size() == mSentinels.capacity()) {
logWarn("too many sentinels %d, will ignore new sentinel %s",
(int)mSentinels.size(), addr.data());
continue;
}
serv = new Server(this, addr, false);
serv->setRole(Server::Sentinel);
serv->setPassword(password());
mSentinels.push_back(serv);
mServs[serv->addr()] = serv;
logNotice("h %d create new sentinel %s",
h->id(), addr.data());
}
serv->setOnline(true);
} else if (st == AddrParser::Done) {
break;
} else {
logError("sentinel server pool parse sentinel sentinels error");
break;
}
}
}
void StandaloneServerPool::handleGetMaster(Handler* h, ConnectConnection* s, Request* req, Response* res)
{
if (!res || !res->isArray()) {
return;
}
ServerGroup* g = (ServerGroup*)req->data();
if (!g) {
return;
}
SegmentStr<Const::MaxAddrLen + 32> str(res->body());
if (!str.complete()) {
return;
}
if (strncmp(str.data(), "*2\r\n$", 5) != 0) {
return;
}
SString<Const::MaxAddrLen> addr;
const char* p = str.data() + 5;
int len = atoi(p);
if (len <= 0) {
return;
}
p = strchr(p, '\r') + 2;
if (!addr.append(p, len)) {
return;
}
if (!addr.append(':')) {
return;
}
p += len + 3;
len = atoi(p);
if (len <= 0) {
return;
}
p = strchr(p, '\r') + 2;
if (!addr.append(p, len)) {
return;
}
logDebug("sentinel server pool group %s get master %s",
g->name().data(), addr.data());
auto it = mServs.find(addr);
Server* serv = it == mServs.end() ? nullptr : it->second;
if (serv) {
serv->setOnline(true);
serv->setRole(Server::Master);
auto old = serv->group();
if (old) {
if (old != g) {
old->remove(serv);
g->add(serv);
serv->setGroup(g);
}
} else {
g->add(serv);
serv->setGroup(g);
}
} else {
if (mServPool.size() == mServPool.capacity()) {
logWarn("too many servers %d, will ignore new master server %s",
(int)mServPool.size(), addr.data());
return;
}
serv = new Server(this, addr, false);
serv->setRole(Server::Master);
serv->setPassword(password());
mServPool.push_back(serv);
g->add(serv);
serv->setGroup(g);
mServs[serv->addr()] = serv;
logNotice("sentinel server pool group %s create master server %s %s",
g->name().data(), addr.data(), serv->dcName().data());
}
}
void StandaloneServerPool::handleSlaves(Handler* h, ConnectConnection* s, Request* req, Response* res)
{
if (!res || !res->isArray()) {
return;
}
ServerGroup* g = (ServerGroup*)req->data();
if (!g) {
return;
}
AddrParser parser(res->body());
SString<Const::MaxAddrLen> addr;
while (true) {
auto st = parser.parse(addr);
if (st == AddrParser::Ok) {
logDebug("sentinel server pool group %s parse slave %s",
g->name().data(), addr.data());
auto it = mServs.find(addr);
Server* serv = it == mServs.end() ? nullptr : it->second;
if (serv) {
serv->setOnline(true);
serv->setRole(Server::Slave);
auto old = serv->group();
if (old) {
if (old != g) {
old->remove(serv);
g->add(serv);
serv->setGroup(g);
}
} else {
g->add(serv);
serv->setGroup(g);
}
} else {
if (mServPool.size() == mServPool.capacity()) {
logWarn("too many servers %d, will ignore new slave server %s",
(int)mServPool.size(), addr.data());
return;
}
serv = new Server(this, addr, false);
serv->setRole(Server::Slave);
serv->setPassword(password());
mServPool.push_back(serv);
g->add(serv);
serv->setGroup(g);
mServs[serv->addr()] = serv;
logNotice("sentinel server pool group %s create slave server %s %s",
g->name().data(), addr.data(), serv->dcName().data());
}
} else if (st == AddrParser::Done) {
break;
} else {
logError("sentinel server pool group %s parse sentinel sentinels error",
g->name().data());
break;
}
}
}
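For orientation, AddrParser consumes the RESP shape that SENTINEL sentinels and SENTINEL slaves return: an outer array of per-instance arrays of key/value bulk strings, from which only the ip and port fields are kept. A minimal hand-written reply it would accept, abridged to those two fields:

    *1\r\n
    *4\r\n
    $2\r\nip\r\n
    $9\r\n127.0.0.1\r\n
    $4\r\nport\r\n
    $5\r\n26379\r\n

parse() then yields "127.0.0.1:26379" and reports Done once the reply is exhausted.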


src/StandaloneServerPool.h (new file)

@@ -0,0 +1,43 @@
/*
* predixy - A high performance and full features proxy for redis.
* Copyright (C) 2017 Joyield, Inc. <joyield.com@gmail.com>
* All rights reserved.
*/
#ifndef _PREDIXY_STANDALONE_SERVER_POOL_H_
#define _PREDIXY_STANDALONE_SERVER_POOL_H_
#include <map>
#include "Predixy.h"
#include "ServerPool.h"
class StandaloneServerPool : public ServerPoolTmpl<StandaloneServerPool>
{
public:
static const int MaxSentinelNum = 64;
public:
StandaloneServerPool(Proxy* p);
~StandaloneServerPool();
void init(const StandaloneServerPoolConf& conf);
Server* getServer(Handler* h, Request* req, const String& key) const;
Server* iter(int& cursor) const
{
return ServerPool::iter(mServPool, cursor);
}
void refreshRequest(Handler* h);
void handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res);
private:
void handleSentinels(Handler* h, ConnectConnection* s, Request* req, Response* res);
void handleGetMaster(Handler* h, ConnectConnection* s, Request* req, Response* res);
void handleSlaves(Handler* h, ConnectConnection* s, Request* req, Response* res);
friend class ServerPoolTmpl<StandaloneServerPool>;
private:
ServerPoolRefreshMethod mRefreshMethod;
std::vector<Server*> mSentinels;
std::vector<Server*> mServPool;
Distribution mDist;
Hash mHash;
char mHashTag[2];
};
#endif


@@ -391,7 +391,7 @@ Cases = [
         [('scard', '{k}2'), 3],
     ]),
     ('zset', [
-        [('del', 'k', '{k}2', '{k}3', '{k}4'), ],
+        [('del', 'k', '{k}2', '{k}3', '{k}4', '{k}5', '{k}6'), ],
         [('zadd', 'k', 10, 'apple'), 1],
         [('zcard', 'k'), 1],
         [('zincrby', 'k', 2, 'apple'), '12'],
@@ -438,6 +438,12 @@ Cases = [
         [('zunionstore', '{k}3', 2, 'k', '{k}2'), 4],
         [('zunionstore', '{k}3', 2, 'k', '{k}2', 'AGGREGATE', 'MAX'), 4],
         [('zunionstore', '{k}3', 2, 'k', '{k}2', 'WEIGHTS', 0.5, 1.2, 'AGGREGATE', 'MAX'), 4],
+        [('zadd', '{k}5', 0, 'apple', 9, 'banana', 1, 'pear', 3, 'orange', 4, 'cat'), 5],
+        [('zpopmax', '{k}5'), ['banana', '9']],
+        [('zpopmax', '{k}5', 3), ['cat', '4', 'orange', '3', 'pear', '1']],
+        [('zadd', '{k}6', 0, 'apple', 9, 'banana', 1, 'pear', 3, 'orange', 4, 'cat'), 5],
+        [('zpopmin', '{k}6'), ['apple', '0']],
+        [('zpopmin', '{k}6', 3), ['pear', '1', 'orange', '3', 'cat', '4']],
     ]),
     ('hyperloglog', [
         [('del', 'k', '{k}2', '{k}3'), ],