윤영준 윤영준 2024-09-06
massive modification, this is the part where DB gets cryptography, extended functionality support for the application
@9d4865e527f227acde6c37eec539578aba157b75
 
.gitignore (added)
+++ .gitignore
@@ -0,0 +1,3 @@
+/database/keys/
+/database/db_config.json
+/database/db_config1.json
action.py
--- action.py
+++ action.py
@@ -18,59 +18,6 @@
 )
 
 
-
-def find_node(gps_address_y,gps_address_x):
-  db=DB()
-  nn_end = None
-  end_delta = float("inf")
-  value=0.0001
-  near_nodes=[]
-  while near_nodes==[]:
-    value=value*10
-    near_nodes=db.db_get_near_node(gps_address_y,gps_address_x,value)
-    
-  for n in near_nodes:
-    e_dist = haversine((gps_address_x,gps_address_y), n)
-    if e_dist < end_delta :
-      nn_end = n
-      end_delta = e_dist
-  return nn_end
-
-
-@Action.route('/image_summit')
-class fileUpload(Resource):
-    @Action.doc(responses={200: 'Success'})
-    @Action.doc(responses={500: 'Register Failed'})
-    def post(self):
-        if request.method == 'POST':
-            current_time = datetime.now()
-            timestamp = current_time.strftime("%Y%m%d%H%M%S")
-        # 시간을 원하는 형식으로 포맷팅하기 (예: 년-월-일 시:분:초)
-            formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
-            # 포맷팅된 현재 시간 출력
-            token = request.headers.get('Authorization')
-            print(token)
-            if not token:
-                return jsonify({'result': 'fail', 'msg': '토큰이 없습니다.'})
-            else:
-            # Decode the token to verify it
-                decoded_token = jwt.decode(token, "secret", algorithms=['HS256'])
-                print(decoded_token)
-                user_id = decoded_token['id']
-            print("현재 시간,저장요청:", formatted_time)
-            f = request.files['file']
-            save_path = f"images/{timestamp}/{user_id}"
-            os.makedirs(save_path, exist_ok=True)
-            new_filename = f"{save_path}/{user_id}_{timestamp}.jpg"
-            print(f)
-            f.save(new_filename)
-            print("저장완료", formatted_time)
-
-            return {
-                'save': 'done'  # str으로 반환하여 return
-            }, 200
-
-
 @Action.route('/gps_update')
 class fileUpload(Resource):
     def post(self):
app.py
--- app.py
+++ app.py
@@ -2,7 +2,6 @@
 from flask_restx import Api
 from flask_cors import CORS 
 from auth import Auth
-from trip import Trip
 from action import Action
 from flask_caching import Cache
 import json
@@ -21,8 +20,6 @@
           license="")
 
 print("Api Started")
-api.add_namespace(Trip, '/trip')
-print("Api Add Trip")
 
 api.add_namespace(Auth, '/auth')
 print("Api Add Auth")
@@ -30,5 +27,5 @@
 api.add_namespace(Action, '/action')
 
 if __name__ == "__main__":
-    app.run(debug=True, host='192.168.0.195', port=15857)
+    app.run(debug=True, host='0.0.0.0', port=15857)
     print("Flask Start")
auth.py
--- auth.py
+++ auth.py
@@ -27,6 +27,16 @@
 
 })
 
