提交 18e50a85 authored 作者: blu's avatar blu

big refacting of communitation architect

上级 aeb3b869
...@@ -4,3 +4,6 @@ ...@@ -4,3 +4,6 @@
[submodule "opencv-motion-detect/vendor/leveldb"] [submodule "opencv-motion-detect/vendor/leveldb"]
path = opencv-motion-detect/vendor/leveldb path = opencv-motion-detect/vendor/leveldb
url = https://github.com/google/leveldb url = https://github.com/google/leveldb
[submodule "opencv-motion-detect/vendor/curlpp"]
path = opencv-motion-detect/vendor/curlpp
url = https://github.com/jpbarrette/curlpp
...@@ -400,7 +400,7 @@ private: ...@@ -400,7 +400,7 @@ private:
spdlog::info("evdaemon {} received cloud config diff: {}\nnew: {}", devSn, this->deltaCfg.dump(4), data.dump()); spdlog::info("evdaemon {} received cloud config diff: {}\nnew: {}", devSn, this->deltaCfg.dump(4), data.dump());
if(this->deltaCfg.size() != 0 || this->bColdStart) { if(this->deltaCfg.size() != 0 || this->bColdStart) {
this->config = data; this->config = data;
spdlog::info("evadmon {} reloading config from cloud", devSn); spdlog::info("evdaemon {} reloading config from cloud", devSn);
ret = reloadCfg(); ret = reloadCfg();
if(ret != 0) { if(ret != 0) {
spdlog::error("evdameon {} failed to parse new config: {}", devSn, data.dump()); spdlog::error("evdameon {} failed to parse new config: {}", devSn, data.dump());
......
...@@ -56,13 +56,12 @@ private: ...@@ -56,13 +56,12 @@ private:
int ret = 0; int ret = 0;
json jret; json jret;
bool inited = false; bool inited = false;
spdlog::info("evmgr boot {}", devSn);
int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT; int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT;
string addr; string addr;
try { try {
// //
spdlog::info("evmgr {} starting with configuration:{}", devSn, config.dump()); spdlog::info("evmgr boot configuration: {} -> {}", devSn, config.dump());
if(config["proto"] != "zmq") { if(config["proto"] != "zmq") {
spdlog::warn("evmgr {} unsupported protocol: {}, try fallback to zmq instead now...", devSn, config["proto"].get<string>()); spdlog::warn("evmgr {} unsupported protocol: {}, try fallback to zmq instead now...", devSn, config["proto"].get<string>());
......
...@@ -85,15 +85,14 @@ private: ...@@ -85,15 +85,14 @@ private:
thread thEvent; thread thEvent;
string drport = "5549"; string drport = "5549";
// //
int init() int init()
{ {
int ret = 0; int ret = 0;
spdlog::info("evmlmotion boot {}", selfId);
// TODO: req config // TODO: req config
bool found = false; bool found = false;
try { try {
spdlog::info("evmlmotion {} config: {}", devSn, config.dump()); spdlog::info("evmlmotion boot config: {} -> {}", selfId, config.dump());
json evmlmotion; json evmlmotion;
json &evmgr = this->config; json &evmgr = this->config;
json ipc; json ipc;
...@@ -623,7 +622,7 @@ protected: ...@@ -623,7 +622,7 @@ protected:
} }
zmq_msg_close(&msg); zmq_msg_close(&msg);
if(pktCnt % EV_LOG_PACKET_CNT == 0) { if(pktCnt % EV_LOG_PACKET_CNT == 0) {
spdlog::info("seq: {}, pts: {}, dts: {}, idx: {}", pktCnt, packet.pts, packet.dts, packet.stream_index); spdlog::info("evmlmotion {} seq: {}, pts: {}, dts: {}, idx: {}", selfId, pktCnt, packet.pts, packet.dts, packet.stream_index);
} }
pktCnt++; pktCnt++;
...@@ -712,7 +711,7 @@ public: ...@@ -712,7 +711,7 @@ public:
int main(int argc, const char *argv[]) int main(int argc, const char *argv[])
{ {
spdlog::set_level(spdlog::level::debug); spdlog::set_level(spdlog::level::info);
av_log_set_level(AV_LOG_ERROR); av_log_set_level(AV_LOG_ERROR);
queue<string> evtQueue; queue<string> evtQueue;
EvMLMotion es(&evtQueue); EvMLMotion es(&evtQueue);
...@@ -736,7 +735,7 @@ int main(int argc, const char *argv[]) ...@@ -736,7 +735,7 @@ int main(int argc, const char *argv[])
while(true) { while(true) {
if(evtQueue.size() > 0) { if(evtQueue.size() > 0) {
string p = evtQueue.front(); string p = evtQueue.front();
spdlog::info("event: {}", p); spdlog::info("evmlmotion event: {}", p);
evtQueue.pop(); evtQueue.pop();
} }
else { else {
......
...@@ -144,14 +144,11 @@ private: ...@@ -144,14 +144,11 @@ private:
int init() int init()
{ {
bool inited = false; bool inited = false;
int ret = 0; int ret = 0;
spdlog::info("evpuller: {}", selfId);
bool found = false; bool found = false;
string user, passwd, addr; string user, passwd, addr;
try { try {
spdlog::info("config dump: {:s}", config.dump()); spdlog::info("evpuller boot config: {} -> {}", selfId, config.dump());
json evpuller; json evpuller;
json &evmgr = this->config; json &evmgr = this->config;
json ipc; json ipc;
...@@ -237,7 +234,7 @@ private: ...@@ -237,7 +234,7 @@ private:
} }
inited = true; inited = true;
spdlog::info("successfully load config"); spdlog::info("evpuller successfully load config");
return 0; return 0;
} }
...@@ -309,7 +306,7 @@ protected: ...@@ -309,7 +306,7 @@ protected:
ret = av_read_frame(pAVFormatInput, &packet); ret = av_read_frame(pAVFormatInput, &packet);
if (ret < 0) { if (ret < 0) {
spdlog::error("failed read packet: {}", av_err2str(ret)); spdlog::error("evpuller {} failed read packet: {}", selfId, av_err2str(ret));
break; break;
} }
in_stream = pAVFormatInput->streams[packet.stream_index]; in_stream = pAVFormatInput->streams[packet.stream_index];
...@@ -318,7 +315,7 @@ protected: ...@@ -318,7 +315,7 @@ protected:
continue; continue;
} }
if(pktCnt % EV_LOG_PACKET_CNT == 0) { if(pktCnt % EV_LOG_PACKET_CNT == 0) {
spdlog::info("pktCnt: {:d}", pktCnt); spdlog::info("evpuller {} pktCnt: {:d}", selfId, pktCnt);
} }
pktCnt++; pktCnt++;
...@@ -405,7 +402,7 @@ public: ...@@ -405,7 +402,7 @@ public:
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
av_log_set_level(AV_LOG_ERROR); av_log_set_level(AV_LOG_ERROR);
spdlog::set_level(spdlog::level::debug); spdlog::set_level(spdlog::level::info);
//DB::exec(NULL, NULL, NULL,NULL); //DB::exec(NULL, NULL, NULL,NULL);
auto evp = EvPuller(); auto evp = EvPuller();
evp.join(); evp.join();
......
...@@ -54,10 +54,9 @@ private: ...@@ -54,10 +54,9 @@ private:
int init() int init()
{ {
int ret = 0; int ret = 0;
spdlog::info("evpusher startup {}", selfId);
bool found = false; bool found = false;
try { try {
spdlog::info("config: {:s}", config.dump()); spdlog::info("evpusher boot config: {} -> {}", selfId, config.dump());
json evpusher; json evpusher;
json &evmgr = this->config; json &evmgr = this->config;
json ipc; json ipc;
...@@ -131,7 +130,7 @@ private: ...@@ -131,7 +130,7 @@ private:
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} failed to set config: {}", selfId, config.dump()); spdlog::error("evpusher {} failed to set config: {}", selfId, config.dump());
} }
spdlog::info("new config: {}", config.dump()); spdlog::info("evpusher new config: {}", config.dump());
ping(); ping();
} }
catch(exception &e) { catch(exception &e) {
...@@ -262,7 +261,7 @@ private: ...@@ -262,7 +261,7 @@ private:
} }
for(int i = 0; i < pAVFormatInput->nb_streams; i++ ) { for(int i = 0; i < pAVFormatInput->nb_streams; i++ ) {
spdlog::info("streamList[{:d}]: {:d}", i, streamList[i]); spdlog::info("evpusher {} streamList[{:d}]: {:d}", selfId, i, streamList[i]);
} }
av_dump_format(pAVFormatRemux, 0, urlOut.c_str(), 1); av_dump_format(pAVFormatRemux, 0, urlOut.c_str(), 1);
...@@ -341,7 +340,7 @@ protected: ...@@ -341,7 +340,7 @@ protected:
} }
if(pktCnt % EV_LOG_PACKET_CNT == 0) { if(pktCnt % EV_LOG_PACKET_CNT == 0) {
spdlog::info("seq: {}, pts: {}, dts: {}, idx: {}", pktCnt, packet.pts, packet.dts, packet.stream_index); spdlog::info("evpusher {} seq: {}, pts: {}, dts: {}, idx: {}", selfId, pktCnt, packet.pts, packet.dts, packet.stream_index);
} }
pktCnt++; pktCnt++;
......
...@@ -32,6 +32,7 @@ namespace fs = std::filesystem; ...@@ -32,6 +32,7 @@ namespace fs = std::filesystem;
#include "inc/tinythread.hpp" #include "inc/tinythread.hpp"
#include "inc/common.hpp" #include "inc/common.hpp"
#include "inc/database.h" #include "inc/database.h"
#include "postfile.h"
using namespace std; using namespace std;
using namespace zmqhelper; using namespace zmqhelper;
...@@ -62,13 +63,9 @@ private: ...@@ -62,13 +63,9 @@ private:
int init() int init()
{ {
int ret = 0; int ret = 0;
spdlog::info("evslicer boot {}", selfId);
// TODO: req config
bool found = false; bool found = false;
try { try {
spdlog::info("config: {:s}", config.dump()); spdlog::info("evslicer boot config: {} -> {}", selfId, config.dump());
json evslicer; json evslicer;
json &evmgr = this->config; json &evmgr = this->config;
json ipc; json ipc;
...@@ -130,7 +127,7 @@ private: ...@@ -130,7 +127,7 @@ private:
sliceIdxToName = new vector<int>(numSlices); sliceIdxToName = new vector<int>(numSlices);
// TODO: load db // TODO: load db
// DB::exec(NULL, "select id, ts, last from slices;", DB::get_slices, sliceIdxToName); // DB::exec(NULL, "select id, ts, last from slices;", DB::get_slices, sliceIdxToName);
spdlog::info("mkdir -p {}", urlOut); spdlog::info("evslicer mkdir -p {}", selfId, urlOut);
ret = system((string("mkdir -p ") + urlOut).c_str()); ret = system((string("mkdir -p ") + urlOut).c_str());
// if(ret == -1) { // if(ret == -1) {
// spdlog::error("failed to create {} dir", urlOut); // spdlog::error("failed to create {} dir", urlOut);
...@@ -280,7 +277,7 @@ private: ...@@ -280,7 +277,7 @@ private:
} }
for(int i = 0; i < pAVFormatInput->nb_streams; i++ ) { for(int i = 0; i < pAVFormatInput->nb_streams; i++ ) {
spdlog::info("streamList[{:d}]: {:d}", i, streamList[i]); spdlog::info("evslicer {} streamList[{:d}]: {:d}", selfId, i, streamList[i]);
} }
//av_dict_set(&pOptsRemux, "movflags", "frag_keyframe+empty_moov+default_base_moof", 0); //av_dict_set(&pOptsRemux, "movflags", "frag_keyframe+empty_moov+default_base_moof", 0);
...@@ -357,7 +354,7 @@ protected: ...@@ -357,7 +354,7 @@ protected:
// TODO: // TODO:
spdlog::info("writing new slice {}", name.c_str()); spdlog::info("evslicer {} writing new slice {}", selfId, name.c_str());
while(chrono::duration_cast<chrono::seconds>(end-start).count() < minutes * 60) { while(chrono::duration_cast<chrono::seconds>(end-start).count() < minutes * 60) {
if(checkStop() == true) { if(checkStop() == true) {
bStopSig = true; bStopSig = true;
...@@ -393,7 +390,7 @@ protected: ...@@ -393,7 +390,7 @@ protected:
//calc pts //calc pts
if(pktCnt % EV_LOG_PACKET_CNT == 0) { if(pktCnt % EV_LOG_PACKET_CNT == 0) {
spdlog::info("seq: {}, pts: {}, dts: {}, idx: {}", pktCnt, packet.pts, packet.dts, packet.stream_index); spdlog::info("evslicer {} seq: {}, pts: {}, dts: {}, idx: {}", selfId, pktCnt, packet.pts, packet.dts, packet.stream_index);
} }
/* copy packet */ /* copy packet */
if(pktCnt == 0) { if(pktCnt == 0) {
...@@ -504,7 +501,7 @@ public: ...@@ -504,7 +501,7 @@ public:
int main(int argc, const char *argv[]) int main(int argc, const char *argv[])
{ {
av_log_set_level(AV_LOG_ERROR); av_log_set_level(AV_LOG_ERROR);
spdlog::set_level(spdlog::level::debug); spdlog::set_level(spdlog::level::info);
EvSlicer es; EvSlicer es;
es.join(); es.join();
return 0; return 0;
......
...@@ -191,7 +191,7 @@ int encode(AVFormatContext *ctx, char **bytes) ...@@ -191,7 +191,7 @@ int encode(AVFormatContext *ctx, char **bytes)
wholeSize += strlen(PS_MARK_S); wholeSize += strlen(PS_MARK_S);
// num streams // num streams
wholeSize += sizeof(ctx->nb_streams); wholeSize += sizeof(ctx->nb_streams);
spdlog::info("encode num of streams: {:d}", ctx->nb_streams); spdlog::debug("encode num of streams: {:d}", ctx->nb_streams);
for (int i = 0; i < ctx->nb_streams; i++) for (int i = 0; i < ctx->nb_streams; i++)
{ {
wholeSize += sizeof(AVStream); wholeSize += sizeof(AVStream);
...@@ -246,7 +246,7 @@ int decode(char *bytes, int len, AVFormatContext *pCtx) ...@@ -246,7 +246,7 @@ int decode(char *bytes, int len, AVFormatContext *pCtx)
spdlog::error("invalid avformatctx: {} {}", ret, len); spdlog::error("invalid avformatctx: {} {}", ret, len);
return -1; return -1;
} }
spdlog::info("decode len: {}", ret); spdlog::debug("decode len: {}", ret);
got += strlen(PS_MARK_S); got += strlen(PS_MARK_S);
memcpy(&ret, bytes + got, sizeof(ret)); memcpy(&ret, bytes + got, sizeof(ret));
got += sizeof(ret); got += sizeof(ret);
......
Subproject commit 8810334c830faa3b38bcd94f5b1ab695a4f05eb9
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论