提交 9b6802a3 authored 作者: zw.wang's avatar zw.wang

feat: [eviz] 萤石云摄像头移动侦测回放支持

上级 31ad1cc8
...@@ -75,6 +75,13 @@ DYNACONF_DINGTALK__SECRET = SECd5986591b53e959d5076b8d53be127b0046eddf1d76b3836b ...@@ -75,6 +75,13 @@ DYNACONF_DINGTALK__SECRET = SECd5986591b53e959d5076b8d53be127b0046eddf1d76b3836b
'oss2' 'oss2'
``` ```
### influxdb 使用保留策略 `one_week`
```
use intelab
CREATE RETENTION POLICY "one_week" ON "intelab" DURATION 168h REPLICATION 1
```
## 本地开发 ## 本地开发
配置本地配置文件`settings.local.toml` 配置本地配置文件`settings.local.toml`
...@@ -91,6 +98,19 @@ pip install -e . ...@@ -91,6 +98,19 @@ pip install -e .
告警消息合并成事件模块,任务每15分钟启动一次,合并最近的15分钟内influxdb中的告警消息为移动侦测事件,同时将移动事件的信息发生给rabbitmq。 告警消息合并成事件模块,任务每15分钟启动一次,合并最近的15分钟内influxdb中的告警消息为移动侦测事件,同时将移动事件的信息发生给rabbitmq。
- StreamRecorder - StreamRecorder
移动侦测事件取流录制模块,该模块支持多实例部署,用于消费上游EventMerger模块生产的移动侦测事件消息,根据事件的startTime和endTime向APIServer服务查询回放流地址,并进行录制,支持断点录制和视频合并。录制完成的视频将会上传到阿里云OSS,同时将链接写入Mysql。 移动侦测事件取流录制模块,该模块支持多实例部署,用于消费上游EventMerger模块生产的移动侦测事件消息,根据事件的startTime和endTime向APIServer服务查询回放流地址,并进行录制,支持断点录制和视频合并。录制完成的视频将会上传到阿里云OSS,同时将链接写入Mysql。
## camera_ai_config.ai_config_support示例:`0101000000000000`,采用二进制数判断是否启用某一配置,1-11位分别表示
- 1.是否支持云存储,默认值1, 只有开启了云存储的才会启动录制服务
- 2.是否支持移动侦测区域绘制,默认值1
- 3.是否开启全天录制,默认值0,非开启全天录制的任务是6:00-23:00之间的任务,不建议开启全天录制
- 4.是否开启移动侦测,默认值1,开启移动侦测只会针对移动侦测的视频进行云存储,建议开启
- 5.是否摄像头移动检测,默认值0
- 6.是否摄像头遮挡检测,默认值0
- 7.是否支持人体检测,默认值0
- 8.是否支持人脸检测,默认值0
- 9.是否支持人脸口罩检测,默认值0
- 10-16位暂时为0,留作备用
## 部署 ## 部署
......
...@@ -80,15 +80,17 @@ def query(cursor_dict=False): ...@@ -80,15 +80,17 @@ def query(cursor_dict=False):
@query(cursor_dict=True) @query(cursor_dict=True)
def get_camera_info(cursor, conn, camera_code=None): def get_camera_info(cursor, conn, camera_code=None, platform='isc'):
if camera_code: if camera_code:
_filter = 'where camera_info.device_code = "{}"'.format(camera_code) _filter = 'where camera_info.device_code = "{}"'.format(camera_code)
else: else:
_filter = ''' _filter = '''
where `is_valid` = 1 where camera_info.platform = "{}"
and camera_info.is_valid = 1
and biz_type is not null and biz_type is not null
and `point_index_code` is not null '''.format(platform)
''' if platform == 'isc':
_filter += ' and `point_index_code` is not null '
sql = ''' sql = '''
select select
...@@ -102,9 +104,10 @@ def get_camera_info(cursor, conn, camera_code=None): ...@@ -102,9 +104,10 @@ def get_camera_info(cursor, conn, camera_code=None):
ai_config_support, device_code, service_type, biz_type, ai_config_support, device_code, service_type, biz_type,
region_path_name, region_path_name,
1 as video_plan_type, 1 as video_plan_type,
network_quality network_quality,
is_valid, platform, treaty, model, brand, cac.video_stream_url
from camera_info from camera_info
join camera_ai_config cac left join camera_ai_config cac
on camera_info.id = cac.camera_info_id on camera_info.id = cac.camera_info_id
{} {}
order by create_time; order by create_time;
...@@ -119,24 +122,25 @@ def get_camera_info(cursor, conn, camera_code=None): ...@@ -119,24 +122,25 @@ def get_camera_info(cursor, conn, camera_code=None):
@query() @query()
def insert_video_info(cursor, conn, db_table, device_code, start_time, end_time, def insert_video_info(cursor, conn, db_table, device_code, start_time, end_time,
biz_type, service_type, status=0, biz_type, service_type, status=0,
file_name=None, video_url=None, video_resolution=None): file_name=None, video_url=None, video_resolution=None, recovered_time=None):
sql = ''' sql = '''
insert {} ( insert {} (
device_code, file_name, start_time, end_time, device_code, file_name, start_time, end_time,
video_url, video_resolution, video_url, video_resolution,
biz_type, service_type, biz_type, service_type,
status, create_time, update_time) status, create_time, update_time, expired_time, recovered_time)
value (%(device_code)s, %(file_name)s, %(start_time)s, %(end_time)s, value (%(device_code)s, %(file_name)s, %(start_time)s, %(end_time)s,
%(video_url)s, %(video_resolution)s, %(video_url)s, %(video_resolution)s,
%(biz_type)s, %(service_type)s, %(biz_type)s, %(service_type)s,
%(status)s, now(), now() %(status)s, now(), now(), date_add(now(),interval 31 day), %(recovered_time)s
) )
'''.format(db_table) '''.format(db_table)
cursor.execute(sql, { cursor.execute(sql, {
'device_code': device_code, 'file_name': file_name, 'device_code': device_code, 'file_name': file_name,
'start_time': start_time, 'end_time': end_time, 'start_time': start_time, 'end_time': end_time,
'video_url': video_url, 'video_resolution': video_resolution, 'video_url': video_url, 'video_resolution': video_resolution,
'status': status, 'biz_type': biz_type, 'service_type': service_type 'status': status, 'biz_type': biz_type, 'service_type': service_type,
'recovered_time': recovered_time
}) })
video_id = cursor.lastrowid video_id = cursor.lastrowid
conn.commit() conn.commit()
......
import os
import time
import json
import shutil
from dynaconf import settings
import multiprocessing
from datetime import datetime, timedelta
from urllib.error import URLError
import dateutil.parser
from intelab_python_sdk.logger import log
from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat as ffmpeg_concat
from ils_common_video.db.influxdb import influxdb
from ils_common_video.db.rabbitmq import rabbitmq_connect
from ils_common_video.db import mysql
from ils_common_video.utils.record_utils import get_time_str, judge_video_error
from ils_common_video.utils.video_file import VideoFile
from ils_common_video.eviz_video.pre_event import PreEvent
from ils_common_video.utils.alarm_utils import send_alarm_to_developer
from ils_common_video.utils.aliyun_oss import oss_upload_file
influxdb_config = settings.get('INFLUXDB')
def main(*args):
while True:
t1 = time.time()
try:
pool_merger()
except Exception as e:
log.exception(e)
send_alarm_to_developer('eviz-merger', e)
time.sleep(5)
continue
duration = round(time.time() - t1, 2)
log.info('本轮处理结束,共耗时%s秒,下次任务在:%s',
duration, datetime.now() + timedelta(seconds=900))
if duration < 900:
time.sleep(900)
def pool_merger():
camera_files = get_sn_video_files()
log.info('获取%s路摄像头的%s个视频文件', len(camera_files),
sum([len(v) for k, v in camera_files.items()]))
# 开启四路线程池
pool = multiprocessing.Pool(processes=1)
for camera_code, files in camera_files.items():
pool.apply_async(merger_events, args=(camera_code, files,))
pool.close()
pool.join()
def get_sn_video_files():
"""
获取 videos_path路径下所有的mp4,并根据摄像头序列号分组
"""
# 获取当前存储空间内所有的视频文件
cameras_files = {}
for root, dirs, files in os.walk(settings.get('VIDEOS_PATH'), topdown=False):
# 对文件名进行排序
for file_name in files:
if not (file_name.endswith('.mp4') and file_name.count('_') == 2):
continue
# 文件名格式不正确
sn = file_name.split('_')[0]
# HD文件名处理
sn = sn[3:] if 'HDV' in sn else sn
full_path = os.path.join(root, file_name)
if sn not in cameras_files:
cameras_files[sn] = {full_path}
else:
cameras_files[sn].add(full_path)
return cameras_files
def merger_events(camera_code, files):
t = time.time()
all_pre_files = get_pre_files(camera_code, files)
if len(all_pre_files) == 0:
return
# 通过当前摄像头的文件最早和最晚的时间查询事件
start_time, end_time = all_pre_files[0].start_time, all_pre_files[-1].end_time
pre_events = PreEvent(start_time=int(start_time.timestamp() * 1000),
end_time=int(end_time.timestamp() * 1000))
try:
log.info('查询摄像头%s在%s到%s之间的移动事件', camera_code, start_time, end_time)
camera, events = pre_events.get_events(camera_code)
# insert_influxdb_events_info(camera_code, all_pre_files, events)
except URLError as e:
log.exception(e)
return
if not camera:
log.warning('摄像头%s已经被删除,将删除该摄像头的所有视频!', camera_code)
for video_file in all_pre_files:
if os.path.isfile(video_file.full_path):
os.remove(video_file.full_path)
return
pre_files = verified_events_files(events, all_pre_files)
# if not settings.get('MERGER_OFF', False) \
# and camera.get('support_activity_detect', 1): # 关闭事件过滤
# pre_files = verified_events_files(events, all_pre_files)
# else:
# log.warning('事件过滤逻辑已经关闭')
# pre_files = all_pre_files
log.info('共耗时%s秒,筛选预视频文件数:%s', round(time.time() - t, 3), len(pre_files))
if len(pre_files) < 1:
return
# 根据文件事件获取的事件过滤的视频文件如果不在pre中就需要删除了
# deal_with_func = send_unverified_file if camera['support_activity_detect'] else send_verified_file
# deal_with_func(pre_files, camera)
send_unverified_file(pre_files, camera)
log.info('==============================')
def get_pre_files(camera_code, files):
"""
获取本地可以处理的视频文件,剔除损坏或未录制完成的文件
"""
pre_files = set()
for file_path in files:
video_file = VideoFile(file_path)
# TODO 异常视频报警
if video_file.error_log or video_file.duration < 2:
if video_file.start_time < datetime.now() - timedelta(minutes=10):
log.warning('文件%s损坏,直接删除, %s', video_file.full_path, video_file.error_log)
os.remove(video_file.full_path)
continue
pre_files.add(video_file)
pre_files = sorted(list(pre_files), key=lambda x: x.start_time)
log.info('设备%s-当前视频文件数:%s', camera_code, len(pre_files))
return pre_files
def concat_multiple_files(event_videos):
"""
合并多个文件,一个事件最多对应一个视频文件
"""
# 对同一事件对应的多个视频进行合并,合并之后只有一个视频文件
if len(event_videos) > 1:
first_video_file = event_videos[0]
concat_file_name = os.path.join(first_video_file.dir_path,
'.tmp_' + first_video_file.file_path)
ffmpeg_concat([video_file.full_path for video_file in event_videos], concat_file_name, removed=True)
# 重命名合并之后的文件为第一个文件,并更新信息
if os.path.isfile(concat_file_name):
shutil.move(concat_file_name, first_video_file.full_path)
video_file = VideoFile(first_video_file.full_path)
event_videos = [video_file]
return event_videos
def verified_events_files(pre_events, all_files):
"""
验证事件中的有效视频
"""
pre_event_files = []
pre_video_files_set = set()
for event in pre_events:
cur_event_videos_set = set()
loss_seconds = 0
last_file_end_time = None
event_start_time = dateutil.parser.parse(event['start_time'])
event_end_time = dateutil.parser.parse(event['end_time'])
event_seconds = (event_end_time - event_start_time).total_seconds()
for video_file in all_files:
if video_file.end_time < event_start_time:
# 文件时间整体在当前事件之前
continue
elif video_file.start_time - timedelta(seconds=5) > event_end_time:
break
else:
# 判定改文件是否为花屏文件!
_, error_log = judge_video_error(video_file.file_path)
if error_log:
continue
cur_event_videos_set.add(video_file)
if last_file_end_time \
and video_file.start_time > last_file_end_time:
loss_seconds += (video_file.start_time - last_file_end_time).total_seconds()
last_file_end_time = video_file.end_time
if last_file_end_time and last_file_end_time < event_end_time:
loss_seconds += (event_end_time - last_file_end_time).total_seconds()
pre_video_files_set.update(cur_event_videos_set)
cur_event_video = concat_multiple_files(
sorted(list(cur_event_videos_set), key=lambda x: x.start_time))
# 计算视频的网络质量
log.info('事件[%s, %s]的持续时长为%s, 其中丢失视频时长为%s', event_start_time, event_end_time,
get_time_str(int(event_seconds)), get_time_str(int(loss_seconds)))
for video_file in cur_event_video:
video_file.network_quality = 1 - loss_seconds / event_seconds
pre_event_files.extend(cur_event_video)
for video_file in all_files:
if video_file not in pre_video_files_set \
and os.path.isfile(video_file.full_path):
log.info('文件%s为无效文件,删除...', video_file.file_name)
os.remove(video_file.full_path)
return pre_event_files
def send_unverified_file(pre_files, camera):
"""
发送待处理的文件到分析队列
"""
queue_name = 'UNVERIFIED_EVENT_QUEUE'
connection = rabbitmq_connect()
channel = connection.channel()
channel.queue_declare(queue_name, durable=True)
for video_file in pre_files:
log.info('添加待处理文件%s,到分析队列,开始时间%s, 结束时间%s', video_file.file_name,
video_file.start_time, video_file.end_time)
log.info('视频文件%s的网络质量为%s, 评级为%s',
video_file.file_name,
video_file.network_quality, video_file.network_quality_grade)
if not os.path.isfile(video_file.full_path):
log.warning('文件%s已经不存在!', video_file.file_name)
continue
if video_file.end_time - video_file.start_time < timedelta(seconds=2):
log.warning('视频文件%s播放时间较短认为有问题', video_file.file_name)
os.remove(video_file.full_path)
continue
file_name = VideoFile.gen_file_name(
camera['device_code'], start_time=video_file.start_time, end_time=video_file.end_time, prefix='EVIZ'
)
# 上传待分析文件到云存储
video_url = oss_upload_file('isc_record/' + file_name, video_file.full_path)
event_id = mysql.insert_video_info(
camera['db_table'], camera['device_code'], video_file.start_time, video_file.end_time,
camera['biz_type'], camera['service_type'], status=1,
file_name=file_name, video_url=video_url, video_resolution=video_file.resolution,
recovered_time=video_file.end_time
)
video_data = {
'event_id': event_id,
'full_path': video_file.full_path,
'sn': camera['device_code'],
'biz_type': camera['biz_type'],
'file_name': file_name,
'start_time': video_file.start_time.strftime('%Y-%m-%d %H:%M:%S'),
'end_time': video_file.end_time.strftime('%Y-%m-%d %H:%M:%S'),
'date': video_file.date,
'detection_region': camera.get('detection_region') or '',
'bitrate': video_file.bitrate,
'resolution': video_file.resolution,
'duration': video_file.duration,
'video_url': video_url, # 视频播放地址
'cloud_storage': video_file.file_path, # 云存储key
'network_quality': video_file.network_quality,
'network_quality_grade': video_file.network_quality_grade
}
video_data.update(camera)
if video_url:
# 视频文件上传完成之后可以删除了
os.remove(video_file.full_path)
# 发送mq信息
channel.basic_publish(exchange='', routing_key=queue_name,
body=json.dumps(video_data, ensure_ascii=False))
connection.close()
if __name__ == '__main__':
from intelab_python_sdk.logger import log_init
log_init('merger_events_plus', True)
main()
import time
from datetime import datetime, timedelta
from dynaconf import settings
from intelab_python_sdk.logger import log
from ils_common_video.db.mysql import get_camera_info
from ils_common_video.utils.eviz_client import EvizVersionClient
class PreEvent:
def __init__(self, start_time=None, end_time=None, ip_address=None):
self.timezone_shift = 8 - settings.get('TIMEZONE_SHIFT', 0)
cur_time = int(datetime.timestamp(datetime.utcnow() + timedelta(hours=self.timezone_shift)))
self.start_time = start_time or ((cur_time - int(settings.get('EVENT_START_HOUR', 3)) * 3600) * 1000)
self.end_time = end_time or ((cur_time - 10 * 60) * 1000)
log.debug('本次获取事件起始时间:%s, 结束时间: %s',
self._strftimestamp(self.start_time),
self._strftimestamp(self.end_time))
self.interval = settings.get('EVENT_INTERVAL', 1 * 60) # 合并事件的间隔时间 30s
self.ysy_client = EvizVersionClient()
self.ip_address = ip_address or settings.get('SERVICE_NAME', '')
def _strftimestamp(self, timestamp):
return time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(timestamp / 1000 - self.timezone_shift))
def get_alarm_list(self, token, camera_sn):
alarm_list = []
t1 = time.time()
alarms = self.ysy_client.get_alarm_list(
token, camera_sn, self.start_time, self.end_time)
while True:
if not alarms.get('data'):
break
else:
alarm_list.extend(alarms['data'])
size = alarms['page']['size']
if len(alarms['data']) < size:
break
# 获取分页的下一页
page = alarms['page']['page']
page = page + 1
alarms = self.ysy_client.get_alarm_list(
token, camera_sn, self.start_time, self.end_time,
page_start=page, page_size=size)
log.info('共获取到%s条移动报警信息,耗时%s', len(alarm_list), time.time() - t1)
return alarm_list
def merge_alarm_to_event(self, alarm_list, channel=1):
events_list = []
for alarm in alarm_list[::-1]:
# 莹石云时间单位是毫秒
alarm_time = int(alarm['alarmTime']) / 1000
alarm['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(alarm_time))
if alarm['channelNo'] != channel:
continue
# 如果当前告警信息距离上一个事件的结束时间小于80s
# 而且上一个事件的总时长小于30分钟
if len(events_list) > 0 \
and alarm_time - events_list[-1]['end_time'] < 80 \
and events_list[-1]['end_time'] - events_list[-1]['start_time'] < 30 * 60:
events_list[-1]['end_time'] = alarm_time + 80
continue
# 根据告警时间生成事件(前-15s,后+80s)
events_list.append({
'start_time': alarm_time - 15,
'end_time': alarm_time + 80,
})
log.info('合并间隔时间较近的事件后得到事件数%s' % len(events_list))
for event in events_list:
event['start_time'] = self._strftimestamp(event['start_time'] * 1000)
event['end_time'] = self._strftimestamp(event['end_time'] * 1000)
return events_list
def get_events(self, camera_sn):
log.info('处理摄像头%s', camera_sn)
token = EvizVersionClient.get_access_token(camera_sn)
camera = get_camera_info(camera_code=camera_sn)
camera = camera[0] if camera else None
# 使用移动告警接口获取事件
alarm_list = self.get_alarm_list(token, camera_sn)
channel = int(camera_sn.split(':', 1)[1]) if ':' in camera_sn else 1
return camera, self.merge_alarm_to_event(alarm_list, channel)
import time
import os
import pytz
from datetime import datetime, timedelta, time as dtime
from dynaconf import settings
from apscheduler.schedulers.blocking import BlockingScheduler
from intelab_python_sdk.logger import log
from intelab_python_sdk.ffmpeg.ffmpeg_record import FfmpegRecordThread
from ils_common_video.db import mysql, redis
from ils_common_video.db.influxdb import influxdb
from ils_common_video.utils.eviz_client import EvizVersionClient
# from intelab_video.ffmpeg.read_video_resolution import FfprobeThread
service_name = settings.get('SERVICE_NAME', '')
TIMEZONE = 'Asia/Shanghai'
tz = pytz.timezone(TIMEZONE)
class VideoRecord:
def __init__(self):
self.running = {}
self.ip_address = service_name
self.last_check_time = None
self.today = datetime.now(tz)
self.stop_time = datetime.combine(self.today.date(), dtime(settings.get('RECORD_STOP_HOUR', 22), 0), tz)
def create_thread(self, camera_sn, rtmp_url, video_file_path,
video_stream_type='rtmp', video_duration=60):
if video_stream_type == 'rtsp':
thread = FfmpegRecordThread(camera_sn, rtmp_url, video_file_path, video_duration,
rtsp_transport='tcp', stimeout=5000000)
else:
thread = FfmpegRecordThread(camera_sn, rtmp_url, video_file_path, video_duration)
thread.video_stream_type = video_stream_type
thread.daemon = True
thread.start()
self.running[camera_sn] = thread
@staticmethod
def update_rtmp_stream(camera_sn, stream_url, is_hd=False):
ysy_client = EvizVersionClient()
token = ysy_client.get_access_token(camera_sn)
if not token: # 设备未获取到有效token
return stream_url
res = ysy_client.get_live_address(token, camera_sn)
if res['code'] == '200':
data = res['data'][0]
if data['ret'] == '200':
if settings.get('VIDEO_RESOLUTION') == 'HD1080' or is_hd:
stream_url = data['rtmpHd']
else:
stream_url = data['rtmp']
else:
log.error(EvizVersionClient.ret_code.get(
data['ret'], '未知错误{}').format(camera_sn))
if data['ret'] == '60060': # 设备未开通直播
open_result = ysy_client.open_device_live(token, camera_sn)
if open_result['code'] == '200' \
and open_result['data'][0]['ret'] == '200':
return VideoRecord.update_rtmp_stream(camera_sn, stream_url, is_hd)
return stream_url
@staticmethod
def resolution_compare(pattern, string):
try:
p_w, p_h = pattern.split('x')
s_w, s_h = string.split('x')
return int(p_w) * int(p_h) > int(s_w) * int(s_h)
except Exception as e:
log.exception(e)
return False
@staticmethod
def update_video_stream(camera, stream_url):
# 判断视频是否使用高清流
# is_hd = True if VideoRecord.resolution_compare(camera['video_resolution'], '768x432') else False
is_hd = True
stream_url = VideoRecord.update_rtmp_stream(camera['device_code'], stream_url, is_hd=is_hd)
# TODO rtmp直播流地址缓存到云端
# if stream_url:
# mysql.update_video_stream(camera['device_code'], stream_url)
return stream_url
def run_record(self):
sn_list = []
# last_time = datetime.now() - timedelta(minutes=8)
# offline_device_list = db.influxdb_client.get_devices_status(last_time, status=0)
for camera_info in mysql.get_camera_info(platform='eviz'):
# if camera_sn in offline_device_list:
# log.info('设备%s在此刻已经离线', camera_sn)
# db.influxdb_client.insert_restart_second(
# camera_sn, reason='离线')
# continue
if not camera_info['ai_config_support'] or len(camera_info['ai_config_support']) < 5:
continue
if camera_info['ai_config_support'][0] == '0':
# 只有开启了云存储的才会启动录制服务
log.warning('摄像头%s未开启云存储功能,不需要进行录制', camera_info['device_code'])
continue
full_day = True if camera_info['ai_config_support'][2] == '1' else False
movement = True if camera_info['ai_config_support'][3] == '1' else False
sn_list.append(camera_info['device_code'])
rtmp_url = VideoRecord.update_video_stream(camera_info, camera_info['video_stream_url'])
video_path = settings.get('VIDEOS_PATH')
os.makedirs(video_path, exist_ok=True)
video_file_name = os.path.join(
video_path, '{}_%Y-%m-%d_%H-%M-%S.mp4'.format(camera_info['device_code']))
if camera_info['device_code'] not in self.running and rtmp_url:
self.create_thread(camera_info['device_code'], rtmp_url, video_file_name)
return sn_list
def record_check(self):
sn_list = []
kill_proc_sn = []
check = False
now = datetime.now()
if not self.last_check_time \
or (now - self.last_check_time > timedelta(minutes=5)):
self.last_check_time = now
log.info('检查是否有新摄像头...')
sn_list = self.run_record()
check = True
t1 = time.time()
for camera_sn, record_thread in self.running.items():
if check and camera_sn not in sn_list:
kill_proc_sn.append(camera_sn)
continue
if record_thread.is_alive():
log.info('%s的录制进程还在录制,初始分辨率为%s',
camera_sn, record_thread.video_resolution)
# 创建进程判定分辨率是否发生变化
# if record_thread.video_stream_type == 'rtmp':
# try:
# ffprobe_key = 'video:camera_sn:{}:stream_url:{}:resolution'.format(
# camera_sn, record_thread.stream_url)
# with redis_utils.redis_connect() as pipe:
# res = pipe.set(ffprobe_key, '', nx=True, ex=120)
# if res:
# log.info('设置%s的查询分辨率任务', camera_sn)
# probe_thread = FfprobeThread(camera_sn, record_thread.stream_url,
# record_thread, ffprobe_key)
# probe_thread.daemon = True
# probe_thread.start()
# except Exception as e:
# log.exception(e)
# send_error_message('record', e)
continue
if now - record_thread.create_time > timedelta(seconds=6):
log.warning('%s录制进程退出,现在进行重启!',
record_thread.name)
self.create_thread(record_thread.name, record_thread.stream_url,
record_thread.out_file_path, record_thread.video_stream_type)
# self.retry_count(record_thread.name, now.strftime('%Y-%m-%d'))
log.info('--- \n当前存活摄像头%s路, 检查当前摄像头存活状态耗时%s',
len(self.running), round(time.time() - t1, 2))
for camera_sn in kill_proc_sn:
log.warning('摄像头%s已经不存在,将删除其录制进程', camera_sn)
if self.running[camera_sn].ffmpeg_proc:
# TODO 这里的kill并不能把录制ffmpeg进程杀掉,需要测试
self.running[camera_sn].kill()
self.running.pop(camera_sn)
def start(self):
try:
while True:
now = datetime.now(tz)
if now > self.stop_time:
break
time.sleep(1)
try:
# db.load_db(settings.get('INFLUXDB'))
self.record_check()
except Exception as e:
log.exception(e)
# send_error_message(service_name, e)
except KeyboardInterrupt:
log.info('Ctrl+C')
finally:
self.clean()
log.info('主进程退出')
def clean(self):
for camera_sn, record_thread in self.running.items():
if record_thread.is_alive():
record_thread.kill()
log.info('最后停顿5s,执行killall命令保证ffmpeg都能退出!')
time.sleep(5)
# apt-get install psmisc
# TODO 这里执行了killall如果event进程在运行可能会误杀!但是如果去掉不知道能否保证所有的ffmpeg进程都可以退掉
# os.system('killall -9 ffmpeg')
def runner():
def start_record():
video = VideoRecord()
video.start()
start_record()
scheduler = BlockingScheduler(
{'apscheduler.timezone': TIMEZONE}
)
scheduler.add_job(start_record, 'cron',
hour=settings.get('RECORD_START_HOUR', 6))
scheduler.start()
if __name__ == '__main__':
from intelab_python_sdk.logger import log_init
log_init(__file__, True, '/tmp/logs/video')
runner()
...@@ -12,7 +12,7 @@ from ils_common_video.db import rabbitmq_connect, mysql ...@@ -12,7 +12,7 @@ from ils_common_video.db import rabbitmq_connect, mysql
from ils_common_video.db.mysql import get_camera_info, insert_video_info from ils_common_video.db.mysql import get_camera_info, insert_video_info
from ils_common_video.utils.alarm_utils import send_alarm_to_developer from ils_common_video.utils.alarm_utils import send_alarm_to_developer
from ils_common_video.utils.api_helper import IntelabApiHelper, PlaybackUrlException from ils_common_video.utils.api_helper import IntelabApiHelper, PlaybackUrlException
from ils_common_video.utils.pre_event import PreEvent from ils_common_video.isc_video.pre_event import PreEvent
api_helper = IntelabApiHelper() api_helper = IntelabApiHelper()
......
...@@ -90,4 +90,6 @@ if __name__ == '__main__': ...@@ -90,4 +90,6 @@ if __name__ == '__main__':
# print(oss_download_file('https://test-qzwjtest.oss-cn-hangzhou.aliyuncs.com/test-2.mp4', 't.mp4')) # print(oss_download_file('https://test-qzwjtest.oss-cn-hangzhou.aliyuncs.com/test-2.mp4', 't.mp4'))
# print(oss_delete_file('https://test-qzwjtest.oss-cn-hangzhou.aliyuncs.com/test-2.mp4')) # print(oss_delete_file('https://test-qzwjtest.oss-cn-hangzhou.aliyuncs.com/test-2.mp4'))
# oss_download_file('D00268229_2020-10-23_14-07-13.mp4', '/tmp/v3/videos/D00268229_2020-10-23_14-07-13.mp4') # oss_download_file('D00268229_2020-10-23_14-07-13.mp4', '/tmp/v3/videos/D00268229_2020-10-23_14-07-13.mp4')
print(oss_delete_file('https://prod-jiandu-shanghai.oss-cn-shanghai.aliyuncs.com/isc_record/ISC_D86639983_20210509T143611_20210509T143859.mp4')) # print(oss_delete_file('https://prod-jiandu-shanghai.oss-cn-shanghai.aliyuncs.com/isc_record/ISC_D86639983_20210509T143611_20210509T143859.mp4'))
oss_download_file('https://prod-isc.oss-cn-shanghai.aliyuncs.com/E82843971_2021-07-21_16-28-18.mp4', '/tmp/videos/E82843971_2021-07-21_16-28-18.mp4')
"""
萤石云开发平台API工具
"""
import time
from retrying import retry
import requests
import dynaconf
from intelab_python_sdk.logger import log
from ils_common_video.db.redis import redis_connect
from ils_common_video.const import ACCOUNT_TOKEN_KEY, DEVICE_SERIAL_KEY
account_secret = dynaconf.settings.get('YSYACCOUNT', {})
# TODO 萤石云账号APP_KEY 和 APP_SECRET 从数据库中获取
APP_KEY_AND_ACCOUNT = {}
TOKEN_AND_ACCOUNT = {}
class EvizVersionClient(object):
"""
莹石云api
文档:https://open.ys7.com/doc/zh/book/index/device_select.html#device_select-api9
"""
ret_code = {
'20002': '设备{}不存在',
'20007': '设备{}不在线',
'20014': 'deviceSerial-{}不合法',
'20018': '该用户不拥有该设备{}',
'60060': '设备{}地址未绑定'
}
@staticmethod
def _get_access_token(account):
""" 根据帐号获取token值,莹石云的token有效期为7天
"""
if not account: # 设备暂未绑定帐号
return ''
secret = account_secret[account]
APP_KEY_AND_ACCOUNT[secret['app_key']] = account
with redis_connect() as pipe:
token_key = ACCOUNT_TOKEN_KEY.format(secret['app_key'])
if pipe.exists(token_key):
access_token = pipe.get(token_key)
else:
data = {
"appKey": secret['app_key'],
"appSecret": secret['app_secret']
}
result = EvizVersionClient._do_request(
'https://open.ys7.com/api/lapp/token/get', data)
log.debug(result)
if 'data' in result and 'code' in result and result['code'] == '200':
access_token = result['data']['accessToken']
expire_time = result['data']['expireTime']
else:
# 暂时未获取到Token, 15分钟后重试
access_token = ''
expire_time = int((time.time() + 15 * 60) * 1000)
# 缓存token
pipe.set(token_key, access_token)
pipe.expireat(token_key, int(expire_time / 1000)) # 设置失效时间
if access_token:
TOKEN_AND_ACCOUNT[access_token] = account
return access_token
@staticmethod
@retry(stop_max_attempt_number=8, wait_random_min=1000, wait_random_max=10000)
def _do_request(url, data=None, method='POST'):
"""
调用莹石云接口
"""
res = requests.request(method, url, params=data, timeout=10)
res.raise_for_status()
return res.json()
@staticmethod
def get_access_token(sn):
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
account = EvizVersionClient.choice_account_of_device(sn)
return EvizVersionClient._get_access_token(account)
@staticmethod
def get_live_address(access_token, sn):
""" 获取萤石云直播地址
"""
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
live_data = {
"accessToken": access_token,
"source": "{}:{}".format(sn, channel)
}
result = EvizVersionClient._do_request(
'https://open.ys7.com/api/lapp/live/address/get', data=live_data)
return result
@staticmethod
def choice_account_of_device(sn):
""" 查询设备属于哪个萤石云账号
"""
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
sn_key = DEVICE_SERIAL_KEY.format(sn)
with redis_connect() as pipe:
account = ''
if pipe.exists(sn_key):
account = pipe.get(sn_key)
else:
log.info('查询设备属于哪个帐号...')
for cur_account in account_secret:
log.info('查询设备%s是否属于%s的帐号', sn, cur_account)
access_token = EvizVersionClient._get_access_token(cur_account)
result = EvizVersionClient.get_device_info(access_token, sn)
if result.get('code') == '200':
account = cur_account
break
else:
log.info('当前设备%s不在配置的帐号权限中', sn)
# 保存摄像头和帐号之间的关系,暂时保存22小时
pipe.set(sn_key, account, ex=3600 * 22)
return account
@staticmethod
def get_video_by_time(access_token, sn, start_time=None, end_time=None, rec_type=1):
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
url = 'https://open.ys7.com/api/lapp/video/by/time'
data = {
'accessToken': access_token,
'deviceSerial': sn,
'recType': rec_type
}
if start_time:
data.update({'startTime': start_time})
if end_time:
data.update({'endTime': end_time})
result = EvizVersionClient._do_request(url, data)
return result
@staticmethod
def get_alarm_list(access_token, sn, start_time=None, end_time=None, page_size=50, page_start=0):
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
url = ' https://open.ys7.com/api/lapp/alarm/device/list'
data = {
'accessToken': access_token,
'deviceSerial': sn,
'status': 2, # 2-全部,1-已读,0-未读
'pageSize': page_size,
'pageStart': page_start,
'alarmType': -1
}
if start_time:
data.update({'startTime': start_time})
if end_time:
data.update({'endTime': end_time})
result = EvizVersionClient._do_request(url, data)
return result
@staticmethod
def get_device_info(access_token, sn):
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
url = 'https://open.ys7.com/api/lapp/device/info'
data = {
'accessToken': access_token,
'deviceSerial': sn
}
return EvizVersionClient._do_request(url, data)
@staticmethod
def get_device_status(access_token, sn):
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
url = 'https://open.ys7.com/api/lapp/device/status/get'
data = {
'accessToken': access_token,
'deviceSerial': sn
}
return EvizVersionClient._do_request(url, data)
@staticmethod
def open_device_live(access_token, sn):
""" 该接口用于根据序列号和通道号批量开通直播功能
"""
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
url = 'https://open.ys7.com/api/lapp/live/video/open'
data = {
'accessToken': access_token,
'source': '{}:{}'.format(sn, channel)
}
log.info('开通设备%s:%s的直播功能', sn, channel)
return EvizVersionClient._do_request(url, data)
@staticmethod
def set_alarm_sound(access_token, sn):
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
url = 'https://open.ys7.com/api/lapp/device/alarm/sound/set'
data = {
'accessToken': access_token,
'deviceSerial': sn,
'type': 2
}
log.info('设置%s的告警声音为%s(0-短叫,1-长叫,2-静音)', sn, 2)
return EvizVersionClient._do_request(url, data)
@staticmethod
def get_device_traffic_detail(access_token, sn, start_time=None, end_time=None, page_size=50, page_start=0):
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
url = 'https://open.ys7.com/api/lapp/traffic/device/detail'
data = {
'accessToken': access_token,
'deviceSerial': sn,
'pageSize': page_size,
'pageStart': page_start,
}
if start_time:
data.update({'startTime': start_time})
if end_time:
data.update({'endTime': end_time})
result = EvizVersionClient._do_request(url, data)
return result
@staticmethod
def set_algorithm_config(access_token, sn, value=4):
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
url = 'https://open.ys7.com/api/lapp/device/algorithm/config/set'
data = {
'accessToken': access_token,
'deviceSerial': sn,
'type': 0,
'channelNo': channel,
'value': value # 该值为0~6,6表示灵敏度最高
}
return EvizVersionClient._do_request(url, data)
@staticmethod
def get_device_list(access_token, page_size=50, page_start=0):
url = 'https://open.ys7.com/api/lapp/device/list'
data = {
'accessToken': access_token,
'pageSize': page_size,
'pageStart': page_start
}
return EvizVersionClient._do_request(url, data)
@staticmethod
def get_offline_device_list():
offline_device_list = []
for cur_account in account_secret:
t1 = time.time()
log.info('正在查询帐号%s下所有设备的离线状态...', cur_account)
access_token = EvizVersionClient._get_access_token(cur_account)
result = EvizVersionClient.get_device_list(access_token)
online = 0
offline = 0
while True:
if not result.get('data'):
break
else:
for device in result['data']:
if device['status'] == 1:
online += 1
else:
offline_device_list.append(device['deviceSerial'])
offline += 1
size = result['page']['size']
if len(result['data']) < size:
break
# 获取分页的下一页
page = result['page']['page']
page = page + 1
result = EvizVersionClient.get_device_list(access_token, page_start=page, page_size=size)
log.info('查询耗时%s,帐号%s共有%s个设备,在线/离线:%s/%s', time.time() - t1,
cur_account, offline + online, online, offline)
return offline_device_list
@staticmethod
def set_encrypt_off(access_token, sn, validate_code):
sn, channel = sn.split(':', 1) if ':' in sn else (sn, 1)
url = 'https://open.ys7.com/api/lapp/device/encrypt/off'
data = {
'accessToken': access_token,
'deviceSerial': sn,
'validateCode': validate_code
}
return EvizVersionClient._do_request(url, data)
@staticmethod
def create_open_cloud_project(access_token, project_id):
url = 'https://open.ys7.com/api/open/cloud/v1/project/{}'.format(project_id)
data = {
'accessToken': access_token,
'expireDays': 1,
'projectName': 'WZW_Test',
}
return EvizVersionClient._do_request(url, data)
@staticmethod
def record_cloud_video_save(access_token, sn, project_id, file_id,
start_time, end_time, rec_type):
url = 'https://open.ys7.com/api/open/cloud/v1/video/save'
data = {
'accessToken': access_token,
'channelNo': 1,
'deviceSerial': sn,
'fileId': file_id,
'projectId': project_id,
'recType': rec_type,
'startTime': start_time,
'endTime': end_time
}
return EvizVersionClient._do_request(url, data)
@staticmethod
def get_cloud_video_download_url(access_token, file_id, project_id):
url = 'https://open.ys7.com/api/open/cloud/v1/file/download'
data = {
'accessToken': access_token,
'fileId': file_id,
'projectId': project_id
}
return EvizVersionClient._do_request(url, data, 'GET')
@staticmethod
def delete_cloud_video_download_url(access_token, file_id, project_id):
url = 'https://open.ys7.com/api/open/cloud/v1/file'
data = {
'accessToken': access_token,
'fileId': file_id,
'projectId': project_id
}
return EvizVersionClient._do_request(url, data, method='DELETE')
if __name__ == '__main__':
ysy_client = EvizVersionClient()
camera_sn = 'E57377023'
token = ysy_client.get_access_token(camera_sn)
print(ysy_client.get_live_address(token, camera_sn))
import shutil
import os
from datetime import timedelta
import dateutil.parser
from intelab_python_sdk.logger import log
from intelab_python_sdk.ffmpeg import ffmpeg_capture
from ils_common_video.utils.record_utils import get_video_duration, time_to_seconds
class VideoFile:
def __init__(self, full_path):
"""
full_path: 文件的全路径
"""
self._duration = None
self._is_opened = None
self._frame = None
self.full_path = full_path
self.dir_path, self.file_path = os.path.split(full_path)
self.file_name, self.postfix = self.file_path.rsplit('.', 1)
if len(self.file_name.split('_')) == 3:
# 文件名格式不正确
self.sn, self.date, self.time = self.file_name.split('_')
# HD文件名处理
self.sn = self.sn[3:] if 'HDV' in self.sn else self.sn
else:
self.sn = self.date = self.time = None
self.start_time = dateutil.parser.parse(
' '.join([self.date, self.time.replace('-', ':')]))
self._duration = self._bitrate = self._resolution = self._media_type = None
self._end_time = None
self._error_log = ''
self.load = False
self._picture_path = os.path.join(self.dir_path, self.file_name + '.jpg')
self._device_name = ''
self.rename = False
self.network_quality = 1.0 # 网络质量得分
self._network_quality_grade = ''
@property
def error_log(self):
if not self._error_log and not self.load:
self._get_video_info()
return self._error_log
@property
def duration(self):
if not self._duration:
self._get_video_info()
return self._duration
@property
def bitrate(self):
if not self._bitrate:
self._get_video_info()
return self._bitrate
@property
def resolution(self):
if not self._resolution:
self._get_video_info()
return self._resolution
@property
def media_type(self):
if not self._media_type:
self._get_video_info()
return self._media_type
@property
def end_time(self):
if not self._end_time:
self._get_video_info()
return self._end_time
def _get_video_info(self):
video_info, self._error_log = get_video_duration(self.full_path)
self._duration = time_to_seconds(video_info['duration'])
self._bitrate = video_info['bitrate']
self._resolution = video_info['resolution']
self._media_type = video_info['media_type']
self._end_time = self.start_time + timedelta(seconds=self._duration)
if self._duration == 7 \
and self._media_type == 'VideoHandler' \
and self._resolution == '512x288':
# 时长为7秒且无音频且分辨率为512x288的视频文件为萤石云无效视频
self._error_log = '萤石云无效视频分辨率为512x288'
self.load = True
@property
def network_quality_grade(self):
if self.network_quality > 0.95:
self._network_quality_grade = '优'
elif self.network_quality > 0.20:
self._network_quality_grade = '良'
else:
self._network_quality_grade = '差'
return self._network_quality_grade
def rename_by_start_time(self, start_time):
new_file_name = '{}_{}'.format(self.sn, start_time.strftime('%Y-%m-%d_%H-%M-%S'))
full_path = os.path.join(self.dir_path, '{}.{}'.format(new_file_name, self.postfix))
if os.path.isfile(self._picture_path):
shutil.move(self._picture_path, os.path.join(self.dir_path, '{}.{}'.format(new_file_name, 'jpg')))
return full_path
@property
def picture_path(self):
if not os.path.exists(self._picture_path):
# 如果文件所略图不存在,则打开视频文件,保存一帧
ffmpeg_capture.capture(self.full_path, self._picture_path)
return self._picture_path
@property
def device_name(self):
return self._device_name
@property
def size(self):
"""
获取文件大小(M: 兆) K = B / 1024, M = K / 1024, G = M / 1024
"""
file_path = self.get_exists_file()
size = 0
try:
if os.path.isfile(file_path):
size = os.path.getsize(file_path) / 1024.0
except Exception as e:
log.warning('获取文件%s大小失败!', self.file_name)
log.exception(e)
return size
def get_uploaded_name(self):
new_file_file_name = os.path.join(self.dir_path,
self.device_name + self.file_path.replace('-', '_'))
return new_file_file_name
def get_exists_file(self):
if os.path.exists(self.full_path):
file_path = self.full_path
else:
uploaded_name = self.get_uploaded_name()
if os.path.exists(uploaded_name):
file_path = uploaded_name
else:
file_path = ''
log.warning('未获取到文件%s的有效的文件名', self.file_name)
return file_path
@staticmethod
def gen_file_name(camera_code, start_time, end_time, prefix='ISC', part_num=None):
elements = [
prefix, camera_code,
start_time.strftime('%Y%m%dT%H%M%S'),
end_time.strftime('%Y%m%d%H%M%S')
]
if part_num is not None:
elements.append(str(part_num))
return '_'.join(elements) + '.mp4'
if __name__ == '__main__':
video_file = VideoFile('/tmp/videos/E82843165_2021-07-21_15-43-24.mp4')
# print(video_file.picture_path)
print(video_file.duration)
print(video_file.size)
print(video_file.resolution)
...@@ -4,7 +4,7 @@ from dynaconf import settings ...@@ -4,7 +4,7 @@ from dynaconf import settings
from datetime import datetime from datetime import datetime
from ils_common_video.utils.isc_client import HikVisionClient from ils_common_video.utils.isc_client import HikVisionClient
from ils_common_video.utils.pre_event import PreEvent from ils_common_video.isc_video.pre_event import PreEvent
tz = pytz.timezone('Asia/Shanghai') tz = pytz.timezone('Asia/Shanghai')
......
...@@ -7,7 +7,7 @@ from dynaconf import settings ...@@ -7,7 +7,7 @@ from dynaconf import settings
from ils_common_video.utils.isc_client import HikVisionClient from ils_common_video.utils.isc_client import HikVisionClient
from ils_common_video.utils.record_utils import record_thread, get_video_duration from ils_common_video.utils.record_utils import record_thread, get_video_duration
from ils_common_video.utils.api_helper import PlaybackUrlException from ils_common_video.utils.api_helper import PlaybackUrlException
from ils_common_video.utils.pre_event import PreEvent from ils_common_video.isc_video.pre_event import PreEvent
tz = pytz.timezone('Asia/Shanghai') tz = pytz.timezone('Asia/Shanghai')
...@@ -23,10 +23,10 @@ client = HikVisionClient(config.get('KEY'), config.get('SECRET'), ...@@ -23,10 +23,10 @@ client = HikVisionClient(config.get('KEY'), config.get('SECRET'),
def main(): def main():
start_time = datetime(2021, 7, 5, 14, 33, 56).astimezone(tz) start_time = datetime(2021, 7, 22, 10, 0, 0).astimezone(tz)
# start_time = datetime(2021, 5, 28, 9, 10, 59).astimezone(tz) # start_time = datetime(2021, 5, 28, 9, 10, 59).astimezone(tz)
end_time = datetime(2021, 7, 5, 14, 34, 15).astimezone(tz) end_time = datetime(2021, 7, 22, 10, 5, 0).astimezone(tz)
camera_index = '8f50e406cad6489fac443e034d29a66f' camera_index = 'a3cafffd4114438eb197f48af1e19293'
results = [] results = []
try: try:
...@@ -51,6 +51,7 @@ def main(): ...@@ -51,6 +51,7 @@ def main():
print('ERROR:', e) print('ERROR:', e)
else: else:
print(res) print(res)
print(results)
for event in results: for event in results:
cur_start_time = max(event['start_time'], start_time) cur_start_time = max(event['start_time'], start_time)
...@@ -75,7 +76,7 @@ def stream_record(stream, start_time, end_time): ...@@ -75,7 +76,7 @@ def stream_record(stream, start_time, end_time):
video_path, 'rtmp_{}_{}_{}.mp4'.format('y', start_time, end_time)) video_path, 'rtmp_{}_{}_{}.mp4'.format('y', start_time, end_time))
# TODO 多进程处理 # TODO 多进程处理
print(stream_url, start_time, end_time) print(stream_url, start_time, end_time)
record_thread(stream_url, file_name, thread_name='y', protocol=stream['protocol']) # record_thread(stream_url, file_name, thread_name='y', protocol=stream['protocol'])
return get_video_duration(file_name) return get_video_duration(file_name)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论