Python 后端岗位技能要求完全指南

本文基于真实招聘需求,系统梳理 Python 后端开发岗位的技能要求,从基础到进阶再到高阶,帮助学习者有针对性地提升技能,快速达到岗位要求。


岗位职责汇总表

根据市场招聘需求,Python 后端开发岗位的主要职责如下:

职责类别 具体职责 出现频率
代码开发与维护 负责现有后端代码的维护与扩展,二次开发新功能及接口 ⭐⭐⭐⭐⭐
负责后端功能的开发和迭代 ⭐⭐⭐⭐⭐
负责核心代码编写,负责技术和产品落地 ⭐⭐⭐⭐
后端代码结构优化与重构 ⭐⭐⭐⭐
对已有业务的代码性能和质量进行改进、抽象和重构 ⭐⭐⭐⭐
系统设计与架构 参与后端系统设计与开发,主要涉及 RESTful API、gRPC 等服务接口的构建 ⭐⭐⭐⭐⭐
深入发掘和分析业务需求,撰写技术方案和系统设计 ⭐⭐⭐⭐
参与技术选型和架构设计,解决复杂的技术挑战 ⭐⭐⭐
持续对系统架构进行改造和优化 ⭐⭐⭐
框架与接口开发 使用 Python 框架(如 Flask、FastAPI、Django 等)构建高并发、高可靠性的后端服务 ⭐⭐⭐⭐⭐
能够基于框架开发 RESTful API 和服务 ⭐⭐⭐⭐⭐
搭建链节点交互 API(Flask / FastAPI)支持本地调试与多节点模拟 ⭐⭐
数据库相关 进行数据库设计与优化,熟悉 PostgreSQL、MySQL、Redis、MongoDB 等各类数据库应用 ⭐⭐⭐⭐⭐
设计和实现数据库,确保数据的安全和可靠性 ⭐⭐⭐⭐
掌握 SQL 查询优化及数据库设计原则 ⭐⭐⭐⭐
测试与质量保证 编写测试用例及自动化测试,确保系统质量 ⭐⭐⭐⭐
参与持续集成/持续部署流程 ⭐⭐⭐⭐
熟悉 pytest 等测试框架,具备调试代码和性能优化的能力 ⭐⭐⭐
性能优化 负责后端性能瓶颈分析与优化 ⭐⭐⭐⭐
持续完善并维护产品服务,不断提升服务性能和稳定性 ⭐⭐⭐⭐
对于接口的性能可以进行优化 ⭐⭐⭐⭐
业务协作 参与业务需求分析和产品设计,在初期即为产品注入强劲的技术驱动力 ⭐⭐⭐⭐
实现业务系统的后端服务,并和产品经理紧密配合,确保系统按时上线并且功能完备 ⭐⭐⭐⭐
与团队协作完成从需求分析到产品上线的全流程开发 ⭐⭐⭐
与前端开发团队合作,确保 API 集成到 Web 和移动应用程序中 ⭐⭐⭐
系统运维 负责企业内部系统的运维 ⭐⭐⭐
管理和维护生产系统,以确保可用性和可靠性 ⭐⭐⭐
了解服务的部署和运维;常用的 Linux 操作命令,docker 服务部署等 ⭐⭐⭐
配合容器化部署、多节点运维调试,与团队协作推动流程落地 ⭐⭐
文档与监控 编写高质量的代码、测试用例和技术文档 ⭐⭐⭐
实现和管理日志记录和监控,以便有效地监视和解决问题 ⭐⭐⭐
优化共识安全与性能,撰写相关技术和接口文档 ⭐⭐
特殊领域开发 参与大模型应用后台相关开发 ⭐⭐⭐
负责大模型应用类座舱软件功能设计及开发工作 ⭐⭐
负责智能座舱软件、座舱AI引擎的软件功能设计及开发工作 ⭐⭐
参与区块链相关项目的智能合约开发与优化 ⭐⭐
基于 Python 构建脚本驱动账本系统,设计区块、交易流程、共识模块(PoW/PoS)
负责多格式文档(PDF/扫描件/图片)的信息抽取系统设计、开发与维护 ⭐⭐
负责数据处理、分析与可视化系统的开发工作 ⭐⭐⭐
负责数据智能、自动化等后端代码设计、开发和维护工作 ⭐⭐⭐
负责基于 python 语言进行数据分析 ⭐⭐
项目管理 独立负责 ERP 系统某一业务模块的软件开发和后期维护工作,保证系统的稳定性 ⭐⭐
主导生态、域控、仪表等自动化测试开发
主导自动化测试实施结果分析和定位
自动化工具需求、问题管理

说明

  • ⭐⭐⭐⭐⭐:出现频率极高,几乎所有岗位都要求
  • ⭐⭐⭐⭐:出现频率高,大部分岗位要求
  • ⭐⭐⭐:出现频率中等,部分岗位要求
  • ⭐⭐:出现频率较低,特定岗位要求
  • ⭐:出现频率很低,特殊领域岗位要求

目录


一、基本技能要求

基本技能是 Python 后端开发的基石,几乎所有岗位都会要求掌握这些技能。

1.1 Python 语言基础

岗位要求

  • 精通 Python 语言,掌握 Python 基础语法
  • 理解 Python 的特性,如动态类型、装饰器、生成器等
  • 熟悉标准库的使用
  • 具备良好的编程习惯

核心知识点

# 1. 基础语法和数据类型
name = "Python"
age = 30
is_active = True
scores = [85, 90, 88]

# 2. 函数定义和使用
def calculate_average(scores):
    """计算平均分"""
    return sum(scores) / len(scores) if scores else 0

# 3. 面向对象编程
class User:
    def __init__(self, name, age):
        self.name = name
        self.age = age

    def introduce(self):
        return f"我是{self.name},今年{self.age}岁"

# 4. 装饰器
def log_time(func):
    def wrapper(*args, **kwargs):
        import time
        start = time.time()
        result = func(*args, **kwargs)
        print(f"{func.__name__} 执行时间: {time.time() - start:.2f}秒")
        return result
    return wrapper

@log_time
def slow_function():
    import time
    time.sleep(1)
    return "完成"

# 5. 生成器和迭代器
def fibonacci(n):
    """生成斐波那契数列"""
    a, b = 0, 1
    for _ in range(n):
        yield a
        a, b = b, a + b

# 6. 异常处理
def safe_divide(a, b):
    try:
        return a / b
    except ZeroDivisionError:
        return "除数不能为0"
    except TypeError:
        return "参数类型错误"

学习建议

  • 掌握 Python 基础语法(参考 page6.md)
  • 熟悉常用内置函数和标准库
  • 理解 Python 的面向对象特性
  • 练习编写规范的代码(遵循 PEP 8)

练习题

  1. 编写一个函数,实现列表去重并保持原有顺序
  2. 使用装饰器实现函数执行时间统计
  3. 实现一个简单的上下文管理器
点击查看答案 ```python # 1. 列表去重 def unique_list(lst): seen = set() result = [] for item in lst: if item not in seen: seen.add(item) result.append(item) return result # 2. 执行时间统计装饰器 import time from functools import wraps def timing_decorator(func): @wraps(func) def wrapper(*args, **kwargs): start = time.time() result = func(*args, **kwargs) end = time.time() print(f"{func.__name__} 耗时: {end - start:.4f}秒") return result return wrapper # 3. 上下文管理器 class FileManager: def __init__(self, filename, mode): self.filename = filename self.mode = mode self.file = None def __enter__(self): self.file = open(self.filename, self.mode) return self.file def __exit__(self, exc_type, exc_val, exc_tb): if self.file: self.file.close() return False ```

1.2 Web 框架(Django/Flask/FastAPI)

岗位要求

  • 掌握至少一个 Python Web 框架(Django/Flask/FastAPI)
  • 能够基于框架开发 RESTful API 和服务
  • 熟悉 ORM 框架的使用
  • 了解框架的中间件机制

Django 基础示例

# settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'rest_framework',
    'myapp',
]

# models.py
from django.db import models

