提交 66c132a1 authored 作者: blu's avatar blu

refactor of delta config

上级 1951e662
......@@ -115,7 +115,7 @@ private:
ret["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evcloudsvc config:{}",newConfig.dump());
string msg;
bool bInvalidCfg = false;
bool hasError = false;
try {
// json deltaCfg = json();
if(newConfig.count("data") == 0|| newConfig["data"].size() == 0) {
......@@ -128,9 +128,11 @@ private:
json &data = newConfig["data"];
// for edge clusters, those are mgrs
for(auto &[k, v]: data.items()) {
if(hasError) {
break;
}
if(this->configMap.count(k) != 0 && this->peerData["config"].count(k) != 0) {
spdlog::warn("evcloudsvc info: cluster config for {} exists, will do comparison", k);
}
if(this->configMap.count(k) ^ this->peerData["config"].count(k)) {
spdlog::error("evcloudsvc inconsistent configuration for cluster {}", k);
......@@ -152,19 +154,50 @@ private:
json &ipcs = v["ipcs"];
int ipcIdx = 0;
for(auto &ipc : ipcs) {
if(hasError) {
break;
}
if(ipc.count("modules") == 0||ipc["modules"].size() == 0) {
spdlog::warn("evcloudsvc edge cluster {} has no modules for ipc {}", k, ipcIdx);
}
else {
json &modules = ipc["modules"];
for(auto &[mn, ma]: modules.items()) {
if(hasError) {
break;
}
if(ma.size() == 0) {
spdlog::warn("evcloudsvc /{}/ipcs/{}/modules/{} empty", k, ipcIdx, mn);
continue;
}
int modIdx = 0;
for(auto &m:ma) {
if(m.count("sn") != 0 && m["sn"].size() != 0) {
if(hasError) {
break;
}
if(m.size() == 0) {
spdlog::warn("evcloudsvc /{}/ipcs/{}/modules/{}/{} empty", k, ipcIdx, mn, modIdx);
continue;
}
if(m.count("sn") == 0 || m["sn"].size() == 0 || m.count("iid") == 0 || m["iid"].size() == 0||(mn == "evml" && (m.count("type") == 0||m["type"].size() == 0))) {
msg = fmt::format("evcloudsvc received invalid config at /{}/ipcs/{}/modules/{}/{}. check for fields sn, iid, type(evml): {}", k, ipcIdx, mn, modIdx, newConfig.dump());
spdlog::error(msg);
break;
}
if (1) {
string msg = "evcloudsvc invalid config: " + data.dump();;
ret["code"] = -1;
ret["msg"] = msg;
spdlog::error(msg);
break;
}
if(2) {
string modKey;
string sn = m["sn"];
//ml
if(mn == "evml" && m.count("type") != 0 && m["type"].size() != 0) {
modKey = sn +":evml:" + m["type"].get<string>();
modKey = sn +":evml" + m["type"].get<string>();
}
else {
modKey = sn + ":" + mn;
......@@ -193,13 +226,6 @@ private:
// modkey -> sn_of_evmgr
this->configMap["mod2mgr"][modKey] = k;
}
else {
string msg = "evcloudsvc invalid config: " + data.dump();;
ret["code"] = -1;
ret["msg"] = msg;
spdlog::error(msg);
break;
}
}// for mod
} // for modules
}
......
......@@ -187,7 +187,7 @@ private:
std::lock_guard<std::mutex> lock(mutSubsystem);
if(subs.size() != 0) {
for(auto &k: subs) {
if((this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0) && this->peerData["config"].count(k) != 0 && this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0){
if((this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0) && this->peerData["config"].count(k) != 0 && this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0) {
pid_t pid;
ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid);
if(0 == ret) {
......@@ -198,11 +198,13 @@ private:
else {
spdlog::info("evdaemon {} failed to create subsystem {}", this->devSn, k);
}
}else{
}
else {
spdlog::warn("evdaemon {} refuse to start subsystem {}, maybe it's disabled", this->devSn, k);
}
}
}else{
}
else {
if(this->bColdStart) {
vector<string> tmp;
json unkown;
......@@ -270,7 +272,8 @@ private:
}
}
this->bColdStart = false;
}else{
}
else {
// calc diff
auto jret = cfgutils::getModulesOperFromConfDiff(this->oldConfig, this->config, this->deltaCfg, this->devSn);
this->deltaCfg = json();
......@@ -289,7 +292,8 @@ private:
spdlog::info("evdaemon {} startSubSystems config diff to module action: {} -> {}", this->devSn, string(k), int(v));
if(v == 0) {
sendCmd2Peer(k, EV_MSG_META_VALUE_CMD_STOP, "0");
}else if(int(v) == 1 || int(v) == 2){
}
else if(int(v) == 1 || int(v) == 2) {
int status = (this->peerData["status"].count(k) == 0) ? -1:this->peerData["status"][k].get<int>();
spdlog::info("{} status {}", k, status);
if(this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0||this->peerData["status"][k] == -1) {
......@@ -304,11 +308,13 @@ private:
else {
spdlog::info("evdaemon {} failed to create subsystem {}", this->devSn, k);
}
}else{
}
else {
// restart
sendCmd2Peer(k, EV_MSG_META_VALUE_CMD_STOP, to_string(v));
}
}else{
}
else {
//
spdlog::warn("evdaemon {} unkown action {} for module {}", this->devSn, int(v), string(k));
}
......@@ -317,15 +323,17 @@ private:
}
return ret;
}
void sendCmd2Peer(string peerId, string cmdVal, string msg) {
void sendCmd2Peer(string peerId, string cmdVal, string msg)
{
json meta;
meta["type"] = EV_MSG_META_TYPE_CMD;
meta["value"] = cmdVal;
int ret = z_send(pRouter, peerId, this->daemonId, meta, msg);
if(ret < 0) {
spdlog::error("evdaemon {} failed to send msg to peer {}: {} - {}", devSn, peerId, meta.dump(), msg);
}else{
}
else {
spdlog::info("evdaemon {} successfully send msg to peer {}: {} - {}", devSn, peerId, meta.dump(), msg);
}
}
......
......@@ -160,7 +160,7 @@ private:
bProcessed = true;
cvMsg.notify_one();
}
}
}
}
catch(exception &e) {
spdlog::error("evmlmotion {} exception to process msg {}: {}", selfId, msg, e.what());
......@@ -773,9 +773,10 @@ public:
}
init();
thCloudMsgHandler = thread([this]{
while(true) {
thCloudMsgHandler = thread([this] {
while(true)
{
auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
......@@ -787,8 +788,9 @@ public:
});
thCloudMsgHandler.detach();
thEdgeMsgHandler = thread([this]{
while(true) {
thEdgeMsgHandler = thread([this] {
while(true)
{
auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
......@@ -796,7 +798,7 @@ public:
}
// full proto msg received.
this->handleEdgeMsg(body);
}
}
});
thEdgeMsgHandler.detach();
......
......@@ -371,11 +371,10 @@ protected:
av_packet_unref(&packet);
}
if(pAVFmtCtxBytes != nullptr)
{
if(pAVFmtCtxBytes != nullptr) {
free(pAVFmtCtxBytes);
}
// TODO:
if(ret < 0 && !bStopSig) {
// reconnect
......@@ -423,8 +422,9 @@ public:
}
init();
thEdgeMsgHandler = thread([this]{
while(true) {
thEdgeMsgHandler = thread([this] {
while(true)
{
auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
......@@ -437,13 +437,14 @@ public:
}
spdlog::info("evpuller {} received edge msg: {}", selfId, msg);
this->handleEdgeMsg(body);
}
}
});
thEdgeMsgHandler.detach();
thCloudMsgHandler = thread([this]{
while(true) {
thCloudMsgHandler = thread([this] {
while(true)
{
auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
......
......@@ -231,7 +231,7 @@ private:
bProcessed = true;
cvMsg.notify_one();
}
}
}
}
catch(exception &e) {
spdlog::error("evpusher {} exception to process msg {}: {}", selfId, msg, e.what());
......@@ -266,7 +266,7 @@ private:
}
unique_lock<mutex> lk(this->mutMsg);
this->cvMsg.wait(lk, [this] {return this->gotFormat;});
return ret;
}
......@@ -488,8 +488,9 @@ public:
init();
thCloudMsgHandler = thread([this]{
while(true) {
thCloudMsgHandler = thread([this] {
while(true)
{
auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
......@@ -501,8 +502,9 @@ public:
});
thCloudMsgHandler.detach();
thEdgeMsgHandler = thread([this]{
while(true) {
thEdgeMsgHandler = thread([this] {
while(true)
{
auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
......@@ -510,7 +512,7 @@ public:
}
// full proto msg received.
this->handleEdgeMsg(body);
}
}
});
thEdgeMsgHandler.detach();
......
......@@ -229,7 +229,7 @@ private:
}
}
this->videoFileServerApi += this->ipcSn;
this->videoFileServerApi += this->ipcSn;
json evpuller = ipc["modules"]["evpuller"][0];
pullerGid = evpuller["sn"].get<string>() + ":evpuller:" + to_string(evpuller["iid"]);
......@@ -582,7 +582,8 @@ protected:
return string(buffer);
}
void debugFilesRing(){
void debugFilesRing()
{
spdlog::info("evslicer {} debug files ring. segHead: {}, isFull: {}, max: {}",this->selfId, this->segHead, this->bSegFull, this->numSlices);
for(int i = 1; i <= numSlices; i++) {
spdlog::info("\tevslicer {} vector[{}] = {}, {}", selfId, i, vTsActive[i], videoFileTs2Name(vTsActive[i]));
......@@ -875,8 +876,9 @@ public:
});
thEdgeMsgHandler.detach();
thCloudMsgHandler = thread([this]{
while(true) {
thCloudMsgHandler = thread([this] {
while(true)
{
auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论