使用Python编写一个简单的下载工具

如何使用Python编写一个简单的下载工具?(PCLⅡ百宝箱)

步骤1—安装前置

安装所需的前置库

在确认你已安装python的情况下安装requests库(如果安装则忽略)

在cmd中键入:pip install requests


步骤2—创建文件

创建一个py文件(比如 downloader.py )

示例代码(可以正常使用)

import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import threading
import queue
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from requests.exceptions import RequestException
import os
import time
from urllib.parse import urlparse
import logging
import hashlib
from pathlib import Path
import json

class DownloaderApp(tk.Tk):
def __init__(self):
    super().__init__()
    self.title("高级批量下载工具")
    self.geometry("1200x800")
    self.style = ttk.Style()
    self.configure_styles()
    self.task_widgets = {}
    self.create_widgets()
    self.download_manager = DownloadManager(self)
    self.protocol("WM_DELETE_WINDOW", self.on_close)
    self.load_download_history()

def configure_styles(self):
    self.style.theme_use("clam")
    self.style.configure(".", font=("Segoe UI", 9))
    self.style.configure("TFrame", background="#f5f6f7")
    self.style.configure("TLabel", background="#f5f6f7", foreground="#333")
    self.style.configure("Header.TLabel", font=('Helvetica', 10, 'bold'))
    self.style.configure("TButton", padding=6, relief="flat", 
                       background="#4CAF50", foreground="white")
    self.style.map("TButton",
                 background=[('active', '#45a049'), ('disabled', '#c8e6c9')],
                 foreground=[('disabled', '#757575')])
    self.style.configure("Task.Horizontal.TProgressbar",
                       thickness=18,
                       troughcolor="#e0e0e0",
                       background="#4CAF50",
                       troughrelief="flat")
    self.style.configure("Task.TFrame", background="white", borderwidth=1, 
                       relief="solid", bordercolor="#e0e0e0")
    self.style.configure("Task.TLabel", background="white")
    self.style.configure("Status.TLabel", background="white", 
                       foreground="#666", font=("Segoe UI", 8))

