geo-platform / server /users.py
3v324v23's picture
fix: use OUTPUT_DIR env var for writable path on Render/Docker
5c38f67
import os
import sqlite3
import json
import hmac
import hashlib
import binascii
import time
from pathlib import Path
OUTPUT = Path(os.environ.get('OUTPUT_DIR', str(Path(__file__).resolve().parent.parent / 'output')))
OUTPUT.mkdir(parents=True, exist_ok=True)
DB_PATH = OUTPUT / 'users.db'
def get_conn():
init_db()
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def init_db():
if DB_PATH.exists():
return
conn = sqlite3.connect(str(DB_PATH))
cur = conn.cursor()
cur.execute('''
CREATE TABLE companies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
domain TEXT,
created_at INTEGER
)
''')
cur.execute('''
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
role TEXT DEFAULT 'user',
company_id INTEGER,
created_at INTEGER
)
''')
cur.execute('''
CREATE TABLE teams (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
company_id INTEGER,
created_at INTEGER
)
''')
conn.commit()
conn.close()
def _hash_password(password: str, salt: bytes = None) -> str:
if salt is None:
salt = os.urandom(16)
dk = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
return binascii.hexlify(salt).decode() + '$' + binascii.hexlify(dk).decode()
def _verify_password(stored: str, password: str) -> bool:
try:
salt_hex, dk_hex = stored.split('$')
salt = binascii.unhexlify(salt_hex)
expected = binascii.unhexlify(dk_hex)
dk = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
return hmac.compare_digest(dk, expected)
except Exception:
return False
def create_company(name: str, domain: str = None):
conn = get_conn()
cur = conn.cursor()
ts = int(time.time())
cur.execute('INSERT INTO companies (name, domain, created_at) VALUES (?,?,?)', (name, domain, ts))
conn.commit()
cid = cur.lastrowid
conn.close()
return cid
def create_user(email: str, password: str, role: str = 'user', company_id: int = None):
conn = get_conn()
cur = conn.cursor()
ph = _hash_password(password)
ts = int(time.time())
try:
cur.execute('INSERT INTO users (email, password_hash, role, company_id, created_at) VALUES (?,?,?,?,?)', (email, ph, role, company_id, ts))
conn.commit()
uid = cur.lastrowid
except sqlite3.IntegrityError:
conn.close()
raise
conn.close()
return uid
def authenticate_user(email: str, password: str):
conn = get_conn()
cur = conn.cursor()
cur.execute('SELECT * FROM users WHERE email=?', (email,))
row = cur.fetchone()
conn.close()
if not row:
return None
if not _verify_password(row['password_hash'], password):
return None
return dict(row)
def list_users():
conn = get_conn()
cur = conn.cursor()
cur.execute('SELECT id,email,role,company_id,created_at FROM users ORDER BY id DESC')
rows = [dict(r) for r in cur.fetchall()]
conn.close()
return rows
# Simple token generation using HMAC-SHA256. Not a full JWT but sufficient for local dev.
_SECRET = os.environ.get('USERS_SECRET', 'dev-secret-please-change')
def make_token(user_id: int, expires_in: int = 3600):
payload = json.dumps({'uid': user_id, 'exp': int(time.time()) + expires_in}, separators=(',',':')).encode('utf-8')
sig = hmac.new(_SECRET.encode('utf-8'), payload, hashlib.sha256).hexdigest()
return binascii.hexlify(payload).decode() + '.' + sig
def verify_token(token: str):
try:
p_hex, sig = token.split('.')
payload = binascii.unhexlify(p_hex)
expected = hmac.new(_SECRET.encode('utf-8'), payload, hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected, sig):
return None
obj = json.loads(payload.decode('utf-8'))
if obj.get('exp',0) < int(time.time()):
return None
return obj.get('uid')
except Exception:
return None
def get_user(user_id: int):
conn = get_conn()
cur = conn.cursor()
cur.execute('SELECT id,email,role,company_id,created_at FROM users WHERE id=?', (user_id,))
row = cur.fetchone()
conn.close()
if not row:
return None
return dict(row)