class User(models.Model):
    name = models.CharField(max_length=100)
    email = models.EmailField(unique=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'users'

# views.py
from rest_framework import viewsets
from rest_framework.response import Response
from .models import User
from .serializers import UserSerializer

class UserViewSet(viewsets.ModelViewSet):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    def list(self, request):
        users = self.get_queryset()
        serializer = self.get_serializer(users, many=True)
        return Response(serializer.data)

Flask 基础示例

from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)

@app.route('/api/users', methods=['GET'])
def get_users():
    users = User.query.all()
    return jsonify([{'id': u.id, 'name': u.name, 'email': u.email} 
                    for u in users])

@app.route('/api/users', methods=['POST'])
def create_user():
    data = request.json
    user = User(name=data['name'], email=data['email'])
    db.session.add(user)
    db.session.commit()
    return jsonify({'id': user.id, 'name': user.name}), 201

FastAPI 基础示例

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List

app = FastAPI()

class UserCreate(BaseModel):
    name: str
    email: str

class UserResponse(BaseModel):
    id: int
    name: str
    email: str

users_db = []
next_id = 1

@app.get("/api/users", response_model=List[UserResponse])
async def get_users():
    return users_db

@app.post("/api/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    global next_id
    new_user = UserResponse(id=next_id, **user.dict())
    users_db.append(new_user)
    next_id += 1
    return new_user

学习建议

  • Django:适合大型项目,功能全面,学习曲线较陡
  • Flask:轻量级,灵活,适合中小型项目
  • FastAPI:现代框架,支持异步,自动生成 API 文档

练习题

  1. 使用任意框架创建一个简单的用户管理 API(增删改查)
  2. 实现用户认证中间件
  3. 添加 API 限流功能

1.3 数据库技术

岗位要求

  • 熟悉关系型数据库(MySQL/PostgreSQL)
  • 了解非关系型数据库(Redis/MongoDB)
  • 掌握 SQL 语言和数据库设计原则
  • 熟悉 ORM 框架(Django ORM、SQLAlchemy)
  • 具备数据库优化能力

MySQL 基础

# 使用 pymysql
import pymysql

conn = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='testdb',
    charset='utf8mb4'
)

cursor = conn.cursor()

# 查询
cursor.execute("SELECT * FROM users WHERE age > %s", (18,))
results = cursor.fetchall()

# 插入
cursor.execute(
    "INSERT INTO users (name, email) VALUES (%s, %s)",
    ('张三', 'zhangsan@example.com')
)
conn.commit()

cursor.close()
conn.close()

PostgreSQL 示例

# 使用 psycopg2
import psycopg2

conn = psycopg2.connect(
    host="localhost",
    database="mydb",
    user="postgres",
    password="password"
)

cur = conn.cursor()
cur.execute("SELECT * FROM users")
rows = cur.fetchall()

Redis 缓存示例

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# 字符串操作
r.set('user:1:name', '张三')
name = r.get('user:1:name')

# 哈希操作
r.hset('user:1', 'name', '张三')
r.hset('user:1', 'age', 25)
user = r.hgetall('user:1')

# 列表操作
r.lpush('tasks', 'task1', 'task2')
task = r.rpop('tasks')

# 设置过期时间
r.setex('session:123', 3600, 'session_data')

MongoDB 示例

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']
collection = db['users']

# 插入
user = {'name': '张三', 'age': 25, 'email': 'zhangsan@example.com'}
collection.insert_one(user)

# 查询
users = collection.find({'age': {'$gt': 18}})
for user in users:
    print(user)

# 更新
collection.update_one(
    {'name': '张三'},
    {'$set': {'age': 26}}
)

SQL 优化技巧

-- 1. 使用索引
CREATE INDEX idx_email ON users(email);

-- 2. 避免 SELECT *
SELECT id, name, email FROM users;  -- 好
SELECT * FROM users;  -- 不好

-- 3. 使用 LIMIT
SELECT * FROM users LIMIT 10;

-- 4. 避免子查询,使用 JOIN
-- 不好
SELECT * FROM orders WHERE user_id IN (SELECT id FROM users WHERE age > 18);

-- 好
SELECT o.* FROM orders o 
JOIN users u ON o.user_id = u.id 
WHERE u.age > 18;

学习建议

  • 掌握 SQL 基础语法(SELECT、INSERT、UPDATE、DELETE)
  • 理解数据库索引原理和优化方法
  • 学习事务处理和 ACID 特性
  • 了解数据库设计范式

练习题

  1. 设计一个博客系统的数据库表结构(用户、文章、评论)
  2. 编写 SQL 查询:找出最近一周发布的文章及其作者
  3. 使用 Redis 实现一个简单的缓存装饰器

1.4 RESTful API 设计

岗位要求

  • 具有 RESTful API 设计与开发经验
  • 了解常见的接口安全和性能优化策略
  • 熟悉 HTTP 协议和状态码
  • 能够设计合理的 API 接口

RESTful API 设计原则

# 1. 使用正确的 HTTP 方法
# GET    - 获取资源
# POST   - 创建资源
# PUT    - 更新资源(完整)
# PATCH  - 更新资源(部分)
# DELETE - 删除资源

# 2. 使用合理的 URL 设计
# 好的设计
GET    /api/users          # 获取用户列表
GET    /api/users/1         # 获取单个用户
POST   /api/users           # 创建用户
PUT    /api/users/1         # 更新用户
DELETE /api/users/1         # 删除用户

# 不好的设计
GET    /api/getUsers
POST   /api/createUser
POST   /api/updateUser
POST   /api/deleteUser

# 3. 使用标准 HTTP 状态码
# 200 OK - 成功
# 201 Created - 创建成功
# 400 Bad Request - 请求错误
# 401 Unauthorized - 未授权
# 403 Forbidden - 禁止访问
# 404 Not Found - 资源不存在
# 500 Internal Server Error - 服务器错误

FastAPI RESTful API 示例

from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
from typing import List, Optional

app = FastAPI()

class UserCreate(BaseModel):
    name: str
    email: str

class UserUpdate(BaseModel):
    name: Optional[str] = None
    email: Optional[str] = None

class UserResponse(BaseModel):
    id: int
    name: str
    email: str

users_db = {}
next_id = 1

@app.get("/api/users", response_model=List[UserResponse])
async def list_users(skip: int = 0, limit: int = 10):
    """获取用户列表"""
    return list(users_db.values())[skip:skip+limit]

