"""Trip, GPS and history endpoints registered under the ``Action`` namespace.

Every endpoint authenticates the caller through a JWT carried in the
``Authorization`` header, validates the JSON payload, and then delegates
persistence or lookup to the project's ``DB`` wrapper.
"""

import os

import jwt
import pandas as pd
from flask import Flask, request
from flask_restx import Resource, Namespace, fields

from database.database import DB

paths = os.getcwd()

# NOTE(review): the signing key is hard-coded. It is kept as-is because the
# token issuer (not visible here) must use the same value; move it to
# configuration or an environment variable together with the issuer.
_JWT_SECRET = "secret"
_JWT_ALGORITHMS = ['HS256']

# Trip identifiers are documented in the API model as 64 characters long.
_TRIP_ID_LENGTH = 64

# Keys of ``trip_log`` that must be present and of equal length.
_GPS_SERIES = ("timestamp", "latitude", "longitude")

# Keys that ``/trip_and_score_update`` reads from the request body.
_TRIP_FIELDS = (
    "trip_id",
    "trip_distance_m",
    "trip_time_s",
    "abrupt_start_count",
    "abrupt_stop_count",
    "abrupt_acceleration_count",
    "abrupt_deceleration_count",
    "helmet_on",
    "final_score",
)

Action = Namespace(
    name="Action",
    description="노드 분석을 위해 사용하는 api.",
)

trip_log_model = Action.model('TripLog', {
    'trip_id': fields.String(required=True, description='The ID of the trip (64 characters)'),
    'trip_distance_m': fields.Float(required=True, description='Total distance traveled in meters'),
    'trip_time_s': fields.Float(required=True, description='Total time of the trip in seconds'),
    'abrupt_start_count': fields.Integer(required=True, description='Count of abrupt starts'),
    'abrupt_stop_count': fields.Integer(required=True, description='Count of abrupt stops'),
    'abrupt_acceleration_count': fields.Integer(required=True, description='Count of abrupt accelerations'),
    'abrupt_deceleration_count': fields.Integer(required=True, description='Count of abrupt decelerations'),
    'helmet_on': fields.Integer(required=True, description='Whether the helmet was worn during the trip, must be 0 or 1 with 1 is the helmet on.'),
    'final_score': fields.Float(required=True, description='The final safety score for the trip')
})

history_request_model = Action.model(
    'history_request',
    {
        'user_id' : fields.String(required=True, description = 'The user ID that you want to query history')
    }
)


def _validation_error(message):
    """Build the ``(body, status)`` pair used for payload validation failures.

    The shape and the 500 status follow the convention this namespace already
    documents for its endpoints.
    """
    return {'result': message}, 500


def _authenticate():
    """Verify the JWT in the ``Authorization`` header.

    Returns:
        ``(user_id, None)`` when the token is valid, otherwise
        ``(None, (body, 401))`` where the second item is ready to be returned
        from a resource method.
    """
    token = request.headers.get('Authorization')
    if not token:
        return None, ({'result': 'fail', 'msg': '토큰이 없습니다.'}, 401)

    try:
        decoded_token = jwt.decode(token, _JWT_SECRET, algorithms=_JWT_ALGORITHMS)
    except jwt.ExpiredSignatureError:
        return None, ({'result': 'fail', 'msg': '토큰이 만료되었습니다.'}, 401)
    except jwt.InvalidTokenError:
        return None, ({'result': 'fail', 'msg': '유효하지 않은 토큰입니다.'}, 401)

    # A correctly signed token that carries no identity is still unusable.
    user_id = decoded_token.get('id')
    if user_id is None:
        return None, ({'result': 'fail', 'msg': '유효하지 않은 토큰입니다.'}, 401)
    return user_id, None


def _read_json_object():
    """Return ``(payload, None)`` or ``(None, error_response)`` for the request body."""
    data = request.get_json()
    if not isinstance(data, dict):
        return None, _validation_error("ERROR! Request body must be a JSON object!")
    return data, None


def _is_valid_trip_id(trip_id):
    """Tell whether ``trip_id`` is a string of the documented length."""
    return isinstance(trip_id, str) and len(trip_id) == _TRIP_ID_LENGTH


def _missing_fields(payload, required):
    """List the names in ``required`` that are absent from ``payload``."""
    return [name for name in required if name not in payload]


