提交 452d784e authored 作者: blu's avatar blu

init

上级 3bbee587
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
#include <map> #include <map>
#include <fstream> #include <fstream>
#include <iomanip> #include <iomanip>
#include <thread>
#include <chrono>
using namespace leveldb; using namespace leveldb;
...@@ -36,9 +38,11 @@ string getStrRand(int length) ...@@ -36,9 +38,11 @@ string getStrRand(int length)
namespace LVDB { namespace LVDB {
DB *_getDB(string fileName) { #define LVDB_ERROR_HELD -1
#define LVDB_ERROR_OTHER -2
int _getDB(string fileName, DB** pdb) {
static bool bmk = false; static bool bmk = false;
DB *pdb = NULL;
int ret = 0; int ret = 0;
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
...@@ -53,14 +57,31 @@ namespace LVDB { ...@@ -53,14 +57,31 @@ namespace LVDB {
bmk = true; bmk = true;
} }
Status s = DB::Open(options, fileName, &pdb); int cnt = 0;
while(cnt < 10000){
Status s = DB::Open(options, fileName, pdb);
if(!s.ok()) { if(!s.ok()) {
size_t pos = s.ToString().find("already held by process");
if(pos != string::npos) {
spdlog::warn("failed to open db {}: {}", fileName, "already opened by other");
//wait for 100 * 10ms
this_thread::sleep_for(chrono::milliseconds(10));
}
else{
spdlog::error("failed to open db {}: {}", fileName, s.ToString()); spdlog::error("failed to open db {}: {}", fileName, s.ToString());
return LVDB_ERROR_OTHER;
}
}else{
break;
}
cnt++;
} }
assert(pdb != NULL); assert(pdb != NULL);
return pdb; return 0;
} }
int clearDB(string fileName) { int clearDB(string fileName) {
...@@ -72,7 +93,12 @@ namespace LVDB { ...@@ -72,7 +93,12 @@ namespace LVDB {
int getValue(string &value, string key, string fileName, cb_verify_str cb) { int getValue(string &value, string key, string fileName, cb_verify_str cb) {
int ret = 0; int ret = 0;
DB* pdb = _getDB(fileName); DB* pdb = NULL;
ret = _getDB(fileName, &pdb);
if(ret < 0) {
return ret;
}
Status s = pdb->Get(ReadOptions(), key, &value); Status s = pdb->Get(ReadOptions(), key, &value);
if(!s.ok()) { if(!s.ok()) {
spdlog::debug("failed to get {} from {}: {}",key, fileName, s.ToString()); spdlog::debug("failed to get {} from {}: {}",key, fileName, s.ToString());
...@@ -96,7 +122,11 @@ namespace LVDB { ...@@ -96,7 +122,11 @@ namespace LVDB {
} }
} }
DB* pdb = _getDB(fileName); DB* pdb = NULL;
ret = _getDB(fileName, &pdb);
if(ret < 0) {
return ret;
}
string oldVal; string oldVal;
Status s = pdb->Get(ReadOptions(), key, &oldVal); Status s = pdb->Get(ReadOptions(), key, &oldVal);
if(!s.ok()) { if(!s.ok()) {
...@@ -165,14 +195,18 @@ togo_end: ...@@ -165,14 +195,18 @@ togo_end:
int delValue(string key, string fileName) { int delValue(string key, string fileName) {
int ret = 0; int ret = 0;
DB* pdb = _getDB(fileName); DB* pdb = NULL;
ret = _getDB(fileName, &pdb);
if(ret < 0) {
return ret;
}
Status s = pdb->Delete(WriteOptions(), key); Status s = pdb->Delete(WriteOptions(), key);
if(!s.ok()) { if(!s.ok()) {
spdlog::error("failed to delete key {}: {} in {}",s.ToString(), key, fileName); spdlog::error("failed to delete key {}: {} in {}",s.ToString(), key, fileName);
ret = -1; ret = -1;
} }
delete pdb; //delete pdb;
return ret; return ret;
} }
......
...@@ -126,57 +126,18 @@ private: ...@@ -126,57 +126,18 @@ private:
spdlog::info("evpusher {} connect to {} for sub, {} for router", selfId, urlPub, urlDealer); spdlog::info("evpusher {} connect to {} for sub, {} for router", selfId, urlPub, urlDealer);
// TODO: multiple protocols support // TODO: multiple protocols support
urlOut = evpusher["urlDest"].get<string>(); urlOut = evpusher["urlDest"].get<string>();
}
catch(exception &e) {
spdlog::error("evpusher {} {} exception in EvPuller.init {:s} retrying", devSn, iid, e.what());
this_thread::sleep_for(chrono::seconds(3));
continue;
}
inited = true;
}
return 0;
}
int ping()
{
// send hello to router
int ret = 0;
vector<vector<uint8_t> >body;
// since identity is auto set
body.push_back(str2body(mgrSn+":0:0"));
body.push_back(str2body(EV_MSG_META_PING)); // blank meta
body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpusher {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
//TODO:
}
else {
spdlog::info("evpusher {} sent hello to router: {}", selfId, mgrSn);
}
return ret;
}
int setupMq()
{
int ret = 0;
// setup sub // setup sub
pSubCtx = zmq_ctx_new(); pSubCtx = zmq_ctx_new();
pSub = zmq_socket(pSubCtx, ZMQ_SUB); pSub = zmq_socket(pSubCtx, ZMQ_SUB);
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0); ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
if(ret != 0) { if(ret != 0) {
spdlog::error("evpusher {} {} failed set setsockopt: {}", devSn, iid, urlPub); spdlog::error("evpusher {} {} failed set setsockopt: {}", devSn, iid, urlPub);
return -1;
} }
ret = zmq_connect(pSub, urlPub.c_str()); ret = zmq_connect(pSub, urlPub.c_str());
if(ret != 0) { if(ret != 0) {
spdlog::error("evpusher {} {} failed connect pub: {}", devSn, iid, urlPub); spdlog::error("evpusher {} {} failed connect pub: {}", devSn, iid, urlPub);
return -2; goto togo_sc;
} }
// setup dealer // setup dealer
...@@ -186,24 +147,52 @@ private: ...@@ -186,24 +147,52 @@ private:
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size()); ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} failed setsockopts router {}: {}", selfId, urlDealer, zmq_strerror(zmq_errno())); spdlog::error("evpusher {} failed setsockopts router {}: {}", selfId, urlDealer, zmq_strerror(zmq_errno()));
return -3; goto togo_sc;
} }
ret = zmq_connect(pDealer, urlDealer.c_str()); ret = zmq_connect(pDealer, urlDealer.c_str());
if(ret != 0) { if(ret != 0) {
spdlog::error("evpusher {} {} failed connect dealer: {}", devSn, iid, urlDealer); spdlog::error("evpusher {} {} failed connect dealer: {}", devSn, iid, urlDealer);
return -4; goto togo_sc;
}
//update status and ping
evpusher["status"] = 1;
ret = LVDB::setLocalConfig(config);
if(ret < 0) {
spdlog::error("evpusher {} failed to set config: {}", selfId, config.dump());
}
spdlog::info("new config: {}", config.dump());
ping();
break;
togo_sc:
this_thread::sleep_for(chrono::seconds(2));
continue;
}
catch(exception &e) {
spdlog::error("evpusher {} {} exception in EvPuller.init {:s} retrying", devSn, iid, e.what());
this_thread::sleep_for(chrono::seconds(3));
continue;
}
inited = true;
}
return 0;
} }
//ping
ret = ping();
// TODO: don't need this anymore, since I've used the draft feature of ZOUTER_NOTIFICATION instead
// thPing = thread([&,this]() {
// while(true) {
// this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS-2));
// ping();
// }
// });
// thPing.detach(); int ping()
{
// send hello to router
int ret = 0;
vector<vector<uint8_t> >body = {str2body(mgrSn+":0:0"), str2body(EV_MSG_META_PING),str2body(MSG_HELLO)};
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpusher {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
//TODO:
}
else {
spdlog::info("evpusher {} sent hello to router: {}", selfId, mgrSn);
}
return ret; return ret;
} }
...@@ -448,7 +437,6 @@ public: ...@@ -448,7 +437,6 @@ public:
EvPusher() EvPusher()
{ {
init(); init();
setupMq();
getInputFormat(); getInputFormat();
setupStream(); setupStream();
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论