本文介绍了如何在阿里云平台上使用智能媒体服务获取token以及进行视频点播操作,包括创建客户端、获取和刷新凭证,以及视频上传、播放信息获取和管理等。
- # alibabacloud_imm20200930==4.1.0
class Sample(object):
    """Helper around the Alibaba Cloud IMM (2020-09-30) WebOffice token APIs.

    Both token methods return a plain ``dict`` that always carries ``code``
    and ``msg`` keys: ``0`` / ``'success'`` merged with the response fields on
    success, ``500`` and the error text on failure.
    """

    def __init__(self):
        # Placeholder credentials; replace with a real AccessKey pair.
        self.access_key = '111'
        self.key_secret = '222'

    def weboffice_permission(self):
        """Build the WebOffice permission set: read-only, all other actions off."""
        return imm_20200930_models.WebofficePermission(
            rename=False,
            readonly=True,
            history=False,
            print=False,
            export=False,
            copy=False
        )

    def create_client(self) -> imm20200930Client:
        """Create an IMM client from the AccessKey ID and secret on this instance."""
        config = open_api_models.Config(
            access_key_id=self.access_key,
            access_key_secret=self.key_secret,
        )
        config.endpoint = 'imm.cn-beijing.aliyuncs.com'
        return imm20200930Client(config)

    @staticmethod
    def _success_payload(body):
        """Merge the response body's fields over the ``code``/``msg`` success header.

        The body's string form is parsed with ``ast.literal_eval`` rather than
        ``eval`` so that service-supplied text can only ever produce literals
        and is never executed as code.
        """
        import ast

        return dict({'code': 0, 'msg': 'success'}, **ast.literal_eval(str(body)))

    def get_token(self, filename):
        """Generate a WebOffice token for ``oss://fileedg/<filename>``.

        :param filename: object name inside the ``fileedg`` bucket.
        :return: dict with ``code``/``msg`` plus the response fields on
            success, or ``{'code': 500, 'msg': <error text>}`` on failure.
        """
        client = self.create_client()
        generate_weboffice_token_request = imm_20200930_models.GenerateWebofficeTokenRequest(
            project_name='edg',
            source_uri=f'oss://fileedg/{filename}',
            permission=self.weboffice_permission()
        )
        runtime = util_models.RuntimeOptions()
        try:
            response = client.generate_weboffice_token_with_options(
                generate_weboffice_token_request, runtime
            ).body
            logger.debug(f'获取凭证:【{response}】')
            response = self._success_payload(response)
        except Exception as error:
            logger.error(f'{error}')
            response = {'code': 500, 'msg': f'{error}'}
        return response

    def refresh_token(self, access_token, refresh_token):
        """Refresh a previously issued WebOffice token.

        :param access_token: the access token to refresh.
        :param refresh_token: the refresh token issued with ``access_token``.
        :return: dict with ``code``/``msg`` plus the response fields on
            success, or ``{'code': 500, 'msg': <error text>}`` on failure.
        """
        client = self.create_client()
        refresh_weboffice_token_request = imm_20200930_models.RefreshWebofficeTokenRequest(
            project_name='edg',
            access_token=access_token,
            refresh_token=refresh_token
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Keep the response body itself. Negating it with ``not`` turned it
            # into a bool, which cannot be unpacked as a mapping, so every
            # refresh ended in the 500 branch.
            response = client.refresh_weboffice_token_with_options(
                refresh_weboffice_token_request, runtime
            ).body
            logger.debug(f'刷新凭证:【{response}】')
            response = self._success_payload(response)
        except Exception as error:
            logger.error(f'{error}')
            response = {'code': 500, 'msg': f'{error}'}
        return response
-
获取凭证
- # https://help.aliyun.com/zh/vod/developer-reference/api-vod-2017-03-21-refreshuploadvideo?spm=a2c4g.11186623.0.0.5b4e3c94BxaDxH
-
- # alibabacloud_vod20170321==2.20.1
-
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_vod20170321 import models as vod_20170321_models
from alibabacloud_vod20170321.client import Client as vod20170321Client
-
class VideoDianBo(object):
    """Helper around the Alibaba Cloud VOD (2017-03-21) video APIs.

    Every API method returns the response body as a plain ``dict`` on success.
    On failure the exception is logged and ``{'error': <exception>}`` is
    returned instead of being raised.
    """

    def __init__(self):
        # Placeholder credentials; replace with a real AccessKey pair.
        self.access_key = '111'
        self.key_secret = '222'

    def create_client(self) -> vod20170321Client:
        """Create a VOD client from the AccessKey ID and secret on this instance."""
        config = open_api_models.Config(
            access_key_id=self.access_key,
            access_key_secret=self.key_secret,
        )
        config.endpoint = 'vod.cn-shanghai.aliyuncs.com'
        return vod20170321Client(config)

    def _execute(self, client, method_name, request, label):
        """Call ``client.<method_name>(request, runtime)`` and return the body as a dict.

        :param client: VOD client returned by :meth:`create_client`.
        :param method_name: name of the ``*_with_options`` client method to call.
        :param request: request model passed to that method.
        :param label: prefix of the debug log line written on success.
        :return: the response body map, or ``{'error': <exception>}`` on failure.
        """
        runtime = util_models.RuntimeOptions()
        try:
            result = getattr(client, method_name)(request, runtime).body.to_map()
            logger.debug(f'{label}:【{result}】')
            return result
        except Exception as error:
            # Log the failure so it is not lost when a caller ignores the
            # returned error dict.
            logger.error(f'{error}')
            return {'error': error}

    def createuploadvideo(self, filename: str, title: str):
        """Get an upload address and credential for a new audio/video file.

        :param filename: name of the file to upload.
        :param title: title of the video.
        :return: response body map, or ``{'error': <exception>}`` on failure.
        """
        client = self.create_client()
        request = vod_20170321_models.CreateUploadVideoRequest(file_name=filename, title=title)
        return self._execute(client, 'create_upload_video_with_options', request, '视频点播获取上传凭证')

    def refreshuploadvideo(self, video_id):
        """Refresh the upload address and credential of an existing video.

        :param video_id: id of the video.
        :return: response body map, or ``{'error': <exception>}`` on failure.
        """
        client = self.create_client()
        request = vod_20170321_models.RefreshUploadVideoRequest(video_id=video_id)
        return self._execute(client, 'refresh_upload_video_with_options', request, '视频点播刷新上传凭证')

    def play_info(self, video_id):
        """Get the playback information (play links) for a video id.

        :param video_id: id of the video.
        :return: response body map, or ``{'error': <exception>}`` on failure.
        """
        client = self.create_client()
        request = vod_20170321_models.GetPlayInfoRequest(video_id=video_id)
        return self._execute(client, 'get_play_info_with_options', request, '视频点播获取播放链接')

    def delete_video(self, video_ids):
        """Delete complete videos from VOD.

        A record is created as soon as an upload credential is requested, so
        this can be used to remove such records as well.

        :param video_ids: id(s) of the video(s) to delete.
        :return: response body map, or ``{'error': <exception>}`` on failure.
        """
        client = self.create_client()
        request = vod_20170321_models.DeleteVideoRequest(video_ids=video_ids)
        return self._execute(client, 'delete_video_with_options', request, '删除点播视频')

    def get_video_infos(self, video_ids: str):
        """Query information for several videos in one call.

        :param video_ids: video ids; separate multiple ids with ASCII commas.
        :return: response body map, or ``{'error': <exception>}`` on failure.
        """
        client = self.create_client()
        request = vod_20170321_models.GetVideoInfosRequest(video_ids=video_ids)
        return self._execute(client, 'get_video_infos_with_options', request, '获取视频信息')
-
服务端分片上传
- # 测试上传本地音视频
- # voduploadsdk==1.0.2
def testUploadLocalVideo(accessKeyId, accessKeySecret, filePath, storageLocation=None,
                         title='exampleTitle', ecsRegionId='cn-beijing'):
    """Upload a local audio/video file to VOD and report the resulting video id.

    :param accessKeyId: AccessKey ID used to create the uploader.
    :param accessKeySecret: AccessKey secret used to create the uploader.
    :param filePath: path of the local file to upload.
    :param storageLocation: optional storage location; only set on the request
        when truthy.
    :param title: title given to the uploaded video.
    :param ecsRegionId: region of the ECS instance running this script. Per
        the original note: when it matches the VOD storage region the upload
        automatically uses the intranet, which is faster and saves public
        traffic.
    :return: the video id returned by the uploader, or ``None`` when an
        ``AliyunVodException`` was raised (the exception is printed).
    """
    try:
        uploader = AliyunVodUploader(accessKeyId, accessKeySecret, ecsRegionId)
        uploadVideoRequest = UploadVideoRequest(filePath, title)

        if storageLocation:
            uploadVideoRequest.setStorageLocation(storageLocation)
        videoId = uploader.uploadLocalVideo(uploadVideoRequest)
        print("file: %s, videoId: %s" % (uploadVideoRequest.filePath, videoId))
        return videoId

    except AliyunVodException as e:
        print(e)
        return None
# Example driver: upload one local file using placeholder credentials.
accessKeyId, accessKeySecret = '111', '22'
localFilePath = r'D:\Users\11.MP4'

testUploadLocalVideo(accessKeyId, accessKeySecret, filePath=localFilePath)
-