chore: move backend to separate dir

This commit is contained in:
2026-05-19 00:02:31 +02:00
parent 0414ee9e4e
commit db2290ba14
6 changed files with 4 additions and 0 deletions
+135
View File
@@ -0,0 +1,135 @@
from datetime import timedelta
from flask import Flask, abort, jsonify, make_response, request
from flask_bcrypt import Bcrypt
from flask_jwt_extended import JWTManager, create_access_token, create_refresh_token, get_jwt, get_jwt_identity, jwt_required
import logging
import sqlite3
import sys
import db
app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'secret' # TODO change and load from a secrets store (should be over 32 bytes long)
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1)
app.config['JWT_REFRESH_TOKEN_EXPIRES'] = timedelta(days=30)
flask_bcrypt = Bcrypt(app)
jwt = JWTManager(app)
cur = sqlite3.connect("sms.db")
file_handler = logging.FileHandler('log/out.log')
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'))
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
msg_403_not_primary = "Only a PRIMARY device can perform this action"
msg_403_not_secondary = "Only a SECONDARY device can perform this action"
@app.route("/api/v1/login", methods=["POST"])
def login():
access_key = request.json.get("access_key", None)
if access_key is None:
return make_response(jsonify({"msg": "Missing access key"}), 400)
secret_key = request.json.get("secret_key", None)
if secret_key is None:
return make_response(jsonify({"msg": "Missing secret key"}), 400)
res_tuple = cur.execute("SELECT secret_key_hash, type FROM devices WHERE access_key = ?", (access_key,)).fetchone()
if res_tuple is None:
return make_response(jsonify({"msg": "Invalid access key or secret key"}), 401)
(secret_key_hash, type_claim) = res_tuple
if flask_bcrypt.check_password_hash(secret_key_hash, secret_key):
# "typ" claim means device type
extra_claims = {"typ": type_claim}
# generate and return JWT
return make_response(
jsonify({"access_token": create_access_token(identity=access_key, additional_claims=extra_claims),
"refresh_token": create_refresh_token(identity=access_key, additional_claims=extra_claims)}),
200)
else:
return make_response(jsonify({"msg": "Invalid access key or secret key"}), 401)
@app.route("/api/v1/verify-token", methods=["GET"])
@jwt_required()
def verify_token():
return make_response(jsonify(logged_in_as=get_jwt_identity()), 200)
@app.route("/api/v1/refresh-token", methods=["POST"])
@jwt_required(refresh=True)
def refresh_token():
return make_response(jsonify({"access_token": create_access_token(identity=get_jwt_identity(), additional_claims={"typ": get_jwt()["typ"]})}), 200)
@app.route("/api/v1/devices", methods=["GET"])
@jwt_required()
def get_all_devices():
if get_jwt()["typ"] != "PRIMARY":
return make_response(jsonify(msg=msg_403_not_primary), 403)
return make_response(jsonify([d.to_dict() for d in db.get_all_devices(cur)]), 200)
@app.route("/api/v1/sim-cards", methods=["GET"])
@jwt_required()
def get_sim_cards_by_device():
if not is_primary(get_jwt()):
return make_response(jsonify(msg=msg_403_not_primary), 403)
access_key = request.args.get('access_key', None)
return make_response(jsonify([s.to_dict() for s in db.get_sim_cards_by_device(cur, access_key)]), 200)
@app.route("/api/v1/sms-messages", methods=["GET"])
@jwt_required()
def get_sms_messages_by_local_phone_number():
if not is_primary(get_jwt()):
return make_response(jsonify(msg=msg_403_not_primary), 403)
local_phone_number = request.args.get("local_phone_number", None)
return make_response(jsonify([n.to_dict() for n in db.get_sms_messages_by_local_phone_number(cur, local_phone_number)]), 200)
@app.route("/api/v1/send-message", methods=["POST"])
@jwt_required()
def send_sms_message():
if not is_primary(get_jwt()):
return make_response(jsonify(msg=msg_403_not_primary), 403)
content = request.json.get("content", None)
access_key = get_jwt_identity()
local_phone_number = request.json.get("local_phone_number", None)
remote_phone_number = request.json.get("remote_phone_number", None)
app.logger.debug(f"sending msg: content='{content}', access_key='{access_key}', local_num='{local_phone_number}', remote_num='{remote_phone_number}'")
if db.send_sms_message(cur, content, access_key, local_phone_number, remote_phone_number):
return make_response(jsonify(msg="Message successfully sent"), 200)
else:
return make_response(jsonify(msg="Failed to send message"), 400)
@app.route("/api/v1/get-queued-sms-messages", methods=["GET"])
@jwt_required()
def get_queued_sms_messages():
if not is_secondary(get_jwt()):
return make_response(jsonify(msg=msg_403_not_secondary), 403)
access_key = get_jwt_identity()
local_phone_number = request.json.get("local_phone_number", None)
return make_response(jsonify([m.to_dict() for m in db.get_queued_sms_messages(cur, local_phone_number)]), 200)
def is_primary(jwt):
return jwt["typ"] == "PRIMARY"
def is_secondary(jwt):
return jwt["typ"] == "SECONDARY"
+58
View File
@@ -0,0 +1,58 @@
from dto import Device, SimCard, SmsMessage, QueuedSmsMessage
def get_all_devices(cur) -> list[Device]:
res = cur.execute("SELECT access_key, type, name FROM devices")
devices_from_db = res.fetchall()
devices = []
for item in devices_from_db:
devices.append(Device.convert(item))
return devices
def get_sim_cards_by_device(cur, access_key: str) -> list[SimCard]:
if access_key is None or not access_key:
return []
sim_cards_from_db = cur.execute("SELECT phone_number, device_access_key FROM sim_cards WHERE device_access_key = ?", (access_key,)).fetchall()
sim_cards = []
for item in sim_cards_from_db:
sim_cards.append(SimCard.convert(item))
return sim_cards
def get_sms_messages_by_local_phone_number(cur, local_phone_number: str) -> list[SmsMessage]:
if local_phone_number is None or not local_phone_number:
return []
msgs_from_db = cur.execute("SELECT * FROM messages WHERE local_phone_number = ?", (local_phone_number,)).fetchall()
msgs = []
for item in msgs_from_db:
msgs.append(SmsMessage.convert(item))
return msgs
def send_sms_message(cur, content: str, sender_access_key: str, local_phone_number: str, remote_phone_number: str) -> bool:
if content is None or not content \
or sender_access_key is None or not sender_access_key \
or local_phone_number is None or not local_phone_number \
or remote_phone_number is None or not remote_phone_number:
return False
cur.execute("INSERT INTO message_queue VALUES (?, ?, ?, ?)", \
(content, sender_access_key, local_phone_number, remote_phone_number))
cur.commit()
return True
def get_queued_sms_messages(cur, local_phone_number) -> list[QueuedSmsMessage]:
if local_phone_number is None or not local_phone_number:
return []
msgs_from_db = cur.execute("DELETE FROM message_queue WHERE local_phone_number = ? RETURNING *", (local_phone_number,)).fetchall()
cur.commit()
msgs = []
for item in msgs_from_db:
msgs.append(QueuedSmsMessage.convert(item))
return msgs
+80
View File
@@ -0,0 +1,80 @@
from dataclasses import dataclass
@dataclass
class Device:
access_key: str
device_type: str
name: str
def to_dict(self):
return {
'access_key': self.access_key,
'type': self.device_type,
'name': self.name
}
def convert(device_from_db) -> Device:
return Device(*device_from_db)
@dataclass
class SimCard:
phone_number: str
device_access_key: str
def to_dict(self):
return {
'phone_number': self.phone_number,
'device_access_key': self.device_access_key
}
def convert(sim_from_db) -> SimCard:
return SimCard(*sim_from_db)
@dataclass
class SmsMessage:
content: str
ts_received: int
ts_sent: int
msg_type: str
local_phone_number: str
remote_phone_number: str
def to_dict(self):
return {
'content': self.content,
'ts_received': self.ts_received,
'ts_sent': self.ts_sent,
'msg_type': self.msg_type,
'local_phone_number': self.local_phone_number,
'remote_phone_number': self.remote_phone_number
}
def convert(sms_from_db) -> SmsMessage:
return SmsMessage(*sms_from_db)
@dataclass
class QueuedSmsMessage:
content: str
sender_access_key: str
local_phone_number: str
remote_phone_number: str
def to_dict(self):
return {
'content': self.content,
'sender_access_key': self.sender_access_key,
'local_phone_number': self.local_phone_number,
'remote_phone_number': self.remote_phone_number
}
def convert(sms_from_db) -> QueuedSmsMessage:
return QueuedSmsMessage(*sms_from_db)
+48
View File
@@ -0,0 +1,48 @@
from flask_bcrypt import Bcrypt
import os
import sqlite3
if os.path.isfile("sms.db"):
os.unlink("sms.db")
flask_bcrypt = Bcrypt()
con = sqlite3.connect("sms.db")
cur = con.cursor()
# ID columns are not necessary - SQLite by default sets ROWID as INTEGER PRIMARY KEY which auto increments
cur.execute("CREATE TABLE devices(access_key, secret_key_hash, type, name)")
# type : INCOMING, OUTGOING
# local_phone_number : a SECONDARY device SIM's phone number
# remote_phone_number : a phone number or shortcode of the other party
# TODO add column for sender's access key
cur.execute("CREATE TABLE messages(content, ts_received, ts_sent, type, local_phone_number, remote_phone_number)")
cur.execute("CREATE TABLE sim_events(sim_id, ts, note, cost, currency)")
cur.execute("CREATE TABLE sim_cards(phone_number, device_access_key)")
cur.execute("CREATE TABLE message_queue(content, sender_access_key, local_phone_number, remote_phone_number)")
pw1_hash = flask_bcrypt.generate_password_hash('pw1').decode('utf-8')
pw2_hash = flask_bcrypt.generate_password_hash('pw2').decode('utf-8')
cur.execute("""
INSERT INTO devices VALUES
('test_access_key', ?, 'PRIMARY', 'test-primary'),
('test_access_key2', ?, 'SECONDARY', 'test-secondary')
""", (pw1_hash, pw2_hash))
cur.execute("""
INSERT INTO sim_cards VALUES
('+420123456789', 'test_access_key'),
('+421000111222', 'test_access_key'),
('+422999888777', 'test_access_key2')
""")
cur.execute("""
INSERT INTO messages VALUES
('how are you?', 1000, 999, 'INCOMING', '+420123456789', '+10005558888'),
('i am fine', 2000, 1999, 'OUTGOING', '+420123456789', '+10005558888')
""")
con.commit()
con.close()
Executable
+2
View File
@@ -0,0 +1,2 @@
uv run gunicorn app:app \
--reload