import re
import regex
import time
import math
import pandas as pd
import os
import bs4
import warnings
from bs4 import MarkupResemblesLocatorWarning
from datetime import datetime, timedelta
from selenium import webdriver
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service as ChromeService
from tqdm import tqdm
from joblib import Parallel, delayed
from utils.cache_clear import delete_cache

# Silence bs4's MarkupResemblesLocatorWarning, which is annoying since everything passed in here is already HTML.
warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning, module='bs4')

HEADER = {"User-Agent": "Mozilla/119.0 (Windows NT 10.0; Win64; x64) Chrome/98.0.4758.102"}


def remove_html(strr):
    # Strip HTML tags and collapse repeated whitespace in a raw HTML fragment.
    cleaning = regex.sub(r'<.*?>', '', strr.strip().replace("\n", ' '))
    cleaning = regex.sub(r' +', ' ', cleaning)
    return cleaning


# Remove style/script tags and return only the visible text content.
def remove_tags(html):
    # Parse the HTML content.
    soup = bs4.BeautifulSoup(html, "html.parser")

    # Remove style and script tags entirely.
    for data in soup(['style', 'script']):
        data.decompose()

    # Return the remaining text content.
    return ' '.join(soup.stripped_strings)


def get_url(keyword, start_date, end_date, page_num, by_sim=True):
    # Build a Naver blog search URL, ordered by similarity or by most recent date.
    if by_sim:
        orderBy = "sim"
    else:
        orderBy = "recentdate"
    url = f"https://section.blog.naver.com/Search/Post.naver?pageNo={page_num}&orderBy={orderBy}&startDate={start_date}&endDate={end_date}&keyword={keyword}"
    return url


def get_article_count(plain_html):
    # Extract the total result count (e.g. "1,234건") from the search page HTML.
    regex_match_count = regex.search(r'\d*(,\d{3}|\d{3})*\b*건\b*<', plain_html)
    regex_match_count = regex.sub(r'건\b*<', '', regex_match_count.group())
    match_count = regex_match_count.replace(',', '')
    return int(match_count)


def get_total_page(url):
    # Get the Chrome WebDriver install path.
    chrome_path = ChromeDriverManager().install()

    # Configure the WebDriver.
    options = webdriver.ChromeOptions()
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    options.add_argument('headless')
    options.add_argument('window-size=1920x1080')
    options.add_argument("disable-gpu")
    # options.add_argument("user-agent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko")

    # Initialize the WebDriver.
    driver = webdriver.Chrome(service=ChromeService(chrome_path), options=options)
    driver.get(url)
    time.sleep(3)
    plain_html = driver.page_source
    driver.quit()

    article_count = get_article_count(plain_html)
    total_page = math.ceil(int(article_count) / 7)  # 7 posts are listed per result page
    return total_page


def scrap_agent(url):
    # Get the Chrome WebDriver install path.
    chrome_path = ChromeDriverManager().install()

    # Configure the WebDriver (headless, images disabled to speed up loading).
    options = webdriver.ChromeOptions()
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    options.add_argument('--blink-settings=imagesEnabled=false')
    options.add_argument('headless')
    options.add_argument('window-size=1920x1080')
    options.add_argument("disable-gpu")
    # options.add_argument("user-agent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko")

    # Initialize the WebDriver.
    driver = webdriver.Chrome(service=ChromeService(chrome_path), options=options)
    driver.get(url)
    time.sleep(2)

    single_page_search_result = driver.find_elements(By.CSS_SELECTOR, ".area_list_search")
    search_result_plain_html = single_page_search_result[0].get_attribute('innerHTML')
    formatter = bs4.formatter.HTMLFormatter(indent=1)
    search_result_plain_html_beautified = bs4.BeautifulSoup(
        search_result_plain_html, 'html.parser'
    ).prettify(formatter=formatter)
    # NOTE: the literal HTML tags inside the original regex patterns were lost when this
    # file was exported, so the tag/class names used below are reconstructed guesses for
    # the section.blog.naver.com search-result markup and should be verified against the
    # live page.
    search_result_contents = regex.findall(
        r'(<div class="list_search_post[^>]*>)?'
        r'(.*?)'
        r'(?=<div class="list_search_post|\Z)',
        search_result_plain_html_beautified, re.DOTALL
    )

    ret = {
        "url": [],
        "title": [],
        "blog_name": [],
        "post_date": [],
        "text": []
    }

    for content in search_result_contents:
        content = content[1].replace("\n", "")
        href = regex.search(
            r"https:\/\/blog\.naver\.com\/[\uAC00-\uD7AFa-zA-Z0-9_-]+\/[\uAC00-\uD7AFa-zA-Z0-9_-]+",
            content, re.DOTALL)
        # The title uses a slightly different cleanup (remove_html) because the search
        # engine wraps the matched keywords in highlight markup inside the title span.
        title = regex.search(r'<span class="title[^>]*>(.*?)<\/span>', content)
        text = regex.search(r'<a class="text[^>]*>(.*?)<\/a>', content)
        author_name = regex.search(r'<em class="name_author[^>]*>(.*?)<\/em>', content)
        post_date = regex.search(r'<span class="date[^>]*>(.*?)<\/span>', content)

        if None in (href, title, text, author_name, post_date):
            # Skip fragments that do not contain a complete search-result item.
            continue

        href = href.group(0)
        title = remove_html(title.group(1))
        text = remove_tags(text.group(1))
        author_name = remove_tags(author_name.group(1))
        post_date = remove_tags(post_date.group(1))

        ret["url"].append(href)
        ret["title"].append(title)
        ret["blog_name"].append(author_name)
        ret["post_date"].append(post_date)
        ret["text"].append(text)

    delete_cache(driver)
    driver.close()
    return ret