@Action.route('/gps_update')
class GPS_update(Resource):
    """Store the GPS samples recorded during a trip."""

    @Action.doc(responses={401: 'Unauthorized'})
    @Action.doc(responses={500: 'Internal Error'})
    def post(self):
        """Validate the posted ``trip_log`` and insert it into the GPS table.

        The body must contain ``trip_id`` (64 characters), ``user_id`` and a
        ``trip_log`` object whose ``timestamp``, ``latitude`` and
        ``longitude`` lists are non-empty and equally long.

        Returns:
            ``{'result': 'success'}`` on success, a 401 response when the
            token is missing/expired/invalid, or a 500 response describing
            the validation failure.
        """
        _token_user_id, auth_error = _authenticate()
        if auth_error is not None:
            return auth_error

        data, body_error = _read_json_object()
        if body_error is not None:
            return body_error

        if not _is_valid_trip_id(data.get("trip_id")):
            return _validation_error("ERROR! INVALID TRIP_ID!")

        trip_log = data.get("trip_log")
        if not isinstance(trip_log, dict) or not trip_log.get("timestamp"):
            return _validation_error("ERROR! 'trip_log' is empty!")

        absent = _missing_fields(trip_log, _GPS_SERIES)
        if absent:
            return _validation_error(f"ERROR! Missing field(s) in trip_log: {', '.join(absent)}")
        if not all(isinstance(trip_log[name], list) for name in _GPS_SERIES):
            return _validation_error("ERROR! 'trip_log' series must be lists!")

        time_stamp_len = len(trip_log["timestamp"])
        latitude_len = len(trip_log["latitude"])
        longitude_len = len(trip_log["longitude"])
        if time_stamp_len != latitude_len or latitude_len != longitude_len:
            return _validation_error(
                f"ERROR! Mismatching length of data in trip_log! \n timestamp : {time_stamp_len} \n latitude : {latitude_len} \n longitude : {longitude_len}"
            )

        if "user_id" not in data:
            return _validation_error("ERROR! Missing field(s): user_id")

        try:
            df = pd.DataFrame(trip_log)
        except ValueError:
            # Raised when an extra series in trip_log has a different length.
            return _validation_error("ERROR! Malformed 'trip_log'!")

        # NOTE(review): rows are attributed to the user_id sent in the body,
        # not to the identity in the verified token, so an authenticated
        # caller can write under another user's id. Confirm whether the
        # token identity should be used here, as the other endpoints do.
        df["user_id"] = data["user_id"]
        df["trip_id"] = data["trip_id"]
        columns = df.columns
        data_csv_block = df.to_csv(header=False, index=False)
        print(f"recieved : {data}")

        # Insert into the GPS database; the handle is created only once the
        # payload is known to be valid.
        db = DB()
        db.insert_gps_data(data_csv_block, columns)
        return {'result': 'success'}


@Action.route('/trip_and_score_update')
class TRIP_insert(Resource):
    """Store the summary and safety score of a finished trip."""

    @Action.expect(trip_log_model)
    @Action.doc(responses={200: 'Success'})
    @Action.doc(responses={401: 'Unauthorized'})
    @Action.doc(responses={500: 'Internal Error'})
    def post(self):
        """Validate the posted trip summary and insert it for the token's user.

        Returns:
            ``{'result': 'success'}`` on success, a 401 response when the
            token is missing/expired/invalid, or a 500 response describing
            the validation failure.
        """
        user_id, auth_error = _authenticate()
        if auth_error is not None:
            return auth_error

        data, body_error = _read_json_object()
        if body_error is not None:
            return body_error

        if not _is_valid_trip_id(data.get("trip_id")):
            return _validation_error("ERROR! INVALID TRIP_ID!")

        absent = _missing_fields(data, _TRIP_FIELDS)
        if absent:
            return _validation_error(f"ERROR! Missing field(s): {', '.join(absent)}")

        raw_helmet_on = data["helmet_on"]
        try:
            helmet_on = int(raw_helmet_on)
        except (TypeError, ValueError):
            # Non-numeric input is reported like any other invalid flag.
            return _validation_error(f"ERROR! INVALID 'helmet_on'! \n helmet_on : {raw_helmet_on}")
        if helmet_on not in (0, 1):
            return _validation_error(f"ERROR! INVALID 'helmet_on'! \n helmet_on : {helmet_on}")

        db = DB()
        db.insert_trip_data(
            user_id,
            data["trip_id"],
            data["trip_distance_m"],
            data["trip_time_s"],
            data["abrupt_start_count"],
            data["abrupt_stop_count"],
            data["abrupt_acceleration_count"],
            data["abrupt_deceleration_count"],
            helmet_on,
            data["final_score"],
        )
        return {'result': 'success'}


@Action.route('/get_history')
class Get_history(Resource):
    """Return the stored history of the authenticated user."""

    @Action.expect(history_request_model)
    @Action.doc(responses={401: 'Unauthorized'})
    @Action.doc(responses={500: 'Internal Error'})
    def post(self):
        """Look up the history of the user identified by the token.

        The request body is not read; the user is taken from the token.

        Returns:
            ``{'result': 'success', 'data': ...}`` with the status code
            reported by ``DB.get_history``, a 401 response when the token is
            missing/expired/invalid, or a 500 response carrying the error
            text when the lookup raises.
        """
        user_id, auth_error = _authenticate()
        if auth_error is not None:
            return auth_error

        # Top-level boundary: any failure in the DB layer becomes a 500.
        try:
            db = DB()
            result, status_code = db.get_history(user_name=user_id)
            return {'result': 'success', 'data': result}, status_code
        except Exception as e:
            return {'result': 'fail', 'msg': str(e)}, 500