2025年3月24日 星期一 甲辰(龙)年 月廿三 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

python --阿里云(智能媒体管理/视频点播)

时间:08-17 | 来源: | 作者: | 点击数:39

本文介绍了如何在阿里云平台上使用智能媒体管理(IMM)获取和刷新 WebOffice 凭证(token),以及如何使用视频点播(VOD)完成以下操作:创建客户端、获取和刷新上传凭证、获取播放信息、删除视频、批量查询视频信息,以及在服务端上传本地音视频。

智能媒体管理(IMM)获取 token

  • # alibabacloud_imm20200930==4.1.0
  class Sample(object):
      """Helper around the Alibaba Cloud IMM (Intelligent Media Management) SDK.

      Issues and refreshes WebOffice tokens for files stored under the
      ``oss://fileedg/`` bucket in the ``edg`` IMM project.

      NOTE(review): ``imm20200930Client``, ``imm_20200930_models``,
      ``open_api_models``, ``util_models`` and ``logger`` are not defined in
      this snippet; they are expected to be provided at module level.
      """

      def __init__(self, access_key='111', key_secret='222'):
          """Store the AccessKey pair used to sign requests.

          The defaults are the placeholder values from the original listing;
          pass real credentials when instantiating.
          """
          self.access_key = access_key
          self.key_secret = key_secret

      def weboffice_permission(self):
          """Return a read-only WebOffice permission set (every other action disabled)."""
          return imm_20200930_models.WebofficePermission(
              rename=False,
              readonly=True,
              history=False,
              print=False,
              export=False,
              copy=False
          )

      def create_client(self) -> imm20200930Client:
          """Build an IMM client from the stored AccessKey pair (cn-beijing endpoint)."""
          config = open_api_models.Config(access_key_id=self.access_key, access_key_secret=self.key_secret)
          config.endpoint = 'imm.cn-beijing.aliyuncs.com'
          return imm20200930Client(config)

      @staticmethod
      def _call(api_call, request, log_label):
          """Run one SDK call and normalise its outcome into a plain dict.

          @params api_call --> bound ``*_with_options`` method of the client;
          @params request --> request model passed to ``api_call``;
          @params log_label --> prefix of the debug log line;
          @return --> ``{'code': 0, 'msg': 'success', ...body fields}`` on success,
                      ``{'code': 500, 'msg': <error text>}`` on any exception.
          """
          import ast

          runtime = util_models.RuntimeOptions()
          try:
              body = api_call(request, runtime).body
              logger.debug(f'{log_label}:【{body}】')
              # The original used eval() on the stringified body. The body comes
              # from a remote service, so parse it with literal_eval instead:
              # same result for a dict literal, but no code is ever executed.
              response = dict({'code': 0, 'msg': 'success'}, **ast.literal_eval(str(body)))
          except Exception as error:
              logger.error(f'{error}')
              response = {'code': 500, 'msg': f'{error}'}
          return response

      def get_token(self, filename):
          """Generate a WebOffice token for ``oss://fileedg/<filename>``.

          @params filename --> object name inside the ``fileedg`` bucket;
          @return --> result dict as described in ``_call``.
          """
          client = self.create_client()
          generate_weboffice_token_request = imm_20200930_models.GenerateWebofficeTokenRequest(
              project_name='edg',
              source_uri=f'oss://fileedg/{filename}',
              permission=self.weboffice_permission()
          )
          return self._call(
              client.generate_weboffice_token_with_options,
              generate_weboffice_token_request,
              '获取凭证',
          )

      def refresh_token(self, access_token, refresh_token):
          """Refresh a previously issued WebOffice token.

          @params access_token --> access token issued earlier;
          @params refresh_token --> refresh token issued together with it;
          @return --> result dict as described in ``_call``.
          """
          client = self.create_client()
          refresh_weboffice_token_request = imm_20200930_models.RefreshWebofficeTokenRequest(
              project_name='edg',
              access_token=access_token,
              refresh_token=refresh_token
          )
          # The original negated the response body (`not client....body`), which
          # turned it into a bool and made this method always return code 500.
          return self._call(
              client.refresh_weboffice_token_with_options,
              refresh_weboffice_token_request,
              '刷新凭证',
          )

视频点播

