提交 3caa6fab authored 作者: blu's avatar blu

new upload script with /mnt/sd card as backup

上级 7b32b236
...@@ -6,5 +6,5 @@ ...@@ -6,5 +6,5 @@
</script> </script>
<video id="example_video_1" class="video-js vjs-default-skin" controls preload="true" width="640" height="320" <video id="example_video_1" class="video-js vjs-default-skin" controls preload="true" width="640" height="320"
data-setup='{"autoplay": "true"}'> data-setup='{"autoplay": "true"}'>
<source src="rtmp://qz.videostreaming.ilabservice.cloud:1935/hls/D72158932" type='flv' /> <source src="./output.mp4" type='mp4' />
</video> </video>
\ No newline at end of file
import requests, glob, os, json, socket, datetime, time, sys import requests, glob, os, json, socket, datetime, time, sys, logging, shutil
from pathlib import Path
import pdb
import traceback
FILE_PATH = os.getenv('FILE_PATH', '/var/data/evsuits/failed_events/') logger = logging.getLogger(__file__)
VIDEO_PATH = os.getenv('VIDEO_PATH', '/root/work/opencv-pocs/opencv-motion-detect/') logger.setLevel(logging.INFO)
API_ADDR=os.getenv('API_ADDR', 'http://evcloudsvc.ilabservice.cloud:10009/upload/evtvideos/') ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter(('[%(asctime)s][uploader][%(lineno)d][%(levelname)s] %(message)s')))
logger.addHandler(ch)
## constants derived from crontab and evslicer
kSchedIntervalSec = 3 * 60
kSliceIntervalSec = 30
## storage water marks
kSDCardFreeMBWMLow = 300
kSDCardFreeMBWMHigh = 1024 # not used, instead we use timeshift below
## clean from the oldest files, total 1 hour
kSDCardCleanupTimeShiftMinutes = 60
kSDCardVideoPath = '/mnt/sd/videos'
kFlashVideoPath = '/root/work/opencv-pocs/opencv-motion-detect'
kJobJsonPath = '/var/data/evsuits/failed_events'
kAPIServerAddr=os.getenv('API_ADDR', 'http://evcloudsvc.ilabservice.cloud:10009/upload/evtvideos/')
def list_files_by_ext(dir, ext):
out = glob.glob("{}/*.{}".format(dir, ext))
out.sort(key=os.path.getmtime)
logger.info(out)
return out
def list_files_by_mtime(dir, ext, tsn, ten):
'''
List files of type _ext_ under _dir_ with mtime in range (now - _tsn_, now - _ten_) in seconds
_tsn_ must be greater than _ten_
Parameters:
dir (string): directory to look at
ext (string): file extension
tsn (int): seconds from now as the left side of range
ten (int): seconds from now as the right side of range
Return:
list of sorted file names, in ascending mtime order (older first)
History: v0.1 created by Bruce.Lu at 20200412
'''
ret = []
now = None
if sys.version_info[0] < 3:
now = time.mktime(datetime.datetime.now().timetuple())
else:
now = datetime.datetime.now().timestamp()
if tsn <= ten or tsn <=0:
logger.error("invalid time range {} {}", tsn ,ten)
return ret
logger.info("now: {}".format(now))
for item in Path(dir).glob('*.'+ ext):
if item.is_file():
mts = item.stat().st_mtime
logger.debug('{} {}'.format(item.absolute(), mts))
if mts >= now -tsn and mts <= now -ten:
logger.debug("matched: {} {} {}".format(item.absolute(), now - mts, item.name))
ret.append(str(item.absolute()))
ret.sort(key=os.path.getmtime)
return ret
def backup(files, target):
for f in files:
logger.info('backup {} to {}'.format(f, target))
pd = os.path.dirname(f)
base = os.path.basename(f)
subd = pd[pd.rfind('/'):]
subd = target + subd
if not os.path.exists(subd):
if sys.version_info[0] < 3:
os.system('mkdir -p '+subd)
else:
os.makedirs(subd, exist_ok = True)
subd += '/' + base
if not os.path.exists(subd):
shutil.copyfile(f, subd)
else:
logger.info("skipped backup: {}".format(f))
def cleanup(target):
st = os.statvfs(target)
mbfree = st.f_bavail * st.f_bsize/1024/1024
logger.info("mb free: {}\n{}".format(mbfree, st))
if mbfree > kSDCardFreeMBWMLow:
pass
else:
# find out the oldest N files in SD card
# ALERT: must use generator to save memory, otherwise it prons to OOM by stacks of files on SD card
number_files = int(kSDCardCleanupTimeShiftMinutes * 60/kSliceIntervalSec)
oldest_n_mtime = [] # ten digits
oldest_n_files = []
cnt = 0
for dir_ in Path(target).iterdir():
if not dir_.is_dir():
continue
for file in Path(dir_.absolute()):
if not file.is_file():
continue
mts = file.stat().st_mtime
fn = file.absolute()
## fill the list
if cnt < number_files:
oldest_n_mtime[cnt] = mts
oldest_n_files[cnt] = fn
else:
# argmax, not the numpy way but maybe sufficient fast
v = max(oldest_n_mtime)
k = oldest_n_mtime.index(v)
if mts < v:
oldest_n_mtime[k] = v
oldest_n_files[k] = fn
cnt += 1
# clean up
for f in oldest_n_files:
os.remove(f)
def lookup(src, file, backup):
file = src+ '/' + file
logger.info('lookup {} with backup {}'.format(file, backup))
if os.path.exists(file):
logger.info('found : %s', file)
return file
pd = os.path.dirname(file)
base = os.path.basename(file)
subd = pd[pd.rfind('/'):]
subd = backup + subd + '/' + base
if os.path.exists(subd):
logger.info('found : %s', subd)
return subd
logger.error('file not found: %s', file)
return None
#{"fileNames":["slices/237808840_3/20200304_124406.mp4","slices/237808840_3/20200304_124437.mp4"],"params":{"cameraId":"237808840","endTime":"1583297091","headOffset":"0","startTime":"1583297066","tailOffset":"0","type":"event"}}
def upload_file(file): def upload_file(file):
success = False success = False
fileNames = [] fileNames = []
try: try:
with open(file) as jf: with open(file) as jf:
data = json.load(jf) data = json.load(jf)
print("data: ", data, file) logger.info("prepare uploading: %s, %s", file, data)
fileNames = ["{}{}".format(VIDEO_PATH, x) for x in data["fileNames"]] fileNames = [lookup(kFlashVideoPath, x, kSDCardVideoPath) for x in data["fileNames"]]
if not any(fileNames):
raise IOError("has video file lost in {}".format(data))
if len(fileNames) < 0:
logger.error("no files fond in: %s", data);
return success
# check last file
lastFile = fileNames[len(fileNames) -1]
if not os.path.exists(lastFile):
logger.error("file not exist: %s", lastFile)
return success
now = time.mktime(datetime.datetime.now().timetuple())
if now - Path(lastFile).stat().st_mtime < kSliceIntervalSec:
logger.error("file is still in recording %s", lastFile)
return success
blob = [('files', open(f, 'rb')) for f in fileNames] blob = [('files', open(f, 'rb')) for f in fileNames]
url = API_ADDR + data["params"]["cameraId"] + '?' + '&'.join(["{}={}".format(k,v) for k,v in data["params"].items()]) url = kAPIServerAddr + data["params"]["cameraId"] + '?' + '&'.join(["{}={}".format(k,v) for k,v in data["params"].items()])
r = requests.post(url, files = blob, timeout = 60*10) logger.info("upload with url: %s", url)
print(file, fileNames, r.status_code, r.text) r = requests.post(url, files = blob, timeout = 10)
if(r.status_code == 200 and ('code' in r.json()) and r.json()['code'] == 0): if(r.status_code == 200 and ('code' in r.json()) and r.json()['code'] == 0):
success = True success = True
else:
logger.error("failed to make http request: %s", file)
if success: if success:
logger.info("upload success :%s", r.text)
os.remove(file) os.remove(file)
# for f in fileNames:
# try:
# os.remove(f)
# except:
# pass
except requests.exceptions.ConnectionError as e: except requests.exceptions.ConnectionError as e:
print ("Error Connecting:",e) logger.error("Error Connecting: {}".format(e))
except requests.exceptions.Timeout as e: except requests.exceptions.Timeout as e:
print ("Timeout Error:",e) logger.error("Timeout Error: {}".format(e))
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print ("Request Exception",e) logger.error("Request Exception: {}".format(e))
except IOError as e: except IOError as e:
success = True success = True
# file io error # file io error
os.remove(file) os.remove(file)
print("IOError Exception:", e) logger.error("IOError Exception: {}".format(e))
except Exception as e: except Exception as e:
print("General Exception:", e) logger.error("General Exception: {}".format(e))
return success return success
def list_files(dir): # 1. backup & cleanup
out = glob.glob("{}/*.json".format(dir)) # 2. upload jobs
out.sort(key=os.path.getmtime)
# prevent memory overflow
if len(out) > 5:
out = out[:5]
return out
if __name__ == "__main__": if __name__ == "__main__":
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #list_files_by_ext('.', 'json')
s.bind(('localhost', 45431)) # make sufficient margins for long running uploader & slicer
files = list_files(FILE_PATH) #logger.info(files)
for f in files: logger.setLevel(logging.INFO)
now = time.mktime(datetime.datetime.now().timetuple()) try:
mts = os.path.getmtime(f) #
# defer those files may still in recording cleanup('/mnt/sd/videos')
if now - mts < 35: # walk subdir of kFlashVideoPath
continue files = []
res = upload_file(f) for dir_ in Path(kFlashVideoPath + '/' + 'slices').iterdir():
if res: if dir_.is_dir():
continue logger.info("check subd: {}".format(dir_.absolute()))
else: files.extend(list_files_by_mtime(dir_.absolute(), 'mp4', kSchedIntervalSec * 2, kSliceIntervalSec * 2))
# TODO: report logger.info("files to backup: {}".format(files[:10]))
break # backup
backup(files, kSDCardVideoPath)
#
files = list_files_by_mtime(kJobJsonPath, 'json', 9999999, 0)
logger.info("files to upload: {}".format(files))
for f in files:
upload_file(f)
except Exception as e:
logger.error('failure: {}'.format(e))
extype, value, tb = sys.exc_info()
traceback.print_exc()
\ No newline at end of file
import requests, glob, os, json, socket, datetime, time, sys
import pdb
import traceback
FILE_PATH = os.getenv('FILE_PATH', '/var/data/evsuits/failed_events/')
VIDEO_PATH = os.getenv('VIDEO_PATH', '/root/work/opencv-pocs/opencv-motion-detect/')
API_ADDR=os.getenv('API_ADDR', 'http://evcloudsvc.ilabservice.cloud:10009/upload/evtvideos/')
#{"fileNames":["slices/237808840_3/20200304_124406.mp4","slices/237808840_3/20200304_124437.mp4"],"params":{"cameraId":"237808840","endTime":"1583297091","headOffset":"0","startTime":"1583297066","tailOffset":"0","type":"event"}}
def upload_file(file):
success = False
fileNames = []
try:
with open(file) as jf:
data = json.load(jf)
print("data: ", data, file)
fileNames = ["{}{}".format(VIDEO_PATH, x) for x in data["fileNames"]]
blob = [('files', open(f, 'rb')) for f in fileNames]
url = API_ADDR + data["params"]["cameraId"] + '?' + '&'.join(["{}={}".format(k,v) for k,v in data["params"].items()])
r = requests.post(url, files = blob, timeout = 60*10)
print(file, fileNames, r.status_code, r.text)
if(r.status_code == 200 and ('code' in r.json()) and r.json()['code'] == 0):
success = True
if success:
os.remove(file)
# for f in fileNames:
# try:
# os.remove(f)
# except:
# pass
except requests.exceptions.ConnectionError as e:
print ("Error Connecting:",e)
except requests.exceptions.Timeout as e:
print ("Timeout Error:",e)
except requests.exceptions.RequestException as e:
print ("Request Exception",e)
except IOError as e:
success = True
# file io error
os.remove(file)
print("IOError Exception:", e)
except Exception as e:
print("General Exception:", e)
return success
def list_files(dir):
out = glob.glob("{}/*.json".format(dir))
out.sort(key=os.path.getmtime)
# prevent memory overflow
if len(out) > 5:
out = out[:5]
return out
if __name__ == "__main__":
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('localhost', 45431))
files = list_files(FILE_PATH)
for f in files:
now = time.mktime(datetime.datetime.now().timetuple())
mts = os.path.getmtime(f)
# defer those files may still in recording
if now - mts < 35:
continue
res = upload_file(f)
if res:
continue
else:
# TODO: report
break
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论