提交 42aea38a authored 作者: blu's avatar blu

feature: report status

上级 f27c7b20
......@@ -16,6 +16,7 @@
"pre":4,
"sn":"V2I0C7KC",
"thresh":30,
"fpsProc": 3,
"entropy": 0.3,
"type":"motion"
}
......
......@@ -28,6 +28,9 @@ using namespace zmqhelper;
//
#define KEY_CONFIG_MAP "configmap"
#define KEY_RELEASE_BUNDLE "release_bundle"
#define NUM_MAX_REPORT_HISTORY 5
class EvCloudSvc {
private:
Server svr;
......@@ -47,6 +50,124 @@ private:
mutex eventQLock;
thread thMsgProcessor;
int buildIpcStatus(json &conf) {
int ret = 0;
string msg;
for(auto &[k,v]: conf.items()){
try {
json &ipcs = v["ipcs"];
int ipcIdx = 0;
json shadowObj;
for(auto &ipc : ipcs) {
json &modules = ipc["modules"];
string ipcSn = ipc["sn"];
for(auto &[mn, ma]: modules.items()) {
for(auto &m:ma) {
string modGid;
string sn = m["sn"];
// ml
if(mn == "evml") {
modGid = sn +":evml" + m["type"].get<string>();
}
else {
modGid = sn + ":" + mn;
}
modGid = modGid + ":" + to_string(m["iid"].get<int>());
//{// start populate shadow object
if(shadowObj.count(ipcSn) == 0) {
shadowObj[ipcSn] = json();
}
auto &shad = shadowObj[ipcSn];
if(shad.count("mgrTerminal") == 0) {
shad["mgrTerminal"] = json();
shad["mgrTerminal"]["sn"] = k;
shad["mgrTerminal"]["online"] = false;
}
if(shad.count("expected") == 0) {
shad["expected"] = json();
}
if(shad.count("current") == 0) {
shad["current"] = json();
}
if(shad.count("issues") == 0) {
shad["issues"] = json();
}
if(shad.count("lastNReports") == 0) {
shad["lastNReports"] = json();
}
bool enabled = m["enabled"] == 0?false:true;
// mod2ipc
peerData["mod2ipc"][modGid] = ipcSn;
if(shad["expected"].count(modGid) != 0) {
//multiple mod with same class
spdlog::error("{} configuration for ipc {} in dev {} having multiple modules {}. ignored that extra module", devSn, ipcSn, k, modGid);
}else{
shad["expected"][modGid] = enabled;
shad["current"][modGid] = false;
}
//}//
}// for mod
} // for modules
ipcIdx++;
}// for ipc
// merge
auto &ipcStatus = peerData["ipcStatus"];
for(auto &[k,v]: shadowObj.items()){
if(ipcStatus.count(k) == 0) {
ipcStatus[k] = v;
}else{
auto &newCurr = v["current"];
auto &oldCurr = ipcStatus[k]["current"];
auto &newExpected = v["expected"];
auto &oldExpected = ipcStatus[k]["expected"];
vector<string> modRemove;
for(auto &[m,n]: oldCurr.items()){
if(newCurr.count(m) == 0) {
modRemove.push_back(m);
}
}
for(auto &k:modRemove) {
oldCurr.erase(k);
oldExpected.erase(k);
peerData["mod2ipc"].erase(k);
// remove issue
if(ipcStatus[k]["issues"].count(k) != 0) {
ipcStatus[k]["issues"].erase(k);
}
}
for(auto &[m,n]: newCurr.items()){
oldExpected[m] = newExpected[m];
if(oldCurr.count(m) == 0|| (oldCurr[m] == true && newExpected[m] == false)) {
oldCurr[m] = newCurr[m];
}
}
}
}
}
catch(exception &e) {
msg = fmt::format("evcloudsvc buildIpcStatus exception: {}", e.what());
spdlog::error(msg);
}
}
return ret;
}
void loadConfigMap()
{
// load configmap
......@@ -81,6 +202,9 @@ private:
}
}
// build ipcStatus
buildIpcStatus(this->peerData["config"]);
// release bundle
json relBund;
ret = LVDB::getValue(relBund, KEY_RELEASE_BUNDLE);
......@@ -124,7 +248,7 @@ private:
}
/// v: edge cluster mgr config
json applyClusterCfg(string k, json& v)
json applyClusterCfg(string k, json& v, json& shadowObj)
{
json ret;
ret["code"] = 0;
......@@ -145,12 +269,13 @@ private:
if(hasError) {
break;
}
if(ipc.count("modules") == 0||ipc["modules"].size() == 0) {
if(ipc.count("modules") == 0||ipc["modules"].size() == 0||ipc.count("sn") == 0||ipc["sn"].size() == 0) {
msg += fmt::format("\tedge cluster {} has no modules for ipc {}", k, ipcIdx);
ret["msg"] = msg;
}
else {
json &modules = ipc["modules"];
string ipcSn = ipc["sn"];
for(auto &[mn, ma]: modules.items()) {
if(hasError) {
break;
......@@ -170,7 +295,7 @@ private:
ret["msg"] = msg;
continue;
}
if(m.count("sn") == 0 || m["sn"].size() == 0 || m.count("iid") == 0 || m["iid"].size() == 0||(mn == "evml" && (m.count("type") == 0||m["type"].size() == 0))) {
if(m.count("sn") == 0 || m["sn"].size() == 0 || m.count("iid") == 0 || m["iid"].size() == 0||m.count("enabled") == 0 || m["enabled"].size() == 0||(mn == "evml" && (m.count("type") == 0||m["type"].size() == 0))) {
msg = fmt::format("evcloudsvc received invalid config at /{}/ipcs/{}/modules/{}/{}. check for fields sn, iid, type(evml): {}", k, ipcIdx, mn, modIdx, v.dump());
spdlog::error(msg);
hasError = true;
......@@ -185,7 +310,8 @@ private:
hasError = true;
break;
}
//ml
// ml
if(mn == "evml") {
modKey = sn +":evml" + m["type"].get<string>();
}
......@@ -203,6 +329,39 @@ private:
this->configMap["sn2mods"][sn][modKey] = 1;
}
//{// start populate shadow object
if(shadowObj.count(ipcSn) == 0) {
shadowObj[ipcSn] = json();
}
auto &shad = shadowObj[ipcSn];
if(shad.count("mgrTerminal") == 0) {
shad["mgrTerminal"] = json();
shad["mgrTerminal"]["sn"] = k;
shad["mgrTerminal"]["online"] = false;
}
if(shad.count("expected") == 0) {
shad["expected"] = json();
}
if(shad.count("current") == 0) {
shad["current"] = json();
}
if(shad.count("issues") == 0) {
shad["issues"] = json();
}
bool enabled = m["enabled"] == 0? false: true;
string modGid = sn + ":" + mn + ":" + to_string(m["iid"].get<int>());
// string modNick = mn + ":" + to_string(m["iid"].get<int>());
if(shad["expected"].count(modGid) != 0) {
//multiple mod with same class
spdlog::error("{} configuration for ipc {} in dev {} having multiple modules {}. ignored that extra module", devSn, ipcSn, k, modGid);
}else{
shad["expected"][modGid] = enabled;
shad["current"][modGid] = false;
}
//}
// modkey -> sn_of_evmgr
this->configMap["mod2mgr"][modKey] = k;
......@@ -307,7 +466,8 @@ private:
}
}
auto r = applyClusterCfg(k,v);
json shadowStat;
auto r = applyClusterCfg(k,v, shadowStat);
if(r["code"] != 0) {
hasError = true;
msg = r["msg"];
......@@ -333,6 +493,8 @@ private:
} // for clusters
buildIpcStatus(newConfig["data"]);
if(!hasError) {
// save configmap
iret = LVDB::setValue(this->configMap, KEY_CONFIG_MAP);
......@@ -511,9 +673,62 @@ private:
else {
try {
json jmeta = json::parse(meta);
if(jmeta["type"] == EV_MSG_META_TYPE_REPORT) {
// TODO: handle report msg
// report example
// data["msg"] = msg;
// data["modId"] = selfId;
// data["type"] = EV_MSG_META_TYPE_REPORT;
// data["catId"] = EV_MSG_REPORT_CATID_AVWRITEPIPE;
// data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
// data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
// data["status"] = "active";
json data = json::parse(body2str(body[3]));
string modId = data["modId"];
if(peerData["mod2ipc"].count(modId) == 0) {
spdlog::error("{} received report with modId {} having no related ipc: {}", devSn, modId, data.dump());
}else{
spdlog::warn("{} received report msg from {}: {}", devSn, selfId, body2str(body[3]));
string ipcSn = peerData["mod2ipc"][modId];
string status = data["status"];
string catId = data["catId"];
string severity = data["level"];
if(peerData["ipcStatus"].count(ipcSn) != 0) {
auto &ipcStatus = peerData["ipcStatus"][ipcSn];
// log report
if(ipcStatus.count("lastNReports") == 0){
ipcStatus["lastNReports"] = json();
}
ipcStatus["lastNReports"].push_back(data);
if(ipcStatus["lastNReports"].size() > NUM_MAX_REPORT_HISTORY) {
ipcStatus["lastNReports"].erase(0);
}
// update status
if(status == "active") {
if(ipcStatus["issues"].count(modId) == 0){
ipcStatus["issues"][modId] = json();
}
ipcStatus["issues"][modId][catId] = data;
if(severity == "error") {
ipcStatus["current"][modId] = false;
}
}else{
// recover
if(ipcStatus["issues"].count(modId) != 0 &&
ipcStatus["issues"][modId].count(catId) != 0) {
ipcStatus["issues"][modId].erase(catId);
}
if(catId == EV_MSG_REPORT_CATID_AVWRITEPIPE ||(modId.find("evpuller") != string::npos && catId == EV_MSG_REPORT_CATID_AVOPENINPUT)) {
ipcStatus["current"][modId] = true;
}
}
}
}
}
else {
spdlog::warn("{} received unknown msg {} from {}", devSn, meta, selfId);
......@@ -979,6 +1194,47 @@ public:
res.set_content(ret.dump(), "text/json");
});
svr.Get("/ipcstatus", [this](const Request& req, Response& res) {
json ret;
ret["code"] = 0;
ret["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
ret["msg"] = "ok";
ret["data"] = json();
string sn = req.get_param_value("sn");
try {
if(!sn.empty()) {
if(this->peerData["ipcStatus"].count(sn) != 0){
json j;
j[sn] = this->peerData["ipcStatus"][sn];
ret["data"] = j;
}else{
ret["msg"] = "ipc not found";
ret["code"] = 1;
}
}
else {
ret["data"] = this->peerData["ipcStatus"];
}
for(auto &[k,v]: ret["data"].items()) {
string tsn = v["mgrTerminal"]["sn"];
if(this->peerData["online"].count(tsn) != 0 && this->peerData["online"][tsn] != 0) {
v["mgrTerminal"]["online"] = true;
}else{
v["mgrTerminal"]["online"] = false;
}
}
}
catch(exception &e) {
ret["code"] = -1;
ret["msg"] = string("evcloudsvc exception: ") + e.what();
spdlog::error(ret["msg"].get<string>());
}
res.set_content(ret.dump(), "text/json");
});
svr.Post("/reset", [](const Request& req, Response& res) {
});
......@@ -1147,7 +1403,8 @@ public:
this->peerData["info"]["ips"] = json();
this->peerData["config"] = json();
this->peerData["online"] = json();
this->peerData["status"] = json();
this->peerData["ipcStatus"] = json();
this->peerData["mod2ipc"] = json();
spdlog::info("evcloudsvc boot");
loadConfigMap();
......
......@@ -21,7 +21,6 @@
},
"issues": {
"MODID": {
"runTerminalOnline": true,
"REPORT_CATID": {
"status": "active"
}
......
......@@ -87,6 +87,7 @@ private:
bool inited = false;
int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT;
string addr;
string portRouter;
try {
//
......@@ -102,7 +103,13 @@ private:
goto error_exit;
}
addr = "tcp://*:" + to_string(config["portRouter"]);
if(config.count("portRouter") != 0) {
portRouter = to_string(config["portRouter"]);
}else if(config.count("port-router") != 0) {
portRouter = to_string(config["port-router"]);
}
addr = "tcp://*:" + portRouter;
// setup zmq
// router service
pRouterCtx = zmq_ctx_new();
......
......@@ -829,11 +829,11 @@ protected:
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["catId"] = EV_MSG_REPORT_CATID_AVWRITEPIPE;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
data["status"] = "recover";
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::info(msg);
}
......
......@@ -336,11 +336,11 @@ protected:
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["catId"] = EV_MSG_REPORT_CATID_AVOPENINPUT;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
data["status"] = "recover";
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::info(msg);
}
......
......@@ -413,11 +413,11 @@ private:
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["catId"] = EV_MSG_REPORT_CATID_AVOPENOUTPUT;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
data["status"] = "recover";
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::info(msg);
}
......@@ -441,11 +441,11 @@ private:
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["catId"] = EV_MSG_REPORT_CATID_AVWRITEHEADER;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
data["status"] = "recover";
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
return ret;
......@@ -568,7 +568,7 @@ protected:
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["catId"] = EV_MSG_REPORT_CATID_AVWRITEPIPE;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
data["status"] = "recover";
meta["type"] = EV_MSG_META_TYPE_REPORT;
......
......@@ -624,11 +624,11 @@ protected:
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["catId"] = EV_MSG_REPORT_CATID_AVWRITEPIPE;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
data["status"] = "recover";
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论