提交 4341a93a authored 作者: blu's avatar blu

upload video files

上级 6bee7971
...@@ -68,7 +68,8 @@ private: ...@@ -68,7 +68,8 @@ private:
thread thEventHandler; thread thEventHandler;
string videoFileServerApi = "http://139.219.142.18:10008/upload/evtvideos/"; string videoFileServerApi = "http://139.219.142.18:10008/upload/evtvideos/";
bool validMsg(json &msg) { bool validMsg(json &msg)
{
return true; return true;
} }
int handleMsg(vector<vector<uint8_t> > v) int handleMsg(vector<vector<uint8_t> > v)
...@@ -91,15 +92,16 @@ private: ...@@ -91,15 +92,16 @@ private:
gotFormat = true; gotFormat = true;
spdlog::info("evslicer {} got avformat from {}", selfId, peerId); spdlog::info("evslicer {} got avformat from {}", selfId, peerId);
} }
else if(meta == EV_MSG_META_EVENT){ else if(meta == EV_MSG_META_EVENT) {
data = json::parse(body2str(v[2])); data = json::parse(body2str(v[2]));
/// evslicer has two msg interfaces to subsystems on edge side /// evslicer has two msg interfaces to subsystems on edge side
/// 1. type = "event"; start: timestamp; end: timestamp /// 1. type = "event"; start: timestamp; end: timestamp
/// 2. type = "media"; duration: seconds /// 2. type = "media"; duration: seconds
if(!validMsg(data)){ if(!validMsg(data)) {
spdlog::info("evslicer {} received invalid msg from {}: {}", selfId, peerId, msg); spdlog::info("evslicer {} received invalid msg from {}: {}", selfId, peerId, msg);
}else{ }
else {
spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, peerId, meta, data.dump()); spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, peerId, meta, data.dump());
if(data["type"] == "event") { if(data["type"] == "event") {
lock_guard<mutex> lock(mutEvent); lock_guard<mutex> lock(mutEvent);
...@@ -107,10 +109,11 @@ private: ...@@ -107,10 +109,11 @@ private:
if(eventQueue.size() > MAX_EVENT_QUEUE_SIZE) { if(eventQueue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQueue.pop(); eventQueue.pop();
} }
cvEvent.notify_one(); cvEvent.notify_one();
} }
} }
}else{ }
else {
spdlog::info("evslicer {} received unkown msg from {}: {}", selfId, peerId, msg); spdlog::info("evslicer {} received unkown msg from {}: {}", selfId, peerId, msg);
} }
} }
...@@ -162,7 +165,8 @@ private: ...@@ -162,7 +165,8 @@ private:
selfId = devSn + ":evslicer:" + to_string(iid); selfId = devSn + ":evslicer:" + to_string(iid);
if(ipc.count("sn") == 0) { if(ipc.count("sn") == 0) {
ipcSn = "unkown"; ipcSn = "unkown";
}else{ }
else {
ipcSn = ipc["sn"]; ipcSn = ipc["sn"];
} }
...@@ -172,7 +176,7 @@ private: ...@@ -172,7 +176,7 @@ private:
pullerGid = evpuller["sn"].get<string>() + ":evpuller:" + to_string(evpuller["iid"]); pullerGid = evpuller["sn"].get<string>() + ":evpuller:" + to_string(evpuller["iid"]);
mgrSn = evmgr["sn"]; mgrSn = evmgr["sn"];
if(evslicer.count("path") == 0){ if(evslicer.count("path") == 0) {
spdlog::info("evslicer {} no params for path, using default: {}", selfId, URLOUT_DEFAULT); spdlog::info("evslicer {} no params for path, using default: {}", selfId, URLOUT_DEFAULT);
urlOut = URLOUT_DEFAULT; urlOut = URLOUT_DEFAULT;
} }
...@@ -477,18 +481,21 @@ protected: ...@@ -477,18 +481,21 @@ protected:
} }
string getBaseName(const string &fname) { string getBaseName(const string &fname)
{
string ret; string ret;
auto posS = fname.find_last_of('/'); auto posS = fname.find_last_of('/');
if(posS == string::npos) { if(posS == string::npos) {
posS = 0; posS = 0;
}else{ }
else {
posS = posS +1; posS = posS +1;
} }
auto posE = fname.find_last_of('.'); auto posE = fname.find_last_of('.');
if(posE == string::npos) { if(posE == string::npos) {
posE = fname.size()-1; posE = fname.size()-1;
}else{ }
else {
posE = posE -1; posE = posE -1;
} }
if(posE < posS) { if(posE < posS) {
...@@ -500,13 +507,15 @@ protected: ...@@ -500,13 +507,15 @@ protected:
return fname.substr(posS, posE - posS + 1); return fname.substr(posS, posE - posS + 1);
} }
long videoFileName2Ts(string &fileBaseName) { long videoFileName2Ts(string &fileBaseName)
{
std::tm t; std::tm t;
strptime(fileBaseName.data(), "%Y%m%d_%H%M%S", &t); strptime(fileBaseName.data(), "%Y%m%d_%H%M%S", &t);
return mktime(&t); return mktime(&t);
} }
string videoFileTs2Name(long ts, bool bLog = false) { string videoFileTs2Name(long ts, bool bLog = false)
{
std::time_t now = ts; std::time_t now = ts;
std::tm * ptm = std::localtime(&now); std::tm * ptm = std::localtime(&now);
char buffer[20]; char buffer[20];
...@@ -575,7 +584,7 @@ protected: ...@@ -575,7 +584,7 @@ protected:
if(skip > 0) { if(skip > 0) {
tsToProcess.insert(std::upper_bound(tsToProcess.begin(), tsToProcess.end(), tsRing.front()), tsRing.begin(), pos); tsToProcess.insert(std::upper_bound(tsToProcess.begin(), tsToProcess.end(), tsRing.front()), tsRing.begin(), pos);
} }
delta = maxSlices - tsToProcess.size(); delta = maxSlices - tsToProcess.size();
skip = delta < 0? (-delta) : 0; skip = delta < 0? (-delta) : 0;
idx = 0; idx = 0;
...@@ -598,7 +607,8 @@ protected: ...@@ -598,7 +607,8 @@ protected:
} }
// file monitor callback // file monitor callback
static void fileMonHandler(const std::vector<event>& evts, void *pUserData) { static void fileMonHandler(const std::vector<event>& evts, void *pUserData)
{
static string lastFile; static string lastFile;
static long lastTs; static long lastTs;
...@@ -606,7 +616,8 @@ protected: ...@@ -606,7 +616,8 @@ protected:
for(auto &i : evts) { for(auto &i : evts) {
if(lastFile == i.get_path()) { if(lastFile == i.get_path()) {
// skip // skip
}else if(!lastFile.empty()){ }
else if(!lastFile.empty()) {
// insert into ts active // insert into ts active
//spdlog::info("evslicer {} filemon file: {}, ts: {}, last: {}", self->selfId, i.get_path().c_str(), i.get_time(), lastFile); //spdlog::info("evslicer {} filemon file: {}, ts: {}, last: {}", self->selfId, i.get_path().c_str(), i.get_time(), lastFile);
if(self->segHead >= self->numSlices) { if(self->segHead >= self->numSlices) {
...@@ -617,20 +628,22 @@ protected: ...@@ -617,20 +628,22 @@ protected:
if(self->bSegFull) { if(self->bSegFull) {
// TODO: backup orignal self->vTsActive[self->segHead] // TODO: backup orignal self->vTsActive[self->segHead]
} }
try{ try {
auto baseName = self->getBaseName(lastFile); auto baseName = self->getBaseName(lastFile);
auto ts = self->videoFileName2Ts(baseName); auto ts = self->videoFileName2Ts(baseName);
auto oldTs = self->vTsActive[self->segHead]; auto oldTs = self->vTsActive[self->segHead];
self->vTsActive[self->segHead] = ts; self->vTsActive[self->segHead] = ts;
self->segHead++; self->segHead++;
//spdlog::info("evslicer {} fileMonHandler video seg done: {}/{}.mp4, ts:{}", self->selfId, self->urlOut, baseName, ts); //spdlog::info("evslicer {} fileMonHandler video seg done: {}/{}.mp4, ts:{}", self->selfId, self->urlOut, baseName, ts);
}catch(exception &e) { }
catch(exception &e) {
spdlog::error("evslicer {} fileMonHandler exception: {}", self->selfId, e.what()); spdlog::error("evslicer {} fileMonHandler exception: {}", self->selfId, e.what());
} }
}else{ }
else {
//nop //nop
} }
lastFile = i.get_path(); lastFile = i.get_path();
...@@ -638,14 +651,16 @@ protected: ...@@ -638,14 +651,16 @@ protected:
} }
// //
int segToIdx(int seg) { int segToIdx(int seg)
{
if(seg >= numSlices) { if(seg >= numSlices) {
seg -= numSlices; seg -= numSlices;
} }
return seg; return seg;
} }
// find video files // find video files
vector<string> findSlicesByRange(long tss, long tse, int offsetS, int offsetE){ vector<string> findSlicesByRange(long tss, long tse, int offsetS, int offsetE)
{
vector<string> ret; vector<string> ret;
int found = 0; int found = 0;
int _itss = 0; int _itss = 0;
...@@ -653,7 +668,7 @@ protected: ...@@ -653,7 +668,7 @@ protected:
_itss = segHead; _itss = segHead;
} }
for(int i = 0; i < numSlices; i++){ for(int i = 0; i < numSlices; i++) {
spdlog::info("evslicer {} vector[{}] = {}, {}", selfId, i, vTsActive[i], videoFileTs2Name(vTsActive[i])); spdlog::info("evslicer {} vector[{}] = {}, {}", selfId, i, vTsActive[i], videoFileTs2Name(vTsActive[i]));
if(vTsActive[i] == 0) { if(vTsActive[i] == 0) {
break; break;
...@@ -662,21 +677,22 @@ protected: ...@@ -662,21 +677,22 @@ protected:
if(vTsActive[_itss] >= tse || vTsActive[segHead -1] < tss||(!bSegFull && segHead == 0)) { if(vTsActive[_itss] >= tse || vTsActive[segHead -1] < tss||(!bSegFull && segHead == 0)) {
spdlog::error("evslicer {} findSlicesByRange event range ({},{}) is not in recorded range ({}, {})", selfId, this->videoFileTs2Name(tss), this->videoFileTs2Name(tse), this->videoFileTs2Name(vTsActive[_itss]), this->videoFileTs2Name(vTsActive[segHead -1])); spdlog::error("evslicer {} findSlicesByRange event range ({},{}) is not in recorded range ({}, {})", selfId, this->videoFileTs2Name(tss), this->videoFileTs2Name(tse), this->videoFileTs2Name(vTsActive[_itss]), this->videoFileTs2Name(vTsActive[segHead -1]));
}else{ }
else {
int idxS, idxE; int idxS, idxE;
int delta = bSegFull? numSlices : 0; int delta = bSegFull? numSlices : 0;
for(int i = segHead + delta; i > _itss; i--){ for(int i = segHead + delta; i > _itss; i--) {
if(vTsActive[segToIdx(i)] == 0) { if(vTsActive[segToIdx(i)] == 0) {
continue; continue;
} }
if(tse >= vTsActive[segToIdx(i)]){ if(tse >= vTsActive[segToIdx(i)]) {
if((found &1) != 1){ if((found &1) != 1) {
idxE = segToIdx(i); idxE = segToIdx(i);
found |= 1; found |= 1;
} }
} }
if(tss > vTsActive[segToIdx(i)]){ if(tss > vTsActive[segToIdx(i)]) {
if((found &2) != 2) { if((found &2) != 2) {
idxS = segToIdx(i); idxS = segToIdx(i);
found |=2; found |=2;
...@@ -692,7 +708,7 @@ protected: ...@@ -692,7 +708,7 @@ protected:
idxE += numSlices; idxE += numSlices;
} }
string sf; string sf;
for(int i = idxS; i <= idxE; i++){ for(int i = idxS; i <= idxE; i++) {
int idx = segToIdx(i); int idx = segToIdx(i);
long ts = vTsActive[idx]; long ts = vTsActive[idx];
string fname = videoFileTs2Name(ts, true); string fname = videoFileTs2Name(ts, true);
...@@ -765,7 +781,7 @@ public: ...@@ -765,7 +781,7 @@ public:
this->vTsActive = this->LoadVideoFiles(this->urlOut, this->hours, this->numSlices, this->vTsOld); this->vTsActive = this->LoadVideoFiles(this->urlOut, this->hours, this->numSlices, this->vTsOld);
spdlog::info("evslicer {} will store slice from index: {}", selfId, this->segHead); spdlog::info("evslicer {} will store slice from index: {}", selfId, this->segHead);
monitor * m = nullptr; monitor * m = nullptr;
CreateDirMon(&m, this->urlOut, ".mp4", vector<string>(), EvSlicer::fileMonHandler, (void *)this); CreateDirMon(&m, this->urlOut, ".mp4", vector<string>(), EvSlicer::fileMonHandler, (void *)this);
}); });
thSliceMgr.detach(); thSliceMgr.detach();
...@@ -775,12 +791,13 @@ public: ...@@ -775,12 +791,13 @@ public:
setupStream(); setupStream();
// event thread // event thread
thEventHandler = thread([this]{ thEventHandler = thread([this] {
while(true){ while(true)
{
unique_lock<mutex> lk(this->mutEvent); unique_lock<mutex> lk(this->mutEvent);
this->cvEvent.wait(lk, [this]{return !(this->eventQueue.empty());}); this->cvEvent.wait(lk, [this] {return !(this->eventQueue.empty());});
if(this->eventQueue.empty()){ if(this->eventQueue.empty()) {
continue; continue;
} }
...@@ -794,11 +811,17 @@ public: ...@@ -794,11 +811,17 @@ public:
long offsetS = 0; long offsetS = 0;
long offsetE = 0; long offsetE = 0;
// TODO: async // TODO: async
static auto bootTime = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
if(tse < bootTime) {
spdlog::warn("evslicer {} discard old msg {}", selfId, evt);
return;
}
this_thread::sleep_for(chrono::seconds(this->seconds + 5)); this_thread::sleep_for(chrono::seconds(this->seconds + 5));
auto v = findSlicesByRange(tss, tse, offsetS, offsetE); auto v = findSlicesByRange(tss, tse, offsetS, offsetE);
if(v.size() == 0) { if(v.size() == 0) {
spdlog::error("evslicer {} ignore upload videos in range ({}, {})", this->selfId, this->videoFileTs2Name(tss), this->videoFileTs2Name(tse)); spdlog::error("evslicer {} ignore upload videos in range ({}, {})", this->selfId, this->videoFileTs2Name(tss), this->videoFileTs2Name(tse));
}else{ }
else {
vector<tuple<string, string> > params= {{"startTime", to_string(tss)},{"endTime", to_string(tse)},{"cameraId", ipcSn}, {"headOffset", to_string(offsetS)},{"tailOffset", to_string(offsetE)}}; vector<tuple<string, string> > params= {{"startTime", to_string(tss)},{"endTime", to_string(tse)},{"cameraId", ipcSn}, {"headOffset", to_string(offsetS)},{"tailOffset", to_string(offsetE)}};
vector<string> fileNames; vector<string> fileNames;
string sf; string sf;
...@@ -810,45 +833,52 @@ public: ...@@ -810,45 +833,52 @@ public:
spdlog::info("evslicer {} file upload url: {}", selfId, this->videoFileServerApi); spdlog::info("evslicer {} file upload url: {}", selfId, this->videoFileServerApi);
// TODO: check result and reschedule it // TODO: check result and reschedule it
if(netutils::postFiles(std::move(this->videoFileServerApi), std::move(params), std::move(fileNames)) != 0){ if(netutils::postFiles(std::move(this->videoFileServerApi), std::move(params), std::move(fileNames)) != 0) {
spdlog::error("evslicer {} failed to upload files:\n{}", selfId, sf); spdlog::error("evslicer {} failed to upload files:\n{}", selfId, sf);
}else{
spdlog::info("evslicer {} successfull uploaded files:\n{}", selfId, sf);
} }
else {
spdlog::info("evslicer {} successfull uploaded files:\n{}", selfId, sf);
#undef DEBUG
#ifdef DEBUG
int iret = 0;
string _path1 = string("/tmp/merged_video/") + this->videoFileTs2Name(tss) + "_" + this->videoFileTs2Name(tse);
fs::path _path(_path1 + ".txt");
if(!fs::create_directories(_path.parent_path())) {
spdlog::error("evslicer {} failed to create directory {}", selfId, _path.c_str());
return;
}
ofstream ofs(_path);
ofs << sf;
ofs.close();
AVDictionary * _optsIn, *_optsO;
av_dict_set(&_optsIn, "f", "concat", 0);
av_dict_set(&_optsO, "max_muxing_queue_size", "9999", 0);
AVFormatContext *_pCtx = nullptr;
iret = avformat_open_input(&_pCtx, _path.c_str(), nullptr, &_optsIn);
if(iret < 0) {
spdlog::error("evslicer {} failed to open {}",selfId, _path.c_str());
return;
}
iret = avio_open2(&_pCtx->pb, (_path1 + ".mp4").c_str(), AVIO_FLAG_WRITE, nullptr, &_optsO);
if(iret < 0) {
spdlog::error("evslicer {} failed to write output {}", selfId, (_path1 + ".mp4").c_str());
}
AVPacket pkt;
while(av_read_frame(_pCtx, &pkt)>=0) {
iret = av_interleaved_write_frame(_pCtx, &pkt);
av_packet_unref(&pkt);
}
av_write_trailer(_pCtx);
if(_pCtx->pb != nullptr)
avio_closep(&_pCtx->pb);
if(_pCtx != nullptr)
avformat_free_context(_pCtx);
#define DEBUG #endif
#ifdef DEBUG
int iret = 0;
string _path1 = string("/tmp/merged_video/") + this->videoFileTs2Name(tss) + "_" + this->videoFileTs2Name(tse);
fs::path _path(_path1 + ".txt");
if(!fs::create_directories(_path.parent_path())) {
spdlog::error("evslicer {} failed to create directory {}", selfId, _path.c_str());
return;
} }
ofstream ofs(_path);
ofs << sf;
ofs.close();
AVDictionary * _optsIn, *_optsO;
av_dict_set(&_optsIn, "f", "concat", 0);
av_dict_set(&_optsO, "max_muxing_queue_size", "9999", 0);
AVFormatContext *_pCtx = nullptr;
iret = avformat_open_input(&_pCtx, _path.c_str(), nullptr, &_optsIn);
if(iret < 0) {
spdlog::error("evslicer {} failed to open {}",selfId, _path.c_str());
return;
}
iret = avio_open2(&_pCtx->pb, (_path1 + ".mp4").c_str(), AVIO_FLAG_WRITE, nullptr, &_optsO);
if(iret < 0) {
spdlog::error("evslicer {} failed to write output {}", selfId, (_path1 + ".mp4").c_str());
}
if(_pCtx->pb != nullptr)
avio_closep(&_pCtx->pb);
if(_pCtx != nullptr)
avformat_free_context(_pCtx);
#endif
} }
}else{ }
else {
spdlog::error("evslicer {} unkown event :{}", this->selfId, evt); spdlog::error("evslicer {} unkown event :{}", this->selfId, evt);
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论