This commit is contained in:
big-thousand 2022-11-20 20:38:29 +08:00 committed by GitHub
commit 4ac2b59f22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 104 additions and 16 deletions

25
.github/workflows/release.yml vendored Normal file
View File

@ -0,0 +1,25 @@
name: Publish And Deploy Demo # 自动部署的名称
on:
push:
branches:
- main
- master
- dev
jobs:
build-and-deploy:
runs-on: ubuntu-latest
steps:
-
name: Checkout
uses: actions/checkout@master
-
name: Setup Make
uses: actions/setup-make@master
-
name: Build
run: |
make
mkdir output
cp src/predixy output/predixy

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
.idea
*.o
CMakeCache.txt
CMakeFiles/
output/
Makefile
cmake_install.cmake
src/predixy

9
Dockerfile Normal file
View File

@ -0,0 +1,9 @@
FROM alpine
COPY output/conf /app/conf
COPY output/predixy /app/predixy
WORKDIR /app
# RUN cat /configs/default.toml
CMD ./prefixy conf/predixy.conf

View File

@ -1,6 +1,3 @@
.PHONY : default debug clean
make = make make = make
plt = $(shell uname) plt = $(shell uname)
ifeq ($(plt), FreeBSD) ifeq ($(plt), FreeBSD)
@ -17,3 +14,5 @@ debug:
clean: clean:
@$(make) -C src -f Makefile clean @$(make) -C src -f Makefile clean
.PHONY : default debug clean

