(原创)通过Python编写Xray批量测试脚本 仅用于学习和方法研究
以上介绍来自本站自建的GPT服务,自打网站被黑之后,一直在研究如何加强网站的防御,以及如何对网站进行尽可能的测试,所以写了这一系列的文章,能够使得一些工具实现串成串的效果。
代码如下:
新版(1、支持跨平台操作2、线程数决定于CPU数量):
# -*- coding: utf-8 -*-
import os
import re
import time
import sys
import platform
import multiprocessing
from queue import Queue
from threading import Thread
print('''
__ __ _____ ____ ____ _
| \/ |_ _| ___| _ \ / ___| / \ ___ _ __
| |\/| | | | | |_ | |_) | | _ / _ \ / __| '_ \
| | | | |_| | _| | __/| |_| |/ ___ \ | (__| | | |
|_| |_|\__, |_| |_| \____/_/ \_(_)___|_| |_|
|___/
Xray Batch Control tool {1.7.2#main}
*Internal version code sensitive and confidential*
''')
def readfile():
# 创建一个队列用于存储扫描目标
target_queue = Queue()
# 从文件 'targets.txt' 读取目标并加入队列
targets_list = [line.strip() for line in open('targets.txt', encoding='utf-8')]
for target in targets_list:
target_queue.put(target)
return target_queue
def format_time(seconds):
# 将秒数格式化为时:分:秒的形式
hours, remainder = divmod(int(seconds), 3600)
minutes, seconds = divmod(remainder, 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def scan(target_queue, xray_path, total_targets):
start_time = time.time()
while True:
try:
target = target_queue.get()
# 从目标中提取域名部分
domain_match = re.search(r'(.*?//)?(.*)', target)
if domain_match:
domain = domain_match.group(2)
# 替换域名中的特殊字符,用于生成输出文件名
domain = domain.replace(":", "_").replace("/", "_")
# 构建扫描命令
command = f"{xray_path} webscan --basic-crawler {target} --html-output ./output/{domain}.html"
output = os.popen(command)
processed_targets = total_targets - target_queue.qsize()
elapsed_time = time.time() - start_time
targets_per_second = processed_targets / elapsed_time
remaining_targets = target_queue.qsize()
estimated_time = remaining_targets / targets_per_second if targets_per_second > 0 else 0
print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 正在处理目标:{processed_targets}/{total_targets},估计还需要 {format_time(estimated_time)}。")
try:
result = output.read()
if "[Vuln:" in result:
print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 发现漏洞:{result}")
except UnicodeDecodeError:
pass
finally:
target_queue.task_done()
def get_platform_info():
system = platform.system() # 获取操作系统名称
architecture = platform.architecture() # 获取架构和位数信息
if system == 'Linux':
# 在Linux上,可以使用os.uname()获取更详细的架构信息
uname = os.uname()
machine = uname.machine
if 'aarch64' in machine:
return f"Linux Arm64 {architecture[0]}"
elif 'arm' in machine:
return f"Linux Arm {architecture[0]}"
elif 'x86_64' in machine:
return f"Linux x86_64 {architecture[0]}"
elif 'i386' in machine:
return f"Linux i386 {architecture[0]}"
else:
return f"Linux {machine} {architecture[0]}"
elif system == 'Windows':
machine = platform.machine()
if machine == 'AMD64':
return f"Windows x64 {architecture[0]}"
elif machine == 'i386':
return f"Windows x86 {architecture[0]}"
else:
return f"Windows {machine} {architecture[0]}"
else:
return f"Unsupported OS: {system}"
def select_xray_executable():
platform_info = get_platform_info()
if "Linux" in platform_info:
if "Arm64" in platform_info:
return "xray_linux_arm64"
elif "i386" in platform_info:
return "xray_linux_386"
elif "x86_64" in platform_info:
return "xray_linux_amd64"
elif "Windows" in platform_info:
if "i386" in platform_info:
return "xray_windows_386.exe"
elif "x64" in platform_info:
return "xray_windows_amd64.exe"
elif "Darwin" in platform_info:
if "Arm64" in platform_info:
return "xray_darwin_arm64"
elif "x86_64" in platform_info:
return "xray_darwin_amd64"
return "Unsupported platform"
if __name__ == "__main__":
# 根据操作系统选择 xray 可执行文件路径
xray_path = select_xray_executable()
target_queue = readfile()
total_targets = target_queue.qsize()
# 使用适量的线程来并行扫描目标
usr_threads = multiprocessing.cpu_count() #线程数,可以固定修改为10等
for i in range(usr_threads):
t = Thread(target=scan, args=(target_queue, xray_path, total_targets))
t.daemon = True
t.start()
target_queue.join()
print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 所有任务已完成!")历史版本:
# -*- coding: utf-8 -*-
import os
import re
import time
import sys
from queue import Queue
from threading import Thread
print('''
__ __ _____ ____ ____ _
| \/ |_ _| ___| _ \ / ___| / \ ___ _ __
| |\/| | | | | |_ | |_) | | _ / _ \ / __| '_ \
| | | | |_| | _| | __/| |_| |/ ___ \ | (__| | | |
|_| |_|\__, |_| |_| \____/_/ \_(_)___|_| |_|
|___/
Xray Batch Control tool {1.7.1#main}
*Internal version code sensitive and confidential*
''')
def readfile():
# 创建一个队列用于存储扫描目标
target_queue = Queue()
# 从文件 'targets.txt' 读取目标并加入队列
targets_list = [line.strip() for line in open('targets.txt', encoding='utf-8')]
for target in targets_list:
target_queue.put(target)
return target_queue
def format_time(seconds):
# 将秒数格式化为时:分:秒的形式
hours, remainder = divmod(int(seconds), 3600)
minutes, seconds = divmod(remainder, 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def scan(target_queue, xray_path, total_targets):
start_time = time.time()
while True:
try:
target = target_queue.get()
# 从目标中提取域名部分
domain_match = re.search(r'(.*?//)?(.*)', target)
if domain_match:
domain = domain_match.group(2)
# 替换域名中的特殊字符,用于生成输出文件名
domain = domain.replace(":", "_").replace("/", "_")
# 构建扫描命令
command = f"{xray_path} webscan --basic-crawler {target} --html-output ./output/{domain}.html"
output = os.popen(command)
processed_targets = total_targets - target_queue.qsize()
elapsed_time = time.time() - start_time
targets_per_second = processed_targets / elapsed_time
remaining_targets = target_queue.qsize()
estimated_time = remaining_targets / targets_per_second if targets_per_second > 0 else 0
print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 正在处理目标:{processed_targets}/{total_targets},估计还需要 {format_time(estimated_time)}。")
try:
result = output.read()
if "[Vuln:" in result:
print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 发现漏洞:{result}")
except UnicodeDecodeError:
pass
finally:
target_queue.task_done()
if __name__ == "__main__":
# 根据操作系统选择 xray 可执行文件路径
xray_path = "xray_windows_amd64.exe" if sys.platform == 'win32' else "xray_linux_amd64"
target_queue = readfile()
total_targets = target_queue.qsize()
# 使用适量的线程来并行扫描目标
for i in range(10):
t = Thread(target=scan, args=(target_queue, xray_path, total_targets))
t.daemon = True
t.start()
target_queue.join()
print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 所有任务已完成!")


