五分钟掌握 FastAPI 文件上传

3/8/2025

一、开始之前我们思考一下文件上传需要哪些知识?

  • 前端文件读取
  • 传输方式
  • 后端文件写入
  • 限制:
    • 文件类型(前后端限制)
    • 文件大小(文件)
    • 限制错误错误
  • 高阶
    • 文件分片上传
  • fastapi 中的两种文件上传方式:bytes File 和 UploadFile
  • 测试
    • fastapi docs 测试
    • postman 等软件接口测试
    • 其他...

二、如何开始

  • 使用 poetry + vscode 工具进行开发与测试
  • 使用 postman 进行接口测试

三、如何测试

Postman

image.png

postman 测试优点是,可以系统性的进行测试,保存测试内容,重复利用。

docs 文档

image.png

我们知道 FastAPI 非常方便的地方就是提供 swagger, 在 /docs 目录下就能进行接口测试,可谓非常直观方便。

四、FastAPI 上传文件的两种方式

from fastapi import FastAPI, File, UploadFile

app = FastAPI()


@app.post("/files/")
async def create_file(file: bytes = File()):
    return {"file_size": len(file)}


@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile):
    return {"filename": file.filename}

bytes 专注在文件的二进制内容,UploadFile 包含了文件的源信息。大部分时候 UploadFile 用的更多。

五、File 类

从上面的 create_file 知道 bytes 赋值给了 File, 下面我们看看 File 关系:

  • File 是一个函数, 返回的是 params.File, 下面我们就开始了解这个调用链:

File -> params.File -> File -> Form -> Body -> FieldInfo -> Representation。

image.png

六、UploadFile 类

UploadFile 的调用链:

UploadFile -> StarletteUploadFile 看起来很简单。

image.png

七、文件上传与表单

前端上传文件文件一般都是使用 formdata, 数据编码类型是 multipart/form-data,而不是 json 类型。如果你不是全属性安装 fastapi 那么需要单独的安装 form-data 相关的库:

pip install python-multipart # pip
poetry add python-multipart # poetry

全局安装 fastapi 中包含 python-multipart 库。

image.png

八、上传文件的可选性与默认值

FastAPI 中可选性使用注解表示:

# 非注解 python 3.10+
file: bytes | None = File(default=None)
file: UploadFile | None = None

# Annotated 方式 Python 10+
file: Annotated[bytes | None, File()] = None
file: UploadFile | None = None

第一中方式使用 | 方式更加简明

九、限制文件

限制文件时实际项目中最常用的功能,这里我们讲解

  • 文件大小
  • 文件类型

限制文件大小及错误处理

from fastapi import FastAPI, File, UploadFile, HTTPException

app = FastAPI()

MAX_FILE_SIZE = 1024 * 1024 * 10  # 10MB

@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    content = await file.read()
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(status_code=400, detail="文件大小超过限制(10MB)")
    # 继续处理文件
    return {"filename": file.filename, "message": "文件上传成功"}

定义文件支持的大小 MAX_FILE_SIZE(一般使用配置文件),然后使用 file.read 方法读取文件 content, 然后获取 len, 最后与 MAX_FILE_SIZE 进行对比,超出就抛出错误。

限制文件类型及错误处理

from fastapi import FastAPI, File, UploadFile, HTTPException

app = FastAPI()

ALLOWED_CONTENT_TYPES = ["image/jpeg", "image/png", "application/pdf"]

@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    if file.content_type not in ALLOWED_CONTENT_TYPES:
        raise HTTPException(status_code=400, detail="不支持的文件类型")
    # 继续处理文件
    return {"filename": file.filename, "message": "文件上传成功"}

定义支持文件类型列表,在文件上传之后,读取 content_type 属性获取文件类型,判断是否符合需求。然后抛出错误。

十、模板形式

如果是全局安装 poetry add fastapi[all] 那么,包里面就包含了 jinja 模板特性,直接用了。

from fastapi import FastAPI, UploadFile
from fastapi.responses import HTMLResponse

app = FastAPI()

@app.post("/file/")
async def create_files(
    file: UploadFile
):
    return file


@app.get("/")
async def main():
    content = """
<body>
<form action="/file/" enctype="multipart/form-data" method="post">
<input name="file" type="file" multiple>
<input type="submit">
</form>
</body>
    """
    return HTMLResponse(content=content)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, port=8000)

上传文件,点击提交,服务器响应数据:

image.png

十一、前后端分离(with antd)

Antd

如果你是前后端分离项目,使用 React+ Antd 那么上传也非常容易:

import React from 'react';
import { UploadOutlined } from '@ant-design/icons';
import type { UploadProps } from 'antd';
import { Button, message, Upload } from 'antd';

const props: UploadProps = {
  name: 'file',
  action: '/upload/file',
  headers: {
    authorization: 'your header author info',
  },
  onChange(info) {
    //
  },
};

const App: React.FC = () => (
  <Upload {...props}>
    <Button icon={<UploadOutlined />}>Click to Upload</Button>
  </Upload>
);

export default App;

ProComponent

上传文件组件更加简单:

<ProFormUploadButton
    name="file"
    label="上传文件"
    placeholder="请输入名称"
    listType="picture-card"
    action="/upload"
  />

十二、多文件上传

多文件上传,有两个地方:

  • 前端表单元素指定:multiple 属性
<input name="files" type="file" multiple>
  • 后端使用 list 注解:
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import HTMLResponse

app = FastAPI()


@app.post("/files/")
async def create_files(files: list[bytes] = File()):
    return {"file_sizes": [len(file) for file in files]}


