提交 e888e357 authored 作者: zw.wang's avatar zw.wang

feat: [filter] 移动侦测过滤模块

上级 16780ab7
import cv2
import datetime
import os
from ils_common_video.capability.state_mechine import FrameState
from intelab_python_sdk.logger import log, log_init
resize_shape = '540p'
shape_dict = {'1080p': {'height': 1080, 'width': 1920},
'540p': {'height': 540, 'width': 960},
'480p': {'height': 480, 'width': 800}}
sample_frequency = 3
thresh = 30
area = 200
duration = 4
limit_size = 1024 * 1024 * 1 # b * k * m
def verify_movement(video_path, window_dict):
log.info("开始分析视频 {}".format(video_path))
pre_frame = None # 总是取前一帧做为背景(不用考虑环境影响)
result_list = []
video_path_name = video_path.rsplit('.', 1)[0]
try:
video = cv2.VideoCapture(video_path)
except Exception as e:
log.error("视频路径打开出错,%s" % e)
return False
bad_frame_count = 0
frame_state = FrameState('noalarm', 0, 0)
video_height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
video_width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
fps = float(int(round(video.get(cv2.CAP_PROP_FPS), 0)))
print("文件帧率为 {}".format(fps))
total_frame_count = 0
camera_seconds = 0 # 时间计数
verify_frame_count = 0
height = shape_dict[resize_shape]['height']
width = shape_dict[resize_shape]['width']
try:
k = (height * width) / (video_height * video_width)
while True:
# if len(result_list)!= 0:
# return result_list
if total_frame_count % (fps // sample_frequency) == 0:
# 表示取帧采样
flag = True
if total_frame_count % fps == 0:
camera_seconds += 1
log.debug('================================第 {} s!================================='.format(
camera_seconds))
if not flag:
# 帧跳过,不解码
ret = video.grab()
if ret:
# log.debug('这帧跳过')
total_frame_count += 1
else:
log.warning("获取帧失败")
if frame_state.alarm_starttime != None and frame_state.alarm_endtime == None:
result_list.append((frame_state.alarm_starttime, -1))
log.debug("当前视频是 {}, 分析的结果是 {}".format(video_path, result_list))
break
else:
# 取帧并解码
ret, frame = video.read()
count_area_list = []
move_flag = False
if ret:
total_frame_count += 1 # 帧数总计
verify_frame_count += 1 # 分析帧计数
flag = False # 解码取帧标记
dim = (width, height)
frame = cv2.resize(frame, dim) # 根据需求resize 图像
# print(int(window_dict['minY'] * height), int(window_dict['maxY'] * height),
# int(window_dict['minX'] * width), int(window_dict['maxX'] * width))
if window_dict:
cur_frame = frame[int(window_dict['minY'] * height):int(window_dict['maxY'] * height),
int(window_dict['minX'] * width):int(window_dict['maxX'] * width)] # 截取需要分析的窗口
else:
cur_frame = frame
gray_img = cv2.cvtColor(cur_frame, cv2.COLOR_BGR2GRAY) # 将图片转为灰度图片
gray_img = cv2.GaussianBlur(gray_img, (21, 21), 0) # 将灰度图片用高斯滤波器处理
if pre_frame is None:
pre_frame = gray_img
# continue
else:
if pre_frame.shape != gray_img.shape:
pre_frame = gray_img
continue
else:
img_delta = cv2.absdiff(pre_frame, gray_img) # 找出两个图片中不同的地方
thresh_frame = cv2.threshold(img_delta, thresh, 255, cv2.THRESH_BINARY)[1] # 第二个参数是可调整的
# cv2.imshow("thresh", thresh)
# out_put_edge = cv2.Canny(thresh, 50, 150)
# cv2.imshow("out_put_edge", out_put_edge)
contours, hierarchy = cv2.findContours(thresh_frame, cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_SIMPLE)
for c in contours:
if cv2.contourArea(c) < area * k:
continue
else:
move_flag = True
break
for c in contours:
# print(cv2.contourArea(c))
count_area_list.append(cv2.contourArea(c))
count_area_list.append(0.1)
area_max = max(count_area_list)
if not move_flag: # 设置敏感度
log.debug("前一帧和当前帧一样了, 没有东西在动!,变化的区域最大值为{}, {}帧".format(area_max, total_frame_count))
if frame_state.prestate == 'prealarm':
# 表示预报警转未报警
log.debug("前一刻状态是 {}".format(frame_state.prestate))
frame_state.prestate = frame_state.prealarm_to_noalarm('prealarm',
duration * sample_frequency)
if frame_state.prestate == 'noalarm':
log.debug("事件没真正开始就结束")
log.debug("当前状态是 {}".format(frame_state.prestate))
if frame_state.prestate == 'conalarm':
log.debug("前一刻状态是 {}".format(frame_state.prestate))
frame_state.prestate = frame_state.conalarm_to_noalarm('conalarm',
15 * sample_frequency)
if frame_state.prestate == 'noalarm':
frame_state.alarm_endtime = camera_seconds - 15
log.debug("事件结束,开始时间为 {}s ,结束时间为 {}s ".format(frame_state.alarm_starttime,
frame_state.alarm_endtime))
result_list.append((frame_state.alarm_starttime, frame_state.alarm_endtime))
frame_state.alarm_starttime = None
frame_state.alarm_endtime = None
log.debug("当前状态是 {}".format(frame_state.prestate))
else:
log.debug("有东西在动! 变化的区域最大值为{}, {}帧".format(area_max, total_frame_count))
if frame_state.prestate == 'prealarm':
# 表示预报警转报警
log.debug("前一刻状态是 {}".format(frame_state.prestate))
frame_state.prestate = frame_state.prealarm_to_conalarm('prealarm',
duration * sample_frequency)
if frame_state.prestate == 'conalarm':
log.debug("事件真正开始")
frame_state.alarm_starttime = camera_seconds - duration
log.debug(
"事件真正开始, 事件开始时间为 {} s,结束时间为 {}s".format(frame_state.alarm_starttime,
frame_state.alarm_endtime))
cv2.imwrite(video_path_name+'.jpg', cur_frame)
return True, camera_seconds # 事件真正开始,视频标志为移动
log.debug("当前状态是 {}".format(frame_state.prestate))
if frame_state.prestate == 'noalarm':
# 表示无报警转预报警
log.debug("前一刻状态是 {}".format(frame_state.prestate))
frame_state.prestate = frame_state.noalarm_to_prealarm('noalarm', 1)
log.debug("当前状态是 {}".format(frame_state.prestate))
if frame_state.prestate == 'conalarm':
# 表示持续报警转持续报警
log.debug("前一刻状态是 {}".format(frame_state.prestate))
frame_state.prestate = frame_state.conalarm_to_conalarm('conalarm',
duration * sample_frequency)
log.debug("当前状态是 {}".format(frame_state.prestate))
pre_frame = gray_img
else:
log.debug("获取帧失败")
if frame_state.alarm_starttime != None and frame_state.alarm_endtime == None:
result_list.append((frame_state.alarm_starttime, -1))
log.debug("当前视频是 {}, 分析的结果是 {}".format(video_path, result_list))
break
except Exception as e:
log.error("分析过程发生错误,视频文件 %s", video_path)
log.exception(e)
video.release()
if len(result_list) == 0:
return False, camera_seconds
else:
return True, camera_seconds
def file_filter(file_path, window_dict):
file_size = os.path.getsize(file_path)
if file_size <= limit_size:
return file_path, -1, 0
else:
valid_flag, camera_seconds = verify_movement(file_path, window_dict)
if valid_flag:
return file_path, 1, camera_seconds
else:
return file_path, -1, camera_seconds
if __name__ == '__main__':
log_init(__name__, True)
start_time = datetime.datetime.now()
result = file_filter('a.avi',
{})
end_time = datetime.datetime.now()
cost_time = (end_time - start_time).total_seconds()
print(result, cost_time)
class FrameState:
def __init__(self, prestate, count_forward, count_backward):
self.prestate = prestate
self.count_forward = count_forward
self.count_backward = count_backward
self.alarm_starttime = None
self.alarm_endtime = None
def noalarm_to_prealarm(self, state, duration):
# 未报警状态转移到预报警状态
"""
:param state: 前一帧状态,
:param duration:
:return:
"""
self.count_forward += 1
if self.count_forward == duration:
newState = 'prealarm'
self.count_backward = 0
self.count_forward = 0
else: # 状态不变
newState = state
return newState
def prealarm_to_conalarm(self, state, duration):
# 预报警状态转移到持续报警状态
self.count_forward += 1
if self.count_forward == duration:
newState = 'conalarm'
self.count_backward = 0
self.count_forward = 0
else:
# 状态不变
newState = state
return newState
def conalarm_to_noalarm(self, state, duration):
# 持续报警状态转移到未报警状态
self.count_forward += 1
if self.count_forward == duration:
newState = 'noalarm'
self.count_backward = 0
self.count_forward = 0
else:
# 状态不变
newState = state
return newState
def prealarm_to_noalarm(self, state, duration):
# 预报警状态转移到未报警状态
self.count_backward += 1
if self.count_backward == duration:
newState = 'noalarm'
self.count_backward = 0
self.count_forward = 0
else:
# 状态不变
newState = state
return newState
def conalarm_to_conalarm(self, state, duration):
# 预报警状态转移到未报警状态
self.count_backward += 1
if self.count_backward == duration:
newState = 'conalarm'
self.count_backward = 0
self.count_forward = 0
else:
# 状态不变
newState = state
return newState
if __name__ == "__main__":
frame_state = FrameState('initial', 0, 0)
frame_color_list = ['red', 'red', 'red', 'blue', 'blue', 'blue', 'blue', 'red',
'red', 'red', 'red', 'red', 'red', 'red', 'red', 'red', 'green', 'green', 'green',
'green']
for i in frame_color_list:
if frame_state.prestate == 'initial':
if i == 'red':
frame_state.prestate = frame_state.noalarm_to_prealarm('initial', 1)
print("当前状态是 {}".format(frame_state.prestate))
else:
pass
if frame_state.prestate == 'noalarm':
if i == 'red':
frame_state.prestate = frame_state.noalarm_to_prealarm('noalarm', 3)
print("当前状态是 {}".format(frame_state.prestate))
else:
pass
if frame_state.prestate == 'prealarm':
if i == 'red':
frame_state.prestate = frame_state.prealarm_to_conalarm('prealarm', 3)
print("当前状态是 {}".format(frame_state.prestate))
else:
frame_state.prestate = frame_state.prealarm_to_noalarm('prealarm', 3)
print("当前状态是 {}".format(frame_state.prestate))
if frame_state.prestate == 'conalarm':
if i == 'red':
frame_state.prestate = frame_state.conalarm_to_conalarm('conalarm', 3)
print("当前状态是 {}".format(frame_state.prestate))
else:
frame_state.prestate = frame_state.conalarm_to_noalarm('conalarm', 3)
print("当前状态是 {}".format(frame_state.prestate))
\ No newline at end of file
......@@ -4,3 +4,5 @@ PROCESSING_TOTAL_KEY = 'hk_isc:recording:processing:total'
ACCOUNT_TOKEN_KEY = 'eviz:account:app_key:{}:token'
DEVICE_SERIAL_KEY = 'eviz:account:device:{}'
UNVERIFIED_EVENT_QUEUE = 'UNVERIFIED_EVENT_QUEUE'
......@@ -166,7 +166,7 @@ def insert_video_info(cursor, conn, db_table, device_code, start_time, end_time,
def update_video_info(cursor, conn, db_table, video_id, status,
file_name=None, video_url=None,
video_resolution=None, recovered_time=None, retry_info=None,
next_retry_time=None, remark=None):
next_retry_time=None, remark=None, end_time=None):
sql = '''
update {}
set status = %s, update_time = now() {}
......@@ -183,6 +183,8 @@ def update_video_info(cursor, conn, db_table, video_id, status,
sub_set += ', next_retry_time="{}"'.format(next_retry_time)
if remark:
sub_set += ', remark="{}"'.format(remark)
if end_time:
sub_set += ', end_time="{}"'.format(end_time)
log.info(sql.format(db_table, sub_set), status, video_id)
cursor.execute(sql.format(db_table, sub_set), [status, video_id])
......@@ -206,7 +208,7 @@ def get_untreated_events(cursor, conn, db_table, camera_code, status=3, order_by
recovered_time,
video_url,
file_name,
retry_info, next_retry_time, remark
retry_info, next_retry_time, remark, biz_type, service_type
from {}
where device_code = %s
and create_time > date_sub(now(), interval 7 day)
......
......@@ -273,7 +273,7 @@ def send_unverified_file(pre_files, camera):
video_url = oss_upload_file('isc_record/' + file_name, video_file.full_path)
event_id = mysql.insert_video_info(
camera['db_table'], camera['device_code'], video_file.start_time, video_file.end_time,
camera['biz_type'], camera['service_type'], status=1,
camera['biz_type'], camera['service_type'], status=5, # status = 5 待分析
file_name=file_name, video_url=video_url, video_resolution=video_file.resolution,
recovered_time=video_file.end_time
)
......
......@@ -20,6 +20,7 @@ from ils_common_video.utils import aliyun_oss
from ils_common_video.utils.record_utils import record_thread, get_video_duration, time_to_seconds, judge_video_error
from ils_common_video.utils.alarm_utils import send_alarm_to_developer
from ils_common_video.db import mysql
from ils_common_video.utils.video_file import VideoFile
tz = pytz.timezone('Asia/Shanghai')
......@@ -238,7 +239,7 @@ class ProcessMessage:
remark = (remark or 'start') + '+offline'
elif record_result.get('is_completed'):
log.info('%s: %s is completed.', event['camera_code'], record_result['file_name'])
status = 1
status = 5 # 录制完成的视频状态为5
else:
next_retry_time = now + timedelta(minutes=10)
retry_count += 1
......@@ -265,10 +266,66 @@ class ProcessMessage:
next_retry_time=next_retry_time,
remark=remark
)
if status == 5:
ProcessMessage.send_unverified_file(VideoFile(record_result['file_name']),
event['video_id'], url, event['camera_code'],
event['biz_type'], event['service_type'], body['db_table'])
log.info('video_info: %s, url: %s, video_id: %s.%s', video_info, url, body['db_table'], event['video_id'])
return True
@staticmethod
def send_unverified_file(video_file, event_id, video_url,
device_code, biz_type, service_type, db_table, detection_region=''):
queue_name = 'UNVERIFIED_EVENT_QUEUE'
connection = rabbitmq_connect()
channel = connection.channel()
channel.queue_declare(queue_name, durable=True)
log.info('添加待处理文件%s,到分析队列,开始时间%s, 结束时间%s', video_file.file_name,
video_file.start_time, video_file.end_time)
log.info('视频文件%s的网络质量为%s, 评级为%s',
video_file.file_name,
video_file.network_quality, video_file.network_quality_grade)
if not os.path.isfile(video_file.full_path):
log.warning('文件%s已经不存在!', video_file.file_name)
return
if video_file.end_time - video_file.start_time < timedelta(seconds=2):
log.warning('视频文件%s播放时间较短认为有问题', video_file.file_name)
os.remove(video_file.full_path)
return
video_data = {
'event_id': event_id,
'video_url': video_url, # 视频播放地址
'sn': device_code,
'full_path': video_file.full_path,
'db_table': db_table,
'biz_type': biz_type,
'service_type': service_type,
'file_name': video_file.file_name,
'start_time': video_file.start_time.strftime('%Y-%m-%d %H:%M:%S'),
'end_time': video_file.end_time.strftime('%Y-%m-%d %H:%M:%S'),
'date': video_file.date,
'detection_region': detection_region,
'bitrate': video_file.bitrate,
'resolution': video_file.resolution,
'duration': video_file.duration,
'cloud_storage': video_file.file_path, # 云存储key
'network_quality': video_file.network_quality,
'network_quality_grade': video_file.network_quality_grade,
}
# 发送mq信息
channel.basic_publish(exchange='', routing_key=queue_name,
body=json.dumps(video_data, ensure_ascii=False))
connection.close()
@staticmethod
def stream_to_video(body, playback_stream, start_time, end_time, part_files_set):
......
......@@ -6,7 +6,7 @@ def get_parser():
parsers = argparse.ArgumentParser(
description='ISC motion detection playback video stream recording service.'
)
parsers.add_argument('-e', '--env', choices=['isc', 'eviz', 'common'], type=str,
parsers.add_argument('-e', '--env', choices=['isc', 'eviz', 'common'], default='common', type=str,
dest='env', help='choices [isc, eviz]')
parsers.add_argument('-w', '--worker', type=str,
dest='worker')
......@@ -48,6 +48,11 @@ def command_line_runner():
elif args['worker'] == 'merger':
from ils_common_video.eviz_video.merger import main as record_main
record_main()
elif args['env'] == 'common':
if args['worker'] == 'filter':
from ils_common_video.video_filter.filter import VideoFilterProcess
fp = VideoFilterProcess()
fp.run()
else:
parser.print_help()
......
......@@ -22,16 +22,22 @@ class VideoFile:
self.dir_path, self.file_path = os.path.split(full_path)
self.file_name, self.postfix = self.file_path.rsplit('.', 1)
if len(self.file_name.split('_')) == 3:
# 文件名格式不正确
self.sn, self.date, self.time = self.file_name.split('_')
name_info = self.file_name.split('_')
if len(name_info) == 3:
# 文件名格式如:G25597998_2021-08-03_11-01-52.mp4
self.sn, self.date, self.time = name_info
self.prefix = ''
# HD文件名处理
self.sn = self.sn[3:] if 'HDV' in self.sn else self.sn
self.start_time = dateutil.parser.parse(
' '.join([self.date, self.time.replace('-', ':')]))
elif len(name_info) == 4:
# 文件名格式如: EVIZ_G25597998_20210803T110152_20210803T110334.mp4
self.prefix, self.sn, self.start_time_str, self.end_time_str = name_info
self.start_time = dateutil.parser.parse(self.start_time_str)
else:
self.sn = self.date = self.time = None
self.start_time = dateutil.parser.parse(
' '.join([self.date, self.time.replace('-', ':')]))
self._duration = self._bitrate = self._resolution = self._media_type = None
self._end_time = None
self._error_log = ''
......@@ -167,7 +173,7 @@ class VideoFile:
if __name__ == '__main__':
video_file = VideoFile('/tmp/videos/E82843165_2021-07-21_15-43-24.mp4')
video_file = VideoFile('/Users/wen/Downloads/EVIZ_G25597998_20210803T110152_20210803T110334.mp4')
# print(video_file.picture_path)
print(video_file.duration)
print(video_file.size)
......
import os
import time
import json
from datetime import timedelta
from intelab_python_sdk.ffmpeg.ffmpeg_prune import prune
from intelab_python_sdk.logger import log
from dynaconf import settings
from oss2.exceptions import NoSuchKey
from ils_common_video.db.rabbitmq import rabbitmq_connect
from ils_common_video.db import mysql
from ils_common_video.const import UNVERIFIED_EVENT_QUEUE
from ils_common_video.capability.movement_verify import file_filter
from ils_common_video.utils.aliyun_oss import oss_upload_file, oss_download_file, oss_delete_file
from ils_common_video.utils.video_file import VideoFile
class VideoFilterProcess:
def __init__(self):
self.queue_name = UNVERIFIED_EVENT_QUEUE
self.connection = rabbitmq_connect()
self.channel = self.connection.channel()
def run(self):
os.makedirs(settings.get('VIDEOS_PATH'), exist_ok=True)
log.info('启动分析进程')
log.info('binding to queue {}'.format(self.queue_name))
# 超时时间(如果5秒内没有收到消息,将不会夯住等待消息,取消接收消息)
# self.connection.call_later(5, lambda: self.channel.stop_consuming())
self.channel.queue_declare(queue=self.queue_name, durable=True)
def callback(ch, method, properties, body):
log.info('received MQ message {}'.format(body))
try:
body = json.loads(body)
try:
ret = self.process_message(body)
except Exception as e:
log.error('视频文件%s分析过程出错', body['full_path'])
log.exception(e)
ret = False
if ret:
log.info('finished processing MQ message')
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
ch.basic_nack(delivery_tag=method.delivery_tag)
except Exception as e:
log.exception(e)
log.error('finished processing MQ message, error')
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(on_message_callback=callback,
queue=self.queue_name)
log.info(' [*] Waiting for messages. To exit press CTRL+C')
try:
self.channel.start_consuming()
except KeyboardInterrupt:
log.info('MQ connection closed by user')
except Exception as e:
log.exception('MQ connection closed unexpectedly. %s', e)
raise
self.connection.close()
@staticmethod
def process_message(pre_video_file):
# 下载云端视频
video_path = os.path.split(pre_video_file.get('full_path'))[0]
if video_path and not os.path.exists(video_path):
os.makedirs(video_path, exist_ok=True)
try:
oss_download_file(pre_video_file.get('video_url'), pre_video_file.get('full_path'))
if not os.path.isfile(pre_video_file.get('full_path')):
log.warning('下载文件%s失败', pre_video_file.get('video_url'))
raise FileNotFoundError
except NoSuchKey as e:
log.warning(e)
return True
video_file = VideoFile(pre_video_file['full_path'])
region = pre_video_file.get('detection_region')
if region and isinstance(region, str):
region = json.loads(region)
try:
t = time.time()
_, video_result, camera_seconds = file_filter(pre_video_file.get('full_path'), region)
time_consuming = time.time() - t
log.info('分析在%s视频文件的%s秒结束,耗时%s', pre_video_file.get('full_path'),
camera_seconds, time_consuming)
except (FileNotFoundError, NoSuchKey):
log.warning('视频文件%s不存在', pre_video_file.get('full_path'))
return True
if video_result != 1:
mysql.update_video_info(pre_video_file['db_table'], pre_video_file['event_id'], status=-2)
elif camera_seconds < 20:
mysql.update_video_info(pre_video_file['db_table'], pre_video_file['event_id'], status=1)
else:
invalid_part_url = valid_part_url = ''
camera_seconds -= 20
# 截取前段无效视频
invalid_part_name = video_file.gen_file_name(
pre_video_file['device_code'],
start_time=video_file.start_time,
end_time=video_file.start_time+timedelta(seconds=camera_seconds),
)
prune(pre_video_file.get('full_path'), invalid_part_name, duration=camera_seconds)
if os.path.isfile(invalid_part_name):
invalid_part_url = oss_upload_file('isc_record/' + invalid_part_name, invalid_part_name)
mysql.update_video_info(pre_video_file['db_table'], pre_video_file['event_id'], status=-2,
file_name=invalid_part_name, video_url=invalid_part_url,
end_time=video_file.end_time, recovered_time=video_file.end_time,
video_resolution=video_file.resolution)
# 截取后段有意义视频
valid_part_name = video_file.gen_file_name(
pre_video_file['device_code'],
start_time=video_file.start_time + timedelta(seconds=camera_seconds),
end_time=video_file.end_time
)
prune(pre_video_file.get('full_path'), valid_part_name, start_time=camera_seconds)
if os.path.isfile(valid_part_name):
valid_part_video = VideoFile(valid_part_name)
valid_part_url = oss_upload_file('isc_record/' + valid_part_name, valid_part_name)
video_id = mysql.insert_video_info(
pre_video_file['db_table'], pre_video_file['device_code'],
valid_part_video.start_time, valid_part_video.end_time,
biz_type=pre_video_file['biz_type'], service_type=pre_video_file['service_type'],
status=1, file_name=valid_part_video.file_name,
video_url=valid_part_url, video_resolution=valid_part_video.resolution)
log.info('insert new video_id: %s, db_table: %s', video_id, pre_video_file['db_table'])
if invalid_part_url and valid_part_url:
oss_delete_file(pre_video_file.get('video_url'))
os.remove(invalid_part_name)
os.remove(valid_part_name)
else:
return False
os.remove(pre_video_file.get('full_path'))
log.info('file[%s], network_quality %s, grade %s',
video_file.file_name, pre_video_file['network_quality'],
pre_video_file['network_quality_grade'])
log.info('视频文件%s的分析结果是%s', pre_video_file['full_path'], video_result)
return True
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论