首页 > 代码库 > python paramiko 多线程批量执行指令及批量上传文件和目录
python paramiko 多线程批量执行指令及批量上传文件和目录
源代码:
https://github.com/jy1779/be.git
环境需求:
1、python3
2、paramiko
pip install --upgrade pip
apt-get install libssl-dev
pip3 install paramiko
3、执行权限
chmod +x becmd.py
ln -s /root/be/bin/becmd.py /usr/local/sbin/becmd
chmod +x besync.py
ln -s /root/be/bin/besync.py /usr/local/sbin/besync
4、导入路径设置
cd /usr/lib/python3.5/dist-packages/
touch be.pth
vim be.pth
/root/be(be.pth 文件内容:只写 /root/be 这一行路径,.pth 文件不支持行内注释)
5、因为代码是在 Windows 上开发的,所以在 Linux 上运行时会出现以下问题:
从 Windows 上传的文件带有 \r\n 换行符,可以用 dos2unix 指令转换成 Unix 格式。
apt install dos2unix
root@db3:~/be/bin# ./besync.py
/usr/bin/env: ‘python3\r’: No such file or directory
root@db3:~/be/bin# dos2unix besync.py
dos2unix: converting file besync.py to Unix format ...
root@db3:~/be/bin# ./besync.py
Reminder: The source and destination addresses do not exist
Usage: ./besync.py <source address> <destination address>
解决
apt install dos2unix
dos2unix becmd.py
dos2unix besync.py
6、日志路径的问题。最好是绝对路径,不然只能在be目录下执行becmd 和besync
f = open("/root/be/logs/besync.log", 'a')
f = open("/root/be/logs/becmd.log", 'a')
程序目录:
.
├── app
│ ├── __init__.py
│ ├── pwd_connect_cmd.py
│ ├── pwd_connect_sync.py
│ ├── __pycache__
│ │ ├── __init__.cpython-35.pyc
│ │ ├── pwd_connect_cmd.cpython-35.pyc
│ │ ├── pwd_connect_sync.cpython-35.pyc
│ │ ├── ssh_be_cmd.cpython-35.pyc
│ │ └── ssh_be_sync.cpython-35.pyc
│ ├── ssh_be_cmd.py
│ └── ssh_be_sync.py
├── bin
│ ├── becmd.py
│ ├── besync.py
│ └── __init__.py
├── conf
│ ├── config.py
│ ├── __init__.py
│ └── __pycache__
│ ├── config.cpython-35.pyc
│ └── __init__.cpython-35.pyc
├── __init__.py
└── logs
├── becmd.log
├── besync.log
└── __init__.py
app/ssh_be_cmd.py
"""Run one shell command on a single remote host over SSH, in its own thread."""
import datetime
import threading

import paramiko


class MyThread(threading.Thread):
    """Worker thread that executes ``cmd`` on one host and prints its output.

    Args:
        ip: Host name or address passed to ``SSHClient.connect``.
        port: SSH port; converted with ``int()`` when the thread runs.
        username: Login name.
        password: Login password.
        cmd: Command line handed to ``exec_command`` unchanged.
    """

    def __init__(self, ip, port, username, password, cmd):
        super(MyThread, self).__init__()
        self.ssh = paramiko.SSHClient()
        # NOTE: AutoAddPolicy accepts host keys that are not yet known.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ip = ip
        self.port = port
        self.username = username
        self.password = password
        self.cmd = cmd

    def run(self):
        """Connect, run the command, append a log entry and print the result.

        stdout is printed when it is non-empty, otherwise stderr is printed.
        The SSH client is closed even when connecting or executing fails.
        """
        try:
            self.ssh.connect(
                hostname=self.ip,
                port=int(self.port),
                username=self.username,
                password=self.password,
            )
            _stdin, stdout, stderr = self.ssh.exec_command(self.cmd)
            out, err = stdout.read(), stderr.read()
        finally:
            # Release the connection before doing any local I/O.
            self.ssh.close()

        result = out if out else err

        # One write per entry keeps lines from different host threads intact.
        entry = " ".join(
            (str(datetime.datetime.now()), self.ip, self.username, self.cmd)
        )
        with open("/root/be/logs/becmd.log", "a") as log_file:
            log_file.write(entry + "\n")

        print(
            "\033[1;32;40m" + self.ip.rjust(33, "=") + "\033[0m",
            "\033[1;32;40m" + "Command result: ".ljust(37, "=") + "\033[0m",
        )
        # Remote output is not guaranteed to be UTF-8; never crash on decode.
        print(result.decode(errors="replace"))
