提交 0951089e authored 作者: blu's avatar blu

upload video files

上级 b8f1a381
...@@ -38,7 +38,7 @@ CFLAGS = $(DEBUG) -Wall ...@@ -38,7 +38,7 @@ CFLAGS = $(DEBUG) -Wall
LIBOPENCV = `pkg-config opencv --cflags --libs` LIBOPENCV = `pkg-config opencv --cflags --libs`
LIBFFMPEG = `pkg-config libavformat libavutil libavcodec libswscale --cflags --libs` LIBFFMPEG = `pkg-config libavformat libavutil libavcodec libswscale --cflags --libs`
LIBS +=-Lvendor/lib -lpthread -lleveldb -lfswatch LIBS +=-Lvendor/lib -lpthread -lleveldb
#-static #-static
HEADERS=-Iinc -Ivendor/include HEADERS=-Iinc -Ivendor/include
...@@ -72,7 +72,7 @@ evpusher: evpusher.cpp inc/common.hpp inc/av_common.hpp objs/utils.o inc/tinythr ...@@ -72,7 +72,7 @@ evpusher: evpusher.cpp inc/common.hpp inc/av_common.hpp objs/utils.o inc/tinythr
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evpusher evpusher.cpp objs/database.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evpusher evpusher.cpp objs/database.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evslicer: evslicer.cpp inc/common.hpp inc/av_common.hpp postfile.cpp objs/utils.o objs/dirmon.o inc/tinythread.hpp objs/database.o objs/zmqhelper.o $(SQLITE_SRC) evslicer: evslicer.cpp inc/common.hpp inc/av_common.hpp postfile.cpp objs/utils.o objs/dirmon.o inc/tinythread.hpp objs/database.o objs/zmqhelper.o $(SQLITE_SRC)
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evslicer evslicer.cpp postfile.cpp objs/database.o objs/dirmon.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) -lcurl $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evslicer evslicer.cpp postfile.cpp objs/database.o objs/dirmon.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) -lcurl -lfswatch
evmlmotion: evmlmotion.cpp inc/common.hpp inc/av_common.hpp objs/utils.o inc/avcvhelpers.hpp objs/database.o objs/zmqhelper.o inc/tinythread.hpp $(SQLITE_SRC) evmlmotion: evmlmotion.cpp inc/common.hpp inc/av_common.hpp objs/utils.o inc/avcvhelpers.hpp objs/database.o objs/zmqhelper.o inc/tinythread.hpp $(SQLITE_SRC)
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evmlmotion evmlmotion.cpp objs/database.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) $(LIBOPENCV) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evmlmotion evmlmotion.cpp objs/database.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) $(LIBOPENCV) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
......
...@@ -549,7 +549,7 @@ public: ...@@ -549,7 +549,7 @@ public:
// //
} }
else { else {
ret["data"].merge_patch(cfg); ret["data"].merge_patch(_patch);
} }
} }
} }
......
...@@ -13,6 +13,9 @@ update: 2019/09/10 ...@@ -13,6 +13,9 @@ update: 2019/09/10
#include <queue> #include <queue>
#include <cstdlib> #include <cstdlib>
#include <algorithm> #include <algorithm>
#include <regex>
#include <iterator>
#include <set>
#include "inc/tinythread.hpp" #include "inc/tinythread.hpp"
#include "inc/httplib.h" #include "inc/httplib.h"
#include "inc/zmqhelper.hpp" #include "inc/zmqhelper.hpp"
...@@ -34,6 +37,7 @@ class EvDaemon { ...@@ -34,6 +37,7 @@ class EvDaemon {
private: private:
Server svr; Server svr;
json config; json config;
json lastConfig;
json deltaCfg; json deltaCfg;
json info; json info;
int port = 8088; int port = 8088;
...@@ -177,16 +181,16 @@ private: ...@@ -177,16 +181,16 @@ private:
} }
} }
int startSubSystems(vector<string> v = {}) int startSubSystems(vector<string> subs = {})
{ {
// check status and startup
std::lock_guard<std::mutex> lock(mutSubsystem);
int ret = 0; int ret = 0;
std::lock_guard<std::mutex> lock(mutSubsystem);
vector<string> tmp; vector<string> tmp;
json unkown; json unkown;
vector<string> terms; vector<string> terms;
string info;
int cnt = 0; int cnt = 0;
string info;
for(auto &[k,v]: this->peerData["config"].items()) { for(auto &[k,v]: this->peerData["config"].items()) {
if(this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0) { if(this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0) {
if((this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0)) { if((this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0)) {
...@@ -203,8 +207,28 @@ private: ...@@ -203,8 +207,28 @@ private:
cnt++; cnt++;
} }
if(subs.size() != 0) {
for(auto &k: subs) {
pid_t pid = 0;
if(std::find(terms.begin(), terms.end(), k) != terms.end()) {
// ignore
spdlog::warn("evdaemon {} skip startup {} since it's disabled.", this->devSn, k);
}else{
ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid);
if(0 == ret) {
this->peerData["status"][k] = 0;
this->peerData["pids"][k] = pid;
spdlog::info("evdaemon {} created subsystem {}", devSn, k);
}
else {
spdlog::info("evdaemon {} failed to create subsystem {}", devSn, k);
}
}
}
}else{
// cold startup, ignore diff.
if(this->bColdStart) {
spdlog::info("evdaemon {} will start following subsystems: {}", devSn, info); spdlog::info("evdaemon {} will start following subsystems: {}", devSn, info);
//
for(string &e : tmp) { for(string &e : tmp) {
pid_t pid = 0; pid_t pid = 0;
ret = zmqhelper::forkSubsystem(devSn, e, portRouter, pid); ret = zmqhelper::forkSubsystem(devSn, e, portRouter, pid);
...@@ -218,12 +242,6 @@ private: ...@@ -218,12 +242,6 @@ private:
} }
} }
for(string &e: terms) {
if(this->peerData["pids"].count(e) != 0) {
kill(this->peerData["pids"][e], SIGTERM);
}
}
while(unkown.size() != 0 && cnt < 3) { while(unkown.size() != 0 && cnt < 3) {
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
for(auto &[k,v]: unkown.items()) { for(auto &[k,v]: unkown.items()) {
...@@ -235,6 +253,15 @@ private: ...@@ -235,6 +253,15 @@ private:
cnt++; cnt++;
} }
for(string &e: terms) {
// if(this->peerData["pids"].count(e) != 0) {
// kill(this->peerData["pids"][e], SIGTERM);
// }
if(this->peerData["pids"].count(e) != 0 && this->peerData["pids"].count(e) != -1) {
}
}
for(auto &[k,v]: unkown.items()) { for(auto &[k,v]: unkown.items()) {
pid_t pid = 0; pid_t pid = 0;
ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid); ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid);
...@@ -247,10 +274,26 @@ private: ...@@ -247,10 +274,26 @@ private:
spdlog::info("evdaemon {} failed to create subsystem {}", devSn, k); spdlog::info("evdaemon {} failed to create subsystem {}", devSn, k);
} }
} }
}else{
// TODO: load delta config
json mods;
set<int> ipcs;
}
this->bColdStart = false;
this->deltaCfg = json();
return ret; return ret;
} }
void sendMsgToPeer(string peerId, string meta, string msg) {
int ret = z_send(pDealer, this->daemonId, peerId, meta, msg);
if(ret < 0) {
spdlog::error("evcloudsvc {} failed to send msg to peer {}: {} - {}", devSn, peerId, meta, msg);
}else{
spdlog::info("evcloudsvc {} successfully send msg to peer {}: {} - {}", devSn, peerId, meta, msg);
}
}
int handleEdgeMsg(vector<vector<uint8_t> > &body) int handleEdgeMsg(vector<vector<uint8_t> > &body)
{ {
int ret = 0; int ret = 0;
...@@ -291,7 +334,7 @@ private: ...@@ -291,7 +334,7 @@ private:
spdlog::warn("evdaemon {} peer {} disconnected. reloading config", devSn, selfId); spdlog::warn("evdaemon {} peer {} disconnected. reloading config", devSn, selfId);
if(this->bBootstrap) { if(this->bBootstrap) {
startSubSystems(); startSubSystems({selfId});
} }
} }
...@@ -326,9 +369,8 @@ private: ...@@ -326,9 +369,8 @@ private:
this->peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); this->peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
// msg to peer // msg to peer
string myId = devSn + ":evmgr:0"; int minLen = std::min(body[1].size(), this->daemonId.size());
int minLen = std::min(body[1].size(), myId.size()); if(memcmp((void*)(body[1].data()), this->daemonId.data(), minLen) != 0) {
if(memcmp((void*)(body[1].data()), myId.data(), minLen) != 0) {
// message to other peer // message to other peer
// check peer status // check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]}; vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
...@@ -426,6 +468,7 @@ private: ...@@ -426,6 +468,7 @@ private:
this->deltaCfg = json::diff(this->config, data); this->deltaCfg = json::diff(this->config, data);
spdlog::info("evdaemon {} received cloud config diff: {}\nnew: {}", devSn, this->deltaCfg.dump(4), data.dump()); spdlog::info("evdaemon {} received cloud config diff: {}\nnew: {}", devSn, this->deltaCfg.dump(4), data.dump());
if(this->deltaCfg.size() != 0 || this->bColdStart) { if(this->deltaCfg.size() != 0 || this->bColdStart) {
this->lastConfig = this->config;
this->config = data; this->config = data;
spdlog::info("evdaemon {} reloading config from cloud", devSn); spdlog::info("evdaemon {} reloading config from cloud", devSn);
ret = reloadCfg(); ret = reloadCfg();
...@@ -438,7 +481,6 @@ private: ...@@ -438,7 +481,6 @@ private:
spdlog::error("evdameon {} failed to save new config to local db: {}", devSn, data.dump()); spdlog::error("evdameon {} failed to save new config to local db: {}", devSn, data.dump());
return ret; return ret;
} }
this->bColdStart = false;
} }
else { else {
} }
...@@ -550,6 +592,7 @@ public: ...@@ -550,6 +592,7 @@ public:
peerData["status"] = json(); peerData["status"] = json();
peerData["pids"] = json(); peerData["pids"] = json();
peerData["config"] = json(); peerData["config"] = json();
deltaCfg = json();
int ret = 0; int ret = 0;
string dir_ = string("mkdir -p ") + EV_FILE_LVDB_DAEMON; string dir_ = string("mkdir -p ") + EV_FILE_LVDB_DAEMON;
system(dir_.c_str()); system(dir_.c_str());
......
...@@ -192,4 +192,43 @@ namespace cfgutils { ...@@ -192,4 +192,43 @@ namespace cfgutils {
return ret; return ret;
} }
vector<string> getModuleGidsOfIpc(json &config, string sn, int ipcId) {
json tmp;
if(config.count(sn) == 0) {
//
spdlog::error("getModuleGidsOfIpc invalid config for sn {}: {}", sn, config.dump());
}else{
auto conf = config[sn];
if(conf.count("ipcs") == 0 || conf["ipcs"].size() < (ipcId + 1) || conf["ipcs"][ipcId].count("modules") == 0||conf["ipcs"][ipcId]["modules"].size() == 0) {
spdlog::error("getModuleGidsOfIpc invalid config having no such ipc {} or modules. {}: {}", ipcId, sn, config.dump());
}else{
auto modules = conf["ipcs"][ipcId]["modules"];
for(auto &[mn, mv]: modules.items()) {
for(auto &m:mv) {
if(m.count("sn") == 0 || m.count("iid") == 0||( mn == "evml" && m.count("type") == 0)) {
spdlog::error("getModuleGidsOfIpc invalid module config {} in {}", mv.dump(), config.dump());
}else{
if(m["sn"] == sn) {
string gid;
if(mn == "evml"){
gid = sn + ":evml"+m["type"].get<string>()+":"+ to_string(m["iid"].get<int>());
}else{
gid = sn + ":" + mn + ":"+ to_string(m["iid"].get<int>());
}
tmp[gid] = 1;
}
}
}
}
}
}
vector<string> ret;
for(auto &[k,v]: tmp.items()) {
ret.push_back(k);
}
return ret;
}
} // cfgutils } // cfgutils
...@@ -38,6 +38,7 @@ vector<string> split(const std::string& s, char delimiter); ...@@ -38,6 +38,7 @@ vector<string> split(const std::string& s, char delimiter);
namespace cfgutils { namespace cfgutils {
int getPeerId(string modName, json& modElem, string &peerId, string &peerName); int getPeerId(string modName, json& modElem, string &peerId, string &peerName);
json *findModuleConfig(string peerId, json &data); json *findModuleConfig(string peerId, json &data);
vector<string> getModuleGidsOfIpc(json &config, string sn, int ipcId);
} }
struct StrException : public std::exception struct StrException : public std::exception
......
...@@ -85,6 +85,14 @@ int z_send_multiple(void *s, vector<vector<uint8_t> >&body) ...@@ -85,6 +85,14 @@ int z_send_multiple(void *s, vector<vector<uint8_t> >&body)
return ret; return ret;
} }
int z_send(void *s, string peerId, string selfId, string sMeta, string body)
{
json meta;
meta["type"] = sMeta;
vector<vector<uint8_t> > v{str2body(peerId), str2body(selfId), str2body(meta.dump()), str2body(body)};
return z_send_multiple(s, v);
}
/// setup router /// setup router
int setupRouter(void **ctx, void **s, string addr){ int setupRouter(void **ctx, void **s, string addr){
int ret = 0; int ret = 0;
......
...@@ -34,7 +34,7 @@ namespace zmqhelper { ...@@ -34,7 +34,7 @@ namespace zmqhelper {
#define EV_MSG_TYPE_AI_MOTION "ai_motion" #define EV_MSG_TYPE_AI_MOTION "ai_motion"
#define EV_MSG_TYPE_CONN_STAT "connstat" #define EV_MSG_TYPE_CONN_STAT "connstat"
#define EV_MSG_TYPE_SYS_STAT "sysstat" #define EV_MSG_TYPE_SYS_STAT "sysstat"
// #define EV_MSG_CMD_RESTART "restart" #define EV_MSG_CMD_RESTART "restart"
// #define EV_MSG_CMD_UPDATE "update" // #define EV_MSG_CMD_UPDATE "update"
#define EV_MSG_EVENT_MOTION_START "start" #define EV_MSG_EVENT_MOTION_START "start"
...@@ -63,6 +63,7 @@ int setupDealer(void **ctx, void **s, string addr, string ident); ...@@ -63,6 +63,7 @@ int setupDealer(void **ctx, void **s, string addr, string ident);
/// @return 0 success, otherwise failed. /// @return 0 success, otherwise failed.
int recvConfigMsg(void *s, json &config, string addr, string ident); int recvConfigMsg(void *s, json &config, string addr, string ident);
int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid); int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid);
int z_send(void *s, string peerId, string selfId, string sMeta, string body);
} }
#endif #endif
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论