提交 ba7dadb0 authored 作者: blu's avatar blu

big refacting of communitation architect

上级 7a8a0add
...@@ -38,7 +38,7 @@ CFLAGS = $(DEBUG) -Wall ...@@ -38,7 +38,7 @@ CFLAGS = $(DEBUG) -Wall
LIBOPENCV = `pkg-config opencv --cflags --libs` LIBOPENCV = `pkg-config opencv --cflags --libs`
LIBFFMPEG = `pkg-config libavformat libavutil libavcodec libswscale --cflags --libs` LIBFFMPEG = `pkg-config libavformat libavutil libavcodec libswscale --cflags --libs`
LIBS +=-Lvendor/lib -lpthread -lleveldb LIBS +=-Lvendor/lib -lpthread -lleveldb -lfswatch
#-static #-static
HEADERS=-Iinc -Ivendor/include HEADERS=-Iinc -Ivendor/include
...@@ -56,6 +56,8 @@ objs/database.o: database.cpp inc/database.h ...@@ -56,6 +56,8 @@ objs/database.o: database.cpp inc/database.h
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -c database.cpp -o objs/database.o $(HEADERS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -c database.cpp -o objs/database.o $(HEADERS)
objs/zmqhelper.o: inc/zmqhelper.cpp inc/zmqhelper.hpp objs/zmqhelper.o: inc/zmqhelper.cpp inc/zmqhelper.hpp
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -c inc/zmqhelper.cpp -o objs/zmqhelper.o $(HEADERS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -c inc/zmqhelper.cpp -o objs/zmqhelper.o $(HEADERS)
objs/dirmon.o: dirmon.cpp dirmon.h
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -c dirmon.cpp -o objs/dirmon.o $(HEADERS)
objs/utils.o: inc/utils.cpp inc/utils.hpp objs/utils.o: inc/utils.cpp inc/utils.hpp
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -c inc/utils.cpp -o objs/utils.o $(HEADERS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -c inc/utils.cpp -o objs/utils.o $(HEADERS)
...@@ -69,8 +71,8 @@ evpuller: evpuller.cpp database.cpp inc/av_common.hpp objs/utils.o inc/common.hp ...@@ -69,8 +71,8 @@ evpuller: evpuller.cpp database.cpp inc/av_common.hpp objs/utils.o inc/common.hp
evpusher: evpusher.cpp inc/common.hpp inc/av_common.hpp objs/utils.o inc/tinythread.hpp objs/database.o objs/zmqhelper.o $(SQLITE_SRC) evpusher: evpusher.cpp inc/common.hpp inc/av_common.hpp objs/utils.o inc/tinythread.hpp objs/database.o objs/zmqhelper.o $(SQLITE_SRC)
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evpusher evpusher.cpp objs/database.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evpusher evpusher.cpp objs/database.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evslicer: evslicer.cpp inc/common.hpp inc/av_common.hpp objs/utils.o inc/tinythread.hpp objs/database.o objs/zmqhelper.o $(SQLITE_SRC) evslicer: evslicer.cpp inc/common.hpp inc/av_common.hpp objs/utils.o objs/dirmon.o inc/tinythread.hpp objs/database.o objs/zmqhelper.o $(SQLITE_SRC)
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evslicer evslicer.cpp objs/database.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evslicer evslicer.cpp objs/database.o objs/dirmon.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evmlmotion: evmlmotion.cpp inc/common.hpp inc/av_common.hpp objs/utils.o inc/avcvhelpers.hpp objs/database.o objs/zmqhelper.o inc/tinythread.hpp $(SQLITE_SRC) evmlmotion: evmlmotion.cpp inc/common.hpp inc/av_common.hpp objs/utils.o inc/avcvhelpers.hpp objs/database.o objs/zmqhelper.o inc/tinythread.hpp $(SQLITE_SRC)
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evmlmotion evmlmotion.cpp objs/database.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) $(LIBOPENCV) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evmlmotion evmlmotion.cpp objs/database.o objs/utils.o objs/zmqhelper.o $(SQLITE) $(LIBFFMPEG) $(HEADERS) $(LIBOPENCV) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
......
#include "dirmon.h" #include "dirmon.h"
using namespace std;
using namespace fsw;
void fileEventHandler(const std::vector<event>&evts, void *pUser)
{
for(auto&i:evts) {
spdlog::info("path: {}, time: {}", i.get_path(), i.get_time());
}
}
int CreateDirMon(monitor **m, string path, string ext, vector<string> &&events, FSW_EVENT_CALLBACK cb) int CreateDirMon(monitor **m, string path, string ext, vector<string> &&events, FSW_EVENT_CALLBACK cb)
{ {
...@@ -18,20 +8,20 @@ int CreateDirMon(monitor **m, string path, string ext, vector<string> &&events, ...@@ -18,20 +8,20 @@ int CreateDirMon(monitor **m, string path, string ext, vector<string> &&events,
*m = monitor_factory::create_monitor( *m = monitor_factory::create_monitor(
fsw_monitor_type::system_default_monitor_type, fsw_monitor_type::system_default_monitor_type,
paths, paths,
fileEventHandler); cb);
(*m)->set_latency(1.1); (*m)->set_latency(1.1);
(*m)->set_filters(flt); (*m)->set_filters(flt);
(*m)->start(); (*m)->start();
return 0; return 0;
} }
int CloseDirMon() int CloseDirMon(monitor *m)
{ {
m->stop();
return 0; return 0;
} }
#define DEBUG #undef DEBUG
#ifdef DEBUG #ifdef DEBUG
int main() int main()
......
...@@ -12,5 +12,10 @@ ...@@ -12,5 +12,10 @@
#include "libfswatch/c++/monitor_factory.hpp" #include "libfswatch/c++/monitor_factory.hpp"
#include "libfswatch/c++/libfswatch_exception.hpp" #include "libfswatch/c++/libfswatch_exception.hpp"
#include "inc/spdlog/spdlog.h" #include "inc/spdlog/spdlog.h"
using namespace std;
using namespace fsw;
int CreateDirMon(monitor **m, string path, string ext, vector<string> &&events, FSW_EVENT_CALLBACK cb);
int CloseDirMon(monitor*m);
#endif #endif
\ No newline at end of file
...@@ -28,6 +28,8 @@ update: 2019/09/10 ...@@ -28,6 +28,8 @@ update: 2019/09/10
#include "inc/common.hpp" #include "inc/common.hpp"
#include "inc/database.h" #include "inc/database.h"
#include "postfile.h" #include "postfile.h"
#include "dirmon.h"
#include "inc/fs.h"
using namespace std; using namespace std;
using namespace zmqhelper; using namespace zmqhelper;
...@@ -49,12 +51,16 @@ private: ...@@ -49,12 +51,16 @@ private:
int *streamList = nullptr; int *streamList = nullptr;
time_t tsLastBoot, tsUpdateTime; time_t tsLastBoot, tsUpdateTime;
json config; json config;
thread thMsgProcessor; thread thMsgProcessor, thSliceMgr;
string drport = "5549"; string drport = "5549";
json slices; json slices;
bool gotFormat = false; bool gotFormat = false;
vector<long> vTsOld;
vector<long> vTsActive;
map<long, string> mapTs2BaseName;
int handleMsg(vector<vector<uint8_t> > v){ int handleMsg(vector<vector<uint8_t> > v)
{
int ret = 0; int ret = 0;
string peerId, meta; string peerId, meta;
json data; json data;
...@@ -64,7 +70,7 @@ private: ...@@ -64,7 +70,7 @@ private:
} }
if(v.size() == 3) { if(v.size() == 3) {
try{ try {
peerId = body2str(v[0]); peerId = body2str(v[0]);
meta = json::parse(body2str(v[1]))["type"]; meta = json::parse(body2str(v[1]))["type"];
if(meta == EV_MSG_META_AVFORMATCTX) { if(meta == EV_MSG_META_AVFORMATCTX) {
...@@ -72,13 +78,16 @@ private: ...@@ -72,13 +78,16 @@ private:
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput); AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true; gotFormat = true;
spdlog::info("evslicer {} got avformat from {}", selfId, peerId); spdlog::info("evslicer {} got avformat from {}", selfId, peerId);
}else{ }
else {
spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, peerId, meta, data.dump()); spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, peerId, meta, data.dump());
} }
}catch(exception &e){ }
catch(exception &e) {
spdlog::error("evslicer {} failed to process msg:{}", selfId, msg); spdlog::error("evslicer {} failed to process msg:{}", selfId, msg);
} }
}else{ }
else {
spdlog::error("evslicer {} get invalid msg with size {}: {}", selfId, v.size(), msg); spdlog::error("evslicer {} get invalid msg with size {}: {}", selfId, v.size(), msg);
} }
...@@ -243,7 +252,7 @@ private: ...@@ -243,7 +252,7 @@ private:
spdlog::error("evslicer {}, failed to send hello to puller: {}", selfId, zmq_strerror(zmq_errno())); spdlog::error("evslicer {}, failed to send hello to puller: {}", selfId, zmq_strerror(zmq_errno()));
continue; continue;
} }
this_thread::sleep_for(chrono::seconds(7)); this_thread::sleep_for(chrono::seconds(20));
} }
return ret; return ret;
} }
...@@ -426,6 +435,93 @@ protected: ...@@ -426,6 +435,93 @@ protected:
}// outer while }// outer while
} }
vector<long> LoadVideoFiles(string path, int days, int maxSlices, map<long, string> &ts2fileName, vector<long> &tsNeedUpload)
{
vector<long> v;
// get current timestamp
list<long> tsRing;
list<long>tsToProcess;
auto now = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
try {
for (const auto & entry : fs::directory_iterator(path)) {
if(entry.file_size() == 0 || !entry.is_regular_file()||entry.path().extension() != ".mp4") {
spdlog::warn("LoasdVideoFiles skipped {} (empty/directory/!mp4)", entry.path().c_str());
continue;
}
auto ftime = fs::last_write_time(entry.path());
auto ts = decltype(ftime)::clock::to_time_t(ftime);
// check if processed already
if(ts2fileName.count(ts) != 0) {
spdlog::warn("LoasdVideoFiles multiple files with same timestamp: {}, {}(skipped), ", ts2fileName[ts], entry.path().c_str());
continue;
}
// check old files
if(ts - now > days * 24 * 60 * 60) {
spdlog::info("file {} old that {} days", entry.path().c_str(), days);
tsToProcess.insert(std::upper_bound(tsToProcess.begin(), tsToProcess.end(), ts), ts);
}
else {
tsRing.insert(std::upper_bound(tsRing.begin(), tsRing.end(), ts), ts);
}
// add to map
string fname = entry.path().c_str();
auto posS = fname.find_last_of('/');
if(posS == string::npos) {
posS = 0;
}else{
posS = posS +1;
}
auto posE = fname.find_last_of('.');
if(posE == string::npos) {
posE = fname.size()-1;
}else{
posE = posE -1;
}
if(posE < posS) {
spdlog::error("LoadVideoFiles invalid filename");
}
//spdlog::info("LoadVideoFiles path {}, s {}, e {}", fname, posS, posE);
ts2fileName[ts] = fname.substr(posS, posE - posS + 1);
}
}
catch(exception &e) {
spdlog::error("LoasdVideoFiles exception : {}", e.what());
}
// skip old items
list<long>olds;
int delta = maxSlices - tsRing.size();
int skip = delta < 0? (-delta):0;
spdlog::info("LoasdVideoFiles max: {}, current: {}, skip: {}",maxSlices, tsRing.size(), skip);
int idx = 0;
list<long>::iterator pos = tsRing.begin();
for(auto &i:tsRing) {
if(idx < skip) {
idx++;
pos++;
continue;
}
v.push_back(i);
}
// merge
if(skip > 0) {
tsToProcess.insert(std::upper_bound(tsToProcess.begin(), tsToProcess.end(), tsRing.front()), tsRing.begin(), pos);
}
//
for(auto &i:tsToProcess) {
tsNeedUpload.push_back(i);
}
return v;
}
public: public:
EvSlicer() EvSlicer()
{ {
...@@ -444,7 +540,8 @@ public: ...@@ -444,7 +540,8 @@ public:
} }
devSn = v[0]; devSn = v[0];
iid = stoi(v[2]); iid = stoi(v[2]);
}else{ }
else {
spdlog::error("evslicer failed to start. no SN set"); spdlog::error("evslicer failed to start. no SN set");
exit(1); exit(1);
} }
...@@ -459,13 +556,13 @@ public: ...@@ -459,13 +556,13 @@ public:
ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId); ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId);
if(ret != 0) { if(ret != 0) {
spdlog::error("evslicer {} failed to receive configration message {}", devSn , addr); spdlog::error("evslicer {} failed to receive configration message {}", devSn, addr);
} }
init(); init();
// thread for msg // thread for msg
thMsgProcessor = thread([this](){ thMsgProcessor = thread([this]() {
while(true) { while(true) {
auto body = z_recv_multiple(pDealer,false); auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) { if(body.size() == 0) {
...@@ -476,8 +573,22 @@ public: ...@@ -476,8 +573,22 @@ public:
handleMsg(body); handleMsg(body);
} }
}); });
thMsgProcessor.detach(); thMsgProcessor.detach();
// thread for slicer maintenace
thSliceMgr = thread([this]() {
// get old and active slices
this->vTsActive = this->LoadVideoFiles(this->urlOut, this->days, this->numSlices, this->mapTs2BaseName, this->vTsOld);
this->segHead = 0;
this->segTail = vTsActive.size();
});
thSliceMgr.detach();
// thread for uploading slices
getInputFormat(); getInputFormat();
setupStream(); setupStream();
}; };
......
...@@ -68,7 +68,26 @@ vector<long> LoadVideoFiles(string path, int days, int maxSlices, map<long, stri ...@@ -68,7 +68,26 @@ vector<long> LoadVideoFiles(string path, int days, int maxSlices, map<long, stri
} }
// add to map // add to map
ts2fileName[ts] = entry.path(); string fname = entry.path().c_str();
auto posS = fname.find_last_of('/');
if(posS == string::npos) {
posS = 0;
}else{
posS = posS +1;
}
auto posE = fname.find_last_of('.');
if(posE == string::npos) {
posE = fname.size()-1;
}else{
posE = posE -1;
}
if(posE < posS) {
spdlog::error("LoadVideoFiles invalid filename");
}
//spdlog::info("LoadVideoFiles path {}, s {}, e {}", fname, posS, posE);
ts2fileName[ts] = fname.substr(posS, posE - posS + 1);
} }
} }
catch(exception &e) { catch(exception &e) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论