本文介绍了如何在阿里云平台上使用智能媒体服务(IMM)获取和刷新 WebOffice 凭证(token),以及如何进行视频点播(VOD)操作,包括创建客户端、获取和刷新上传凭证、获取播放信息、删除与查询视频,以及服务端本地视频上传等。
# alibabacloud_imm20200930==4.1.0
class Sample(object):
    """Helper around the Alibaba Cloud Intelligent Media Management (IMM) WebOffice token APIs.

    Every public method builds a fresh client, issues one request against
    project ``edg`` and returns a plain ``dict`` carrying a ``code`` / ``msg``
    pair (``0`` / ``'success'`` on success, ``500`` / error text on failure).
    """

    def __init__(self):
        # NOTE(review): these look like placeholder credentials; real keys
        # should come from configuration or the environment, not source code.
        self.access_key = '111'
        self.key_secret = '222'

    def weboffice_permission(self):
        """Build the permission object attached to generated WebOffice tokens.

        Only ``readonly`` is enabled; rename, history, print, export and copy
        are all switched off.
        """
        return imm_20200930_models.WebofficePermission(
            rename=False,
            readonly=True,
            history=False,
            print=False,
            export=False,
            copy=False
        )

    def create_client(self) -> imm20200930Client:
        """Create an IMM client from the instance's access key and secret.

        The endpoint is fixed to ``imm.cn-beijing.aliyuncs.com``.
        """
        config = open_api_models.Config(access_key_id=self.access_key, access_key_secret=self.key_secret)
        config.endpoint = 'imm.cn-beijing.aliyuncs.com'
        return imm20200930Client(config)

    @staticmethod
    def _success_payload(body):
        """Merge the fields of a response body into a success envelope.

        Fields parsed from ``body`` are applied after the defaults, so a body
        field named ``code`` or ``msg`` would override the envelope value.
        """
        # SECURITY NOTE(review): this round-trips the SDK model through
        # ``str()`` + ``eval()``, which presumably relies on the model's string
        # form being a Python dict literal -- TODO confirm. ``eval`` executes
        # arbitrary expressions; it is kept here only to preserve the existing
        # key names of the returned dict. Prefer a structured conversion once
        # callers are verified against its key naming.
        return dict({'code': 0, 'msg': 'success'}, **eval(str(body)))

    def get_token(self, filename):
        """Generate a WebOffice token for ``oss://fileedg/<filename>``.

        @params filename --> object name inside the ``fileedg`` bucket.
        @return dict --> success envelope merged with the response body fields,
                         or ``{'code': 500, 'msg': <error text>}`` when the
                         request or the conversion raises.
        """
        client = self.create_client()
        generate_weboffice_token_request = imm_20200930_models.GenerateWebofficeTokenRequest(
            project_name='edg',
            source_uri=f'oss://fileedg/{filename}',
            permission=self.weboffice_permission()
        )
        runtime = util_models.RuntimeOptions()
        try:
            response = client.generate_weboffice_token_with_options(generate_weboffice_token_request, runtime).body
            logger.debug(f'获取凭证:【{response}】')
            response = self._success_payload(response)
        except Exception as error:
            logger.error(f'{error}')
            response = {'code': 500, 'msg': f'{error}'}
        return response

    def refresh_token(self, access_token, refresh_token):
        """Refresh a previously issued WebOffice token.

        @params access_token --> the access token to refresh.
        @params refresh_token --> the refresh token issued alongside it.
        @return dict --> success envelope merged with the response body fields,
                         or ``{'code': 500, 'msg': <error text>}`` when the
                         request or the conversion raises.
        """
        client = self.create_client()
        refresh_weboffice_token_request = imm_20200930_models.RefreshWebofficeTokenRequest(
            project_name='edg',
            access_token=access_token,
            refresh_token=refresh_token
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Use the response body itself. It was previously negated with
            # ``not``, which turned it into a bool and made the merge below
            # fail on every call, so this method always reported code 500.
            response = client.refresh_weboffice_token_with_options(refresh_weboffice_token_request, runtime).body
            logger.debug(f'刷新凭证:【{response}】')
            response = self._success_payload(response)
        except Exception as error:
            logger.error(f'{error}')
            response = {'code': 500, 'msg': f'{error}'}
        return response
获取凭证
# https://help.aliyun.com/zh/vod/developer-reference/api-vod-2017-03-21-refreshuploadvideo?spm=a2c4g.11186623.0.0.5b4e3c94BxaDxH
# alibabacloud_vod20170321==2.20.1
from alibabacloud_vod20170321.client import Client as vod20170321Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
class VideoDianBo(object):
    """Helper around the Alibaba Cloud Video on Demand (VOD) APIs.

    Every public method builds a fresh client, issues one request and returns
    the response body converted with ``to_map()``. When the call raises, the
    method returns ``{'error': <exception>}`` instead of propagating it.
    """

    def __init__(self):
        # NOTE(review): these look like placeholder credentials; real keys
        # should come from configuration or the environment, not source code.
        self.access_key = '111'
        self.key_secret = '222'

    def create_client(self) -> vod20170321Client:
        """Create a VOD client from the instance's access key and secret.

        The endpoint is fixed to ``vod.cn-shanghai.aliyuncs.com``.
        """
        config = open_api_models.Config(access_key_id=self.access_key, access_key_secret=self.key_secret)
        config.endpoint = 'vod.cn-shanghai.aliyuncs.com'
        return vod20170321Client(config)

    def _execute(self, api_call, log_label):
        """Run one SDK call and normalise its outcome.

        @params api_call --> callable taking the runtime options and returning
                             the SDK response; invoked inside the ``try`` so
                             that any failure it raises is captured.
        @params log_label --> prefix used for the debug log line.
        @return dict --> ``response.body.to_map()`` on success, otherwise
                         ``{'error': <exception>}``.
        """
        runtime = util_models.RuntimeOptions()
        try:
            result = api_call(runtime).body.to_map()
            logger.debug(f'{log_label}:【{result}】')
            return result
        except Exception as error:
            # Log the failure (as the IMM helper in this file does) so it is
            # not visible only to callers that inspect the returned dict.
            logger.error(f'{error}')
            return {'error': error}

    def createuploadvideo(self, filename: str, title: str):
        """Obtain an upload address and credential for an audio/video file.

        @params filename --> file name;
        @params title --> title;
        """
        client = self.create_client()
        request = vod_20170321_models.CreateUploadVideoRequest(file_name=filename, title=title)
        return self._execute(
            lambda runtime: client.create_upload_video_with_options(request, runtime),
            '视频点播获取上传凭证',
        )

    def refreshuploadvideo(self, video_id):
        """Refresh the upload address and credential of an audio/video file.

        @params video_id --> video id;
        """
        client = self.create_client()
        request = vod_20170321_models.RefreshUploadVideoRequest(video_id=video_id)
        return self._execute(
            lambda runtime: client.refresh_upload_video_with_options(request, runtime),
            '视频点播刷新上传凭证',
        )

    def play_info(self, video_id):
        """Fetch the playback information (play links) for a video id.

        @params video_id --> video id;
        """
        client = self.create_client()
        request = vod_20170321_models.GetPlayInfoRequest(video_id=video_id)
        return self._execute(
            lambda runtime: client.get_play_info_with_options(request, runtime),
            '视频点播获取播放链接',
        )

    def delete_video(self, video_ids):
        """Delete complete videos from VOD.

        A record is created in VOD once an upload credential has been
        obtained, which is what this removes.

        @params video_ids --> video id(s);
        """
        client = self.create_client()
        request = vod_20170321_models.DeleteVideoRequest(video_ids=video_ids)
        return self._execute(
            lambda runtime: client.delete_video_with_options(request, runtime),
            '删除点播视频',
        )

    def get_video_infos(self, video_ids: str):
        """Query information for several videos in one request.

        @params video_ids --> video ids; separate multiple ids with ASCII commas;
        """
        client = self.create_client()
        request = vod_20170321_models.GetVideoInfosRequest(video_ids=video_ids)
        return self._execute(
            lambda runtime: client.get_video_infos_with_options(request, runtime),
            '获取视频信息',
        )
服务端分片上传
# 测试上传本地音视频
# voduploadsdk==1.0.2
def testUploadLocalVideo(accessKeyId, accessKeySecret, filePath, storageLocation=None,
                         title='exampleTitle', ecsRegionId='cn-beijing'):
    """Upload a local audio/video file to VOD and print the resulting video id.

    @params accessKeyId --> access key id used to build the uploader;
    @params accessKeySecret --> access key secret used to build the uploader;
    @params filePath --> path of the local file to upload;
    @params storageLocation --> optional storage location; only applied to the
                                request when truthy;
    @params title --> title given to the uploaded video (defaults to the
                      previously hard-coded 'exampleTitle');
    @params ecsRegionId --> region of the ECS instance running this script
                            (defaults to the previously hard-coded 'cn-beijing');
    @return --> the video id returned by the uploader, or None when an
                AliyunVodException was raised (the exception is printed).
    """
    try:
        # The ECS region where the upload script is deployed can be specified.
        # If it matches the VOD storage region, the upload automatically goes
        # over the intranet, which is faster and saves public-network traffic.
        uploader = AliyunVodUploader(accessKeyId, accessKeySecret, ecsRegionId)
        uploadVideoRequest = UploadVideoRequest(filePath, title)
        if storageLocation:
            uploadVideoRequest.setStorageLocation(storageLocation)
        videoId = uploader.uploadLocalVideo(uploadVideoRequest)
        print("file: %s, videoId: %s" % (uploadVideoRequest.filePath, videoId))
        return videoId
    except AliyunVodException as e:
        print(e)
        return None
# Demo invocation: upload one local file using the sample credentials below.
# NOTE(review): the key values look like placeholders -- replace before running.
accessKeyId, accessKeySecret = '111', '22'
localFilePath = r'D:\Users\11.MP4'
testUploadLocalVideo(accessKeyId, accessKeySecret, filePath=localFilePath)