提交 4008fa7e authored 作者: zw.wang's avatar zw.wang

feat: [client]

上级
流水线 #248 已失败 于阶段
.DS_Store
*.pyc
*.pyo
env/
env*
dist/
build/
*.eggs
*.egg-info/
_mailinglist
.tox/
.cache/
.pytest_cache/
.idea/
docs/_build/
.python-version
**/*.local*
**/*.secrets.toml
**/log/
*.mp4
__pycache__
# HIKVISION iSC Python SDK
This is the Python SDK to interact with Hikvision iSC platform using Hikvision's open API.
import json
import hashlib
import hmac
import base64
import requests
import dateutil.parser
from requests.packages import urllib3
from email.utils import formatdate
from datetime import datetime, timedelta
from time import mktime
class HikVisionClient(object):
def __init__(self, app_key, app_secret, host, port, https=True):
self.app_key = app_key
self.app_secret = app_secret
self.host = host
self.port = port
self.https = https
self.url = '{}://{}:{}'.format('https' if https else 'http', self.host, self.port)
def _sign(self, uri, body, method='POST'):
content_md5 = hashlib.md5()
content_md5.update(json.dumps(body).encode('utf-8'))
date = formatdate(timeval=mktime(datetime.now().timetuple()), localtime=False, usegmt=True)
string_to_be_signed = '{method}\n*/*\napplication/json\n{date}\nx-ca-key:{app_key}\n{uri}'
signature = str(base64.b64encode(hmac.new(
self.app_secret.encode('utf-8'),
string_to_be_signed.format(method=method, date=date, app_key=self.app_key, uri=uri).encode('utf-8'),
digestmod=hashlib.sha256).digest()), 'utf-8')
return {
"Accept": "*/*",
"Content-Type": "application/json",
"Date": date,
"X-Ca-Key": self.app_key,
"X-Ca-Signature": signature,
"X-Ca-Signature-Headers": "X-Ca-Key"
}
def _request(self, uri, body):
url = self.url + uri
urllib3.disable_warnings()
response = requests.post(url, headers=self._sign(uri, body, 'POST'),
json=body,
verify=False)
data = {}
if response.status_code == 200:
res_json = response.json()
if res_json.get('code', '0x000000') == '0':
data = response.json()['data']
else:
print(res_json['msg'])
else:
print('error: ', response.text)
return data
def get_camera_preview_url(self, camera_index_code,
protocol='rtsp', stream_type=0):
"""
:param camera_index_code:
:param protocol:
:param stream_type: 流类型,0 主码流; 1 子码流
:return:
"""
streamform = 'rtp' if protocol == 'rtsp' else 'ps'
body = {
'cameraIndexCode': camera_index_code,
'protocol': protocol,
'streamType': stream_type,
'transmode': 0,
'expand': 'streamform={}'.format(streamform)
}
uri = "/artemis/api/video/v1/cameras/previewURLs"
return self._request(uri, body)
def get_cameras(self, page_size=100, page_no=1):
"""
分页获取摄像头信息
:param page_size:
:param page_no:
:return:
"""
body = {
"pageSize": page_size,
"pageNo": page_no
}
uri = '/artemis/api/resource/v1/cameras'
result = self._request(uri, body)
camera_list = []
if result:
camera_list = result['list']
return camera_list
def get_cameras_playback_urls(self, camera_index_code, begin_time, end_time, protocol='rtmp'):
"""
回放取流
"""
streamform = 'ps' if protocol == 'rtmp' else 'rtp'
uri = '/artemis/api/video/v2/cameras/playbackURLs'
body = {
'cameraIndexCode': camera_index_code,
'beginTime': begin_time,
'endTime': end_time,
'recordLocation': 1,
'protocol': protocol,
'expand': 'transcode=1&systemformat={}&fileSize=1024'.format(streamform),
'streamform': streamform
}
print('回放取流请求参数:', body)
expired_time = datetime.now() + timedelta(minutes=5)
res = self._request(uri, body)
results = []
for f in res.get('list') or []:
start_time = dateutil.parser.parse(f['beginTime'])
end_time = dateutil.parser.parse(f['endTime'])
results.append({
'start_time': start_time,
'end_time': end_time,
'expired_time': expired_time,
'stream_url': {
'url': res.get('url'),
'extra_args': 'playBackMode=1',
'protocol': protocol
}
})
return results
import re
import subprocess
from intelab_python_sdk.logger import log
class RecordThread:
def __init__(self, stream):
self.stream = stream
self.shell = [
'ffmpeg',
'-y',
'-v', 'info',
'-i', '-',
'-c', 'copy',
'-f', 'mp4'
]
def run(self, stream_url, out_file):
log_buffer = ''
cmd = self.shell + [out_file]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
out, err = process.communicate(stream_url)
return_code = process.poll()
while return_code is None:
line = process.stdout.readline(81)
log_buffer += line.decode()
# 换行打印日志
if r'\n' in str(line) or r'\r' in str(line):
# 处理日志中\r的超过81字符的日志长度
log_buffer = log_buffer.split('\r')
if len(log_buffer) > 1:
log_buffer, stdout = '\n'.join(log_buffer[:-1]), log_buffer[-1]
else:
log_buffer = ''.join(log_buffer)
stdout = ''
if 'Non-monotonous DTS' in log_buffer:
# 屏蔽日志中'Non-monotonous DTS in output stream:' 的告警信息
log_buffer = ''
log.debug('log: %s', log_buffer)
log_buffer = stdout
return_code = process.poll()
return out_file
def record_thread(stream_url, out_file):
cmd = [
'ffmpeg',
'-y',
'-rw_timeout', '20000000',
'-v', 'info',
'-i', stream_url,
'-c', 'copy',
'-f', 'mp4',
out_file
]
log_buffer = ''
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=None)
return_code = process.poll()
while return_code is None:
line = process.stdout.readline(81)
log_buffer += line.decode()
# 换行打印日志
if r'\n' in str(line) or r'\r' in str(line):
# 处理日志中\r的超过81字符的日志长度
log_buffer = log_buffer.split('\r')
if len(log_buffer) > 1:
log_buffer, stdout = '\n'.join(log_buffer[:-1]), log_buffer[-1]
else:
log_buffer = ''.join(log_buffer)
stdout = ''
if 'Non-monotonous DTS' in log_buffer:
# 屏蔽日志中'Non-monotonous DTS in output stream:' 的告警信息
log_buffer = ''
log.debug('ffmpeg-log: %s', log_buffer)
log_buffer = stdout
return_code = process.poll()
return out_file
def get_video_duration(file_name):
"""
获取视频文件的持续时间、分辨率等信息
"""
error_log = ''
duration = ['00:00:00']
bitrate = ['0']
resolution = ['0x0']
media_type = 'VideoHandler'
log_buffer = subprocess.getoutput('ffmpeg -i {}'.format(file_name))
if 'moov atom not found' in log_buffer:
error_log += log_buffer
duration = re.findall(r'Duration: (\d\d:\d\d:\d\d)\.\d\d',
log_buffer) or duration
bitrate = re.findall(r'bitrate: (\d+) kb/s',
log_buffer) or bitrate
resolution = re.findall(r'(\d{3,4}x\d{3,4})', log_buffer) or resolution
if 'SoundHandler' in log_buffer:
media_type += '+SoundHandler'
video_info = {
'duration': duration[0],
'bitrate': bitrate[0],
'resolution': resolution[0],
'media_type': media_type,
'file_name': file_name
}
return video_info, error_log
def time_to_seconds(duration):
"""
时间字符串转换成秒
:param duration: 时间格式串, 例如"01:46:00"
:return seconds: int
"""
hours, minutes, seconds = duration.split(':')
seconds = int(seconds)
if minutes > '00':
seconds += int(minutes) * 60
if hours > '00':
seconds += int(hours) * 3600
return seconds
def get_time_str(seconds):
"""
秒转换成时间字符串
:param seconds: 秒
:return duration: str '01:46:00'
"""
if seconds >= 3600:
s = str('%02d' % (seconds // 3600))
seconds = seconds % 3600
else:
s = '00'
if seconds > 60:
s += str(':%02d' % (seconds // 60))
seconds = seconds % 60
else:
s += ':00'
s += str(':%02d' % seconds)
return s
import setuptools
with open('README.md', 'r') as fh:
long_description = fh.read()
setuptools.setup(
name='hikvision-isc-client',
version='0.0.1',
author='',
author_email='',
description='Client to interact with Hikvision iSC platform using open API',
long_description=long_description,
long_description_content_type='text/markdown',
url='',
packages=setuptools.find_packages(),
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
],
install_requires=[
'requests',
'python-dateutil',
'pytz',
'intelab-python-sdk'
],
python_requires='>=3.6',
)
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论