+
+get_phone_number = Auth.inherit('get a phone number of an user', {
+    'id' : fields.String(description="user id", required=True)
+})
+
+
+get_email = Auth.inherit('get an email of an user', {
+    'id' : fields.String(description="user id", required=True)
+})
+
 user_fields_register = Auth.inherit('User reigster', user_fields, {
     'password': fields.String(description='Password', required=True),'email': fields.String(description='email', required=True),'user_sex': fields.String(description='sex', required=True),'phone': fields.String(description='phone', required=True)
 
@@ -54,58 +64,105 @@
 
 
 
+
 @Auth.route('/register')
 class AuthRegister(Resource):
     @Auth.expect(user_fields_register)
     @Auth.doc(responses={200: 'Success'})
     @Auth.doc(responses={500: 'Register Failed'})
     def post(self):
-        db=DB()
-        id_ = request.json['id']
-        password = request.json['password']
-        user_email = request.json['email']
-        sex = request.json['sex']
-        phone = request.json['phone']
-        pw_has = hashlib.sha256(password.encode('utf-8')).hexdigest()
-        db_check_duplicate = db.db_check_id(id_)
-        if db_check_duplicate != None:
-            return {
-                "message" : "Register Failed : duplicate ID"
-            }, 500
-        db_login_success_flag=db.db_login(id_,password)
-        if db_login_success_flag != None:
-            return {
-                "message": "Register Failed"
-            }, 500
+        user_manager = DB()
+        # Extract data from the request
+        data = request.json
+        id_ = data['id']
+        password = data['password']
+        user_email = data['email']
+        sex = data['user_sex']
+        phone = data['phone']
+
+        # Prepare data for registration
+        user_data = {
+            'username': id_,
+            'password': password,
+            'email': user_email,
+            'sex': sex,
+            'phone': phone
+        }
+
+        # Call the register_user method from the UserManager instance
+        result, status_code = user_manager.register_user(user_data)
+
+        # Return the appropriate response based on the result from UserManager
+        if status_code == 200:
+            return result, 200
         else:
-            db.db_add_id(id_,pw_has,user_email,sex,phone)
-            return {
-                'Authorization': id_  # str으로 반환하여 return
-            }, 200
+            return result, 500
+
+@Auth.route('/retrive_phone_number')
+class AuthRegister(Resource):
+    @Auth.expect(get_phone_number)
+    @Auth.doc(responses={200: 'Success'})
+    @Auth.doc(responses={500: 'Register Failed'})
+    def post(self):
+        user_manager = DB()
+        data = request.json
+        id_ = data['id']
+        query_input = {
+            "username" : id_
+        }
+        result, status_code = user_manager.get_phone_number(query_input)
+
+        if status_code == 200:
+            return result, 200
+        else:
+            return result, 500
+
+
+@Auth.route('/retrive_email')
+class AuthRegister(Resource):
+    @Auth.expect(get_email)
+    @Auth.doc(responses={200: 'Success'})
+    @Auth.doc(responses={500: 'Register Failed'})
+    def post(self):
+        user_manager = DB()
+        data = request.json
+        id_ = data['id']
+        query_input = {
+            "username" : id_
+        }
+        result, status_code = user_manager.get_email(query_input)
+
+        if status_code == 200:
+            return result, 200
+        else:
+            return result, 500
+
+
 
 @Auth.route('/login')
 class AuthLogin(Resource):
     @Auth.expect(user_fields_auth)
-    @Auth.doc(responses={200: 'Success'})
-    @Auth.doc(responses={404: 'User Not Found'})
-    @Auth.doc(responses={500: 'Auth Failed'})
+    @Auth.doc(responses={200: 'Login Successful'})
+    @Auth.doc(responses={401: 'Unauthorized'})
+    @Auth.doc(responses={500: 'Login Failed'})
     def post(self):
-        db=DB()
-        id = request.json['id']
-        password = request.json['password']
-        #CRITICAL ... WHY? WHY?
-        # ... hashing should be handled at client, not server... 
-        pw_hash = hashlib.sha256(password.encode('utf-8')).hexdigest()
-        result = db.db_login(id,pw_hash)
-        if result is not None:
-            payload = {
-                'id' : id,
-                'exp' : datetime.datetime.utcnow() + datetime.timedelta(days=14)
-            }
-            token = jwt.encode(payload, "secret", algorithm='HS256')
-            return jsonify({'result': 'success', 'token': token})
-        else:
-            return jsonify({'result': 'fail', 'msg': '아이디/비밀번호가 일치하지 않습니다.'})
+        user_manager = DB()
+        # Extract data from the request
+        data = request.json
+        id_ = data['id']
+        password = data['password']
+
+        # Prepare data for authentication
+        user_data = {
+            'username': id_,
+            'password': password
+        }
+
+        # Call the login_user method from the UserManager instance
+        result, status_code = user_manager.login_user(user_data)
+
+        # Return the appropriate response based on the result from UserManager
+        return result, status_code
 
 
 @Auth.route('/secession')
database/database.py
--- database/database.py
+++ database/database.py
@@ -1,216 +1,221 @@
 import psycopg2 # driver 임포트
-import time
+import json
+import bcrypt
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.primitives import padding
+from cryptography.hazmat.backends import default_backend
+import re
+import os
 from io import StringIO
 from datetime import datetime, timedelta
 
+config_file_path = "database/db_config.json"
 
 class DB():
     def __init__(self):
-        self.conn=psycopg2.connect(
-            host='210.180.118.83',
-            dbname='TRAFFICAGNET',
-            user='takensoft',
-            password='tts96314728!@',
-            port='5432',
-            options=f'-c search_path=trafficagent'
-            ) # db에 접속
+        # Load the database configuration from the JSON file
+        self.db_config = self.load_db_config(config_file_path)
+
+        # Initialize database connection
+        self.conn = psycopg2.connect(
+            host=self.db_config['host'],
+            dbname=self.db_config['dbname'],
+            user=self.db_config['user'],
+            password=self.db_config['password'],
+            port=self.db_config['port'],
+            # options=self.db_config['options']
+        )
+        self.schema = self.db_config["schema"]
         self.conn.autocommit=True
-        self.schema = 'trafficagent'
-    '''
-    def __init__(self):
-        self.conn=psycopg2.connect(
-            host='165.229.169.113',
-            dbname='traffic_agent',
-            user='takensoft',
-            password='ts44301236!@',
-            port='5432',
-            options="-c search_path=traffic_agent_v1") # db에 접속
-        self.conn.autocommit=True
-        '''
-    
-        
-    
-    def db_check_id(self,id):
-        cur = self.conn.cursor() # 커서를 생성한다
+        self.cur = self.conn.cursor()
+        # yeah, that double quotation is absolutely needed (to distinguish capital letters)
+        self.cur.execute("SET search_path TO " + f'"{self.schema}"')
+        with open("database/keys/encryption_key2024-09-05_14:27:02", "rb") as f:
+            self.encryption_key = f.read()
 
-        cur.execute(f'''
-        SELECT user_id
-        FROM "{self.schema}".user_id
-        Where user_id = '{id}';
-        ''')
-        result=cur.fetchone()
-        cur.close()
+    def load_db_config(self, config_file_path):
+        """
+        Loads database configuration from a JSON file.
+        """
+        with open(config_file_path, 'r') as config_file:
+            return json.load(config_file)
 
-        return result
+    def encrypt_aes(self, plain_text):
+        iv = os.urandom(16)  # AES block size is 16 bytes
+        cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv), backend=default_backend())
+        encryptor = cipher.encryptor()
 
