教程包包括3个大的部分
1.api开发的基础知识
2.api开发实战部分
3.上线部署
4.api对接,这里api可以对接到任何地方我们用的easyclick 录的教程
教程总共7节。总大小2.73G具体目录参考截图
教程笔记:
1. python开发环境和pycharm安装破解
链接:https://pan.baidu.com/s/1INJYzcZSPBX8g48vD61pgg?pwd=7wby
提取码:7wby
import time
now = int(time.time())
print(now)
2 . 我的第一个接口
#安装fastapi
pip install fastapi
import uvicorn
from fastapi import FastAPI, Request, Response
from pydantic import BaseModel
app = FastAPI()
@app.get("/", summary="我是GET", description="我是描述信息")
async def index_get(request: Request, Ip: str = "127.0.0.2", token: str = None):
header = request.headers
print("header", header)
ip = request.client.host
print("ip:", ip)
return {"code": 200, "msg": "GET", "ip": Ip, "token": token}
class Post_In(BaseModel):
ip: str
token: str
@app.post("/", summary="我是POST", description="我是描述信息")
async def index_post(request: Request, post_in: Post_In):
return {"code": 200, "msg": "POST", "ip": post_in.ip, "token": post_in.token}
if __name__ == '__main__':
uvicorn.run(app="run:app", host="127.0.0.1", port=8000, reload=True, log_level="debug", workers=4)
接口开发的基础知识点
# 导入fastapi模块
import uvicorn
import random
from fastapi import FastAPI, Request, Response, HTTPException
from pydantic import BaseModel, Field, validator
# 创建FastAPI实例
app = FastAPI()
# 请求和响应的例子
@app.get("/request_response")
async def request_response(request: Request, response: Response):
print(request.headers)
print(request.client.host)
print(request.client.port)
print(request.cookies)
response.set_cookie(key="test", value="test_value")
return {"status": 200, "message": "success", "data": response}
class Phone(BaseModel):
age: int
bank_card: str = Field(min_length=5, max_length=15, examples=["huawei"], description="设备品牌")
bank_card_number: str = Field(max_length=20, min_length=8, default='123456', description="银行卡号")
is_locked: bool = False
@validator('age')
def validate_bank_card_number(cls, v):
if v < 5 or v > 15:
raise HTTPException(status_code=505, detail="年龄必须在5-15岁之间!!!!")
return v
# 请求参数的例子
@app.post("/get_phone")
async def response_response(my_phone: Phone, pheon: str = None, num: int = 1):
print(my_phone.phone)
return {"status": 200, "message": "success", "data": ""}
# 数据校验的例子
@app.post("/validate_phone")
async def validate_phone(phone: Phone):
return {"status": 200, "message": "success", "data": ""}
if __name__ == '__main__':
uvicorn.run(app="demo2:app", host="127.0.0.1", port=8000, reload=True, log_level="debug", workers=4)
3. 数据库ORM
#安装orm
pip install tortoise-orm
#官方文档
https://tortoise.github.io/migration.html
#安装迁移
pip install aerich
模型创建
创建 models.model
模型迁移
# 生成迁移文件
aerich init -t settings.TORTOISE_ORM_CONFIG
# 执行迁移
aerich init-db# 模型初始化
#模型跟踪
aerich migrate --name=xxx
# 模型更新
aerich upgrade
#模型回退
aerich downgrade
数据库配置
TORTOISE_ORM = {
'connections': {
'default': {
# 'engine': 'tortoise.backends.asyncpg', PostgreSQL
'engine': 'tortoise.backends.mysql', # MySQL or Mariadb
'credentials': {
'host': '127.0.0.1',
'port': '3306',
'user': '数据库用户名',
'password': '数据库密码',
'database': '数据库名',
'minsize': 1,
'maxsize': 5,
'charset': 'utf8mb4',
"echo": True
}
},
},
# "connections": {"default": "mysql://root:[email protected]:3306/wechatScan"},
'apps': {
'models': {
# models:models 找到对应自定义的model.py
'models': ["models.models", "aerich.models"], # aerich.models迁移模型
'default_connection': 'default',
}
},
'use_tz': False,
'timezone': 'Asia/Shanghai',
}
#注册到app中
register_tortoise(
app=app,
config=TORTOISE_ORM,
)
增
@app.post("/add", summary="添加手机号")
async def request_response():
for i in range(10):
phone_number = await generate_random_phone_number()
await Phone.create(phone=phone_number)
return {"status": 200, "message": "success", "data": ""}
删
@app.delete("/delete/", summary="删除手机号")
async def delete(phone: int = None):
if phone is None:
return Response(status_code=400, content={"status": 400, "message": "pip 不能为空", "data": ""})
await Phone.filter(phone=phone).delete()
return {"status": 200, "message": "success", "data": ""}
改
@app.put("/update", summary="更新手机号")
@transactions.atomic()
async def update(phone: int = None):
if phone is None:
return Response(status_code=400, content={"status": 400, "message": "pip 不能为空", "data": ""})
phone_number = await generate_random_phone_number()
print("新的手机号:" + phone_number)
await Phone.filter(phone=phone, is_lock=0).update(phone=phone_number, is_lock=1)
return {"status": 200, "message": "success", "data": ""}
查
@app.get("/get", summary="获取手机号")
async def get(phone: int = None):
try:
if phone is None:
return {"status": 400, "message": "pip 不能为空", "data": ""}
phone_obj = await Phone.get_or_none(phone=phone)
if phone_obj is None:
return {"status": 400, "message": "手机号不存在", "data": ""}
return {"status": 200, "message": "success", "data": phone_obj}
except Exception as e:
return {"status": 500, "message": str(e), "data": ""}
@app.get("/list", summary="获取手机号列表")
async def list():
try:
phone_list = await Phone.all().count()
return {"status": 200, "message": "success", "num": phone_list}
except Exception as e:
return {"status": 500, "message": str(e), "data": ""}
####事务模式
# 创建一个异步路由,用于添加手机号
@app.post("/add_phone", summary="添加手机号")
async def add_phone():
# 创建一个原子事务,用于将操作绑定到一个事务
@transactions.atomic()
async def bound_to_fall():
# 异步创建一个手机号记录
await Phone.create(phone="13800138000")
print("添加成功13800138000")
phone_number = "8717013231"
# 异步创建另一个手机号记录
await Phone.create(phone=phone_number)
print("8717013231添加成功")
try:
# 执行原子事务
await bound_to_fall()
return {"status": 200, "message": "success", "data": ""}
except Exception as e:
print(e)
return {"status": 500, "message": str(e), "data": ""}e), "data": ""}
文件上传
@app.post("/upload_image", summary="上传图片")
async def upload_image(base64_image: str, image_name: str):
base64_image = te.b64_img("1.png")
# 上传图片的处理逻辑
# 将 Base64 编码的图片数据解码并保存为文件
str_img = base64.b64decode(base64_image)
with open(image_name, "wb") as f:
f.write(str_img)
return {"status": 200, "message": "success", "data": ""}
跨域
# 3、配置 CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有源,也可以指定具体源
allow_credentials=True,
allow_methods=["*"], # 允许所有方法
allow_headers=["*"], # 允许所有头
)
部署前整理
pip freeze > requirements.txt
#关闭文档显示
#关闭token
#准备服务器域名
# 启动模式
worker_class = 'uvicorn.workers.UvicornWorker'
easyclick测试代码
var cons={}
cons.token="HNFKLJHWHBELOHFLWKHGALKDFASDFASDFASDFSADFJWHBEIU"
cons.host="http://8.217.136.232:22222"
cons.addphone=function (unm) {
try {
sleep(1000);
let url=cons.host+"/api/api/phone/phone?num="+unm+"&token="+cons.token
let result=http.httpPost(url,"","",30*1000,null)
if (result.indexOf("添加成功")) {
let jsonobj=JSON.parse(result)
if (jsonobj.data.length>0){
return jsonobj.data
}
}
}catch (e) {
loge(e.message)
return false
}
}
cons.getphone=function () {
try {
sleep(1000);
let url=cons.host+"/api/api/phone/phone?token="+cons.token
let result=http.httpGet(url,null,30*1000,null)
logd(result)
if (result) {
let result1=JSON.parse(result)
if (result1.status===200) {
logd("手机号:"+result1.data.phone)
return result1.data.phone
}
}
}catch (e) {
loge(e.message)
return false
}
}
cons.putphone=function (PHOEN_NUM,WECHAT_NAME) {
let j = {
"phone_number": PHOEN_NUM,
"Wechat_name": WECHAT_NAME,
}
try {
let pm = {
"url": cons.host + "/api/api/phone/phone?phone_number=" + j.phone_number + "&Wechat_name=" + j.Wechat_name + "&token=" + cons.token,
"method": "PUT",
"timeout": 30 * 1000
}
let result = http.request(pm)
if (result.statusCode == 200) {
let ssss=JSON.parse(result.body)
if (ssss.status==200) {
logd(JSON.stringify(ssss))
return true
}else {
logd(JSON.stringify(ssss))
return false
}
} else {
logd(result.body)
return false
}
} catch (e) {
logd(e.message)
return false
}
}
let new_phone_num=cons.getphone()
logd(new_phone_num)
new_phone_num="13728892004"
let phone_num=cons.putphone(new_phone_num,"我是微信昵称")
logd(phone_num)