"""Scrape Naver Cafe article-search results for a keyword into CSV files.

The script asks the search page how many results exist, walks the result
pages in chunks of 100 pages with headless Chrome, and writes one CSV per
chunk under ``cafe/<keyword>/``.
"""
import math
import os
import random
import re
import time
from datetime import datetime, timedelta

import bs4
import pandas as pd
import regex
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.by import By
from tqdm import tqdm
from webdriver_manager.chrome import ChromeDriverManager

HEADER = {"User-Agent": "Mozilla/119.0 (Windows NT 10.0; Win64; x64) Chrome/98.0.4758.102"}

# Divisor used to turn the total result count into a page count.
RESULTS_PER_PAGE = 12

# Number of result pages scraped (and written to one CSV) per batch.
PAGES_PER_CHUNK = 100

# Column names shared by every result dictionary in this module.
_RESULT_KEYS = ("url", "title", "cafe_name", "post_date", "text")

# NOTE(review): the patterns below are reproduced exactly as they appear in
# the file under review. Several begin directly at the capture group (two of
# them after bare line breaks), and the cafe-name and post-date patterns are
# identical, which suggests their opening-tag prefixes were lost at some
# point. The text pattern also starts with line breaks although it is applied
# after line breaks have been removed from the content. Confirm all of them
# against the live page markup before relying on the output.
_ITEM_PATTERN = "\n" + r"(.*?)\n <\/div>"
_HREF_PATTERN = r"https:\/\/cafe\.naver\.com\/[\uAC00-\uD7AFa-zA-Z0-9_]+\?iframe_url\=[\uAC00-\uD7AFa-zA-Z0-9\.\%_]+"
_TITLE_PATTERN = r"(.*?)<\/strong>"
_TEXT_PATTERN = "\n\n" + r"(.*?)<\/p>"
_CAFE_NAME_PATTERN = r"(.*?)<\/span>"
_POST_DATE_PATTERN = r"(.*?)<\/span>"


def remove_tags(html):
    """Return the text content of *html* with all markup removed.

    ``<style>`` and ``<script>`` elements are dropped entirely; the remaining
    text fragments are stripped and joined with single spaces.
    """
    soup = bs4.BeautifulSoup(html, "html.parser")
    for node in soup(["style", "script"]):
        node.decompose()
    return " ".join(soup.stripped_strings)


def get_url(keyword, page_num):
    """Build the article-search URL for *keyword* at result page *page_num*.

    The ``t`` query parameter is a fixed base value plus a random offset in
    the range 0..999999, so repeated calls yield different URLs.
    """
    t_rand = 1702255000000 + random.randint(0, 999999)
    url = f"https://section.cafe.naver.com/ca-fe/home/search/articles?q={keyword}&t={t_rand}&em=1&p={page_num}"
    return url


def _make_driver():
    """Create the headless Chrome WebDriver used by every page fetch."""
    # Resolve (and download if needed) the chromedriver binary.
    chrome_path = ChromeDriverManager().install()

    options = webdriver.ChromeOptions()
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    options.add_argument('headless')
    options.add_argument('window-size=1920x1080')
    options.add_argument("disable-gpu")
    # Disabled user-agent override kept for reference:
    # options.add_argument("user-agent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko")

    return webdriver.Chrome(service=ChromeService(chrome_path), options=options)


def _empty_result():
    """Return a fresh result dictionary with one empty list per column."""
    return {key: [] for key in _RESULT_KEYS}


def _clean_match(match, group):
    """Return the tag-free text of ``match.group(group)``, or ``""`` if there is no match."""
    if match is None:
        return ""
    return remove_tags(match.group(group))


def get_total_page(url):
    """Return the number of result pages reported by the search page at *url*.

    The page is loaded in headless Chrome, the text of the first ``.sub_text``
    element is read as a (comma-grouped) integer result count, and that count
    is divided by ``RESULTS_PER_PAGE`` and rounded up.

    Raises whatever Selenium raises when the element is missing, and
    ``ValueError`` when its text is not an integer.
    """
    driver = _make_driver()
    try:
        driver.get(url)
        # Fixed pause before reading the page; presumably lets it finish rendering.
        time.sleep(3)
        total_count = driver.find_element(By.CSS_SELECTOR, ".sub_text").text
    finally:
        # Always release the browser; otherwise each call leaks a Chrome process.
        driver.quit()

    total_page = math.ceil(int(total_count.replace(',', '')) / RESULTS_PER_PAGE)
    return total_page


