/* module: evpuller description: author: Bruce.Lu update: 2019/08/23 */ #pragma GCC diagnostic ignored "-Wpragmas" #pragma GCC diagnostic ignored "-Wunknown-warning-option" #pragma GCC diagnostic ignored "-Wunused-private-field" #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" #pragma GCC diagnostic ignored "-Wunused-but-set-variable" #include #include #include #include #include #include #include #ifdef OS_LINUX #include namespace fs = std::filesystem; #endif #include "inc/zmqhelper.hpp" #include "inc/tinythread.hpp" #include "inc/common.hpp" #include "inc/database.h" using namespace std; using namespace zmqhelper; class RepSrv: public TinyThread { private: string mgrSn; string devSn; string selfId; int iid; string urlRep; const char * bytes; int len; void *pDealer=NULL; thread thPing; int ping() { int ret = 0; vector >body; // since identity is auto set body.push_back(str2body(mgrSn + ":0:0")); body.push_back(str2body(EV_MSG_META_PING)); body.push_back(str2body(MSG_HELLO)); ret = z_send_multiple(pDealer, body); if(ret < 0) { spdlog::error("evpuller {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno())); } return ret; } int handleMsg(vector > v) { int ret = 0; auto msgBody = data2body(const_cast(bytes), len); try { // rep framectx // TODO: verify sender id string sMeta = body2str(v[1]); string peerId = body2str(v[0]); auto meta = json::parse(sMeta); if(meta["type"].get() == EV_MSG_META_AVFORMATCTX) { vector > rep = {v[0], v[1], msgBody}; ret = z_send_multiple(pDealer, rep); if(ret < 0) { spdlog::error("evpuller {} failed to send avformatctx data to requester {}: {}", selfId, peerId, zmq_strerror(zmq_errno())); }else{ spdlog::info("evpuller {} success to send avformatctx data to requester {}", selfId, peerId); } } else if(meta["type"].get() == EV_MSG_META_EVENT) { // event msg spdlog::info("evpuller {} received event: {}", selfId, body2str(v[2])); } else { spdlog::error("evpuller {} unknown meta from {}: {}", selfId, body2str(v[0]), body2str(v[1])); } } catch(exception &e) { spdlog::error("evpuller {} excpetion parse request from {}: {}", selfId, body2str(v[0]), body2str(v[1])); } return ret; } protected: void run() { int ret = 0; bool bStopSig = false; // declare ready to router ping(); // TODO: don't need this anymore, since I've used the draft feature of ZOUTER_NOTIFICATION instead // thPing = thread([&,this]() { // while(true) { // this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS-2)); // ping(); // } // }); // thPing.detach(); // init response msg while (true) { if(checkStop() == true) { bStopSig = true; break; } spdlog::info("evpuller {} waiting for req", selfId); // proto: [sender_id] [meta] [body] auto v = z_recv_multiple(pDealer, false); if(v.size() != 3) { //TODO: spdlog::error("evpuller {}, repSrv received invalid message: {}", selfId, v.size()); continue; } handleMsg(v); } } public: RepSrv() = delete; RepSrv(RepSrv &) = delete; RepSrv(RepSrv&&) = delete; RepSrv(string mgrSn, string devSn, int iid, const char* formatBytes, int len, void *pDealer):mgrSn(mgrSn),devSn(devSn), iid(iid), bytes(formatBytes), len(len), pDealer(pDealer) { selfId = devSn+":evpuller:" + to_string(iid); }; ~RepSrv() {}; }; class EvPuller: public TinyThread { private: void *pPubCtx = NULL; // for packets publishing void *pPub = NULL; void *pDealerCtx = NULL; void *pDealer = NULL; AVFormatContext *pAVFormatInput = NULL; string urlIn, urlPub, urlDealer, mgrSn, devSn, selfId, ipcPort; int *streamList = NULL, numStreams = 0, iid; time_t tsLastBoot, tsUpdateTime; json config; int init() { bool inited = false; // TODO: load config from local db json info; int ret = LVDB::getSn(info); if(ret < 0) { spdlog::error("failed to get sn"); exit(1); } tsLastBoot = info["lastboot"]; tsUpdateTime=info["updatetime"]; spdlog::info("evpuller info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get(), ctime(&tsLastBoot), ctime(&tsUpdateTime)); devSn = info["sn"]; while(!inited) { // TODO: req config bool found = false; string user, passwd, addr; try { ret = LVDB::getLocalConfig(config); if(ret < 0) { spdlog::error("failed to get local configuration"); exit(1); } spdlog::info("config dump: {:s}", config.dump()); json data = config["data"]; // first try to check mgr with same sn json evpuller; json evmgr; json ipc; for (auto& [key, value] : data.items()) { //std::cout << key << " : " << dynamic_cast(value).dump() << "\n"; evmgr = value; json ipcs = evmgr["ipcs"]; for(auto &j: ipcs) { json pullers = j["modules"]["evpuller"]; for(auto &p:pullers) { if(p["sn"] == devSn && p["status"] == 0) { evpuller = p; iid = p["iid"]; break; } } if(evpuller.size() != 0) { ipc = j; break; } } if(ipc.size()!=0 && evpuller.size()!=0) { found = true; break; } } if(!found) { spdlog::error("evpuller {} no valid config found", devSn); goto togo_sleep_continue; } selfId = devSn+":evpuller:" + to_string(iid); mgrSn = evmgr["sn"]; user = ipc["user"]; passwd = ipc["password"]; // default stream port if(ipc.count("port") == 0) { ipcPort = "554"; }else{ ipcPort = to_string(ipc["port"]); } urlIn = "rtsp://" + user + ":" + passwd + "@" + ipc["addr"].get() + ":" + ipcPort + "/h264/ch1/sub/av_stream"; addr = evpuller["addr"].get(); spdlog::info("evpuller {} connecting to IPC {}", selfId, urlIn); if(addr == "*" || addr == "0.0.0.0") { spdlog::error("evpuller {} invalid addr {} for pub", selfId, evpuller.dump()); goto togo_sleep_continue; } urlPub = string("tcp://*:") + to_string(evpuller["port-pub"]); // urlRep = string("tcp://") +data["addr"].get() + ":" + to_string(data["port-rep"]); urlDealer = "tcp://" + evmgr["addr"].get() + string(":") + to_string(evmgr["port-router"]); spdlog::info("evpuller {} bind on {} for pub, connect to {} for dealer", selfId, urlPub, urlDealer); pPubCtx = zmq_ctx_new(); pPub = zmq_socket(pPubCtx, ZMQ_PUB); ret = zmq_bind(pPub, urlPub.c_str()); if(ret < 0) { spdlog::error("evpuller {} failed to bind to {}", selfId, urlPub); goto togo_sleep_continue; } pDealerCtx = zmq_ctx_new(); pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER); ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size()); if(ret < 0) { spdlog::error("evpuller {} failed to set identity", selfId); goto togo_sleep_continue; } ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size()); if(ret < 0) { spdlog::error("evpusher {} {} failed setsockopts router: {}", selfId, urlDealer); goto togo_sleep_continue; } ret = zmq_connect(pDealer, urlDealer.c_str()); if(ret < 0) { spdlog::error("evpuller {} failed to connect to router {}", selfId, urlDealer); goto togo_sleep_continue; } } catch(exception &e) { this_thread::sleep_for(chrono::seconds(3)); spdlog::error("evpuller {} exception in EvPuller.init {:s}, retrying... ", selfId, e.what()); continue; } inited = true; spdlog::info("successfully load config"); break; togo_sleep_continue: this_thread::sleep_for(chrono::seconds(3)); } return 0; } protected: // Function to be executed by thread function void run() { int ret = 0; if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, NULL)) < 0) { spdlog::error("Could not open input file {}", urlIn); } if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) { spdlog::error("Failed to retrieve input stream information"); } pAVFormatInput->flags = AVFMT_FLAG_NOBUFFER | AVFMT_FLAG_FLUSH_PACKETS; numStreams = pAVFormatInput->nb_streams; int *streamList = (int *)av_mallocz_array(numStreams, sizeof(*streamList)); if (!streamList) { ret = AVERROR(ENOMEM); spdlog::error("failed create avformatcontext for output: {}", av_err2str(AVERROR(ENOMEM))); } // serialize formatctx to bytes char *pBytes = NULL; ret = AVFormatCtxSerializer::encode(pAVFormatInput, &pBytes); auto repSrv = RepSrv(mgrSn, devSn, iid, pBytes, ret, pDealer); repSrv.detach(); // find all video & audio streams for remuxing int i = 0, streamIdx = 0; for (; i < pAVFormatInput->nb_streams; i++) { AVStream *in_stream = pAVFormatInput->streams[i]; AVCodecParameters *in_codecpar = in_stream->codecpar; if (in_codecpar->codec_type != AVMEDIA_TYPE_AUDIO && in_codecpar->codec_type != AVMEDIA_TYPE_VIDEO && in_codecpar->codec_type != AVMEDIA_TYPE_SUBTITLE) { streamList[i] = -1; continue; } streamList[i] = streamIdx++; } bool bStopSig = false; uint64_t pktCnt = 0; spdlog::info("evpulelr {} reading packets from {}", selfId, urlIn); while (true) { if(checkStop() == true) { bStopSig = true; break; } AVStream *in_stream; AVPacket packet; zmq_msg_t msg; ret = av_read_frame(pAVFormatInput, &packet); if (ret < 0) { spdlog::error("failed read packet: {}", av_err2str(ret)); break; } in_stream = pAVFormatInput->streams[packet.stream_index]; if (packet.stream_index >= numStreams || streamList[packet.stream_index] < 0) { av_packet_unref(&packet); continue; } if(pktCnt % EV_LOG_PACKET_CNT == 0) { spdlog::info("pktCnt: {:d}", pktCnt); } pktCnt++; packet.stream_index = streamList[packet.stream_index]; // serialize packet to raw bytes char * data = NULL; int size = AVPacketSerializer::encode(packet, &data); zmq_msg_init_data(&msg, (void*)data, size, mqPacketFree, NULL); zmq_send_const(pPub, zmq_msg_data(&msg), size, 0); av_packet_unref(&packet); } free(pBytes); // TODO: if(ret < 0 && !bStopSig) { // reconnect } else { std::cout << "Task End" << std::endl; } } public: EvPuller() { init(); } ~EvPuller() { if(pPub != NULL) { zmq_close(pPub); pPub = NULL; } if(pPubCtx != NULL) { zmq_ctx_destroy(pPubCtx); pPubCtx = NULL; } if(pDealer != NULL) { zmq_close(pDealer); pDealer= NULL; } if(pDealerCtx != NULL) { zmq_ctx_destroy(pPubCtx); pDealerCtx = NULL; } } }; int main(int argc, char **argv) { av_log_set_level(AV_LOG_ERROR); spdlog::set_level(spdlog::level::info); //DB::exec(NULL, NULL, NULL,NULL); auto evp = EvPuller(); evp.join(); return 0; }