提交 6dd4d49b authored 作者: blu's avatar blu

bugfix: check 0 sized file

上级 ec900f1c
......@@ -100,7 +100,8 @@ private:
ret = z_send_multiple(pRouter, v);
if(ret <0) {
spdlog::error("evcloudsvc failed to send config to {}", sn);
}else{
}
else {
spdlog::info("evcloudsvc config sent to {}: {}", sn, cfg);
}
......@@ -482,7 +483,8 @@ private:
spdlog::error("evcloudsvc {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
}else{
}
else {
json resp;
resp["metaType"] = EV_MSG_META_PONG;
resp["target"] = selfId;
......@@ -561,12 +563,13 @@ private:
// eventToSlicer["end"]
// eventToSlicer["sender"] = selfId;
json sendEdgeMsg(json &body) {
json sendEdgeMsg(json &body)
{
json ret;
ret["code"] = 0;
ret["msg"] = "ok";
string msg;
try{
try {
auto target = body["target"].get<string>();
auto v = strutils::split(target, ':');
if(v.size() == 1 || v.size() == 3) {
......@@ -574,26 +577,30 @@ private:
meta["type"] = body["metaType"];
if(body.count("metaValue") == 0) {
// meta["value"] = "";
}else{
}
else {
meta["value"] = body["metaValue"];
}
body["sender"] = devSn;
if(peerData["status"].count(v[0]) == 0 || peerData["status"][v[0]] == 0){
if(peerData["status"].count(v[0]) == 0 || peerData["status"][v[0]] == 0) {
spdlog::warn("evcloudsvc sent msg {} to {}, but it was offline", body.dump(), v[0]);
}else{
}
else {
}
int i= z_send(pRouter, v[0], devSn, meta, body.dump());
if(i < 0) {
msg = fmt::format("evcloudsvc failed to z_zend msg: {} :{}",zmq_strerror(zmq_errno()) ,body.dump());
msg = fmt::format("evcloudsvc failed to z_zend msg: {} :{}",zmq_strerror(zmq_errno()),body.dump());
throw StrException(msg);
}
}else{
}
else {
msg = fmt::format("evcloudsvc invliad target field({}) in body: {}", target, body.dump());
throw StrException(msg);
}
}catch(exception &e) {
}
catch(exception &e) {
ret["msg"] = e.what();
spdlog::error(e.what());
ret["code"] = -1;
......@@ -602,7 +609,8 @@ private:
return ret;
}
json handleCmd(json &body){
json handleCmd(json &body)
{
json ret;
ret["code"] = -1;
ret["msg"] = "unkown msg";
......@@ -611,17 +619,20 @@ private:
body.count("data") != 0 && body["data"].is_object() && body.count("metaValue") !=0 && body["metaValue"].is_string()) {
// it's msg to edge.
return sendEdgeMsg(body);
}else{
}
else {
return ret;
}
}
json getReleaseBundle(string bid) {
json getReleaseBundle(string bid)
{
json ret;
return ret;
}
json addReleaseBundle(json &bundle) {
json addReleaseBundle(json &bundle)
{
json ret;
return ret;
}
......@@ -752,10 +763,11 @@ public:
string msg;
ret["code"] = 0;
ret["msg"] = "ok";
try{
try {
auto body = json::parse(req.body);
ret = this->handleCmd(body);
}catch(exception &e) {
}
catch(exception &e) {
ret["code"] = -1;
msg = fmt::format("evcloudsvc Post /cmd Exception: {}", e.what());
spdlog::error(msg);
......@@ -807,7 +819,8 @@ public:
}
if(this->configMap.count("mod2mgr") ==0 || this->configMap["mod2mgr"].size() ==0) {
}else{
}
else {
for(auto &k:mods) {
this->configMap["mod2mgr"].erase(k);
}
......@@ -827,11 +840,12 @@ public:
string msg;
ret["code"] = 0;
ret["msg"] = "ok";
try{
try {
string bundleId = req.get_param_value("bId");
auto body = json::parse(req.body);
ret = this->getReleaseBundle(bundleId);
}catch(exception &e) {
}
catch(exception &e) {
ret["code"] = -1;
msg = fmt::format("evcloudsvc Get /release Exception: {}", e.what());
spdlog::error(msg);
......@@ -846,10 +860,11 @@ public:
string msg;
ret["code"] = 0;
ret["msg"] = "ok";
try{
try {
auto body = json::parse(req.body);
ret = this->addReleaseBundle(body);
}catch(exception &e) {
}
catch(exception &e) {
ret["code"] = -1;
msg = fmt::format("evcloudsvc Post /release Exception: {}", e.what());
spdlog::error(msg);
......
......@@ -305,7 +305,8 @@ private:
// stop
this->peerData["status"][k] = 1; // disabled
sendCmd2Peer(k, EV_MSG_META_VALUE_CMD_STOP, "0");
}else if(v == 1) { // perm stop
}
else if(v == 1) { // perm stop
this->peerData["status"][k] = 2;
this->peerData["config"].erase(k);
sendCmd2Peer(k, EV_MSG_META_VALUE_CMD_STOP, "0");
......@@ -382,15 +383,18 @@ private:
spdlog::info("evdaemon {} peer connected: {}", devSn, selfId);
if(this->peerData["tsLastConn"].count(selfId) == 0) {
this->peerData["tsLastConn"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
}else{
}
else {
if(this->peerData["contConn"].count(selfId) == 0) {
this->peerData["contConn"][selfId] = 0;
}else{
}
else {
auto delta = this->peerData["contConn"][selfId].get<long>() - chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
if(delta < 3) { // within 3s
this->peerData["contConn"][selfId] = this->peerData["contConn"][selfId].get<int>() + 1;
}else{
}
else {
this->peerData["contConn"][selfId] = 0;
}
// refer to startSubsystems
......@@ -409,9 +413,10 @@ private:
spdlog::info("evdaemon {} peer {} config sent: {}", devSn,selfId, cfg);
}
else {
if(peerData["status"][selfId] == 1 || peerData["status"][selfId] == 2){
if(peerData["status"][selfId] == 1 || peerData["status"][selfId] == 2) {
spdlog::warn("evdaemon {} refuse to start {}: it was asked to be stopped, and is removed from cluster config", this->devSn, selfId);
}else{
}
else {
peerData["status"][selfId] = 0;
if(peerData["pids"].count(selfId) != 0) {
peerData["pids"].erase(selfId);
......@@ -420,7 +425,8 @@ private:
if(this->bBootstrap) {
spdlog::warn("evdaemon {} peer {} disconnected. restarting it.", devSn, selfId);
startSubSystems({selfId});
}else{
}
else {
spdlog::warn("evdaemon {} peer {} disconnected. won't restart it since BOOTSTRAP=false", devSn, selfId);
}
}
......@@ -584,7 +590,8 @@ private:
spdlog::info("evdaemon {} skip startup subsystems since BOOTSTRAP is set to false", devSn);
}
}
}else if(meta == EV_MSG_META_TYPE_CMD){
}
else if(meta == EV_MSG_META_TYPE_CMD) {
spdlog::info("evdaemon {} received cmd from cloud: {}", devSn, msg);
if(data.count("target") != 0 && data["target"].is_string() && data.count("metaType") !=0 && data["metaType"].is_string() &&
data.count("data") != 0 && data["data"].is_object() && data.count("metaValue") !=0 && data["metaValue"].is_string()) {
......@@ -592,38 +599,46 @@ private:
auto v = strutils::split(target, ':');
if(v.size() == 1) {
spdlog::info("evdaemon {} received msg {} from cloud to itself. TODO: functionality extending points such as debug tunnel", devSn, data.dump());
}else if(v.size() == 3){
}
else if(v.size() == 3) {
if(this->peerData["status"].count(target) == 0 || this->peerData["status"][target] == 0 || this->peerData["status"] == -1) {
spdlog::error("evdaemon {} received {} msg from cloud to {}: {}, but its offline", devSn, meta, target, data.dump());
}else{
}
else {
ret = sendCmd2Peer(target, data["metaValue"], data.dump());
if(ret < 0) {
spdlog::error("evdaemon {} failed to send msg to peer {}: {}", devSn, data.dump(), zmq_strerror(zmq_errno()));
}else{
}
else {
spdlog::info("evdaemon {} successfully relayed {} msg from cloud to {}: {}", devSn, meta, target, data.dump());
}
}
}else{
}
else {
spdlog::info("well");
}
}else{
}
else {
spdlog::info("done");
}
}else if(meta == EV_MSG_META_PONG) {
}
else if(meta == EV_MSG_META_PONG) {
string info;
if(data.count("data") != 0 ) {
if(data["data"].is_string()) {
info = fmt::format("evdaemon {} received pong msg from evcloudsvc: {}", devSn, data["data"].get<string>());
}else if (data["data"].is_object()) {
}
else if (data["data"].is_object()) {
info = fmt::format("evdaemon {} received pong msg from evcloudsvc: {}", devSn, data["data"].dump());
}
}else{
}
else {
info = fmt::format("evdaemon {} received pong msg from evcloudsvc.", devSn);
}
spdlog::info(info);
}
else{
else {
spdlog::info("evdaemon {} received msg from cloud that having no handler implemented: {}", devSn, msg);
}
}
......
......@@ -273,7 +273,8 @@ error_exit:
}
bProcessed = true;
} catch(exception &e) {
}
catch(exception &e) {
bProcessed = false;
spdlog::error("evmgr {} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what());
}
......@@ -300,7 +301,7 @@ error_exit:
bProcessed = true;
}
else {
try{
try {
json jmeta = json::parse(meta);
if(jmeta["type"] == EV_MSG_META_TYPE_BROADCAST) {
if(jmeta.count("value") != 0) {
......@@ -313,18 +314,21 @@ error_exit:
ret = z_send_multiple(pRouter, broadCastMsg);
if(ret < 0) {
spdlog::error("evmgr {} failed to broadcast msg from {} because {}. msg meta: {}", devSn, selfId, zmq_strerror(zmq_errno()), meta);
}else{
}
else {
spdlog::info("evmgr {} successfully broadcast msg from {} to {}. msg meta: {}", devSn, selfId, k, meta);
}
}
}
}
bProcessed = true;
}else if(jmeta["type"] == EV_MSG_META_AVFORMATCTX) {
}
else if(jmeta["type"] == EV_MSG_META_AVFORMATCTX) {
bProcessed = true;
// ignore;
}
}catch(exception &e) {
}
catch(exception &e) {
bProcessed = false;
spdlog::error("evmgr {} exception process msg from {} with meta {}: {}", devSn, selfId, meta, e.what());
}
......
......@@ -159,7 +159,8 @@ private:
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
cvMsg.notify_one();
}else{
}
else {
spdlog::warn("evmlmotion {} received avformatctx msg from {}, but already proceessed before, ignored. TODO: reinit", selfId, peerId);
}
bProcessed = true;
......@@ -223,14 +224,16 @@ private:
int portPub = 5556;
if(evpuller.count("portPub") != 0 && evpuller["portPub"].is_number_integer()) {
portPub = evpuller["portPub"];
}else if(evpuller.count("port-pub") != 0 && evpuller["port-pub"].is_number_integer()){
}
else if(evpuller.count("port-pub") != 0 && evpuller["port-pub"].is_number_integer()) {
portPub = evpuller["port-pub"];
}
int portRouter = 5550;
if(evmgr.count("portRouter") != 0 && evmgr["portRouter"].is_number_integer()) {
portRouter = evmgr["portRouter"];
}else if(evmgr.count("port-router") != 0 && evmgr["port-router"].is_number_integer()) {
}
else if(evmgr.count("port-router") != 0 && evmgr["port-router"].is_number_integer()) {
portRouter = evmgr["port-router"];
}
......
......@@ -91,14 +91,16 @@ private:
return ret;
}
void sendAVInputCtxMsg(string peerId){
void sendAVInputCtxMsg(string peerId)
{
json meta;
auto msgBody = data2body(const_cast<char*>(pAVFmtCtxBytes), lenAVFmtCtxBytes);
if(peerId.empty()){
if(peerId.empty()) {
meta["type"] = EV_MSG_META_TYPE_BROADCAST;
meta["value"] = EV_MSG_META_AVFORMATCTX;
peerId = this->mgrSn + ":evmgr:0";
}else{
}
else {
meta["type"] = EV_MSG_META_AVFORMATCTX;
}
......@@ -236,14 +238,16 @@ private:
int portPub = 5556;
if(evpuller.count("portPub") != 0 && evpuller["portPub"].is_number_integer()) {
portPub = evpuller["portPub"];
}else if(evpuller.count("port-pub") != 0 && evpuller["port-pub"].is_number_integer()){
}
else if(evpuller.count("port-pub") != 0 && evpuller["port-pub"].is_number_integer()) {
portPub = evpuller["port-pub"];
}
int portRouter = 5550;
if(evmgr.count("portRouter") != 0 && evmgr["portRouter"].is_number_integer()) {
portRouter = evmgr["portRouter"];
}else if(evmgr.count("port-router") != 0 && evmgr["port-router"].is_number_integer()) {
}
else if(evmgr.count("port-router") != 0 && evmgr["port-router"].is_number_integer()) {
portRouter = evmgr["port-router"];
}
......
......@@ -94,14 +94,16 @@ private:
int portPub = 5556;
if(evpuller.count("portPub") != 0 && evpuller["portPub"].is_number_integer()) {
portPub = evpuller["portPub"];
}else if(evpuller.count("port-pub") != 0 && evpuller["port-pub"].is_number_integer()){
}
else if(evpuller.count("port-pub") != 0 && evpuller["port-pub"].is_number_integer()) {
portPub = evpuller["port-pub"];
}
int portRouter = 5550;
if(evmgr.count("portRouter") != 0 && evmgr["portRouter"].is_number_integer()) {
portRouter = evmgr["portRouter"];
}else if(evmgr.count("port-router") != 0 && evmgr["port-router"].is_number_integer()) {
}
else if(evmgr.count("port-router") != 0 && evmgr["port-router"].is_number_integer()) {
portRouter = evmgr["port-router"];
}
......@@ -260,7 +262,8 @@ private:
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
cvMsg.notify_one();
}else{
}
else {
spdlog::warn("evpusher {} received avformatctx msg from {}, but already proceessed before, ignored. TODO: reinit", selfId, peerId);
}
bProcessed = true;
......
......@@ -98,7 +98,8 @@ private:
gotFormat = true;
cvMsg.notify_one();
spdlog::info("evslicer {} got avformat from {}", selfId, peerId);
}else{
}
else {
spdlog::warn("evslicer {} received avformatctx msg from {}, but already proceessed before, ignored. TODO: reinit", selfId, peerId);
}
}
......@@ -168,8 +169,9 @@ private:
spdlog::info("evslicer {} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
bProcessed = true;
exit(0);
}else if(metaValue == "debug:record") {
try{
}
else if(metaValue == "debug:record") {
try {
json body = json::parse(body2str(v[2]));
if(body.count("data") != 0 && body["data"].is_object() && body["data"].count("start") != 0 && body["data"]["start"].is_number() && body["data"].count("end") != 0 && body["data"]["end"].is_number()) {
json evt;
......@@ -183,23 +185,28 @@ private:
}
bProcessed = true;
}
}catch(exception &e) {
}
catch(exception &e) {
spdlog::error("evslicer {} exception in handleCloudMsg: {}", selfId, e.what());
}
}else if(metaValue == "debug:xxx"){
}
else if(metaValue == "debug:xxx") {
// TODO: remove debug feature
bProcessed = true;
}else if(metaValue == "debug:list_files"){
}
else if(metaValue == "debug:list_files") {
// TODO: remove debug feature
printVideoFiles(this->sTsList);
bProcessed = true;
}else if(metaValue == "debug:toggle_log") {
}
else if(metaValue == "debug:toggle_log") {
// TODO: remove debug feature
static bool toggle = false;
toggle = !toggle;
if(toggle){
if(toggle) {
spdlog::set_level(spdlog::level::debug);
}else{
}
else {
spdlog::set_level(spdlog::level::info);
}
......@@ -309,14 +316,16 @@ private:
int portPub = 5556;
if(evpuller.count("portPub") != 0 && evpuller["portPub"].is_number_integer()) {
portPub = evpuller["portPub"];
}else if(evpuller.count("port-pub") != 0 && evpuller["port-pub"].is_number_integer()){
}
else if(evpuller.count("port-pub") != 0 && evpuller["port-pub"].is_number_integer()) {
portPub = evpuller["port-pub"];
}
int portRouter = 5550;
if(evmgr.count("portRouter") != 0 && evmgr["portRouter"].is_number_integer()) {
portRouter = evmgr["portRouter"];
}else if(evmgr.count("port-router") != 0 && evmgr["port-router"].is_number_integer()) {
}
else if(evmgr.count("port-router") != 0 && evmgr["port-router"].is_number_integer()) {
portRouter = evmgr["port-router"];
}
......@@ -698,7 +707,8 @@ protected:
}
}
void insertTsList(set<long> &_list, long elem, int maxSize) {
void insertTsList(set<long> &_list, long elem, int maxSize)
{
// _list.insert(lower_bound(_list.begin(), _list.end(), elem), elem);
if(_list.size() == 0) {
_list.insert(_list.begin(),elem);
......@@ -708,14 +718,15 @@ protected:
auto itr = _list.rbegin();
for(; itr != _list.rend(); itr++) {
if(*itr < elem){
if(*itr < elem) {
break;
}
}
if(itr == _list.rbegin() ) {
_list.insert(_list.end(), elem);
}else{
}
else {
_list.insert(itr.base(), elem);
}
......@@ -755,7 +766,8 @@ protected:
auto ts = self->videoFileName2Ts(baseName);
if(ts == -1) {
spdlog::error("evslicer {} fileMonHandler failed to process file: {}", self->selfId, lastFile);
}else{
}
else {
self->insertTsList(self->sTsList, ts, self->numSlices);
}
}
......@@ -947,9 +959,10 @@ public:
if(tse < first) {
spdlog::info("evslicer {} thEventHandler event range ({}, {}) is not in range ({}, {}).", selfId, tss, tse, first, end);
continue;
}else if(first == 0||tse > end) {
}
else if(first == 0||tse > end) {
spdlog::info("evslicer {} thEventHandler event range ({}, {}) is not in range ({}, {}), resched to run in {}s.", selfId, tss, tse, first, end, this->seconds + 5);
auto th = thread([evt, this](){
auto th = thread([evt, this]() {
this_thread::sleep_for(chrono::seconds(this->seconds + 5));
lock_guard<mutex> lock(this->mutEvent);
this->eventQueue.push(evt);
......@@ -987,6 +1000,7 @@ public:
fileNames.push_back(fname);
sf+="\tfile\t" + fname + "\n";
}
if(hasError) {
continue;
}
......@@ -1010,18 +1024,19 @@ public:
postArgs["params"] = params;
postArgs["fileNames"] = fileNames;
string fname = dirDest + params["startTime"].get<string>() + "_" + params["endTime"].get<string>() + "evt.json";
try{
try {
ofstream ofs(fname);
ofs << postArgs;
for(auto &f:fileNames){
for(auto &f:fileNames) {
fs::copy(fs::path(string(f)),fs::path(dirDest));
}
}catch(exception &e) {
}
catch(exception &e) {
spdlog::error("evcloudsvc {} {}:{} exception: {}", selfId, __FILE__, __LINE__, e.what());
}
}else{
}
else {
spdlog::info("evslicer {} retrying upload", selfId);
jEvt["cnt"] = jEvt["cnt"].get<int>() - 1;
lock_guard<mutex> lock(this->mutEvent);
......@@ -1035,19 +1050,21 @@ public:
}
else {
spdlog::info("evslicer {} upload ({}, {}). local({}, {}). resp: {} files:\n{}", selfId, tss, tse, first, end, strResp, sf);
if(ret > 0){
try{
if(ret > 0) {
try {
auto resp = json::parse(strResp);
//TODO: open this swith when video server has implemented this functionality
if(true){
if(true) {
if(resp.count("code") != 0 && resp["code"] != 0) {
if(resp["code"] == 4|| resp["code"] == 7) {
if(jEvt.count("cnt") == 0) {
jEvt["cnt"] = 2;
}else{
}
else {
if(jEvt["cnt"].get<int>() <= 0) {
spdlog::error("evslicer {} failed to upload videos over N times, abort retrying: {}", this->selfId, evt);
}else{
}
else {
jEvt["cnt"] = jEvt["cnt"].get<int>() - 1;
lock_guard<mutex> lock(this->mutEvent);
this->eventQueue.push(jEvt.dump());
......@@ -1057,15 +1074,18 @@ public:
cvEvent.notify_one();
}
}
}else if(resp["code"] == 6) {
}
else if(resp["code"] == 6) {
// TODO: cloud storage issue. need stratigy policy
spdlog::warn("evslicer {} TODO: handle cloud storage", this->selfId);
}else{
}
else {
spdlog::error("evslicer {} failed to upload videos. abort retry.", this->selfId);
}
}
}
}catch(exception &e) {
}
catch(exception &e) {
spdlog::error("evslicer {} {}:{} exception: {}", this->selfId, __FILE__, __LINE__, e.what());
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论