提交 df62ea0a authored 作者: blu's avatar blu

evmlmotion: delta packet ts

上级 5f785554
......@@ -50,7 +50,7 @@ namespace LVDB {
#define LVDB_ERROR_HELD -1
#define LVDB_ERROR_OTHER -2
json * findConfigModule(json &config, string sn, string moduleName, int iid) {
json * findConfigModule(json &config, string sn, string ipcSn, string moduleName, int iid) {
json *ret = NULL;
json &mgr = config;
if(mgr.count("ipcs") == 0) {
......@@ -58,8 +58,9 @@ namespace LVDB {
}else{
json &ipcs = mgr["ipcs"];
for(auto &ipc:ipcs) {
if(ipc.count("modules") == 0) {
break;
if(ipc.count("modules") == 0 || ipc.count("sn") == 0|| ipc["sn"].size() == 0) {
spdlog::error("ipc has no sn/modules: {}", ipc.dump());
continue;
}else{
json &modules = ipc["modules"];
string modname = moduleName.substr(0,4);
......
......@@ -144,12 +144,13 @@ private:
if(hasError) {
break;
}
if(ipc.count("modules") == 0||ipc["modules"].size() == 0) {
msg += fmt::format("\tedge cluster {} has no modules for ipc {}", k, ipcIdx);
if(ipc.count("modules") == 0||ipc["modules"].size() == 0||ipc.count("sn") == 0 || ipc["sn"].size() == 0) {
msg += fmt::format("\tedge cluster {} has no sn/modules field for ipc {}", k, ipcIdx);
ret["msg"] = msg;
}
else {
json &modules = ipc["modules"];
string ipcSn = ipc["sn"].get<string>();
for(auto &[mn, ma]: modules.items()) {
if(hasError) {
break;
......@@ -186,10 +187,10 @@ private:
}
//ml
if(mn == "evml") {
modKey = sn +":evml" + m["type"].get<string>();
modKey = sn + ":" + ipcSn + ":evml" + m["type"].get<string>();
}
else {
modKey = sn + ":" + mn;
modKey = sn + ":" + ipcSn + ":" + mn;
}
// modules
......@@ -839,9 +840,10 @@ public:
ret["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
ret["msg"] = "ok";
string sn = req.get_param_value("sn");
string ipcSn = req.get_param_value("ipcSn");
string module = req.get_param_value("module");
try {
if(!sn.empty() && !module.empty() && module.size()> 4) {
if(!sn.empty() && !module.empty() && module.size()> 4 && !ipcSn.empty()) {
spdlog::info("evcloudsvc get module config with sn {}, module {}", sn, module);
string modname = module.substr(0,4);
string key;
......@@ -856,7 +858,7 @@ public:
modname = module;
}
key = this->configMap["mod2mgr"].at(sn + ":" + modname);
key = this->configMap["mod2mgr"].at(sn + ":" + ipcSn + ":" + modname);
spdlog::debug("key: ", key);
}
......
......@@ -125,6 +125,10 @@ private:
// startup other submodules
json &ipcs = v["ipcs"];
for(auto &ipc : ipcs) {
if(ipc.count("sn == 0")) {
spdlog::error("evdaemon {} ipc {} has no sn field", devSn, ipc.dump());
continue;
}
json &modules = ipc["modules"];
for(auto &[mn, ml] : modules.items()) {
for(auto &m : ml) {
......@@ -133,7 +137,7 @@ private:
}
string peerName;
ret = cfgutils::getPeerId(mn, m, peerId, peerName);
ret = cfgutils::getPeerId(ipc["sn"].get<string>(), mn, m, peerId, peerName);
if(ret != 0) {
continue;
}
......
......@@ -186,11 +186,11 @@ error_exit:
spdlog::warn("evmg {} inproper peer id: {}", devSn, selfId);
return -1;
}
json *mod = LVDB::findConfigModule(config, sp[0], sp[1], stoi(sp[2]));
if(mod == nullptr) {
spdlog::warn("evmgr {} failed to find the connecting/disconnecting module with id {} in config. please check if it was terminated correctly", devSn, selfId);
return -1;
}
// json *mod = LVDB::findConfigModule(config, sp[0], sp[1], stoi(sp[2]));
// if(mod == nullptr) {
// spdlog::warn("evmgr {} failed to find the connecting/disconnecting module with id {} in config. please check if it was terminated correctly", devSn, selfId);
// return -1;
// }
if(peerData["status"].count(selfId) == 0||peerData["status"][selfId] == 0) {
peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
......
......@@ -65,7 +65,7 @@ class EvMLMotion: public TinyThread {
private:
void *pSubCtx = nullptr, *pDealerCtx = nullptr; // for packets relay
void *pSub = nullptr, *pDealer = nullptr, *pDaemonCtx = nullptr, *pDaemon = nullptr;
string urlOut, urlPub, urlRouter, devSn, mgrSn, selfId, pullerGid, slicerGid;
string urlOut, urlPub, urlRouter, devSn, mgrSn, selfId, pullerGid, slicerGid, ipcSn;
int iid;
AVFormatContext *pAVFormatInput = nullptr;
AVCodecContext *pCodecCtx = nullptr;
......@@ -192,25 +192,25 @@ private:
json ipc;
json ipcs = evmgr["ipcs"];
for(auto &j: ipcs) {
json mls = j["modules"]["evml"];
for(auto &p:mls) {
if(p["sn"] == devSn && p["type"] == "motion" && p["iid"] == iid && p["enabled"] != 0) {
evmlmotion = p;
iid = p["iid"];
break;
if(j["sn"] == ipcSn) {
ipc = j;
json mls = j["modules"]["evml"];
for(auto &p:mls) {
if(p["sn"] == devSn && p["type"] == "motion" && p["iid"] == iid && p["enabled"] != 0) {
evmlmotion = p;
iid = p["iid"];
break;
}
}
}
if(evmlmotion.size() != 0) {
ipc = j;
break;
}
}
if(ipc.size()!=0 && evmlmotion.size()!=0) {
found = true;
}
if(!found) {
}else{
spdlog::error("evmlmotion {}: no valid config found. retrying load config...", devSn);
exit(1);
}
......@@ -786,12 +786,13 @@ public:
if(strEnv != nullptr) {
selfId = strEnv;
auto v = strutils::split(selfId, ':');
if(v.size() != 3||v[1] != "evmlmotion") {
if(v.size() != 4||v[2] != "evmlmotion") {
spdlog::error("evmlmotion received invalid gid: {}", selfId);
exit(1);
}
ipcSn = v[1];
devSn = v[0];
iid = stoi(v[2]);
iid = stoi(v[3]);
}
else {
spdlog::error("evmlmotion failed to start. no SN set");
......
......@@ -39,7 +39,7 @@ private:
AVFormatContext *pAVFormatInput = nullptr;
char *pAVFmtCtxBytes = nullptr;
int lenAVFmtCtxBytes = 0;
string urlIn, urlPub, urlDealer, mgrSn, devSn, selfId, ipcPort;
string urlIn, urlPub, urlDealer, mgrSn, devSn, selfId, ipcPort, ipcSn;
int *streamList = nullptr, numStreams = 0, iid;
time_t tsLastBoot, tsUpdateTime;
json config;
......@@ -172,25 +172,24 @@ private:
json ipc;
json ipcs = evmgr["ipcs"];
for(auto &j: ipcs) {
json pullers = j["modules"]["evpuller"];
for(auto &p:pullers) {
if(p["sn"] == devSn && p["enabled"] != 0 && p["iid"] == iid) {
evpuller = p;
break;
if(j["sn"] == ipcSn) {
ipc = j;
json pullers = j["modules"]["evpuller"];
for(auto &p:pullers) {
if(p["sn"] == devSn && p["enabled"] != 0 && p["iid"] == iid) {
evpuller = p;
break;
}
}
}
}
if(evpuller.size() != 0) {
ipc = j;
break;
}
}
if(ipc.size()!=0 && evpuller.size()!=0) {
found = true;
}
if(!found) {
}else{
spdlog::error("evpuller {} no valid config found", devSn);
exit(1);
}
......@@ -198,6 +197,7 @@ private:
mgrSn = evmgr["sn"];
user = ipc["user"];
passwd = ipc["password"];
ipcSn = ipc["sn"];
// default stream port
if(ipc.count("port") == 0) {
......@@ -437,12 +437,13 @@ public:
if(strEnv != nullptr) {
selfId = strEnv;
auto v = strutils::split(selfId, ':');
if(v.size() != 3||v[1] != "evpuller") {
if(v.size() != 4||v[2] != "evpuller") {
spdlog::error("evpuller {} received invalid gid: {}", selfId);
exit(1);
}
devSn = v[0];
iid = stoi(v[2]);
ipcSn = v[1];
iid = stoi(v[3]);
}
else {
spdlog::error("evpuller {} failed to start. no SN set", selfId);
......
......@@ -35,7 +35,7 @@ class EvPusher: public TinyThread {
private:
void *pSubCtx = nullptr, *pDealerCtx = nullptr; // for packets relay
void *pSub = nullptr, *pDealer = nullptr, *pDaemonCtx = nullptr, *pDaemon = nullptr;
string urlOut, urlPub, urlDealer, devSn, pullerGid, mgrSn, selfId;
string urlOut, urlPub, urlDealer, devSn, pullerGid, mgrSn, selfId, ipcSn;
int iid;
bool enablePush = false;
int *streamList = nullptr;
......@@ -60,11 +60,13 @@ private:
json ipc;
json ipcs = evmgr["ipcs"];
for(auto &j: ipcs) {
json pullers = j["modules"]["evpusher"];
for(auto &p:pullers) {
if(p["sn"] == devSn && p["enabled"] != 0 && p["iid"] == iid ) {
evpusher = p;
break;
if(j["sn"] == ipcSn) {
json pullers = j["modules"]["evpusher"];
for(auto &p:pullers) {
if(p["sn"] == devSn && p["enabled"] != 0 && p["iid"] == iid ) {
evpusher = p;
break;
}
}
}
......@@ -78,10 +80,7 @@ private:
spdlog::info("evpusher {} {}, ipc: {}",devSn, iid, ipc.dump());
if(ipc.size()!=0 && evpusher.size()!=0) {
found = true;
}
if(!found) {
}else{
spdlog::error("evpusher {} : no valid config found: {}", selfId, config.dump());
exit(1);
}
......@@ -500,12 +499,13 @@ public:
if(strEnv != nullptr) {
selfId = strEnv;
auto v = strutils::split(selfId, ':');
if(v.size() != 3||v[1] != "evpusher") {
if(v.size() != 4||v[2] != "evpusher") {
spdlog::error("evpusher received invalid gid: {}", selfId);
exit(1);
}
ipcSn = v[1];
devSn = v[0];
iid = stoi(v[2]);
iid = stoi(v[3]);
}
else {
spdlog::error("evpusher failed to start. no SN set");
......
......@@ -235,41 +235,30 @@ private:
json evslicer;
json &evmgr = this->config;
json ipc;
json ipcs = evmgr["ipcs"];
for(auto &j: ipcs) {
json pullers = j["modules"]["evslicer"];
for(auto &p:pullers) {
if(p["sn"] == devSn && p["enabled"] != 0 && p["iid"] == iid) {
evslicer = p;
break;
if(j["sn"] == ipcSn) {
ipc = j;
json pullers = j["modules"]["evslicer"];
for(auto &p:pullers) {
if(p["sn"] == devSn && p["enabled"] != 0 && p["iid"] == iid) {
evslicer = p;
break;
}
}
}
}
if(evslicer.size() != 0) {
ipc = j;
break;
}
}
if(ipc.size()!=0 && evslicer.size()!=0) {
found = true;
}
if(!found) {
}else{
spdlog::error("evslicer {}: no valid config found. retrying load config...", devSn);
exit(1);
}
selfId = devSn + ":evslicer:" + to_string(iid);
//
if(ipc.count("sn") == 0) {
ipcSn = "unkown";
}
else {
ipcSn = ipc["sn"];
}
if(evslicer.count("videoServerAddr") != 0 && !evslicer["videoServerAddr"].get<string>().empty()) {
videoFileServerApi = evslicer["videoServerAddr"].get<string>();
if(videoFileServerApi.at(videoFileServerApi.size()-1) != '/') {
......@@ -852,12 +841,13 @@ public:
if(strEnv != nullptr) {
selfId = strEnv;
auto v = strutils::split(selfId, ':');
if(v.size() != 3||v[1] != "evslicer") {
if(v.size() != 4||v[2] != "evslicer") {
spdlog::error("evslicer received invalid gid: {}", selfId);
exit(1);
}
ipcSn = v[1];
devSn = v[0];
iid = stoi(v[2]);
iid = stoi(v[3]);
}
else {
spdlog::error("evslicer failed to start. no SN set");
......
......@@ -124,7 +124,7 @@ bool isIpStr(string ip)
}//namespace strutils
namespace cfgutils {
int getPeerId(string modName, json& modElem, string &peerId, string &peerName)
int getPeerId(string ipcSn, string modName, json& modElem, string &peerId, string &peerName)
{
try {
if(modName == "evmgr") {
......@@ -132,11 +132,11 @@ int getPeerId(string modName, json& modElem, string &peerId, string &peerName)
peerName = modName;
}
else if(modName == "evml") {
peerId = modElem["sn"].get<string>() + ":evml" + modElem["type"].get<string>() + ":" + to_string(modElem["iid"]);
peerId = modElem["sn"].get<string>() + ":" + ipcSn + ":evml" + modElem["type"].get<string>() + ":" + to_string(modElem["iid"]);
peerName = modName + modElem["type"].get<string>();
}
else {
peerId = modElem["sn"].get<string>() + ":" + modName + ":" + to_string(modElem["iid"]);
peerId = modElem["sn"].get<string>() + ":" + ipcSn + ":" + modName + ":" + to_string(modElem["iid"]);
peerName = modName;
}
}
......@@ -209,7 +209,7 @@ json getModuleGidsFromCfg(string sn, json &data, string caller, int ipcIdx)
}
string peerName;
int iret = cfgutils::getPeerId(mn, m, peerId, peerName);
int iret = cfgutils::getPeerId(ipc["sn"].get<string>(), mn, m, peerId, peerName);
if(iret != 0) {
// TODO: do we need to treat it more strictly, to make it fails fast???
......@@ -368,7 +368,7 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
newMgr[mgrSn]["ipcs"].push_back(newIpc);
}
if(newIpc.size() != 0 && newIpc.count("modules") != 0 && newIpc.count("addr") != 0) {
if(newIpc.size() != 0 && newIpc.count("modules") != 0 && newIpc.count("addr") != 0 && newIpc.count("sn") != 0) {
for(auto &[mn, v]: newIpc["modules"].items()) {
string modName = mn;
json &mods = v;
......@@ -394,7 +394,7 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
break;
}
string modGid = modSn + ":" + modName + ":" + to_string(modObj["iid"].get<int>());
string modGid = modSn + ":" + newIpc["sn"].get<string>() + ":" + modName + ":" + to_string(modObj["iid"].get<int>());
if(modObj.count("enabled") == 0 ||modObj["enabled"] == 0 ) {
ret["data"][modGid] = 0; // disabled
}else{
......@@ -406,7 +406,7 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
}
}
if(oldIpc.size() != 0 && oldIpc.count("modules") != 0 && oldIpc.count("addr") != 0) {
if(oldIpc.size() != 0 && oldIpc.count("modules") != 0 && oldIpc.count("addr") != 0 && oldIpc.count("sn") != 0) {
for(auto &[mn,v]: oldIpc["modules"].items()) {
string modName = mn;
json &mods = v;
......@@ -432,7 +432,7 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
break;
}
string modGid = modSn + ":" + modName + ":" + to_string(modObj["iid"].get<int>());
string modGid = modSn + ":" + oldIpc["sn"].get<string>() + ":" + modName + ":" + to_string(modObj["iid"].get<int>());
if(ret["data"].count(modGid) != 0 && ret["data"][modGid] == 2) {
ret["data"][modGid] = 3; // restart
}else{
......@@ -466,6 +466,7 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
if(true){ //d["op"] == "replace"||d["op"] == "add" || d["op"] == "remove") {
auto &oldMod = oldConfig[mgrSn]["ipcs"][ipcIdx]["modules"][modName][modIdx];
auto &newMod = newConfig[mgrSn]["ipcs"][ipcIdx]["modules"][modName][modIdx];
auto &ipcObj = oldConfig[mgrSn]["ipcs"][ipcIdx];
if(oldMod.count("iid") == 0 || newMod.count("iid") == 0) {
string msg = fmt::format("invalid module config ipcs[{}]['modules'][{}][{}] having no iid field", ipcIdx, modName, modIdx);
spdlog::error(msg);
......@@ -484,8 +485,8 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
}
}
if(newMod.count("sn") == 0||oldMod.count("sn") == 0) {
string msg = fmt::format("invalid module config ipcs[{}]['modules'][{}][{}] having no sn field", ipcIdx, modName, modIdx);
if(newMod.count("sn") == 0||oldMod.count("sn") == 0 || ipcObj.count("sn") == 0) {
string msg = fmt::format("invalid module config ipcs[{}]['modules'][{}][{}] (or ipc) having no sn field", ipcIdx, modName, modIdx);
spdlog::error(msg);
hasError = true;
break;
......@@ -494,8 +495,8 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
string oldSn = oldMod["sn"];
string newSn = newMod["sn"];
string oldGid = oldSn + ":" + modName + ":" + to_string(oldMod["iid"].get<int>());
string newGid = newSn + ":" + modName + ":" + to_string(newMod["iid"].get<int>());
string oldGid = oldSn + ":" + ipcObj["sn"].get<string>() + ":" + modName + ":" + to_string(oldMod["iid"].get<int>());
string newGid = newSn + ":" + ipcObj["sn"].get<string>() + ":" + modName + ":" + to_string(newMod["iid"].get<int>());
if(oldGid != newGid) {
ret["data"][oldGid] = 1; // perm stop
......@@ -545,6 +546,8 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
string modName = results[3].str();
json oldModObj, newModObj;
auto &ipcObj = newConfig[mgrSn]["ipcs"][ipcIdx];
if(oldConfig[mgrSn]["ipcs"][ipcIdx]["modules"][modName].size() >= modIdx +1) {
oldModObj = oldConfig[mgrSn]["ipcs"][ipcIdx]["modules"][modName][modIdx];
}
......@@ -574,7 +577,7 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
break;
}
string modGid = modSn + ":" + modName + ":" + to_string(newModObj["iid"].get<int>());
string modGid = modSn + ":" + ipcObj["sn"].get<string>() + ":" + modName + ":" + to_string(newModObj["iid"].get<int>());
if(newModObj.count("enabled") == 0 ||newModObj["enabled"] == 0 ) {
ret["data"][modGid] = 0; // disabled
}else{
......@@ -605,7 +608,7 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
continue;
}
string modGid = modSn + ":" + modName + ":" + to_string(oldModObj["iid"].get<int>());
string modGid = modSn + ":" + ipcObj["sn"].get<string>() + ":" + modName + ":" + to_string(oldModObj["iid"].get<int>());
if(ret["data"].count(modGid) != 0 && ret["data"][modGid] == 2 ) {
ret["data"][modGid] = 3; // restart
}else{
......
......@@ -44,7 +44,7 @@ bool isIpStr(string ip);
}//namespace strutils
namespace cfgutils {
int getPeerId(string modName, json& modElem, string &peerId, string &peerName);
int getPeerId(string ipcSn, string modName, json& modElem, string &peerId, string &peerName);
json *findModuleConfig(string peerId, json &data);
json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, string sn);
json getModuleGidsFromCfg(string sn, json &data, string caller = "", int ipcIdx = -1);
......
......@@ -183,8 +183,11 @@ int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid){
auto v = strutils::split(peerId, ':');
string modName = v[1];
string sn = v[0];
if(modName != "evmgr") {
modName = v[2];
}
if( (pid = fork()) == -1 ) {
spdlog::error("evdamon {} failed to fork subsytem - evmgr", devSn);
spdlog::error("evdamon {} failed to fork subsytem {}", devSn, modName);
return -1;
}else if(pid == 0) {
ret += setenv("PEERID", peerId.c_str(), 1);
......@@ -194,7 +197,7 @@ int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid){
return -2;
}
execl((string("./") + modName).c_str(), NULL, NULL, NULL);
spdlog::error("evdaemon {} failed to startup evmgr", devSn);
spdlog::error("forkSubsystem {} failed to startup evmgr", devSn);
}else{
// parent
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论