python后台api接口开发教程对接easyclick

教程包包括3个大的部分

1.api开发的基础知识

2.api开发实战部分

3.上线部署

4.api对接,这里api可以对接到任何地方我们用的easyclick 录的教程

 

python后台api接口开发教程对接easyclick

 

 

 

教程总共7节。总大小2.73G具体目录参考截图python后台api接口开发教程对接easyclick

 

教程笔记:

 

 

1. python开发环境和pycharm安装破解

链接:https://pan.baidu.com/s/1INJYzcZSPBX8g48vD61pgg?pwd=7wby 
提取码:7wby
import time

now = int(time.time())

print(now)



2 . 我的第一个接口


#安装fastapi
 pip install fastapi

import uvicorn
from fastapi import FastAPI, Request, Response
from pydantic import BaseModel

app = FastAPI()


@app.get("/", summary="我是GET", description="我是描述信息")
async def index_get(request: Request, Ip: str = "127.0.0.2", token: str = None):
    header = request.headers
    print("header", header)
    ip = request.client.host
    print("ip:", ip)
    return {"code": 200, "msg": "GET", "ip": Ip, "token": token}


class Post_In(BaseModel):
    ip: str
    token: str


@app.post("/", summary="我是POST", description="我是描述信息")
async def index_post(request: Request, post_in: Post_In):
    return {"code": 200, "msg": "POST", "ip": post_in.ip, "token": post_in.token}


if __name__ == '__main__':
    uvicorn.run(app="run:app", host="127.0.0.1", port=8000, reload=True, log_level="debug", workers=4)

接口开发的基础知识点

# 导入fastapi模块
import uvicorn
import random
from fastapi import FastAPI, Request, Response, HTTPException
from pydantic import BaseModel, Field, validator

# 创建FastAPI实例
app = FastAPI()


# 请求和响应的例子
@app.get("/request_response")
async def request_response(request: Request, response: Response):
    print(request.headers)
    print(request.client.host)
    print(request.client.port)
    print(request.cookies)
    response.set_cookie(key="test", value="test_value")
    return {"status": 200, "message": "success", "data": response}


class Phone(BaseModel):
    age: int
    bank_card: str = Field(min_length=5, max_length=15, examples=["huawei"], description="设备品牌")
    bank_card_number: str = Field(max_length=20, min_length=8, default='123456', description="银行卡号")
    is_locked: bool = False

    @validator('age')
    def validate_bank_card_number(cls, v):
        if v < 5 or v > 15:
            raise HTTPException(status_code=505, detail="年龄必须在5-15岁之间!!!!")
        return v


# 请求参数的例子
@app.post("/get_phone")
async def response_response(my_phone: Phone, pheon: str = None, num: int = 1):
    print(my_phone.phone)
    return {"status": 200, "message": "success", "data": ""}


# 数据校验的例子
@app.post("/validate_phone")
async def validate_phone(phone: Phone):
    return {"status": 200, "message": "success", "data": ""}


if __name__ == '__main__':
    uvicorn.run(app="demo2:app", host="127.0.0.1", port=8000, reload=True, log_level="debug", workers=4)

3. 数据库ORM



#安装orm
pip install tortoise-orm

#官方文档

https://tortoise.github.io/migration.html


#安装迁移
pip install aerich 


模型创建

创建 models.model

模型迁移

# 生成迁移文件
aerich init -t settings.TORTOISE_ORM_CONFIG
# 执行迁移
aerich init-db# 模型初始化
#模型跟踪
aerich migrate --name=xxx
# 模型更新
aerich upgrade
#模型回退
aerich downgrade

数据库配置

TORTOISE_ORM = {
    'connections': {
        'default': {
            # 'engine': 'tortoise.backends.asyncpg',  PostgreSQL
            'engine': 'tortoise.backends.mysql',  # MySQL or Mariadb
            'credentials': {
                'host': '127.0.0.1',
                'port': '3306',
                'user': '数据库用户名',
                'password': '数据库密码',
                'database': '数据库名',
                'minsize': 1,
                'maxsize': 5,
                'charset': 'utf8mb4',
                "echo": True
            }
        },
    },
    # "connections": {"default": "mysql://root:zxc123456@127.0.0.1:3306/wechatScan"},
    'apps': {
        'models': {
            # models:models 找到对应自定义的model.py
            'models': ["models.models", "aerich.models"],  # aerich.models迁移模型
            'default_connection': 'default',
        }
    },
    'use_tz': False,
    'timezone': 'Asia/Shanghai',
}
#注册到app中
register_tortoise(
    app=app,
    config=TORTOISE_ORM,
)

@app.post("/add", summary="添加手机号")
async def request_response():
    for i in range(10):
        phone_number = await generate_random_phone_number()
        await Phone.create(phone=phone_number)
    return {"status": 200, "message": "success", "data": ""}

@app.delete("/delete/", summary="删除手机号")
async def delete(phone: int = None):
    if phone is None:
        return Response(status_code=400, content={"status": 400, "message": "pip 不能为空", "data": ""})
    await Phone.filter(phone=phone).delete()
    return {"status": 200, "message": "success", "data": ""}

@app.put("/update", summary="更新手机号")
@transactions.atomic()
async def update(phone: int = None):
    if phone is None:
        return Response(status_code=400, content={"status": 400, "message": "pip 不能为空", "data": ""})
    phone_number = await generate_random_phone_number()
    print("新的手机号:" + phone_number)
    await Phone.filter(phone=phone, is_lock=0).update(phone=phone_number, is_lock=1)
    return {"status": 200, "message": "success", "data": ""}