@app.get("/api/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """获取单个用户"""
    if user_id not in users_db:
        raise HTTPException(status_code=404, detail="用户不存在")
    return users_db[user_id]

@app.post("/api/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    """创建用户"""
    global next_id
    new_user = UserResponse(id=next_id, **user.dict())
    users_db[next_id] = new_user
    next_id += 1
    return new_user

@app.put("/api/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user: UserUpdate):
    """更新用户"""
    if user_id not in users_db:
        raise HTTPException(status_code=404, detail="用户不存在")

    existing_user = users_db[user_id]
    update_data = user.dict(exclude_unset=True)
    updated_user = existing_user.copy(update=update_data)
    users_db[user_id] = updated_user
    return updated_user

@app.delete("/api/users/{user_id}", status_code=204)
async def delete_user(user_id: int):
    """删除用户"""
    if user_id not in users_db:
        raise HTTPException(status_code=404, detail="用户不存在")
    del users_db[user_id]
    return None

API 文档和版本控制

# 版本控制
@app.get("/api/v1/users")
async def get_users_v1():
    pass

@app.get("/api/v2/users")
async def get_users_v2():
    pass

# 统一响应格式
class APIResponse(BaseModel):
    code: int = 200
    message: str = "success"
    data: Optional[dict] = None

@app.get("/api/users")
async def get_users():
    return APIResponse(
        code=200,
        message="获取成功",
        data={"users": [...]}
    )

学习建议

  • 理解 REST 架构风格和设计原则
  • 掌握 HTTP 协议和状态码
  • 学习 API 版本控制策略
  • 了解 API 文档生成工具(Swagger、OpenAPI)

练习题

  1. 设计一个任务管理系统的 RESTful API
  2. 实现 API 请求参数验证
  3. 添加 API 响应缓存机制

1.5 Linux 基础

岗位要求

  • 熟悉 Linux 操作系统
  • 能够使用 Shell 脚本进行日常运维操作
  • 了解基本的服务器配置和部署
  • 能够在 Linux 上调试和排查问题

常用 Linux 命令

# 文件操作
ls -la                    # 列出文件(详细信息)
cd /path/to/dir          # 切换目录
pwd                      # 显示当前目录
mkdir -p dir1/dir2       # 创建目录
rm -rf dir               # 删除目录
cp source dest           # 复制文件
mv source dest           # 移动/重命名文件
cat file.txt             # 查看文件内容
head -n 10 file.txt      # 查看前10行
tail -f file.log         # 实时查看日志

# 文件权限
chmod 755 script.sh      # 修改文件权限
chown user:group file    # 修改文件所有者

# 进程管理
ps aux                   # 查看所有进程
ps aux | grep python     # 查找 Python 进程
kill -9 PID              # 强制杀死进程
top                      # 实时查看进程
htop                     # 更友好的进程查看工具

# 网络相关
netstat -tuln            # 查看端口占用
lsof -i :8000            # 查看8000端口占用
curl http://example.com  # 发送 HTTP 请求
wget http://example.com  # 下载文件

# 系统信息
df -h                    # 查看磁盘使用情况
free -h                  # 查看内存使用情况
uname -a                 # 查看系统信息

Shell 脚本示例

#!/bin/bash
# 部署脚本示例

# 变量定义
PROJECT_DIR="/var/www/myapp"
BACKUP_DIR="/backup"
DATE=$(date +%Y%m%d_%H%M%S)

# 备份当前版本
echo "备份当前版本..."
tar -czf "$BACKUP_DIR/backup_$DATE.tar.gz" "$PROJECT_DIR"

# 停止服务
echo "停止服务..."
systemctl stop myapp

# 更新代码
echo "更新代码..."
cd "$PROJECT_DIR"
git pull origin main

# 安装依赖
echo "安装依赖..."
source venv/bin/activate
pip install -r requirements.txt

# 数据库迁移
python manage.py migrate

# 重启服务
echo "重启服务..."
systemctl start myapp

echo "部署完成!"

Python 与 Linux 交互

import subprocess
import os

# 执行 Shell 命令
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)

# 检查文件是否存在
if os.path.exists('/path/to/file'):
    print("文件存在")

# 获取环境变量
python_path = os.environ.get('PYTHONPATH', '/usr/bin/python')

# 执行系统命令并获取输出
def run_command(cmd):
    process = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    stdout, stderr = process.communicate()
    return stdout.decode('utf-8'), stderr.decode('utf-8')

output, error = run_command('ps aux | grep python')
print(output)

学习建议

  • 掌握常用 Linux 命令
  • 学习 Shell 脚本编写
  • 了解 Linux 文件系统结构
  • 熟悉进程管理和服务管理

练习题

  1. 编写一个 Shell 脚本,自动备份数据库
  2. 使用 Python 编写一个简单的系统监控脚本
  3. 实现一个日志轮转脚本

1.6 Git 版本控制

岗位要求

  • 了解版本控制系统(Git)
  • 熟悉 Git 基本操作(clone、commit、push、pull)
  • 理解分支管理策略
  • 能够解决代码冲突

Git 基础命令

# 初始化仓库
git init
git clone https://github.com/user/repo.git

# 基本操作
git status                  # 查看状态
git add file.py             # 添加到暂存区
git add .                   # 添加所有文件
git commit -m "提交信息"     # 提交
git push origin main         # 推送到远程
git pull origin main        # 拉取远程更新

# 分支操作
git branch                  # 查看分支
git branch feature/new      # 创建分支
git checkout feature/new    # 切换分支
git checkout -b feature/new # 创建并切换分支
git merge feature/new       # 合并分支
git branch -d feature/new   # 删除分支

# 查看历史
git log                     # 查看提交历史
git log --oneline           # 简洁历史
git log --graph --all       # 图形化历史
git diff                    # 查看差异
git show commit_id          # 查看某次提交

# 撤销操作
git reset --soft HEAD~1     # 撤销提交,保留更改
git reset --hard HEAD~1     # 撤销提交,丢弃更改
git checkout -- file.py     # 撤销工作区更改
git revert commit_id         # 创建新提交来撤销

Git 工作流示例

# 1. 创建功能分支
git checkout -b feature/user-auth

# 2. 开发并提交
git add .
git commit -m "添加用户认证功能"

# 3. 推送分支
git push origin feature/user-auth

# 4. 创建 Pull Request(在 GitHub/GitLab 上)

# 5. 合并后删除分支
git checkout main
git pull origin main
git branch -d feature/user-auth

Python 中使用 Git

import subprocess

def git_command(cmd):
    """执行 Git 命令"""
    result = subprocess.run(
        ['git'] + cmd.split(),
        capture_output=True,
        text=True
    )
    return result.stdout.strip()

# 获取当前分支
current_branch = git_command('branch --show-current')
print(f"当前分支: {current_branch}")

# 获取最新提交信息
last_commit = git_command('log -1 --pretty=format:"%h - %an, %ar : %s"')
print(f"最新提交: {last_commit}")

# 检查是否有未提交的更改
status = git_command('status --porcelain')
if status:
    print("有未提交的更改")
else:
    print("工作区干净")

学习建议

  • 掌握 Git 基本命令和工作流
  • 理解分支管理策略(Git Flow、GitHub Flow)
  • 学习解决代码冲突
  • 了解 Git Hooks 的使用

练习题

  1. 创建一个新分支,开发功能后合并到主分支
  2. 使用 Git 回退到之前的某个提交
  3. 解决一次合并冲突

1.7 数据结构与算法

岗位要求

  • 熟悉常用的数据结构和算法
  • 具备良好的算法基础和数据结构知识
  • 具备良好的问题解决能力
  • 能够进行算法优化

常用数据结构

# 1. 列表(动态数组)
arr = [1, 2, 3, 4, 5]
arr.append(6)          # O(1)
arr.insert(0, 0)       # O(n)
arr.pop()              # O(1)

# 2. 字典(哈希表)
d = {'a': 1, 'b': 2}
d['c'] = 3             # O(1) 平均
value = d.get('a')     # O(1) 平均

# 3. 集合
s = {1, 2, 3}
s.add(4)               # O(1) 平均
4 in s                 # O(1) 平均

# 4. 栈(使用列表实现)
stack = []
stack.append(1)        # 入栈
stack.pop()           # 出栈

# 5. 队列(使用 collections.deque)
from collections import deque
queue = deque()
queue.append(1)       # 入队
queue.popleft()       # 出队

# 6. 堆(使用 heapq)
import heapq
heap = []
heapq.heappush(heap, 3)
heapq.heappush(heap, 1)
heapq.heappush(heap, 2)
min_val = heapq.heappop(heap)  # 1

常用算法

# 1. 排序算法
def quick_sort(arr):
    """快速排序"""
    if len(arr) <= 1:
        return arr
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    return quick_sort(left) + middle + quick_sort(right)

# 2. 二分查找
def binary_search(arr, target):
    """二分查找"""
    left, right = 0, len(arr) - 1
    while left <= right:
        mid = (left + right) // 2
        if arr[mid] == target:
            return mid
        elif arr[mid] < target:
            left = mid + 1
        else:
            right = mid - 1
    return -1

# 3. 深度优先搜索(DFS)
def dfs(graph, start, visited=None):
    """深度优先搜索"""
    if visited is None:
        visited = set()
    visited.add(start)
    print(start)
    for neighbor in graph[start]:
        if neighbor not in visited:
            dfs(graph, neighbor, visited)

# 4. 广度优先搜索(BFS)
from collections import deque

def bfs(graph, start):
    """广度优先搜索"""
    visited = set()
    queue = deque([start])
    visited.add(start)

    while queue:
        node = queue.popleft()
        print(node)
        for neighbor in graph[node]:
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(neighbor)

# 5. 动态规划示例:斐波那契数列
def fibonacci_dp(n):
    """动态规划计算斐波那契数列"""
    if n <= 1:
        return n
    dp = [0] * (n + 1)
    dp[1] = 1
    for i in range(2, n + 1):
        dp[i] = dp[i-1] + dp[i-2]
    return dp[n]

算法复杂度分析

# O(1) - 常数时间
def get_first(arr):
    return arr[0]

# O(log n) - 对数时间(二分查找)
def binary_search(arr, target):
    # ... 见上面代码

# O(n) - 线性时间
def find_max(arr):
    max_val = arr[0]
    for val in arr:
        if val > max_val:
            max_val = val
    return max_val

# O(n log n) - 线性对数时间(快速排序)
def quick_sort(arr):
    # ... 见上面代码

# O(n²) - 平方时间(冒泡排序)
def bubble_sort(arr):
    n = len(arr)
    for i in range(n):
        for j in range(0, n - i - 1):
            if arr[j] > arr[j + 1]:
                arr[j], arr[j + 1] = arr[j + 1], arr[j]

学习建议

  • 掌握常见数据结构(数组、链表、栈、队列、树、图)
  • 学习基本算法(排序、查找、递归、动态规划)
  • 理解算法复杂度(时间复杂度和空间复杂度)
  • 刷题练习(LeetCode、牛客网)

练习题

  1. 实现一个 LRU 缓存(使用字典和双向链表)
  2. 实现快速排序和归并排序,比较性能
  3. 解决 LeetCode 中等难度的题目(如两数之和、反转链表)
点击查看 LRU 缓存实现 ```python from collections import OrderedDict class LRUCache: def __init__(self, capacity: int): self.cache = OrderedDict() self.capacity = capacity def get(self, key: int) -> int: if key not in self.cache: return -1 # 移动到末尾(最近使用) self.cache.move_to_end(key) return self.cache[key] def put(self, key: int, value: int) -> None: if key in self.cache: # 更新值并移动到末尾 self.cache.move_to_end(key) else: if len(self.cache) >= self.capacity: # 删除最久未使用的(第一个) self.cache.popitem(last=False) self.cache[key] = value ```

二、进阶技能要求

进阶技能是区分初级和中级开发者的关键,掌握这些技能能够胜任更复杂的项目开发。

2.1 异步编程与并发

岗位要求

  • 熟悉多任务并发,异步、协程的使用
  • 对于 HTTP、WebSocket、SSE 等接口有使用经验
  • 能理解不同场景的应用
  • 对于接口的性能可以进行优化

异步编程基础

import asyncio
import aiohttp

# 1. 基本异步函数
async def fetch_data(url):
    """异步获取数据"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# 2. 并发执行多个任务
async def main():
    urls = [
        'https://api.example.com/data1',
        'https://api.example.com/data2',
        'https://api.example.com/data3'
    ]

    # 并发执行
    tasks = [fetch_data(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

# 运行
results = asyncio.run(main())

FastAPI 异步示例

from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.get("/api/data")
async def get_data():
    """异步接口"""
    # 模拟异步 I/O 操作
    await asyncio.sleep(1)
    return {"data": "异步数据"}

@app.get("/api/batch")
async def batch_process(items: list):
    """批量处理"""
    async def process_item(item):
        await asyncio.sleep(0.1)  # 模拟处理
        return f"processed_{item}"

    tasks = [process_item(item) for item in items]
    results = await asyncio.gather(*tasks)
    return results

WebSocket 示例

from fastapi import FastAPI, WebSocket
import json

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # 接收消息
            data = await websocket.receive_text()
            message = json.loads(data)

            # 处理消息
            response = {
                "type": "response",
                "message": f"收到: {message.get('text', '')}"
            }

            # 发送响应
            await websocket.send_text(json.dumps(response))
    except Exception as e:
        print(f"WebSocket 错误: {e}")
    finally:
        await websocket.close()

SSE (Server-Sent Events) 示例

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

@app.get("/api/stream")
async def stream_data():
    """SSE 流式数据"""
    async def generate():
        for i in range(10):
            data = {
                "id": i,
                "message": f"消息 {i}",
                "timestamp": asyncio.get_event_loop().time()
            }
            yield f"data: {json.dumps(data)}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

协程和生成器

import asyncio

# 协程函数
async def producer(queue):
    """生产者"""
    for i in range(5):
        await queue.put(f"item_{i}")
        await asyncio.sleep(0.5)
    await queue.put(None)  # 结束标志

async def consumer(queue):
    """消费者"""
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"处理: {item}")
        await asyncio.sleep(0.2)

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(
        producer(queue),
        consumer(queue)
    )

asyncio.run(main())

学习建议

  • 理解异步编程的概念和优势
  • 掌握 asyncio 库的使用
  • 学习异步 I/O 操作(数据库、HTTP 请求)
  • 了解事件循环机制

练习题

  1. 实现一个异步爬虫,并发抓取多个网页
  2. 使用 WebSocket 实现一个简单的聊天室
  3. 实现一个异步任务队列系统

2.2 接口鉴权与安全

岗位要求

  • 熟悉基础的接口鉴权功能:Token 认证、OAuth2 授权、签名验证等
  • 了解常见的接口安全和性能优化策略
  • 具备数据安全相关经验

Token 认证(JWT)

import jwt
from datetime import datetime, timedelta
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

app = FastAPI()
security = HTTPBearer()

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

def create_access_token(data: dict, expires_delta: timedelta = None):
    """创建 JWT Token"""
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(hours=1)

    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """验证 Token"""
    try:
        token = credentials.credentials
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token 已过期"
        )
    except jwt.InvalidTokenError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的 Token"
        )

