提到Python,不得不感叹,简直不要太好用...
那么在Python中如何编写服务端的接口呢,方式有许多,今天主要说一下Flask,Flask是一个用Python编写的Web应用程序框架,具有轻量级和简洁性,还有强大的可扩展性等优点。
最简单的示例:
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello_world():
return 'Hello, Flask!'
if __name__ == '__main__':
app.run(debug=True)
这样我们就可以在浏览器访问到最简单的“Hello, Flask!”了,不过前提是你已经成功安装Flask 。
由于实际工作中可能会涉及更多内容,比如接收不同请求方式参数、数据库的连接和操作、跨域解决等等一系列问题,下面就进行一一解答。
cmd 安装以下指令:
# 安装指令
pip install pymysql
pip install flask
pip install flask_cors
打开Pycharm,在设置中查看Python解析器是否有这几个软件包,如果有则代表安装成功。
新建一个新文件,导入依赖包
# 需要用到的包引入
from pymysql import Connection
from flask import Flask, request, jsonify
from flask_cors import CORS
导入完毕后,开始连接数据库,我在本地已经成功安装MySQL并且创建了test数据库,里面有一张student表格,如果没有安装请自行安装并新建表格。
conn = Connection(
host="localhost",
port=3306,
user="root",
password="123456",
autocommit=True
)
# 获取游标对象
cursor = conn.cursor()
# 选择数据库
conn.select_db("test")
接下来是启动服务和解决跨域问题
# 后端服务启动
app = Flask(__name__)
# 为所有路径启用CORS
CORS(app)
接下来是接口定义部分,直接贴代码吧,其实我注释写得还蛮多,应该能看明白。
# 获取列表数据
@app.route("/student/list", methods=['GET'])
def student_list():
# 获取某个参数,例如 'name'
name = request.args.get('name')
# 构造查询语句
if name:
# 使用LIKE操作符和通配符%来查询包含name的记录
query = "select * from student where name like %s"
# 使用%%作为通配符%的转义,因为%在SQL中是特殊字符
cursor.execute(query, ('%' + name + '%',))
else:
# 如果没有name参数,查询所有记录
query = "select * from student"
cursor.execute(query)
try:
# 获取查询结果
data = cursor.fetchall()
except Exception as e:
# 处理数据库查询异常
print(f"查询异常: {e}")
return jsonify([]), 500
# 将结果转换为字典列表
# 这段代码使用了Python的列表推导式(list comprehension)来将一个数据库查询结果列表转换为包含字典的列表
result = [
{
"id": row[0],
"name": row[1],
"age": row[2]
} for row in data
]
# 返回JSON响应
return jsonify(result)
# 增加一条数据
@app.route("/student", methods=['POST'])
def create_student():
name = request.form.get("name")
age = request.form.get("age")
ids = request.form.get("id") # 这个id其实不应该传,应该是后端定一个规则自动生成的,这里偷懒了
if not name or not age or not ids:
return jsonify({"message": "缺少必填参数"}), 400
query = "insert into student (name, age, id) values (%s, %s, %s)"
try:
cursor.execute(query, (name, age, ids))
return jsonify({"message": f"学生{name}新增成功"}), 200
except Exception as e:
return jsonify({"message": str(e)}), 500
# 删除某条数据
@app.route("/student/<int:student_id>", methods=['DELETE'])
def delete_student(student_id):
query = "delete from student where id=%s"
try:
cursor.execute(query, (student_id,))
return jsonify({"message": f"学生 {student_id} 删除成功"}), 200
except Exception as e:
return jsonify({"message": str(e)}), 500
# 修改某条数据
@app.route("/student/<int:student_id>", methods=['PUT'])
def update_student(student_id):
name = request.form.get("name")
age = request.form.get("age")
if not name and not age:
return jsonify({"message": "缺少必要参数"}), 400
update_query = "update student set "
update_params = []
if name:
update_query += "name=%s, "
update_params.append(name)
if age:
update_query += "age=%s, "
update_params.append(age)
update_query = update_query.rstrip(', ')
query = f"{update_query} where id=%s"
update_params.append(student_id)
# print(query)
# print(update_params)
try:
cursor.execute(query, tuple(update_params))
return jsonify({"message": f"学生{name}信息更新成功"}), 200
except Exception as e:
return jsonify({"message": str(e)}), 500
最后定义运行在哪里以及端口号等
if __name__ == "__main__":
app.run("0.0.0.0", port=9090, debug=True)
conn.close() # 关闭数据库连接
使用postman 测试一下查询的接口:
查询结果正确,很完美!