@app.get("/get", summary="获取手机号")
async def get(phone: int = None):
    try:
        if phone is None:
            return {"status": 400, "message": "pip 不能为空", "data": ""}
        phone_obj = await Phone.get_or_none(phone=phone)
        if phone_obj is None:
            return {"status": 400, "message": "手机号不存在", "data": ""}
        return {"status": 200, "message": "success", "data": phone_obj}
    except Exception as e:
        return {"status": 500, "message": str(e), "data": ""}


@app.get("/list", summary="获取手机号列表")
async def list():
    try:
        phone_list = await Phone.all().count()
        return {"status": 200, "message": "success", "num": phone_list}
    except Exception as e:
        return {"status": 500, "message": str(e), "data": ""}

####事务模式

# 创建一个异步路由,用于添加手机号
@app.post("/add_phone", summary="添加手机号")
async def add_phone():
    # 创建一个原子事务,用于将操作绑定到一个事务
    @transactions.atomic()
    async def bound_to_fall():
        # 异步创建一个手机号记录
        await Phone.create(phone="13800138000")
        print("添加成功13800138000")
        phone_number = "8717013231"
        # 异步创建另一个手机号记录
        await Phone.create(phone=phone_number)
        print("8717013231添加成功")

    try:
        # 执行原子事务
        await bound_to_fall()
        return {"status": 200, "message": "success", "data": ""}
    except Exception as e:
        print(e)
        return {"status": 500, "message": str(e), "data": ""}e), "data": ""}

文件上传


@app.post("/upload_image", summary="上传图片")
async def upload_image(base64_image: str, image_name: str):
    base64_image = te.b64_img("1.png")

    # 上传图片的处理逻辑
    # 将 Base64 编码的图片数据解码并保存为文件
    str_img = base64.b64decode(base64_image)
    with open(image_name, "wb") as f:
        f.write(str_img)

    return {"status": 200, "message": "success", "data": ""}

跨域

# 3、配置 CORSMiddleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 允许所有源,也可以指定具体源
    allow_credentials=True,
    allow_methods=["*"],  # 允许所有方法
    allow_headers=["*"],  # 允许所有头
)

部署前整理

pip freeze > requirements.txt

#关闭文档显示
#关闭token

#准备服务器域名
# 启动模式
worker_class = 'uvicorn.workers.UvicornWorker'

easyclick测试代码

var cons={}
cons.token="HNFKLJHWHBELOHFLWKHGALKDFASDFASDFASDFSADFJWHBEIU"
cons.host="http://8.217.136.232:22222"

cons.addphone=function (unm) {
    try {
        sleep(1000);
        let url=cons.host+"/api/api/phone/phone?num="+unm+"&token="+cons.token
        let result=http.httpPost(url,"","",30*1000,null)
        if (result.indexOf("添加成功")) {
            let jsonobj=JSON.parse(result)
            if (jsonobj.data.length>0){
                return jsonobj.data
            }
        }
    }catch (e) {
        loge(e.message)
        return false
    }
}

cons.getphone=function () {
    try {
        sleep(1000);
        let url=cons.host+"/api/api/phone/phone?token="+cons.token
        let result=http.httpGet(url,null,30*1000,null)
        logd(result)
        if (result) {
            let result1=JSON.parse(result)
            if (result1.status===200) {
                logd("手机号:"+result1.data.phone)
                return result1.data.phone
            }
        }
    }catch (e) {
        loge(e.message)
        return false
    }
}

cons.putphone=function (PHOEN_NUM,WECHAT_NAME) {
    let j = {
        "phone_number": PHOEN_NUM,
        "Wechat_name": WECHAT_NAME,
    }
    try {
        let pm = {
            "url": cons.host + "/api/api/phone/phone?phone_number=" + j.phone_number + "&Wechat_name=" + j.Wechat_name + "&token=" + cons.token,
            "method": "PUT",
            "timeout": 30 * 1000
        }
        let result = http.request(pm)
        if (result.statusCode == 200) {
            let ssss=JSON.parse(result.body)
            if (ssss.status==200) {
                logd(JSON.stringify(ssss))
                return true
            }else {
                logd(JSON.stringify(ssss))
                return  false
            }
        } else {
            logd(result.body)
            return false
        }
    } catch (e) {
        logd(e.message)
        return false
    }
}



let new_phone_num=cons.getphone()
logd(new_phone_num)
new_phone_num="13728892004"
let phone_num=cons.putphone(new_phone_num,"我是微信昵称")
logd(phone_num)

给TA打赏
共{{data.count}}人
人已打赏
EasyClick内部资源脚本开发

easyclick本地素材包替换陌陌素材包下载方法

2023-9-25 9:08:23

PC软件工具仓库手机工具手机操作精品软件脚本开发工具

安卓投屏软件:Anlink史上最牛X投屏工具安装使用教程

2022-9-28 16:13:40

重要声明

本站资源大多来自网络,如有侵犯你的权益请联系管理员,我们会第一时间进行审核删除。站内资源为网友个人学习或测试研究使用,未经原版权作者许可,禁止用于任何商业途径!请在下载24小时内删除!

按键精灵加交流群1:财神汇脚本开发交流群 按键精灵加交流群2:财神汇脚本开发交流群 Easyclick开发交流群:EasyClick学习交流群


如果遇到付费才可观看的文章,建议升级会员或者成为认证用户。全站所有资源任意下免费看本站资源少部分采用7z压缩为防止有人压缩软件不支持7z格式7z解压,建议下载7-zip,zip、rar解压,建议下载WinRAR

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索