@app.post("/api/login")
async def login(username: str, password: str):
    """登录接口"""
    # 验证用户名密码(示例)
    if username == "admin" and password == "123456":
        access_token = create_access_token(
            data={"sub": username},
            expires_delta=timedelta(hours=24)
        )
        return {"access_token": access_token, "token_type": "bearer"}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="用户名或密码错误"
    )

@app.get("/api/protected")
async def protected_route(payload: dict = Depends(verify_token)):
    """受保护的路由"""
    return {
        "message": "访问成功",
        "user": payload.get("sub")
    }

OAuth2 授权

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 模拟用户数据库
fake_users_db = {
    "admin": {
        "username": "admin",
        "hashed_password": "hashed_password_here",
        "email": "admin@example.com"
    }
}

def verify_password(plain_password, hashed_password):
    """验证密码(实际应使用 bcrypt)"""
    return plain_password == "123456"  # 简化示例

def get_user(username: str):
    """获取用户"""
    if username in fake_users_db:
        return fake_users_db[username]
    return None

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """OAuth2 登录"""
    user = get_user(form_data.username)
    if not user or not verify_password(form_data.password, user["hashed_password"]):
        raise HTTPException(
            status_code=400,
            detail="用户名或密码错误"
        )
    # 生成 token(简化示例)
    access_token = create_access_token(data={"sub": user["username"]})
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/api/users/me")
async def read_users_me(token: str = Depends(oauth2_scheme)):
    """获取当前用户信息"""
    payload = verify_token(token)
    username = payload.get("sub")
    user = get_user(username)
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    return user

签名验证

import hmac
import hashlib
import time
from urllib.parse import urlencode

def generate_sign(params: dict, secret_key: str) -> str:
    """生成签名"""
    # 1. 参数排序
    sorted_params = sorted(params.items())
    # 2. 拼接字符串
    query_string = urlencode(sorted_params)
    # 3. 添加密钥
    string_to_sign = f"{query_string}&key={secret_key}"
    # 4. MD5 加密
    sign = hashlib.md5(string_to_sign.encode()).hexdigest().upper()
    return sign

def verify_sign(params: dict, secret_key: str, received_sign: str) -> bool:
    """验证签名"""
    expected_sign = generate_sign(params, secret_key)
    return hmac.compare_digest(expected_sign, received_sign)

