提交 326e3797 authored 作者: blu's avatar blu

nop

上级 d4273563
...@@ -95,7 +95,8 @@ ...@@ -95,7 +95,8 @@
"csignal": "cpp", "csignal": "cpp",
"__functional_03": "cpp", "__functional_03": "cpp",
"variant": "cpp", "variant": "cpp",
"format": "cpp" "format": "cpp",
"*.ipp": "cpp"
}, },
"C_Cpp.errorSquiggles": "Disabled", "C_Cpp.errorSquiggles": "Disabled",
"git.ignoreLimitWarning": true "git.ignoreLimitWarning": true
......
...@@ -37,6 +37,7 @@ const string mqtt_topic_report = "evsuits/report/"; ...@@ -37,6 +37,7 @@ const string mqtt_topic_report = "evsuits/report/";
const string mqtt_topic_subscribe = "evsuits/request/"; const string mqtt_topic_subscribe = "evsuits/request/";
const string mqtt_topic_lastwill = "evsuits/lastwill/"; const string mqtt_topic_lastwill = "evsuits/lastwill/";
const char kcArrMqttUrl[] = "tcp://admin:vJ3zHqWrHbrqxVMT@evcloudsvc.ilabservice.cloud:11883"; const char kcArrMqttUrl[] = "tcp://admin:vJ3zHqWrHbrqxVMT@evcloudsvc.ilabservice.cloud:11883";
const string kMqttClientId = "evcloudsvc";
class EvCloudSvc { class EvCloudSvc {
private: private:
...@@ -75,10 +76,12 @@ private: ...@@ -75,10 +76,12 @@ private:
"msg": "[autogen]evcloudsvc detects cluster mgr {} offline for ipc {}", "msg": "[autogen]evcloudsvc detects cluster mgr {} offline for ipc {}",
"status": "active", "status": "active",
"time": 1591085518, "time": 1591085518,
"type": "report" "type": "report",
"ipc": "ipcSN"
} }
)"_json; )"_json;
data["time"] = now; data["time"] = now;
data["ipc"] = ipcSn;
bool hasIssue = false; bool hasIssue = false;
if(v.count("mgrTerminal") == 0|| v["mgrTerminal"].size() == 0||v["mgrTerminal"].count("online") == 0) { if(v.count("mgrTerminal") == 0|| v["mgrTerminal"].size() == 0||v["mgrTerminal"].count("online") == 0) {
data["catId"] = "AV_NO_CONFIG"; data["catId"] = "AV_NO_CONFIG";
...@@ -107,6 +110,7 @@ private: ...@@ -107,6 +110,7 @@ private:
auto data = j; auto data = j;
if(catId != EV_MSG_REPORT_CATID_AVMODOFFLINE) { if(catId != EV_MSG_REPORT_CATID_AVMODOFFLINE) {
if(ret.count(catId) == 0) { if(ret.count(catId) == 0) {
data["ipc"] = ipcSn;
ret[catId] = data; ret[catId] = data;
} }
} }
...@@ -118,6 +122,28 @@ private: ...@@ -118,6 +122,28 @@ private:
return ret; return ret;
} }
void issueChecker(string ipcSn, json &ipcStatus){
auto issues = buildIssuesForIpc(ipcSn, ipcStatus);
if(allIssues.count(ipcSn) == 0) {
allIssues[ipcSn] = json();
}
bool report = false;
if(issues.size() > 0) {
for(auto &[kk,vv]: issues.items()) {
if(allIssues[ipcSn].count(kk) == 0) {
report = true;
break;
}
}
}
if(report) {
/// TODO:
allIssues[ipcSn] = issues;
MqttMgr::report_response_args(mqClient, mqtt_topic_report + kMqttClientId, 0, "ok", "issues", "", issues);
}
}
void storeReportMsg(string selfId, string ipcSn, json &ipcStatus, string catId, string msg, bool active) void storeReportMsg(string selfId, string ipcSn, json &ipcStatus, string catId, string msg, bool active)
{ {
...@@ -146,7 +172,7 @@ private: ...@@ -146,7 +172,7 @@ private:
ipcStatus["lastNReports"].erase(0); ipcStatus["lastNReports"].erase(0);
} }
// check status // check status
issueChecker(ipcSn, ipcStatus);
} }
int buildIpcStatus(json &conf) int buildIpcStatus(json &conf)
...@@ -822,23 +848,8 @@ private: ...@@ -822,23 +848,8 @@ private:
// } // }
} }
auto issues = buildIssuesForIpc(ipcSn, ipcStatus); // TODO: report
if(allIssues.count(ipcSn) == 0) { issueChecker(ipcSn, ipcStatus);
allIssues[ipcSn] = json();
}
bool report = false;
if(issues.size() > 0) {
for(auto &[kk,vv]: issues.items()) {
if(allIssues[ipcSn].count(kk) == 0) {
report = true;
break;
}
}
}
if(report) {
/// TODO:
}
} }
else { else {
spdlog::error("{} can't find ipc for report mod {}", devSn, modId); spdlog::error("{} can't find ipc for report mod {}", devSn, modId);
...@@ -973,7 +984,7 @@ private: ...@@ -973,7 +984,7 @@ private:
json jmeta = json::parse(meta); json jmeta = json::parse(meta);
if(jmeta["type"] == EV_MSG_META_TYPE_REPORT) { if(jmeta["type"] == EV_MSG_META_TYPE_REPORT) {
json data = json::parse(body2str(body[3])); json data = json::parse(body2str(body[3]));
spdlog::warn("{} received report msg from {}: {}", devSn, selfId, data.dump()); spdlog::info("{} received report msg from {}: {}", devSn, selfId, data.dump());
processReportMsg(selfId, data); processReportMsg(selfId, data);
} }
else { else {
......
...@@ -85,38 +85,11 @@ json make_online_msg(string sn) ...@@ -85,38 +85,11 @@ json make_online_msg(string sn)
void on_connlost(void *context, char *cause) void on_connlost(void *context, char *cause)
{ {
MqttHelper *self = (MqttHelper *) context; MqttHelper *self = (MqttHelper *) context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc = -1; int rc = -1;
spdlog::error("mqtt connection lost: {}", cause? cause: "unkown reason"); spdlog::error("mqtt connection lost: {}", cause? cause: "unkown reason");
// MqttHelper::AsyncResult as;
spdlog::warn("reconnecting {}", self->addr); // as.state = MqttHelper::State::None;
conn_opts.keepAliveInterval = 20; // self->state.set_value(as);
conn_opts.cleansession = 1;
int cnt = 0;
const int kRetryIntervalMaxSeconds = 60 * 60;
const int kRetryIntervalMinSeconds = 10;
time_t t;
srand((unsigned) time(&t));
int sleepTime = 10;
while(rc != MQTTASYNC_SUCCESS){
if ((rc = MQTTAsync_connect(self->client, &conn_opts)) != MQTTASYNC_SUCCESS) {
string msg = fmt::format("mqtt failed to reconnect {}: {}",self->mqtt_url, MQTTAsync_strerror(rc));
spdlog::error(msg);
//self->state.set_exception(StrException(msg));
sleepTime = cnt* kRetryIntervalMinSeconds;
if(sleepTime > kRetryIntervalMaxSeconds) {
sleepTime = rand()%kRetryIntervalMaxSeconds;
if(sleepTime < kRetryIntervalMinSeconds){
sleepTime = kRetryIntervalMinSeconds;
}
}
this_thread::sleep_for(chrono::seconds(sleepTime));
sleepTime++;
}else{
break;
}
}
} }
int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *message) int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *message)
......
...@@ -132,10 +132,7 @@ public: ...@@ -132,10 +132,7 @@ public:
void *extra; void *extra;
}; };
/// State state = State::None; // 0: initial ; 1: ready; 2: disconnected; 3: destroyed void connect(){
promise<AsyncResult> state;
MqttHelper(string mqtt_url, string id, string topic_lastwill, string topic_report, int kai = 20, int cs = 1): mqtt_url(mqtt_url), id(id), topic_lastwill(topic_lastwill), topic_report(topic_report)
{
// make connection, throw excpetions // make connection, throw excpetions
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
...@@ -147,7 +144,7 @@ public: ...@@ -147,7 +144,7 @@ public:
addr = "tcp://" + uri.Host + ":" + uri.Port; addr = "tcp://" + uri.Host + ":" + uri.Port;
} }
spdlog::debug("mqtt url: proto: {}, host: {}, port: {}, user: {}, pass: {}", uri.Protocol, uri.Host, uri.Port, uri.User, uri.Password); spdlog::info("mqtt url: proto: {}, host: {}, port: {}, user: {}, pass: {}", uri.Protocol, uri.Host, uri.Port, uri.User, uri.Password);
if(MQTTASYNC_SUCCESS != MQTTAsync_create(&client, addr.c_str(), (kIdPrefix+id).c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL)) { if(MQTTASYNC_SUCCESS != MQTTAsync_create(&client, addr.c_str(), (kIdPrefix+id).c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL)) {
msg = "failed to async create mqtt"; msg = "failed to async create mqtt";
...@@ -181,6 +178,7 @@ public: ...@@ -181,6 +178,7 @@ public:
} }
conn_opts.context = this; conn_opts.context = this;
conn_opts.will = &lwm; conn_opts.will = &lwm;
conn_opts.automaticReconnect = true;
spdlog::info("trying to connect to {}", addr); spdlog::info("trying to connect to {}", addr);
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
msg = fmt::format("Failed to start connect: {}", MQTTAsync_strerror(rc)); msg = fmt::format("Failed to start connect: {}", MQTTAsync_strerror(rc));
...@@ -188,6 +186,8 @@ public: ...@@ -188,6 +186,8 @@ public:
throw StrException(msg); throw StrException(msg);
} }
spdlog::warn("mqtt connect: get_future");
auto ar = state.get_future().get(); auto ar = state.get_future().get();
if(ar.state != State::Ready) { if(ar.state != State::Ready) {
string msg = fmt::format("ailed to initilaze mqtt: {}", MQTTAsync_strerror(ar.rc)); string msg = fmt::format("ailed to initilaze mqtt: {}", MQTTAsync_strerror(ar.rc));
...@@ -198,11 +198,17 @@ public: ...@@ -198,11 +198,17 @@ public:
spdlog::info("initialze mqtt {} successfully", addr); spdlog::info("initialze mqtt {} successfully", addr);
} }
// reset state // reset state
state = promise<AsyncResult>(); state = promise<AsyncResult>();
} }
/// State state = State::None; // 0: initial ; 1: ready; 2: disconnected; 3: destroyed
promise<AsyncResult> state;
MqttHelper(string mqtt_url, string id, string topic_lastwill, string topic_report, int kai = 20, int cs = 1): mqtt_url(mqtt_url), id(id), topic_lastwill(topic_lastwill), topic_report(topic_report)
{
connect();
}
// can be called only state // can be called only state
int subscribe(string topic, on_msg_fun_ptr_t on_msg, bool force = false) int subscribe(string topic, on_msg_fun_ptr_t on_msg, bool force = false)
{ {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论