-    def db_login(self,id,pw):
-        cur = self.conn.cursor() # 커서를 생성한다
+        # Pad the plaintext to be a multiple of 16 bytes
+        padder = padding.PKCS7(algorithms.AES.block_size).padder()
+        padded_data = padder.update(plain_text.encode('utf-8')) + padder.finalize()
 
-        cur.execute(f'''
-        SELECT user_id, user_pw, user_email, user_sex, user_phone, user_time_stamp
-        FROM "{self.schema}".user_id
-        Where user_id = '{id}' and user_pw='{pw}';
-        ''')
-        result=cur.fetchone()
+        encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
+        return encrypted_data, iv
+
+    def decrypt_aes(self, encrypted_data, iv):
+        cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv), backend=default_backend())
+        decryptor = cipher.decryptor()
+
+        decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize()
+
+        # Remove padding after decryption
+        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
+        unpadded_data = unpadder.update(decrypted_data) + unpadder.finalize()
+
+        return unpadded_data.decode('utf-8')
+
+    def cleanse_and_validate_input(self, data):
+        """
+        Cleanses input by removing leading/trailing spaces and validates the data.
+        Returns cleansed data and an error message if validation fails.
+        """
+        username = data.get('username', '').strip()
+        password = data.get('password', '').strip()
+        email = data.get('email', '').strip()
+        phone = data.get('phone', '').strip()
+        sex = data.get('sex', '').strip()
+
+        # Validate username
+        if not username:
+            return None, "Username is required."
+        if len(username) > 255:
+            return None, "Username must not exceed 255 characters."
+
+        # Validate password
+        if not password:
+            return None, "Password is required."
+        if len(password) < 8:
+            return None, "Password must be at least 8 characters long."
+
+        # Validate email format
+        if not email or not re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email):
+            return None, "Invalid email address."
+
+        # Validate phone number format
+        if not re.fullmatch(r'010\d{8}', phone):
+            return None, "Phone number must be in the format 010XXXXXXXX where X are digits."
+
+        # Validate sex input
+        if not sex:
+            return None, "Sex is required."
+        if sex not in ['Male', 'Female', 'Non-binary', 'Other']:
+            return None, "Invalid value for sex."
+
+        return {
+            'username': username,
+            'password': password,
+            'email': email,
+            'phone': phone,
+            'sex': sex
+        }, None
+
+    def register_user(self, data):
+        data, error = self.cleanse_and_validate_input(data)
+        if error:
+            return {'status': 'error', 'message': error}, 400
+
+        username = data['username']
+        password = data['password']
+        email = data['email']
+        phone = data['phone']
+        sex = data['sex']
+
+        # Hash the password with bcrypt, which automatically handles the salt
+        hashed_pw = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
+
+        # Encrypt email, phone, and sex with AES
+        encrypted_email, email_iv = self.encrypt_aes(email)
+        encrypted_phone, phone_iv = self.encrypt_aes(phone)
+        encrypted_sex, sex_iv = self.encrypt_aes(sex)
+
+        # Insert the user into the database
+        try:
+            self.cur.execute(f"""
+                INSERT INTO users (username, user_pw, user_email, email_iv, user_phone, phone_iv, user_sex, user_time_stamp)
+                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
+            """, (
+                username,
+                psycopg2.Binary(hashed_pw),
+                psycopg2.Binary(encrypted_email),
+                psycopg2.Binary(email_iv),
+                psycopg2.Binary(encrypted_phone),
+                psycopg2.Binary(phone_iv),
+                psycopg2.Binary(encrypted_sex),
+                datetime.now()  # Correct way to insert current timestamp with timezone
+            )
+                             )
+            self.conn.commit()
+            return {'status': 'success', 'message': f'user {username} registered successfully'}, 200
+        except psycopg2.Error as e:
+            self.conn.rollback()
+            return {'status': 'error', 'message': str(e)}, 400
+
+    def login_user(self, data):
+        username = data.get('username', '').strip()
+        password = data.get('password', '').strip()
+
+        # Validate input
+        if not username or not password:
+            return {'status': 'error', 'message': 'Username and password are required.'}, 400
+
+        # Retrieve the user's hashed password from the database
+        self.cur.execute("SELECT user_pw FROM users WHERE username = %s", (username,))
+        user = self.cur.fetchone()
+
+        if user is None:
+            return {'status': 'error', 'message': 'Invalid username or password'}, 401
+
+        hashed_pw = bytes(user[0])  # Convert the retrieved hashed password to bytes
+
+        # Check if the provided password matches the stored hashed password
+        if bcrypt.checkpw(password.encode('utf-8'), hashed_pw):
+            return {'status': 'success', 'message': 'Logged in successfully'}, 200
+        else:
+            return {'status': 'error', 'message': 'Invalid username or password'}, 401
+
+    def get_phone_number(self, data):
+        username = data.get('username', '').strip()
+
+        if not username:
+            return {'status': 'error', 'message': 'Username is required.'}, 400
+
+        # Retrieve the encrypted phone number and IV from the database
+        self.cur.execute("SELECT user_phone, phone_iv FROM users WHERE username = %s", (username,))
+        user = self.cur.fetchone()
+
+        if user is None:
+            return {'status': 'error', 'message': 'User not found'}, 404
+
+        encrypted_phone, phone_iv = user
+
+        # Decrypt the phone number
+        decrypted_phone = self.decrypt_aes(encrypted_phone, phone_iv)
+
+        return {'status': 'success', 'phone_number': decrypted_phone}, 200
 
 
-        cur.close()
+    def get_email(self, data):
+        username = data.get('username', '').strip()
 