# 使用示例
params = {
    "app_id": "123456",
    "timestamp": str(int(time.time())),
    "data": "test"
}
secret_key = "your_secret_key"

sign = generate_sign(params, secret_key)
print(f"生成的签名: {sign}")

# 验证
is_valid = verify_sign(params, secret_key, sign)
print(f"签名验证: {is_valid}")

学习建议

  • 理解认证和授权的区别
  • 掌握 JWT Token 的使用
  • 学习 OAuth2 授权流程
  • 了解常见的安全漏洞(SQL 注入、XSS、CSRF)

练习题

  1. 实现一个完整的 JWT 认证系统
  2. 实现 API 签名验证中间件
  3. 添加接口访问频率限制(限流)

2.3 消息队列

岗位要求

  • 熟悉消息队列的使用
  • 了解消息队列的选型和应用场景
  • 能够使用消息队列实现异步任务处理

Redis 作为消息队列

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, db=0)

# 生产者
def produce_task(queue_name, task_data):
    """生产任务"""
    r.lpush(queue_name, json.dumps(task_data))
    print(f"任务已加入队列: {task_data}")

# 消费者
def consume_task(queue_name, timeout=0):
    """消费任务"""
    while True:
        # 阻塞式获取任务
        task = r.brpop(queue_name, timeout=timeout)
        if task:
            task_data = json.loads(task[1])
            print(f"处理任务: {task_data}")
            # 处理任务逻辑
            process_task(task_data)

def process_task(task_data):
    """处理任务"""
    time.sleep(1)  # 模拟处理时间
    print(f"任务处理完成: {task_data}")

# 使用示例
produce_task("task_queue", {"type": "email", "to": "user@example.com"})

RabbitMQ 示例

import pika
import json

# 连接 RabbitMQ
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='task_queue', durable=True)

# 生产者
def send_task(task_data):
    """发送任务"""
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=json.dumps(task_data),
        properties=pika.BasicProperties(
            delivery_mode=2,  # 消息持久化
        )
    )
    print(f"任务已发送: {task_data}")

# 消费者
def callback(ch, method, properties, body):
    """处理消息"""
    task_data = json.loads(body)
    print(f"收到任务: {task_data}")

    # 处理任务
    process_task(task_data)

    # 确认消息已处理
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 设置消费者
channel.basic_qos(prefetch_count=1)  # 公平分发
channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback
)

# 开始消费
channel.start_consuming()

Celery 异步任务队列

# celery_app.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def send_email(to, subject, body):
    """发送邮件任务"""
    print(f"发送邮件到 {to}: {subject}")
    # 实际发送邮件逻辑
    return f"邮件已发送到 {to}"

@app.task
def process_data(data):
    """数据处理任务"""
    print(f"处理数据: {data}")
    # 数据处理逻辑
    return "处理完成"

# 使用
from celery_app import send_email, process_data

# 异步执行任务
result = send_email.delay("user@example.com", "主题", "内容")
print(f"任务ID: {result.id}")

# 获取结果
task_result = result.get(timeout=10)
print(f"任务结果: {task_result}")

学习建议

  • 理解消息队列的作用和优势
  • 学习常用消息队列(Redis、RabbitMQ、Kafka)
  • 掌握任务队列的使用场景
  • 了解消息持久化和可靠性保证

练习题

  1. 使用 Redis 实现一个简单的任务队列
  2. 使用 Celery 实现异步邮件发送功能
  3. 实现消息队列的监控和重试机制

2.4 Docker 容器化

岗位要求

  • 具有 Docker 容器化及服务编排经验
  • 熟悉基于 Docker、K8s 开发部署服务的流程
  • 了解服务的部署和运维

Dockerfile 示例

# Python 应用 Dockerfile
FROM python:3.9-slim

# 设置工作目录
WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .

# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose 示例

# docker-compose.yml
version: '3.8'

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/mydb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - .:/app
    command: uvicorn main:app --reload --host 0.0.0.0

  db:
    image: postgres:13
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=mydb
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"

volumes:
  postgres_data:

Python 中使用 Docker

import docker

# 连接 Docker
client = docker.from_env()

# 构建镜像
def build_image(dockerfile_path, tag):
    """构建 Docker 镜像"""
    client.images.build(
        path=dockerfile_path,
        tag=tag
    )
    print(f"镜像构建完成: {tag}")

# 运行容器
def run_container(image, ports, environment):
    """运行容器"""
    container = client.containers.run(
        image,
        ports=ports,
        environment=environment,
        detach=True
    )
    print(f"容器已启动: {container.id}")
    return container

# 查看容器日志
def get_logs(container_id):
    """获取容器日志"""
    container = client.containers.get(container_id)
    logs = container.logs()
    return logs.decode('utf-8')

学习建议

  • 掌握 Docker 基本命令(build、run、push、pull)
  • 学习 Dockerfile 编写
  • 了解 Docker Compose 的使用
  • 学习容器编排和最佳实践

练习题

  1. 为你的 Python 项目编写 Dockerfile
  2. 使用 Docker Compose 搭建开发环境(应用+数据库+Redis)
  3. 实现多阶段构建优化镜像大小

2.5 微服务架构

岗位要求

  • 了解分布式系统及微服务架构
  • 能够独立排查和解决服务问题
  • 具有微服务下服务数据的埋点、链路耗时追踪经验

微服务基础架构

# service_a.py - 服务A
from fastapi import FastAPI
import httpx

app = FastAPI()

@app.get("/api/service-a/data")
async def get_data():
    # 调用服务B
    async with httpx.AsyncClient() as client:
        response = await client.get("http://service-b:8001/api/service-b/data")
        data_b = response.json()

    return {
        "service": "A",
        "data": "来自服务A",
        "from_b": data_b
    }

# service_b.py - 服务B
from fastapi import FastAPI

app = FastAPI()

@app.get("/api/service-b/data")
async def get_data():
    return {
        "service": "B",
        "data": "来自服务B"
    }

服务注册与发现

# service_registry.py
from typing import Dict, List
import time

class ServiceRegistry:
    def __init__(self):
        self.services: Dict[str, List[dict]] = {}

    def register(self, service_name: str, address: str, port: int):
        """注册服务"""
        if service_name not in self.services:
            self.services[service_name] = []

        service_info = {
            "address": address,
            "port": port,
            "timestamp": time.time()
        }
        self.services[service_name].append(service_info)
        print(f"服务注册: {service_name} at {address}:{port}")

    def discover(self, service_name: str) -> List[dict]:
        """服务发现"""
        return self.services.get(service_name, [])

    def health_check(self, service_name: str):
        """健康检查"""
        # 实现健康检查逻辑
        pass

# 使用示例
registry = ServiceRegistry()
registry.register("user-service", "localhost", 8001)
registry.register("order-service", "localhost", 8002)

services = registry.discover("user-service")

链路追踪示例

import uuid
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

app = FastAPI()

class TracingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # 生成追踪ID
        trace_id = str(uuid.uuid4())
        request.state.trace_id = trace_id

        # 添加追踪头
        response = await call_next(request)
        response.headers["X-Trace-Id"] = trace_id

        return response

app.add_middleware(TracingMiddleware)

@app.get("/api/data")
async def get_data(request: Request):
    trace_id = request.state.trace_id
    print(f"追踪ID: {trace_id}")
    return {"data": "test", "trace_id": trace_id}

学习建议

  • 理解微服务架构的优缺点
  • 学习服务间通信方式(HTTP、gRPC、消息队列)
  • 了解服务注册与发现机制
  • 掌握分布式系统的常见问题(一致性、可用性、分区容错)

练习题

  1. 设计一个简单的微服务架构(用户服务、订单服务)
  2. 实现服务间的调用和错误处理
  3. 添加服务监控和日志收集

2.6 性能优化

岗位要求

  • 对于接口的性能可以进行优化
  • 数据库 SQL 优化
  • 负责后端性能瓶颈分析与优化

接口性能优化

