Files
sms-remote/app.py
T

74 lines
2.8 KiB
Python

from datetime import timedelta
from flask import Flask, abort, jsonify, make_response, request
from flask_bcrypt import Bcrypt
from flask_jwt_extended import JWTManager, create_access_token, create_refresh_token, get_jwt, get_jwt_identity, jwt_required
import logging
import sqlite3
import sys
import db
app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'secret' # TODO change and load from a secrets store (should be over 32 bytes long)
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1)
app.config['JWT_REFRESH_TOKEN_EXPIRES'] = timedelta(days=30)
flask_bcrypt = Bcrypt(app)
jwt = JWTManager(app)
cur = sqlite3.connect("sms.db")
file_handler = logging.FileHandler('log/out.log')
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'))
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
@app.route("/api/v1/login", methods=["POST"])
def login():
access_key = request.json.get("access_key", None)
if access_key is None:
return make_response(jsonify({"msg": "Missing access key"}), 400)
secret_key = request.json.get("secret_key", None)
if secret_key is None:
return make_response(jsonify({"msg": "Missing secret key"}), 400)
res_tuple = cur.execute("SELECT secret_key_hash, type FROM devices WHERE access_key = ?", (access_key,)).fetchone()
if res_tuple is None:
return make_response(jsonify({"msg": "Invalid access key or secret key"}), 401)
(secret_key_hash, type_claim) = res_tuple
if flask_bcrypt.check_password_hash(secret_key_hash, secret_key):
# "typ" claim means device type
extra_claims = {"typ": type_claim}
# generate and return JWT
return make_response(
jsonify({"access_token": create_access_token(identity=access_key, additional_claims=extra_claims),
"refresh_token": create_refresh_token(identity=access_key, additional_claims=extra_claims)}),
200)
else:
return make_response(jsonify({"msg": "Invalid access key or secret key"}), 401)
@app.route("/api/v1/verify-token", methods=["GET"])
@jwt_required()
def verify_token():
return make_response(jsonify(logged_in_as=get_jwt_identity()), 200)
@app.route("/api/v1/refresh-token", methods=["POST"])
@jwt_required(refresh=True)
def refresh_token():
return make_response(jsonify({"access_token": create_access_token(identity=get_jwt_identity(), additional_claims={"typ": get_jwt()["typ"]})}), 200)
@app.route("/api/v1/devices", methods=["GET"])
@jwt_required()
def get_all_devices():
if get_jwt()["typ"] != "PRIMARY":
return make_response(jsonify(msg="Only a PRIMARY device can list all devices"), 403)
return make_response(jsonify([d.to_dict() for d in db.get_all_devices(cur)]), 200)