app/pwd_connect_cmd.py
"""Fan a shell command out to every configured host, one thread per host."""
import sys

from app.ssh_be_cmd import MyThread
from conf.config import account


def pwd_con(host):
    """Start a thread that runs the command-line arguments on ``host``.

    The remote command is every argument after the program name, joined with
    single spaces. If no argument was given, a reminder is printed and the
    process exits.

    Args:
        host: Key into ``account`` selecting the connection settings.

    Raises:
        KeyError: If ``host`` or one of its settings is missing.
    """
    # Use the whole argument list: a fixed cap would silently cut long commands.
    args = sys.argv[1:]
    if not args:
        print("Reminder: The command does not exist")
        # sys.exit, not the interactive-only exit() helper from ``site``.
        sys.exit()

    entry = account[host]
    worker = MyThread(
        entry["ip"],
        entry["port"],
        entry["username"],
        entry["password"],
        " ".join(args),
    )
    worker.start()


def connect():
    """Run the command on every host listed in ``account``."""
    for host in account:
        pwd_con(host)
app/ssh_be_sync.py
"""Upload a file or a directory tree to one remote host over SFTP, in a thread."""
import datetime
import os
import posixpath
import threading
import time
from os.path import getsize

import paramiko

# Usage text; ``{prog}`` is replaced by the program name (cmd[0]).
_HELP_TEMPLATE = """
    -f send file to remote host.
       {prog} -f <source address> <destination address>
    -d send dir to remote host.
       {prog} -d <source address> <destination address>
    --help show help.
       {prog} --help
"""