from functools import lru_cache
import time
from fastapi import FastAPI
import redis

app = FastAPI()
r = redis.Redis(host='localhost', port=6379, db=0)

# 1. 使用缓存
@lru_cache(maxsize=128)
def expensive_computation(n):
    """昂贵的计算(使用缓存)"""
    time.sleep(0.1)  # 模拟耗时操作
    return n * n

# 2. Redis 缓存装饰器
def cache_result(expire=3600):
    """Redis 缓存装饰器"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            # 生成缓存键
            cache_key = f"{func.__name__}:{args}:{kwargs}"

            # 尝试从缓存获取
            cached = r.get(cache_key)
            if cached:
                return cached.decode('utf-8')

            # 执行函数
            result = func(*args, **kwargs)

            # 存入缓存
            r.setex(cache_key, expire, str(result))
            return result
        return wrapper
    return decorator

@cache_result(expire=3600)
def get_user_data(user_id: int):
    """获取用户数据(带缓存)"""
    # 模拟数据库查询
    time.sleep(0.5)
    return {"user_id": user_id, "name": "张三"}

# 3. 数据库连接池
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "postgresql://user:pass@localhost/db",
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20
)

SQL 优化

# 1. 使用索引
# CREATE INDEX idx_user_email ON users(email);

# 2. 避免 N+1 查询
# 不好的方式
def get_users_bad():
    users = User.query.all()
    for user in users:
        posts = Post.query.filter_by(user_id=user.id).all()  # N+1 查询
        user.posts = posts

# 好的方式(使用 JOIN 或预加载)
def get_users_good():
    from sqlalchemy.orm import joinedload
    users = User.query.options(joinedload(User.posts)).all()

# 3. 使用批量操作
def create_users_bad(users_data):
    for data in users_data:
        user = User(**data)
        db.session.add(user)
    db.session.commit()  # 多次提交

def create_users_good(users_data):
    users = [User(**data) for data in users_data]
    db.session.bulk_save_objects(users)
    db.session.commit()  # 一次提交

# 4. 分页查询
def get_users_paginated(page=1, per_page=20):
    return User.query.paginate(
        page=page,
        per_page=per_page,
        error_out=False
    )

异步优化

import asyncio
import aiohttp
from typing import List

# 串行请求(慢)
async def fetch_serial(urls: List[str]):
    results = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            async with session.get(url) as response:
                results.append(await response.json())
    return results

# 并发请求(快)
async def fetch_parallel(urls: List[str]):
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        results = []
        for response in responses:
            results.append(await response.json())
        return results

学习建议

  • 掌握性能分析工具(cProfile、py-spy)
  • 学习数据库查询优化技巧
  • 了解缓存策略和实现
  • 学习异步编程提升性能

练习题

  1. 使用缓存优化一个慢查询接口
  2. 优化一个存在 N+1 查询问题的代码
  3. 使用异步编程提升批量 API 调用性能

2.7 测试与 CI/CD

岗位要求

  • 编写测试用例及自动化测试
  • 熟悉 pytest 等测试框架
  • 参与持续集成/持续部署流程
  • 熟悉 CI/CD 流程

单元测试(pytest)

# test_user.py
import pytest
from app.models import User

def test_create_user():
    """测试创建用户"""
    user = User(name="张三", email="zhangsan@example.com")
    assert user.name == "张三"
    assert user.email == "zhangsan@example.com"

def test_user_email_validation():
    """测试邮箱验证"""
    with pytest.raises(ValueError):
        User(name="张三", email="invalid-email")

@pytest.fixture
def sample_user():
    """测试夹具"""
    return User(name="测试用户", email="test@example.com")

def test_user_with_fixture(sample_user):
    """使用夹具的测试"""
    assert sample_user.name == "测试用户"

API 测试

# test_api.py
from fastapi.testclient import TestClient
from app.main import app

client = TestClient(app)

def test_get_users():
    """测试获取用户列表"""
    response = client.get("/api/users")
    assert response.status_code == 200
    assert isinstance(response.json(), list)

def test_create_user():
    """测试创建用户"""
    user_data = {
        "name": "测试用户",
        "email": "test@example.com"
    }
    response = client.post("/api/users", json=user_data)
    assert response.status_code == 201
    assert response.json()["name"] == "测试用户"

CI/CD 配置(GitHub Actions)

# .github/workflows/ci.yml
name: CI/CD Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:13
        env:
          POSTGRES_PASSWORD: postgres
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
    - uses: actions/checkout@v2

    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install pytest pytest-cov

    - name: Run tests
      run: |
        pytest --cov=app --cov-report=xml

    - name: Upload coverage
      uses: codecov/codecov-action@v2

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
    - uses: actions/checkout@v2

    - name: Deploy to production
      run: |
        echo "部署到生产环境"
        # 实际的部署命令

学习建议

  • 掌握 pytest 测试框架
  • 学习测试类型(单元测试、集成测试、端到端测试)
  • 了解 CI/CD 工具(GitHub Actions、Jenkins、GitLab CI)
  • 学习测试覆盖率的概念

练习题

  1. 为你的 API 编写完整的测试用例
  2. 配置 GitHub Actions 实现自动化测试
  3. 实现自动化部署流程

三、高阶技能要求

高阶技能通常是特定领域或高级岗位的要求,掌握这些技能能够胜任更专业和复杂的项目。

3.1 大模型应用开发

岗位要求

  • 熟悉基于大模型的构建 Agent 智能体的开发
  • 对于大模型的 API 接口有封装调用经验
  • 对于 Prompt 有调优经验
  • 熟悉 RAG 知识库的构建和工作机制
  • 了解 MCP、function-call 等相关技术
  • 大模型的微调训练:SFT、LoRA 等相关经验

大模型 API 调用

import openai
from typing import List, Dict

class LLMClient:
    def __init__(self, api_key: str, base_url: str = None):
        self.client = openai.OpenAI(
            api_key=api_key,
            base_url=base_url or "https://api.openai.com/v1"
        )

    def chat_completion(self, messages: List[Dict], model: str = "gpt-3.5-turbo"):
        """聊天完成"""
        response = self.client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=0.7,
            max_tokens=1000
        )
        return response.choices[0].message.content

    def generate_text(self, prompt: str, **kwargs):
        """生成文本"""
        messages = [{"role": "user", "content": prompt}]
        return self.chat_completion(messages, **kwargs)

# 使用示例
client = LLMClient(api_key="your-api-key")
response = client.generate_text("解释一下 Python 的装饰器")
print(response)

Prompt 工程

class PromptTemplate:
    """Prompt 模板"""

    SYSTEM_PROMPT = """你是一个专业的 Python 开发助手。
你的任务是帮助用户解决编程问题,提供清晰的代码示例和解释。"""

    def __init__(self):
        self.system_prompt = self.SYSTEM_PROMPT

    def build_prompt(self, user_query: str, context: str = None):
        """构建 Prompt"""
        messages = [
            {"role": "system", "content": self.system_prompt}
        ]

        if context:
            messages.append({
                "role": "system",
                "content": f"上下文信息:{context}"
            })

        messages.append({
            "role": "user",
            "content": user_query
        })

        return messages

    def optimize_prompt(self, original_prompt: str) -> str:
        """优化 Prompt"""
        # 添加明确的指令
        optimized = f"""请按照以下要求回答:
1. 提供清晰的解释
2. 给出代码示例
3. 说明注意事项