def merge_dicts(dict_list):
    # Initialize the result dictionary with empty lists.
    result = {
        "url": [],
        "title": [],
        "blog_name": [],
        "post_date": [],
        "text": []
    }

    # Iterate through each dictionary and merge the lists.
    for d in dict_list:
        for key in result.keys():
            result[key].extend(d.get(key, []))

    return result


def generate_date_range(start_date, end_date, interval):
    """
    :param start_date: start date string formatted as %Y-%m-%d
    :param end_date: end date string formatted as %Y-%m-%d
    :param interval: interval length in DAYS
    :return: a list of [start, end] date ranges that will be fed into naver_blog_scrapper,
             e.g. generate_date_range("2023-10-01", "2023-10-14", 7) returns
             [['2023-10-01', '2023-10-07'], ['2023-10-08', '2023-10-14']].
             The function also handles ranges that are not perfectly divisible by the interval:
             generate_date_range("2023-10-01", "2023-10-14", 10) returns
             [['2023-10-01', '2023-10-10'], ['2023-10-11', '2023-10-14']].
    """
    # Convert the start and end date strings to datetime objects.
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")

    # Generate consecutive, non-overlapping [start, end] windows covering the full range.
    date_ranges = []
    current_date = start
    while current_date < end:
        current_end = min(current_date + timedelta(days=interval - 1), end)
        date_ranges.append([current_date.strftime("%Y-%m-%d"), current_end.strftime("%Y-%m-%d")])
        current_date = current_end + timedelta(days=1)

    return date_ranges


def range_of_ranges(start, end, step):
    ranges = []
    current_start = start
    while current_start <= end:
        # Calculate the end of the current range.
        current_end = min(current_start + step - 1, end)
        # Append the current range as a list.
        ranges.append([current_start, current_end])
        # Update the start for the next range.
        current_start += step
    return ranges


def naver_blog_scrapper(keyword, start_day, end_day, date_interval, page_step=10, start_page_num=1, browser_thread_count=12):
    url = get_url(keyword, start_day, end_day, 1)
    total_page_num = get_total_page(url)
    print(f"processing : {keyword}, from {start_day} to {end_day}")
    print(f"total_page_num : {total_page_num}")

    date_ranges = generate_date_range(start_day, end_day, date_interval)
    for i, date_range in enumerate(date_ranges):
        url = get_url(keyword, date_range[0], date_range[1], 1)
        total_page_num = get_total_page(url)
        # Split the result pages for this date range into inclusive [first, last] chunks.
        task_pool = range_of_ranges(1, total_page_num, page_step)
        for task in task_pool:
            def parallel_scraping(keyword):
                # Build the URLs for this chunk of result pages, then scrape them with a
                # pool of headless browsers.
                urls = Parallel(n_jobs=-1)(
                    delayed(get_url)(keyword, date_range[0], date_range[1], page_num)
                    for page_num in range(task[0], task[1] + 1)  # task bounds are inclusive
                )
                results = Parallel(n_jobs=browser_thread_count)(
                    delayed(scrap_agent)(url) for url in urls
                )
                return results

            ret = parallel_scraping(keyword)
            merged_result = merge_dicts(ret)
            out_df = pd.DataFrame.from_dict(merged_result)

            # Create blog/<keyword> (and the parent blog/ directory) if missing.
            if not os.path.exists(f"blog/{keyword}"):
                os.makedirs(f"blog/{keyword}")
            out_df.to_csv(f"blog/{keyword}/{date_range[0]}-{date_range[1]}%{task[0]}-{task[1]}.csv", index=False)


if __name__ == "__main__":
    # TODO: start_page_num is not working as intended (it is currently ignored).
    naver_blog_scrapper("도개면", "2022-01-01", "2023-10-31", 100, 50, 1, 12)