Python 后端岗位技能要求完全指南
本文基于真实招聘需求,系统梳理 Python 后端开发岗位的技能要求,从基础到进阶再到高阶,帮助学习者有针对性地提升技能,快速达到岗位要求。
岗位职责汇总表
根据市场招聘需求,Python 后端开发岗位的主要职责如下:
| 职责类别 | 具体职责 | 出现频率 |
|---|---|---|
| 代码开发与维护 | 负责现有后端代码的维护与扩展,二次开发新功能及接口 | ⭐⭐⭐⭐⭐ |
| 负责后端功能的开发和迭代 | ⭐⭐⭐⭐⭐ | |
| 负责核心代码编写,负责技术和产品落地 | ⭐⭐⭐⭐ | |
| 后端代码结构优化与重构 | ⭐⭐⭐⭐ | |
| 对已有业务的代码性能和质量进行改进、抽象和重构 | ⭐⭐⭐⭐ | |
| 系统设计与架构 | 参与后端系统设计与开发,主要涉及 RESTful API、gRPC 等服务接口的构建 | ⭐⭐⭐⭐⭐ |
| 深入发掘和分析业务需求,撰写技术方案和系统设计 | ⭐⭐⭐⭐ | |
| 参与技术选型和架构设计,解决复杂的技术挑战 | ⭐⭐⭐ | |
| 持续对系统架构进行改造和优化 | ⭐⭐⭐ | |
| 框架与接口开发 | 使用 Python 框架(如 Flask、FastAPI、Django 等)构建高并发、高可靠性的后端服务 | ⭐⭐⭐⭐⭐ |
| 能够基于框架开发 RESTful API 和服务 | ⭐⭐⭐⭐⭐ | |
| 搭建链节点交互 API(Flask / FastAPI)支持本地调试与多节点模拟 | ⭐⭐ | |
| 数据库相关 | 进行数据库设计与优化,熟悉 PostgreSQL、MySQL、Redis、MongoDB 等各类数据库应用 | ⭐⭐⭐⭐⭐ |
| 设计和实现数据库,确保数据的安全和可靠性 | ⭐⭐⭐⭐ | |
| 掌握 SQL 查询优化及数据库设计原则 | ⭐⭐⭐⭐ | |
| 测试与质量保证 | 编写测试用例及自动化测试,确保系统质量 | ⭐⭐⭐⭐ |
| 参与持续集成/持续部署流程 | ⭐⭐⭐⭐ | |
| 熟悉 pytest 等测试框架,具备调试代码和性能优化的能力 | ⭐⭐⭐ | |
| 性能优化 | 负责后端性能瓶颈分析与优化 | ⭐⭐⭐⭐ |
| 持续完善并维护产品服务,不断提升服务性能和稳定性 | ⭐⭐⭐⭐ | |
| 对于接口的性能可以进行优化 | ⭐⭐⭐⭐ | |
| 业务协作 | 参与业务需求分析和产品设计,在初期即为产品注入强劲的技术驱动力 | ⭐⭐⭐⭐ |
| 实现业务系统的后端服务,并和产品经理紧密配合,确保系统按时上线并且功能完备 | ⭐⭐⭐⭐ | |
| 与团队协作完成从需求分析到产品上线的全流程开发 | ⭐⭐⭐ | |
| 与前端开发团队合作,确保 API 集成到 Web 和移动应用程序中 | ⭐⭐⭐ | |
| 系统运维 | 负责企业内部系统的运维 | ⭐⭐⭐ |
| 管理和维护生产系统,以确保可用性和可靠性 | ⭐⭐⭐ | |
| 了解服务的部署和运维;常用的 Linux 操作命令,docker 服务部署等 | ⭐⭐⭐ | |
| 配合容器化部署、多节点运维调试,与团队协作推动流程落地 | ⭐⭐ | |
| 文档与监控 | 编写高质量的代码、测试用例和技术文档 | ⭐⭐⭐ |
| 实现和管理日志记录和监控,以便有效地监视和解决问题 | ⭐⭐⭐ | |
| 优化共识安全与性能,撰写相关技术和接口文档 | ⭐⭐ | |
| 特殊领域开发 | 参与大模型应用后台相关开发 | ⭐⭐⭐ |
| 负责大模型应用类座舱软件功能设计及开发工作 | ⭐⭐ | |
| 负责智能座舱软件、座舱AI引擎的软件功能设计及开发工作 | ⭐⭐ | |
| 参与区块链相关项目的智能合约开发与优化 | ⭐⭐ | |
| 基于 Python 构建脚本驱动账本系统,设计区块、交易流程、共识模块(PoW/PoS) | ⭐ | |
| 负责多格式文档(PDF/扫描件/图片)的信息抽取系统设计、开发与维护 | ⭐⭐ | |
| 负责数据处理、分析与可视化系统的开发工作 | ⭐⭐⭐ | |
| 负责数据智能、自动化等后端代码设计、开发和维护工作 | ⭐⭐⭐ | |
| 负责基于 python 语言进行数据分析 | ⭐⭐ | |
| 项目管理 | 独立负责 ERP 系统某一业务模块的软件开发和后期维护工作,保证系统的稳定性 | ⭐⭐ |
| 主导生态、域控、仪表等自动化测试开发 | ⭐ | |
| 主导自动化测试实施结果分析和定位 | ⭐ | |
| 自动化工具需求、问题管理 | ⭐ |
说明:
- ⭐⭐⭐⭐⭐:出现频率极高,几乎所有岗位都要求
- ⭐⭐⭐⭐:出现频率高,大部分岗位要求
- ⭐⭐⭐:出现频率中等,部分岗位要求
- ⭐⭐:出现频率较低,特定岗位要求
- ⭐:出现频率很低,特殊领域岗位要求
目录
一、基本技能要求
基本技能是 Python 后端开发的基石,几乎所有岗位都会要求掌握这些技能。
1.1 Python 语言基础
岗位要求:
- 精通 Python 语言,掌握 Python 基础语法
- 理解 Python 的特性,如动态类型、装饰器、生成器等
- 熟悉标准库的使用
- 具备良好的编程习惯
核心知识点:
# 1. 基础语法和数据类型
name = "Python"
age = 30
is_active = True
scores = [85, 90, 88]
# 2. 函数定义和使用
def calculate_average(scores):
"""计算平均分"""
return sum(scores) / len(scores) if scores else 0
# 3. 面向对象编程
class User:
def __init__(self, name, age):
self.name = name
self.age = age
def introduce(self):
return f"我是{self.name},今年{self.age}岁"
# 4. 装饰器
def log_time(func):
def wrapper(*args, **kwargs):
import time
start = time.time()
result = func(*args, **kwargs)
print(f"{func.__name__} 执行时间: {time.time() - start:.2f}秒")
return result
return wrapper
@log_time
def slow_function():
import time
time.sleep(1)
return "完成"
# 5. 生成器和迭代器
def fibonacci(n):
"""生成斐波那契数列"""
a, b = 0, 1
for _ in range(n):
yield a
a, b = b, a + b
# 6. 异常处理
def safe_divide(a, b):
try:
return a / b
except ZeroDivisionError:
return "除数不能为0"
except TypeError:
return "参数类型错误"
学习建议:
- 掌握 Python 基础语法(参考 page6.md)
- 熟悉常用内置函数和标准库
- 理解 Python 的面向对象特性
- 练习编写规范的代码(遵循 PEP 8)
练习题:
- 编写一个函数,实现列表去重并保持原有顺序
- 使用装饰器实现函数执行时间统计
- 实现一个简单的上下文管理器
点击查看答案
```python # 1. 列表去重 def unique_list(lst): seen = set() result = [] for item in lst: if item not in seen: seen.add(item) result.append(item) return result # 2. 执行时间统计装饰器 import time from functools import wraps def timing_decorator(func): @wraps(func) def wrapper(*args, **kwargs): start = time.time() result = func(*args, **kwargs) end = time.time() print(f"{func.__name__} 耗时: {end - start:.4f}秒") return result return wrapper # 3. 上下文管理器 class FileManager: def __init__(self, filename, mode): self.filename = filename self.mode = mode self.file = None def __enter__(self): self.file = open(self.filename, self.mode) return self.file def __exit__(self, exc_type, exc_val, exc_tb): if self.file: self.file.close() return False ```1.2 Web 框架(Django/Flask/FastAPI)
岗位要求:
- 掌握至少一个 Python Web 框架(Django/Flask/FastAPI)
- 能够基于框架开发 RESTful API 和服务
- 熟悉 ORM 框架的使用
- 了解框架的中间件机制
Django 基础示例:
# settings.py
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'rest_framework',
'myapp',
]
# models.py
from django.db import models
class User(models.Model):
name = models.CharField(max_length=100)
email = models.EmailField(unique=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'users'
# views.py
from rest_framework import viewsets
from rest_framework.response import Response
from .models import User
from .serializers import UserSerializer
class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
def list(self, request):
users = self.get_queryset()
serializer = self.get_serializer(users, many=True)
return Response(serializer.data)
Flask 基础示例:
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
@app.route('/api/users', methods=['GET'])
def get_users():
users = User.query.all()
return jsonify([{'id': u.id, 'name': u.name, 'email': u.email}
for u in users])
@app.route('/api/users', methods=['POST'])
def create_user():
data = request.json
user = User(name=data['name'], email=data['email'])
db.session.add(user)
db.session.commit()
return jsonify({'id': user.id, 'name': user.name}), 201
FastAPI 基础示例:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
app = FastAPI()
class UserCreate(BaseModel):
name: str
email: str
class UserResponse(BaseModel):
id: int
name: str
email: str
users_db = []
next_id = 1
@app.get("/api/users", response_model=List[UserResponse])
async def get_users():
return users_db
@app.post("/api/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
global next_id
new_user = UserResponse(id=next_id, **user.dict())
users_db.append(new_user)
next_id += 1
return new_user
学习建议:
- Django:适合大型项目,功能全面,学习曲线较陡
- Flask:轻量级,灵活,适合中小型项目
- FastAPI:现代框架,支持异步,自动生成 API 文档
练习题:
- 使用任意框架创建一个简单的用户管理 API(增删改查)
- 实现用户认证中间件
- 添加 API 限流功能
1.3 数据库技术
岗位要求:
- 熟悉关系型数据库(MySQL/PostgreSQL)
- 了解非关系型数据库(Redis/MongoDB)
- 掌握 SQL 语言和数据库设计原则
- 熟悉 ORM 框架(Django ORM、SQLAlchemy)
- 具备数据库优化能力
MySQL 基础:
# 使用 pymysql
import pymysql
conn = pymysql.connect(
host='localhost',
user='root',
password='password',
database='testdb',
charset='utf8mb4'
)
cursor = conn.cursor()
# 查询
cursor.execute("SELECT * FROM users WHERE age > %s", (18,))
results = cursor.fetchall()
# 插入
cursor.execute(
"INSERT INTO users (name, email) VALUES (%s, %s)",
('张三', 'zhangsan@example.com')
)
conn.commit()
cursor.close()
conn.close()
PostgreSQL 示例:
# 使用 psycopg2
import psycopg2
conn = psycopg2.connect(
host="localhost",
database="mydb",
user="postgres",
password="password"
)
cur = conn.cursor()
cur.execute("SELECT * FROM users")
rows = cur.fetchall()
Redis 缓存示例:
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 字符串操作
r.set('user:1:name', '张三')
name = r.get('user:1:name')
# 哈希操作
r.hset('user:1', 'name', '张三')
r.hset('user:1', 'age', 25)
user = r.hgetall('user:1')
# 列表操作
r.lpush('tasks', 'task1', 'task2')
task = r.rpop('tasks')
# 设置过期时间
r.setex('session:123', 3600, 'session_data')
MongoDB 示例:
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']
collection = db['users']
# 插入
user = {'name': '张三', 'age': 25, 'email': 'zhangsan@example.com'}
collection.insert_one(user)
# 查询
users = collection.find({'age': {'$gt': 18}})
for user in users:
print(user)
# 更新
collection.update_one(
{'name': '张三'},
{'$set': {'age': 26}}
)
SQL 优化技巧:
-- 1. 使用索引
CREATE INDEX idx_email ON users(email);
-- 2. 避免 SELECT *
SELECT id, name, email FROM users; -- 好
SELECT * FROM users; -- 不好
-- 3. 使用 LIMIT
SELECT * FROM users LIMIT 10;
-- 4. 避免子查询,使用 JOIN
-- 不好
SELECT * FROM orders WHERE user_id IN (SELECT id FROM users WHERE age > 18);
-- 好
SELECT o.* FROM orders o
JOIN users u ON o.user_id = u.id
WHERE u.age > 18;
学习建议:
- 掌握 SQL 基础语法(SELECT、INSERT、UPDATE、DELETE)
- 理解数据库索引原理和优化方法
- 学习事务处理和 ACID 特性
- 了解数据库设计范式
练习题:
- 设计一个博客系统的数据库表结构(用户、文章、评论)
- 编写 SQL 查询:找出最近一周发布的文章及其作者
- 使用 Redis 实现一个简单的缓存装饰器
1.4 RESTful API 设计
岗位要求:
- 具有 RESTful API 设计与开发经验
- 了解常见的接口安全和性能优化策略
- 熟悉 HTTP 协议和状态码
- 能够设计合理的 API 接口
RESTful API 设计原则:
# 1. 使用正确的 HTTP 方法
# GET - 获取资源
# POST - 创建资源
# PUT - 更新资源(完整)
# PATCH - 更新资源(部分)
# DELETE - 删除资源
# 2. 使用合理的 URL 设计
# 好的设计
GET /api/users # 获取用户列表
GET /api/users/1 # 获取单个用户
POST /api/users # 创建用户
PUT /api/users/1 # 更新用户
DELETE /api/users/1 # 删除用户
# 不好的设计
GET /api/getUsers
POST /api/createUser
POST /api/updateUser
POST /api/deleteUser
# 3. 使用标准 HTTP 状态码
# 200 OK - 成功
# 201 Created - 创建成功
# 400 Bad Request - 请求错误
# 401 Unauthorized - 未授权
# 403 Forbidden - 禁止访问
# 404 Not Found - 资源不存在
# 500 Internal Server Error - 服务器错误
FastAPI RESTful API 示例:
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()
class UserCreate(BaseModel):
name: str
email: str
class UserUpdate(BaseModel):
name: Optional[str] = None
email: Optional[str] = None
class UserResponse(BaseModel):
id: int
name: str
email: str
users_db = {}
next_id = 1
@app.get("/api/users", response_model=List[UserResponse])
async def list_users(skip: int = 0, limit: int = 10):
"""获取用户列表"""
return list(users_db.values())[skip:skip+limit]
@app.get("/api/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
"""获取单个用户"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="用户不存在")
return users_db[user_id]
@app.post("/api/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
"""创建用户"""
global next_id
new_user = UserResponse(id=next_id, **user.dict())
users_db[next_id] = new_user
next_id += 1
return new_user
@app.put("/api/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user: UserUpdate):
"""更新用户"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="用户不存在")
existing_user = users_db[user_id]
update_data = user.dict(exclude_unset=True)
updated_user = existing_user.copy(update=update_data)
users_db[user_id] = updated_user
return updated_user
@app.delete("/api/users/{user_id}", status_code=204)
async def delete_user(user_id: int):
"""删除用户"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="用户不存在")
del users_db[user_id]
return None
API 文档和版本控制:
# 版本控制
@app.get("/api/v1/users")
async def get_users_v1():
pass
@app.get("/api/v2/users")
async def get_users_v2():
pass
# 统一响应格式
class APIResponse(BaseModel):
code: int = 200
message: str = "success"
data: Optional[dict] = None
@app.get("/api/users")
async def get_users():
return APIResponse(
code=200,
message="获取成功",
data={"users": [...]}
)
学习建议:
- 理解 REST 架构风格和设计原则
- 掌握 HTTP 协议和状态码
- 学习 API 版本控制策略
- 了解 API 文档生成工具(Swagger、OpenAPI)
练习题:
- 设计一个任务管理系统的 RESTful API
- 实现 API 请求参数验证
- 添加 API 响应缓存机制
1.5 Linux 基础
岗位要求:
- 熟悉 Linux 操作系统
- 能够使用 Shell 脚本进行日常运维操作
- 了解基本的服务器配置和部署
- 能够在 Linux 上调试和排查问题
常用 Linux 命令:
# 文件操作
ls -la # 列出文件(详细信息)
cd /path/to/dir # 切换目录
pwd # 显示当前目录
mkdir -p dir1/dir2 # 创建目录
rm -rf dir # 删除目录
cp source dest # 复制文件
mv source dest # 移动/重命名文件
cat file.txt # 查看文件内容
head -n 10 file.txt # 查看前10行
tail -f file.log # 实时查看日志
# 文件权限
chmod 755 script.sh # 修改文件权限
chown user:group file # 修改文件所有者
# 进程管理
ps aux # 查看所有进程
ps aux | grep python # 查找 Python 进程
kill -9 PID # 强制杀死进程
top # 实时查看进程
htop # 更友好的进程查看工具
# 网络相关
netstat -tuln # 查看端口占用
lsof -i :8000 # 查看8000端口占用
curl http://example.com # 发送 HTTP 请求
wget http://example.com # 下载文件
# 系统信息
df -h # 查看磁盘使用情况
free -h # 查看内存使用情况
uname -a # 查看系统信息
Shell 脚本示例:
#!/bin/bash
# 部署脚本示例
# 变量定义
PROJECT_DIR="/var/www/myapp"
BACKUP_DIR="/backup"
DATE=$(date +%Y%m%d_%H%M%S)
# 备份当前版本
echo "备份当前版本..."
tar -czf "$BACKUP_DIR/backup_$DATE.tar.gz" "$PROJECT_DIR"
# 停止服务
echo "停止服务..."
systemctl stop myapp
# 更新代码
echo "更新代码..."
cd "$PROJECT_DIR"
git pull origin main
# 安装依赖
echo "安装依赖..."
source venv/bin/activate
pip install -r requirements.txt
# 数据库迁移
python manage.py migrate
# 重启服务
echo "重启服务..."
systemctl start myapp
echo "部署完成!"
Python 与 Linux 交互:
import subprocess
import os
# 执行 Shell 命令
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)
# 检查文件是否存在
if os.path.exists('/path/to/file'):
print("文件存在")
# 获取环境变量
python_path = os.environ.get('PYTHONPATH', '/usr/bin/python')
# 执行系统命令并获取输出
def run_command(cmd):
process = subprocess.Popen(
cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()
return stdout.decode('utf-8'), stderr.decode('utf-8')
output, error = run_command('ps aux | grep python')
print(output)
学习建议:
- 掌握常用 Linux 命令
- 学习 Shell 脚本编写
- 了解 Linux 文件系统结构
- 熟悉进程管理和服务管理
练习题:
- 编写一个 Shell 脚本,自动备份数据库
- 使用 Python 编写一个简单的系统监控脚本
- 实现一个日志轮转脚本
1.6 Git 版本控制
岗位要求:
- 了解版本控制系统(Git)
- 熟悉 Git 基本操作(clone、commit、push、pull)
- 理解分支管理策略
- 能够解决代码冲突
Git 基础命令:
# 初始化仓库
git init
git clone https://github.com/user/repo.git
# 基本操作
git status # 查看状态
git add file.py # 添加到暂存区
git add . # 添加所有文件
git commit -m "提交信息" # 提交
git push origin main # 推送到远程
git pull origin main # 拉取远程更新
# 分支操作
git branch # 查看分支
git branch feature/new # 创建分支
git checkout feature/new # 切换分支
git checkout -b feature/new # 创建并切换分支
git merge feature/new # 合并分支
git branch -d feature/new # 删除分支
# 查看历史
git log # 查看提交历史
git log --oneline # 简洁历史
git log --graph --all # 图形化历史
git diff # 查看差异
git show commit_id # 查看某次提交
# 撤销操作
git reset --soft HEAD~1 # 撤销提交,保留更改
git reset --hard HEAD~1 # 撤销提交,丢弃更改
git checkout -- file.py # 撤销工作区更改
git revert commit_id # 创建新提交来撤销
Git 工作流示例:
# 1. 创建功能分支
git checkout -b feature/user-auth
# 2. 开发并提交
git add .
git commit -m "添加用户认证功能"
# 3. 推送分支
git push origin feature/user-auth
# 4. 创建 Pull Request(在 GitHub/GitLab 上)
# 5. 合并后删除分支
git checkout main
git pull origin main
git branch -d feature/user-auth
Python 中使用 Git:
import subprocess
def git_command(cmd):
"""执行 Git 命令"""
result = subprocess.run(
['git'] + cmd.split(),
capture_output=True,
text=True
)
return result.stdout.strip()
# 获取当前分支
current_branch = git_command('branch --show-current')
print(f"当前分支: {current_branch}")
# 获取最新提交信息
last_commit = git_command('log -1 --pretty=format:"%h - %an, %ar : %s"')
print(f"最新提交: {last_commit}")
# 检查是否有未提交的更改
status = git_command('status --porcelain')
if status:
print("有未提交的更改")
else:
print("工作区干净")
学习建议:
- 掌握 Git 基本命令和工作流
- 理解分支管理策略(Git Flow、GitHub Flow)
- 学习解决代码冲突
- 了解 Git Hooks 的使用
练习题:
- 创建一个新分支,开发功能后合并到主分支
- 使用 Git 回退到之前的某个提交
- 解决一次合并冲突
1.7 数据结构与算法
岗位要求:
- 熟悉常用的数据结构和算法
- 具备良好的算法基础和数据结构知识
- 具备良好的问题解决能力
- 能够进行算法优化
常用数据结构:
# 1. 列表(动态数组)
arr = [1, 2, 3, 4, 5]
arr.append(6) # O(1)
arr.insert(0, 0) # O(n)
arr.pop() # O(1)
# 2. 字典(哈希表)
d = {'a': 1, 'b': 2}
d['c'] = 3 # O(1) 平均
value = d.get('a') # O(1) 平均
# 3. 集合
s = {1, 2, 3}
s.add(4) # O(1) 平均
4 in s # O(1) 平均
# 4. 栈(使用列表实现)
stack = []
stack.append(1) # 入栈
stack.pop() # 出栈
# 5. 队列(使用 collections.deque)
from collections import deque
queue = deque()
queue.append(1) # 入队
queue.popleft() # 出队
# 6. 堆(使用 heapq)
import heapq
heap = []
heapq.heappush(heap, 3)
heapq.heappush(heap, 1)
heapq.heappush(heap, 2)
min_val = heapq.heappop(heap) # 1
常用算法:
# 1. 排序算法
def quick_sort(arr):
"""快速排序"""
if len(arr) <= 1:
return arr
pivot = arr[len(arr) // 2]
left = [x for x in arr if x < pivot]
middle = [x for x in arr if x == pivot]
right = [x for x in arr if x > pivot]
return quick_sort(left) + middle + quick_sort(right)
# 2. 二分查找
def binary_search(arr, target):
"""二分查找"""
left, right = 0, len(arr) - 1
while left <= right:
mid = (left + right) // 2
if arr[mid] == target:
return mid
elif arr[mid] < target:
left = mid + 1
else:
right = mid - 1
return -1
# 3. 深度优先搜索(DFS)
def dfs(graph, start, visited=None):
"""深度优先搜索"""
if visited is None:
visited = set()
visited.add(start)
print(start)
for neighbor in graph[start]:
if neighbor not in visited:
dfs(graph, neighbor, visited)
# 4. 广度优先搜索(BFS)
from collections import deque
def bfs(graph, start):
"""广度优先搜索"""
visited = set()
queue = deque([start])
visited.add(start)
while queue:
node = queue.popleft()
print(node)
for neighbor in graph[node]:
if neighbor not in visited:
visited.add(neighbor)
queue.append(neighbor)
# 5. 动态规划示例:斐波那契数列
def fibonacci_dp(n):
"""动态规划计算斐波那契数列"""
if n <= 1:
return n
dp = [0] * (n + 1)
dp[1] = 1
for i in range(2, n + 1):
dp[i] = dp[i-1] + dp[i-2]
return dp[n]
算法复杂度分析:
# O(1) - 常数时间
def get_first(arr):
return arr[0]
# O(log n) - 对数时间(二分查找)
def binary_search(arr, target):
# ... 见上面代码
# O(n) - 线性时间
def find_max(arr):
max_val = arr[0]
for val in arr:
if val > max_val:
max_val = val
return max_val
# O(n log n) - 线性对数时间(快速排序)
def quick_sort(arr):
# ... 见上面代码
# O(n²) - 平方时间(冒泡排序)
def bubble_sort(arr):
n = len(arr)
for i in range(n):
for j in range(0, n - i - 1):
if arr[j] > arr[j + 1]:
arr[j], arr[j + 1] = arr[j + 1], arr[j]
学习建议:
- 掌握常见数据结构(数组、链表、栈、队列、树、图)
- 学习基本算法(排序、查找、递归、动态规划)
- 理解算法复杂度(时间复杂度和空间复杂度)
- 刷题练习(LeetCode、牛客网)
练习题:
- 实现一个 LRU 缓存(使用字典和双向链表)
- 实现快速排序和归并排序,比较性能
- 解决 LeetCode 中等难度的题目(如两数之和、反转链表)
点击查看 LRU 缓存实现
```python from collections import OrderedDict class LRUCache: def __init__(self, capacity: int): self.cache = OrderedDict() self.capacity = capacity def get(self, key: int) -> int: if key not in self.cache: return -1 # 移动到末尾(最近使用) self.cache.move_to_end(key) return self.cache[key] def put(self, key: int, value: int) -> None: if key in self.cache: # 更新值并移动到末尾 self.cache.move_to_end(key) else: if len(self.cache) >= self.capacity: # 删除最久未使用的(第一个) self.cache.popitem(last=False) self.cache[key] = value ```二、进阶技能要求
进阶技能是区分初级和中级开发者的关键,掌握这些技能能够胜任更复杂的项目开发。
2.1 异步编程与并发
岗位要求:
- 熟悉多任务并发,异步、协程的使用
- 对于 HTTP、WebSocket、SSE 等接口有使用经验
- 能理解不同场景的应用
- 对于接口的性能可以进行优化
异步编程基础:
import asyncio
import aiohttp
# 1. 基本异步函数
async def fetch_data(url):
"""异步获取数据"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# 2. 并发执行多个任务
async def main():
urls = [
'https://api.example.com/data1',
'https://api.example.com/data2',
'https://api.example.com/data3'
]
# 并发执行
tasks = [fetch_data(url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 运行
results = asyncio.run(main())
FastAPI 异步示例:
from fastapi import FastAPI
import asyncio
app = FastAPI()
@app.get("/api/data")
async def get_data():
"""异步接口"""
# 模拟异步 I/O 操作
await asyncio.sleep(1)
return {"data": "异步数据"}
@app.get("/api/batch")
async def batch_process(items: list):
"""批量处理"""
async def process_item(item):
await asyncio.sleep(0.1) # 模拟处理
return f"processed_{item}"
tasks = [process_item(item) for item in items]
results = await asyncio.gather(*tasks)
return results
WebSocket 示例:
from fastapi import FastAPI, WebSocket
import json
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 接收消息
data = await websocket.receive_text()
message = json.loads(data)
# 处理消息
response = {
"type": "response",
"message": f"收到: {message.get('text', '')}"
}
# 发送响应
await websocket.send_text(json.dumps(response))
except Exception as e:
print(f"WebSocket 错误: {e}")
finally:
await websocket.close()
SSE (Server-Sent Events) 示例:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
@app.get("/api/stream")
async def stream_data():
"""SSE 流式数据"""
async def generate():
for i in range(10):
data = {
"id": i,
"message": f"消息 {i}",
"timestamp": asyncio.get_event_loop().time()
}
yield f"data: {json.dumps(data)}\n\n"
await asyncio.sleep(1)
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
协程和生成器:
import asyncio
# 协程函数
async def producer(queue):
"""生产者"""
for i in range(5):
await queue.put(f"item_{i}")
await asyncio.sleep(0.5)
await queue.put(None) # 结束标志
async def consumer(queue):
"""消费者"""
while True:
item = await queue.get()
if item is None:
break
print(f"处理: {item}")
await asyncio.sleep(0.2)
async def main():
queue = asyncio.Queue()
await asyncio.gather(
producer(queue),
consumer(queue)
)
asyncio.run(main())
学习建议:
- 理解异步编程的概念和优势
- 掌握 asyncio 库的使用
- 学习异步 I/O 操作(数据库、HTTP 请求)
- 了解事件循环机制
练习题:
- 实现一个异步爬虫,并发抓取多个网页
- 使用 WebSocket 实现一个简单的聊天室
- 实现一个异步任务队列系统
2.2 接口鉴权与安全
岗位要求:
- 熟悉基础的接口鉴权功能:Token 认证、OAuth2 授权、签名验证等
- 了解常见的接口安全和性能优化策略
- 具备数据安全相关经验
Token 认证(JWT):
import jwt
from datetime import datetime, timedelta
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
app = FastAPI()
security = HTTPBearer()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
def create_access_token(data: dict, expires_delta: timedelta = None):
"""创建 JWT Token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(hours=1)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""验证 Token"""
try:
token = credentials.credentials
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 已过期"
)
except jwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的 Token"
)
@app.post("/api/login")
async def login(username: str, password: str):
"""登录接口"""
# 验证用户名密码(示例)
if username == "admin" and password == "123456":
access_token = create_access_token(
data={"sub": username},
expires_delta=timedelta(hours=24)
)
return {"access_token": access_token, "token_type": "bearer"}
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误"
)
@app.get("/api/protected")
async def protected_route(payload: dict = Depends(verify_token)):
"""受保护的路由"""
return {
"message": "访问成功",
"user": payload.get("sub")
}
OAuth2 授权:
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 模拟用户数据库
fake_users_db = {
"admin": {
"username": "admin",
"hashed_password": "hashed_password_here",
"email": "admin@example.com"
}
}
def verify_password(plain_password, hashed_password):
"""验证密码(实际应使用 bcrypt)"""
return plain_password == "123456" # 简化示例
def get_user(username: str):
"""获取用户"""
if username in fake_users_db:
return fake_users_db[username]
return None
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
"""OAuth2 登录"""
user = get_user(form_data.username)
if not user or not verify_password(form_data.password, user["hashed_password"]):
raise HTTPException(
status_code=400,
detail="用户名或密码错误"
)
# 生成 token(简化示例)
access_token = create_access_token(data={"sub": user["username"]})
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/api/users/me")
async def read_users_me(token: str = Depends(oauth2_scheme)):
"""获取当前用户信息"""
payload = verify_token(token)
username = payload.get("sub")
user = get_user(username)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
return user
签名验证:
import hmac
import hashlib
import time
from urllib.parse import urlencode
def generate_sign(params: dict, secret_key: str) -> str:
"""生成签名"""
# 1. 参数排序
sorted_params = sorted(params.items())
# 2. 拼接字符串
query_string = urlencode(sorted_params)
# 3. 添加密钥
string_to_sign = f"{query_string}&key={secret_key}"
# 4. MD5 加密
sign = hashlib.md5(string_to_sign.encode()).hexdigest().upper()
return sign
def verify_sign(params: dict, secret_key: str, received_sign: str) -> bool:
"""验证签名"""
expected_sign = generate_sign(params, secret_key)
return hmac.compare_digest(expected_sign, received_sign)
# 使用示例
params = {
"app_id": "123456",
"timestamp": str(int(time.time())),
"data": "test"
}
secret_key = "your_secret_key"
sign = generate_sign(params, secret_key)
print(f"生成的签名: {sign}")
# 验证
is_valid = verify_sign(params, secret_key, sign)
print(f"签名验证: {is_valid}")
学习建议:
- 理解认证和授权的区别
- 掌握 JWT Token 的使用
- 学习 OAuth2 授权流程
- 了解常见的安全漏洞(SQL 注入、XSS、CSRF)
练习题:
- 实现一个完整的 JWT 认证系统
- 实现 API 签名验证中间件
- 添加接口访问频率限制(限流)
2.3 消息队列
岗位要求:
- 熟悉消息队列的使用
- 了解消息队列的选型和应用场景
- 能够使用消息队列实现异步任务处理
Redis 作为消息队列:
import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, db=0)
# 生产者
def produce_task(queue_name, task_data):
"""生产任务"""
r.lpush(queue_name, json.dumps(task_data))
print(f"任务已加入队列: {task_data}")
# 消费者
def consume_task(queue_name, timeout=0):
"""消费任务"""
while True:
# 阻塞式获取任务
task = r.brpop(queue_name, timeout=timeout)
if task:
task_data = json.loads(task[1])
print(f"处理任务: {task_data}")
# 处理任务逻辑
process_task(task_data)
def process_task(task_data):
"""处理任务"""
time.sleep(1) # 模拟处理时间
print(f"任务处理完成: {task_data}")
# 使用示例
produce_task("task_queue", {"type": "email", "to": "user@example.com"})
RabbitMQ 示例:
import pika
import json
# 连接 RabbitMQ
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='task_queue', durable=True)
# 生产者
def send_task(task_data):
"""发送任务"""
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=json.dumps(task_data),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
)
)
print(f"任务已发送: {task_data}")
# 消费者
def callback(ch, method, properties, body):
"""处理消息"""
task_data = json.loads(body)
print(f"收到任务: {task_data}")
# 处理任务
process_task(task_data)
# 确认消息已处理
ch.basic_ack(delivery_tag=method.delivery_tag)
# 设置消费者
channel.basic_qos(prefetch_count=1) # 公平分发
channel.basic_consume(
queue='task_queue',
on_message_callback=callback
)
# 开始消费
channel.start_consuming()
Celery 异步任务队列:
# celery_app.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def send_email(to, subject, body):
"""发送邮件任务"""
print(f"发送邮件到 {to}: {subject}")
# 实际发送邮件逻辑
return f"邮件已发送到 {to}"
@app.task
def process_data(data):
"""数据处理任务"""
print(f"处理数据: {data}")
# 数据处理逻辑
return "处理完成"
# 使用
from celery_app import send_email, process_data
# 异步执行任务
result = send_email.delay("user@example.com", "主题", "内容")
print(f"任务ID: {result.id}")
# 获取结果
task_result = result.get(timeout=10)
print(f"任务结果: {task_result}")
学习建议:
- 理解消息队列的作用和优势
- 学习常用消息队列(Redis、RabbitMQ、Kafka)
- 掌握任务队列的使用场景
- 了解消息持久化和可靠性保证
练习题:
- 使用 Redis 实现一个简单的任务队列
- 使用 Celery 实现异步邮件发送功能
- 实现消息队列的监控和重试机制
2.4 Docker 容器化
岗位要求:
- 具有 Docker 容器化及服务编排经验
- 熟悉基于 Docker、K8s 开发部署服务的流程
- 了解服务的部署和运维
Dockerfile 示例:
# Python 应用 Dockerfile
FROM python:3.9-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose 示例:
# docker-compose.yml
version: '3.8'
services:
web:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:pass@db:5432/mydb
- REDIS_URL=redis://redis:6379/0
depends_on:
- db
- redis
volumes:
- .:/app
command: uvicorn main:app --reload --host 0.0.0.0
db:
image: postgres:13
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
- POSTGRES_DB=mydb
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:6-alpine
ports:
- "6379:6379"
volumes:
postgres_data:
Python 中使用 Docker:
import docker
# 连接 Docker
client = docker.from_env()
# 构建镜像
def build_image(dockerfile_path, tag):
"""构建 Docker 镜像"""
client.images.build(
path=dockerfile_path,
tag=tag
)
print(f"镜像构建完成: {tag}")
# 运行容器
def run_container(image, ports, environment):
"""运行容器"""
container = client.containers.run(
image,
ports=ports,
environment=environment,
detach=True
)
print(f"容器已启动: {container.id}")
return container
# 查看容器日志
def get_logs(container_id):
"""获取容器日志"""
container = client.containers.get(container_id)
logs = container.logs()
return logs.decode('utf-8')
学习建议:
- 掌握 Docker 基本命令(build、run、push、pull)
- 学习 Dockerfile 编写
- 了解 Docker Compose 的使用
- 学习容器编排和最佳实践
练习题:
- 为你的 Python 项目编写 Dockerfile
- 使用 Docker Compose 搭建开发环境(应用+数据库+Redis)
- 实现多阶段构建优化镜像大小
2.5 微服务架构
岗位要求:
- 了解分布式系统及微服务架构
- 能够独立排查和解决服务问题
- 具有微服务下服务数据的埋点、链路耗时追踪经验
微服务基础架构:
# service_a.py - 服务A
from fastapi import FastAPI
import httpx
app = FastAPI()
@app.get("/api/service-a/data")
async def get_data():
# 调用服务B
async with httpx.AsyncClient() as client:
response = await client.get("http://service-b:8001/api/service-b/data")
data_b = response.json()
return {
"service": "A",
"data": "来自服务A",
"from_b": data_b
}
# service_b.py - 服务B
from fastapi import FastAPI
app = FastAPI()
@app.get("/api/service-b/data")
async def get_data():
return {
"service": "B",
"data": "来自服务B"
}
服务注册与发现:
# service_registry.py
from typing import Dict, List
import time
class ServiceRegistry:
def __init__(self):
self.services: Dict[str, List[dict]] = {}
def register(self, service_name: str, address: str, port: int):
"""注册服务"""
if service_name not in self.services:
self.services[service_name] = []
service_info = {
"address": address,
"port": port,
"timestamp": time.time()
}
self.services[service_name].append(service_info)
print(f"服务注册: {service_name} at {address}:{port}")
def discover(self, service_name: str) -> List[dict]:
"""服务发现"""
return self.services.get(service_name, [])
def health_check(self, service_name: str):
"""健康检查"""
# 实现健康检查逻辑
pass
# 使用示例
registry = ServiceRegistry()
registry.register("user-service", "localhost", 8001)
registry.register("order-service", "localhost", 8002)
services = registry.discover("user-service")
链路追踪示例:
import uuid
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
app = FastAPI()
class TracingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# 生成追踪ID
trace_id = str(uuid.uuid4())
request.state.trace_id = trace_id
# 添加追踪头
response = await call_next(request)
response.headers["X-Trace-Id"] = trace_id
return response
app.add_middleware(TracingMiddleware)
@app.get("/api/data")
async def get_data(request: Request):
trace_id = request.state.trace_id
print(f"追踪ID: {trace_id}")
return {"data": "test", "trace_id": trace_id}
学习建议:
- 理解微服务架构的优缺点
- 学习服务间通信方式(HTTP、gRPC、消息队列)
- 了解服务注册与发现机制
- 掌握分布式系统的常见问题(一致性、可用性、分区容错)
练习题:
- 设计一个简单的微服务架构(用户服务、订单服务)
- 实现服务间的调用和错误处理
- 添加服务监控和日志收集
2.6 性能优化
岗位要求:
- 对于接口的性能可以进行优化
- 数据库 SQL 优化
- 负责后端性能瓶颈分析与优化
接口性能优化:
from functools import lru_cache
import time
from fastapi import FastAPI
import redis
app = FastAPI()
r = redis.Redis(host='localhost', port=6379, db=0)
# 1. 使用缓存
@lru_cache(maxsize=128)
def expensive_computation(n):
"""昂贵的计算(使用缓存)"""
time.sleep(0.1) # 模拟耗时操作
return n * n
# 2. Redis 缓存装饰器
def cache_result(expire=3600):
"""Redis 缓存装饰器"""
def decorator(func):
def wrapper(*args, **kwargs):
# 生成缓存键
cache_key = f"{func.__name__}:{args}:{kwargs}"
# 尝试从缓存获取
cached = r.get(cache_key)
if cached:
return cached.decode('utf-8')
# 执行函数
result = func(*args, **kwargs)
# 存入缓存
r.setex(cache_key, expire, str(result))
return result
return wrapper
return decorator
@cache_result(expire=3600)
def get_user_data(user_id: int):
"""获取用户数据(带缓存)"""
# 模拟数据库查询
time.sleep(0.5)
return {"user_id": user_id, "name": "张三"}
# 3. 数据库连接池
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
"postgresql://user:pass@localhost/db",
poolclass=QueuePool,
pool_size=10,
max_overflow=20
)
SQL 优化:
# 1. 使用索引
# CREATE INDEX idx_user_email ON users(email);
# 2. 避免 N+1 查询
# 不好的方式
def get_users_bad():
users = User.query.all()
for user in users:
posts = Post.query.filter_by(user_id=user.id).all() # N+1 查询
user.posts = posts
# 好的方式(使用 JOIN 或预加载)
def get_users_good():
from sqlalchemy.orm import joinedload
users = User.query.options(joinedload(User.posts)).all()
# 3. 使用批量操作
def create_users_bad(users_data):
for data in users_data:
user = User(**data)
db.session.add(user)
db.session.commit() # 多次提交
def create_users_good(users_data):
users = [User(**data) for data in users_data]
db.session.bulk_save_objects(users)
db.session.commit() # 一次提交
# 4. 分页查询
def get_users_paginated(page=1, per_page=20):
return User.query.paginate(
page=page,
per_page=per_page,
error_out=False
)
异步优化:
import asyncio
import aiohttp
from typing import List
# 串行请求(慢)
async def fetch_serial(urls: List[str]):
results = []
async with aiohttp.ClientSession() as session:
for url in urls:
async with session.get(url) as response:
results.append(await response.json())
return results
# 并发请求(快)
async def fetch_parallel(urls: List[str]):
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
results = []
for response in responses:
results.append(await response.json())
return results
学习建议:
- 掌握性能分析工具(cProfile、py-spy)
- 学习数据库查询优化技巧
- 了解缓存策略和实现
- 学习异步编程提升性能
练习题:
- 使用缓存优化一个慢查询接口
- 优化一个存在 N+1 查询问题的代码
- 使用异步编程提升批量 API 调用性能
2.7 测试与 CI/CD
岗位要求:
- 编写测试用例及自动化测试
- 熟悉 pytest 等测试框架
- 参与持续集成/持续部署流程
- 熟悉 CI/CD 流程
单元测试(pytest):
# test_user.py
import pytest
from app.models import User
def test_create_user():
"""测试创建用户"""
user = User(name="张三", email="zhangsan@example.com")
assert user.name == "张三"
assert user.email == "zhangsan@example.com"
def test_user_email_validation():
"""测试邮箱验证"""
with pytest.raises(ValueError):
User(name="张三", email="invalid-email")
@pytest.fixture
def sample_user():
"""测试夹具"""
return User(name="测试用户", email="test@example.com")
def test_user_with_fixture(sample_user):
"""使用夹具的测试"""
assert sample_user.name == "测试用户"
API 测试:
# test_api.py
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
def test_get_users():
"""测试获取用户列表"""
response = client.get("/api/users")
assert response.status_code == 200
assert isinstance(response.json(), list)
def test_create_user():
"""测试创建用户"""
user_data = {
"name": "测试用户",
"email": "test@example.com"
}
response = client.post("/api/users", json=user_data)
assert response.status_code == 201
assert response.json()["name"] == "测试用户"
CI/CD 配置(GitHub Actions):
# .github/workflows/ci.yml
name: CI/CD Pipeline
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:13
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run tests
run: |
pytest --cov=app --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v2
deploy:
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v2
- name: Deploy to production
run: |
echo "部署到生产环境"
# 实际的部署命令
学习建议:
- 掌握 pytest 测试框架
- 学习测试类型(单元测试、集成测试、端到端测试)
- 了解 CI/CD 工具(GitHub Actions、Jenkins、GitLab CI)
- 学习测试覆盖率的概念
练习题:
- 为你的 API 编写完整的测试用例
- 配置 GitHub Actions 实现自动化测试
- 实现自动化部署流程
三、高阶技能要求
高阶技能通常是特定领域或高级岗位的要求,掌握这些技能能够胜任更专业和复杂的项目。
3.1 大模型应用开发
岗位要求:
- 熟悉基于大模型的构建 Agent 智能体的开发
- 对于大模型的 API 接口有封装调用经验
- 对于 Prompt 有调优经验
- 熟悉 RAG 知识库的构建和工作机制
- 了解 MCP、function-call 等相关技术
- 大模型的微调训练:SFT、LoRA 等相关经验
大模型 API 调用:
import openai
from typing import List, Dict
class LLMClient:
def __init__(self, api_key: str, base_url: str = None):
self.client = openai.OpenAI(
api_key=api_key,
base_url=base_url or "https://api.openai.com/v1"
)
def chat_completion(self, messages: List[Dict], model: str = "gpt-3.5-turbo"):
"""聊天完成"""
response = self.client.chat.completions.create(
model=model,
messages=messages,
temperature=0.7,
max_tokens=1000
)
return response.choices[0].message.content
def generate_text(self, prompt: str, **kwargs):
"""生成文本"""
messages = [{"role": "user", "content": prompt}]
return self.chat_completion(messages, **kwargs)
# 使用示例
client = LLMClient(api_key="your-api-key")
response = client.generate_text("解释一下 Python 的装饰器")
print(response)
Prompt 工程:
class PromptTemplate:
"""Prompt 模板"""
SYSTEM_PROMPT = """你是一个专业的 Python 开发助手。
你的任务是帮助用户解决编程问题,提供清晰的代码示例和解释。"""
def __init__(self):
self.system_prompt = self.SYSTEM_PROMPT
def build_prompt(self, user_query: str, context: str = None):
"""构建 Prompt"""
messages = [
{"role": "system", "content": self.system_prompt}
]
if context:
messages.append({
"role": "system",
"content": f"上下文信息:{context}"
})
messages.append({
"role": "user",
"content": user_query
})
return messages
def optimize_prompt(self, original_prompt: str) -> str:
"""优化 Prompt"""
# 添加明确的指令
optimized = f"""请按照以下要求回答:
1. 提供清晰的解释
2. 给出代码示例
3. 说明注意事项
问题:{original_prompt}"""
return optimized
# 使用
template = PromptTemplate()
prompt = template.build_prompt(
"如何实现一个单例模式?",
context="Python 3.9+"
)
RAG 知识库构建:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.document_loaders import TextLoader
class RAGSystem:
def __init__(self, embedding_model: str = "text-embedding-ada-002"):
self.embeddings = OpenAIEmbeddings(model=embedding_model)
self.vectorstore = None
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
def build_knowledge_base(self, documents_path: str):
"""构建知识库"""
# 加载文档
loader = TextLoader(documents_path)
documents = loader.load()
# 文档切片
texts = self.text_splitter.split_documents(documents)
# 创建向量存储
self.vectorstore = Chroma.from_documents(
texts,
self.embeddings
)
print(f"知识库构建完成,包含 {len(texts)} 个文档片段")
def search(self, query: str, k: int = 3):
"""搜索相关文档"""
if not self.vectorstore:
raise ValueError("知识库未构建")
docs = self.vectorstore.similarity_search(query, k=k)
return docs
def rag_query(self, query: str, llm_client):
"""RAG 查询"""
# 1. 检索相关文档
relevant_docs = self.search(query)
context = "\n".join([doc.page_content for doc in relevant_docs])
# 2. 构建 Prompt
prompt = f"""基于以下上下文信息回答问题:
上下文:
{context}
问题:{query}
请基于上下文信息回答,如果上下文中没有相关信息,请说明。"""
# 3. 调用 LLM
response = llm_client.generate_text(prompt)
return response
# 使用示例
rag = RAGSystem()
rag.build_knowledge_base("documents.txt")
result = rag.rag_query("Python 装饰器是什么?", client)
Agent 智能体开发:
from typing import List, Dict, Callable
import json
class Agent:
def __init__(self, llm_client, tools: List[Callable] = None):
self.llm_client = llm_client
self.tools = tools or []
self.conversation_history = []
def add_tool(self, tool_func: Callable, description: str):
"""添加工具"""
self.tools.append({
"function": tool_func,
"name": tool_func.__name__,
"description": description
})
def execute_tool(self, tool_name: str, parameters: dict):
"""执行工具"""
for tool in self.tools:
if tool["name"] == tool_name:
return tool["function"](**parameters)
return None
def chat(self, user_input: str):
"""对话"""
# 构建工具描述
tools_desc = "\n".join([
f"- {tool['name']}: {tool['description']}"
for tool in self.tools
])
# 构建 Prompt
system_prompt = f"""你是一个智能助手,可以使用以下工具:
{tools_desc}
当需要使用工具时,请以 JSON 格式回复:
{{"action": "use_tool", "tool": "tool_name", "parameters": {{...}}}}
否则直接回答用户问题。"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_input}
]
response = self.llm_client.chat_completion(messages)
# 检查是否需要使用工具
try:
action = json.loads(response)
if action.get("action") == "use_tool":
tool_result = self.execute_tool(
action["tool"],
action["parameters"]
)
return f"工具执行结果:{tool_result}"
except:
pass
return response
# 使用示例
def get_weather(city: str) -> str:
"""获取天气"""
return f"{city}的天气是晴天,25°C"
agent = Agent(client)
agent.add_tool(get_weather, "获取指定城市的天气信息")
response = agent.chat("北京今天天气怎么样?")
学习建议:
- 学习 Prompt 工程技巧
- 掌握向量数据库的使用
- 了解 RAG 架构和实现
- 学习 Agent 开发框架(LangChain、AutoGPT)
练习题:
- 实现一个简单的 RAG 问答系统
- 开发一个能够调用工具的 Agent
- 优化 Prompt 提升回答质量
3.2 分布式系统
岗位要求:
- 了解分布式系统及微服务架构
- 能够独立排查和解决服务问题
- 理解分布式系统的常见问题
分布式锁:
import redis
import time
import uuid
class DistributedLock:
def __init__(self, redis_client, key, timeout=10):
self.redis = redis_client
self.key = f"lock:{key}"
self.timeout = timeout
self.identifier = str(uuid.uuid4())
def acquire(self):
"""获取锁"""
end = time.time() + self.timeout
while time.time() < end:
if self.redis.set(
self.key,
self.identifier,
nx=True,
ex=self.timeout
):
return True
time.sleep(0.001)
return False
def release(self):
"""释放锁"""
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.redis.eval(script, 1, self.key, self.identifier)
# 使用
r = redis.Redis()
lock = DistributedLock(r, "resource")
if lock.acquire():
try:
# 执行需要加锁的操作
pass
finally:
lock.release()
分布式任务调度:
import asyncio
from datetime import datetime
import json
class DistributedScheduler:
def __init__(self, redis_client):
self.redis = redis_client
self.task_queue = "task_queue"
self.result_queue = "result_queue"
async def schedule_task(self, task_type: str, data: dict, delay: int = 0):
"""调度任务"""
task = {
"id": str(uuid.uuid4()),
"type": task_type,
"data": data,
"scheduled_at": datetime.now().isoformat(),
"execute_at": (datetime.now().timestamp() + delay)
}
# 如果延迟执行,使用有序集合
if delay > 0:
self.redis.zadd(
"scheduled_tasks",
{json.dumps(task): task["execute_at"]}
)
else:
# 立即执行,加入队列
self.redis.lpush(self.task_queue, json.dumps(task))
return task["id"]
async def process_tasks(self):
"""处理任务"""
while True:
# 检查延迟任务
now = datetime.now().timestamp()
tasks = self.redis.zrangebyscore(
"scheduled_tasks",
0,
now
)
for task_json in tasks:
task = json.loads(task_json)
self.redis.lpush(self.task_queue, task_json)
self.redis.zrem("scheduled_tasks", task_json)
# 处理队列任务
task_json = self.redis.brpop(self.task_queue, timeout=1)
if task_json:
task = json.loads(task_json[1])
result = await self.execute_task(task)
# 保存结果
self.redis.lpush(
self.result_queue,
json.dumps({"task_id": task["id"], "result": result})
)
学习建议:
- 理解分布式系统的 CAP 定理
- 学习分布式一致性算法
- 掌握分布式锁、分布式事务
- 了解服务发现和负载均衡
练习题:
- 实现一个分布式任务队列
- 实现分布式锁机制
- 设计一个分布式系统的监控方案
3.3 Kubernetes (K8s)
岗位要求:
- 熟悉 K8s、Docker 等容器类技术
- 熟悉基于 docker、k8s 开发部署服务的流程
Kubernetes 部署配置:
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: python-app
spec:
replicas: 3
selector:
matchLabels:
app: python-app
template:
metadata:
labels:
app: python-app
spec:
containers:
- name: python-app
image: myregistry/python-app:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-secret
key: url
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
---
# service.yaml
apiVersion: v1
kind: Service
metadata:
name: python-app-service
spec:
selector:
app: python-app
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
Python 与 K8s 交互:
from kubernetes import client, config
# 加载配置
config.load_incluster_config() # 在集群内
# config.load_kube_config() # 本地开发
v1 = client.CoreV1Api()
# 获取 Pod 列表
def list_pods(namespace="default"):
pods = v1.list_namespaced_pod(namespace)
for pod in pods.items:
print(f"Pod: {pod.metadata.name}, Status: {pod.status.phase}")
# 创建 Deployment
def create_deployment(name, image, replicas=1):
apps_v1 = client.AppsV1Api()
deployment = client.V1Deployment(
metadata=client.V1ObjectMeta(name=name),
spec=client.V1DeploymentSpec(
replicas=replicas,
selector=client.V1LabelSelector(
match_labels={"app": name}
),
template=client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": name}),
spec=client.V1PodSpec(
containers=[
client.V1Container(
name=name,
image=image,
ports=[client.V1ContainerPort(container_port=8000)]
)
]
)
)
)
)
apps_v1.create_namespaced_deployment(
namespace="default",
body=deployment
)
学习建议:
- 掌握 K8s 基本概念(Pod、Service、Deployment)
- 学习 K8s 资源管理
- 了解 K8s 网络和存储
- 学习 Helm 包管理
练习题:
- 编写 K8s 部署配置文件
- 使用 Python 客户端操作 K8s
- 实现应用的自动扩缩容
3.4 大数据技术栈
岗位要求:
- 有数据开发经验,熟悉大数据技术栈
- 包括但不限于 Hive/Spark/Kafka/Redis 等技术
- 熟悉云原生数据架构
Spark 数据处理:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count
# 创建 Spark Session
spark = SparkSession.builder \
.appName("DataProcessing") \
.getOrCreate()
# 读取数据
df = spark.read.csv("data.csv", header=True, inferSchema=True)
# 数据处理
result = df \
.filter(col("age") > 18) \
.groupBy("city") \
.agg(
count("*").alias("count"),
avg("salary").alias("avg_salary")
)
# 写入数据
result.write.mode("overwrite").parquet("output/")
Kafka 生产者/消费者:
from kafka import KafkaProducer, KafkaConsumer
import json
# 生产者
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('my_topic', {'key': 'value'})
producer.flush()
# 消费者
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
print(f"收到消息: {message.value}")
学习建议:
- 学习 Spark 数据处理
- 掌握 Kafka 消息队列
- 了解大数据处理流程
- 学习数据仓库概念
练习题:
- 使用 Spark 处理大规模数据
- 实现 Kafka 生产者和消费者
- 构建一个数据管道
3.5 区块链开发
岗位要求:
- 理解区块链底层结构
- 具备 PoW/PoS 共识经验
- 智能合约/Web3.py 经验
简单区块链实现:
import hashlib
import json
from datetime import datetime
class Block:
def __init__(self, index, transactions, previous_hash):
self.index = index
self.transactions = transactions
self.previous_hash = previous_hash
self.timestamp = datetime.now().isoformat()
self.nonce = 0
self.hash = self.calculate_hash()
def calculate_hash(self):
block_string = json.dumps({
"index": self.index,
"transactions": self.transactions,
"previous_hash": self.previous_hash,
"timestamp": self.timestamp,
"nonce": self.nonce
}, sort_keys=True)
return hashlib.sha256(block_string.encode()).hexdigest()
def mine_block(self, difficulty):
target = "0" * difficulty
while self.hash[:difficulty] != target:
self.nonce += 1
self.hash = self.calculate_hash()
print(f"区块已挖出: {self.hash}")
class Blockchain:
def __init__(self):
self.chain = [self.create_genesis_block()]
self.difficulty = 2
self.pending_transactions = []
def create_genesis_block(self):
return Block(0, [], "0")
def add_transaction(self, transaction):
self.pending_transactions.append(transaction)
def mine_pending_transactions(self):
block = Block(
len(self.chain),
self.pending_transactions,
self.chain[-1].hash
)
block.mine_block(self.difficulty)
self.chain.append(block)
self.pending_transactions = []
def get_latest_block(self):
return self.chain[-1]
def is_chain_valid(self):
for i in range(1, len(self.chain)):
current = self.chain[i]
previous = self.chain[i-1]
if current.hash != current.calculate_hash():
return False
if current.previous_hash != previous.hash:
return False
return True
学习建议:
- 理解区块链基本原理
- 学习共识算法(PoW、PoS)
- 了解智能合约开发
- 学习 Web3.py 库
练习题:
- 实现一个简单的区块链
- 实现 PoW 共识算法
- 使用 Web3.py 与以太坊交互
3.6 文档解析与 OCR
岗位要求:
- 负责多格式文档(PDF/扫描件/图片)的信息抽取
- 熟练使用文档解析与 OCR 工具库
- 具有大语言模型提示优化经验
PDF 解析:
import PyPDF2
import pdfplumber
# 使用 PyPDF2
def extract_text_pypdf2(pdf_path):
with open(pdf_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
text = ""
for page in reader.pages:
text += page.extract_text()
return text
# 使用 pdfplumber(更准确)
def extract_text_pdfplumber(pdf_path):
with pdfplumber.open(pdf_path) as pdf:
text = ""
for page in pdf.pages:
text += page.extract_text()
return text
# 提取表格
def extract_tables(pdf_path):
with pdfplumber.open(pdf_path) as pdf:
tables = []
for page in pdf.pages:
tables.extend(page.extract_tables())
return tables
OCR 识别:
from PIL import Image
import pytesseract
import cv2
def ocr_image(image_path):
"""OCR 识别"""
image = Image.open(image_path)
text = pytesseract.image_to_string(image, lang='chi_sim+eng')
return text
def preprocess_image(image_path):
"""图像预处理"""
img = cv2.imread(image_path)
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# 二值化
_, binary = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
return binary
# 使用 PaddleOCR
from paddleocr import PaddleOCR
ocr = PaddleOCR(use_angle_cls=True, lang='ch')
result = ocr.ocr('image.jpg', cls=True)
学习建议:
- 掌握 PDF 解析库(PyPDF2、pdfplumber)
- 学习 OCR 工具(Tesseract、PaddleOCR)
- 了解图像预处理技术
- 学习文档结构化提取
练习题:
- 实现 PDF 文档信息提取
- 使用 OCR 识别图片中的文字
- 构建文档解析和结构化提取系统
3.7 爬虫技术
岗位要求:
- 熟悉爬虫技术(Scrapy、Selenium)
- 了解分布式爬虫
Scrapy 爬虫:
import scrapy
class NewsSpider(scrapy.Spider):
name = 'news'
start_urls = ['https://example.com/news']
def parse(self, response):
# 提取标题
titles = response.css('h2.title::text').getall()
# 提取链接
links = response.css('a::attr(href)').getall()
for title, link in zip(titles, links):
yield {
'title': title,
'link': response.urljoin(link)
}
# 跟进链接
next_page = response.css('a.next::attr(href)').get()
if next_page:
yield response.follow(next_page, self.parse)
Selenium 自动化:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
driver = webdriver.Chrome()
driver.get("https://example.com")
# 等待元素加载
element = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, "myElement"))
)
# 操作元素
driver.find_element(By.ID, "username").send_keys("user")
driver.find_element(By.ID, "password").send_keys("pass")
driver.find_element(By.ID, "submit").click()
driver.quit()
学习建议:
- 掌握 Scrapy 框架
- 学习 Selenium 自动化
- 了解反爬虫机制
- 学习分布式爬虫架构
练习题:
- 使用 Scrapy 爬取新闻网站
- 使用 Selenium 处理动态网页
- 实现分布式爬虫系统
四、学习路径建议
阶段一:基础技能(1-3个月)
Python 基础(2周)
- 完成 Python 语法学习(参考 page6.md)
- 练习编写小程序
Web 框架(3周)
- 选择一个框架深入学习(推荐 FastAPI 或 Django)
- 完成一个简单的 CRUD 项目
数据库(2周)
- 学习 MySQL/PostgreSQL
- 掌握 ORM 框架使用
Linux 和 Git(1周)
- 熟悉常用 Linux 命令
- 掌握 Git 基本操作
阶段二:进阶技能(3-6个月)
异步编程(2周)
- 学习 asyncio
- 实现异步 API
接口鉴权(1周)
- 实现 JWT 认证
- 学习 OAuth2
Docker(2周)
- 学习 Docker 基础
- 容器化你的项目
性能优化(持续)
- 学习缓存策略
- SQL 优化实践
测试和 CI/CD(2周)
- 编写测试用例
- 配置 CI/CD
阶段三:高阶技能(6-12个月)
根据职业方向选择:
- 大模型方向:学习 LLM API、RAG、Agent 开发
- 分布式方向:学习 K8s、微服务架构
- 大数据方向:学习 Spark、Kafka
- 区块链方向:学习区块链原理、智能合约
学习资源推荐
官方文档
- Python: https://docs.python.org/
- FastAPI: https://fastapi.tiangolo.com/
- Django: https://docs.djangoproject.com/
在线课程
- 慕课网、极客时间
- Coursera、edX
实践项目
- GitHub 开源项目
- 自己设计项目
刷题平台
- LeetCode(算法)
- 牛客网(面试题)
总结
Python 后端开发是一个需要持续学习的领域。建议:
- 循序渐进:先掌握基础,再学习进阶
- 实践为主:多写代码,多做项目
- 关注行业:了解新技术和趋势
- 建立知识体系:系统化学习,不要碎片化
希望这份指南能够帮助你制定学习计划,快速达到岗位要求!