问题:{original_prompt}"""
        return optimized

# 使用
template = PromptTemplate()
prompt = template.build_prompt(
    "如何实现一个单例模式?",
    context="Python 3.9+"
)

RAG 知识库构建

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.document_loaders import TextLoader

class RAGSystem:
    def __init__(self, embedding_model: str = "text-embedding-ada-002"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.vectorstore = None
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

    def build_knowledge_base(self, documents_path: str):
        """构建知识库"""
        # 加载文档
        loader = TextLoader(documents_path)
        documents = loader.load()

        # 文档切片
        texts = self.text_splitter.split_documents(documents)

        # 创建向量存储
        self.vectorstore = Chroma.from_documents(
            texts,
            self.embeddings
        )
        print(f"知识库构建完成,包含 {len(texts)} 个文档片段")

    def search(self, query: str, k: int = 3):
        """搜索相关文档"""
        if not self.vectorstore:
            raise ValueError("知识库未构建")

        docs = self.vectorstore.similarity_search(query, k=k)
        return docs

    def rag_query(self, query: str, llm_client):
        """RAG 查询"""
        # 1. 检索相关文档
        relevant_docs = self.search(query)
        context = "\n".join([doc.page_content for doc in relevant_docs])

        # 2. 构建 Prompt
        prompt = f"""基于以下上下文信息回答问题:

上下文:
{context}

问题:{query}

请基于上下文信息回答,如果上下文中没有相关信息,请说明。"""

        # 3. 调用 LLM
        response = llm_client.generate_text(prompt)
        return response

# 使用示例
rag = RAGSystem()
rag.build_knowledge_base("documents.txt")
result = rag.rag_query("Python 装饰器是什么?", client)

Agent 智能体开发

from typing import List, Dict, Callable
import json

class Agent:
    def __init__(self, llm_client, tools: List[Callable] = None):
        self.llm_client = llm_client
        self.tools = tools or []
        self.conversation_history = []

    def add_tool(self, tool_func: Callable, description: str):
        """添加工具"""
        self.tools.append({
            "function": tool_func,
            "name": tool_func.__name__,
            "description": description
        })

    def execute_tool(self, tool_name: str, parameters: dict):
        """执行工具"""
        for tool in self.tools:
            if tool["name"] == tool_name:
                return tool["function"](**parameters)
        return None

    def chat(self, user_input: str):
        """对话"""
        # 构建工具描述
        tools_desc = "\n".join([
            f"- {tool['name']}: {tool['description']}"
            for tool in self.tools
        ])

        # 构建 Prompt
        system_prompt = f"""你是一个智能助手,可以使用以下工具:
{tools_desc}

当需要使用工具时,请以 JSON 格式回复:
{{"action": "use_tool", "tool": "tool_name", "parameters": {{...}}}}