def create_widgets(self):
    main_frame = ttk.Frame(self)
    main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

    # 左侧控制面板
    left_frame = ttk.Frame(main_frame, width=300)
    left_frame.pack(side=tk.LEFT, fill=tk.Y)

    control_panel = ttk.LabelFrame(left_frame, text="下载设置", padding=10)
    control_panel.pack(fill=tk.X, pady=5)

    # 输入文件选择
    ttk.Label(control_panel, text="输入文件 (.txt):").grid(row=0, column=0, sticky=tk.W, pady=2)
    self.input_entry = ttk.Entry(control_panel, width=25)
    self.input_entry.grid(row=0, column=1, pady=2)
    ttk.Button(control_panel, text="浏览", command=self.select_input_file, width=6).grid(row=0, column=2, pady=2)

    # 输出目录选择
    ttk.Label(control_panel, text="输出目录:").grid(row=1, column=0, sticky=tk.W, pady=2)
    self.output_entry = ttk.Entry(control_panel, width=25)
    self.output_entry.grid(row=1, column=1, pady=2)
    ttk.Button(control_panel, text="浏览", command=self.select_output_dir, width=6).grid(row=1, column=2, pady=2)

    # 并发设置
    ttk.Label(control_panel, text="并发数:").grid(row=2, column=0, sticky=tk.W, pady=2)
    self.concurrency_spin = ttk.Spinbox(control_panel, from_=1, to=20, width=5)
    self.concurrency_spin.set(5)
    self.concurrency_spin.grid(row=2, column=1, sticky=tk.W, pady=2)

    # 分块下载设置
    self.chunk_var = tk.BooleanVar()
    self.chunk_cb = ttk.Checkbutton(control_panel, text="启用分块下载", variable=self.chunk_var, command=self.toggle_chunk)
    self.chunk_cb.grid(row=3, column=0, columnspan=3, pady=5, sticky=tk.W)

    ttk.Label(control_panel, text="块大小 (MB):").grid(row=4, column=0, sticky=tk.W, pady=2)
    self.chunk_size = ttk.Spinbox(control_panel, from_=1, to=100, width=5, state=tk.DISABLED)
    self.chunk_size.set(10)
    self.chunk_size.grid(row=4, column=1, sticky=tk.W, pady=2)

    ttk.Label(control_panel, text="线程数/文件:").grid(row=5, column=0, sticky=tk.W, pady=2)
    self.threads_per_file = ttk.Spinbox(control_panel, from_=1, to=16, width=5, state=tk.DISABLED)
    self.threads_per_file.set(4)
    self.threads_per_file.grid(row=5, column=1, sticky=tk.W, pady=2)

    # 重试设置
    self.retry_var = tk.BooleanVar()
    self.retry_cb = ttk.Checkbutton(control_panel, text="启用自动重试", variable=self.retry_var, command=self.toggle_retry)
    self.retry_cb.grid(row=6, column=0, columnspan=3, pady=5, sticky=tk.W)

    ttk.Label(control_panel, text="最大重试次数:").grid(row=7, column=0, sticky=tk.W, pady=2)
    self.max_retries = ttk.Spinbox(control_panel, from_=0, to=10, width=5, state=tk.DISABLED)
    self.max_retries.set(3)
    self.max_retries.grid(row=7, column=1, sticky=tk.W, pady=2)

    # 控制按钮
    btn_frame = ttk.Frame(control_panel)
    btn_frame.grid(row=8, column=0, columnspan=3, pady=10)
    self.start_btn = ttk.Button(btn_frame, text="开始下载", command=self.start_download)
    self.start_btn.pack(side=tk.LEFT, padx=5)
    self.stop_btn = ttk.Button(btn_frame, text="终止下载", command=self.stop_download, state=tk.DISABLED)
    self.stop_btn.pack(side=tk.LEFT, padx=5)

    # 右侧显示面板
    right_frame = ttk.Frame(main_frame)
    right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)

    # 总进度条
    ttk.Label(right_frame, text="总下载进度", style="Header.TLabel").pack(anchor=tk.W)
    self.total_progress = ttk.Progressbar(right_frame, length=600, mode='determinate', style="Task.Horizontal.TProgressbar")
    self.total_progress.pack(fill=tk.X, pady=5)

    # 下载项滚动区域
    self.canvas = tk.Canvas(right_frame, bg="white", highlightthickness=0)
    self.scrollbar = ttk.Scrollbar(right_frame, orient="vertical", command=self.canvas.yview)
    self.items_frame = ttk.Frame(self.canvas)

    self.items_frame.bind("<Configure>", lambda e: self.canvas.configure(
        scrollregion=self.canvas.bbox("all")))
    self.canvas.create_window((0, 0), window=self.items_frame, anchor="nw")
    self.canvas.configure(yscrollcommand=self.scrollbar.set)

    self.canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

    # 日志区域
    ttk.Label(right_frame, text="操作日志", style="Header.TLabel").pack(anchor=tk.W)
    self.log_text = tk.Text(right_frame, height=8, wrap=tk.WORD, state=tk.DISABLED)
    log_scroll = ttk.Scrollbar(right_frame, command=self.log_text.yview)
    self.log_text.configure(yscrollcommand=log_scroll.set)
    self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    log_scroll.pack(side=tk.RIGHT, fill=tk.Y)

def save_download_history(self):
    history = {
        'input_file': self.input_entry.get(),
        'output_dir': self.output_entry.get(),
        'tasks': [
            {
                'url': task.url,
                'filename': task.filename,
                'downloaded': task.downloaded,
                'total_size': task.total_size
            }
            for task in self.download_manager.tasks.values()
        ]
    }
    with open('download_history.json', 'w') as f:
        json.dump(history, f)

def load_download_history(self):
    try:
        with open('download_history.json') as f:
            history = json.load(f)
            self.input_entry.delete(0, tk.END)
            self.output_entry.delete(0, tk.END)
            self.input_entry.insert(0, history.get('input_file', ''))
            self.output_entry.insert(0, history.get('output_dir', ''))
    except FileNotFoundError:
        # 首次运行时自动创建空文件
        open('download_history.json', 'w').close()
    except json.JSONDecodeError:
        # 处理空文件或无效JSON的情况
        self.log("历史文件损坏,已重置")
        open('download_history.json', 'w').close()

def toggle_chunk(self):
    state = tk.NORMAL if self.chunk_var.get() else tk.DISABLED
    self.chunk_size.config(state=state)
    self.threads_per_file.config(state=state)

def toggle_retry(self):
    state = tk.NORMAL if self.retry_var.get() else tk.DISABLED
    self.max_retries.config(state=state)

def select_input_file(self):
    path = filedialog.askopenfilename(filetypes=[("Text files", "*.txt")])
    if path:
        self.input_entry.delete(0, tk.END)
        self.input_entry.insert(0, path)

