更新时间:2024年12月30日 16:16:56 作者:hvinsion
一款由Python可以远程控制局域网电脑关机、重启、注销、锁定 、休眠、退出登录甚至能操作远程电脑Cmd终端命令的一款工具。资源及源码已打包,大家可自行下载。
2. 运行效果
3. 1.0版本相关源码
import socket import keyboard import subprocess import threading import logging import queue ''' import win32gui import win32con # 获取当前窗口句柄 hwnd = win32gui.GetForegroundWindow() # 设置窗口属性,隐藏窗口 win32gui.ShowWindow(hwnd, win32con.SW_HIDE) ''' # 配置日志 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') print("客户端登录账号:root,密码:123456,F3关闭服务器") print() ip1 = '' # 默认ip地址 print(f"请输入服务器IP地址(电脑的IPv4地址),不输入默认为 {ip1}") # 超时设置,15秒 def get_input(prompt, timeout, result_queue): try: user_input = input(prompt) if user_input.strip() == "": result_queue.put(None) # 如果用户直接回车,返回None else: result_queue.put(user_input) except Exception as e: logging.error(f"输入过程中发生错误: {e}") # 创建一个线程来获取输入 print("超时时间---15秒") print() result_queue = queue.Queue() input_thread = threading.Thread(target=get_input, args=("请输入服务器IP地址:", 15, result_queue)) input_thread.start() # 等待输入,如果超时则返回默认值 input_thread.join(15) if not result_queue.empty(): ip = result_queue.get() if ip is None: ip = ip1 # 如果用户直接回车,使用默认值 else: ip = ip1 print(f"最终IP地址: {ip}") ''' def get_ipv4_address(): hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) return ip_address ip=get_ipv4_address() print(ip) ''' # 创建一个 TCP/IP socket server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 设置 SO_REUSEADDR 选项 server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 绑定服务器地址和端口 server_address = (ip, 5000) server_socket.bind(server_address) # 监听连接 server_socket.listen(5) # 支持最多5个并发连接 print('服务器正在等待连接...') def close_server(): print('服务器已关闭') server_socket.close() exit() # 监听 F3 键关闭服务器 keyboard.add_hotkey('F3', close_server) def handle_client(client_socket, client_address): try: print('连接来自:', client_address) # 接收账号和密码 data = client_socket.recv(1024) logging.debug(f"接收到的数据: {data.decode()}") # 解析账号和密码 try: account, password = data.decode().split(':') except ValueError: logging.error("账号和密码格式错误") message = '账号和密码格式错误!' client_socket.send(message.encode()) return # 验证账号和密码 if account == 'root' and password == '123456': # 发送成功消息 message = '登录成功!' client_socket.send(message.encode()) # 接收客户端请求 while True: request = client_socket.recv(1024).decode() logging.debug(f"接收到的请求: {request}") if request == '1': # 关机'shutdown /s /t 0', shell=True) elif request == '2': # 重启'shutdown /r /t 0', shell=True) elif request == '3': # 休眠'rundll32.exe powrprof.dll,SetSuspendState 0,1,0', shell=True) elif request == '4': # 锁定'rundll32.exe user32.dll,LockWorkStation', shell=True) elif request == '0': # 退出登录 print(client_address, "退出登录") break elif request == '5': # 等待用户输入 user_input = 1 while True: # 执行cmd命令 command = client_socket.recv(1024).decode() logging.debug(f"接收到的命令:{command}") try: # 执行命令 result = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # 获取命令的输出 output, error = result.communicate() # 打印命令的输出到本地控制台 logging.debug(f"{output}") # 发送命令的输出 if error: output = " " client_socket.sendall(output.encode()) logging.debug(f"{error}") # 发送命令的错误 client_socket.sendall(error.encode()) logging.debug("命令结果已发送") except subprocess.CalledProcessError as e: logging.error(f"{e.stderr}") # 发送错误信息 client_socket.sendall(str(e).encode()) finally: # 发送一个结束标记 client_socket.sendall(' '.encode()) # 等待用户输入 #接收客户端的user_input值 user_input = client_socket.recv(1024).decode() logging.debug(f"接收到的请求: {user_input}") if user_input == '1': # 继续执行命令 continue elif user_input == '2': print(client_address, "退出登录") # 退出循环 break else: # 无效输入 client_socket.sendall('无效输入'.encode()) # 继续执行命令 continue else: # 无效输入 client_socket.sendall('无效输入'.encode()) else: # 无效请求 message = '无效请求!' client_socket.send(message.encode()) else: # 发送失败消息 message = '账号或密码错误!' client_socket.send(message.encode()) except Exception as e: logging.error(f"处理客户端连接时发生错误: {e}") finally: # 关闭连接 client_socket.close() while True: try: # 等待客户端连接 client_socket, client_address = server_socket.accept() logging.debug(f"接受到来自 {client_address} 的连接") # 创建新线程处理客户端连接 client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address)) client_thread.start() except OSError as e: if e.errno == 10038: print('服务器已关闭') break else: raise input("按回车键退出...")
import socket def run_script(): try: print() ip1 = '' print("请输入服务器的IP地址,不输入默认ip为", ip1) print() ip = input("请输入服务器的IP地址:") or ip1 print() # 服务器地址和端口 server_address = (ip, 5000) # 创建一个 TCP/IP socket client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: # 连接到服务器 client_socket.connect(server_address) print('已连接到服务器') # 获取用户输入的账号和密码 account = input('请输入账号:') or 'root' password = input('请输入密码:') or '123456' # 发送账号和密码 message = f'{account}:{password}' client_socket.sendall(message.encode()) # 接收服务器响应 response = client_socket.recv(1024).decode() print('服务器响应:', response) if response == '登录成功!': print('登录成功') print() # 发送默认请求 print("[0]退出登录---[1]关机---[2]重启---[3]休眠") print("[4]锁定---[5]执行cmd命令") print() request = input('输入数字: ') or "5" # 你可以修改这个请求 if request in ['0', '1', '2', '3', '4', '5']: client_socket.sendall(request.encode()) print("请求已发送") if request == '0': print("退出登录成功") elif request == '5': while True: # 获取用户输入的命令 print() command = input('请输入要执行的命令:') or "echo %date% %time%" # 发送命令 client_socket.sendall(command.encode()) # 设置超时时间 client_socket.settimeout(60) try: print() print("接收超时时间为60秒") print() # 接收命令的输出 data = b'' count = 0 while count < 2: packet = client_socket.recv(4096) data += packet count += 1 output = data.decode('utf-8', errors='ignore') print(output) except socket.timeout: print("接收命令的输出超时") # 等待用户输入 print() user_input = input("是否继续执行命令?[1]继续---[2]退出: ") or "1" if user_input == '1': # 发送 client_socket.sendall(user_input.encode()) # 继续执行命令 continue elif user_input == '2': print("退出登录成功") break else: # 无效输入 print('无效输入,继续执行命令') # 发送 client_socket.sendall(user_input.encode()) # 继续执行命令 continue else: print("无效的请求") else: print('登录失败') finally: # 关闭连接 client_socket.close() user_choice = input("输入1继续运行,输入2结束代码: ") if user_choice == '2': print("结束代码") return elif user_choice == '1': print("继续运行") run_script() else: print("无效输入,默认退出代码") except Exception as e: print(f"An error occurred: {e}") else: print("Script completed successfully.") run_script()
4. 2.0版本相关源码
from flask import Flask, request, render_template_string, jsonify import subprocess import socket import os app = Flask(__name__) def get_local_ip(): try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: # 连接到一个公网DNS服务器,不发送任何数据,只是为了获取本地IP s.connect(("", 53)) return s.getsockname()[0] except: return None # 如果无法获取IP,返回None def load_server_ip(): ip_file = 'server_ip.txt' if not os.path.exists(ip_file): return None with open(ip_file, 'r') as f: ip = return ip if ip else None def save_server_ip(ip): ip_file = 'server_ip.txt' with open(ip_file, 'w') as f: f.write(ip) def ping_ip(ip): """ Ping指定的IP,返回True如果IP可达,否则返回False """ try: # 在Windows系统上使用-4参数强制使用IPv4 output = subprocess.check_output(['ping', '-4', '-n', '1', ip], stderr=subprocess.STDOUT, universal_newlines=True) if "unreachable" in output.lower(): return False return True except subprocess.CalledProcessError: return False def bind_server(host_ip): try: print(f"尝试绑定到IP: {host_ip}"), port=80) except Exception as e: print(f"绑定到 {host_ip} 失败: {e}") return False return True def main(): # 尝试从server_ip.txt读取IP server_ip = load_server_ip() if server_ip: print(f"从文件加载服务器IP: {server_ip}") if not ping_ip(server_ip): print(f"Ping测试失败,IP {server_ip} 不可用") server_ip = None else: print(f"Ping测试成功,IP {server_ip} 可用") else: server_ip = None # 重置server_ip,因为文件不存在或内容为空 # 如果server_ip.txt中的IP不可用,尝试获取本机IP if not server_ip: server_ip = get_local_ip() if server_ip: print(f"获取到服务器IP: {server_ip}") if not ping_ip(server_ip): print(f"Ping测试失败,IP {server_ip} 不可用") server_ip = None else: print(f"Ping测试成功,IP {server_ip} 可用") save_server_ip(server_ip) else: print("无法获取本机IP") # 尝试绑定到server_ip if server_ip: if not bind_server(server_ip): # 如果绑定失败,尝试绑定到127.0.0.1 print("绑定到指定IP失败,尝试绑定到127.0.0.1") if not bind_server(''): print("绑定到127.0.0.1也失败,终止脚本") exit(1) else: # 如果没有有效的IP,尝试绑定到127.0.0.1 print("没有有效的IP,尝试绑定到127.0.0.1") if not bind_server(''): print("绑定到127.0.0.1失败,终止脚本") exit(1) @app.route('/', methods=['GET', 'POST']) def login(): if request.method == 'POST': data = request.form if 'username' in data and 'password' in data: username = data['username'] password = data['password'] print(f"Received username: {username}, password: {password}") if username == 'root' and password == '123456': # 这里是用户名和密码,请自行修改。 return render_template_string(''' <!DOCTYPE html> <html> <head> <title>登录成功</title> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style> body { font-family: Arial, sans-serif; margin: 20px; } h1, h2 { text-align: center; } /* 按钮样式 */ .btn { padding: 10px 20px; font-size: 16px; margin: 5px; border: none; border-radius: 4px; cursor: pointer; background-color: #007bff; color: white; } .btn:hover { background-color: #0056b3; } /* 输入框样式 */ #command-input { width: 100%; height: 100px; font-size: 16px; padding: 10px; resize: vertical; border: 1px solid #ccc; border-radius: 4px; box-sizing: border-box; } /* 结果显示区域 */ #result { margin-top: 20px; padding: 10px; border: 1px solid #ddd; border-radius: 4px; background-color: #f9f9f9; white-space: pre-wrap; /* 保留换行 */ } /* 执行按钮样式 */ #execute-button { float: right; margin-top: 10px; } /* 响应式设计 */ @media (max-width: 600px) { .btn { width: 100%; font-size: 14px; } #command-input { height: 100px; font-size: 14px; } #execute-button { width: 100%; float: none; } #result { font-size: 14px; } } </style> </head> <body> <h1>登录成功</h1> <div style="text-align: center;"> <button class="btn">退出登录</button> <button class="btn">关机</button> <button class="btn">重启</button> <button class="btn">休眠</button> <button class="btn">锁定</button> </div> <br> <h2>执行命令</h2> <textarea id="command-input" placeholder="请输入命令"></textarea> <button id="execute-button" class="btn">执行</button> <h3>结果:</h3> <div id="result"></div> <script> function logout() { fetch('/logout', { method: 'GET' }) .then(response => response.json()) .then(data => { alert(data.message); window.location.href = '/'; }); } function shutdown() { fetch('/shutdown', { method: 'GET' }) .then(response => response.json()) .then(data => alert(data.message)); } function restart() { fetch('/restart', { method: 'GET' }) .then(response => response.json()) .then(data => alert(data.message)); } function sleep() { fetch('/sleep', { method: 'GET' }) .then(response => response.json()) .then(data => alert(data.message)); } function lock() { fetch('/lock', { method: 'GET' }) .then(response => response.json()) .then(data => alert(data.message)); } function executeCommand() { const command = document.getElementById('command-input').value; fetch('/execute', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ command: command }) }) .then(response => response.json()) .then(data => { document.getElementById('result').innerText = data.output; }) .catch(error => { document.getElementById('result').innerText = '执行失败'; }); } </script> </body> </html> ''') else: return jsonify({"message": "登录失败"}), 401 else: return jsonify({"message": "缺少用户名或密码"}), 400 else: return render_template_string(''' <!DOCTYPE html> <html> <head> <title>登录</title> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style> body { font-family: Arial, sans-serif; margin: 20px; } form { max-width: 400px; margin: 0 auto; } input[type="text"], input[type="password"] { width: 100%; padding: 10px; margin: 10px 0; border: 1px solid #ccc; border-radius: 4px; box-sizing: border-box; } input[type="submit"] { width: 100%; padding: 10px; background-color: #28a745; color: white; border: none; border-radius: 4px; cursor: pointer; } input[type="submit"]:hover { background-color: #218838; } @media (max-width: 600px) { form { max-width: 100%; } } </style> </head> <body> <h1>登录</h1> <form method="post"> <input type="text" name="username" placeholder="用户名"><br> <input type="password" name="password" placeholder="密码"><br> <input type="submit" value="登录"> </form> </body> </html> ''') @app.route('/logout', methods=['GET']) def logout(): return jsonify({"message": "退出登录成功"}), 200 @app.route('/shutdown', methods=['GET']) def shutdown(): try:["shutdown", "/s", "/t", "1"], check=True) return jsonify({"message": "关机成功"}), 200 except subprocess.CalledProcessError as e: return jsonify({"message": "关机失败", "error": str(e)}), 500 @app.route('/restart', methods=['GET']) def restart(): try:["shutdown", "/r", "/t", "1"], check=True) return jsonify({"message": "重启成功"}), 200 except subprocess.CalledProcessError as e: return jsonify({"message": "重启失败", "error": str(e)}), 500 @app.route('/sleep', methods=['GET']) def sleep(): try:["rundll32.exe", "powrprof.dll,SetSuspendState", "0,1,0"], check=True) return jsonify({"message": "休眠成功"}), 200 except subprocess.CalledProcessError as e: return jsonify({"message": "休眠失败", "error": str(e)}), 500 @app.route('/lock', methods=['GET']) def lock(): try:["rundll32", "user32.dll,LockWorkStation"], check=True) return jsonify({"message": "锁定成功"}), 200 except subprocess.CalledProcessError as e: return jsonify({"message": "锁定失败", "error": str(e)}), 500 @app.route('/execute', methods=['POST']) def execute(): data = request.get_json() command = data.get('command', '') try: result =, shell=True, capture_output=True, text=True) output = result.stdout + result.stderr return jsonify({"output": output}), 200 except Exception as e: return jsonify({"output": f"执行失败: {str(e)}"}), 500 if __name__ == '__main__': main()