""" 演示 HTTP 服务 - GET /get 普通 GET 请求演示 - POST /post 普通 POST 请求演示 - GET /stream/ 流式响应演示(逐行返回 n 条 JSON) 运行方式: pip install flask python app.py 默认监听 0.0.0.0:8080,局域网内 Android 设备通过 http://<本机IP>:8080 访问 """ import json import time from flask import Flask, request, Response, jsonify app = Flask(__name__) PORT = 8080 def cors(response): response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization" response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS" return response @app.after_request def after_request(response): return cors(response) # ── GET /get ────────────────────────────────────────────────────────────────── @app.route("/get", methods=["GET", "OPTIONS"]) def demo_get(): data = { "code": 0, "message": "GET 请求成功", "data": { "method": "GET", "url": request.url, "args": dict(request.args), "headers": dict(request.headers), "timestamp": int(time.time()), }, } return jsonify(data) # ── POST /post ──────────────────────────────────────────────────────────────── @app.route("/post", methods=["POST", "OPTIONS"]) def demo_post(): body = {} raw = "" try: body = request.get_json(force=True) or {} raw = json.dumps(body, ensure_ascii=False) except Exception: raw = request.data.decode("utf-8", errors="replace") data = { "code": 0, "message": "POST 请求成功", "data": { "method": "POST", "url": request.url, "body": body, "raw": raw, "headers": dict(request.headers), "timestamp": int(time.time()), }, } return jsonify(data) # ── GET /stream/ ─────────────────────────────────────────────────────────── @app.route("/stream/", methods=["GET"]) def demo_stream(n): """ 逐行输出 n 条 JSON,每条之间延迟 500ms,模拟流式响应。 客户端用 @Streaming + ResponseBody 逐行读取即可。 """ n = min(max(n, 1), 20) # 限制 1~20 条 def generate(): for i in range(n): line = json.dumps( { "index": i + 1, "total": n, "message": f"流式数据第 {i + 1} 条", "timestamp": int(time.time()), }, ensure_ascii=False, ) yield line + "\n" time.sleep(0.5) return Response( generate(), status=200, mimetype="application/octet-stream", headers={"X-Accel-Buffering": "no"}, ) # ── 入口 ────────────────────────────────────────────────────────────────────── if __name__ == "__main__": print(f"Demo server running on http://0.0.0.0:{PORT}") print(" GET http://<本机IP>:8080/get") print(" POST http://<本机IP>:8080/post") print(" GET http://<本机IP>:8080/stream/5") app.run(host="0.0.0.0", port=PORT, debug=False, threaded=True)