PythonLearn

用Python实现并发文件下载器

完整代码在最下面
注意: 是单个文件并发下载,不是多文件并发下载

必要知识

  1. get请求的请求头中包含Range字段,这个字段可以指定从哪个字节开始下载,比如Range: bytes=0-1023表示从第0字节开始下载1024字节
  2. head请求可以获取文件大小,但也可能不包含大小

项目设计

注意: 本次设计不包含错误捕捉机制,仅供学习
我们最终希望的效果是能够按照下面代码使用

async with Downloader(url) as downloader:
    size = await downloader.get_size()
    async for start, end, content in downloader.start():
        ...

为此我们只需要设计一个下载器类Downloader即可
start方法会创建多个子任务,每个任务都会下载文件的一部分,通过队列传给父任务,父任务再向上返回,最后数据由上面处理

安装模块

最重要的是httpx 可以通过pip install httpx安装
其次还有python内置模块asyncio

编写代码

import asyncio
from typing import Optional
from httpx import AsyncClient

class Downloader:
    def __init__(
            self,
            url,
            max_workers=10,
            block_size=1024 * 1024,
            client: Optional[AsyncClient] = None,
            loop: Optional[asyncio.AbstractEventLoop] = None,
):
        self.url = url
        self.client = client or AsyncClient()
        self.loop = loop or asyncio.get_event_loop()
        self.max_workers = max_workers
        self.block_size = block_size
        self._file_size = 0  # 文件大小,0代表未获取,-1代表无法获取

    async def start(self):
        """获取大小,计算每个任务的下载范围,创建任务,并等待队列结果"""
        size = await self.get_size()
        queue = asyncio.Queue()
        tasks = []
        for i in range(self.max_workers):
            start = size // self.max_workers * i
            end = size // self.max_workers * (i + 1)
            task = self.loop.create_task(self._get_range(start, end, queue))
            tasks.append(task)
        while not all(task.done() for task in tasks):
            size, end, content = await queue.get()
            yield size, end, content

    async def get_size(self) -> int:
        """获取文件大小,如果已经获取过,则直接返回"""
        if self._file_size == 0:
            response = await self.client.head(self.url)
            self._file_size = int(response.headers.get("Content-Length", -1))
        elif self._file_size < 0:
            raise Exception("无法获取文件大小")
        return self._file_size

    async def _get_range(self, start, end, queue: asyncio.Queue):
        """获取指定范围的数据,并写入队列"""
        size = 0
        async with self.client.stream("GET", self.url, headers={"Range": f"bytes={start}-{end}"}) as response:
            async for chunk in response.aiter_bytes(self.block_size):
                await queue.put((size, end, chunk))
                size += len(chunk)

    async def __aenter__(self):
        await self.client.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.client.__aexit__(exc_type, exc_val, exc_tb)

测试

本地IIS开个静态文件服务器,下载个文件试试

async def main():
    url = "http://127.0.0.1/LocalSend.exe"
    with open("LocalSend.exe", "wb") as f:
        async with Downloader(url) as downloader:
            size = await downloader.get_size()
            download_size = 0
            async for start, end, content in downloader.start():
                f.seek(start)
                f.write(content)
                download_size += len(content)
                print(f"{download_size // 1024}kb/{size//1024}kb")

asyncio.run(main())

输出如下

1024kb/12092kb
2048kb/12092kb
...
11906kb/12092kb
12092kb/12092kb