def select_output_dir(self):
    path = filedialog.askdirectory()
    if path:
        self.output_entry.delete(0, tk.END)
        self.output_entry.insert(0, path)

def start_download(self):
    input_file = self.input_entry.get()
    output_dir = self.output_entry.get()

    if not input_file or not output_dir:
        messagebox.showerror("错误", "请先选择输入文件和输出目录")
        return

    try:
        with open(input_file) as f:
            urls = [line.strip() for line in f if line.strip()]
    except Exception as e:
        messagebox.showerror("错误", f"无法读取文件: {str(e)}")
        return

    if not urls:
        messagebox.showerror("错误", "输入文件中没有有效的URL")
        return

    config = {
        'output_dir': output_dir,
        'concurrency': int(self.concurrency_spin.get()),
        'chunk_size': int(self.chunk_size.get())*1024*1024 if self.chunk_var.get() else 0,
        'threads_per_file': int(self.threads_per_file.get()) if self.chunk_var.get() else 1,
        'max_retries': int(self.max_retries.get()) if self.retry_var.get() else 0
    }

    self.start_btn.config(state=tk.DISABLED)
    self.stop_btn.config(state=tk.NORMAL)
    self.download_manager.start(urls, config)

def stop_download(self):
    self.download_manager.stop()
    self.stop_btn.config(state=tk.DISABLED)
    self.start_btn.config(state=tk.NORMAL)

def create_task_widget(self, task_id, filename):
    base_name, ext = os.path.splitext(filename)
    counter = 1
    while filename in [w['filename'] for w in self.task_widgets.values()]:
        filename = f"{base_name}({counter}){ext}"
        counter += 1

    frame = ttk.Frame(self.items_frame, style="Task.TFrame")
    frame.pack(fill=tk.X, pady=2, padx=5)

    lbl = ttk.Label(frame, text=filename, width=40, style="Task.TLabel")
    lbl.pack(side=tk.LEFT)

    progress = ttk.Progressbar(frame, style="Task.Horizontal.TProgressbar")
    progress.pack(side=tk.LEFT, expand=True, padx=5)

    status = ttk.Label(frame, text="等待中", width=12, style="Status.TLabel")
    status.pack(side=tk.RIGHT)

    self.task_widgets[task_id] = {
        'frame': frame,
        'progress': progress,
        'status': status,
        'filename': filename
    }

def task_complete(self, task_id):
    if task_id in self.task_widgets:
        self.task_widgets[task_id]['status'].config(text="完成", foreground="#4CAF50")
        self.task_widgets[task_id]['progress']['value'] = 100

def task_error(self, task_id, message):
    if task_id in self.task_widgets:
        self.task_widgets[task_id]['status'].config(text="错误", foreground="#f44336")
        self.log(f"任务 {task_id} 失败: {message}")

def log(self, message):
    self.log_text.config(state=tk.NORMAL)
    self.log_text.insert(tk.END, f"{time.strftime('%H:%M:%S')} - {message}\n")
    self.log_text.see(tk.END)
    self.log_text.config(state=tk.DISABLED)

def on_close(self):
    self.download_manager.stop()
    self.destroy()

class DownloadManager:
    def __init__(self, app):
        self.app = app
        self.queue = queue.Queue()
        self.stop_event = threading.Event()
        self.executor = None
        self.tasks = {}
        self.next_id = 1
        self.task_progress = {}
        self.max_ui_batch_size = 100
        self.ui_update_interval = 0.02
        self.ui_thread = threading.Thread(target=self.process_ui_queue, daemon=True)
        self.ui_thread.start()

def start(self, urls, config):
    existing_files = self.get_existing_files(config['output_dir'])
    max_workers = min(config['concurrency'] * 2, os.cpu_count() * 2 + config['concurrency'])
    self.executor = ThreadPoolExecutor(max_workers=max_workers)

    for url in urls:
        if self.stop_event.is_set():
            break

        filename = self.get_persistent_filename(url, config['output_dir'])
        filepath = os.path.join(config['output_dir'], filename)

        if filename in existing_files:
            if os.path.exists(filepath) and os.path.getsize(filepath) == existing_files[filename]:
                self.app.log(f"跳过已下载文件: {filename}")
                continue
            else:
                self.app.log(f"恢复下载: {filename}")

        task_id = self.next_id
        self.next_id += 1

        task = DownloadTask(
            task_id=task_id,
            url=url,
            config=config,
            queue=self.queue,
            stop_event=self.stop_event
        )

        self.tasks[task_id] = task
        self.executor.submit(task.run)