-        return result
+        if not username:
+            return {'status': 'error', 'message': 'Username is required.'}, 400
 
-    def db_add_id(self,user_id,user_pw,user_email,user_sex,user_phone) :
-        cur = self.conn.cursor() # 커서를 생성한다
-        recievingtime=datetime.now()
-        d = recievingtime.isoformat(sep=' ', timespec='milliseconds')
-        # d=recievingtime.strftime("%Y-%m-%d %H:%M:%S.%f")
-        cur.execute(f'''
-        insert into "{self.schema}".user_id (user_id,user_pw,user_email,user_sex,user_phone,user_time_stamp)
-        values ('{user_id}','{user_pw}','{user_email}','{user_sex}','{user_phone}','{d}')
-        ''')
-        
-    def db_add_action(self,user_id,user_pw,user_email,user_sex,user_phone) :
-        cur = self.conn.cursor() # 커서를 생성한다
-        recievingtime=datetime.now()
-        d = recievingtime.isoformat(sep=' ', timespec='milliseconds')
-        # d=recievingtime.strftime("%Y-%m-%d %H:%M:%S.%f")
-        cur.execute(f'''
-        insert into "{self.schema}".user_id (user_id,user_pw,user_email,user_sex,user_phone,user_time_stamp)
-        values ('{user_id}','{user_pw}','{user_email}','{user_sex}','{user_phone}','{d}')
-        ''')
+        # Retrieve the encrypted phone number and IV from the database
+        self.cur.execute("SELECT user_email, email_iv FROM users WHERE username = %s", (username,))
+        user = self.cur.fetchone()
 