class MyThread(threading.Thread):
    """Worker thread that uploads to a single host.

    Args:
        ip: Host name or address of the remote machine.
        port: SSH port; converted with ``int()`` when the thread runs.
        username: Login name.
        password: Login password.
        cmd: Argument list ``[prog, "-f" | "-d", source, destination]``.
            Any other shape makes ``run`` print the usage text.
    """

    def __init__(self, ip, port, username, password, cmd):
        super(MyThread, self).__init__()
        self.ip = ip
        self.port = port
        self.username = username
        self.password = password
        self.cmd = cmd

    def run(self):
        """Validate the arguments, connect, log the transfer and perform it.

        Arguments are checked before connecting so that a usage error does not
        open (and leak) a connection. The transport is always closed.
        """
        mode = self.cmd[1] if len(self.cmd) == 4 else None
        if mode not in ("-f", "-d"):
            prog = self.cmd[0] if self.cmd else "besync"
            print(_HELP_TEMPLATE.format(prog=prog))
            return

        src, des = self.cmd[2], self.cmd[3]
        self.transport = paramiko.Transport((self.ip, int(self.port)))
        try:
            self.transport.connect(
                username=self.username, password=self.password
            )
            self.sftp = paramiko.SFTPClient.from_transport(self.transport)
            self._log_transfer(src, des)
            if mode == "-f":
                self._send_file(src, des)
            else:
                self._send_dir(src, des)
        finally:
            self.transport.close()

    def _log_transfer(self, src, des):
        """Append one line describing this transfer to the besync log."""
        fields = (
            str(datetime.datetime.now()),
            self.ip,
            self.username,
            self.cmd[0],
            self.cmd[1],
            src,
            des,
        )
        # One write per entry keeps lines from different host threads intact.
        with open("/root/be/logs/besync.log", "a") as log_file:
            log_file.write(" ".join(fields) + "\n")

    def _ensure_remote_dir(self, remote_dir):
        """Create ``remote_dir`` if it is missing; its parent must exist."""
        try:
            self.sftp.stat(remote_dir)
        except FileNotFoundError:
            print("Create a new directory: ", remote_dir)
            self.sftp.mkdir(remote_dir)

    def _make_remote_dirs(self, remote_dir):
        """Create ``remote_dir`` and every missing ancestor (like mkdir -p).

        Works for absolute and relative remote paths; empty components
        produced by doubled or trailing slashes are skipped.
        """
        current = "/" if remote_dir.startswith("/") else ""
        for part in remote_dir.split("/"):
            if not part:
                continue
            current = posixpath.join(current, part)
            self._ensure_remote_dir(current)

    def _send_file(self, src, des):
        """Upload the local file ``src`` to the remote path ``des``."""
        if not os.path.isfile(src):
            print("Reminder: The source file does not exist: " + src)
            return
        time_start = time.time()
        self._make_remote_dirs(posixpath.dirname(des))
        self.sftp.put(src, des)
        total_time = time.time() - time_start
        print("\033[1;32;40mSend Successful.\033[0m")
        print("total size: " + str(getsize(src)) + " bytes")
        print("total time: " + str(total_time))

    def _send_dir(self, src, des):
        """Upload the directory tree ``src`` below the remote directory ``des``.

        Remote paths are ``des`` plus each local path relative to the parent
        of ``src``, so ``-d a/b /x`` creates ``/x/b/...``. A trailing slash on
        ``src`` (``a/b/``) places the contents directly into ``des``. Empty
        directories are created as well.
        """
        if not os.path.isdir(src):
            print("Reminder: The source directory does not exist: " + src)
            return

        base = os.path.dirname(src) or os.curdir
        remote_dirs = []
        uploads = []
        # followlinks=True: symlinked directories are treated as directories.
        for root, _subdirs, names in os.walk(src, followlinks=True):
            relative = os.path.relpath(root, base).replace(os.sep, "/")
            if relative == ".":
                remote_root = des
            else:
                remote_root = posixpath.join(des, relative)
            remote_dirs.append(remote_root)
            for name in names:
                local_file = os.path.join(root, name)
                # Only regular files (or links to them) are uploaded.
                if os.path.isfile(local_file):
                    uploads.append(
                        (local_file, posixpath.join(remote_root, name))
                    )

        # os.walk is top-down, so only the first directory can be missing
        # ancestors; every later one has its parent created already.
        self._make_remote_dirs(remote_dirs[0])
        for remote_dir in remote_dirs[1:]:
            self._ensure_remote_dir(remote_dir)

        time_start = time.time()
        for local_file, remote_file in uploads:
            self.sftp.put(local_file, remote_file)
        total_time = time.time() - time_start
        print("\033[1;32;40mSend Successful.\033[0m")
        print("total time: " + str(total_time))
app/pwd_connect_sync.py
"""Start one upload thread per configured host."""
import sys

from app.ssh_be_sync import MyThread
from conf.config import account


def pwd_con(host):
    """Start an upload thread for ``host``.

    The thread receives the connection settings stored under ``host`` in
    ``account`` together with the first 100 entries of ``sys.argv``
    (program name included).
    """
    entry = account[host]
    worker = MyThread(
        entry["ip"],
        entry["port"],
        entry["username"],
        entry["password"],
        sys.argv[:100],
    )
    worker.start()


def connect():
    """Call ``pwd_con`` for every host key in ``account``."""
    for host in account:
        pwd_con(host)
conf/config.py
# Connection settings, keyed by host address.
# Each entry holds the address, the SSH port (as a string), the login name
# and the login password.
# NOTE: passwords are stored here in plaintext.
account = {
    "192.168.1.57": {
        "ip": "192.168.1.57",
        "port": "22",
        "username": "root",
        "password": "123456",
    },
    "192.168.1.75": {
        "ip": "192.168.1.75",
        "port": "22",
        "username": "root",
        "password": "123456",
    },
}
bin/becmd.py
#!/usr/bin/env python3
"""Command-line entry point: run a shell command on every configured host."""
from app.pwd_connect_cmd import connect

# Only fan out when executed as a script, not when the module is imported.
if __name__ == "__main__":
    connect()
bin/besync.py
#!/usr/bin/env python3
"""Command-line entry point: upload a file or directory to every configured host."""
from app.pwd_connect_sync import connect

# Only start the uploads when executed as a script, not when imported.
if __name__ == "__main__":
    connect()
使用例子:
1、批量执行指令:
2、批量上传文件
3、批量上传目录
本文出自 “微风清凉” 博客,谢绝转载!
python paramiko 多线程批量执行指令及批量上传文件和目录