def stop(self):
    self.stop_event.set()

    if self.executor:
        try:
            # Python 3.9+的关闭方式
            self.executor.shutdown(wait=False, cancel_futures=True)
        except TypeError:
            # 兼容旧版本的关闭方式
            self.executor.shutdown(wait=False)
            # 正确取消future的方式
            if hasattr(self.executor, '_futures'):
                for future in list(self.executor._futures):
                    future.cancel()  # 正确的取消方式

    # 关闭分块下载线程池
    for task in self.tasks.values():
        if hasattr(task, 'chunk_executor'):
            try:
                task.chunk_executor.shutdown(wait=False)
                # 正确取消分块future
                if hasattr(task.chunk_executor, '_futures'):
                    for future in list(task.chunk_executor._futures):
                        future.cancel()
            except Exception as e:
                self.queue.put({'type': 'log', 'message': f"终止分块任务失败: {str(e)}"})

    self.tasks.clear()
    self.queue.put({'type': 'log', 'message': '下载已安全终止'})

def get_persistent_filename(self, url, output_dir):
    url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
    parsed = urlparse(url)
    base_name = os.path.basename(parsed.path) or "download"
    base, ext = os.path.splitext(base_name)
    filename = f"{base}_{url_hash}{ext}"

    counter = 1
    while os.path.exists(os.path.join(output_dir, filename)):
        filename = f"{base}_{url_hash}_{counter}{ext}"
        counter += 1
    return filename

def get_existing_files(self, output_dir):
    files = {}
    for fname in os.listdir(output_dir):
        path = os.path.join(output_dir, fname)
        if os.path.isfile(path) and not fname.startswith('.'):
            files[fname] = os.path.getsize(path)
        elif os.path.isdir(path) and fname.startswith('.'):
            meta_file = os.path.join(path, '.metadata')
            if os.path.exists(meta_file):
                with open(meta_file) as mf:
                    meta = json.load(mf)
                    files[meta['filename']] = meta['downloaded']
    return files

def process_ui_queue(self):
    while True:
        try:
            messages = []
            merged = {'progress': {}, 'logs': [], 'completes': [], 'errors': []}

            for _ in range(self.max_ui_batch_size):
                try:
                    msg = self.queue.get_nowait()
                    if msg['type'] == 'progress':
                        for task in msg['tasks']:
                            merged['progress'][task['id']] = task
                    elif msg['type'] == 'log':
                        merged['logs'].append(msg['message'])
                    elif msg['type'] == 'complete':
                        merged['completes'].append(msg['task_id'])
                    elif msg['type'] == 'error':
                        merged['errors'].append(msg)
                except queue.Empty:
                    break

            self.app.after(0, lambda: self.update_ui_batch(merged))
            time.sleep(self.ui_update_interval)
        except Exception as e:
            print(f"UI队列处理错误: {str(e)}")

def update_ui_batch(self, merged):
    # 处理完成任务
    for task_id in merged['completes']:
        if task_id in self.task_progress:
            del self.task_progress[task_id]
        self.app.task_complete(task_id)

    # 处理错误任务
    for error in merged['errors']:
        task_id = error['task_id']
        self.app.task_error(task_id, error['message'])

    # 更新进度条
    if merged['progress']:
        self.handle_batch_progress(list(merged['progress'].values()))

    # 更新日志
    if merged['logs']:
        self.app.log("\n".join(merged['logs']))

def handle_batch_progress(self, tasks):
    # 更新全局进度数据
    for task in tasks:
        self.task_progress[task['id']] = {
            'total': task.get('total_size', 0),
            'downloaded': task['downloaded']
        }

    # 计算总进度
    total_downloaded = sum(p['downloaded'] for p in self.task_progress.values())
    total_size = sum(p['total'] for p in self.task_progress.values())
    overall_progress = (total_downloaded / total_size) * 100 if total_size > 0 else 0
    self.app.total_progress['value'] = overall_progress

    # 更新子进度条
    self.app.canvas.configure(state="normal")
    for task in tasks:
        if task['id'] not in self.app.task_widgets:
            self.app.create_task_widget(task['id'], task['filename'])

        widget = self.app.task_widgets[task['id']]
        widget['progress']['value'] = task['progress']
        widget['status'].config(text=task['status'])
    self.app.canvas.configure(state="disabled")

