
File name
Commit message
Commit date
04-24
04-24
04-24
04-24
from flask_restx import Resource, Namespace, fields
from flask import Flask, request
import os
from database.database import DB
import pandas as pd
import jwt
import datetime
paths = os.getcwd()
Action = Namespace(
name="Action",
description="노드 분석을 위해 사용하는 api.",
)
trip_log_model = Action.model('TripLog', {
'trip_id': fields.String(required=True, description='The ID of the trip (64 characters)'),
'trip_distance_m': fields.Float(required=True, description='Total distance traveled in meters'),
'trip_time_s': fields.Float(required=True, description='Total time of the trip in seconds'),
'abrupt_start_count': fields.Integer(required=True, description='Count of abrupt starts'),
'abrupt_stop_count': fields.Integer(required=True, description='Count of abrupt stops'),
'abrupt_acceleration_count': fields.Integer(required=True, description='Count of abrupt accelerations'),
'abrupt_deceleration_count': fields.Integer(required=True, description='Count of abrupt decelerations'),
'helmet_on': fields.Integer(required=True, description='Whether the helmet was worn during the trip, must be 0 or 1 with 1 is the helmet on.'),
'final_score': fields.Float(required=True, description='The final safety score for the trip')
})
history_request_model = Action.model(
'history_request', {
'user_id' : fields.String(required=True, description = 'The user ID that you want to query history')
}
)
@Action.route('/gps_update')
class GPS_update(Resource):
@Action.doc(responses={200: 'Success'})
@Action.doc(responses={401: 'Unauthorized'})
@Action.doc(responses={500: 'Internal Error'})
def post(self):
token = request.headers.get('Authorization')
if not token:
return {'result': 'fail', 'msg': '토큰이 없습니다.'}
try:
# Decode the token to verify it
decoded_token = jwt.decode(token, "secret", algorithms=['HS256'])
user_id = decoded_token['id']
except jwt.ExpiredSignatureError:
return {'result': 'fail', 'msg': '토큰이 만료되었습니다.'}, 401
except jwt.InvalidTokenError:
return {'result': 'fail', 'msg': '유효하지 않은 토큰입니다.'}, 401
db = DB()
data = request.get_json()
if len(data["trip_id"]) !=64:
return {500 :"ERROR! INVALID TRIP_ID!"}
if len(data["trip_log"]["timestamp"]) == 0:
return {500 :"ERROR! 'trip_log' is empty!"}
time_stamp_len = len(data["trip_log"]["timestamp"])
latitude_len = len(data["trip_log"]["latitude"])
longitude_len = len(data["trip_log"]["longitude"])
if time_stamp_len != latitude_len or latitude_len != longitude_len:
return {
500: f"ERROR! Mismatching length of data in trip_log! \n timestamp : {time_stamp_len} \n latitude : {latitude_len} \n longitude : {longitude_len}"
}
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
print(data)
df = pd.DataFrame(data["trip_log"])
df["user_id"] = data["user_id"]
df["trip_id"] = data["trip_id"]
columns = df.columns
data_csv_block = df.to_csv(header=False, index=False)
print(f"-------------------------------------")
print(f"recieved trip_id : {df['trip_id'][0]}")
print(f"recieved time : {datetime.datetime.now()}")
# GPS 데이터베이스에 삽입
db.insert_gps_data(data_csv_block, columns)
acc_df = pd.DataFrame({
'trip_id': data["trip_id"],
'user_id': data["user_id"],
'accel_x': data["accel"]["x"],
'accel_y': data["accel"]["y"],
'accel_z': data["accel"]["z"],
'timestamp': data["accel"]["timestamp"]
})
accel_columns_ordered = ['trip_id', 'user_id', 'timestamp', 'accel_x', 'accel_y', 'accel_z']
accel_csv_data = acc_df[accel_columns_ordered].to_csv(header=False, index=False)
db.insert_accel_data(accel_csv_data, accel_columns_ordered)
gyro_df = pd.DataFrame({
'trip_id': data["trip_id"],
'user_id': data["user_id"],
'gyro_x': data["gyro"]["x"],
'gyro_y': data["gyro"]["y"],
'gyro_z': data["gyro"]["z"],
'timestamp': data["gyro"]["timestamp"]
})
gyro_columns_ordered = ['trip_id', 'user_id', 'timestamp', 'gyro_x', 'gyro_y', 'gyro_z']
gyro_csv_data = gyro_df[gyro_columns_ordered].to_csv(header=False, index=False)
db.insert_gyro_data(gyro_csv_data, gyro_columns_ordered)
motion_df = pd.DataFrame({
'trip_id': data["trip_id"],
'user_id': data["user_id"],
'motion_pitch': data["motion"]["pitch"],
'motion_roll': data["motion"]["roll"],
'motion_yaw': data["motion"]["yaw"],
'timestamp': data["motion"]["timestamp"]
})
motion_columns_ordered = ['trip_id', 'user_id', 'timestamp', 'motion_pitch', 'motion_roll', 'motion_yaw']
motion_csv_data = motion_df[motion_columns_ordered].to_csv(header=False, index=False)
db.insert_motion_data(motion_csv_data, motion_columns_ordered)
azimuth_df = pd.DataFrame({
'trip_id': data["trip_id"],
'user_id': data["user_id"],
'timestamp': data["azimuth"]["timestamp"],
'azimuth_heading': data["azimuth"]["heading"]
})
azimuth_columns_ordered = ['trip_id', 'user_id', 'timestamp', 'azimuth_heading']
azimuth_csv_data = azimuth_df[azimuth_columns_ordered].to_csv(header=False, index=False)
db.insert_azimuth_data(azimuth_csv_data, azimuth_columns_ordered)
return {'result': f'success'}
@Action.route('/trip_and_score_update')
class TRIP_insert(Resource):
@Action.expect(trip_log_model)
@Action.doc(responses={200: 'Success'})
@Action.doc(responses={401: 'Unauthorized'})
@Action.doc(responses={500: 'Internal Error'})
def post(self):
token = request.headers.get('Authorization')
# Check if token is provided
if not token:
return {'result': 'fail', 'msg': '토큰이 없습니다.'}, 401
try:
# Decode the token to verify it
decoded_token = jwt.decode(token, "secret", algorithms=['HS256'])
user_id = decoded_token['id']
except jwt.ExpiredSignatureError:
return {'result': 'fail', 'msg': '토큰이 만료되었습니다.'}, 401
except jwt.InvalidTokenError:
return {'result': 'fail', 'msg': '유효하지 않은 토큰입니다.'}, 401
db = DB()
data = request.get_json()
if len(data["trip_id"]) != 64:
return {"result" : "ERROR! INVALID TRIP_ID!"}, 500
trip_id = data["trip_id"]
trip_distance_m = data["total_distance_m"]
trip_time_s = data["total_time_s"]
abrupt_start_count = data["abrupt_start_count"]
abrupt_stop_count = data["abrupt_stop_count"]
abrupt_acceleration_count = data["abrupt_acceleration_count"]
abrupt_deceleration_count = data["abrupt_deceleration_count"]
helmet_on = helmet_on = 1 if data["helmet_on"] == "true" else 0
final_score = data["final_score"]
if (helmet_on != 1) and (helmet_on != 0):
return {"result" : f"ERROR! INVALID 'helmet_on'! \n helmet_on : {helmet_on}"}, 500
db.insert_trip_data(
user_id,
trip_id,
trip_distance_m,
trip_time_s,
abrupt_start_count,
abrupt_stop_count,
abrupt_acceleration_count,
abrupt_deceleration_count,
helmet_on,
final_score
)
return {'result': f'success'}
@Action.route('/get_history')
class Get_history(Resource):
@Action.expect(history_request_model)
@Action.doc(responses={401: 'Unauthorized'})
@Action.doc(responses={500: 'Internal Error'})
def post(self):
token = request.headers.get('Authorization')
# Check if token is provided
if not token:
return {'result': 'fail', 'msg': '토큰이 없습니다.'}, 401
try:
# Decode the token to verify it
decoded_token = jwt.decode(token, "secret", algorithms=['HS256'])
user_id = decoded_token['id']
except jwt.ExpiredSignatureError:
return {'result': 'fail', 'msg': '토큰이 만료되었습니다.'}, 401
except jwt.InvalidTokenError:
return {'result': 'fail', 'msg': '유효하지 않은 토큰입니다.'}, 401
# Interact with the DB to get user history
data = request.get_json()
user_id = data["user_id"]
try:
db = DB()
result, status_code = db.get_history(user_name=user_id)
return {'result': 'success', 'data': result}, status_code
except Exception as e:
print(str(e))
return {'result': 'fail', 'msg': str(e)}, 500
@Action.route('/get_history_main')
class Get_history_main(Resource):
@Action.expect(history_request_model)
@Action.doc(responses={401: 'Unauthorized'})
@Action.doc(responses={500: 'Internal Error'})
def post(self):
token = request.headers.get('Authorization')
if not token:
return {'result': 'fail', 'msg': '토큰이 없습니다.'}, 401
try:
decoded_token = jwt.decode(token, "secret", algorithms=['HS256'])
user_id = decoded_token['id']
except jwt.ExpiredSignatureError:
return {'result': 'fail', 'msg': '토큰이 만료되었습니다.'}, 401
except jwt.InvalidTokenError:
return {'result': 'fail', 'msg': '유효하지 않은 토큰입니다.'}, 401
data = request.get_json()
user_id = data["user_id"]
try:
db = DB()
result, status_code = db.get_history_main(user_name=user_id)
if not result: # `result`가 비어있는 경우
return {'result': 'success', 'data': []}, 200
return {'result': 'success', 'data': result}, status_code
except Exception as e:
print(str(e))
return {'result': 'fail', 'msg': str(e)}, 500
@Action.route('/get_history_by_period')
class GetHistoryByPeriod(Resource):
@Action.expect(history_request_model) # 요청 모델 정의 필요
@Action.doc(responses={401: 'Unauthorized'})
@Action.doc(responses={500: 'Internal Error'})
def post(self):
token = request.headers.get('Authorization')
if not token:
return {'result': 'fail', 'msg': '토큰이 없습니다.'}, 401
try:
decoded_token = jwt.decode(token, "secret", algorithms=['HS256'])
user_id = decoded_token['id']
except jwt.ExpiredSignatureError:
return {'result': 'fail', 'msg': '토큰이 만료되었습니다.'}, 401
except jwt.InvalidTokenError:
return {'result': 'fail', 'msg': '유효하지 않은 토큰입니다.'}, 401
# 요청 데이터 가져오기
data = request.get_json()
start_date = data.get("start_date")
end_date = data.get("end_date")
if not start_date or not end_date:
return {'result': 'fail', 'msg': 'start_date와 end_date가 필요합니다.'}, 400
try:
db = DB()
result, status_code = db.get_history_by_period(user_name=user_id, start_date=start_date, end_date=end_date)
return {'result': 'success', 'data': result}, status_code
except Exception as e:
print(str(e))
return {'result': 'fail', 'msg': str(e)}, 500
@Action.route('/ping')
class Ping(Resource):
def post(self):
return {"result" : 'success', "msg" : "pong!"}, 200