提交 53d355cd authored 作者: blu's avatar blu

big refacting of communitation architect

上级 9a89272c
...@@ -259,7 +259,10 @@ private: ...@@ -259,7 +259,10 @@ private:
return -1; return -1;
} }
if((peerData["status"].count(selfId) == 0 || peerData["status"][selfId] == 0) ) { //auto state = zmq_socket_get_peer_state(pRouter, selfId.data(), selfId.size());
//spdlog::info("evdaemon {} peerState: {}", devSn, state);
if((peerData["status"].count(selfId) == 0 || peerData["status"][selfId] == 0||this->peerData["status"][selfId] == -1) ) {
peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evdaemon {} peer connected: {}", devSn, selfId); spdlog::info("evdaemon {} peer connected: {}", devSn, selfId);
eventConn = true; eventConn = true;
...@@ -320,7 +323,7 @@ private: ...@@ -320,7 +323,7 @@ private:
// message to other peer // message to other peer
// check peer status // check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]}; vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerData["status"].count(peerId) != 0 && peerData["status"][peerId] != 0) { if(peerData["status"].count(peerId) != 0 && peerData["status"][peerId] != 0 && this->peerData["status"][peerId][peerId] != -1) {
spdlog::info("evdaemon {} route msg from {} to {}", devSn, selfId, peerId); spdlog::info("evdaemon {} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v); ret = z_send_multiple(pRouter, v);
if(ret < 0) { if(ret < 0) {
......
...@@ -44,7 +44,7 @@ private: ...@@ -44,7 +44,7 @@ private:
void *pSubCtx = nullptr, *pDealerCtx = nullptr; // for packets relay void *pSubCtx = nullptr, *pDealerCtx = nullptr; // for packets relay
void *pSub = nullptr, *pDealer = nullptr, *pDaemonCtx = nullptr, *pDaemon = nullptr; void *pSub = nullptr, *pDealer = nullptr, *pDaemonCtx = nullptr, *pDaemon = nullptr;
string urlOut, urlPub, urlRouter, devSn, mgrSn, selfId, pullerGid; string urlOut, urlPub, urlRouter, devSn, mgrSn, selfId, pullerGid;
int iid, days, minutes, numSlices, segHead = 0; int iid, days, minutes, numSlices, segHead = 0, segHeadP;
bool enablePush = false, bSegFull = false; bool enablePush = false, bSegFull = false;
AVFormatContext *pAVFormatRemux = nullptr; AVFormatContext *pAVFormatRemux = nullptr;
AVFormatContext *pAVFormatInput = nullptr; AVFormatContext *pAVFormatInput = nullptr;
...@@ -290,6 +290,7 @@ private: ...@@ -290,6 +290,7 @@ private:
av_dict_set(&pOptsRemux, "c:v", "libx264", 0); av_dict_set(&pOptsRemux, "c:v", "libx264", 0);
//av_dict_set(&pOptsRemux, "brand", "mp42", 0); //av_dict_set(&pOptsRemux, "brand", "mp42", 0);
//av_dict_set(&pOptsRemux, "movflags", "faststart", 0); //av_dict_set(&pOptsRemux, "movflags", "faststart", 0);
av_dict_set(&pOptsRemux, "strftime", "1", 0);
av_dict_set(&pOptsRemux, "segment_format", "mp4", 0); av_dict_set(&pOptsRemux, "segment_format", "mp4", 0);
av_dict_set(&pOptsRemux, "f", "segment", 0); av_dict_set(&pOptsRemux, "f", "segment", 0);
av_dict_set(&pOptsRemux, "segment_time", "20", 0); av_dict_set(&pOptsRemux, "segment_time", "20", 0);
...@@ -331,7 +332,7 @@ protected: ...@@ -331,7 +332,7 @@ protected:
auto end = start; auto end = start;
int ts = chrono::duration_cast<chrono::seconds>(start.time_since_epoch()).count(); int ts = chrono::duration_cast<chrono::seconds>(start.time_since_epoch()).count();
string name = to_string(ts) + ".mp4"; string name = to_string(ts) + ".mp4";
name = urlOut + "/" + "%06d.mp4"; name = urlOut + "/" + "%Y%m%d_%H%M%S.mp4";
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "segment", name.c_str()); ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "segment", name.c_str());
if (ret < 0) { if (ret < 0) {
spdlog::error("evslicer {} failed create avformatcontext for output: %s", selfId, av_err2str(ret)); spdlog::error("evslicer {} failed create avformatcontext for output: %s", selfId, av_err2str(ret));
...@@ -359,7 +360,7 @@ protected: ...@@ -359,7 +360,7 @@ protected:
spdlog::error("evslicer {} could not open output file {}", selfId, name); spdlog::error("evslicer {} could not open output file {}", selfId, name);
} }
} }
av_dict_set(&pOptsRemux, "segment_start_number", to_string(segHead+1).data(), 0); av_dict_set(&pOptsRemux, "segment_start_number", to_string(segHead).data(), 0);
ret = avformat_write_header(pAVFormatRemux, &pOptsRemux); ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
if (ret < 0) { if (ret < 0) {
spdlog::error("evslicer {} error occurred when opening output file", selfId); spdlog::error("evslicer {} error occurred when opening output file", selfId);
...@@ -465,45 +466,51 @@ protected: ...@@ -465,45 +466,51 @@ protected:
return fname.substr(posS, posE - posS + 1); return fname.substr(posS, posE - posS + 1);
} }
long videoFileName2Ts(string &fileBaseName) {
std::tm t;
strptime(fileBaseName.data(), "%Y%m%d_%H%M%S", &t);
return mktime(&t);
}
vector<long> LoadVideoFiles(string path, int days, int maxSlices, map<long, string> &ts2fileName, vector<long> &tsNeedUpload) vector<long> LoadVideoFiles(string path, int days, int maxSlices, map<long, string> &ts2fileName, vector<long> &tsNeedUpload)
{ {
vector<long> v; vector<long> v = vector<long>(maxSlices);
tsNeedUpload = vector<long>(maxSlices);
// get current timestamp // get current timestamp
list<long> tsRing; list<long> tsRing;
list<long>tsToProcess; list<long>tsToProcess;
auto now = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); auto now = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
try { try {
string fname, baseName;
for (const auto & entry : fs::directory_iterator(path)) { for (const auto & entry : fs::directory_iterator(path)) {
fname = entry.path().c_str();
if(entry.file_size() == 0 || !entry.is_regular_file()||entry.path().extension() != ".mp4") { if(entry.file_size() == 0 || !entry.is_regular_file()||entry.path().extension() != ".mp4") {
spdlog::warn("LoasdVideoFiles skipped {} (empty/directory/!mp4)", entry.path().c_str()); spdlog::warn("LoasdVideoFiles skipped {} (empty/directory/!mp4)", entry.path().c_str());
continue; continue;
} }
auto ftime = fs::last_write_time(entry.path()); baseName = getBaseName(fname);
auto ts = decltype(ftime)::clock::to_time_t(ftime); auto ts = videoFileName2Ts(baseName);
// auto ftime = fs::last_write_time(entry.path());
// check if processed already // auto ts = decltype(ftime)::clock::to_time_t(ftime);
if(ts2fileName.count(ts) != 0) { // // check if processed already
spdlog::warn("LoasdVideoFiles multiple files with same timestamp: {}, {}(skipped), ", ts2fileName[ts], entry.path().c_str()); // if(ts2fileName.count(ts) != 0) {
continue; // spdlog::warn("LoasdVideoFiles multiple files with same timestamp: {}, {}(skipped), ", ts2fileName[ts], entry.path().c_str());
} // continue;
// }
// check old files // check old files
if(ts - now > days * 24 * 60 * 60) { if(ts - now > days * 24 * 60 * 60) {
spdlog::info("file {} old that {} days", entry.path().c_str(), days); spdlog::info("evslicer {} file {} old than {} days: {}, {}", selfId, entry.path().c_str(), days, ts, now);
tsToProcess.insert(std::upper_bound(tsToProcess.begin(), tsToProcess.end(), ts), ts); tsToProcess.insert(std::upper_bound(tsToProcess.begin(), tsToProcess.end(), ts), ts);
} }
else { else {
tsRing.insert(std::upper_bound(tsRing.begin(), tsRing.end(), ts), ts); tsRing.insert(std::upper_bound(tsRing.begin(), tsRing.end(), ts), ts);
} }
// add to map
string fname = entry.path().c_str();
auto baseName = getBaseName(fname);
//spdlog::info("LoadVideoFiles path {}, s {}, e {}", fname, posS, posE); //spdlog::info("LoadVideoFiles path {}, s {}, e {}", fname, posS, posE);
ts2fileName[ts] = baseName; //ts2fileName[ts] = baseName;
} }
} }
catch(exception &e) { catch(exception &e) {
...@@ -524,15 +531,29 @@ protected: ...@@ -524,15 +531,29 @@ protected:
continue; continue;
} }
v.push_back(i); v.push_back(i);
segHead++;
} }
// merge // merge
if(skip > 0) { if(skip > 0) {
tsToProcess.insert(std::upper_bound(tsToProcess.begin(), tsToProcess.end(), tsRing.front()), tsRing.begin(), pos); tsToProcess.insert(std::upper_bound(tsToProcess.begin(), tsToProcess.end(), tsRing.front()), tsRing.begin(), pos);
} }
// delta = maxSlices - tsToProcess.size();
skip = delta < 0? (-delta) : 0;
idx = 0;
pos = tsToProcess.begin();
for(auto &i:tsToProcess) { for(auto &i:tsToProcess) {
if(idx < skip) {
idx++;
pos++;
continue;
}
tsNeedUpload.push_back(i); tsNeedUpload.push_back(i);
segHeadP++;
}
if(segHead!=0 && segHeadP != 0) {
spdlog::info("evslicer {} LoadVideoFiles active:{}, ts1:{}, ts2: {}; toprocess: {}, ts1: {}, ts2:{}", selfId, segHead, v.front(), v.back(), segHeadP, tsNeedUpload.front(), tsNeedUpload.back());
} }
return v; return v;
...@@ -545,12 +566,11 @@ protected: ...@@ -545,12 +566,11 @@ protected:
auto self = static_cast<EvSlicer*>(pUserData); auto self = static_cast<EvSlicer*>(pUserData);
for(auto &i : evts) { for(auto &i : evts) {
spdlog::info("evslicer {} filemon file: {}, ts: {}", self->selfId, i.get_path().c_str(), i.get_time());
if(lastFile == i.get_path()) { if(lastFile == i.get_path()) {
// skip // skip
}else if(!lastFile.empty()){ }else if(!lastFile.empty()){
// insert into ts active // insert into ts active
auto lockg = lock_guard(self->mutTsActive); spdlog::info("evslicer {} filemon file: {}, ts: {}, last: {}", self->selfId, i.get_path().c_str(), i.get_time(), lastFile);
if(self->segHead >= self->numSlices) { if(self->segHead >= self->numSlices) {
//wrap it; //wrap it;
self->segHead = 0; self->segHead = 0;
...@@ -559,17 +579,17 @@ protected: ...@@ -559,17 +579,17 @@ protected:
if(self->bSegFull) { if(self->bSegFull) {
// TODO: backup orignal self->vTsActive[self->segHead] // TODO: backup orignal self->vTsActive[self->segHead]
} }
try{ try{
auto ftime = fs::last_write_time(lastFile);
auto baseName = self->getBaseName(lastFile); auto baseName = self->getBaseName(lastFile);
auto ts = decltype(ftime)::clock::to_time_t(ftime); auto ts = self->videoFileName2Ts(baseName);
auto oldTs = self->vTsActive[self->segHead]; auto oldTs = self->vTsActive[self->segHead];
self->vTsActive[self->segHead] = ts; self->vTsActive[self->segHead] = ts;
self->mapTs2BaseName[ts] = baseName; //self->mapTs2BaseName[ts] = baseName;
// erase old ts to save memory // erase old ts to save memory
self->mapTs2BaseName.erase(oldTs); //self->mapTs2BaseName.erase(oldTs);
self->segHead++; self->segHead++;
spdlog::info("evslicer {} fileMonHandler video seg done: {}/{}.mp4, ts:{}", self->selfId, self->urlOut, baseName, ts); spdlog::info("evslicer {} fileMonHandler video seg done: {}/{}.mp4, ts:{}", self->selfId, self->urlOut, baseName, ts);
}catch(exception &e) { }catch(exception &e) {
...@@ -644,7 +664,7 @@ public: ...@@ -644,7 +664,7 @@ public:
thSliceMgr = thread([this]() { thSliceMgr = thread([this]() {
// get old and active slices // get old and active slices
this->vTsActive = this->LoadVideoFiles(this->urlOut, this->days, this->numSlices, this->mapTs2BaseName, this->vTsOld); this->vTsActive = this->LoadVideoFiles(this->urlOut, this->days, this->numSlices, this->mapTs2BaseName, this->vTsOld);
this->segHead = this->vTsActive.size(); spdlog::info("evslicer {} will store slice from index: {}", selfId, this->segHead);
monitor * m = nullptr; monitor * m = nullptr;
CreateDirMon(&m, this->urlOut, ".mp4", vector<string>(), EvSlicer::fileMonHandler, (void *)this); CreateDirMon(&m, this->urlOut, ".mp4", vector<string>(), EvSlicer::fileMonHandler, (void *)this);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论