c-brain-glass/server/app.py

119 行
3.7 KiB
Python

"""
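Demo HTTP server.

- GET  /get         plain GET request demo
- POST /post        plain POST request demo
- GET  /stream/<n>  streaming response demo: returns n JSON objects, one per line

How to run:
    pip install flask
    python app.py

Listens on 0.0.0.0:8080 by default; Android devices on the same LAN can reach
it at http://<host-IP>:8080.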
"""
import json
import time
from flask import Flask, request, Response, jsonify
# Flask application object; the decorators below register hooks and routes on it.
app = Flask(__name__)
# TCP port the server listens on (passed to app.run() in the __main__ block).
PORT = 8080
def cors(response):
    """Attach permissive CORS headers to ``response`` and return it.

    The object passed in is mutated in place and the very same object is
    returned, so the call can be used directly as a hook's return value.
    Any origin is allowed, along with the ``Content-Type`` and
    ``Authorization`` request headers and the GET/POST/OPTIONS methods.
    """
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Headers", "Content-Type, Authorization"),
        ("Access-Control-Allow-Methods", "GET, POST, OPTIONS"),
    )
    for header_name, header_value in cors_headers:
        response.headers[header_name] = header_value
    return response
@app.after_request
def after_request(response):
    """Flask ``after_request`` hook: run each response through ``cors()``."""
    with_cors = cors(response)
    return with_cors
# ── GET /get ──────────────────────────────────────────────────────────────────
@app.route("/get", methods=["GET", "OPTIONS"])
def demo_get():
    """Handle ``/get``: echo details of the incoming request back as JSON.

    The response envelope carries ``code`` 0, a success ``message`` and a
    ``data`` object with the (fixed) method name, the full request URL, the
    query arguments, the request headers and the current Unix timestamp in
    whole seconds.
    """
    request_info = {
        "method": "GET",
        "url": request.url,
        "args": dict(request.args),
        "headers": dict(request.headers),
        "timestamp": int(time.time()),
    }
    envelope = {
        "code": 0,
        "message": "GET 请求成功",
        "data": request_info,
    }
    return jsonify(envelope)
# ── POST /post ────────────────────────────────────────────────────────────────
@app.route("/post", methods=["POST", "OPTIONS"])
def demo_post():
    """Handle ``/post``: echo the request body back as JSON.

    The body is parsed as JSON regardless of the ``Content-Type`` header
    (``force=True``). When it parses, ``body`` holds the parsed value and
    ``raw`` its re-serialised form. When it does not parse (empty or
    malformed payload), ``body`` is ``{}`` and ``raw`` is the payload decoded
    as UTF-8 with undecodable bytes replaced.

    Returns:
        A JSON response with ``code`` 0, a success ``message`` and a ``data``
        object containing the method name, request URL, ``body``, ``raw``,
        request headers and the current Unix timestamp in whole seconds.
    """
    # silent=True makes get_json() return None on a parse failure instead of
    # raising, so no blanket ``except Exception`` is needed and unrelated
    # errors are no longer swallowed.
    parsed = request.get_json(force=True, silent=True)
    if parsed is None:
        # Unparseable payload (or a literal JSON ``null``): report it as text.
        body = {}
        raw = request.data.decode("utf-8", errors="replace")
    else:
        # Falsy JSON values (0, "", [], false) are normalised to {} as before.
        body = parsed or {}
        raw = json.dumps(body, ensure_ascii=False)

    data = {
        "code": 0,
        "message": "POST 请求成功",
        "data": {
            "method": "POST",
            "url": request.url,
            "body": body,
            "raw": raw,
            "headers": dict(request.headers),
            "timestamp": int(time.time()),
        },
    }
    return jsonify(data)
# ── GET /stream/<n> ───────────────────────────────────────────────────────────
@app.route("/stream/<int:n>", methods=["GET"])
def demo_stream(n):
    """Stream ``n`` JSON objects, one per line, to simulate a streaming response.

    A 500 ms pause is inserted between consecutive lines. Clients can consume
    the body line by line (e.g. with ``@Streaming`` + ``ResponseBody``).

    Args:
        n: Requested number of lines, taken from the URL; clamped to 1..20.

    Returns:
        A 200 ``Response`` whose body is produced lazily by a generator, sent
        as ``application/octet-stream`` with ``X-Accel-Buffering: no``.
    """
    count = min(max(n, 1), 20)  # clamp to 1-20 entries

    def generate():
        for index in range(1, count + 1):
            # Pause only *between* entries; sleeping after the final one
            # would just hold the connection open for an extra 500 ms.
            if index > 1:
                time.sleep(0.5)
            line = json.dumps(
                {
                    "index": index,
                    "total": count,
                    "message": f"流式数据第 {index}",
                    "timestamp": int(time.time()),
                },
                ensure_ascii=False,
            )
            yield line + "\n"

    return Response(
        generate(),
        status=200,
        mimetype="application/octet-stream",
        headers={"X-Accel-Buffering": "no"},
    )
# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Every advertised URL is built from PORT so the banner cannot drift from
    # the port the server actually binds to.
    print(f"Demo server running on http://0.0.0.0:{PORT}")
    print(f" GET http://<本机IP>:{PORT}/get")
    print(f" POST http://<本机IP>:{PORT}/post")
    print(f" GET http://<本机IP>:{PORT}/stream/5")
    # Bind on all interfaces so other devices on the network can connect;
    # threaded=True serves each request in its own thread.
    app.run(host="0.0.0.0", port=PORT, debug=False, threaded=True)