提交 c8b301f8 authored 作者: blu's avatar blu

upload video files

上级 2566dd52
...@@ -7,77 +7,82 @@ query configuration for edge device with specified sn ...@@ -7,77 +7,82 @@ query configuration for edge device with specified sn
- sn: string. device serial - sn: string. device serial
##### return ##### return
- type: json - type: json
- field "code": 0 - success; otherwise failed. - field "code": 0 - success;
otherwise failed.
- field "msg": string, readable string for "code" - field "msg": string, readable string for "code"
- field "data": configuration for sn. - field "data": configuration for sn.
- example - example
``` ```
{ {
"code": 0, "code": 0,
"data": { "data": {
"NMXH73Y2": { "NMXH73Y2": {
"addr": "127.0.0.1", "addr": "127.0.0.1"
"api-cloud": "http://127.0.0.1:8089", ,
"ipcs": [ "api-cloud": "http://127.0.0.1:8089"
{ ,
"addr": "172.31.0.51", "ipcs":
"modules": { [ {
"evml": [ "addr": "172.31.0.51",
{ "modules": {
"area": 300, "evml": [
"enabled": 1, {
"iid": 1, "area": 300,
"post": 30, "enabled": 1,
"pre": 3, "iid": 1,
"sn": "NMXH73Y2", "post": 30,
"thresh": 80, "pre": 3,
"type": "motion" "sn": "NMXH73Y2",
} "thresh": 80,
], "type": "motion"
"evpuller": [ }
{ ],
"addr": "127.0.0.1", "evpuller": [
"enabled": 1, {
"iid": 1, "addr": "127.0.0.1",
"port-pub": 5556, "enabled": 1,
"sn": "NMXH73Y2" "iid": 1,
} "port-pub": 5556,
], "sn": "NMXH73Y2"
"evpusher": [ }
{ ],
"enabled": 1, "evpusher": [
"iid": 1, {
"password": "", "enabled": 1,
"sn": "NMXH73Y2", "iid": 1,
"token": "", "password": "",
"urlDest": "rtsp://40.73.41.176:554/test1", "sn": "NMXH73Y2",
"user": "" "token": "",
} "urlDest": "rtsp://40.73.41.176:554/test1",
], "user": ""
"evslicer": [ }
{ ],
"enabled": 1, "evslicer": [
"iid": 1, {
"path": "slices", "enabled": 1,
"sn": "NMXH73Y2" "iid": 1,
} "path": "slices",
] "sn": "NMXH73Y2"
}
]
}, },
"password": "FWBWTU", "password": "FWBWTU",
"port": 554, "port": 554,
"proto": "rtsp", "proto": "rtsp",
"user": "admin" "user": "admin"
} }
], ],
"mqtt-cloud": "<cloud_addr>", "mqtt-cloud": "<cloud_addr>"
"port-cloud": 5556, ,
"port-router": 5550, "port-cloud": 5556,
"proto": "zmq", "port-router": 5550,
"sn": "NMXH73Y2" "proto": "zmq"
} ,
}, "sn": "NMXH73Y2"
"msg": "ok" }
} },
"msg": "ok"
}
``` ```
...@@ -89,84 +94,91 @@ set or change configuration for edge device ...@@ -89,84 +94,91 @@ set or change configuration for edge device
- sn: string, only used when patch is set as true - sn: string, only used when patch is set as true
##### body ##### body
- type: json - type: json
- example - example
1. full configure 1. full configure
``` ```
{ {
"data":{ "data": {
"NMXH73Y2":{ "NMXH73Y2": {
"addr":"127.0.0.1", "addr":"127.0.0.1"
"api-cloud":"http://127.0.0.1:8089", ,
"ipcs":[ "api-cloud":"http://127.0.0.1:8089"
{ ,
"addr":"172.31.0.51", "ipcs":
"modules":{ [ {
"evml":[ "addr":"172.31.0.51",
{ "modules":{
"evml":[
{
"area":300, "area":300,
"enabled":1, "enabled":1,
"iid":1, "iid":1,
"post":30, "post":30,
"pre":3, "pre":3,
"sn":"NMXH73Y2", "sn":"NMXH73Y2",
"thresh":80, "thresh":80,
"type":"motion" "type":"motion"
} }
], ],
"evpuller":[ "evpuller":[
{ {
"addr":"127.0.0.1", "addr":"127.0.0.1",
"iid":1, "iid":1,
"enabled": 1, "enabled": 1,
"port-pub":5556, "port-pub":5556,
"sn":"NMXH73Y2" "sn":"NMXH73Y2"
} }
], ],
"evpusher":[ "evpusher":[
{ {
"enabled":1, "enabled":1,
"iid":1, "iid":1,
"password":"", "password":"",
"sn":"NMXH73Y2", "sn":"NMXH73Y2",
"token":"", "token":"",
"urlDest":"rtsp://40.73.41.176:554/test1", "urlDest":"rtsp://40.73.41.176:554/test1",
"user":"" "user":""
} }
], ],
"evslicer":[ "evslicer":[
{ {
"enabled":1, "enabled":1,
"iid":1, "iid":1,
"path":"slices", "path":"slices",
"sn":"NMXH73Y2" "sn":"NMXH73Y2"
} }
] ]
}, },
"password":"FWBWTU", "password":"FWBWTU",
"port":554, "port":554,
"proto":"rtsp", "proto":"rtsp",
"user":"admin" "user":"admin"
} }
], ],
"mqtt-cloud":"<cloud_addr>", "mqtt-cloud":"<cloud_addr>"
"port-cloud":5556, ,
"port-router":5550, "port-cloud":5556,
"proto":"zmq", "port-router":5550,
"sn":"NMXH73Y2" "proto":"zmq"
} ,
}, "sn":"NMXH73Y2"
"lastupdated":1567669674 }
},
"lastupdated":1567669674
} }
``` ```
2. patch configure (POST /config?patch=true&sn=NMXH73Y2) 2. patch configure (POST /config?patch=true&sn=NMXH73Y2)
``` ```
[{"op":"add","path":"/ipcs/0/modules/evpuller/0/enabled","value":1}] [ {"op":"add","path":"/ipcs/0/modules/evpuller/0/enabled","value":1}]
``` ```
##### return ##### return
- type: json - type: json
- example: - example:
``` ```
{"code": 0, "msg":"ok", "data":JSON} {"code": 0, "msg":"ok"
, "data":
JSON
}
``` ```
#### GET /keys #### GET /keys
...@@ -175,43 +187,48 @@ query all keys in cloud db ...@@ -175,43 +187,48 @@ query all keys in cloud db
##### params ##### params
- none - none
##### return ##### return
- type: json array - type: json array
- example - example
``` ```
[ [
"NMXH73Y2", "NMXH73Y2",
"NMXH73Y2_bak", "NMXH73Y2_bak",
"SN", "SN",
"configmap", "configmap",
"configmap_bak" "configmap_bak"
] ]
``` ```
#### GET /value #### GET /value
##### description ##### description
get value for specified key in cloud db. keys list is queried by /keys api get value for specified key in cloud db. keys list is queried by /keys api
##### params ##### params
- key: string - key: string
##### return ##### return
- type: json - type: json
- example - example
``` ```
# GET /value?key=configmap # GET /value?key=configmap
{ {
"NMXH73Y2": "NMXH73Y2", "NMXH73Y2": "NMXH73Y2"
"code": 0, ,
"mod2mgr": { "code": 0,
"NMXH73Y2:evml:motion": "NMXH73Y2", "mod2mgr": {
"NMXH73Y2:evpuller": "NMXH73Y2", "NMXH73Y2:evml:motion": "NMXH73Y2"
"NMXH73Y2:evpusher": "NMXH73Y2", ,
"NMXH73Y2:evslicer": "NMXH73Y2" "NMXH73Y2:evpuller": "NMXH73Y2"
}, ,
"sn2mods": { "NMXH73Y2:evpusher": "NMXH73Y2"
"NMXH73Y2": [ ,
"NMXH73Y2:evml:motion", "NMXH73Y2:evslicer": "NMXH73Y2"
"NMXH73Y2:evpuller", },
"NMXH73Y2:evpusher", "sn2mods": {
"NMXH73Y2:evslicer" "NMXH73Y2":
] [
"NMXH73Y2:evml:motion",
"NMXH73Y2:evpuller",
"NMXH73Y2:evpusher",
"NMXH73Y2:evslicer"
]
} }
} }
``` ```
...@@ -221,10 +238,12 @@ get value for specified key in cloud db. keys list is queried by /keys api ...@@ -221,10 +238,12 @@ get value for specified key in cloud db. keys list is queried by /keys api
##### params ##### params
- sn: string - sn: string
##### return ##### return
- type: json - type: json
- example - example
``` ```
{"code": 0, "msg":"ok", ...} {"code": 0, "msg":"ok"
, ...
}
``` ```
#### GET /sysinfo #### GET /sysinfo
...@@ -236,17 +255,18 @@ get value for specified key in cloud db. keys list is queried by /keys api ...@@ -236,17 +255,18 @@ get value for specified key in cloud db. keys list is queried by /keys api
*[NOT IMPLEMENTED]* turn on/off debug tunnel *[NOT IMPLEMENTED]* turn on/off debug tunnel
##### params ##### params
- sn: string - sn: string
- op: on|off - op: on|off
- ip: string. ip of public accessable host - ip: string. ip of public accessable host
- port(optional): number. ssh port of the public accessable host. default 22. - port(optional): number. ssh port of the public accessable host. default 22.
- port_tun(optional): number. tunnuel port of the public accessable host. default 11222. - port_tun(optional): number. tunnuel port of the public accessable host. default 11222.
- user: ssh user of the public accessable host - user: ssh user of the public accessable host
- password: ssh password of the public accessable host - password: ssh password of the public accessable host
##### return ##### return
- type: json - type: json
- example - example
``` ```
{"code":0, "msg":"ok"} {"code":0, "msg":"ok"
}
``` ```
...@@ -44,7 +44,8 @@ private: ...@@ -44,7 +44,8 @@ private:
mutex eventQLock; mutex eventQLock;
thread thMsgProcessor; thread thMsgProcessor;
void loadConfigMap(){ void loadConfigMap()
{
// load configmap // load configmap
json cnfm; json cnfm;
int ret = LVDB::getValue(cnfm, KEY_CONFIG_MAP); int ret = LVDB::getValue(cnfm, KEY_CONFIG_MAP);
...@@ -65,19 +66,21 @@ private: ...@@ -65,19 +66,21 @@ private:
} }
// populate peerData // populate peerData
for(auto &[k,v]: this->configMap["sn2mods"].items()){ for(auto &[k,v]: this->configMap["sn2mods"].items()) {
// load config from database // load config from database
json cfg; json cfg;
if(LVDB::getLocalConfig(cfg, k) < 0) { if(LVDB::getLocalConfig(cfg, k) < 0) {
spdlog::error("evcloudsvc failed to load config for device: {}", k); spdlog::error("evcloudsvc failed to load config for device: {}", k);
}else{ }
else {
this->peerData["config"][k] = cfg; this->peerData["config"][k] = cfg;
spdlog::info("evcloudsvc loaded config for device: {}", k); spdlog::info("evcloudsvc loaded config for device: {}", k);
} }
} }
} }
int sendConfig(json &config_, string sn) { int sendConfig(json &config_, string sn)
{
int ret = 0; int ret = 0;
string cfg = config_.dump(); string cfg = config_.dump();
json j; json j;
...@@ -93,10 +96,10 @@ private: ...@@ -93,10 +96,10 @@ private:
// cachedMsg[sn].pop(); // cachedMsg[sn].pop();
// } // }
// }else{ // }else{
ret = z_send_multiple(pRouter, v); ret = z_send_multiple(pRouter, v);
spdlog::info("evcloudsvc config sent to {}: {}", sn, cfg); spdlog::info("evcloudsvc config sent to {}: {}", sn, cfg);
//} //}
return ret; return ret;
} }
...@@ -169,19 +172,20 @@ private: ...@@ -169,19 +172,20 @@ private:
} }
// check exist // check exist
bool hasModKey =false; bool hasModKey =false;
for(auto &modKey_:this->configMap["sn2mods"][sn]){ for(auto &modKey_:this->configMap["sn2mods"][sn]) {
if(modKey_ == modKey) { if(modKey_ == modKey) {
hasModKey = true; hasModKey = true;
break; break;
} }
} }
if(hasModKey){ if(hasModKey) {
//nop //nop
}else{ }
else {
this->configMap["sn2mods"][sn].push_back(modKey); this->configMap["sn2mods"][sn].push_back(modKey);
} }
// modkey -> sn_of_evmgr // modkey -> sn_of_evmgr
this->configMap["mod2mgr"][modKey] = k; this->configMap["mod2mgr"][modKey] = k;
} }
...@@ -221,16 +225,18 @@ private: ...@@ -221,16 +225,18 @@ private:
// update in memory peerData // update in memory peerData
if(this->peerData["config"].count(k) != 0) { if(this->peerData["config"].count(k) != 0) {
json diff = json::diff(this->peerData["config"][k], v); json diff = json::diff(this->peerData["config"][k], v);
if(diff.size()!=0) { if(diff.size()!=0) {
// send config // send config
deltaCfg[k] = 1; deltaCfg[k] = 1;
this->peerData["config"][k] = v; this->peerData["config"][k] = v;
spdlog::info("evcloudsvc peer {} config diff:\n{}\norigin:\n{}", k, diff.dump(), this->peerData["config"][k].dump()); spdlog::info("evcloudsvc peer {} config diff:\n{}\norigin:\n{}", k, diff.dump(), this->peerData["config"][k].dump());
}else{ }
else {
spdlog::info("evcloudsvc peer {} config no diff. ignored:\n{}", k, this->peerData["config"][k].dump()); spdlog::info("evcloudsvc peer {} config no diff. ignored:\n{}", k, this->peerData["config"][k].dump());
} }
}else{ }
else {
this->peerData["config"][k] = v; this->peerData["config"][k] = v;
} }
// TODO: important! always send config in case edge config is corrupted. // TODO: important! always send config in case edge config is corrupted.
...@@ -255,7 +261,7 @@ private: ...@@ -255,7 +261,7 @@ private:
} }
// update config // update config
for(auto &[x,y]: deltaCfg.items()){ for(auto &[x,y]: deltaCfg.items()) {
json j = getConfigForDevice(x); json j = getConfigForDevice(x);
if(j["code"] == 0) { if(j["code"] == 0) {
sendConfig(j["data"], x); sendConfig(j["data"], x);
...@@ -275,26 +281,28 @@ private: ...@@ -275,26 +281,28 @@ private:
return ret; return ret;
} }
// //
bool handleConnection(string selfId) { bool handleConnection(string selfId)
{
bool ret = false; bool ret = false;
int state = zmq_socket_get_peer_state(pRouter, selfId.data(), selfId.size()); int state = zmq_socket_get_peer_state(pRouter, selfId.data(), selfId.size());
spdlog::info("evcloudsvc peer {} state: {}", selfId, state); spdlog::info("evcloudsvc peer {} state: {}", selfId, state);
if(peerData["status"].count(selfId) == 0 || peerData["status"][selfId] == 0) { if(peerData["status"].count(selfId) == 0 || peerData["status"][selfId] == 0) {
peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evcloudsvc peer connected: {}", selfId); spdlog::info("evcloudsvc peer connected: {}", selfId);
ret = true; ret = true;
spdlog::debug("evcloudsvc update status of {} to 1 and send config", selfId); spdlog::debug("evcloudsvc update status of {} to 1 and send config", selfId);
json data = getConfigForDevice(selfId); json data = getConfigForDevice(selfId);
if(data["code"] != 0) { if(data["code"] != 0) {
// //
}else{
sendConfig(data["data"], selfId);
}
} }
else { else {
peerData["status"][selfId] = 0; sendConfig(data["data"], selfId);
spdlog::warn("evcloudsvc {} peer disconnected: {}", devSn, selfId); }
}
else {
peerData["status"][selfId] = 0;
spdlog::warn("evcloudsvc {} peer disconnected: {}", devSn, selfId);
} }
return ret; return ret;
} }
...@@ -405,14 +413,15 @@ private: ...@@ -405,14 +413,15 @@ private:
return ret; return ret;
} }
json getConfigForDevice(string sn) { json getConfigForDevice(string sn)
{
json ret; json ret;
ret["code"] = 0; ret["code"] = 0;
ret["msg"] = "ok"; ret["msg"] = "ok";
ret["data"] = json(); ret["data"] = json();
json &data = ret["data"]; json &data = ret["data"];
spdlog::info("evcloudsvc get config for sn {}", sn); spdlog::info("evcloudsvc get config for sn {}", sn);
try{ try {
if(this->configMap["sn2mods"].count(sn) != 0) { if(this->configMap["sn2mods"].count(sn) != 0) {
auto mods = this->configMap["sn2mods"][sn]; auto mods = this->configMap["sn2mods"][sn];
set<string> s; set<string> s;
...@@ -424,7 +433,8 @@ private: ...@@ -424,7 +433,8 @@ private:
for(auto &key : s) { for(auto &key : s) {
if(this->peerData["config"].count(key) == 0) { if(this->peerData["config"].count(key) == 0) {
spdlog::error("evcloudsvc no peerData config for device {}", key); spdlog::error("evcloudsvc no peerData config for device {}", key);
}else{ }
else {
if(data.count(key) != 0) { if(data.count(key) != 0) {
json diff = json::diff(data[key], this->peerData["config"][key]); json diff = json::diff(data[key], this->peerData["config"][key]);
if(diff.size() != 0) { if(diff.size() != 0) {
...@@ -433,25 +443,28 @@ private: ...@@ -433,25 +443,28 @@ private:
ret["msg"] = msg; ret["msg"] = msg;
break; break;
} }
}else{ }
else {
data[key] = this->peerData["config"][key]; data[key] = this->peerData["config"][key];
} }
} }
} // for keys of mgr } // for keys of mgr
ret["data"] = data; ret["data"] = data;
}else{ }
else {
ret["code"] = 1; ret["code"] = 1;
string msg = "no such sn: " + sn; string msg = "no such sn: " + sn;
ret["msg"] = msg; ret["msg"] = msg;
spdlog::warn("evcloudsvc no config for sn: {}", sn); spdlog::warn("evcloudsvc no config for sn: {}", sn);
} }
}catch(exception &e) { }
catch(exception &e) {
string msg = "evcloudsvc exception in file" + string(__FILE__) + ":" + to_string(__LINE__) + " for: " + e.what(); string msg = "evcloudsvc exception in file" + string(__FILE__) + ":" + to_string(__LINE__) + " for: " + e.what();
spdlog::error(msg); spdlog::error(msg);
ret["code"] = -1; ret["code"] = -1;
ret["msg"] = msg; ret["msg"] = msg;
} }
return ret; return ret;
} }
...@@ -506,7 +519,8 @@ public: ...@@ -506,7 +519,8 @@ public:
} }
else if(!sn.empty() && module.empty()) { else if(!sn.empty() && module.empty()) {
ret = getConfigForDevice(sn); ret = getConfigForDevice(sn);
}else{ }
else {
ret["code"] = 2; ret["code"] = 2;
ret["msg"] = "invalid request. no param for sn/module"; ret["msg"] = "invalid request. no param for sn/module";
} }
...@@ -525,7 +539,7 @@ public: ...@@ -525,7 +539,7 @@ public:
string msg; string msg;
try { try {
json cfg = json::parse(req.body); json cfg = json::parse(req.body);
if(req.has_param("sn") && req.has_param("patch")){ if(req.has_param("sn") && req.has_param("patch")) {
string _sn = req.get_param_value("sn"); string _sn = req.get_param_value("sn");
string _patch = req.get_param_value("patch"); string _patch = req.get_param_value("patch");
if(!_sn.empty() && _patch == "true") { if(!_sn.empty() && _patch == "true") {
...@@ -533,11 +547,13 @@ public: ...@@ -533,11 +547,13 @@ public:
ret = getConfigForDevice(_sn); ret = getConfigForDevice(_sn);
if(ret["code"]!= 0) { if(ret["code"]!= 0) {
// //
}else{ }
ret["data"].merge_patch(cfg); else {
ret["data"].merge_patch(cfg);
} }
} }
}else{ }
else {
ret = this->config(cfg); ret = this->config(cfg);
} }
} }
...@@ -599,12 +615,13 @@ public: ...@@ -599,12 +615,13 @@ public:
exit(1); exit(1);
} }
// setup edge msg processor // setup edge msg processor
thMsgProcessor = thread([this](){ thMsgProcessor = thread([this]() {
while(true){ while(true) {
auto v = zmqhelper::z_recv_multiple(this->pRouter); auto v = zmqhelper::z_recv_multiple(this->pRouter);
if(v.size() == 0) { if(v.size() == 0) {
spdlog::error("evdaemon {} failed to receive msg {}", this->devSn, zmq_strerror(zmq_errno())); spdlog::error("evdaemon {} failed to receive msg {}", this->devSn, zmq_strerror(zmq_errno()));
}else{ }
else {
handleMsg(v); handleMsg(v);
} }
} }
......
...@@ -104,7 +104,8 @@ private: ...@@ -104,7 +104,8 @@ private:
this->peerData["config"][peerId] = v; this->peerData["config"][peerId] = v;
if(this->peerData["status"].count(peerId) == 0) { if(this->peerData["status"].count(peerId) == 0) {
this->peerData["status"][peerId] = -1; // unkown this->peerData["status"][peerId] = -1; // unkown
}else{ }
else {
// nop // nop
} }
this->peerData["enabled"][peerId] = 1; this->peerData["enabled"][peerId] = 1;
...@@ -119,8 +120,8 @@ private: ...@@ -119,8 +120,8 @@ private:
for(auto &m : ml) { for(auto &m : ml) {
if(m["sn"] != this->devSn) { if(m["sn"] != this->devSn) {
continue; continue;
} }
string peerName; string peerName;
ret = cfgutils::getPeerId(mn, m, peerId, peerName); ret = cfgutils::getPeerId(mn, m, peerId, peerName);
if(ret != 0) { if(ret != 0) {
...@@ -130,14 +131,16 @@ private: ...@@ -130,14 +131,16 @@ private:
if(m.count("enabled") == 0 || m["enabled"] == 0) { if(m.count("enabled") == 0 || m["enabled"] == 0) {
spdlog::warn("evdaemon {} {} was disabled", this->devSn, mn); spdlog::warn("evdaemon {} {} was disabled", this->devSn, mn);
this->peerData["enabled"][peerId] = 0; this->peerData["enabled"][peerId] = 0;
}else{ }
else {
this->peerData["enabled"][peerId] = 1; this->peerData["enabled"][peerId] = 1;
} }
this->peerData["config"][peerId] = v; this->peerData["config"][peerId] = v;
if(this->peerData["status"].count(peerId) == 0) { if(this->peerData["status"].count(peerId) == 0) {
this->peerData["status"][peerId] = -1; // unkown this->peerData["status"][peerId] = -1; // unkown
}else{ }
else {
// nop // nop
} }
...@@ -186,18 +189,20 @@ private: ...@@ -186,18 +189,20 @@ private:
int cnt = 0; int cnt = 0;
for(auto &[k,v]: this->peerData["config"].items()) { for(auto &[k,v]: this->peerData["config"].items()) {
if(this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0) { if(this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0) {
if((this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0)){ if((this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0)) {
tmp.push_back(k); tmp.push_back(k);
info += (cnt == 0? "" : string(", ")) + k; info += (cnt == 0? "" : string(", ")) + k;
}else if(this->peerData["status"][k] == -1) { }
else if(this->peerData["status"][k] == -1) {
unkown[k] = -1; unkown[k] = -1;
} }
}else{ }
terms.push_back(k); else {
terms.push_back(k);
} }
cnt++; cnt++;
} }
spdlog::info("evdaemon {} will start following subsystems: {}", devSn, info); spdlog::info("evdaemon {} will start following subsystems: {}", devSn, info);
// //
for(string &e : tmp) { for(string &e : tmp) {
...@@ -276,7 +281,7 @@ private: ...@@ -276,7 +281,7 @@ private:
string meta = j.dump(); string meta = j.dump();
vector<vector<uint8_t> > v = {str2body(selfId), str2body(this->daemonId), str2body(meta), str2body(cfg)}; vector<vector<uint8_t> > v = {str2body(selfId), str2body(this->daemonId), str2body(meta), str2body(cfg)};
z_send_multiple(pRouter, v); z_send_multiple(pRouter, v);
spdlog::info("evdaemon {} peer {} config sent: {}", devSn ,selfId, cfg); spdlog::info("evdaemon {} peer {} config sent: {}", devSn,selfId, cfg);
} }
else { else {
peerData["status"][selfId] = 0; peerData["status"][selfId] = 0;
...@@ -416,7 +421,8 @@ private: ...@@ -416,7 +421,8 @@ private:
if(meta == EV_MSG_META_CONFIG) { if(meta == EV_MSG_META_CONFIG) {
if(data.size() == 0) { if(data.size() == 0) {
spdlog::error("evdaemon {} received invalid empty config", devSn); spdlog::error("evdaemon {} received invalid empty config", devSn);
} else { }
else {
this->deltaCfg = json::diff(this->config, data); this->deltaCfg = json::diff(this->config, data);
spdlog::info("evdaemon {} received cloud config diff: {}\nnew: {}", devSn, this->deltaCfg.dump(4), data.dump()); spdlog::info("evdaemon {} received cloud config diff: {}\nnew: {}", devSn, this->deltaCfg.dump(4), data.dump());
if(this->deltaCfg.size() != 0 || this->bColdStart) { if(this->deltaCfg.size() != 0 || this->bColdStart) {
...@@ -440,9 +446,10 @@ private: ...@@ -440,9 +446,10 @@ private:
if(bBootstrap) { if(bBootstrap) {
// TODO: wait for previous started modules to connecting // TODO: wait for previous started modules to connecting
startSubSystems(); startSubSystems();
}else{ }
else {
spdlog::info("evdaemon {} skip startup subsystems since BOOTSTRAP is set to false", devSn); spdlog::info("evdaemon {} skip startup subsystems since BOOTSTRAP is set to false", devSn);
} }
} }
} }
} }
...@@ -539,7 +546,7 @@ public: ...@@ -539,7 +546,7 @@ public:
EvDaemon() EvDaemon()
{ {
/// peerId -> value /// peerId -> value
peerData["status"] = json(); peerData["status"] = json();
peerData["pids"] = json(); peerData["pids"] = json();
peerData["config"] = json(); peerData["config"] = json();
......
...@@ -83,9 +83,10 @@ private: ...@@ -83,9 +83,10 @@ private:
} }
spdlog::info("evmgr {} bind success to {}", devSn, addr); spdlog::info("evmgr {} bind success to {}", devSn, addr);
inited = true; inited = true;
error_exit: error_exit:
if(inited) { if(inited) {
}else{ }
else {
exit(1); exit(1);
} }
} }
...@@ -93,7 +94,7 @@ private: ...@@ -93,7 +94,7 @@ private:
spdlog::error("evmgr {} exception on init() for: {}. abort booting up.", devSn, e.what()); spdlog::error("evmgr {} exception on init() for: {}. abort booting up.", devSn, e.what());
exit(1); exit(1);
} }
spdlog::info("evmgr {} successfuly inited", devSn); spdlog::info("evmgr {} successfuly inited", devSn);
} }
...@@ -285,7 +286,8 @@ public: ...@@ -285,7 +286,8 @@ public:
} }
devSn = v[0]; devSn = v[0];
}else{ }
else {
spdlog::error("evmgr failed to start. no SN set"); spdlog::error("evmgr failed to start. no SN set");
exit(1); exit(1);
} }
...@@ -301,9 +303,9 @@ public: ...@@ -301,9 +303,9 @@ public:
ret = zmqhelper::recvConfigMsg(pDealer, config, addr, ident); ret = zmqhelper::recvConfigMsg(pDealer, config, addr, ident);
if(ret != 0) { if(ret != 0) {
spdlog::error("evmgr {} failed to receive configration message {}", devSn , addr); spdlog::error("evmgr {} failed to receive configration message {}", devSn, addr);
} }
init(); init();
} }
~EvMgr() ~EvMgr()
......
...@@ -70,7 +70,7 @@ private: ...@@ -70,7 +70,7 @@ private:
AVFormatContext *pAVFormatInput = nullptr; AVFormatContext *pAVFormatInput = nullptr;
AVCodecContext *pCodecCtx = nullptr; AVCodecContext *pCodecCtx = nullptr;
AVDictionary *pOptsRemux = nullptr; AVDictionary *pOptsRemux = nullptr;
DetectParam detPara = {25,500,-1,10,3,30, 2}; DetectParam detPara = {25, 500, -1, 10, 3, 30, 0.3};
EventState evtState = EventState::NONE; EventState evtState = EventState::NONE;
chrono::system_clock::time_point evtStartTm, evtStartTmLast; chrono::system_clock::time_point evtStartTm, evtStartTmLast;
queue<string> *evtQueue; queue<string> *evtQueue;
...@@ -81,7 +81,7 @@ private: ...@@ -81,7 +81,7 @@ private:
thread thEvent; thread thEvent;
string drport = "5549"; string drport = "5549";
// //
int init() int init()
{ {
int ret = 0; int ret = 0;
...@@ -577,13 +577,14 @@ protected: ...@@ -577,13 +577,14 @@ protected:
string evt = this->evtQueue->front(); string evt = this->evtQueue->front();
json jevt = json::parse(evt); json jevt = json::parse(evt);
this->evtQueue->pop(); this->evtQueue->pop();
if(jevt["event"] == EV_MSG_EVENT_MOTION_START){ if(jevt["event"] == EV_MSG_EVENT_MOTION_START) {
eventToSlicer["type"] = "event"; eventToSlicer["type"] = "event";
eventToSlicer["start"] = jevt["ts"]; eventToSlicer["start"] = jevt["ts"];
eventToSlicer["extraInfo"] = json(); //array eventToSlicer["extraInfo"] = json(); //array
eventToSlicer["extraInfo"].push_back(jevt); eventToSlicer["extraInfo"].push_back(jevt);
// TODO: save and load saved evt on crash // TODO: save and load saved evt on crash
}else if(jevt["event"] == EV_MSG_EVENT_MOTION_END){ }
else if(jevt["event"] == EV_MSG_EVENT_MOTION_END) {
eventToSlicer["end"] = jevt["ts"]; eventToSlicer["end"] = jevt["ts"];
eventToSlicer["extraInfo"].push_back(jevt); eventToSlicer["extraInfo"].push_back(jevt);
v[2] = str2body(eventToSlicer.dump()); v[2] = str2body(eventToSlicer.dump());
...@@ -595,7 +596,8 @@ protected: ...@@ -595,7 +596,8 @@ protected:
spdlog::info("evmlmotion {} sent event to {}: {}", this->selfId, this->slicerGid, eventToSlicer.dump()); spdlog::info("evmlmotion {} sent event to {}: {}", this->selfId, this->slicerGid, eventToSlicer.dump());
} }
eventToSlicer.clear(); eventToSlicer.clear();
}else{ }
else {
spdlog::error("evmlmotion {} unknown event to {}: {}", this->selfId, this->slicerGid, eventToSlicer.dump()); spdlog::error("evmlmotion {} unknown event to {}: {}", this->selfId, this->slicerGid, eventToSlicer.dump());
} }
......
...@@ -55,7 +55,8 @@ private: ...@@ -55,7 +55,8 @@ private:
ret = z_send_multiple(pDealer, rep); ret = z_send_multiple(pDealer, rep);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpuller {} failed to send avformatctx data to requester {}: {}", selfId, peerId, zmq_strerror(zmq_errno())); spdlog::error("evpuller {} failed to send avformatctx data to requester {}: {}", selfId, peerId, zmq_strerror(zmq_errno()));
}else{ }
else {
spdlog::info("evpuller {} success to send avformatctx data to requester {}", selfId, peerId); spdlog::info("evpuller {} success to send avformatctx data to requester {}", selfId, peerId);
} }
} }
...@@ -103,9 +104,10 @@ public: ...@@ -103,9 +104,10 @@ public:
RepSrv(RepSrv&&) = delete; RepSrv(RepSrv&&) = delete;
RepSrv(string mgrSn, string devSn, int iid, const char* formatBytes, RepSrv(string mgrSn, string devSn, int iid, const char* formatBytes,
int len, void *pDealer):mgrSn(mgrSn),devSn(devSn), iid(iid), bytes(formatBytes), int len, void *pDealer):mgrSn(mgrSn),devSn(devSn), iid(iid), bytes(formatBytes),
len(len), pDealer(pDealer) { len(len), pDealer(pDealer)
selfId = devSn+":evpuller:" + to_string(iid); {
}; selfId = devSn+":evpuller:" + to_string(iid);
};
~RepSrv() {}; ~RepSrv() {};
}; };
...@@ -126,13 +128,14 @@ private: ...@@ -126,13 +128,14 @@ private:
string drport = "5549"; string drport = "5549";
bool isIpStr(string ip) { bool isIpStr(string ip)
{
int cnt = 3*4 + 3; int cnt = 3*4 + 3;
if(ip.size() == 0 || ip.size() > cnt) { if(ip.size() == 0 || ip.size() > cnt) {
return false; return false;
} }
auto v = strutils::split(ip, '.'); auto v = strutils::split(ip, '.');
if(v.size() == 0 || v.size () != 4){ if(v.size() == 0 || v.size () != 4) {
return false; return false;
} }
...@@ -154,7 +157,7 @@ private: ...@@ -154,7 +157,7 @@ private:
int init() int init()
{ {
bool inited = false; bool inited = false;
int ret = 0; int ret = 0;
bool found = false; bool found = false;
string user, passwd, addr; string user, passwd, addr;
try { try {
...@@ -180,7 +183,7 @@ private: ...@@ -180,7 +183,7 @@ private:
if(ipc.size()!=0 && evpuller.size()!=0) { if(ipc.size()!=0 && evpuller.size()!=0) {
found = true; found = true;
} }
if(!found) { if(!found) {
spdlog::error("evpuller {} no valid config found", devSn); spdlog::error("evpuller {} no valid config found", devSn);
...@@ -194,27 +197,29 @@ private: ...@@ -194,27 +197,29 @@ private:
// default stream port // default stream port
if(ipc.count("port") == 0) { if(ipc.count("port") == 0) {
ipcPort = "554"; ipcPort = "554";
}else{ }
else {
ipcPort = to_string(ipc["port"]); ipcPort = to_string(ipc["port"]);
} }
string ipcAddr = ipc["addr"].get<string>(); string ipcAddr = ipc["addr"].get<string>();
if(isIpStr(ipcAddr)){ if(isIpStr(ipcAddr)) {
string chan = "ch1"; string chan = "ch1";
string streamName = "main"; string streamName = "main";
if(ipc.count("channel") != 0 && !ipc["channel"].get<string>().empty()){ if(ipc.count("channel") != 0 && !ipc["channel"].get<string>().empty()) {
chan = ipc["channel"].get<string>(); chan = ipc["channel"].get<string>();
} }
if(ipc.count("streamName") != 0 && !ipc["streamName"].get<string>().empty()){ if(ipc.count("streamName") != 0 && !ipc["streamName"].get<string>().empty()) {
streamName = ipc["streamName"].get<string>(); streamName = ipc["streamName"].get<string>();
} }
if(ipc.count("proto") != 0 && !ipc["proto"].get<string>().empty()){ if(ipc.count("proto") != 0 && !ipc["proto"].get<string>().empty()) {
proto = ipc["proto"]; proto = ipc["proto"];
} }
urlIn = proto + "://" + user + ":" + passwd + "@" + ipc["addr"].get<string>() + ":" + ipcPort + "/h264/" + chan + "/" + streamName + "/av_stream"; urlIn = proto + "://" + user + ":" + passwd + "@" + ipc["addr"].get<string>() + ":" + ipcPort + "/h264/" + chan + "/" + streamName + "/av_stream";
}else{ }
else {
urlIn = ipcAddr; urlIn = ipcAddr;
} }
...@@ -249,7 +254,7 @@ private: ...@@ -249,7 +254,7 @@ private:
exit(1); exit(1);
} }
ret = zmq_connect(pDealer, urlDealer.c_str()); ret = zmq_connect(pDealer, urlDealer.c_str());
if(ret < 0) { if(ret < 0) {
spdlog::error("evpuller {} failed to connect to router {}", selfId, urlDealer); spdlog::error("evpuller {} failed to connect to router {}", selfId, urlDealer);
exit(1); exit(1);
} }
...@@ -277,10 +282,11 @@ protected: ...@@ -277,10 +282,11 @@ protected:
string proto = urlIn.substr(0,4); string proto = urlIn.substr(0,4);
if(proto == "rtsp") { if(proto == "rtsp") {
av_dict_set(&optsIn, "rtsp_transport", "tcp", 0); av_dict_set(&optsIn, "rtsp_transport", "tcp", 0);
}else{ }
else {
// //
} }
spdlog::info("evpuller {} openning stream: {}", selfId, urlIn); spdlog::info("evpuller {} openning stream: {}", selfId, urlIn);
if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, &optsIn)) < 0) { if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, &optsIn)) < 0) {
spdlog::error("evpuller {} Could not open input stream {}", selfId, urlIn); spdlog::error("evpuller {} Could not open input stream {}", selfId, urlIn);
...@@ -396,7 +402,8 @@ public: ...@@ -396,7 +402,8 @@ public:
} }
devSn = v[0]; devSn = v[0];
iid = stoi(v[2]); iid = stoi(v[2]);
}else{ }
else {
spdlog::error("evpuller {} failed to start. no SN set", selfId); spdlog::error("evpuller {} failed to start. no SN set", selfId);
exit(1); exit(1);
} }
...@@ -411,7 +418,7 @@ public: ...@@ -411,7 +418,7 @@ public:
ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId); ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId);
if(ret != 0) { if(ret != 0) {
spdlog::error("evpuller {} failed to receive configration message {}", selfId , addr); spdlog::error("evpuller {} failed to receive configration message {}", selfId, addr);
} }
init(); init();
} }
......
...@@ -93,13 +93,13 @@ private: ...@@ -93,13 +93,13 @@ private:
spdlog::info("evpusher {} connect to {} for sub, {} for router", selfId, urlPub, urlDealer); spdlog::info("evpusher {} connect to {} for sub, {} for router", selfId, urlPub, urlDealer);
// TODO: multiple protocols support // TODO: multiple protocols support
urlOut = evpusher["urlDest"].get<string>(); urlOut = evpusher["urlDest"].get<string>();
// setup sub // setup sub
pSubCtx = zmq_ctx_new(); pSubCtx = zmq_ctx_new();
pSub = zmq_socket(pSubCtx, ZMQ_SUB); pSub = zmq_socket(pSubCtx, ZMQ_SUB);
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0); ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
if(ret != 0) { if(ret != 0) {
spdlog::error("evpusher {} {} failed set setsockopt: {}", devSn, iid, urlPub); spdlog::error("evpusher {} {} failed set setsockopt: {}", devSn, iid, urlPub);
} }
ret = zmq_connect(pSub, urlPub.c_str()); ret = zmq_connect(pSub, urlPub.c_str());
if(ret != 0) { if(ret != 0) {
...@@ -223,12 +223,14 @@ private: ...@@ -223,12 +223,14 @@ private:
ret = AVERROR_UNKNOWN; ret = AVERROR_UNKNOWN;
} }
ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, "rtsp", urlOut.c_str()); ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, "rtsp", urlOut.c_str());
}else if(proto == "rtmp"){ }
else if(proto == "rtmp") {
ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, "rtmp", urlOut.c_str()); ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, "rtmp", urlOut.c_str());
}else{ }
else {
ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, nullptr, urlOut.c_str()); ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, nullptr, urlOut.c_str());
} }
if (ret < 0) { if (ret < 0) {
spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret)); spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret));
exit(1); exit(1);
...@@ -406,7 +408,8 @@ public: ...@@ -406,7 +408,8 @@ public:
} }
devSn = v[0]; devSn = v[0];
iid = stoi(v[2]); iid = stoi(v[2]);
}else{ }
else {
spdlog::error("evpusher failed to start. no SN set"); spdlog::error("evpusher failed to start. no SN set");
exit(1); exit(1);
} }
...@@ -421,7 +424,7 @@ public: ...@@ -421,7 +424,7 @@ public:
ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId); ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId);
if(ret != 0) { if(ret != 0) {
spdlog::error("evpusher {} failed to receive configration message {}", devSn , addr); spdlog::error("evpusher {} failed to receive configration message {}", devSn, addr);
} }
init(); init();
......
...@@ -112,7 +112,8 @@ private: ...@@ -112,7 +112,8 @@ private:
eventQueue.pop(); eventQueue.pop();
} }
cvEvent.notify_one(); cvEvent.notify_one();
}else{ }
else {
spdlog::error("evslicer {} msg not supported from {}: {}", selfId, peerId, msg); spdlog::error("evslicer {} msg not supported from {}: {}", selfId, peerId, msg);
} }
} }
...@@ -597,7 +598,7 @@ protected: ...@@ -597,7 +598,7 @@ protected:
// remove // remove
fs::path fname(this->urlOut + "/" + videoFileTs2Name(i) + ".mp4"); fs::path fname(this->urlOut + "/" + videoFileTs2Name(i) + ".mp4");
fs::remove(fname); fs::remove(fname);
// TODO: currently we don't cache event videos. lost on reboot. // TODO: currently we don't cache event videos. lost on reboot.
// TODO: this behavior will be enhenced later. // TODO: this behavior will be enhenced later.
continue; continue;
...@@ -628,7 +629,7 @@ protected: ...@@ -628,7 +629,7 @@ protected:
for(auto &i : evts) { for(auto &i : evts) {
string fullPath = i.get_path(); string fullPath = i.get_path();
size_t pos = fullPath.find(ext, 0); size_t pos = fullPath.find(ext, 0);
if(fullPath.size() < ext.size() || pos == string::npos || pos != (fullPath.size() - ext.size())){ if(fullPath.size() < ext.size() || pos == string::npos || pos != (fullPath.size() - ext.size())) {
spdlog::debug("evslicer {} invalid file : {}", self->selfId, fullPath); spdlog::debug("evslicer {} invalid file : {}", self->selfId, fullPath);
continue; continue;
} }
...@@ -681,18 +682,21 @@ protected: ...@@ -681,18 +682,21 @@ protected:
{ {
if(seg >= numSlices) { if(seg >= numSlices) {
seg -= numSlices; seg -= numSlices;
}else if(seg <=-1) { }
else if(seg <=-1) {
seg = numSlices + seg; seg = numSlices + seg;
} }
return seg; return seg;
} }
int incSegHead(int seg) { int incSegHead(int seg)
{
return segToIdx(++seg); return segToIdx(++seg);
} }
int decSegHead(int seg) { int decSegHead(int seg)
{
return segToIdx(--seg); return segToIdx(--seg);
} }
...@@ -704,11 +708,12 @@ protected: ...@@ -704,11 +708,12 @@ protected:
int _itss = 0; int _itss = 0;
if(bSegFull) { if(bSegFull) {
_itss = segHead; _itss = segHead;
}else{ }
else {
_itss = 1; _itss = 1;
} }
if(segHead == -1){ if(segHead == -1) {
spdlog::error("evslicer {} no local records."); spdlog::error("evslicer {} no local records.");
return ret; return ret;
} }
...@@ -756,7 +761,8 @@ protected: ...@@ -756,7 +761,8 @@ protected:
return ret; return ret;
} }
void printSlices(){ void printSlices()
{
for(int i = 0; i < numSlices; i++) { for(int i = 0; i < numSlices; i++) {
spdlog::info("evslicer {} vector[{}] = {}, {}", selfId, i, vTsActive[i], videoFileTs2Name(vTsActive[i])); spdlog::info("evslicer {} vector[{}] = {}, {}", selfId, i, vTsActive[i], videoFileTs2Name(vTsActive[i]));
if(vTsActive[i] == 0) { if(vTsActive[i] == 0) {
...@@ -842,16 +848,16 @@ public: ...@@ -842,16 +848,16 @@ public:
string evt; string evt;
unique_lock<mutex> lk(this->mutEvent); unique_lock<mutex> lk(this->mutEvent);
this->cvEvent.wait(lk, [this] {return !(this->eventQueue.empty());}); this->cvEvent.wait(lk, [this] {return !(this->eventQueue.empty());});
if(!this->eventQueue.empty()) { if(!this->eventQueue.empty()) {
evt = this->eventQueue.front(); evt = this->eventQueue.front();
this->eventQueue.pop(); this->eventQueue.pop();
} }
if(evt.empty()){ if(evt.empty()) {
continue; continue;
} }
json jEvt = json::parse(evt); json jEvt = json::parse(evt);
if(jEvt["type"] == "event") { if(jEvt["type"] == "event") {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论