+        if user is None:
+            return {'status': 'error', 'message': 'User not found'}, 404
 
-        cur.close()
-    def db_delete_id(self,user_id) :
-        cur = self.conn.cursor() # 커서를 생성한다
-        cur.execute(f'''
-        delete
-        from "{self.schema}".user_id ui
-        where user_id  = '{user_id}'
-        ''')
-        cur.close()
+        encrypted_phone, phone_iv = user
 
-    def db_get_node(self):
-        cur = self.conn.cursor() # 커서를 생성한다
+        # Decrypt the phone number
+        decrypted_phone = self.decrypt_aes(encrypted_phone, phone_iv)
 
-        cur.execute('''
-        select "index",source_x ,source_y,target_x,target_y," dist "
-        from "{self.schema}".node n 
-        where flcass != 'pedstrian'
-        ''')
-        result=cur.fetchall()
-
-        return result
-    
-    def db_get_dest(self,dest1):
-        cur = self.conn.cursor() # 커서를 생성한다
-
-        cur.execute(f'''
-        select j.q,li.q ,li.location_cen_x_4623,li.location_cen_y_4623,li.location_exit_x_4623 ,li.location_exit_y_4623 
-        from "{self.schema}".jibun j, "{self.schema}".location_info li 
-        where j.build_num =li.build_code and (j.q='{dest1}' or li.q='{dest1}' or li.build_name_city like '{dest1}')
-        ''')
-        result=cur.fetchone()
-
-        return (float(result[4]),float(result[5]))
-    
-    def db_get_near_node(self,dest_x,dest_y,value):
-        cur = self.conn.cursor() # 커서를 생성한다
-
-        cur.execute(f'''
-        select source_x, source_y 
-        from "{self.schema}".node n 
-        where {dest_x} > source_y  - {value} and {dest_x} <= source_y  + {value}
-        and {dest_y} > source_x  - {value} and {dest_y} <= source_x + {value}
-
-        ''')
-        result=cur.fetchall()
-        return result
-
-    def db_get_address(self,dest1):
-        cur = self.conn.cursor() # 커서를 생성한다
-
-        cur.execute(f'''
-        select j.q,li.q ,li.location_cen_x_4623,li.location_cen_y_4623,li.location_exit_x_4623 ,li.location_exit_y_4623 
-        from "{self.schema}".jibun j, "{self.schema}".location_info li 
-        where j.build_num =li.build_code and (j.q='{dest1}' or li.q='{dest1}' or li.build_name_city like '{dest1}')
-        ''')
-        result=cur.fetchone()
-        print( (float(result[2]),float(result[3])))
-
-        return (float(result[2]),float(result[3]))
-    
-    def db_add_report(self,report_id,report_x,report_y) :
-        cur = self.conn.cursor() # 커서를 생성한다
-        now=time.localtime()
-        d=time.strftime('%Y-%m-%d %X', now)
-        cur.execute(f'''
-        insert into "{self.schema}".report (report_id,report_x,report_y,timestamp)
-        values ('{report_id}','{report_x}','{report_y}','{d}')
-        ''')
-        
-    def db_get_near_point(self,dest_x,dest_y):
-        cur = self.conn.cursor() # 커서를 생성한다
-        now=datetime.now() 
-        d_plus=now +timedelta(hours=1)
-        d_plus=str("'"+d_plus.strftime('%Y-%m-%d %X')+"'")
-        d_minus=now -timedelta(hours=1)
-        d_minus=str("'"+d_minus.strftime('%Y-%m-%d %X')+"'")
-        cur.execute(f'''
-        select report_x, report_y
-        from "{self.schema}".report 
-        where {dest_y} > report_y  - 0.000498 and {dest_y} <= report_y  + 0.000498
-        and {dest_x} > report_x  - 0.000498 and {dest_x} <= report_x + 0.000498
-        and timestamp between {d_minus} and {d_plus};
-
-
-        ''')
-        result=cur.fetchall()
-        return result
-    
-    
-    def db_add_pothole(self,pothole_id,pothole_location_x,pothole_location_y) :
-        cur = self.conn.cursor() # 커서를 생성한다
-        now=datetime.now() 
-        d=now.strftime('%Y-%m-%d %X')
-        cur.execute(f'''
-        insert into "{self.schema}".pothole (pothole_id,pothole_location_x,pothole_location_y,timestamp)
-        values ('{pothole_id}','{pothole_location_x}','{pothole_location_y}','{d}')
-        ''')
-        
-    def db_delete_pothole(self,dest_x,dest_y) :
-        cur = self.conn.cursor() # 커서를 생성한다
-        now=datetime.now() 
-        d_plus=now +timedelta(hours=1)
-        d_plus=str("'"+d_plus.strftime('%Y-%m-%d %X')+"'")
-        d_minus=now -timedelta(hours=1)
-        d_minus=str("'"+d_minus.strftime('%Y-%m-%d %X')+"'")
-        cur.execute(f'''
-        delete from "{self.schema}".pothole 
-        where {dest_y} > pothole_location_y  - 0.000498 and {dest_y} <= pothole_location_y  + 0.000498
-        and {dest_x} > pothole_location_x  - 0.000498 and {dest_x} <= pothole_location_x + 0.000498
-        and timestamp between {d_minus} and {d_plus};
-        ''')
-        
-    def db_display_pothole(self) :
-        cur = self.conn.cursor() # 커서를 생성한다
-        cur.execute(f'''
-        select report_x,report_y  from "{self.schema}".report 
-        ''')
-        result=cur.fetchall()
-        return result
-
-    def insert_gps_data(self, csv_block, columns):
-        cur = self.conn.cursor()
-        data = StringIO(csv_block)
-        
-        # using COPY instead of INSERT to do even less operation per data. 
-        cur.copy_from(data, 'gps_data', sep=',', columns = columns)
-        self.conn.commit()
-        cur.close()
-        return True             
+        return {'status': 'success', 'phone_number': decrypted_phone}, 200
         
     def close_connection(self):
