import psycopg2  # PostgreSQL driver
import json
import bcrypt
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.backends import default_backend
import re
import os
from io import StringIO
from datetime import datetime, timedelta

config_file_path = "database/db_config.json"

# Default location of the raw AES key used for the encrypted user columns.
_DEFAULT_KEY_PATH = "database/keys/encryption_key2025-04-03_14:33:09"


class DB():
    """Data-access helper built around a single psycopg2 connection.

    The connection runs with ``autocommit=True`` and a shared cursor
    (``self.cur``) whose ``search_path`` points at the configured schema.
    User e-mail and phone values are stored AES-CBC encrypted (one random IV
    per value); passwords are stored as bcrypt hashes.

    Most request-style methods return a ``(payload, http_status)`` tuple.
    """

    # Columns read from trip_log by the history queries, in the exact order
    # expected by _trip_row_to_dict.
    _TRIP_COLUMNS = (
        "trip_id",
        "timestamp",
        "total_distance_m",
        "total_time_s",
        "abrupt_start_count",
        "abrupt_stop_count",
        "abrupt_acceleration_count",
        "abrupt_deceleration_count",
        "helmet_on",
        "turn_noslow_cnt",
        "final_score",
    )

    def __init__(self, config_path=None, key_path=None):
        """Load configuration and the AES key, then open the connection.

        Args:
            config_path: Path of the JSON connection config. Defaults to the
                module-level ``config_file_path``.
            key_path: Path of the file holding the raw AES key bytes.
                Defaults to the key file shipped under ``database/keys``.

        Raises:
            OSError: If the config or key file cannot be read.
            psycopg2.Error: If the connection cannot be established.
        """
        if config_path is None:
            config_path = config_file_path
        if key_path is None:
            key_path = _DEFAULT_KEY_PATH

        # Load the database configuration from the JSON file
        self.db_config = self.load_db_config(config_path)

        # Read the key BEFORE connecting so a missing key file cannot leave
        # an open, unreachable connection behind.
        with open(key_path, "rb") as f:
            self.encryption_key = f.read()

        # Initialize database connection
        self.conn = psycopg2.connect(
            host=self.db_config['host'],
            dbname=self.db_config['dbname'],
            user=self.db_config['user'],
            password=self.db_config['password'],
            port=self.db_config['port'],
            # options=self.db_config['options']
        )
        self.schema = self.db_config["schema"]
        self.conn.autocommit = True
        self.cur = self.conn.cursor()
        # The double quotes are required: unquoted identifiers are folded to
        # lower case, so a schema with capital letters would not be found.
        self.cur.execute("SET search_path TO " + self._quoted_schema())

    def _quoted_schema(self):
        """Return the schema name as a double-quoted SQL identifier."""
        return '"' + str(self.schema).replace('"', '""') + '"'

    @staticmethod
    def _get_str(data, key):
        """Return ``data[key]`` stripped, or '' when missing or not a string.

        Treating ``None``/non-string values as missing lets callers answer
        with their normal validation error instead of raising
        AttributeError on ``.strip()``.
        """
        value = data.get(key, '')
        return value.strip() if isinstance(value, str) else ''

    def load_db_config(self, config_file_path):
        """
        Loads database configuration from a JSON file.

        Args:
            config_file_path: Path of the JSON file to read.

        Returns:
            The parsed JSON document.
        """
        with open(config_file_path, 'r') as config_file:
            return json.load(config_file)

    def encrypt_aes(self, plain_text):
        """Encrypt a string with AES-CBC under ``self.encryption_key``.

        Args:
            plain_text: Text to encrypt; it is UTF-8 encoded and PKCS7 padded.

        Returns:
            ``(encrypted_data, iv)`` where ``iv`` is a fresh random 16-byte
            value that must be stored alongside the ciphertext.
        """
        iv = os.urandom(16)  # AES block size is 16 bytes
        cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv), backend=default_backend())
        encryptor = cipher.encryptor()

        # Pad the plaintext to be a multiple of 16 bytes
        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(plain_text.encode('utf-8')) + padder.finalize()

        encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
        return encrypted_data, iv

    def decrypt_aes(self, encrypted_data, iv):
        """Decrypt a value produced by :meth:`encrypt_aes`.

        Args:
            encrypted_data: Ciphertext (any bytes-like object).
            iv: The IV that was returned when the value was encrypted.

        Returns:
            The decrypted text.
        """
        # bytea columns come back from psycopg2 as memoryview objects;
        # normalise to bytes before handing them to the cipher.
        encrypted_data = bytes(encrypted_data)
        iv = bytes(iv)

        cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize()

        # Remove padding after decryption
        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
        unpadded_data = unpadder.update(decrypted_data) + unpadder.finalize()
        return unpadded_data.decode('utf-8')

    def cleanse_and_validate_input(self, data):
        """
        Cleanses registration input and validates it.

        Leading/trailing spaces are removed from every field and dashes are
        removed from the phone number. Missing, ``None`` or non-string
        values are treated as empty.

        Args:
            data: Mapping that may contain 'username', 'password', 'email',
                'phone' and 'sex'.

        Returns:
            ``(cleansed, None)`` on success, where ``cleansed`` is a dict
            with the five keys above, or ``(None, message)`` describing the
            first failed check.
        """
        username = self._get_str(data, 'username')
        password = self._get_str(data, 'password')
        email = self._get_str(data, 'email')
        phone = self._get_str(data, 'phone').replace("-", "")

        # Validate username
        if not username:
            return None, "Username is required."
        if len(username) > 26:
            return None, "Username must not exceed 26 characters."

        # Validate password
        if not password:
            return None, "Password is required."
        if len(password) < 8:
            return None, "Password must be at least 8 characters long."

        # Validate email format
        if not email or not re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email):
            return None, "Invalid email address."

        # Validate phone number format
        if not re.fullmatch(r'010\d{8}', phone):
            return None, "Phone number must be in the format 010XXXXXXXX where X are digits."

        # Validation of 'sex' is currently disabled; whatever the caller
        # sends is replaced by a fixed placeholder.
        sex = "WHATEVER"

        return {
            'username': username,
            'password': password,
            'email': email,
            'phone': phone,
            'sex': sex
        }, None

    def register_user(self, data):
        """Validate ``data`` and insert a new row into ``users``.

        The password is bcrypt-hashed; e-mail and phone are AES-encrypted
        and stored with their IVs. The 'sex' field is not stored.

        Args:
            data: Raw registration mapping (see
                :meth:`cleanse_and_validate_input`).

        Returns:
            ``(payload, status)``: 200 on success, 400 on a validation
            failure or a database error.
        """
        data, error = self.cleanse_and_validate_input(data)
        if error:
            return {'status': 'error', 'message': error}, 400

        username = data['username']
        password = data['password']
        email = data['email']
        phone = data['phone']

        # Hash the password with bcrypt, which automatically handles the salt
        hashed_pw = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())

        # Encrypt email and phone with AES (each value gets its own IV)
        encrypted_email, email_iv = self.encrypt_aes(email)
        encrypted_phone, phone_iv = self.encrypt_aes(phone)

        # Insert the user into the database
        try:
            self.cur.execute("""
                INSERT INTO users (username, user_pw, user_email, email_iv, user_phone, phone_iv, user_time_stamp)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                """, (
                    username,
                    psycopg2.Binary(hashed_pw),
                    psycopg2.Binary(encrypted_email),
                    psycopg2.Binary(email_iv),
                    psycopg2.Binary(encrypted_phone),
                    psycopg2.Binary(phone_iv),
                    datetime.now()  # naive local time of this process
                )
            )
            self.conn.commit()
            return {'status': 'success', 'message': f'user {username} registered successfully'}, 200
        except psycopg2.Error as e:
            self.conn.rollback()
            return {'status': 'error', 'message': str(e)}, 400

    def login_user(self, data):
        """Check a username/password pair against the stored bcrypt hash.

        Args:
            data: Mapping with 'username' and 'password'.

        Returns:
            ``(payload, status)``: 200 on a match, 400 when either field is
            missing, 401 when the user is unknown or the password is wrong.
        """
        username = self._get_str(data, 'username')
        password = self._get_str(data, 'password')

        # Validate input
        if not username or not password:
            return {'status': 'error', 'message': 'Username and password are required.'}, 400

        # Retrieve the user's hashed password from the database
        self.cur.execute("SELECT user_pw FROM users WHERE username = %s", (username,))
        user = self.cur.fetchone()

        if user is None:
            return {'status': 'error', 'message': 'Invalid username or password'}, 401

        hashed_pw = bytes(user[0])  # Convert the retrieved hashed password to bytes

        # Check if the provided password matches the stored hashed password
        if bcrypt.checkpw(password.encode('utf-8'), hashed_pw):
            return {'status': 'success', 'message': 'Logged in successfully'}, 200
        return {'status': 'error', 'message': 'Invalid username or password'}, 401

    def get_phone_number(self, data):
        """Return the decrypted phone number of a user.

        Args:
            data: Mapping with 'username'.

        Returns:
            ``(payload, status)``: 200 with 'phone_number', 400 when the
            username is missing, 404 when no such user exists.
        """
        username = self._get_str(data, 'username')

        if not username:
            return {'status': 'error', 'message': 'Username is required.'}, 400

        # Retrieve the encrypted phone number and IV from the database
        self.cur.execute("SELECT user_phone, phone_iv FROM users WHERE username = %s", (username,))
        user = self.cur.fetchone()

        if user is None:
            return {'status': 'error', 'message': 'User not found'}, 404

        encrypted_phone, phone_iv = user

        # Decrypt the phone number
        decrypted_phone = self.decrypt_aes(encrypted_phone, phone_iv)

        return {'status': 'success', 'phone_number': decrypted_phone}, 200

    def get_email(self, data):
        """Return the decrypted e-mail address of a user.

        Args:
            data: Mapping with 'username'.

        Returns:
            ``(payload, status)``: 200 with the address under 'email', 400
            when the username is missing, 404 when no such user exists.
            The address is also returned under the legacy key
            'phone_number' so that existing callers keep working.
        """
        username = self._get_str(data, 'username')

        if not username:
            return {'status': 'error', 'message': 'Username is required.'}, 400

        # Retrieve the encrypted e-mail address and IV from the database
        self.cur.execute("SELECT user_email, email_iv FROM users WHERE username = %s", (username,))
        user = self.cur.fetchone()

        if user is None:
            return {'status': 'error', 'message': 'User not found'}, 404

        encrypted_email, email_iv = user

        # Decrypt the e-mail address
        decrypted_email = self.decrypt_aes(encrypted_email, email_iv)

        return {
            'status': 'success',
            'email': decrypted_email,
            # Legacy key kept for backward compatibility.
            'phone_number': decrypted_email,
        }, 200

    def _copy_csv(self, table, csv_block, columns):
        """COPY ``csv_block`` into ``table``; always closes the cursor.

        Exceptions from the COPY propagate to the caller.
        """
        cur = self.conn.cursor()
        try:
            # COPY instead of INSERT keeps the per-row overhead low.
            cur.copy_from(StringIO(csv_block), table, sep=',', columns=columns)
            self.conn.commit()
        finally:
            cur.close()

    def _insert_sensor_csv(self, table, label, csv_block, columns):
        """COPY sensor rows into ``table``; print and return False on error."""
        try:
            self._copy_csv(table, csv_block, columns)
        except Exception as e:
            self.conn.rollback()
            print(f"DB Error inserting {label}: {e}")
            return False
        return True

    def insert_gps_data(self, csv_block, columns):
        """Bulk-load comma-separated rows into ``gps_data``.

        Args:
            csv_block: Text with one comma-separated row per line.
            columns: Column names matching the fields of each row.

        Returns:
            True on success. Unlike the other sensor inserts, failures are
            raised to the caller rather than reported as False.
        """
        self._copy_csv('gps_data', csv_block, columns)
        return True

    def insert_accel_data(self, csv_block, columns):
        """Bulk-load rows into ``accel_data``; returns False on failure."""
        return self._insert_sensor_csv('accel_data', 'Accel', csv_block, columns)

    def insert_gyro_data(self, csv_block, columns):
        """Bulk-load rows into ``gyro_data``; returns False on failure."""
        return self._insert_sensor_csv('gyro_data', 'Gyro', csv_block, columns)

    def insert_motion_data(self, csv_block, columns):
        """Bulk-load rows into ``motion_data``; returns False on failure."""
        return self._insert_sensor_csv('motion_data', 'Motion', csv_block, columns)

    def insert_azimuth_data(self, csv_block, columns):
        """Bulk-load rows into ``azimuth_data``; returns False on failure."""
        return self._insert_sensor_csv('azimuth_data', 'Azimuth', csv_block, columns)

    def insert_trip_data(
        self,
        username,
        trip_id,
        total_distance_m,
        total_time_s,
        abrupt_start_count,
        abrupt_stop_count,
        abrupt_acceleration_count,
        abrupt_deceleration_count,
        helmet_on,
        final_score
    ):
        """Insert one summary row into ``trip_log``.

        ``username`` is written to the ``user_id`` column and the current
        local time to ``timestamp``. Database errors propagate to the
        caller.
        """
        self.cur.execute("""
            INSERT INTO trip_log (user_id, trip_id, timestamp, total_distance_m, total_time_s,
                                  abrupt_start_count, abrupt_stop_count,
                                  abrupt_acceleration_count, abrupt_deceleration_count,
                                  helmet_on, final_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                username,
                trip_id,
                datetime.now(),
                total_distance_m,
                total_time_s,
                abrupt_start_count,
                abrupt_stop_count,
                abrupt_acceleration_count,
                abrupt_deceleration_count,
                helmet_on,
                final_score
            )
        )

    def db_delete_id(self, user_id):
        """Delete the rows of the schema's ``user_id`` table for ``user_id``.

        The value is sent as a bound parameter (never interpolated into the
        SQL text). It is passed as a string so the server sees the same
        quoted literal as before.
        """
        cur = self.conn.cursor()
        try:
            cur.execute(
                "delete from " + self._quoted_schema() + ".user_id ui where user_id = %s",
                (str(user_id),),
            )
        finally:
            cur.close()

    @classmethod
    def _trip_select(cls):
        """Return the ``SELECT ... FROM trip_log`` prefix shared by history queries."""
        return "SELECT " + ", ".join(cls._TRIP_COLUMNS) + " FROM trip_log"

    @staticmethod
    def _trip_row_to_dict(row):
        """Map one row selected with ``_TRIP_COLUMNS`` to a JSON-ready dict."""
        return {
            "trip_id": row[0],
            "timestamp": row[1].strftime("%Y-%m-%d %H:%M:%S"),  # Format timestamp
            "total_distance_m": row[2],
            "total_time_s": row[3],
            "abrupt_start_count": row[4],
            "abrupt_stop_count": row[5],
            "abrupt_acceleration_count": row[6],
            "abrupt_deceleration_count": row[7],
            "helmet_on": bool(row[8]),  # normalise the stored flag to a JSON boolean
            "turn_noslow_cnt": row[9],
            "final_score": row[10]
        }

    def get_history(self, user_name):
        """
        Retrieves all trip logs for the specified user within the last month
        and returns them in JSON format.

        Returns:
            ``(json_text, 200)`` on success, where ``json_text`` encodes a
            list such as::

                [
                    {
                        "trip_id": "trip_001",
                        "timestamp": "2024-09-01 12:45:00",
                        "total_distance_m": 1000.5,
                        "total_time_s": 600,
                        "abrupt_start_count": 3,
                        "abrupt_stop_count": 2,
                        "abrupt_acceleration_count": 1,
                        "abrupt_deceleration_count": 1,
                        "helmet_on": true,
                        "turn_noslow_cnt": 3,
                        "final_score": 85.5
                    }
                ]

            On a database error, ``(error_dict, 500)``.
        """
        try:
            # Execute the query to retrieve logs within the last month
            self.cur.execute(
                self._trip_select()
                + " WHERE user_id = %s AND timestamp >= NOW() - INTERVAL '1 month'",
                (user_name,),
            )
            result = [self._trip_row_to_dict(row) for row in self.cur.fetchall()]

            # Convert the result list to JSON format
            return json.dumps(result), 200

        except psycopg2.Error as e:
            self.conn.rollback()
            return {'status': 'error', 'message': str(e)}, 500

    def get_history_main(self, user_name):
        """Return the user's latest trip plus aggregate statistics.

        Returns:
            ``(json_text, 200)`` where ``json_text`` encodes either
            ``{"message": "No records found", "data": []}`` or an object with
            'latest_record', 'total_count', 'count_above_95' and
            'average_final_score'. On a database error,
            ``(error_dict, 500)``.
        """
        try:
            # Fetch the single most recent record. All eleven trip columns
            # are selected so the shared row mapper can index them safely.
            self.cur.execute(
                self._trip_select()
                + " WHERE user_id = %s ORDER BY timestamp DESC LIMIT 1",
                (user_name,),
            )
            row = self.cur.fetchone()

            if not row:
                return json.dumps({"message": "No records found", "data": []}), 200

            latest_record = self._trip_row_to_dict(row)

            # Total number of records
            self.cur.execute("SELECT COUNT(*) FROM trip_log WHERE user_id = %s", (user_name,))
            total_count = self.cur.fetchone()[0]

            # Number of records whose final_score is 95 or higher
            self.cur.execute("SELECT COUNT(*) FROM trip_log WHERE user_id = %s AND final_score >= 95", (user_name,))
            count_above_95 = self.cur.fetchone()[0]

            # Average final_score
            self.cur.execute("SELECT AVG(final_score) FROM trip_log WHERE user_id = %s", (user_name,))
            avg_final_score = self.cur.fetchone()[0]
            if avg_final_score is not None:
                # AVG() may come back as Decimal, which json.dumps rejects.
                avg_final_score = round(float(avg_final_score), 2)

            # Build the response payload
            response_data = {
                "latest_record": latest_record,
                "total_count": total_count,
                "count_above_95": count_above_95,
                "average_final_score": avg_final_score
            }

            return json.dumps(response_data), 200

        except psycopg2.Error as e:
            self.conn.rollback()
            return {'status': 'error', 'message': str(e)}, 500

    def get_history_by_period(self, user_name, start_date, end_date):
        """Return trips between two dates with overall and per-day averages.

        Args:
            user_name: Value matched against ``trip_log.user_id``.
            start_date: Period start; a "%Y-%m-%d" string or a datetime.
            end_date: Period end; a "%Y-%m-%d" string or a datetime.

        Returns:
            ``(json_text, 200)`` where ``json_text`` encodes 'history',
            'overall_avg_score' and 'daily_avg_scores' (day-of-month ->
            average score, 0 for days without data). On a database error,
            ``(error_dict, 500)``.

        Raises:
            ValueError: If a string date is not in "%Y-%m-%d" format.
        """
        try:
            # Execute the query to retrieve logs within the specified period.
            # NOTE(review): with date-only bounds, BETWEEN presumably stops
            # at midnight of end_date, so trips later that day would be
            # excluded - confirm against callers.
            self.cur.execute(
                self._trip_select()
                + " WHERE user_id = %s AND timestamp BETWEEN %s AND %s",
                (user_name, start_date, end_date),
            )
            rows = self.cur.fetchall()

            # Format the results and group the scores per calendar date
            result = []
            final_scores = []
            daily_scores = {}

            for row in rows:
                result.append(self._trip_row_to_dict(row))
                final_scores.append(row[10])
                daily_scores.setdefault(row[1].strftime("%Y-%m-%d"), []).append(row[10])

            # Average final_score over the whole period
            overall_avg_score = round(sum(final_scores) / len(final_scores), 2) if final_scores else 0

            # Convert start_date / end_date to datetime (used as-is when
            # they are not strings)
            if isinstance(start_date, str):
                start_dt = datetime.strptime(start_date, "%Y-%m-%d")
            else:
                start_dt = start_date
            if isinstance(end_date, str):
                end_dt = datetime.strptime(end_date, "%Y-%m-%d")
            else:
                end_dt = end_date

            # Average score for every day from start to end (0 when a day
            # has no data).
            # NOTE(review): keys are the day of month (1, 2, 3, ...), so a
            # period longer than one month overwrites earlier days with the
            # same number. Kept as-is because callers consume this shape.
            daily_avg_scores = {}
            current_dt = start_dt
            while current_dt <= end_dt:
                scores = daily_scores.get(current_dt.strftime("%Y-%m-%d"), [])
                daily_avg_scores[current_dt.day] = round(sum(scores) / len(scores), 2) if scores else 0
                current_dt += timedelta(days=1)

            # Convert the result to JSON format
            response_data = {
                "history": result,
                "overall_avg_score": overall_avg_score,
                "daily_avg_scores": daily_avg_scores
            }

            return json.dumps(response_data), 200

        except psycopg2.Error as e:
            self.conn.rollback()
            return {'status': 'error', 'message': str(e)}, 500

    def close_connection(self):
        """Close the shared cursor and the underlying connection.

        Returns:
            True once both have been closed.
        """
        try:
            self.cur.close()
        finally:
            # Release the connection even if closing the cursor failed.
            self.conn.close()
        return True