否则直接回答用户问题。"""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input}
        ]

        response = self.llm_client.chat_completion(messages)

        # 检查是否需要使用工具
        try:
            action = json.loads(response)
            if action.get("action") == "use_tool":
                tool_result = self.execute_tool(
                    action["tool"],
                    action["parameters"]
                )
                return f"工具执行结果:{tool_result}"
        except:
            pass

        return response

# 使用示例
def get_weather(city: str) -> str:
    """获取天气"""
    return f"{city}的天气是晴天,25°C"

agent = Agent(client)
agent.add_tool(get_weather, "获取指定城市的天气信息")
response = agent.chat("北京今天天气怎么样?")

学习建议

  • 学习 Prompt 工程技巧
  • 掌握向量数据库的使用
  • 了解 RAG 架构和实现
  • 学习 Agent 开发框架(LangChain、AutoGPT)

练习题

  1. 实现一个简单的 RAG 问答系统
  2. 开发一个能够调用工具的 Agent
  3. 优化 Prompt 提升回答质量

3.2 分布式系统

岗位要求

  • 了解分布式系统及微服务架构
  • 能够独立排查和解决服务问题
  • 理解分布式系统的常见问题

分布式锁

import redis
import time
import uuid

class DistributedLock:
    def __init__(self, redis_client, key, timeout=10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())

    def acquire(self):
        """获取锁"""
        end = time.time() + self.timeout
        while time.time() < end:
            if self.redis.set(
                self.key,
                self.identifier,
                nx=True,
                ex=self.timeout
            ):
                return True
            time.sleep(0.001)
        return False

    def release(self):
        """释放锁"""
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(script, 1, self.key, self.identifier)

# 使用
r = redis.Redis()
lock = DistributedLock(r, "resource")
if lock.acquire():
    try:
        # 执行需要加锁的操作
        pass
    finally:
        lock.release()

分布式任务调度

import asyncio
from datetime import datetime
import json

class DistributedScheduler:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.task_queue = "task_queue"
        self.result_queue = "result_queue"

    async def schedule_task(self, task_type: str, data: dict, delay: int = 0):
        """调度任务"""
        task = {
            "id": str(uuid.uuid4()),
            "type": task_type,
            "data": data,
            "scheduled_at": datetime.now().isoformat(),
            "execute_at": (datetime.now().timestamp() + delay)
        }

        # 如果延迟执行,使用有序集合
        if delay > 0:
            self.redis.zadd(
                "scheduled_tasks",
                {json.dumps(task): task["execute_at"]}
            )
        else:
            # 立即执行,加入队列
            self.redis.lpush(self.task_queue, json.dumps(task))

        return task["id"]

    async def process_tasks(self):
        """处理任务"""
        while True:
            # 检查延迟任务
            now = datetime.now().timestamp()
            tasks = self.redis.zrangebyscore(
                "scheduled_tasks",
                0,
                now
            )

            for task_json in tasks:
                task = json.loads(task_json)
                self.redis.lpush(self.task_queue, task_json)
                self.redis.zrem("scheduled_tasks", task_json)

            # 处理队列任务
            task_json = self.redis.brpop(self.task_queue, timeout=1)
            if task_json:
                task = json.loads(task_json[1])
                result = await self.execute_task(task)

                # 保存结果
                self.redis.lpush(
                    self.result_queue,
                    json.dumps({"task_id": task["id"], "result": result})
                )

学习建议

  • 理解分布式系统的 CAP 定理
  • 学习分布式一致性算法
  • 掌握分布式锁、分布式事务
  • 了解服务发现和负载均衡

练习题

  1. 实现一个分布式任务队列
  2. 实现分布式锁机制
  3. 设计一个分布式系统的监控方案

3.3 Kubernetes (K8s)

岗位要求

  • 熟悉 K8s、Docker 等容器类技术
  • 熟悉基于 docker、k8s 开发部署服务的流程

Kubernetes 部署配置

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: python-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: python-app
  template:
    metadata:
      labels:
        app: python-app
    spec:
      containers:
      - name: python-app
        image: myregistry/python-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: url
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10

---
# service.yaml
apiVersion: v1
kind: Service
metadata:
  name: python-app-service
spec:
  selector:
    app: python-app
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

Python 与 K8s 交互

from kubernetes import client, config

# 加载配置
config.load_incluster_config()  # 在集群内
# config.load_kube_config()    # 本地开发

v1 = client.CoreV1Api()

# 获取 Pod 列表
def list_pods(namespace="default"):
    pods = v1.list_namespaced_pod(namespace)
    for pod in pods.items:
        print(f"Pod: {pod.metadata.name}, Status: {pod.status.phase}")

# 创建 Deployment
def create_deployment(name, image, replicas=1):
    apps_v1 = client.AppsV1Api()

    deployment = client.V1Deployment(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1DeploymentSpec(
            replicas=replicas,
            selector=client.V1LabelSelector(
                match_labels={"app": name}
            ),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": name}),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name=name,
                            image=image,
                            ports=[client.V1ContainerPort(container_port=8000)]
                        )
                    ]
                )
            )
        )
    )

    apps_v1.create_namespaced_deployment(
        namespace="default",
        body=deployment
    )

学习建议

  • 掌握 K8s 基本概念(Pod、Service、Deployment)
  • 学习 K8s 资源管理
  • 了解 K8s 网络和存储
  • 学习 Helm 包管理

练习题

  1. 编写 K8s 部署配置文件
  2. 使用 Python 客户端操作 K8s
  3. 实现应用的自动扩缩容

3.4 大数据技术栈

岗位要求

  • 有数据开发经验,熟悉大数据技术栈
  • 包括但不限于 Hive/Spark/Kafka/Redis 等技术
  • 熟悉云原生数据架构

Spark 数据处理

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count

# 创建 Spark Session
spark = SparkSession.builder \
    .appName("DataProcessing") \
    .getOrCreate()

# 读取数据
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# 数据处理
result = df \
    .filter(col("age") > 18) \
    .groupBy("city") \
    .agg(
        count("*").alias("count"),
        avg("salary").alias("avg_salary")
    )

# 写入数据
result.write.mode("overwrite").parquet("output/")

Kafka 生产者/消费者

from kafka import KafkaProducer, KafkaConsumer
import json

# 生产者
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

producer.send('my_topic', {'key': 'value'})
producer.flush()

# 消费者
consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    print(f"收到消息: {message.value}")

学习建议

  • 学习 Spark 数据处理
  • 掌握 Kafka 消息队列
  • 了解大数据处理流程
  • 学习数据仓库概念

练习题

  1. 使用 Spark 处理大规模数据
  2. 实现 Kafka 生产者和消费者
  3. 构建一个数据管道

3.5 区块链开发

岗位要求

  • 理解区块链底层结构
  • 具备 PoW/PoS 共识经验
  • 智能合约/Web3.py 经验

简单区块链实现

import hashlib
import json
from datetime import datetime

class Block:
    def __init__(self, index, transactions, previous_hash):
        self.index = index
        self.transactions = transactions
        self.previous_hash = previous_hash
        self.timestamp = datetime.now().isoformat()
        self.nonce = 0
        self.hash = self.calculate_hash()

    def calculate_hash(self):
        block_string = json.dumps({
            "index": self.index,
            "transactions": self.transactions,
            "previous_hash": self.previous_hash,
            "timestamp": self.timestamp,
            "nonce": self.nonce
        }, sort_keys=True)
        return hashlib.sha256(block_string.encode()).hexdigest()

    def mine_block(self, difficulty):
        target = "0" * difficulty
        while self.hash[:difficulty] != target:
            self.nonce += 1
            self.hash = self.calculate_hash()
        print(f"区块已挖出: {self.hash}")

class Blockchain:
    def __init__(self):
        self.chain = [self.create_genesis_block()]
        self.difficulty = 2
        self.pending_transactions = []

    def create_genesis_block(self):
        return Block(0, [], "0")

    def add_transaction(self, transaction):
        self.pending_transactions.append(transaction)

    def mine_pending_transactions(self):
        block = Block(
            len(self.chain),
            self.pending_transactions,
            self.chain[-1].hash
        )
        block.mine_block(self.difficulty)
        self.chain.append(block)
        self.pending_transactions = []

    def get_latest_block(self):
        return self.chain[-1]

    def is_chain_valid(self):
        for i in range(1, len(self.chain)):
            current = self.chain[i]
            previous = self.chain[i-1]

            if current.hash != current.calculate_hash():
                return False
            if current.previous_hash != previous.hash:
                return False
        return True

学习建议

  • 理解区块链基本原理
  • 学习共识算法(PoW、PoS)
  • 了解智能合约开发
  • 学习 Web3.py 库

练习题

  1. 实现一个简单的区块链
  2. 实现 PoW 共识算法
  3. 使用 Web3.py 与以太坊交互

3.6 文档解析与 OCR

岗位要求

  • 负责多格式文档(PDF/扫描件/图片)的信息抽取
  • 熟练使用文档解析与 OCR 工具库
  • 具有大语言模型提示优化经验

PDF 解析

import PyPDF2
import pdfplumber

# 使用 PyPDF2
def extract_text_pypdf2(pdf_path):
    with open(pdf_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        text = ""
        for page in reader.pages:
            text += page.extract_text()
    return text

# 使用 pdfplumber(更准确)
def extract_text_pdfplumber(pdf_path):
    with pdfplumber.open(pdf_path) as pdf:
        text = ""
        for page in pdf.pages:
            text += page.extract_text()
    return text

# 提取表格
def extract_tables(pdf_path):
    with pdfplumber.open(pdf_path) as pdf:
        tables = []
        for page in pdf.pages:
            tables.extend(page.extract_tables())
    return tables

OCR 识别

from PIL import Image
import pytesseract
import cv2

def ocr_image(image_path):
    """OCR 识别"""
    image = Image.open(image_path)
    text = pytesseract.image_to_string(image, lang='chi_sim+eng')
    return text

def preprocess_image(image_path):
    """图像预处理"""
    img = cv2.imread(image_path)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # 二值化
    _, binary = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
    return binary

# 使用 PaddleOCR
from paddleocr import PaddleOCR

ocr = PaddleOCR(use_angle_cls=True, lang='ch')
result = ocr.ocr('image.jpg', cls=True)

学习建议

  • 掌握 PDF 解析库(PyPDF2、pdfplumber)
  • 学习 OCR 工具(Tesseract、PaddleOCR)
  • 了解图像预处理技术
  • 学习文档结构化提取

练习题

  1. 实现 PDF 文档信息提取
  2. 使用 OCR 识别图片中的文字
  3. 构建文档解析和结构化提取系统

3.7 爬虫技术

岗位要求

  • 熟悉爬虫技术(Scrapy、Selenium)
  • 了解分布式爬虫

Scrapy 爬虫

import scrapy

class NewsSpider(scrapy.Spider):
    name = 'news'
    start_urls = ['https://example.com/news']

    def parse(self, response):
        # 提取标题
        titles = response.css('h2.title::text').getall()

        # 提取链接
        links = response.css('a::attr(href)').getall()

        for title, link in zip(titles, links):
            yield {
                'title': title,
                'link': response.urljoin(link)
            }

        # 跟进链接
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)

Selenium 自动化

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Chrome()
driver.get("https://example.com")

# 等待元素加载
element = WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.ID, "myElement"))
)

# 操作元素
driver.find_element(By.ID, "username").send_keys("user")
driver.find_element(By.ID, "password").send_keys("pass")
driver.find_element(By.ID, "submit").click()

driver.quit()

学习建议

  • 掌握 Scrapy 框架
  • 学习 Selenium 自动化
  • 了解反爬虫机制
  • 学习分布式爬虫架构

练习题

  1. 使用 Scrapy 爬取新闻网站
  2. 使用 Selenium 处理动态网页
  3. 实现分布式爬虫系统

四、学习路径建议

阶段一:基础技能(1-3个月)

  1. Python 基础(2周)

    • 完成 Python 语法学习(参考 page6.md)
    • 练习编写小程序
  2. Web 框架(3周)

    • 选择一个框架深入学习(推荐 FastAPI 或 Django)
    • 完成一个简单的 CRUD 项目
  3. 数据库(2周)

    • 学习 MySQL/PostgreSQL
    • 掌握 ORM 框架使用
  4. Linux 和 Git(1周)

    • 熟悉常用 Linux 命令
    • 掌握 Git 基本操作

阶段二:进阶技能(3-6个月)

  1. 异步编程(2周)

    • 学习 asyncio
    • 实现异步 API
  2. 接口鉴权(1周)

    • 实现 JWT 认证
    • 学习 OAuth2
  3. Docker(2周)

    • 学习 Docker 基础
    • 容器化你的项目
  4. 性能优化(持续)

    • 学习缓存策略
    • SQL 优化实践
  5. 测试和 CI/CD(2周)

    • 编写测试用例
    • 配置 CI/CD

阶段三:高阶技能(6-12个月)

根据职业方向选择:

  • 大模型方向:学习 LLM API、RAG、Agent 开发
  • 分布式方向:学习 K8s、微服务架构
  • 大数据方向:学习 Spark、Kafka
  • 区块链方向:学习区块链原理、智能合约

学习资源推荐

  1. 官方文档

  2. 在线课程

    • 慕课网、极客时间
    • Coursera、edX
  3. 实践项目

    • GitHub 开源项目
    • 自己设计项目
  4. 刷题平台

    • LeetCode(算法)
    • 牛客网(面试题)

总结

Python 后端开发是一个需要持续学习的领域。建议:

  1. 循序渐进:先掌握基础,再学习进阶
  2. 实践为主:多写代码,多做项目
  3. 关注行业:了解新技术和趋势
  4. 建立知识体系:系统化学习,不要碎片化

希望这份指南能够帮助你制定学习计划,快速达到岗位要求!