-        cur = self.conn.cursor()
+        cur = self.cur
         cur.close()
         return True
 
 
database/key_gen.py (added)
+++ database/key_gen.py
@@ -0,0 +1,18 @@
+import os
+
+# NEVER be the part of server script, THIS SHOULD NEVER run with server.
+# ALSO, remember to BACKUP the key
+
+def create_and_save_key(key_file_path):
+    """
+    Generates a new AES encryption key and saves it to a file.
+    """
+    key = os.urandom(32)  # AES-256 requires a 32-byte key
+    with open(key_file_path, 'wb') as key_file:
+        key_file.write(key)
+    print(f"Encryption key created and saved to {key_file_path}")
+    return key
+
+if __name__ == "__main__":
+    from datetime import datetime
+    create_and_save_key(f"keys/encryption_key{datetime.now().strftime('%Y-%m-%d_%H:%M:%S')}")(파일 끝에 줄바꿈 문자 없음)
 
requirements.txt (deleted)
--- requirements.txt
@@ -1,41 +0,0 @@
-aniso8601==9.0.1
-attrs==23.2.0
-blinker==1.8.2
-cachelib==0.9.0
-click==8.1.7
-Flask==3.0.3
-Flask-Caching==2.3.0
-flask-restx==1.3.0
-haversine==2.8.1
-imageio==2.34.1
-importlib_resources==6.4.0
-itsdangerous==2.2.0
-Jinja2==3.1.4
-joblib==1.4.2
-jsonschema==4.22.0
-jsonschema-specifications==2023.12.1
-lazy_loader==0.4
-MarkupSafe==2.1.5
-networkx==3.3
-numpy==1.23.3
-opencv-python==4.10.0.82
-packaging==24.1
-pandas==2.2.2
-pillow==10.3.0
-psycopg2-binary==2.9.9
-PyJWT==2.8.0
-python-dateutil==2.9.0.post0
-pytz==2024.1
-referencing==0.35.1
-rpds-py==0.18.1
-scikit-image==0.23.2
-scikit-learn==1.5.0
-scipy==1.13.1
-six==1.16.0
-threadpoolctl==3.5.0
-tifffile==2024.5.22
-torch==1.12.1
-typing_extensions==4.12.2
-tzdata==2024.1
-Werkzeug==3.0.3
-
 
