提交 0ed79758 authored 作者: blu's avatar blu

cloud registry

上级 c6336990
...@@ -3,7 +3,8 @@ ...@@ -3,7 +3,8 @@
"python.pythonPath": "/opt/apps/conda/anaconda3/bin/python", "python.pythonPath": "/opt/apps/conda/anaconda3/bin/python",
"files.associations": { "files.associations": {
"chrono": "cpp", "chrono": "cpp",
"optional": "cpp" "optional": "cpp",
"string": "cpp"
}, },
"C_Cpp.errorSquiggles": "Disabled" "C_Cpp.errorSquiggles": "Disabled"
} }
\ No newline at end of file
...@@ -22,22 +22,22 @@ all: evmgr evpuller evpusher evslicer evmlmotion evdaemon evcloudsvc ...@@ -22,22 +22,22 @@ all: evmgr evpuller evpusher evslicer evmlmotion evdaemon evcloudsvc
sqlite3.o: vendor/sqlite/sqlite3.c sqlite3.o: vendor/sqlite/sqlite3.c
gcc -D SQLITE_THREADSAFE=1 -c vendor/sqlite/sqlite3.c gcc -D SQLITE_THREADSAFE=1 -c vendor/sqlite/sqlite3.c
evmgr: evmgr.cpp database.cpp inc/common.hpp inc/database.h inc/zmqhelper.hpp inc/tinythread.hpp $(SQLITE_SRC) evmgr: evmgr.cpp database.cpp inc/utils.hpp inc/common.hpp inc/database.h inc/zmqhelper.hpp inc/tinythread.hpp $(SQLITE_SRC)
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evmgr evmgr.cpp $(SQLITE) database.cpp $(HEADERS) $(LIBFFMPEG) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evmgr evmgr.cpp $(SQLITE) database.cpp $(HEADERS) $(LIBFFMPEG) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evpuller: evpuller.cpp database.cpp inc/common.hpp inc/database.h inc/zmqhelper.hpp inc/tinythread.hpp $(SQLITE_SRC) evpuller: evpuller.cpp database.cpp inc/utils.hpp inc/common.hpp inc/database.h inc/zmqhelper.hpp inc/tinythread.hpp $(SQLITE_SRC)
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evpuller evpuller.cpp $(SQLITE) database.cpp $(HEADERS) $(LIBFFMPEG) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evpuller evpuller.cpp $(SQLITE) database.cpp $(HEADERS) $(LIBFFMPEG) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evpusher: evpusher.cpp inc/common.hpp inc/tinythread.hpp inc/database.h inc/zmqhelper.hpp database.cpp $(SQLITE_SRC) evpusher: evpusher.cpp inc/common.hpp inc/utils.hpp inc/tinythread.hpp inc/database.h inc/zmqhelper.hpp database.cpp $(SQLITE_SRC)
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evpusher evpusher.cpp database.cpp $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evpusher evpusher.cpp database.cpp $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evslicer: evslicer.cpp inc/common.hpp inc/tinythread.hpp inc/database.h inc/zmqhelper.hpp database.cpp $(SQLITE_SRC) evslicer: evslicer.cpp inc/common.hpp inc/utils.hpp inc/tinythread.hpp inc/database.h inc/zmqhelper.hpp database.cpp $(SQLITE_SRC)
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evslicer evslicer.cpp database.cpp $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evslicer evslicer.cpp database.cpp $(SQLITE) $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evmlmotion: evmlmotion.cpp inc/common.hpp inc/avcvhelpers.hpp inc/database.h inc/zmqhelper.hpp inc/tinythread.hpp database.cpp $(SQLITE_SRC) evmlmotion: evmlmotion.cpp inc/common.hpp inc/utils.hpp inc/avcvhelpers.hpp inc/database.h inc/zmqhelper.hpp inc/tinythread.hpp database.cpp $(SQLITE_SRC)
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evmlmotion evmlmotion.cpp database.cpp $(SQLITE) $(LIBFFMPEG) $(HEADERS) $(LIBOPENCV) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evmlmotion evmlmotion.cpp database.cpp $(SQLITE) $(LIBFFMPEG) $(HEADERS) $(LIBOPENCV) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evdaemon: evdaemon.cpp inc/common.hpp inc/database.h inc/zmqhelper.hpp inc/tinythread.hpp database.cpp evdaemon: evdaemon.cpp inc/common.hpp inc/utils.hpp inc/database.h inc/zmqhelper.hpp inc/tinythread.hpp database.cpp
$(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evdaemon evdaemon.cpp database.cpp $(SQLITE) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) $(LD_FLAGS) -o evdaemon evdaemon.cpp database.cpp $(SQLITE) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evcloudsvc: evcloudsvc.cpp inc/utils.hpp inc/database.h inc/zmqhelper.hpp inc/tinythread.hpp database.cpp evcloudsvc: evcloudsvc.cpp inc/utils.hpp inc/database.h inc/zmqhelper.hpp inc/tinythread.hpp database.cpp
......
...@@ -103,7 +103,7 @@ int setInfo(void* info, const char*fileName) ...@@ -103,7 +103,7 @@ int setInfo(void* info, const char*fileName)
sqlite3 * pdb = NULL; sqlite3 * pdb = NULL;
auto v = static_cast<json*>(info); auto v = static_cast<json*>(info);
if(v==NULL||v->count("sn") == 0 ||v->count("lastboot") == 0) { if(v==NULL||v->count("sn") == 0 ||v->count("lastboot") == 0) {
spdlog::error("failed to set info to file {}, parameter error: {}", fileName, v->dump()); spdlog::error("failed to set info to file {}, parameter error: {}", fileName, v->dump(4));
return -1; return -1;
} }
...@@ -183,7 +183,7 @@ int getInfo(void *info, int active, const char*fileName) ...@@ -183,7 +183,7 @@ int getInfo(void *info, int active, const char*fileName)
return sqlite3_errcode(pdb); return sqlite3_errcode(pdb);
} }
spdlog::debug("getInfo to file {}: {}", fileName, v->dump()); spdlog::debug("getInfo to file {}: {}", fileName, v->dump(4));
return 0; return 0;
} }
...@@ -213,7 +213,7 @@ int saveLocalConfigration(json &config, string fileName) ...@@ -213,7 +213,7 @@ int saveLocalConfigration(json &config, string fileName)
std::ofstream o(fileName); std::ofstream o(fileName);
o << std::setw(4) << config << std::endl; o << std::setw(4) << config << std::endl;
}catch(exception &e) { }catch(exception &e) {
spdlog::error("saveLocalConfigration failed to write configuration to file {}: {}\n{}", fileName, e.what(), config.dump()); spdlog::error("saveLocalConfigration failed to write configuration to file {}: {}\n{}", fileName, e.what(), config.dump(4));
return -2; return -2;
} }
......
...@@ -38,7 +38,7 @@ class HttpSrv{ ...@@ -38,7 +38,7 @@ class HttpSrv{
try{ try{
if(newConfig.count("data") == 0 || newConfig["data"].size() == 0) { if(newConfig.count("data") == 0 || newConfig["data"].size() == 0) {
ret["code"] = 1; ret["code"] = 1;
ret["msg"] = "evcloudsvc invalid config body received: " + newConfig.dump(); ret["msg"] = "evcloudsvc invalid config body received: " + newConfig.dump(4);
spdlog::error(ret["msg"].get<string>()); spdlog::error(ret["msg"].get<string>());
}else{ }else{
json &data = newConfig["data"]; json &data = newConfig["data"];
...@@ -77,7 +77,7 @@ class HttpSrv{ ...@@ -77,7 +77,7 @@ class HttpSrv{
// modkey -> sn_of_evmgr // modkey -> sn_of_evmgr
this->configMap[modKey] = k; this->configMap[modKey] = k;
}else{ }else{
string msg = "evcloudsvc invalid config: " + data.dump();; string msg = "evcloudsvc invalid config: " + data.dump(4);;
ret["code"] = -1; ret["code"] = -1;
ret["msg"] = msg; ret["msg"] = msg;
spdlog::error(msg); spdlog::error(msg);
...@@ -104,7 +104,7 @@ class HttpSrv{ ...@@ -104,7 +104,7 @@ class HttpSrv{
//save //save
iret = LVDB::setLocalConfig(evmgrData, k); iret = LVDB::setLocalConfig(evmgrData, k);
if(iret < 0) { if(iret < 0) {
string msg = "failed to save config " + k + " -> " + evmgrData.dump(); string msg = "failed to save config " + k + " -> " + evmgrData.dump(4);
spdlog::error(msg); spdlog::error(msg);
ret["code"] = iret; ret["code"] = iret;
ret["msg"] = msg; ret["msg"] = msg;
...@@ -176,7 +176,7 @@ class HttpSrv{ ...@@ -176,7 +176,7 @@ class HttpSrv{
ret = this->config(cfg); ret = this->config(cfg);
if(ret["code"] == 0) { if(ret["code"] == 0) {
}else{ }else{
spdlog::error("failed to config: {}", ret.dump()); spdlog::error("failed to config: {}", ret.dump(4));
} }
}else{ }else{
key = configMap[modname]; key = configMap[modname];
...@@ -195,7 +195,7 @@ class HttpSrv{ ...@@ -195,7 +195,7 @@ class HttpSrv{
ret = this->config(cfg); ret = this->config(cfg);
}else{ }else{
json diff = json::diff(cfg, data); json diff = json::diff(cfg, data);
spdlog::info("evcloudsvc diff: {}", diff.dump()); spdlog::info("evcloudsvc diff: {}", diff.dump(4));
ret["data"] = diff; ret["data"] = diff;
} }
} }
......
...@@ -75,7 +75,7 @@ class HttpSrv{ ...@@ -75,7 +75,7 @@ class HttpSrv{
newConfig["data"] = json::parse(req.body)["data"]; newConfig["data"] = json::parse(req.body)["data"];
LVDB::setLocalConfig(newConfig); LVDB::setLocalConfig(newConfig);
spdlog::info("evmgr new config: {}", newConfig.dump()); spdlog::info("evmgr new config: {}", newConfig.dump(4));
// TODO: restart other components // TODO: restart other components
// //
}catch(exception &e) { }catch(exception &e) {
...@@ -106,7 +106,7 @@ class HttpSrv{ ...@@ -106,7 +106,7 @@ class HttpSrv{
int main(){ int main(){
json info; json info;
LVDB::getSn(info); LVDB::getSn(info);
spdlog::info("evdaemon: {}",info.dump()); spdlog::info("evdaemon: \n{}",info.dump(4));
HttpSrv srv; HttpSrv srv;
srv.run(); srv.run();
} }
\ No newline at end of file
...@@ -52,6 +52,8 @@ private: ...@@ -52,6 +52,8 @@ private:
void init() void init()
{ {
int ret; int ret;
json jret;
bool inited = false; bool inited = false;
// TODO: load config from local db // TODO: load config from local db
json info; json info;
...@@ -73,29 +75,56 @@ private: ...@@ -73,29 +75,56 @@ private:
exit(1); exit(1);
} }
spdlog::info("evmgr local config:\n{}", config.dump(4));
// // set all module status to 0
// set all module status to 0 // ret = LVDB::traverseConfigureModules(config, [](string modname, json &m, void* pUser)->int{
ret = LVDB::traverseConfigureModules(config, [](string modname, json &m, void* pUser)->int{ // if(m.count("status") != 0)
if(m.count("status") != 0) // {
{ // //cout << modname <<" ," << m.dump() << endl;
//cout << modname <<" ," << m.dump() << endl; // m["status"] = 0;
m["status"] = 0; // }
} // return 0;
return 0; // });
});
if(ret < 0) {
spdlog::error("evmgr {} failed to set module status to 0", devSn); // if(ret < 0) {
}else{ // spdlog::error("evmgr {} failed to set module status to 0", devSn);
//spdlog::info("new config: {}", config.dump()); // }else{
LVDB::setLocalConfig(config); // //spdlog::info("new config: {}", config.dump());
} // LVDB::setLocalConfig(config);
// }
int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT; int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT;
string proto, addr; string proto, addr;
while(!inited) { while(!inited) {
try { try {
spdlog::info("config dumps: \n{}", config.dump()); // register
jret = cloudutils::registry(config, info["sn"], "evmgr");
spdlog::info("evmgr {} get config from cloud:\n{}", devSn, jret.dump(4));
if(jret["code"] != 0) {
spdlog::error("evmgr {} failed to registry: {}", devSn, jret["msg"].get<string>());
goto togo_sleep_continue;
}else{
if(jret["msg"] == "diff") {
json &data = jret["data"];
if(data.size() == 1 && data[0].at("path") == "/lastupdated") {
// no diff
spdlog::info("evmgr {} no change in config", devSn);
}else{
// patch
spdlog::info("evmgr config changed in cloud, merge patch:\n{}", jret["data"].dump(4));
config.merge_patch(jret["data"]);
ret = LVDB::setLocalConfig(config);
if(ret < 0) {
spdlog::error("evmgr {} failed to update local config:\n{}", devSn, config.dump(4));
goto togo_sleep_continue;
}
}
}
}
spdlog::info("new config dumps: \n{}", config.dump(4));
// TODO: verify sn // TODO: verify sn
if(!config.count("data")||!config["data"].count(devSn)||!config["data"][devSn].count("ipcs")) { if(!config.count("data")||!config["data"].count(devSn)||!config["data"][devSn].count("ipcs")) {
spdlog::error("evmgr {} invalid config. reload now...", devSn); spdlog::error("evmgr {} invalid config. reload now...", devSn);
...@@ -110,7 +139,7 @@ private: ...@@ -110,7 +139,7 @@ private:
// //
if(jmgr["addr"].get<string>() == "*" || jmgr["addr"].get<string>() == "0.0.0.0") { if(jmgr["addr"].get<string>() == "*" || jmgr["addr"].get<string>() == "0.0.0.0") {
spdlog::error("invalid mgr address: {} in config:\n{}", jmgr["addr"].get<string>(), jmgr.dump()); spdlog::error("invalid mgr address: {} in config:\n{}", jmgr["addr"].get<string>(), jmgr.dump(4));
goto togo_sleep_continue; goto togo_sleep_continue;
} }
......
...@@ -220,12 +220,8 @@ togo_sc: ...@@ -220,12 +220,8 @@ togo_sc:
{ {
// send hello to router // send hello to router
int ret = 0; int ret = 0;
vector<vector<uint8_t> >body; /// identity is auto set
// since identity is auto set vector<vector<uint8_t> >body = {str2body(mgrSn+":0:0"), str2body(EV_MSG_META_PING), str2body(MSG_HELLO)};
body.push_back(str2body(mgrSn+":0:0"));
body.push_back(str2body(EV_MSG_META_PING)); // blank meta
body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evslicer {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno())); spdlog::error("evslicer {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
......
...@@ -83,6 +83,10 @@ typedef int socket_t; ...@@ -83,6 +83,10 @@ typedef int socket_t;
#include <sys/stat.h> #include <sys/stat.h>
#include <thread> #include <thread>
// for uri
#include <algorithm> // find
#ifdef CPPHTTPLIB_OPENSSL_SUPPORT #ifdef CPPHTTPLIB_OPENSSL_SUPPORT
#include <openssl/err.h> #include <openssl/err.h>
#include <openssl/ssl.h> #include <openssl/ssl.h>
...@@ -118,6 +122,73 @@ inline const unsigned char *ASN1_STRING_get0_data(const ASN1_STRING *asn1) { ...@@ -118,6 +122,73 @@ inline const unsigned char *ASN1_STRING_get0_data(const ASN1_STRING *asn1) {
#define CPPHTTPLIB_THREAD_POOL_COUNT 8 #define CPPHTTPLIB_THREAD_POOL_COUNT 8
namespace httplib { namespace httplib {
struct Uri
{
public:
std::string QueryString, Path, Protocol, Host, Port;
static Uri Parse(const std::string &uri)
{
Uri result;
typedef std::string::const_iterator iterator_t;
if (uri.length() == 0)
return result;
iterator_t uriEnd = uri.end();
// get query start
iterator_t queryStart = std::find(uri.begin(), uriEnd, L'?');
// protocol
iterator_t protocolStart = uri.begin();
iterator_t protocolEnd = std::find(protocolStart, uriEnd, L':'); //"://");
if (protocolEnd != uriEnd)
{
std::string prot = &*(protocolEnd);
if ((prot.length() > 3) && (prot.substr(0, 3) == "://"))
{
result.Protocol = std::string(protocolStart, protocolEnd);
protocolEnd += 3; // ://
}
else
protocolEnd = uri.begin(); // no protocol
}
else
protocolEnd = uri.begin(); // no protocol
// host
iterator_t hostStart = protocolEnd;
iterator_t pathStart = std::find(hostStart, uriEnd, '/'); // get pathStart
iterator_t hostEnd = std::find(protocolEnd,
(pathStart != uriEnd) ? pathStart : queryStart,
':'); // check for port
result.Host = std::string(hostStart, hostEnd);
// port
if ((hostEnd != uriEnd) && ((&*(hostEnd))[0] == ':')) // we have a port
{
hostEnd++;
iterator_t portEnd = (pathStart != uriEnd) ? pathStart : queryStart;
result.Port = std::string(hostEnd, portEnd);
}
// path
if (pathStart != uriEnd)
result.Path = std::string(pathStart, queryStart);
// query
if (queryStart != uriEnd)
result.QueryString = std::string(queryStart, uri.end());
return result;
} // Parse
}; // uri
namespace detail { namespace detail {
...@@ -505,6 +576,13 @@ public: ...@@ -505,6 +576,13 @@ public:
std::shared_ptr<Response> Head(const char *path); std::shared_ptr<Response> Head(const char *path);
std::shared_ptr<Response> Head(const char *path, const Headers &headers); std::shared_ptr<Response> Head(const char *path, const Headers &headers);
/// patched by bruce.lu
std::shared_ptr<Response> Post(const char *path,
const Headers &headers,
const Params & params,
const std::string &body,
const char *content_type);
std::shared_ptr<Response> Post(const char *path, const std::string &body, std::shared_ptr<Response> Post(const char *path, const std::string &body,
const char *content_type); const char *content_type);
std::shared_ptr<Response> Post(const char *path, const Headers &headers, std::shared_ptr<Response> Post(const char *path, const Headers &headers,
...@@ -2848,6 +2926,35 @@ inline std::shared_ptr<Response> Client::Post(const char *path, ...@@ -2848,6 +2926,35 @@ inline std::shared_ptr<Response> Client::Post(const char *path,
return Post(path, Headers(), body, content_type); return Post(path, Headers(), body, content_type);
} }
/// patched by bruce.lu
inline std::shared_ptr<Response> Client::Post(const char *path,
const Headers &headers,
const Params & params,
const std::string &body,
const char *content_type) {
Request req;
req.method = "POST";
req.headers = headers;
req.params = params;
std::string query;
for (auto it = params.begin(); it != params.end(); ++it) {
if (it != params.begin()) { query += "&"; }
query += it->first;
query += "=";
query += detail::encode_url(it->second);
}
req.path = string(path) + "?" + query;
req.headers.emplace("Content-Type", content_type);
req.body = body;
auto res = std::make_shared<Response>();
return send(req, *res) ? res : nullptr;
}
inline std::shared_ptr<Response> Client::Post(const char *path, inline std::shared_ptr<Response> Client::Post(const char *path,
const Headers &headers, const Headers &headers,
const std::string &body, const std::string &body,
......
...@@ -34,12 +34,30 @@ vector<string> split(const std::string& s, char delimiter) ...@@ -34,12 +34,30 @@ vector<string> split(const std::string& s, char delimiter)
json registry(json &conf, string sn, string module) { json registry(json &conf, string sn, string module) {
json ret; json ret;
string api; string api;
try{ try{
api = conf.at(sn).at("api-cloud").get<string>() + "/register"; api = conf.at("data").at(sn).at("api-cloud").get<string>() + "/register";
Uri uri=Uri::Parse(api);
if(uri.Host.empty()||uri.Port.empty()||uri.Protocol.find("http") == string::npos||uri.Path.empty()) {
string msg = "registry error. invalid api-cloud in config: " + api;
ret["code"] = 1;
ret["msg"] = msg;
spdlog::error(msg);
return ret;
}
}catch(exception &e) { Params params;
params.emplace("sn", sn);
params.emplace("module", module);
Client cli(uri.Host.c_str(), stoi(uri.Port));
auto res = cli.Post("/register", Headers(), params, conf.dump(), "text/json");
spdlog::debug("{} {} registry res from cloud : {}", __FILE__, __LINE__, res->body);
ret = json::parse(res->body);
}catch(exception &e) {
ret["code"] = -1;
string msg = string(__FILE__) + ":" + to_string(__LINE__) + string(": registry exception - ") + e.what();
ret["msg"] = msg;
spdlog::error(msg);
} }
// /Client cli; // /Client cli;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论