class DownloadTask:
    def __init__(self, task_id, url, config, queue, stop_event):
        self.task_id = task_id
        self.url = url
        self.config = config
        self.queue = queue
        self.stop_event = stop_event
        self.filename = self.get_persistent_filename()
        self.filepath = os.path.join(config['output_dir'], self.filename)
        self.temp_dir = os.path.join(config['output_dir'], f".{self.filename}.tmp")
        self.total_size = 0
        self.downloaded = 0
        self.last_update_time = 0
        self.update_interval = 0.1
        self.metadata = {
            'url': self.url,
            'filename': self.filename,
            'total_size': self.total_size,
            'downloaded': self.downloaded
        }

def get_persistent_filename(self):
        url_hash = hashlib.md5(self.url.encode()).hexdigest()[:8]
        parsed = urlparse(self.url)
        base_name = os.path.basename(parsed.path) or "download"
        base, ext = os.path.splitext(base_name)
        return f"{base}_{url_hash}{ext}"

def get_unique_filename(self):
    path = urlparse(self.url).path
    filename = os.path.basename(path) or "download"
    base_name, ext = os.path.splitext(filename)
    counter = 1
    while os.path.exists(os.path.join(self.config['output_dir'], filename)):
        filename = f"{base_name}({counter}){ext}"
        counter += 1
    return filename

def run(self):
    retry_count = 0
    while retry_count <= self.config['max_retries']:
        try:
            if not self.prepare_download():
                return

            if self.config['chunk_size'] > 0 and self.config['threads_per_file'] > 1:
                self.download_chunked()
            else:
                self.download_whole()

            self.queue.put({'type': 'complete', 'task_id': self.task_id})
            return
        except Exception as e:
            if retry_count < self.config['max_retries']:
                retry_count += 1
                self.log(f"重试中 ({retry_count}/{self.config['max_retries']})")
                time.sleep(2 ** retry_count)
                self.reset_state()
            else:
                self.queue.put({
                    'type': 'error',
                    'task_id': self.task_id,
                    'message': f"最终失败: {str(e)}"
                })
                self.cleanup()
                return

def prepare_download(self):
    # 恢复下载状态的逻辑
    if os.path.exists(self.filepath):
        local_size = os.path.getsize(self.filepath)
        if local_size == self.total_size:
            return False
        elif local_size > self.total_size:
            os.remove(self.filepath)

    if os.path.exists(self.temp_dir):
        self.restore_from_temp()

    try:
        r = requests.head(self.url, allow_redirects=True)
        if r.status_code != 200:
            raise Exception(f"无效响应: {r.status_code}")

        self.total_size = int(r.headers.get('content-length', 0))
        self.accept_ranges = 'bytes' in r.headers.get('accept-ranges', '')

        if self.config['chunk_size'] > 0 and not self.accept_ranges:
            self.log("服务器不支持分块下载,使用单线程")
            self.config['chunk_size'] = 0
            self.config['threads_per_file'] = 1

        self.update_progress(status="准备中")
        return True
    except Exception as e:
        self.log(f"准备失败: {str(e)}")
        return False

def restore_from_temp(self):
    meta_file = os.path.join(self.temp_dir, '.metadata')
    if os.path.exists(meta_file):
        with open(meta_file) as f:
            meta = json.load(f)
            self.downloaded = meta['downloaded']
            self.total_size = meta['total_size']
            self.log(f"恢复下载进度: {self.downloaded}/{self.total_size} bytes")

def save_metadata(self):
    if hasattr(self, 'temp_dir'):
        meta_path = os.path.join(self.temp_dir, '.metadata')
        with open(meta_path, 'w') as f:
            json.dump(self.metadata, f)

def download_whole(self):
    headers = {}
    if os.path.exists(self.filepath):
        downloaded = os.path.getsize(self.filepath)
        headers['Range'] = f'bytes={downloaded}-'

    for retry in range(self.config['max_retries'] + 1):
        if self.stop_event.is_set():
            return

        try:
            with requests.get(self.url, headers=headers, stream=True) as r:
                r.raise_for_status()
                self.save_content(r, mode='ab' if headers else 'wb')
                return
        except RequestException as e:
            if retry < self.config['max_retries']:
                self.log(f"重试中 ({retry+1}/{self.config['max_retries']})")
                time.sleep(1)
            else:
                raise

