提交 47edd465 authored 作者: blu's avatar blu

init

上级 f7aa0a70
...@@ -7,7 +7,7 @@ CFLAGS = $(DEBUG) -Wall ...@@ -7,7 +7,7 @@ CFLAGS = $(DEBUG) -Wall
LIBOPENCV = `pkg-config opencv --cflags --libs` LIBOPENCV = `pkg-config opencv --cflags --libs`
LIBFFMPEG = `pkg-config libavformat libavutil libavcodec libswscale --cflags --libs` LIBFFMPEG = `pkg-config libavformat libavutil libavcodec libswscale --cflags --libs`
LD_FLAGS=-Lvendor/lib -lpthread -lrocksdb LD_FLAGS=-Lvendor/lib -lpthread -lleveldb
#-static #-static
HEADERS=-Iinc -Ivendor/include HEADERS=-Iinc -Ivendor/include
LIBS= LIBS=
......
...@@ -4,8 +4,10 @@ ...@@ -4,8 +4,10 @@
#include <cstdlib> #include <cstdlib>
#include <mutex> #include <mutex>
#include <map> #include <map>
#include <fstream>
#include <iomanip>
using namespace rocksdb; using namespace leveldb;
string _config_default_tmpl = "{\"time\":0,\"code\":0,\"data\":{\"<SN_MGR>\":{\"sn\":\"<SN_MGR>\",\"addr\":\"127.0.0.1\",\"addr-cloud\":\"<cloud_addr>\",\"proto\":\"zmq\",\"port-cloud\":5556,\"port-router\":5550,\"status\":1,\"ipcs\":[{\"addr\":\"172.31.0.51\",\"proto\":\"rtsp\",\"user\":\"admin\",\"password\":\"FWBWTU\",\"status\":0,\"modules\":{\"evpuller\":[{\"sn\":\"<SN_PULLER>\",\"addr\":\"127.0.0.1\",\"iid\":1,\"port-pub\":5556,\"status\":0}],\"evpusher\":[{\"sn\":\"<SN_PUSHER>\",\"iid\":1,\"urlDest\":\"rtsp://40.73.41.176:554/test1\",\"user\":\"\",\"password\":\"\",\"token\":\"\",\"enabled\":1,\"status\":0}],\"evslicer\":[{\"sn\":\"<SN_SLICER>\",\"iid\":1,\"path\":\"slices\",\"enabled\":1,\"status\":0}],\"evml\":[{\"type\":\"motion\",\"sn\":\"<SN_ML>\",\"iid\":1,\"enabled\":1,\"status\":0}]}}]}}}"; string _config_default_tmpl = "{\"time\":0,\"code\":0,\"data\":{\"<SN_MGR>\":{\"sn\":\"<SN_MGR>\",\"addr\":\"127.0.0.1\",\"addr-cloud\":\"<cloud_addr>\",\"proto\":\"zmq\",\"port-cloud\":5556,\"port-router\":5550,\"status\":1,\"ipcs\":[{\"addr\":\"172.31.0.51\",\"proto\":\"rtsp\",\"user\":\"admin\",\"password\":\"FWBWTU\",\"status\":0,\"modules\":{\"evpuller\":[{\"sn\":\"<SN_PULLER>\",\"addr\":\"127.0.0.1\",\"iid\":1,\"port-pub\":5556,\"status\":0}],\"evpusher\":[{\"sn\":\"<SN_PUSHER>\",\"iid\":1,\"urlDest\":\"rtsp://40.73.41.176:554/test1\",\"user\":\"\",\"password\":\"\",\"token\":\"\",\"enabled\":1,\"status\":0}],\"evslicer\":[{\"sn\":\"<SN_SLICER>\",\"iid\":1,\"path\":\"slices\",\"enabled\":1,\"status\":0}],\"evml\":[{\"type\":\"motion\",\"sn\":\"<SN_ML>\",\"iid\":1,\"enabled\":1,\"status\":0}]}}]}}}";
...@@ -34,27 +36,26 @@ string getStrRand(int length) ...@@ -34,27 +36,26 @@ string getStrRand(int length)
namespace LVDB { namespace LVDB {
map<string, DB*> mappDB;
DB *_getDB(string fileName) { DB *_getDB(string fileName) {
static bool bmk = false;
DB *pdb = NULL; DB *pdb = NULL;
int ret = 0; int ret = 0;
if(mappDB.count(fileName) == 0) {
//
string mk = string("mkdir -p ") + LVDB_PATH;
ret = system(mk.c_str());
if(ret == -1) {
spdlog::error("failed to create db path: {}", LVDB_PATH);
}else{
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
Status s = DB::Open(options, fileName, &pdb); if(!bmk) {
if(!s.ok()) { string mkdir_ = "mkdir -p " + fileName;
spdlog::error("failed to open db {}: {}", fileName, s.ToString()); //spdlog::info("creating directory: {}", mkdir_);
ret = system(mkdir_.c_str());
if(-1 == ret) {
spdlog::error("failed to create directory for {}", fileName);
exit(1);
} }
mappDB[fileName] = pdb; bmk = true;
} }
}else{
pdb = mappDB[fileName]; Status s = DB::Open(options, fileName, &pdb);
if(!s.ok() && s.) {
spdlog::error("failed to open db {}: {}", fileName, s.ToString());
} }
assert(pdb != NULL); assert(pdb != NULL);
...@@ -75,12 +76,14 @@ namespace LVDB { ...@@ -75,12 +76,14 @@ namespace LVDB {
Status s = pdb->Get(ReadOptions(), key, &value); Status s = pdb->Get(ReadOptions(), key, &value);
if(!s.ok()) { if(!s.ok()) {
spdlog::debug("failed to get {} from {}: {}",key, fileName, s.ToString()); spdlog::debug("failed to get {} from {}: {}",key, fileName, s.ToString());
return -1; ret = -1;
} }else{
if(cb != NULL) { if(cb != NULL) {
ret = cb(value); ret = cb(value);
} }
}
delete pdb;
return ret; return ret;
} }
...@@ -103,18 +106,23 @@ namespace LVDB { ...@@ -103,18 +106,23 @@ namespace LVDB {
s = pdb->Put(WriteOptions(), key, value); s = pdb->Put(WriteOptions(), key, value);
if(!s.ok()) { if(!s.ok()) {
spdlog::error("failed to put {} -> {}: {}", key, value, s.ToString()); spdlog::error("failed to put {} -> {}: {}", key, value, s.ToString());
return -2; ret = -1;
goto togo_end;
} }
if(!oldVal.empty()) { if(!oldVal.empty()) {
s = pdb->Put(WriteOptions(), key + LVDB_KEY_SUFFIX_BACK, oldVal); s = pdb->Put(WriteOptions(), key + LVDB_KEY_SUFFIX_BACK, oldVal);
if(!s.ok()) { if(!s.ok()) {
spdlog::error("failed to put backup {} -> {}: {}", key, oldVal, s.ToString()); spdlog::error("failed to put backup {} -> {}: {}", key, oldVal, s.ToString());
return -2; ret = -2;
goto togo_end;
} }
} }
return 0; togo_end:
delete pdb;
return ret;
} }
int getValue(json &value, string key, string fileName, cb_verify_json cb) { int getValue(json &value, string key, string fileName, cb_verify_json cb) {
...@@ -156,13 +164,16 @@ namespace LVDB { ...@@ -156,13 +164,16 @@ namespace LVDB {
} }
int delValue(string key, string fileName) { int delValue(string key, string fileName) {
int ret = 0;
DB* pdb = _getDB(fileName); DB* pdb = _getDB(fileName);
Status s = pdb->Delete(WriteOptions(), key); Status s = pdb->Delete(WriteOptions(), key);
if(!s.ok()) { if(!s.ok()) {
spdlog::error("failed to delete key {}: {} in {}",s.ToString(), key, fileName); spdlog::error("failed to delete key {}: {} in {}",s.ToString(), key, fileName);
return -1; ret = -1;
} }
return 0;
delete pdb;
return ret;
} }
// sn // sn
...@@ -218,6 +229,26 @@ namespace LVDB { ...@@ -218,6 +229,26 @@ namespace LVDB {
return setValue(info, LVDB_KEY_SN, fileName, _validateSn); return setValue(info, LVDB_KEY_SN, fileName, _validateSn);
}; };
// int setSn(json &info) {
// std::ifstream i("file.json");
// json j;
// i >> j;
// // write prettified JSON to another file
// std::ofstream o("pretty.json");
// o << std::setw(4) << j << std::endl;
// return -1;
// }
// int getSn(json &info) {
// fstream file;
// file.open(LVDB_KEY_SN, ios::out|ios);
// if(snfile.e)
// json j;
// i >> j;
// // write prettified JSON to another file
// std::ofstream o("pretty.json");
// o << std::setw(4) << j << std::endl;
// }
// config // config
int _validateConfig(const json &config) { int _validateConfig(const json &config) {
if(config.count("data") == 0|| config["data"].size() == 0) { if(config.count("data") == 0|| config["data"].size() == 0) {
......
...@@ -72,7 +72,6 @@ private: ...@@ -72,7 +72,6 @@ private:
exit(1); exit(1);
} }
selfId = devSn + ":evpusher:" + to_string(iid);
while(!inited) { while(!inited) {
// TODO: req config // TODO: req config
bool found = false; bool found = false;
...@@ -90,8 +89,10 @@ private: ...@@ -90,8 +89,10 @@ private:
for(auto &j: ipcs) { for(auto &j: ipcs) {
json pullers = j["modules"]["evpusher"]; json pullers = j["modules"]["evpusher"];
for(auto &p:pullers) { for(auto &p:pullers) {
if(p["sn"] == devSn && p["iid"] == iid) { if(p["sn"] == devSn && p["status"] == 0 && p["enabled"] == 1/* && p["iid"] */ ) {
evpusher = p; evpusher = p;
iid = p["iid"];
selfId = devSn + ":evpusher:" + to_string(iid);
break; break;
} }
} }
...@@ -100,6 +101,8 @@ private: ...@@ -100,6 +101,8 @@ private:
break; break;
} }
} }
spdlog::info("evpusher {} {}, evpusher: {}",devSn, iid, evpusher.dump());
spdlog::info("evpusher {} {}, ipc: {}",devSn, iid, ipc.dump());
if(ipc.size()!=0 && evpusher.size()!=0) { if(ipc.size()!=0 && evpusher.size()!=0) {
found = true; found = true;
...@@ -108,7 +111,7 @@ private: ...@@ -108,7 +111,7 @@ private:
} }
if(!found) { if(!found) {
spdlog::error("evpusher {} {}: no valid config found. retrying load config...", devSn, iid); spdlog::error("evpusher {} {}: no valid config found. retrying load config: {}", devSn, iid, config.dump());
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
continue; continue;
} }
...@@ -120,7 +123,7 @@ private: ...@@ -120,7 +123,7 @@ private:
urlPub = string("tcp://") + evpuller["addr"].get<string>() + ":" + to_string(evpuller["port-pub"]); urlPub = string("tcp://") + evpuller["addr"].get<string>() + ":" + to_string(evpuller["port-pub"]);
urlDealer = string("tcp://") + evmgr["addr"].get<string>() + ":" + to_string(evmgr["port-router"]); urlDealer = string("tcp://") + evmgr["addr"].get<string>() + ":" + to_string(evmgr["port-router"]);
spdlog::info("evpusher {} {} will connect to {} for sub, {} for router", devSn, iid, urlPub, urlDealer); spdlog::info("evpusher {} connect to {} for sub, {} for router", selfId, urlPub, urlDealer);
// TODO: multiple protocols support // TODO: multiple protocols support
urlOut = evpusher["urlDest"].get<string>(); urlOut = evpusher["urlDest"].get<string>();
} }
...@@ -152,7 +155,7 @@ private: ...@@ -152,7 +155,7 @@ private:
//TODO: //TODO:
} }
else { else {
spdlog::info("evpusher {} {} sent hello to router: {}", devSn, iid, mgrSn); spdlog::info("evpusher {} sent hello to router: {}", selfId, mgrSn);
} }
return ret; return ret;
...@@ -182,7 +185,7 @@ private: ...@@ -182,7 +185,7 @@ private:
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size()); ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size()); ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {} failed setsockopts router: {}", devSn, iid, urlDealer); spdlog::error("evpusher {} failed setsockopts router {}: {}", selfId, urlDealer, zmq_strerror(zmq_errno()));
return -3; return -3;
} }
ret = zmq_connect(pDealer, urlDealer.c_str()); ret = zmq_connect(pDealer, urlDealer.c_str());
...@@ -210,7 +213,7 @@ private: ...@@ -210,7 +213,7 @@ private:
int ret = 0; int ret = 0;
// req avformatcontext packet // req avformatcontext packet
// send hello to puller // send hello to puller
spdlog::info("evpusher {} {} send hello to puller: {}", devSn, iid, pullerGid); spdlog::info("evpusher {} send hello to puller: {}", selfId, pullerGid);
vector<vector<uint8_t> > body; vector<vector<uint8_t> > body;
body.push_back(str2body(pullerGid)); body.push_back(str2body(pullerGid));
json meta; json meta;
......
#ifndef __DATABSE_LEVEL_DB_H__ #ifndef __DATABSE_LEVEL_DB_H__
#define __DATABSE_LEVEL_DB_H__ #define __DATABSE_LEVEL_DB_H__
// #include "leveldb/db.h" #include "leveldb/db.h"
#include "rocksdb/db.h" // #include "rocksdb/db.h"
#include "rocksdb/slice.h" // #include "rocksdb/slice.h"
#include "rocksdb/options.h" // #include "rocksdb/options.h"
#include "json.hpp" #include "json.hpp"
using namespace nlohmann; using namespace nlohmann;
...@@ -22,12 +22,14 @@ namespace LVDB { ...@@ -22,12 +22,14 @@ namespace LVDB {
#define LVDB_KEY_SN "SN" #define LVDB_KEY_SN "SN"
#define LVDB_KEY_CONFIG "CONFIG" #define LVDB_KEY_CONFIG "CONFIG"
// //
int delValue(string key, string fileName); int delValue(string key, string fileName);
// sn, updatetime, boottime // sn, updatetime, boottime
int setSn(json &info, string fileName=LVDB_FILE_GENERAL); int setSn(json &info,string fileName=LVDB_FILE_GENERAL);
int getSn(json &info, string fileName=LVDB_FILE_GENERAL); int getSn(json &info,string fileName=LVDB_FILE_GENERAL);
// cloudutils::config // cloudutils::config
int getLocalConfig(json &config, string fileName=LVDB_FILE_GENERAL); int getLocalConfig(json &config, string fileName=LVDB_FILE_GENERAL);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论