@app.post("/uploadfiles/")
async def create_upload_files(files: list[UploadFile]):
    return {"filenames": [file.filename for file in files]}

十三、额外 meta 信息

有时候我们需要给 file 字段传递一些额外的信息,

file: UploadFile = File(description="A file read as UploadFile")

@app.post("/file/meta")
async def create_file_with_meta(file: bytes = File(description="A file read as bytes")):
    return {"file_size": len(file)}

它直观表现在:swagger-ui 上,我们看卡一个

image.png

十四、文件保存到本地

from fastapi import FastAPI, File, UploadFile
import os

app = FastAPI()

# 定义一个保存文件的路径
UPLOAD_DIR = "uploads"
if not os.path.exists(UPLOAD_DIR):
    os.makedirs(UPLOAD_DIR)

@app.post("/uploadfile/")
async def upload_file(file: UploadFile = File(...)):
    # 获取文件保存路径
    file_location = os.path.join(UPLOAD_DIR, file.filename)

    # 使用原生 FastAPI 方法保存文件
    with open(file_location, "wb") as f:
        # 使用 file.file 提供的类似文件对象的接口直接写入
        content = await file.read()  # 读取上传的文件内容
        f.write(content)  # 写入文件到本地

    return {"filename": file.filename, "file_location": file_location}

首先,我们需要在异步的情况下处理上传。然后指定一个文件保存的路径然后 file_location,使用 file 的 read 文件内容。然后使用 open + with 语法指定的别名写入即可。

十五、文件切片

这里我们也实现一个简单的分片上传合并示例:

from fastapi import FastAPI, File, Form, UploadFile
from fastapi.responses import HTMLResponse, JSONResponse
import os

app = FastAPI()

# 临时保存分片的目录
UPLOAD_DIR = "uploads"
if not os.path.exists(UPLOAD_DIR):
    os.makedirs(UPLOAD_DIR)

@app.get("/", response_class=HTMLResponse)
async def main():
    return """
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>FastAPI 文件分片上传</title>
    </head>
    <body>
        <h1>文件分片上传</h1>
        <form id="uploadForm">
            <input type="file" id="fileInput" name="file" required><br><br>
            <button type="submit">开始上传</button>
        </form>

        <script>
        document.getElementById('uploadForm').addEventListener('submit', function(event) {
            event.preventDefault();

            const file = document.getElementById('fileInput').files[0];
            const chunkSize = 1 * 1024 * 1024;  // 1MB per chunk
            const totalChunks = Math.ceil(file.size / chunkSize);
            const filename = file.name;

            let currentChunk = 0;

            function uploadChunk() {
                const chunkStart = currentChunk * chunkSize;
                const chunkEnd = Math.min(chunkStart + chunkSize, file.size);
                const chunk = file.slice(chunkStart, chunkEnd);

                const formData = new FormData();
                formData.append("chunk", chunk);
                formData.append("chunk_index", currentChunk);
                formData.append("total_chunks", totalChunks);
                formData.append("filename", filename);

                fetch("/upload_chunk/", {
                    method: "POST",
                    body: formData
                })
                .then(response => response.json())
                .then(data => {
                    console.log(data.message);
                    currentChunk++;
                    if (currentChunk < totalChunks) {
                        uploadChunk(); // 递归上传下一个分片
                    } else {
                        console.log("所有分片上传完成!");
                    }
                })
                .catch(error => console.error("上传失败:", error));
            }

            uploadChunk(); // 开始上传第一个分片
        });
        </script>
    </body>
    </html>
    """

@app.post("/upload_chunk/")
async def upload_chunk(
    chunk: UploadFile = File(...),
    chunk_index: int = Form(...),  # 从表单数据中获取
    total_chunks: int = Form(...),  # 从表单数据中获取
    filename: str = Form(...),  # 从表单数据中获取
):
    print("xxx", chunk_index, total_chunks)
    # 临时保存每个分片
    chunk_path = os.path.join(UPLOAD_DIR, f"{filename}_part_{chunk_index}")
    with open(chunk_path, "wb") as f:
        f.write(await chunk.read())

    # 如果所有分片上传完毕,进行合并
    if chunk_index == total_chunks - 1:
        full_file_path = os.path.join(UPLOAD_DIR, filename)
        with open(full_file_path, "wb") as full_file:
            for i in range(total_chunks):
                part_path = os.path.join(UPLOAD_DIR, f"{filename}_part_{i}")
                with open(part_path, "rb") as part_file:
                    full_file.write(part_file.read())
                os.remove(part_path)  # 上传完分片后删除临时文件

        print("message : 文件上传成功!")
        return JSONResponse(content={"message": f"文件 {filename} 上传成功!"})

    print("i", chunk_index, total_chunks, "message : 分片上传成功,等待更多分片...")
    return JSONResponse(content={"message": "分片上传成功,等待更多分片..."})

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

这里是上传一个视频文件,前端拿到 file 后,计算 1M 大小分片的总 chunk 数量,然后使用 file.slice 对数据进行 chunk 分割。前端传递给后端的数据中,添加一个 chunk 相关的信息给后端,后端使用这些 chunk 信息进行保存。当前端传递最后一个 chunk 之后我们进行组合。

当然这样只是一个简单实现,没有考虑一些复杂的情况。

十六、小结

文本试较为全面讲解 FastAPI 的文件上传,整体 FastAPI 对接文件上传的封装较为简单,FastAPI 支持两种模式上传:byte 二进制式,UploadFile 模式。以及 FastAPI 对 File 类型和 Upload 的封装。有这些基础知识对文件上传和存储以及业务上的功能对接十分清晰了。