2025年3月15日 星期六 甲辰(龙)年 月十四 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

Python操作AD域服务器进行组织和用户的查询和添加

时间:12-06来源:作者:点击数:16

由于工作中有时候会遇到需要对AD域服务器进行批量添加用户和组织的操作,平时都是通过bat批处理对csv文件中的用户和组织进行操作添加,但是操作起来还是略麻烦,就想自己动手用Python代码写个更好操作的方式,随便百度了下,还真的有相关的库——ldap3,先写点demo,后面再完善下吧。

基本操作方法:

  • from ldap3 import Server, Connection, ALL, NTLM
  • # 连接
  • server = Server('192.168.214.93', get_info=ALL)
  • conn = Connection(server, user='TEST\\administrator', password='Winhong1234@#test', auto_bind=True, authentication=NTLM)
  • print(server.info)
  • # 查询
  • res = conn.search('dc=test,dc=csc,dc=com', '(objectclass=user)', attributes=['objectclass'])
  • print(conn.result) # 查询失败的原因
  • print(conn.entries) # 查询到的数据
  • # 增加组织
  • res = conn.add('OU=python,OU=cibuser,DC=test,DC=csc,DC=com', object_class='OrganizationalUnit')
  • if res:
  • print('增加组织成功!')
  • else:
  • print('增加组织发生错误')
  • if conn.result['description'] == 'entryAlreadyExists':
  • print('--该组织已存在')
  • # 增加用户
  • user01 = {
  • 'displayName' : 'python测试用户01', # 显示名称
  • 'userPrincipalName' : 'python_user_01@test.csc.com', # 登录名
  • 'userAccountControl': '544', # 启用账号
  • 'sAMAccountName': 'python_user_01', # 登录名
  • 'pwdLastSet': -1 # 取消下次登录修改密码
  • }
  • res = conn.add('CN=python_user_01,OU=python,OU=cibuser,DC=test,DC=csc,DC=com', object_class='user',
  • attributes=user01)
  • # attributes支持的字段可以通过server.schema.object_classes['user']获取
  • print(res)
  • print(conn.result)
  • if res:
  • print('增加用户成功!')
  • else:
  • print('增加用户发生错误')
  • if conn.result['description'] == 'entryAlreadyExists':
  • print('--该用户已存在')

下面是准备改写成类:

  • #!/usr/bin/env python
  • # coding=UTF-8
  • '''
  • @Author: wjx
  • @Description: AD域
  • @Date: 2018-12-23 21:23:57
  • @LastEditTime: 2019-03-28 23:46:56
  • '''
  • from ldap3 import Server, Connection, ALL, NTLM
  • class Adoper():
  • '''
  • 操作AD域的类
  • '''
  • def __init__(self, domain, ip, admin='administrator', pwd=None):
  • '''
  • domain: 域名,格式为:xxx.xxx.xxx
  • ip: ip地址,格式为:192.168.214.1
  • admin: 管理员账号
  • pwd: 管理员密码
  • '''
  • self.domain = domain
  • self.DC = ','.join(['DC=' + dc for dc in domain.split('.')]) # csc.com -> DC=csc,DC=com
  • self.pre = domain.split('.')[0].upper() # 用户登陆的前缀
  • self.ip = ip
  • self.admin = admin
  • self.pwd = pwd
  • self.server = Server(self.ip, get_info=ALL)
  • self.conn = Connection(self.server, user=self.pre+'\\'+self.admin, password=self.pwd, auto_bind=True, authentication=NTLM)
  • def search(self, org):
  • '''
  • 查询组织下的用户
  • org: 组织,格式为:aaa.bbb 即bbb组织下的aaa组织,不包含域地址
  • '''
  • att_list = ['displayName', 'userPrincipalName','userAccountControl','sAMAccountName','pwdLastSet']
  • org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC
  • res = self.conn.search(search_base=org_base,
  • search_filter='(objectclass=user)', # 查询数据的类型
  • attributes=att_list, # 查询数据的哪些属性
  • paged_size=1000) # 一次查询多少数据
  • if res:
  • for user in self.conn.entries:
  • yield user['displayName']
  • else:
  • print('查询失败: ', self.conn.result['description'])
  • return None
  • def add_org(self, org):
  • '''
  • 增加组织
  • oorg: 组织,格式为:aaa.bbb 即bbb组织下的aaa组织,不包含域地址
  • '''
  • org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC
  • res = self.conn.add(org_base, object_class='OrganizationalUnit') # 成功返回True,失败返回False
  • if res:
  • print(f'增加组织[ {org} ]成功!')
  • else:
  • print(f'增加组织[ {org} ]发生错误: ', self.conn.result['description'])
  • def add_user(self, org, name, uid):
  • '''
  • 增加用户
  • org:增加到该组织下
  • name:显示名称
  • uid:账号
  • '''
  • org_base = ','.join(['OU=' + ou for ou in org.split('.')]) + ',' + self.DC
  • user_att = {
  • 'displayName' : name,
  • 'userPrincipalName' : uid + '@' + self.domain, # uid@admin组成登录名
  • 'userAccountControl': '544', # 启用账号
  • 'sAMAccountName': uid,
  • 'pwdLastSet': -1 # 取消下次登录需要修改密码
  • }
  • res = self.conn.add(f'CN={uid},{org_base}', object_class='user',
  • attributes=user_att)
  • if res:
  • print(f'增加用户[ {name} ]成功!')
  • else:
  • print(f'增加用户[ {name} ]发生错误:', self.conn.result['description'])
  • if __name__ == '__main__':
  • ad93 = Adoper(domain='test.csc.com', ip='192.168.214.93', pwd='Winhong1234@#test')
  • for user in ad93.search('信息科技部.总行.cibuser'):
  • print(user)
  • ad93.add_org('python02.cibuser')
  • ad93.add_user('python02.cibuser', 'python03类用户', 'python03')
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