def download_chunked(self):
    self.chunk_executor = ThreadPoolExecutor(max_workers=self.config['threads_per_file'])
    chunks = self.calculate_chunks()
    os.makedirs(self.temp_dir, exist_ok=True)

    try:
        futures = []
        for i, (start, end) in enumerate(chunks):
            future = self.chunk_executor.submit(
                self.download_chunk,
                chunk_id=i,
                start=start,
                end=end
            )
            futures.append(future)

        for future in as_completed(futures):
            if self.stop_event.is_set():
                self.chunk_executor.shutdown(wait=False)
                raise Exception("下载被终止")
            future.result()

        self.merge_chunks()
    finally:
        self.chunk_executor.shutdown(wait=True)

def calculate_chunks(self):
    chunks = []
    current = 0
    while current < self.total_size:
        end = min(current + self.config['chunk_size'] - 1, self.total_size - 1)
        chunks.append((current, end))
        current = end + 1
    return chunks

def download_chunk(self, chunk_id, start, end):
    chunk_file = os.path.join(self.temp_dir, f"chunk_{chunk_id}")
    downloaded = os.path.getsize(chunk_file) if os.path.exists(chunk_file) else 0

    headers = {'Range': f'bytes={start + downloaded}-{end}'}
    chunk_downloaded = 0

    for retry in range(self.config['max_retries'] + 1):
        if self.stop_event.is_set():
            return

        try:
            with requests.get(self.url, headers=headers, stream=True) as r:
                r.raise_for_status()
                mode = 'ab' if downloaded else 'wb'
                with open(chunk_file, mode) as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if self.stop_event.is_set():
                            return
                        if chunk:
                            f.write(chunk)
                            chunk_downloaded += len(chunk)
                            self.downloaded += len(chunk)
                            self.update_progress()
                return
        except RequestException as e:
            if retry < self.config['max_retries']:
                time.sleep(1)
            else:
                self.downloaded -= chunk_downloaded
                raise

def merge_chunks(self):
    with open(self.filepath, 'wb') as f:
        for chunk_file in sorted(os.listdir(self.temp_dir), key=lambda x: int(x.split('_')[1])):
            with open(os.path.join(self.temp_dir, chunk_file), 'rb') as cf:
                f.write(cf.read())
    self.cleanup()

def save_content(self, response, path=None, mode='wb'):
    path = path or self.filepath
    with open(path, mode) as f:
        for chunk in response.iter_content(chunk_size=8192):
            if self.stop_event.is_set():
                return
            if chunk:
                f.write(chunk)
                self.downloaded += len(chunk)
                self.update_progress()

def update_progress(self, status="下载中"):
    current_time = time.time()
    if current_time - self.last_update_time < self.update_interval:
        return

    self.last_update_time = current_time
    progress = (self.downloaded / self.total_size) * 100 if self.total_size > 0 else 0
    self.queue.put({
        'type': 'progress',
        'tasks': [{
            'id': self.task_id,
            'filename': self.filename,
            'progress': progress,
            'status': status,
            'downloaded': self.downloaded,
            'total_size': self.total_size
        }]
    })

def log(self, message):
    self.queue.put({'type': 'log', 'message': f"{self.filename}: {message}"})

def reset_state(self):
    self.downloaded = 0
    self.cleanup()
    if os.path.exists(self.filepath):
        os.remove(self.filepath)

def cleanup(self):
    try:
        if os.path.exists(self.temp_dir):
            for f in os.listdir(self.temp_dir):
                file_path = os.path.join(self.temp_dir, f)
                for _ in range(3):
                    try:
                        os.remove(file_path)
                        break
                    except PermissionError:
                        time.sleep(0.1)
            os.rmdir(self.temp_dir)
    except Exception as e:
        self.log(f"清理临时文件失败: {str(e)}")

if __name__ == "__main__":
    app = DownloaderApp()
    app.mainloop()

你可以将该示例代码粘贴进创建的py文件中并保存


步骤3—启动文件

(确认你已安装requests库)
(以downloader.py为例)

  • 在该文件目录的索引输入 cmd 并回车(或者在cmd中cd到该目录)
  • 在cmd中键入:python downloader.py

如果窗口正常显示,则证明启动成功


使用方法

  • 创建一个txt文件,并将要下载的(批量)文件链接键入
  • 比如键入:
      https://www.website.com/testfile_1.file
      https://www.website.com/testfile_2.file
      https://www.website.com/testfile_3.file
      https://www.website.com/testfile_4.file
      ......
    
  • 在UI窗口里选中该文件(输入下载需求)
  • 在UI窗口里选择输出目录
  • 在根据需求自定义选项

文章到此结束,你学会了吗?(bushi)