提交 2891ac18 authored 作者: blu's avatar blu

feature: report status

上级 42aea38a
...@@ -564,6 +564,69 @@ private: ...@@ -564,6 +564,69 @@ private:
return ret; return ret;
} }
// report example
// data["msg"] = msg;
// data["modId"] = selfId;
// data["type"] = EV_MSG_META_TYPE_REPORT;
// data["catId"] = EV_MSG_REPORT_CATID_AVWRITEPIPE;
// data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
// data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
// data["status"] = "active";
void processReportMsg(string peerId, json &data) {
json modIds;
if(data["modId"].is_array()) {
modIds = data["modId"];
}else if(data["modId"].is_string()) {
modIds.push_back(data["modId"].get<string>());
}
for(const string &modId: modIds){
if(peerData["mod2ipc"].count(modId) == 0) {
spdlog::error("{} received report fron {} modId {} having no related ipc: {}", devSn, peerId, modId, data.dump());
}else{
spdlog::warn("{} received report msg from {}: {}", devSn, peerId, data.dump());
string ipcSn = peerData["mod2ipc"][modId];
string status = data["status"];
string catId = data["catId"];
string severity = data["level"];
if(peerData["ipcStatus"].count(ipcSn) != 0) {
auto &ipcStatus = peerData["ipcStatus"][ipcSn];
// log report
if(ipcStatus.count("lastNReports") == 0){
ipcStatus["lastNReports"] = json();
}
ipcStatus["lastNReports"].push_back(data);
if(ipcStatus["lastNReports"].size() > NUM_MAX_REPORT_HISTORY) {
ipcStatus["lastNReports"].erase(0);
}
// update status
if(status == "active") {
if(ipcStatus["issues"].count(modId) == 0){
ipcStatus["issues"][modId] = json();
}
ipcStatus["issues"][modId][catId] = data;
if(severity == "error") {
ipcStatus["current"][modId] = false;
}
}else{
// recover
if(ipcStatus["issues"].count(modId) != 0 &&
ipcStatus["issues"][modId].count(catId) != 0) {
ipcStatus["issues"][modId].erase(catId);
}
if(catId == EV_MSG_REPORT_CATID_AVMODCONNECTED || catId == EV_MSG_REPORT_CATID_AVWRITEPIPE || (modId.find("evpuller") != string::npos && catId == EV_MSG_REPORT_CATID_AVOPENINPUT)) {
ipcStatus["current"][modId] = true;
}
}
}
}
}
}
int handleMsg(vector<vector<uint8_t> > &body) int handleMsg(vector<vector<uint8_t> > &body)
{ {
int ret = 0; int ret = 0;
...@@ -643,13 +706,15 @@ private: ...@@ -643,13 +706,15 @@ private:
else { else {
// message to evcloudsvc // message to evcloudsvc
// spdlog::info("evcloudsvc {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2])); // spdlog::info("evcloudsvc {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
try{
if(meta == "pong"||meta == "ping") { if(meta == "pong"||meta == "ping") {
// handleConnection(selfId);
if(meta=="ping") { if(meta=="ping") {
auto ips = body2str(body[3]); auto data = json::parse(body2str(body[3]));
spdlog::info("{}, ping msg from {}: {}", devSn, selfId, ips); spdlog::info("{}, ping msg from {}: {}", devSn, selfId, data.dump());
this->peerData["info"]["ips"][selfId] = data["ips"];
this->peerData["info"]["ips"][selfId] = ips; for(auto &r: data["reports"]) {
processReportMsg(selfId, r);
}
if(cachedMsg.find(selfId) != cachedMsg.end()) { if(cachedMsg.find(selfId) != cachedMsg.end()) {
while(!cachedMsg[selfId].empty()) { while(!cachedMsg[selfId].empty()) {
...@@ -671,74 +736,20 @@ private: ...@@ -671,74 +736,20 @@ private:
} }
} }
else { else {
try {
json jmeta = json::parse(meta); json jmeta = json::parse(meta);
if(jmeta["type"] == EV_MSG_META_TYPE_REPORT) { if(jmeta["type"] == EV_MSG_META_TYPE_REPORT) {
// TODO: handle report msg
// report example
// data["msg"] = msg;
// data["modId"] = selfId;
// data["type"] = EV_MSG_META_TYPE_REPORT;
// data["catId"] = EV_MSG_REPORT_CATID_AVWRITEPIPE;
// data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
// data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
// data["status"] = "active";
json data = json::parse(body2str(body[3])); json data = json::parse(body2str(body[3]));
string modId = data["modId"]; processReportMsg(selfId, data);
if(peerData["mod2ipc"].count(modId) == 0) {
spdlog::error("{} received report with modId {} having no related ipc: {}", devSn, modId, data.dump());
}else{
spdlog::warn("{} received report msg from {}: {}", devSn, selfId, body2str(body[3]));
string ipcSn = peerData["mod2ipc"][modId];
string status = data["status"];
string catId = data["catId"];
string severity = data["level"];
if(peerData["ipcStatus"].count(ipcSn) != 0) {
auto &ipcStatus = peerData["ipcStatus"][ipcSn];
// log report
if(ipcStatus.count("lastNReports") == 0){
ipcStatus["lastNReports"] = json();
}
ipcStatus["lastNReports"].push_back(data);
if(ipcStatus["lastNReports"].size() > NUM_MAX_REPORT_HISTORY) {
ipcStatus["lastNReports"].erase(0);
}
// update status
if(status == "active") {
if(ipcStatus["issues"].count(modId) == 0){
ipcStatus["issues"][modId] = json();
}
ipcStatus["issues"][modId][catId] = data;
if(severity == "error") {
ipcStatus["current"][modId] = false;
}
}else{
// recover
if(ipcStatus["issues"].count(modId) != 0 &&
ipcStatus["issues"][modId].count(catId) != 0) {
ipcStatus["issues"][modId].erase(catId);
}
if(catId == EV_MSG_REPORT_CATID_AVWRITEPIPE ||(modId.find("evpuller") != string::npos && catId == EV_MSG_REPORT_CATID_AVOPENINPUT)) {
ipcStatus["current"][modId] = true;
}
}
}
}
} }
else { else {
spdlog::warn("{} received unknown msg {} from {}", devSn, meta, selfId); spdlog::warn("{} received unknown msg {} from {}", devSn, meta, selfId);
} }
} }
}
catch(exception &e) { catch(exception &e) {
spdlog::warn("{} received unknown msg {} from {}", devSn, meta, selfId); spdlog::warn("{} received unknown msg {} from {}", devSn, meta, selfId);
} }
} }
}
return ret; return ret;
} }
......
...@@ -84,7 +84,23 @@ private: ...@@ -84,7 +84,23 @@ private:
int ping(void *s) int ping(void *s)
{ {
int ret = 0; int ret = 0;
vector<vector<uint8_t> >body = {str2body("evcloudsvc:0:0"), str2body(EV_MSG_META_PING), str2body(this->jsonIPs.dump())}; json data;
data["ips"] = this->jsonIPs;
data["reports"] = json();
json modIdsOnline;
json modIdsOffline;
for(auto &[k,v]: this->peerData["status"].items()){
if(v != 0 && v != -1 && v != 1 && v != 2) {
modIdsOnline.push_back(k);
}else{
modIdsOffline.push_back(k);
}
}
// TODO: construct reports body.
// since we use event-style reports, this interval based report is not needed anymore
vector<vector<uint8_t> >body = {str2body("evcloudsvc:0:0"), str2body(EV_MSG_META_PING), str2body(data.dump())};
ret = z_send_multiple(s, body); ret = z_send_multiple(s, body);
if(ret < 0) { if(ret < 0) {
...@@ -198,7 +214,20 @@ private: ...@@ -198,7 +214,20 @@ private:
pid_t pid; pid_t pid;
if(this->peerData["contConn"].count(k) != 0 && this->peerData["contConn"][k] > 4) { // 4 times if(this->peerData["contConn"].count(k) != 0 && this->peerData["contConn"][k] > 4) { // 4 times
spdlog::error("evdaemon {} detected module restarting frequently {}, will slow down the calling thread for 10s", this->devSn, k); json meta;
json data;
string msg = fmt::format("evdaemon {} detects module {} is restarting frequently, slow down for 10s", this->devSn, k);
spdlog::error(msg);
data["msg"] = msg;
data["modId"] = k;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["catId"] = EV_MSG_REPORT_CATID_AVLOOPRESTART;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
data["status"] = "active";
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDealer, "evcloudsvc", meta.dump(), data.dump());
this_thread::sleep_for(chrono::seconds(10)); this_thread::sleep_for(chrono::seconds(10));
} }
ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid); ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid);
...@@ -365,6 +394,39 @@ private: ...@@ -365,6 +394,39 @@ private:
return ret; return ret;
} }
void sendModConnectedMsg(json &modIds){
json meta;
json data;
string msg = fmt::format("evdaemon {} detects modules {} booted up", this->devSn, modIds.dump());
data["msg"] = msg;
data["modId"] = modIds;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["catId"] = EV_MSG_REPORT_CATID_AVMODCONNECTED;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
data["status"] = "recover";
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_INFO;
z_send(pDealer, "evcloudsvc", meta.dump(), data.dump());
}
void sendModOfflineMsg(json &modIds){
json meta;
json data;
string msg = fmt::format("evdaemon {} detects modules {} offline", this->devSn, modIds.dump());
data["msg"] = msg;
data["modId"] = modIds;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["catId"] = EV_MSG_REPORT_CATID_AVMODOFFLINE;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
data["status"] = "active";
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDealer, "evcloudsvc", meta.dump(), data.dump());
}
int handleEdgeMsg(vector<vector<uint8_t> > &body) int handleEdgeMsg(vector<vector<uint8_t> > &body)
{ {
int ret = 0; int ret = 0;
...@@ -384,9 +446,12 @@ private: ...@@ -384,9 +446,12 @@ private:
//auto state = zmq_socket_get_peer_state(pRouter, selfId.data(), selfId.size()); //auto state = zmq_socket_get_peer_state(pRouter, selfId.data(), selfId.size());
//spdlog::info("evdaemon {} peerState: {}", devSn, state); //spdlog::info("evdaemon {} peerState: {}", devSn, state);
json modIds;
modIds.push_back(selfId);
if((peerData["status"].count(selfId) == 0 || peerData["status"][selfId] == 0||this->peerData["status"][selfId] == -1) ) { if((peerData["status"].count(selfId) == 0 || peerData["status"][selfId] == 0||this->peerData["status"][selfId] == -1) ) {
peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evdaemon {} peer connected: {}", devSn, selfId); spdlog::info("evdaemon {} peer connected: {}", devSn, selfId);
sendModConnectedMsg(modIds);
if(this->peerData["tsLastConn"].count(selfId) == 0) { if(this->peerData["tsLastConn"].count(selfId) == 0) {
this->peerData["tsLastConn"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); this->peerData["tsLastConn"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
} }
...@@ -395,7 +460,6 @@ private: ...@@ -395,7 +460,6 @@ private:
this->peerData["contConn"][selfId] = 0; this->peerData["contConn"][selfId] = 0;
} }
else { else {
auto delta = this->peerData["contConn"][selfId].get<long>() - chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); auto delta = this->peerData["contConn"][selfId].get<long>() - chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
if(delta < 3) { // within 3s if(delta < 3) { // within 3s
this->peerData["contConn"][selfId] = this->peerData["contConn"][selfId].get<int>() + 1; this->peerData["contConn"][selfId] = this->peerData["contConn"][selfId].get<int>() + 1;
...@@ -419,6 +483,8 @@ private: ...@@ -419,6 +483,8 @@ private:
spdlog::info("evdaemon {} peer {} config sent: {}", devSn,selfId, cfg); spdlog::info("evdaemon {} peer {} config sent: {}", devSn,selfId, cfg);
} }
else { else {
sendModOfflineMsg(modIds);
if(peerData["status"][selfId] == 1 || peerData["status"][selfId] == 2) { if(peerData["status"][selfId] == 1 || peerData["status"][selfId] == 2) {
spdlog::warn("evdaemon {} refuse to start {}: it was asked to be stopped, and is removed from cluster config", this->devSn, selfId); spdlog::warn("evdaemon {} refuse to start {}: it was asked to be stopped, and is removed from cluster config", this->devSn, selfId);
} }
......
...@@ -37,11 +37,14 @@ namespace zmqhelper { ...@@ -37,11 +37,14 @@ namespace zmqhelper {
#define EV_MSG_META_TYPE_REPORT "report" #define EV_MSG_META_TYPE_REPORT "report"
#define EV_MSG_REPORT_CATID_AVMODOFFLINE "AV_MODOFFLINE"
#define EV_MSG_REPORT_CATID_AVMODCONNECTED "AV_MODCONNECTED"
#define EV_MSG_REPORT_CATID_AVOPENINPUT "AV_OPENINPUT" #define EV_MSG_REPORT_CATID_AVOPENINPUT "AV_OPENINPUT"
#define EV_MSG_REPORT_CATID_AVOPENOUTPUT "AV_OPENOUTPUT" #define EV_MSG_REPORT_CATID_AVOPENOUTPUT "AV_OPENOUTPUT"
#define EV_MSG_REPORT_CATID_AVWRITEHEADER "AV_WRITEHEADER" #define EV_MSG_REPORT_CATID_AVWRITEHEADER "AV_WRITEHEADER"
#define EV_MSG_REPORT_CATID_AVEOF "AV_EOF" #define EV_MSG_REPORT_CATID_AVEOF "AV_EOF"
#define EV_MSG_REPORT_CATID_AVWRITEPIPE "AV_WRITEPIPE" #define EV_MSG_REPORT_CATID_AVWRITEPIPE "AV_WRITEPIPE"
#define EV_MSG_REPORT_CATID_AVLOOPRESTART "AV_LOOPRESTART"
#define EV_MSG_META_VALUE_REPORT_LEVEL_INFO "info" #define EV_MSG_META_VALUE_REPORT_LEVEL_INFO "info"
#define EV_MSG_META_VALUE_REPORT_LEVEL_DEBUG "debug" #define EV_MSG_META_VALUE_REPORT_LEVEL_DEBUG "debug"
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论