def scrap_agent(url):
    """Scrape one search-result page and return its articles column-wise.

    Returns a dictionary with the keys ``url``, ``title``, ``cafe_name``,
    ``post_date`` and ``text``, each mapping to a list with one entry per
    article found on the page. A field that cannot be located in an article
    is recorded as an empty string so the lists stay aligned. If the page has
    no ``.item_list`` element, all lists are empty.
    """
    driver = _make_driver()
    try:
        driver.get(url)
        # Fixed pause before reading the page; presumably lets it finish rendering.
        time.sleep(2)
        item_lists = driver.find_elements(By.CSS_SELECTOR, ".item_list")
        if not item_lists:
            return _empty_result()
        search_result_plain_html = item_lists[0].get_attribute('innerHTML') or ""
    finally:
        # Always release the browser; otherwise each call leaks a Chrome process.
        driver.quit()

    # Re-indent the markup with a one-space indent; the item pattern depends
    # on the closing tag appearing after a newline and a single space.
    formatter = bs4.formatter.HTMLFormatter(indent=1)
    search_result_plain_html_beautified = bs4.BeautifulSoup(
        search_result_plain_html, 'html.parser'
    ).prettify(formatter=formatter)
    search_result_contents = regex.findall(
        _ITEM_PATTERN, search_result_plain_html_beautified, re.DOTALL
    )

    ret = _empty_result()
    for content in search_result_contents:
        content = content.replace("\n", "")
        href = regex.search(_HREF_PATTERN, content, re.DOTALL)
        title = regex.search(_TITLE_PATTERN, content)
        text = regex.search(_TEXT_PATTERN, content)
        cafe_name = regex.search(_CAFE_NAME_PATTERN, content)
        post_date = regex.search(_POST_DATE_PATTERN, content)

        ret["url"].append(_clean_match(href, 0))
        ret["title"].append(_clean_match(title, 1))
        ret["cafe_name"].append(_clean_match(cafe_name, 1))
        ret["post_date"].append(_clean_match(post_date, 1))
        ret["text"].append(_clean_match(text, 1))
    return ret


def merge_dicts(dict_list):
    """Concatenate column-wise result dictionaries into a single dictionary.

    For each of the five result keys, the lists from every dictionary in
    *dict_list* are appended in order. Keys missing from an input dictionary
    contribute nothing; an empty *dict_list* yields five empty lists.
    """
    result = _empty_result()
    for d in dict_list:
        for key, values in result.items():
            values.extend(d.get(key, []))
    return result


def range_of_ranges(start, end, step):
    """Split the inclusive span ``start..end`` into consecutive chunks.

    Returns a list of ``[chunk_start, chunk_end]`` pairs, both ends inclusive,
    each covering at most *step* values; the final chunk is clipped to *end*.
    Returns an empty list when ``start > end``.

    Raises:
        ValueError: if *step* is not positive (the loop would never advance).
    """
    if step <= 0:
        raise ValueError("step must be positive")

    ranges = []
    current_start = start
    while current_start <= end:
        current_end = min(current_start + step - 1, end)
        ranges.append([current_start, current_end])
        current_start += step
    return ranges


def main():
    """Scrape every result page for the configured keyword into per-chunk CSVs."""
    from joblib import Parallel, delayed

    keyword = "구미시"
    total_page_num = get_total_page(get_url(keyword, 1))
    out_dir = f"cafe/{keyword}"

    for first_page, last_page in range_of_ranges(1, total_page_num, PAGES_PER_CHUNK):
        # Chunk bounds are inclusive, so the upper bound needs +1 here;
        # without it the last page of every chunk would be skipped.
        urls = [get_url(keyword, page) for page in range(first_page, last_page + 1)]
        results = Parallel(n_jobs=12)(delayed(scrap_agent)(url) for url in urls)

        out_df = pd.DataFrame.from_dict(merge_dicts(results))
        # makedirs also creates the parent "cafe" directory when it is missing.
        os.makedirs(out_dir, exist_ok=True)
        out_df.to_csv(f"{out_dir}/{first_page}-{last_page}.csv", index=False)


if __name__ == "__main__":
    main()