trip.py (deleted)
--- trip.py
@@ -1,78 +0,0 @@
-from flask import request
-from flask_restx import Resource, Api, Namespace, fields
-from navigation_model.model_trip import path_finder
-import time
-import numpy as np
-import networkx
-
-todos = {}
-count = 1
-
-not_in_list=[]
-
-
-#trip = path_finder()
-
-
-
-Trip = Namespace(
-    name="trip",
-    description="경로 노드를 받기위한 사용하는 API.",
-)
-
-trip_fields = Trip.model('Trip', {  # Model 객체 생성
-    'path_start': fields.String(description='a Todo', required=True),'path_end' : fields.String(description='a Todo', required=True)
-})
-
-
-
-@Trip.route('/trip')
-class TripPost(Resource):
-    @Trip.expect(trip_fields)
-    @Trip.response(201, 'Success', trip_fields)
-    def post(self):
-        """경로를 받습니다"""
-        start_time=time.time()
-        start = request.json['path_start']
-        end = request.json['path_end']
-        
-        return {
-            'nodes' : trip.get_trip(start,end),
-            'start_point' : trip.get_dest(start),
-            'end_point' : trip.get_dest(end)
-            
-            
-        }, 201
-@Trip.route('/trip2')
-class TripPost(Resource):
-    @Trip.expect(trip_fields)
-    @Trip.response(201, 'Success', trip_fields)
-    def post(self):
-        """경로를 받습니다"""
-        start_time=time.time()
-        start_x = request.json['dest1_x']
-        start_y = request.json['dest1_y']
-        end = request.json['path_end']
-        return {
-            'nodes' : trip.get_trip_2(start_x,start_y,end,not_in_list),
-            'start_point' : (start_x,start_y),
-            'end_point' : trip.get_dest(end)
-            
-            
-        }, 201
-
-
-@Trip.route('/remove')
-class removenodePost(Resource):
-    def post(self):
-        """경로를 받습니다"""
-        gps_x = request.json['gps_x']
-        gps_y = request.json['gps_y']
-        trip.G.remove_node((float(gps_x),float(gps_y)))
-        not_in_list.append((float(gps_x),float(gps_y)))
-        
-        return {
-            "done" : "done"
-        }, 201
-
-
Add a comment
List