提交 0628fdce authored 作者: zw.wang's avatar zw.wang

feat: 任务和录制视频

上级 f93b4868
...@@ -5,5 +5,3 @@ WORKDIR /root/hikvision-isc ...@@ -5,5 +5,3 @@ WORKDIR /root/hikvision-isc
COPY ./ ./ COPY ./ ./
RUN pip install -e . RUN pip install -e .
CMD python hikvision_isc_client/app.py
version: "2" version: "2"
services: services:
isc-service: api-server:
container_name: isc-service container_name: api-server
build: build:
context: . context: .
ports: ports:
- 5001:5001 - 5001:5001
volumes:
- /var/log/event_rcv:/var/log/event_rcv
command: python hikvision_isc_client/app.py
isc-recorder:
container_name: isc-recorder
build:
context: .
volumes:
- /var/log/event_rcv:/var/log/event_rcv
command: python hikvision_isc_client/recorder.py
isc-merger:
container_name: isc-merger
build:
context: .
volumes: volumes:
- /data/videos/isc-record:/data/videos/isc-record - /data/videos/isc-record:/data/videos/isc-record
- /var/log/event_rcv:/var/log/event_rcv - /var/log/event_rcv:/var/log/event_rcv
command: python hikvision_isc_client/event_merger.py
influxdb: influxdb:
container_name: influxdb container_name: influxdb
image: ilabservice-registry.cn-hangzhou.cr.aliyuncs.com/basic/influxdb:monitor image: ilabservice-registry.cn-hangzhou.cr.aliyuncs.com/basic/influxdb:monitor
...@@ -22,3 +38,11 @@ services: ...@@ -22,3 +38,11 @@ services:
environment: environment:
- INFLUXDB_ADMIN_USER=admin - INFLUXDB_ADMIN_USER=admin
- INFLUXDB_ADMIN_PASSWORD=ilabservice123 - INFLUXDB_ADMIN_PASSWORD=ilabservice123
redis:
image: ilabservice-registry.cn-hangzhou.cr.aliyuncs.com/basic/redis:4.0
container_name: redis
expose:
- "6379"
volumes:
- /var/lib/redis:/data
command: redis-server /etc/redis.conf --appendonly yes
LAST_CHECK_TIME_KEY = 'isc:recorder:camera:{}'
from .influxdb import influxdb
from .redis import redis_connect
from .rabbitmq import rabbitmq_connect
from influxdb import InfluxDBClient
import dynaconf import dynaconf
from influxdb import InfluxDBClient
class InfluxDB(object): class InfluxDB(object):
......
from contextlib import contextmanager
from functools import wraps
import mysql.connector
from dynaconf import settings
from intelab_python_sdk.logger import log
from mysql.connector import errorcode
from retrying import retry
class MysqlClient(object):
def __init__(self, config):
self.config = dict(
host=config.get('HOST'),
port=config.get('PORT'),
user=config.get('USER'),
password=config.get('PASSWORD'),
database=config.get('DATABASE'),
auth_plugin='mysql_native_password'
)
@contextmanager
def mysql_connector(self, cursor_dict=False):
"""
构建上下文
"""
conn = None
try:
conn = mysql.connector.connect(**self.config)
if cursor_dict:
cursor = conn.cursor(
cursor_class=mysql.connector.cursor.MySQLCursorDict)
else:
cursor = conn.cursor()
yield cursor, conn
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
log.error(
"Something is wrong with your user name or password")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
log.error("Database does not exist")
else:
log.error("cannot connect to mysql. err: {}".format(err))
raise err
finally:
if conn:
conn.close()
db = MysqlClient(settings.MYSQL)
version = settings.get('version', 'v3')
def query(cursor_dict=False):
""" 执行sql 语句
对被装饰的函数参数进行修改,传出mysql的connector和cursor
默认重试5次
:param cursor_dict: 查询结果为dict
"""
def in_func(func):
@wraps(func)
@retry(stop_max_attempt_number=5, wait_random_min=100,
wait_random_max=1000)
def connect(*args, **kw):
with db.mysql_connector(cursor_dict) as (cursor, conn):
return func(cursor, conn, *args, **kw)
return connect
return in_func
@query(cursor_dict=True)
def get_camera_sn(cursor, conn):
sql = '''
select
device_number camera_sn,
biz_type,
name camera_name
from camera.video_config
'''
# where device_number = 'E57379052'
cursor.execute(sql)
return cursor.fetchall()
@query(cursor_dict=True)
def get_camera_video_url(cursor, conn, table, camera_sn, date):
sql = '''
select video_url, id video_id, status, start_date, end_date, file_name
from {}
where date = %s and device_serial = %s
'''.format(table)
cursor.execute(sql, [date, camera_sn])
return cursor.fetchall()
@query()
def set_video_data_status(cursor, conn, table, video_id):
sql = '''
update {}
set status = 0
where id = %s
'''.format(table)
cursor.execute(sql, [video_id])
conn.commit()
import dynaconf
import pika
from retrying import retry
@retry(stop_max_attempt_number=3, wait_random_min=100, wait_random_max=1000)
def rabbitmq_connect():
config = dynaconf.settings.get('RABBITMQ', {})
amqp_config = dict(user=config.get('USER'), password=config.get('PASSWORD'), host=config.get('HOST'),
port=config.get('PORT'))
if not amqp_config:
raise ConnectionError('rabbitmq配置错误')
connection = pika.BlockingConnection(
pika.URLParameters('amqp://{user}:{password}@{host}:{port}/%2F'.format(**amqp_config)))
return connection
import dynaconf
import redis
from retrying import retry
@retry(stop_max_attempt_number=3, wait_random_min=100, wait_random_max=1000)
def redis_connect():
config = dynaconf.settings.get('redis')
config = {
'host': config.get('host'),
'port': config.get('port'),
'password': config.get('password'),
'db': config.get('db'),
'decode_responses': config.get('decode_responses')
}
return redis.Redis(**config)
import pytz
import json
import time
import dateutil.parser
from datetime import datetime, timedelta
from intelab_python_sdk.logger import log
from hikvision_isc_client.db import influxdb, rabbitmq_connect, redis_connect
from hikvision_isc_client.const import LAST_CHECK_TIME_KEY
from hikvision_isc_client.pre_event import PreEvent
tz = pytz.timezone('Asia/Shanghai')
def get_camera_info():
camera_info = {
'index_code': '9e6768059bd74f6085eec605b7658e8f',
'event_type': '131331',
'device_code': 'D86639983',
'db_table': 'video_0.video_data_motion_1'
}
return [camera_info]
class EventMergerJob:
def __init__(self):
self.queue_name = 'ISC_RECORD_JOB'
self.local_service_name = 'cloud-record'
@staticmethod
def clean():
pipe = redis_connect()
for key in pipe.keys(LAST_CHECK_TIME_KEY.format('*')):
pipe.delete(key)
pipe.close()
def start(self):
while True:
try:
self.run()
except Exception as e:
log.exception(e)
time.sleep(15 * 60)
def run(self):
pipe = redis_connect()
for camera in get_camera_info():
now_std = datetime.now(pytz.timezone('Asia/Shanghai'))
camera_code = camera['device_code']
last_check_time_key = LAST_CHECK_TIME_KEY.format(camera_code)
last_check_time = pipe.get(last_check_time_key)
if not last_check_time:
# 设备无上次事件,取最近的15分钟作为开始时间
last_check_time = now_std - timedelta(minutes=15)
else:
last_check_time = dateutil.parser.parse(
last_check_time).astimezone(tz)
# 调整最大事件长度为1天
if now_std - last_check_time > timedelta(days=1):
last_check_time = now_std - timedelta(days=1)
res = pipe.set(last_check_time_key, now_std.strftime('%Y-%m-%d %H:%M:%S'))
if not res:
continue
pre_event = PreEvent(last_check_time.strftime('%Y-%m-%d %H:%M:%S'),
now_std.strftime('%Y-%m-%d %H:%M:%S'))
alarm_list = pre_event.get_alarm_list(camera['index_code'])
log.info('获取{}-{}的告警消息{}条'.format(last_check_time, now_std, len(alarm_list)))
connection = None
# if len(alarm_list) > 0:
connection = rabbitmq_connect()
channel = connection.channel()
channel.queue_declare(self.queue_name, durable=True)
# TODO 测试持续获取事件
events = [{'start_time': last_check_time, 'end_time': now_std}]
# events = PreEvent.merge_alarm_to_event(alarm_list)
for event in events:
body = {
'camera_index': camera['index_code'],
'start_time': event['start_time'].strftime('%Y-%m-%dT%H:%M:%S'),
'end_time': event['end_time'].strftime('%Y-%m-%dT%H:%M:%S'),
'event_type': '131331',
'camera_code': camera['device_code'],
'db_table': camera['db_table']
}
log.info(body)
channel.basic_publish(exchange='', routing_key=self.queue_name,
body=json.dumps(body, ensure_ascii=False))
if connection:
connection.close()
log.info('本轮移动事件视频录制任务结束.')
pipe.close()
if __name__ == '__main__':
from intelab_python_sdk.logger import log_init
log_init('event_merger', False)
em = EventMergerJob()
em.start()
import json
import os import os
import shutil import shutil
import threading
import functools
import pytz import pytz
import dynaconf import dynaconf
from datetime import timedelta from datetime import timedelta, datetime
from intelab_python_sdk.logger import log_init, log from intelab_python_sdk.logger import log_init, log
from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat
from hikvision_isc_client.db import rabbitmq_connect
from hikvision_isc_client.client import HikVisionClient from hikvision_isc_client.client import HikVisionClient
from hikvision_isc_client.utils import aliyun_oss
from hikvision_isc_client.utils.record_utils import record_thread, get_video_duration, time_to_seconds from hikvision_isc_client.utils.record_utils import record_thread, get_video_duration, time_to_seconds
tz = pytz.timezone('Asia/Shanghai') tz = pytz.timezone('Asia/Shanghai')
log_init(__name__, True, './log')
video_path = '/data/videos/isc-record'
video_path = '/data/videos/isc-record'
os.makedirs(video_path, exist_ok=True) os.makedirs(video_path, exist_ok=True)
config = dynaconf.settings.get('ISC') config = dynaconf.settings.get('ISC')
client = HikVisionClient(config.get('KEY'), config.get('SECRET'), client = HikVisionClient(config.get('KEY'), config.get('SECRET'),
config.get('HOST'), config.get('PORT')) config.get('HOST'), config.get('PORT'))
def recorder(camera_index, start_time, end_time): class StreamRecorder:
playback_urls = client.get_cameras_playback_urls(
camera_index, def __init__(self):
HikVisionClient.iso_format(start_time), log_init(__name__, False, './log')
HikVisionClient.iso_format(end_time) self.queue_name = 'ISC_RECORD_JOB'
) self.connection = rabbitmq_connect()
if len(playback_urls) > 0:
# 可以只通过一个回放流地址取到其他时间段的流 self.channel = self.connection.channel()
playback_stream = playback_urls[0]
else: def start(self):
return log.info('启动分析进程')
log.info(playback_stream) log.info('binding to queue {}'.format(self.queue_name))
part_num = 1
part_files_set = set() self.channel.queue_declare(queue=self.queue_name, durable=True)
file_name = os.path.join(video_path, 'rtmp_{}_{}.mp4'.format(
start_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S'), def ack_message(ch, delivery_tag):
end_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S') """Note that `ch` must be the same pika channel instance via which
)) the message being ACKed was retrieved (AMQP protocol constraint).
"""
while True: if ch.is_open:
complete_duration = (end_time - start_time).total_seconds() ch.basic_ack(delivery_tag)
else:
file_info, _ = stream_record(playback_stream['stream_url'], start_time, end_time) # Channel is already closed, so we can't ACK this message;
file_duration = time_to_seconds(file_info['duration']) # log and/or do something that makes sense for your app in this case.
if not os.path.isfile(file_info['file_name']): pass
continue
def do_work(conn, ch, delivery_tag, body):
if file_duration < complete_duration - 2: thread_id = threading.get_ident()
# 视频文件时长小于完整时长 log.info('Thread id: %s Delivery tag: %s Message body: %s', thread_id,
new_start_time = start_time + timedelta(seconds=file_duration) delivery_tag, body)
part_file_name = os.path.join(video_path, 'rtmp_{}_{}_{}.mp4'.format( self.process_message(body)
start_time.strftime('%Y%m%dT%H%M%S'), cb = functools.partial(ack_message, ch, delivery_tag)
new_start_time.strftime('%Y%m%dT%H%M%S'), part_num conn.add_callback_threadsafe(cb)
))
shutil.move(file_info['file_name'], part_file_name) def on_message(ch, method_frame, _header_frame, body, args):
part_files_set.add(part_file_name) (conn, thrds) = args
body = json.loads(body)
start_time = new_start_time delivery_tag = method_frame.delivery_tag
part_num += 1 t = threading.Thread(target=do_work, args=(conn, ch, delivery_tag, body))
t.start()
thrds.append(t)
threads = []
on_message_callback = functools.partial(on_message, args=(self.connection, threads))
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(on_message_callback=on_message_callback,
queue=self.queue_name)
log.info(' [*] Waiting for messages. To exit press CTRL+C')
try:
self.channel.start_consuming()
except KeyboardInterrupt:
log.info('MQ connection closed by user')
except Exception as e:
log.exception('MQ connection closed unexpectedly. %s', e)
raise
for thread in threads:
thread.join()
self.connection.close()
def process_message(self, body):
filename = self.recorder(
body['camera_index'],
datetime.strptime(body['start_time'], '%Y-%m-%dT%H:%M:%S'),
datetime.strptime(body['end_time'], '%Y-%m-%dT%H:%M:%S'))
video_info = get_video_duration(filename)
url = aliyun_oss.oss_upload_file('isc_record/' + filename.split('/')[-1], filename)
log.info('video_info: %s, url: %s', video_info, url)
return True
@staticmethod
def recorder(camera_index, start_time, end_time):
playback_urls = client.get_cameras_playback_urls(
camera_index,
HikVisionClient.iso_format(start_time),
HikVisionClient.iso_format(end_time)
)
if len(playback_urls) > 0:
# 可以只通过一个回放流地址取到其他时间段的流
playback_stream = playback_urls[0]
else: else:
part_files_set.add(file_info['file_name']) return
break log.info(playback_stream)
part_files = sorted(list(part_files_set)) part_num = 1
if len(part_files) > 1: part_files_set = set()
concat(part_files, file_name, removed=True) file_name = os.path.join(video_path, 'rtmp_{}_{}.mp4'.format(
elif len(part_files) == 1: start_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S'),
shutil.move(part_files[0], file_name) end_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S')
log.info('The download is complete, file %s', file_name) ))
while True:
def stream_record(stream, start_time, end_time): complete_duration = (end_time - start_time).total_seconds()
start_time = start_time.strftime('%Y%m%dT%H%M%S')
end_time = end_time.strftime('%Y%m%dT%H%M%S') file_info, _ = StreamRecorder.stream_record(
playback_stream['stream_url'], start_time, end_time)
if stream['protocol'] == 'rtsp': file_duration = time_to_seconds(file_info['duration'])
stream_url = stream['url'] if not os.path.isfile(file_info['file_name']):
else: continue
stream_url = '{}?beginTime={}&endTime={}&{}'.format(stream['url'],
start_time, if file_duration < complete_duration - 2:
end_time, stream['extra_args']) # 视频文件时长小于完整时长
file_name = os.path.join(video_path, 'rtmp_{}_{}.mp4'.format(start_time, end_time)) new_start_time = start_time + timedelta(seconds=file_duration)
record_thread(stream_url, file_name) part_file_name = os.path.join(video_path, 'rtmp_{}_{}_{}.mp4'.format(
return get_video_duration(file_name) start_time.strftime('%Y%m%dT%H%M%S'),
new_start_time.strftime('%Y%m%dT%H%M%S'), part_num
))
shutil.move(file_info['file_name'], part_file_name)
part_files_set.add(part_file_name)
start_time = new_start_time
part_num += 1
else:
part_files_set.add(file_info['file_name'])
break
part_files = sorted(list(part_files_set))
if len(part_files) > 1:
concat(part_files, file_name, removed=True)
elif len(part_files) == 1:
shutil.move(part_files[0], file_name)
log.info('The download is complete, file %s', file_name)
return file_name
@staticmethod
def stream_record(stream, start_time, end_time):
start_time = start_time.strftime('%Y%m%dT%H%M%S')
end_time = end_time.strftime('%Y%m%dT%H%M%S')
if stream['protocol'] == 'rtsp':
stream_url = stream['url']
else:
stream_url = '{}?beginTime={}&endTime={}&{}'.format(stream['url'],
start_time,
end_time, stream['extra_args'])
file_name = os.path.join(
video_path, 'rtmp_{}_{}.mp4'.format(start_time, end_time))
record_thread(stream_url, file_name)
return get_video_duration(file_name)
if __name__ == '__main__':
stream_recorder = StreamRecorder()
stream_recorder.start()
import os
import oss2
from functools import lru_cache
from dynaconf import settings
local_endpoint = 'https://oss-cn-{}-internal.aliyuncs.com' # 局域网配置
config = settings.get('ALI_OSS', {})
region = config.get('region', 'hangzhou')
local_endpoint = config.get('endpoint', local_endpoint.format(region))
@lru_cache()
def _get_bucket():
bucket = oss2.Bucket(oss2.Auth(config['access_key_id'], config['access_key_secret']),
local_endpoint, config['bucket_name'])
return bucket
def oss_upload_file(origin_file, local_file):
""" 上传文件
"""
# 用私有域名替换掉阿里的域名
url_base = config.get('internal', 'https://{}.oss-cn-{}.aliyuncs.com/'.format(config['bucket_name'], region))
bucket = _get_bucket()
result = bucket.put_object_from_file(origin_file, local_file)
if result.status == 200:
origin_file_url = os.path.join(url_base, origin_file)
else:
origin_file_url = ''
return origin_file_url
def oss_delete_file(origin_file):
""" 删除单个文件
"""
res = oss_batch_delete_files([origin_file])
return True if len(res) > 0 else False
def oss_batch_delete_files(files_list):
""" 批量删除云端文件
"""
bucket = _get_bucket()
bucket_files = []
for origin_file in files_list:
if origin_file.startswith('http://') or origin_file.startswith('https://'):
bucket_files.append(origin_file.split('/')[-1])
else:
bucket_files.append(origin_file)
result = bucket.batch_delete_objects(bucket_files)
return result.deleted_keys
def oss_download_file(origin_file, local_file):
"""
下载视频文件
"""
bucket = _get_bucket()
if origin_file.startswith('http://') or origin_file.startswith('https://'):
origin_file = origin_file.split('/')[-1]
bucket.get_object_to_file(origin_file, local_file)
return local_file
if __name__ == '__main__':
# print(oss_upload_file('test-2.mp4', '/home/wen/Videos/3_C90842327_2020_06_04_13_37_14.mp4'))
# print(oss_download_file('https://test-qzwjtest.oss-cn-hangzhou.aliyuncs.com/test-2.mp4', 't.mp4'))
# print(oss_delete_file('https://test-qzwjtest.oss-cn-hangzhou.aliyuncs.com/test-2.mp4'))
oss_download_file('D00268229_2020-10-23_14-07-13.mp4', '/tmp/v3/videos/D00268229_2020-10-23_14-07-13.mp4')
...@@ -26,7 +26,11 @@ setuptools.setup( ...@@ -26,7 +26,11 @@ setuptools.setup(
'flask', 'flask',
'dynaconf', 'dynaconf',
'influxdb', 'influxdb',
'python-dateutil' 'python-dateutil',
'pika==1.1.0',
'redis',
'mysql-connector',
'retrying',
], ],
python_requires='>=3.6', python_requires='>=3.6',
) )
...@@ -5,7 +5,7 @@ from intelab_python_sdk.logger import log_init ...@@ -5,7 +5,7 @@ from intelab_python_sdk.logger import log_init
from dynaconf import settings from dynaconf import settings
from hikvision_isc_client.client import HikVisionClient from hikvision_isc_client.client import HikVisionClient
from hikvision_isc_client.recorder import recorder from hikvision_isc_client.recorder import StreamRecorder
tz = pytz.timezone('Asia/Shanghai') tz = pytz.timezone('Asia/Shanghai')
...@@ -23,7 +23,7 @@ def main(): ...@@ -23,7 +23,7 @@ def main():
# rtmp_20210426T003128_20210426T003247.mp4 # rtmp_20210426T003128_20210426T003247.mp4
start_time = datetime(2021, 4, 26, 8, 31, 28) start_time = datetime(2021, 4, 26, 8, 31, 28)
end_time = datetime(2021, 4, 26, 8, 32, 47) end_time = datetime(2021, 4, 26, 8, 32, 47)
recorder('f8a3c4d9b8ae42118b4db9fcf7895031', start_time, end_time) StreamRecorder.recorder('f8a3c4d9b8ae42118b4db9fcf7895031', start_time, end_time)
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论