提交 69badb46 authored 作者: blu's avatar blu

refactor communitation archtecture to use evdamon only

上级 dade3686
......@@ -382,6 +382,7 @@ togo_end:
// create default sn.
string sn = getStrRand(8);
info["sn"] = sn;
info["api-cloud"] = "http://127.0.0.1:8089";
spdlog::warn("no local sn set. create a new one: {}", sn);
auto tsNow = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
info["lastboot"] = tsNow;
......
......@@ -9,36 +9,95 @@ update: 2019/08/30
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wunused-lambda-capture"
#include <cstdlib>
#include "inc/tinythread.hpp"
#include "inc/httplib.h"
#include "inc/zmqhelper.hpp"
#include "inc/database.h"
#include "inc/json.hpp"
#include <cstdlib>
#include "inc/utils.hpp"
#include <unistd.h>
using namespace std;
using namespace httplib;
using namespace nlohmann;
class HttpSrv{
class EvDaemon{
private:
Server svr;
json config;
json info;
int port = 8088;
thread thMon;
string devSn;
int portRouter = 5549;
thread::id thIdMain;
void setMonitorThread() {
thMon = thread([this](){
//
/// module gid to process id
json mapModsToPids;
// for zmq
void *pRouterCtx = NULL, *pRouter = NULL;
/// tracking sub-systems: evmgr, evpuller, evpusher, evml*, evslicer etc.
json mapSubSystems;
int reloadCfg() {
int ret = LVDB::getSn(this->info);
if(ret < 0) {
spdlog::error("evdaemon {} failed to get info", this->devSn);
return 1;
}
this->devSn = this->info["sn"];
/// req config
json jret = cloudutils::reqConfig(this->info);
spdlog::info("evmgr {} got cloud config:\n{}", devSn, jret.dump(4));
// apply config
try{
if(jret["code"] != 0) {
spdlog::error("evdaemon {} reqConfig error: {}", this->devSn, jret["msg"].get<string>());
return 2;
}
json &data = jret["data"];
for(auto &[k,v]:data.items()) {
if(k == this->devSn) {
// startup evmgr
pid_t pid = fork();
ret = system("./evmgr");
if(ret == -1) {
spdlog::error("evdaemon {} failed to start evmgr", this->devSn);
break;
}
}
}
}catch(exception &e) {
spdlog::error("evdaemon {} exception to reload and apply configuration:\n{}", this->devSn, jret.dump(4));
return -1;
}
return 0;
}
void setupSubSystems() {
thMon = thread([this](){
while(true) {
int ret = reloadCfg();
if(ret != 0) {
spdlog::error("evdaemon {} failed to setup subsystems, please check log for more info", this->devSn);
}
this_thread::sleep_for(chrono::seconds(5));
}
});
}
protected:
public:
void run(){
setMonitorThread();
setupSubSystems();
// get config
svr.Get("/info", [this](const Request& req, Response& res){
LVDB::getSn(this->info);
......@@ -98,19 +157,37 @@ class HttpSrv{
svr.listen("0.0.0.0", 8088);
}
HttpSrv(){
char* strPort = getenv("PORT");
EvDaemon(){
char* strPort = getenv("DAEMON_PORT");
if(strPort != NULL) {
port = stoi(strPort);
}
strPort = getenv("ROUTER_PORT");
if(strPort != NULL) {
portRouter = stoi(strPort);
}
// setup zmq
int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT;
pRouterCtx = zmq_ctx_new();
pRouter = zmq_socket(pRouterCtx, ZMQ_ROUTER);
zmq_setsockopt (pRouter, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify));
string addr = "tcp://127.0.0.1:" + to_string(portRouter);
int ret = zmq_bind(pRouter, addr.c_str());
if(ret < 0) {
spdlog::error("evdaemon {} failed to bind port: {}", this->devSn, addr);
exit(1);
}
this->thIdMain = this_thread::get_id();
};
~HttpSrv(){};
~EvDaemon(){};
};
int main(){
json info;
LVDB::getSn(info);
spdlog::info("evdaemon: \n{}",info.dump(4));
HttpSrv srv;
EvDaemon srv;
srv.run();
}
\ No newline at end of file
......@@ -561,6 +561,8 @@ public:
virtual ~Client();
virtual bool is_valid() const;
// patched by bruce.lu
std::shared_ptr<Response> Get(const char *path, const Headers &headers, const Params &params);
std::shared_ptr<Response> Get(const char *path, Progress progress = nullptr);
std::shared_ptr<Response> Get(const char *path, const Headers &headers,
......@@ -2871,6 +2873,30 @@ inline std::shared_ptr<Response> Client::Get(const char *path,
return Get(path, Headers(), progress);
}
/// patched by bruce.lu
inline std::shared_ptr<Response> Client::Get(const char *path, const Headers &headers, const Params &params) {
Request req;
req.method = "GET";
req.path = path;
req.params = params;
req.headers = headers;
std::string query;
for (auto it = params.begin(); it != params.end(); ++it) {
if (it != params.begin()) { query += "&"; }
query += it->first;
query += "=";
query += detail::encode_url(it->second);
}
req.path = string(path) + "?" + query;
auto res = std::make_shared<Response>();
return send(req, *res) ? res : nullptr;
}
inline std::shared_ptr<Response>
Client::Get(const char *path, const Headers &headers, Progress progress) {
Request req;
......
......@@ -30,7 +30,7 @@ vector<string> split(const std::string& s, char delimiter)
return tokens;
}
/// ref: ../config.json
/// [deprecated] ref: ../config.json
json registry(json &conf, string sn, string module) {
json ret;
string api;
......@@ -64,6 +64,41 @@ json registry(json &conf, string sn, string module) {
return ret;
}
/// req config
json reqConfig(json &info){
json ret;
string api;
try{
api = info.at("api-cloud").get<string>();
Uri uri=Uri::Parse(api);
string sn = info.at("sn").get<string>();
if(uri.Host.empty()||uri.Port.empty()||uri.Protocol.find("http") == string::npos) {
string msg = "reqConfig error. invalid api-cloud in info: " + api;
ret["code"] = 1;
ret["msg"] = msg;
spdlog::error(msg);
return ret;
}
Params params;
params.emplace("sn", sn);
Client cli(uri.Host.c_str(), stoi(uri.Port));
auto res = cli.Get("/config", Headers(), params);
spdlog::debug("{} {} registry res from cloud : {}", __FILE__, __LINE__, res->body);
ret = json::parse(res->body);
}catch(exception &e) {
ret["code"] = -1;
string msg = string(__FILE__) + ":" + to_string(__LINE__) + string(": registry exception - ") + e.what();
ret["msg"] = msg;
spdlog::error(msg);
}
// /Client cli;
return ret;
}
} // namespace cloudutils
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论