6
build.sh Executable file
View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
make
mkdir -p output/conf
cp src/predixy output/predixy
cp conf/* output/conf

View File

@ -86,17 +86,42 @@ void ClusterServerPool::refreshRequest(Handler* h)
h->handleRequest(req); h->handleRequest(req);
} }
void ClusterServerPool::removeServer(Server* serv) {
if (nullptr == serv) return;
logNotice("redis cluster delete old server %s %s %s %s %s",
serv->name().data(),
serv->addr().data(),
serv->roleStr(),
serv->masterName().data(),
serv->dcName().data());
ServerGroup* g = getGroup(serv->name());
if (serv->group() && serv->group() != g) {
serv->group()->remove(serv);
}
auto mapServ = mServs.find(serv->addr());
if (mapServ != mServs.end())
{
mServs.erase(mapServ);
}
delete serv;
}
void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res) void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request* req, Response* res)
{ {
ClusterNodesParser p; ClusterNodesParser p;
p.set(res->body()); p.set(res->body());
for (auto serv : mServPool) { for (auto serv : mServPool) {
serv->setUpdating(true); serv->setUpdating(true);
logNotice("redis old cluster nodes get node %s %s %s %s",
serv->name().data(),
serv->addr().data(),
serv->roleStr(),
serv->masterName().data());
} }
while (true) { while (true) {
ClusterNodesParser::Status st = p.parse(); ClusterNodesParser::Status st = p.parse();
if (st == ClusterNodesParser::Node) { if (st == ClusterNodesParser::Node) {
logDebug("redis cluster update parse node %s %s %s %s", logNotice("redis update cluster nodes get node %s %s %s %s",
p.nodeId().data(), p.nodeId().data(),
p.addr().data(), p.addr().data(),
p.flags().data(), p.flags().data(),
@ -189,9 +214,12 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request
return; return;
} }
} }
for (auto serv : mServPool) { for (std::vector<Server*>::iterator it = mServPool.begin(); it != mServPool.end();) {
auto serv = *it;
if (serv->updating()) { if (serv->updating()) {
serv->setUpdating(false); serv->setUpdating(false);
it = mServPool.erase(it); //删除不在集群中节点
removeServer(serv);
continue; continue;
} }
if (serv->role() == Server::Master) { if (serv->role() == Server::Master) {
@ -229,6 +257,7 @@ void ClusterServerPool::handleResponse(Handler* h, ConnectConnection* s, Request
g->remove(serv); g->remove(serv);
} }
} }
++it;
} }
} }

View File

@ -25,6 +25,7 @@ public:
{ {
return mServPool; return mServPool;
} }
void removeServer(Server* srv);
private: private:
Server* getServer(Handler* h, Request* req, const String& key) const; Server* getServer(Handler* h, Request* req, const String& key) const;
void refreshRequest(Handler* h); void refreshRequest(Handler* h);

View File

@ -13,15 +13,23 @@
#include "Request.h" #include "Request.h"
#include "ResponseParser.h" #include "ResponseParser.h"
enum ConnectConnectionListIndex
{
PostConn = 0,
PrivateConn,
ConnListSize
};
class ConnectConnection : class ConnectConnection :
public ConnectSocket, public ConnectSocket,
public Connection, public Connection,
public ListNode<ConnectConnection>, public ListNode<ConnectConnection, ConnectConnection*, ConnectConnectionListIndex::ConnListSize>,
public DequeNode<ConnectConnection> public DequeNode<ConnectConnection>
{ {
public: public:
typedef ConnectConnection Value; typedef ConnectConnection Value;
typedef ListNode<ConnectConnection> ListNodeType; typedef ListNode<ConnectConnection, ConnectConnection*, ConnectConnectionListIndex::ConnListSize> ListNodeType;
typedef DequeNode<ConnectConnection> DequeNodeType; typedef DequeNode<ConnectConnection> DequeNodeType;
typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> Allocator; typedef Alloc<ConnectConnection, Const::ConnectConnectionAllocCacheSize> Allocator;
public: public:
@ -96,7 +104,8 @@ private:
bool mReadonly; bool mReadonly;
}; };
typedef List<ConnectConnection> ConnectConnectionList; typedef List<ConnectConnection, ConnectConnectionListIndex::PostConn> PostConnectConnectionList;
typedef List<ConnectConnection, ConnectConnectionListIndex::PrivateConn> PrivateConnectConnectionList;
typedef Deque<ConnectConnection> ConnectConnectionDeque; typedef Deque<ConnectConnection> ConnectConnectionDeque;
typedef ConnectConnection::Allocator ConnectConnectionAlloc; typedef ConnectConnection::Allocator ConnectConnectionAlloc;

View File

@ -71,7 +71,7 @@ private:
Server* mServ; Server* mServ;
int mPendRequests; int mPendRequests;
std::vector<ConnectConnection*> mShareConns; std::vector<ConnectConnection*> mShareConns;
std::vector<ConnectConnectionList> mPrivateConns; std::vector<PrivateConnectConnectionList> mPrivateConns;
ServerStats mStats; ServerStats mStats;
std::vector<LatencyMonitor> mLatencyMonitors; std::vector<LatencyMonitor> mLatencyMonitors;
}; };

View File

@ -120,7 +120,7 @@ private:
std::vector<ConnectConnectionPool*> mConnPool; std::vector<ConnectConnectionPool*> mConnPool;
AcceptConnectionDeque mAcceptConns; AcceptConnectionDeque mAcceptConns;
AcceptConnectionList mPostAcceptConns; AcceptConnectionList mPostAcceptConns;
ConnectConnectionList mPostConnectConns; PostConnectConnectionList mPostConnectConns;
ConnectConnectionDeque mWaitConnectConns; ConnectConnectionDeque mWaitConnectConns;
long mStatsVer; long mStatsVer;
HandlerStats mStats; HandlerStats mStats;

View File

@ -49,16 +49,18 @@ Server* ServerGroup::getServer(Handler* h, Request* req) const
Server* serv = nullptr; Server* serv = nullptr;
if (req->requireWrite()) { if (req->requireWrite()) {
int cnt = mServs.size(); int cnt = mServs.size();
for (int i = 0; i < cnt; ++i) { for (int i = cnt-1; i >= 0; --i) {
Server* s = mServs[i]; Server* s = mServs[i];
if (!s->online()) { if (!s->online()) {
continue; continue;
} }
if (s->role() == Server::Master) { if (s->role() == Server::Master) {
serv = s; serv = s;
if (!s->fail()){
break; break;
} }
} }
}
} else if (auto dataCenter = mPool->proxy()->dataCenter()) { } else if (auto dataCenter = mPool->proxy()->dataCenter()) {
serv = getReadServer(h, dataCenter->localDC()); serv = getReadServer(h, dataCenter->localDC());
} else { } else {