提交 ab66cd7b，作者：blu

event notifier rev1: new issues interface and a set of utilities, preparing for rev2

上级 5485494d
......@@ -59,10 +59,10 @@ cmake_force:
SHELL = /bin/sh
# The CMake executable.
CMAKE_COMMAND = /usr/local/Cellar/cmake/3.17.1/bin/cmake
CMAKE_COMMAND = /usr/local/Cellar/cmake/3.17.2/bin/cmake
# The command to remove a file.
RM = /usr/local/Cellar/cmake/3.17.1/bin/cmake -E rm -f
RM = /usr/local/Cellar/cmake/3.17.2/bin/cmake -E rm -f
# Escaping for special characters.
EQUALS = =
......@@ -79,7 +79,7 @@ CMAKE_BINARY_DIR = /Users/blu/work/opencv-projects/opencv-motion-detect
# Special rule for the target rebuild_cache
rebuild_cache:
@$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Running CMake to regenerate build system..."
/usr/local/Cellar/cmake/3.17.1/bin/cmake --regenerate-during-build -S$(CMAKE_SOURCE_DIR) -B$(CMAKE_BINARY_DIR)
/usr/local/Cellar/cmake/3.17.2/bin/cmake --regenerate-during-build -S$(CMAKE_SOURCE_DIR) -B$(CMAKE_BINARY_DIR)
.PHONY : rebuild_cache
# Special rule for the target rebuild_cache
......@@ -90,7 +90,7 @@ rebuild_cache/fast: rebuild_cache
# Special rule for the target edit_cache
edit_cache:
@$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Running CMake cache editor..."
/usr/local/Cellar/cmake/3.17.1/bin/ccmake -S$(CMAKE_SOURCE_DIR) -B$(CMAKE_BINARY_DIR)
/usr/local/Cellar/cmake/3.17.2/bin/ccmake -S$(CMAKE_SOURCE_DIR) -B$(CMAKE_BINARY_DIR)
.PHONY : edit_cache
# Special rule for the target edit_cache
......
......@@ -54,7 +54,7 @@ DINGBOT = DingBot(TOKEN, SECRET)
@app.route('/')
def index():
name = request.args.get("name", "World")
# DINGBOT.send_msg('test', 'this is a test')
DINGBOT.send_msg('test', 'this is a test')
term = Terminal.query.filter(Terminal.sn == 'SNaaaaaa').first()
ret = 'ok'
if term:
......
......@@ -31,6 +31,7 @@ using namespace zmqhelper;
#define NUM_MAX_REPORT_HISTORY 5
class EvCloudSvc {
private:
Server svr;
......@@ -41,6 +42,7 @@ private:
json configMap;
json releaseBundle;
int64_t bootTime = 0;
// peer data
json peerData;
......@@ -49,11 +51,97 @@ private:
queue<string> eventQue;
mutex eventQLock;
thread thMsgProcessor;
recursive_mutex mutIpcStatus;
json allIssues;
/// Build the set of currently open issues for one camera (ipc), keyed by catId.
///
/// Manager problems take precedence over runtime issues: when the "mgrTerminal"
/// entry is missing/empty/has no "online" flag, or when the manager is offline,
/// the result holds exactly one synthesized report and v["issues"] is not inspected.
/// Otherwise every entry found under v["issues"][modId][catId] is collected; if
/// several modules carry the same catId the first one encountered wins, and
/// entries whose catId is EV_MSG_REPORT_CATID_AVMODOFFLINE are skipped.
///
/// @param ipcSn serial number of the camera; only used in the generated message text
/// @param v     status record of the camera; the keys read are "mgrTerminal" and "issues"
/// @return      JSON object mapping catId -> report; a default-constructed (null) json
///              whose size() is 0 when nothing is wrong
json buildIssuesForIpc(string ipcSn, json &v){
    json ret;
    const auto now = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();

    // template of the synthesized manager report; "time" is a placeholder overwritten below
    json data = R"(
{
"catId": "AV_MGROFFLINE",
"level": "error",
"modId": "ALL",
"msg": "[autogen]evcloudsvc detects cluster mgr {} offline for ipc {}",
"status": "active",
"time": 1591085518,
"type": "report"
}
)"_json;
    data["time"] = now;

