提交 61740a7f authored 作者: blu's avatar blu

evdameon, evcloudsvc: subsystem monitor thread and defered uploading

上级 fad3d0bd
......@@ -1044,7 +1044,6 @@ public:
if(!this->eventQueue.empty()) {
evt = this->eventQueue.front();
this->eventQueue.pop();
spdlog::error("{} msg queue is full!", selfId);
}
}
......@@ -1075,22 +1074,26 @@ public:
}
if(tse < first) {
spdlog::info("{} thEventHandler event range ({}, {}) is not in range ({}, {}).", selfId, tss, tse, first, end);
spdlog::error("{} thEventHandler event range ({}, {}) is not in local storage range ({}, {}).", selfId, tss, tse, first, end);
continue;
}
else if(first == 0||tse > end) {
spdlog::info("{} thEventHandler event range ({}, {}) is not in range ({}, {}), resched to run in {}s.", selfId, tss, tse, first, end, this->seconds + 5);
auto th = thread([evt, this]() {
this_thread::sleep_for(chrono::seconds(this->seconds + 5));
lock_guard<mutex> lock(this->mutEvent);
this->eventQueue.push(evt);
if(eventQueue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQueue.pop();
}
cvEvent.notify_one();
});
spdlog::warn("{} thEventHandler event range ({}, {}) is not in local storage range ({}, {}), it is in future. scheduled", selfId, tss, tse, first, end, this->seconds + 5);
// auto th = thread([evt, this]() {
// this_thread::sleep_for(chrono::seconds(this->seconds + 5));
// lock_guard<mutex> lock(this->mutEvent);
// this->eventQueue.push(evt);
// if(eventQueue.size() > MAX_EVENT_QUEUE_SIZE) {
// eventQueue.pop();
// spdlog::error("{} msg queue is full!", selfId);
// }
// cvEvent.notify_one();
// });
th.detach();
// th.detach();
// continue;
}else if(tse > end*10){
spdlog::error("{} thEventHandler invalid event range ({}, {})", selfId, tss, tse);
continue;
}
......@@ -1157,9 +1160,9 @@ public:
try {
ofstream ofs(fname);
ofs << postArgs;
for(auto &f:fileNames) {
fs::copy(fs::path(string(f)),fs::path(dirDest));
}
// for(auto &f:fileNames) {
// fs::copy(fs::path(string(f)),fs::path(dirDest));
// }
}
catch(exception &e) {
spdlog::error("evcloudsvc {} {}:{} exception: {}", selfId, __FILE__, __LINE__, e.what());
......
import requests, glob, os, json, socket, datetime
FILE_PATH = os.getenv('FILE_PATH', '/var/data/evsuits/failed_events')
FILE_PATH = os.getenv('FILE_PATH', '/root/work/opencv-pocs/opencv-motion-detect/')
API_ADDR=os.getenv('API_ADDR', 'http://evcloudsvc.ilabservice.cloud:10009/upload/evtvideos/')
#{"fileNames":["slices/237808840_3/20200304_124406.mp4","slices/237808840_3/20200304_124437.mp4"],"params":{"cameraId":"237808840","endTime":"1583297091","headOffset":"0","startTime":"1583297066","tailOffset":"0","type":"event"}}
......@@ -10,32 +10,30 @@ def upload_file(file):
try:
with open(file) as jf:
data = json.load(jf)
fileNames = ["{}{}".format(FILE_PATH, x[x.rfind('/'):]) for x in data["fileNames"]]
fileNames = ["{}{}".format(FILE_PATH, x) for x in data["fileNames"]]
blob = [('files', open(f, 'rb')) for f in fileNames]
url = API_ADDR + data["params"]["cameraId"] + '?' + '&'.join(["{}={}".format(k,v) for k,v in data["params"].items()])
r = requests.post(url, files = blob)
print(file, fileNames, r.status_code, r.text)
if(r.status_code == 200 and ('code' in r.json()) and r.json()['code'] == 0):
success = True
try:
os.remove()
except:
pass
if success:
os.remove(file)
for f in fileNames:
try:
os.remove(f)
except:
pass
# for f in fileNames:
# try:
# os.remove(f)
# except:
# pass
except requests.exceptions.ConnectionError as e:
print ("Error Connecting:",e)
except requests.exceptions.Timeout as e:
print ("Timeout Error:",e)
except requests.exceptions.RequestException as e:
print ("",e)
print ("Request Exception",e)
except IOError as e:
success = True
# file io error
os.remove(file)
print(e)
except Exception as e:
print(e)
......@@ -45,6 +43,9 @@ def upload_file(file):
def list_files(dir):
out = glob.glob("{}/*.json".format(dir))
out.sort(key=os.path.getmtime)
# prevent memory overflow
if len(out) > 5:
out = out[:5]
return out
if __name__ == "__main__":
......
import requests, glob, os, json, socket, datetime
FILE_PATH = os.getenv('FILE_PATH', '/var/data/evsuits/failed_events')
API_ADDR=os.getenv('API_ADDR', 'http://evcloudsvc.ilabservice.cloud:10009/upload/evtvideos/')
#{"fileNames":["slices/237808840_3/20200304_124406.mp4","slices/237808840_3/20200304_124437.mp4"],"params":{"cameraId":"237808840","endTime":"1583297091","headOffset":"0","startTime":"1583297066","tailOffset":"0","type":"event"}}
def upload_file(file):
success = False
fileNames = []
try:
with open(file) as jf:
data = json.load(jf)
fileNames = ["{}{}".format(FILE_PATH, x[x.rfind('/'):]) for x in data["fileNames"]]
blob = [('files', open(f, 'rb')) for f in fileNames]
url = API_ADDR + data["params"]["cameraId"] + '?' + '&'.join(["{}={}".format(k,v) for k,v in data["params"].items()])
r = requests.post(url, files = blob)
print(file, fileNames, r.status_code, r.text)
if(r.status_code == 200 and ('code' in r.json()) and r.json()['code'] == 0):
success = True
try:
os.remove()
except:
pass
if success:
os.remove(file)
for f in fileNames:
try:
os.remove(f)
except:
pass
except requests.exceptions.ConnectionError as e:
print ("Error Connecting:",e)
except requests.exceptions.Timeout as e:
print ("Timeout Error:",e)
except requests.exceptions.RequestException as e:
print ("",e)
except IOError as e:
success = True
print(e)
except Exception as e:
print(e)
return success
def list_files(dir):
out = glob.glob("{}/*.json".format(dir))
out.sort(key=os.path.getmtime)
return out
if __name__ == "__main__":
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('localhost', 45431))
files = list_files(FILE_PATH)
for f in files:
now = datetime.datetime.now().timestamp()
mts = os.path.getmtime(f)
# defer those files may still in recording
if now - mts < 35:
continue
res = upload_file(f)
if res:
continue
else:
# TODO: report
break
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论