获取凭证

  • # https://help.aliyun.com/zh/vod/developer-reference/api-vod-2017-03-21-refreshuploadvideo?spm=a2c4g.11186623.0.0.5b4e3c94BxaDxH
  • # alibabacloud_vod20170321==2.20.1
  from alibabacloud_tea_openapi import models as open_api_models
  from alibabacloud_tea_util import models as util_models
  from alibabacloud_vod20170321 import models as vod_20170321_models
  from alibabacloud_vod20170321.client import Client as vod20170321Client
  class VideoDianBo(object):
      """Helper around the Alibaba Cloud VOD (video on demand) SDK.

      Every public method returns the SDK response body converted with
      ``to_map()``; if the SDK call raises, the exception is logged and
      ``{'error': <exception>}`` is returned instead of propagating.

      NOTE(review): ``logger`` is not defined in this snippet and is expected
      to be provided at module level.
      """

      def __init__(self, access_key='111', key_secret='222'):
          """Store the AccessKey pair used to sign requests.

          The defaults are the placeholder values from the original listing;
          pass real credentials when instantiating.
          """
          self.access_key = access_key
          self.key_secret = key_secret

      def create_client(self) -> vod20170321Client:
          """Build a VOD client from the stored AccessKey pair (cn-shanghai endpoint)."""
          config = open_api_models.Config(access_key_id=self.access_key, access_key_secret=self.key_secret)
          config.endpoint = 'vod.cn-shanghai.aliyuncs.com'
          return vod20170321Client(config)

      @staticmethod
      def _call(api_call, request, log_label):
          """Run one SDK call and return its body as a dict.

          @params api_call --> bound ``*_with_options`` method of the client;
          @params request --> request model passed to ``api_call``;
          @params log_label --> prefix of the debug log line;
          @return --> ``body.to_map()`` on success, ``{'error': <exception>}`` on failure.
          """
          runtime = util_models.RuntimeOptions()
          try:
              result = api_call(request, runtime).body.to_map()
              logger.debug(f'{log_label}:【{result}】')
              return result
          except Exception as error:
              # Log the failure (the original swallowed it silently) but keep
              # the established return shape for callers.
              logger.error(f'{error}')
              return {'error': error}

      def createuploadvideo(self, filename: str, title: str):
          """Get an upload address and credential for a new audio/video file.

          @params filename --> file name;
          @params title --> title;
          """
          client = self.create_client()
          create_upload_video_request = vod_20170321_models.CreateUploadVideoRequest(file_name=filename, title=title)
          return self._call(client.create_upload_video_with_options, create_upload_video_request, '视频点播获取上传凭证')

      def refreshuploadvideo(self, video_id):
          """Refresh the upload address and credential of an existing video.

          @params video_id --> video id;
          """
          client = self.create_client()
          refresh_upload_video_request = vod_20170321_models.RefreshUploadVideoRequest(video_id=video_id)
          return self._call(client.refresh_upload_video_with_options, refresh_upload_video_request, '视频点播刷新上传凭证')

      def play_info(self, video_id):
          """Fetch the playback information (play URLs) for a video id."""
          client = self.create_client()
          get_play_info_request = vod_20170321_models.GetPlayInfoRequest(video_id=video_id)
          return self._call(client.get_play_info_with_options, get_play_info_request, '视频点播获取播放链接')

      def delete_video(self, video_ids):
          """Delete complete videos from VOD.

          Requesting an upload credential already creates a record, so unused
          records can be removed with this call.

          @params video_ids --> video id(s); presumably comma-separated like
                                ``get_video_infos`` -- confirm against the API;
          """
          client = self.create_client()
          delete_video_request = vod_20170321_models.DeleteVideoRequest(video_ids=video_ids)
          return self._call(client.delete_video_with_options, delete_video_request, '删除点播视频')

      def get_video_infos(self, video_ids: str):
          """Query information for several videos at once.

          @params video_ids --> video ids; separate multiple ids with ASCII commas;
          """
          client = self.create_client()
          get_video_infos_request = vod_20170321_models.GetVideoInfosRequest(video_ids=video_ids)
          return self._call(client.get_video_infos_with_options, get_video_infos_request, '获取视频信息')

服务端分片上传

  • # 测试上传本地音视频
  • # voduploadsdk==1.0.2
  def testUploadLocalVideo(accessKeyId, accessKeySecret, filePath, storageLocation=None,
                           title='exampleTitle', ecsRegionId="cn-beijing"):
      """Upload a local audio/video file to VOD with the server-side upload SDK.

      NOTE(review): ``AliyunVodUploader``, ``UploadVideoRequest`` and
      ``AliyunVodException`` are not imported in this snippet; they are expected
      to come from the ``voduploadsdk`` package at module level.

      @params accessKeyId --> AccessKey id;
      @params accessKeySecret --> AccessKey secret;
      @params filePath --> path of the local file to upload;
      @params storageLocation --> optional storage location set on the request;
      @params title --> video title (defaults to the original hard-coded value);
      @params ecsRegionId --> region of the ECS host running this script;
      @return --> the video id on success, ``None`` if the SDK raised
                  ``AliyunVodException`` (the error is printed).
      """
      try:
          # The ECS region where the upload script runs can be specified. When
          # it equals the VOD storage region the upload goes over the internal
          # network, which is faster and saves public traffic.
          uploader = AliyunVodUploader(accessKeyId, accessKeySecret, ecsRegionId)
          uploadVideoRequest = UploadVideoRequest(filePath, title)
          if storageLocation:
              uploadVideoRequest.setStorageLocation(storageLocation)
          videoId = uploader.uploadLocalVideo(uploadVideoRequest)
          print("file: %s, videoId: %s" % (uploadVideoRequest.filePath, videoId))
          return videoId
      except AliyunVodException as e:
          print(e)
          return None
  # Demo invocation. Credentials are read from the environment so that real
  # keys never have to be written into the source; the original placeholder
  # values remain as fallbacks when the variables are not set.
  import os

  accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '111')
  accessKeySecret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '22')
  localFilePath = r'D:\Users\11.MP4'
  testUploadLocalVideo(accessKeyId, accessKeySecret, localFilePath)
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