    // 1. no usable manager information at all
    const bool noConfig = v.count("mgrTerminal") == 0
                          || v["mgrTerminal"].size() == 0
                          || v["mgrTerminal"].count("online") == 0;
    if(noConfig){
        data["catId"] = "AV_NO_CONFIG";
        data["msg"] = "no configuration";
        ret[data["catId"].get<string>()] = data;
        return ret;
    }

    // 2. manager known but offline
    json &mgr = v["mgrTerminal"];
    if(!mgr["online"].get<bool>()){
        // "sn" may be absent or not a string; fall back to a placeholder instead of
        // throwing (and instead of inserting a null "sn" into the status record)
        string mgrSn = "unknown";
        if(mgr.count("sn") != 0 && mgr["sn"].is_string()){
            mgrSn = mgr["sn"].get<string>();
        }
        data["msg"] = fmt::format(data["msg"].get<string>(), mgrSn, ipcSn);
        ret[data["catId"].get<string>()] = data;
        return ret;
    }

    // 3. manager is fine: collect runtime issues reported per module
    if(v.count("issues") != 0 && v["issues"].size() != 0){
        for(auto &[modId, perModule]: v["issues"].items()){
            for(auto &[catId, issue]: perModule.items()){
                if(catId == EV_MSG_REPORT_CATID_AVMODOFFLINE) {
                    continue;
                }
                // keep the first report seen for a given category
                if(ret.count(catId) == 0){
                    ret[catId] = issue;
                }
            }
        }
    }
    return ret;
}
/// Record a manager-level report on a camera's status record.
///
/// A report object (modId "ALL", error level, current time in seconds) is built and,
/// under mutIpcStatus:
///  - active == true : stored at ipcStatus["issues"][selfId][catId];
///  - active == false: the entry for catId, if any, is removed from ipcStatus["issues"][selfId];
///  - in both cases appended to ipcStatus["lastNReports"], whose oldest element is
///    dropped once the list holds more than NUM_MAX_REPORT_HISTORY entries.
///
/// @param selfId    id under which the issue is filed inside ipcStatus["issues"]
/// @param ipcSn     not used by this function
/// @param ipcStatus status record that is updated in place
/// @param catId     report category id
/// @param msg       human readable report text
/// @param active    true -> status "active", false -> status "recover"
void storeReportMsg(string selfId, string ipcSn, json &ipcStatus, string catId, string msg, bool active){
    json report;
    report["msg"] = msg;
    report["modId"] = "ALL";
    report["type"] = EV_MSG_META_TYPE_REPORT;
    report["catId"] = catId;
    report["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
    report["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
    if(active){
        report["status"] = "active";
    }else{
        report["status"] = "recover";
    }

    lock_guard<recursive_mutex> lg(mutIpcStatus);

    {
        // operator[] creates a null slot for selfId when there is none yet
        json &peerIssues = ipcStatus["issues"][selfId];
        if(active){
            peerIssues[catId] = report;
        }else if(peerIssues.count(catId) != 0){
            peerIssues.erase(catId);
        }
    }

    // bounded history of the most recent reports
    json &history = ipcStatus["lastNReports"];
    history.push_back(report);
    if(history.size() > NUM_MAX_REPORT_HISTORY) {
        history.erase(0);
    }
}
int buildIpcStatus(json &conf)
{
int ret = 0;
string msg;
lock_guard<recursive_mutex> lg(mutIpcStatus);
for(auto &[k,v]: conf.items()) {
try {
json &ipcs = v["ipcs"];
......@@ -567,8 +655,8 @@ private:
json data = getConfigForDevice(selfId);
if(data["code"] != 0) {
json resp;
resp["target"] = selfId,
resp["metaType"] = EV_MSG_META_PONG;
resp["target"] = selfId;
resp["metaType"] = EV_MSG_META_PONG;
resp["data"] = data["msg"];
sendEdgeMsg(resp);
}
......@@ -577,6 +665,7 @@ private:
}
// update ipcStatus
lock_guard<recursive_mutex> lg(mutIpcStatus);
if(peerData["mgr2ipc"].count(selfId) != 0) {
for(auto &[k,v]: peerData["mgr2ipc"][selfId].items()) {
if(peerData["ipcStatus"].count(k) == 0) {
......@@ -589,6 +678,8 @@ private:
}
spdlog::info("update status to online = true");
ipcStatus["mgrTerminal"]["online"] = true;
string msg = fmt::format("evcloudsvc detects cluster mgr {} offline of ipc {}", selfId, k);
storeReportMsg(selfId, k, ipcStatus, EV_MSG_REPORT_CATID_AVMGROFFLINE, msg, false);
}
}
}
......@@ -597,6 +688,7 @@ private:
peerData["online"][selfId] = 0;
spdlog::warn("{} peer disconnected: {}", devSn, selfId);
if(peerData["mgr2ipc"].count(selfId) != 0) {
lock_guard<recursive_mutex> lg(mutIpcStatus);
for(auto &[k,v]: peerData["mgr2ipc"][selfId].items()) {
if(peerData["ipcStatus"].count(k) == 0) {
spdlog::error("{} no ipcStatus config for camera {}", devSn, k);
......@@ -607,24 +699,26 @@ private:
for(auto &[m,n]:ipcStatus["current"].items()) {
n = false;
}
json data;
string msg = fmt::format("evcloudsvc detects cluster mgr {} offline of ipc {}", selfId, k);
data["msg"] = msg;
data["modId"] = "ALL";
data["type"] = EV_MSG_META_TYPE_REPORT;
data["catId"] = EV_MSG_REPORT_CATID_AVMGROFFLINE;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
data["status"] = "active";
if(ipcStatus["issues"].count(selfId) == 0) {
ipcStatus["issues"][selfId] = json();
}
ipcStatus["issues"][selfId][EV_MSG_REPORT_CATID_AVMGROFFLINE] = data;
ipcStatus["lastNReports"].push_back(data);
if(ipcStatus["lastNReports"].size() > NUM_MAX_REPORT_HISTORY) {
ipcStatus["lastNReports"].erase(0);
}
storeReportMsg(selfId, k, ipcStatus, EV_MSG_REPORT_CATID_AVMGROFFLINE, msg, true);
// json data;
// string msg = fmt::format("evcloudsvc detects cluster mgr {} offline of ipc {}", selfId, k);
// data["msg"] = msg;
// data["modId"] = "ALL";
// data["type"] = EV_MSG_META_TYPE_REPORT;
// data["catId"] = EV_MSG_REPORT_CATID_AVMGROFFLINE;
// data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
// data["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
// data["status"] = "active";
// if(ipcStatus["issues"].count(selfId) == 0) {
// ipcStatus["issues"][selfId] = json();
// }
// ipcStatus["issues"][selfId][EV_MSG_REPORT_CATID_AVMGROFFLINE] = data;
// ipcStatus["lastNReports"].push_back(data);
// if(ipcStatus["lastNReports"].size() > NUM_MAX_REPORT_HISTORY) {
// ipcStatus["lastNReports"].erase(0);
// }
}
}
}
......@@ -654,8 +748,9 @@ private:
modIds.push_back(data["modId"].get<string>());
}
lock_guard<recursive_mutex> lg(mutIpcStatus);
for(const string &modId: modIds) {
if(peerData["mod2ipc"].count(modId) == 0) {
if(peerData["mod2ipc"].count(modId) == 0||peerData["mod2ipc"][modId].size() == 0) {
spdlog::error("{} received report from {} modId {} having no related ipc: {}", devSn, peerId, modId, data.dump());
}
else {
......@@ -687,8 +782,9 @@ private:
}
// update status
ipcStatus["current"][modId] = true;
if(status == "active") {
if(severity == "error") {
if(severity == "error" || severity == "fatal") {
ipcStatus["current"][modId] = false;
}
......@@ -709,10 +805,28 @@ private:
}
}
if(catId == EV_MSG_REPORT_CATID_AVFAILEDUPLOAD ||catId == EV_MSG_REPORT_CATID_AVMODOFFLINE || catId == EV_MSG_REPORT_CATID_AVWRITEPIPE || (modId.find("evpuller") != string::npos && catId == EV_MSG_REPORT_CATID_AVOPENINPUT)) {
ipcStatus["current"][modId] = true;
// if(catId == EV_MSG_REPORT_CATID_AVFAILEDUPLOAD ||catId == EV_MSG_REPORT_CATID_AVMODOFFLINE || catId == EV_MSG_REPORT_CATID_AVWRITEPIPE || (modId.find("evpuller") != string::npos && catId == EV_MSG_REPORT_CATID_AVOPENINPUT)) {
// ipcStatus["current"][modId] = true;
// }
}
auto issues = buildIssuesForIpc(ipcSn, ipcStatus);
if(allIssues.count(ipcSn) == 0){
allIssues[ipcSn] = json();
}
bool report = false;
if(issues.size() > 0){
for(auto &[kk,vv]: issues.items()){
if(allIssues[ipcSn].count(kk) == 0) {
report = true;
break;
}
}
}
if(report){
/// TODO:
}
}
else {
spdlog::error("{} can't find ipc for report mod {}", devSn, modId);
......@@ -807,7 +921,7 @@ private:
auto data = json::parse(body2str(body[3]));
spdlog::info("{}, ping msg from {}: {}", devSn, selfId, data.dump());
this->peerData["info"]["ips"][selfId] = data["ips"];
if(data["reports"].size() == 0) {
if(this->peerData["config"].count(selfId)== 0) {
if(this->peerData["info"]["nocfg"].count(selfId) == 0) {
this->peerData["info"]["nocfg"][selfId] = json();
}
......@@ -1324,6 +1438,7 @@ public:
string sn = req.get_param_value("sn");
try {
if(!sn.empty() && sn != "all") {
lock_guard<recursive_mutex> lg(mutIpcStatus);
if(this->peerData["ipcStatus"].count(sn) != 0) {
json j;
j[sn] = this->peerData["ipcStatus"][sn];
......@@ -1345,6 +1460,7 @@ public:
ret["data"]["detail"] = detail;
// get a copy to build summary
lock_guard<recursive_mutex> lg(mutIpcStatus);
json ipcsData = this->peerData["ipcStatus"];
for(auto &[k,v]: ipcsData.items()) {
json diff = json::diff(v["expected"], v["current"]);
......@@ -1374,6 +1490,7 @@ public:
json summary;
json stats;
vector<string> tags = {"E0C0", "E0C1", "E1C0", "E1C1"};
lock_guard<recursive_mutex> lg(mutIpcStatus);
for(auto &[k,v]: ipcsData.items()) {
json diff = json::diff(v["expected"], v["current"]);
if(diff.size() != 0) {
......@@ -1460,6 +1577,7 @@ public:
auto end = body["end"];
auto type = body["type"].get<string>();
// find slicer id by camera sn
lock_guard<recursive_mutex> lg(mutIpcStatus);
if(this->peerData["ipcStatus"].count(sn) != 0) {
auto dsn = this->peerData["ipcStatus"][sn]["mgrTerminal"]["sn"].get<string>();
if(this->peerData["ipcStatus"][sn]["mgrTerminal"]["online"].get<bool>()){
......@@ -1643,6 +1761,22 @@ public:
res.set_content(ret.dump(), "text/json");
});
//ipcStatus[k]["issues"][modId][catId]
// GET /issues: reply with the open issues of every camera as a JSON object
// { ipcSn: { catId: report, ... }, ... }; cameras without issues are omitted.
svr.Get("/issues", [this](const Request& req, Response& res) {
    // peerData["ipcStatus"] is accessed under mutIpcStatus elsewhere in this class
    lock_guard<recursive_mutex> lg(mutIpcStatus);
    // start from an empty object so that "no issues" is serialized as {} and not as null
    json ret = json::object();
    for(auto &[ipcSn, status]: peerData["ipcStatus"].items()){
        json issues = buildIssuesForIpc(ipcSn, status);
        if(issues.size() > 0){
            ret[ipcSn] = issues;
        }
    }
    res.set_content(ret.dump(), "text/json");
});
svr.listen("0.0.0.0", stoi(httpPort));
}
......@@ -1691,6 +1825,7 @@ public:
}
});
thMsgProcessor.detach();
bootTime = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evdaemon {} edge message processor had setup {}", devSn, addr);
......
......@@ -13,7 +13,7 @@ if [ "$1" == "start" ]
then
cd /opt/data/repos/evsuits/opencv-yolo/web;
/opt/apps/anaconda3/bin/celery multi start 4 -E -A web.worker -l info -n %n.%h --autoscale=4,1 --pidfile=%n.pid
/opt/apps/anaconda3/bin/flower -A web.worker --loglevel=info &disown
/opt/apps/anaconda3/bin/flower -A web.worker --persistent=True --db=flowerdb/db --loglevel=info &disown
/opt/apps/anaconda3/bin/python web.py
else
/opt/apps/anaconda3/bin/celery multi stop 4
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论