提交 f0c25180 authored 作者: zw.wang's avatar zw.wang

feat: 获取设想信息和测试脚本编写

上级 df902592
......@@ -28,7 +28,8 @@ def event_rcv():
'fields': {
'event_id': event['eventId'],
'camera_name': event['srcName'],
'paren_index': event['srcParentIndex']
'paren_index': event['srcParentIndex'],
'value': event['time']
},
'time': dateutil.parser.parse(event['happenTime'])
})
......
......@@ -83,18 +83,33 @@ class HikVisionClient(object):
uri = "/artemis/api/video/v1/cameras/previewURLs"
return self._request(uri, body)
def get_cameras(self, page_size=100, page_no=1):
def get_encode_device(self, page_size=100, page_no=1):
""" 根据条件查询目录下有权限的编码设备列表
"""
分页获取摄像头信息
uri = '/artemis/api/resource/v2/encodeDevice/search'
body = {
"pageSize": page_size,
"pageNo": page_no
}
return self._request(uri, body)
def get_cameras(self, page_size=100, page_no=1, expressions=None):
""" 分页获取摄像头信息
:param page_size:
:param page_no:
:param expressions:
文档详见
https://open.hikvision.com/docs/aec7933a37f34aedb03853ce289e74b7#ef459f4b
:return:
"""
body = {
"pageSize": page_size,
"pageNo": page_no
}
uri = '/artemis/api/resource/v1/cameras'
if expressions:
body['expressions'] = expressions
uri = '/artemis/api/resource/v2/camera/search'
result = self._request(uri, body)
camera_list = []
if result:
......@@ -102,8 +117,7 @@ class HikVisionClient(object):
return camera_list
def get_cameras_playback_urls(self, camera_index_code, begin_time, end_time, protocol='rtmp'):
"""
回放取流
""" 回放取流
"""
streamform = 'ps' if protocol == 'rtmp' else 'rtp'
uri = '/artemis/api/video/v2/cameras/playbackURLs'
......@@ -161,3 +175,14 @@ class HikVisionClient(object):
}
res = self._request(uri, body)
return res
def get_camera_online(self, camera_index_code):
"""
获取监控点在线状态(貌似无用)
"""
uri = '/artemis/api/nms/v1/online/camera/get'
body = {
'indexCodes': [camera_index_code]
}
res = self._request(uri, body)
return res
from dynaconf import settings
from hikvision_isc_client.client import HikVisionClient
config = settings.get('ISC')
client = HikVisionClient(config.get('KEY'), config.get('SECRET'),
config.get('HOST'), config.get('PORT'))
devices = client.get_encode_device()
for device in devices.get('list'):
camera_info = client.get_cameras(expressions=[{
'key': 'parentIndexCode',
'operator': 0,
'values': [device['indexCode']]
}])
camera_code = camera_info[0]['indexCode']
print('摄像头序列号{}的编码设备Index为{}'.format(
device['deviceCode'], camera_code))
print('-------------')
print(client.get_camera_online(camera_code))
print('直播预览接口为', client.get_camera_preview_url(camera_code))
......@@ -22,17 +22,28 @@ client = HikVisionClient(config.get('KEY'), config.get('SECRET'),
def main():
start_time = datetime(2021, 4, 23, 16, 10)
end_time = datetime(2021, 4, 23, 16, 15)
start_time = datetime(2021, 4, 25, 11, 13)
end_time = datetime(2021, 4, 25, 11, 15)
playback_urls = client.get_cameras_playback_urls(
'f8a3c4d9b8ae42118b4db9fcf7895031',
HikVisionClient.iso_format(start_time),
HikVisionClient.iso_format(end_time),
)
for playback_stream in playback_urls:
if len(playback_urls) > 0:
# 可以只通过一个回放流地址取到其他时间段的流
playback_stream = playback_urls[0]
else:
return
log.info(playback_stream)
part_num = 1
part_files_set = set()
file_name = 'rtmp_{}_{}.mp4'.format(
start_time.strftime('%Y%m%dT%H%M%S'),
end_time.strftime('%Y%m%dT%H%M%S')
)
# 断点续录
while True:
complete_duration = (end_time - start_time).total_seconds()
......@@ -50,13 +61,18 @@ def main():
part_num)
shutil.move(file_info['file_name'], part_file_name)
part_files_set.add(part_file_name)
start_time = new_start_time
part_num += 1
else:
part_files_set.add(file_info['file_name'])
break
if len(part_files_set) > 1:
concat(sorted(list(part_files_set)), file_info)
part_files = sorted(list(part_files_set))
if len(part_files) > 1:
concat(part_files, file_name, removed=True)
elif len(part_files) == 1:
shutil.move(part_files[0], file_name)
log.info('视频文件%s录制完成!', file_name)
def stream_record(stream, start_time, end_time):
......
......@@ -12,6 +12,6 @@ client = HikVisionClient(config.get('KEY'), config.get('SECRET'),
#
for camera in client.get_cameras():
print(camera)
url = client.get_camera_preview_url(camera['cameraIndexCode'], protocol='rtsp', stream_type=1)
print('摄像头{}的直播地址为{}'.format(camera['cameraIndexCode'], url.get('url')))
url = client.get_camera_preview_url(camera['indexCode'], protocol='rtsp', stream_type=1)
print('摄像头{}的直播地址为{}'.format(camera['indexCode'], url.get('url')))
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论