提交 68b330ce authored 作者: blu's avatar blu

refactor of delta config

上级 b25a9252
......@@ -29,21 +29,74 @@ update: 2019/09/10
using namespace std;
using namespace zmqhelper;
class RepSrv: public TinyThread {
class EvPuller: public TinyThread {
private:
string mgrSn;
string devSn;
string selfId;
int iid;
string urlRep;
const char * bytes;
int len;
void *pDealer=NULL;
int handleMsg(vector<vector<uint8_t> > v)
void *pPubCtx = nullptr; // for packets publishing
void *pPub = nullptr;
void *pDealerCtx = nullptr;
void *pDealer = nullptr;
void *pDaemonCtx = nullptr, *pDaemon = nullptr;
AVFormatContext *pAVFormatInput = nullptr;
char *pAVFmtCtxBytes = nullptr;
int lenAVFmtCtxBytes = 0;
string urlIn, urlPub, urlDealer, mgrSn, devSn, selfId, ipcPort;
int *streamList = nullptr, numStreams = 0, iid;
time_t tsLastBoot, tsUpdateTime;
json config;
thread thEdgeMsgHandler, thCloudMsgHandler;
string proto = "rtsp";
string drport = "5549";
condition_variable cvMsg;
mutex mutMsg;
bool gotFormat = false;
int handleCloudMsg(vector<vector<uint8_t> > v)
{
int ret = 0;
string peerId, metaType, metaValue, msg;
json data;
for(auto &b:v) {
msg +=body2str(b) + ";";
}
bool bProcessed = false;
if(v.size() == 3) {
try {
peerId = body2str(v[0]);
json meta = json::parse(body2str(v[1]));
metaType = meta["type"];
if(meta.count("value") != 0) {
metaValue = meta["value"];
}
// msg from cluster mgr
string daemonId = this->devSn + ":evdaemon:0";
if(peerId == daemonId) {
if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) {
spdlog::info("evpusher {} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
bProcessed = true;
exit(0);
}
}
}
catch(exception &e) {
spdlog::error("evpusher {} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evpusher {} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
}
int handleEdgeMsg(vector<vector<uint8_t> > v)
{
int ret = 0;
auto msgBody = data2body(const_cast<char*>(bytes), len);
unique_lock<mutex> lk(this->mutMsg);
this->cvMsg.wait(lk, [this] {return this->gotFormat;});
auto msgBody = data2body(const_cast<char*>(pAVFmtCtxBytes), lenAVFmtCtxBytes);
try {
// rep framectx
// TODO: verify sender id
......@@ -74,73 +127,6 @@ private:
return ret;
}
protected:
void run()
{
int ret = 0;
bool bStopSig = false;
// init response msg
while (true) {
if(checkStop() == true) {
bStopSig = true;
break;
}
spdlog::info("evpuller {} waiting for req", selfId);
// proto: [sender_id] [meta] [body]
auto v = z_recv_multiple(pDealer, false);
if(v.size() != 3) {
//TODO:
spdlog::error("evpuller {}, repSrv received invalid message: {}", selfId, v.size());
continue;
}
handleMsg(v);
}
}
public:
RepSrv() = delete;
RepSrv(RepSrv &) = delete;
RepSrv(RepSrv&&) = delete;
RepSrv(string mgrSn, string devSn, int iid, const char* formatBytes,
int len, void *pDealer):mgrSn(mgrSn),devSn(devSn), iid(iid), bytes(formatBytes),
len(len), pDealer(pDealer)
{
selfId = devSn+":evpuller:" + to_string(iid);
};
~RepSrv() {};
};
class EvPuller: public TinyThread {
private:
void *pPubCtx = nullptr; // for packets publishing
void *pPub = nullptr;
void *pDealerCtx = nullptr;
void *pDealer = nullptr;
void *pDaemonCtx = nullptr, *pDaemon = nullptr;
AVFormatContext *pAVFormatInput = nullptr;
string urlIn, urlPub, urlDealer, mgrSn, devSn, selfId, ipcPort;
int *streamList = nullptr, numStreams = 0, iid;
time_t tsLastBoot, tsUpdateTime;
json config;
string proto = "rtsp";
string drport = "5549";
bool isIpStr(string ip)
{
int cnt = 3*4 + 3;
if(ip.size() == 0 || ip.size() > cnt) {
return false;
}
auto v = strutils::split(ip, '.');
if(v.size() == 0 || v.size () != 4) {
return false;
}
return true;
}
int ping()
{
......@@ -203,7 +189,7 @@ private:
}
string ipcAddr = ipc["addr"].get<string>();
if(isIpStr(ipcAddr)) {
if(strutils::isIpStr(ipcAddr)) {
string chan = "ch1";
string streamName = "main";
if(ipc.count("channel") != 0 && !ipc["channel"].get<string>().empty()) {
......@@ -310,10 +296,27 @@ protected:
}
// serialize formatctx to bytes
char *pBytes = nullptr;
ret = AVFormatCtxSerializer::encode(pAVFormatInput, &pBytes);
auto repSrv = RepSrv(mgrSn, devSn, iid, pBytes, ret, pDealer);
repSrv.detach();
lock_guard<mutex> lock(this->mutMsg);
lenAVFmtCtxBytes = AVFormatCtxSerializer::encode(pAVFormatInput, &pAVFmtCtxBytes);
if(lenAVFmtCtxBytes <= 0 || pAVFmtCtxBytes == nullptr) {
spdlog::error("evpuller {} failed to pull packet from {}. exiting...", selfId, urlIn);
exit(1);
}
gotFormat = true;
cvMsg.notify_one();
thEdgeMsgHandler = thread([this]{
while(true) {
auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// full proto msg received.
this->handleEdgeMsg(body);
}
});
thEdgeMsgHandler.detach();
// find all video & audio streams for remuxing
int i = 0, streamIdx = 0;
......@@ -374,7 +377,11 @@ protected:
av_packet_unref(&packet);
}
free(pBytes);
if(pAVFmtCtxBytes != nullptr)
{
free(pAVFmtCtxBytes);
}
// TODO:
if(ret < 0 && !bStopSig) {
// reconnect
......@@ -421,6 +428,19 @@ public:
spdlog::error("evpuller {} failed to receive configration message {}", selfId, addr);
}
init();
thCloudMsgHandler = thread([this]{
while(true) {
auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// full proto msg received.
this->handleCloudMsg(body);
}
});
thCloudMsgHandler.detach();
}
~EvPuller()
......
......@@ -95,6 +95,7 @@ json reqConfig(json &info)
///
namespace strutils {
vector<string> split(const std::string& s, char delimiter)
{
std::vector<std::string> tokens;
......@@ -106,6 +107,20 @@ vector<string> split(const std::string& s, char delimiter)
return tokens;
}
bool isIpStr(string ip)
{
int cnt = 3*4 + 3;
if(ip.size() == 0 || ip.size() > cnt) {
return false;
}
auto v = strutils::split(ip, '.');
if(v.size() == 0 || v.size () != 4) {
return false;
}
return true;
}
}//namespace strutils
namespace cfgutils {
......
......@@ -40,6 +40,7 @@ json reqConfig(json &info);
///
namespace strutils{
vector<string> split(const std::string& s, char delimiter);
bool isIpStr(string ip);
}//namespace strutils
namespace cfgutils {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论