
import random
import re
import regex
import time
import math
import pandas as pd
import os
import bs4
import warnings
from bs4 import MarkupResemblesLocatorWarning
from datetime import datetime, timedelta
from selenium import webdriver
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service as ChromeService
from tqdm import tqdm
from joblib import Parallel, delayed
from utils.cache_clear import delete_cache
# Silence bs4's MarkupResemblesLocatorWarning; every string passed to BeautifulSoup here is an HTML fragment, so the warning is just noise
warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning, module='bs4')
HEADER = {"User-Agent": "Mozilla/119.0 (Windows NT 10.0; Win64; x64) Chrome/98.0.4758.102"}
def remove_html(strr):
# print(strr)
cleaning = regex.sub(r'<.*?>', '', strr.strip().replace("\n",' '))
cleaning = regex.sub(r' +', ' ', cleaning)
return cleaning
# Function to remove tags
def remove_tags(html):
# parse html content
soup = bs4.BeautifulSoup(html, "html.parser")
for data in soup(['style', 'script']):
# Remove tags
data.decompose()
# return data by retrieving the tag content
return ' '.join(soup.stripped_strings)
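# Note: remove_html() strips tags with a regex and is only used on short snippets (the post title),
# while remove_tags() parses with BeautifulSoup and also drops the contents of <style>/<script> tags.
# Illustrative example (not taken from a real page): both turn '<b>구미</b> 맛집' into '구미 맛집'.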
def get_url(keyword, start_date, end_date, page_num, by_sim=True):
if by_sim:
orderBy = "sim"
else:
orderBy = "recentdate"
url = f"https://section.blog.naver.com/Search/Post.naver?pageNo={page_num}&rangeType=PERIOD&orderBy={orderBy}&startDate={start_date}&endDate={end_date}&keyword={keyword}"
return url
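# Illustrative example of the URL this builds (query values are placeholders):
# https://section.blog.naver.com/Search/Post.naver?pageNo=1&rangeType=PERIOD&orderBy=sim&startDate=2023-10-01&endDate=2023-10-07&keyword=구미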
def get_article_count(plain_html):
    # The total result count is rendered as a (possibly comma-grouped) number followed by '건', e.g. '1,234건'
    regex_match_count = regex.search(r'\d*(,\d{3}|\d{3})*\s*건\s*<', plain_html)
    regex_match_count = regex.sub(r'건\s*<', '', regex_match_count.group())
    match_count = regex_match_count.replace(',', '')
    return int(match_count)
def get_total_page(url):
    # Get the Chrome WebDriver installation path
chrome_path = ChromeDriverManager().install()
    # Configure the WebDriver
options = webdriver.ChromeOptions()
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
options.add_argument('headless')
options.add_argument('window-size=1920x1080')
options.add_argument("disable-gpu")
# options.add_argument("user-agent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko")
    # Initialize the WebDriver
driver = webdriver.Chrome(service=ChromeService(chrome_path), options=options)
driver.get(url)
time.sleep(3)
plain_html = driver.page_source
article_count = get_article_count(plain_html)
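    # The search results page appears to list 7 posts per page, hence the division by 7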
total_page = math.ceil(int(article_count) / 7)
return total_page
def scrap_agent(url):
    # Get the Chrome WebDriver installation path
chrome_path = ChromeDriverManager().install()
    # Configure the WebDriver
options = webdriver.ChromeOptions()
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
options.add_argument('--blink-settings=imagesEnabled=false')
options.add_argument('headless')
options.add_argument('window-size=1920x1080')
options.add_argument("disable-gpu")
# options.add_argument("user-agent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko")
    # Initialize the WebDriver
driver = webdriver.Chrome(service=ChromeService(chrome_path), options=options)
driver.get(url)
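    # Randomized short pause so the Angular-rendered result list has time to load and requests are not sent at a fixed cadence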
time.sleep(1+2*random.random())
single_page_search_result = driver.find_elements(By.CSS_SELECTOR, ".area_list_search")
search_result_plain_html = single_page_search_result[0].get_attribute('innerHTML')
formatter = bs4.formatter.HTMLFormatter(indent=1)
search_result_plain_html_beautified = bs4.BeautifulSoup(search_result_plain_html, 'html.parser').prettify(
formatter=formatter)
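    # Each result item sits between AngularJS "ngRepeat: post in postSearchCtrl.resultList" comment markers, so capture everything up to each end marker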
search_result_contents = regex.findall(
r'(<!-- ngRepeat: post in postSearchCtrl.resultList -->)?'
r'(.*?)'
r'<!-- end ngRepeat: post in postSearchCtrl.resultList -->',
search_result_plain_html_beautified,
re.DOTALL
)
ret = {
"url": [],
"title": [],
"blog_name": [],
"post_date": [],
"text": []
}
for content in search_result_contents:
content = content[1].replace("\n", "")
href = regex.search(
r"https:\/\/blog\.naver\.com\/[\uAC00-\uD7AFa-zA-Z0-9_-]+\/[\uAC00-\uD7AFa-zA-Z0-9_-]+",
content, re.DOTALL)
        # The title needs a slightly different pattern because the search page wraps matched keywords in highlight tags
title = regex.search(r'<strong class="title_post">(.*?)<\/span>', content)
text = regex.search(r'<!-- ngIf: post.contents -->(.*?)<\/a>', content)
author_name = regex.search(r'<em class="name_author">(.*?)<\/em>', content)
post_date = regex.search(r'<span class="date">(.*?)<\/span>', content)
href = href.group(0)
title = remove_html(title.group(1))
text = remove_tags(text.group(1))
author_name = remove_tags(author_name.group(1))
post_date = remove_tags((post_date.group(1)))
ret["url"].append(href)
ret["title"].append(title)
ret["blog_name"].append(author_name)
ret["post_date"].append(post_date)
ret["text"].append(text)
delete_cache(driver)
    driver.quit()  # quit() (rather than close()) also terminates the chromedriver process, which matters with many parallel browsers
return ret
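# scrap_agent() returns column-oriented lists with one entry per post, e.g. (illustrative values):
# {"url": ["https://blog.naver.com/..."], "title": ["..."], "blog_name": ["..."], "post_date": ["..."], "text": ["..."]}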
def merge_dicts(dict_list):
# Initialize the result dictionary with empty lists
result = {
"url": [],
"title": [],
"blog_name": [],
"post_date": [],
"text": []
}
# Iterate through each dictionary and merge the lists
for d in dict_list:
for key in result.keys():
result[key].extend(d.get(key, []))
return result
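# Illustrative: merge_dicts([{"url": ["a"], ...}, {"url": ["b"], ...}]) concatenates the per-page
# lists into {"url": ["a", "b"], ...}, keeping the same column layout used by scrap_agent()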
def generate_date_range(start_date, end_date, interval):
"""
    :param start_date: start date string formatted as %Y-%m-%d
    :param end_date: end date string formatted as %Y-%m-%d
    :param interval: interval length in days
    :return: list of date intervals that will be fed into naver_blog_scrapper, e.g.
    [['2023-10-01', '2023-10-07'], ['2023-10-08', '2023-10-14']] for generate_date_range("2023-10-01", "2023-10-14", 7).
    The function also handles ranges that are not evenly divisible by the interval:
    generate_date_range("2023-10-01", "2023-10-14", 10) returns [['2023-10-01', '2023-10-10'], ['2023-10-11', '2023-10-14']].
"""
# Convert the start and end date strings to datetime objects
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
# Generate a range of dates from start to end
date_ranges = []
current_date = start
while current_date < end:
current_end = min(current_date + timedelta(days=interval - 1), end)
date_ranges.append([current_date.strftime("%Y-%m-%d"), current_end.strftime("%Y-%m-%d")])
current_date = current_end + timedelta(days=1)
return date_ranges
def range_of_ranges(start, end, step):
ranges = []
current_start = start
while current_start <= end:
# Calculate the end of the current range
current_end = min(current_start + step - 1, end)
# Append the current range as a list
ranges.append([current_start, current_end])
# Update the start for the next range
current_start += step
return ranges
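# Illustrative: range_of_ranges(1, 25, 10) -> [[1, 10], [11, 20], [21, 25]]; both ends of each range are inclusive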
def naver_blog_scrapper(keyword, start_day, end_day, date_interval, page_step=10, start_page_num=1, browser_thread_count=12):
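    """
    Scrape Naver blog search results for `keyword` between start_day and end_day (YYYY-MM-DD strings).
    The date range is split into chunks of `date_interval` days, each chunk's result pages are split into
    blocks of `page_step` pages, and each block is scraped with `browser_thread_count` parallel browsers
    and written to its own CSV under blog/<keyword>/.
    Note: start_page_num is currently not applied when building the page ranges (see the TODO in __main__).
    """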
url = get_url(keyword, start_day, end_day, 1)
total_page_num = get_total_page(url)
print(f"processing : {keyword}, during {start_day} to {end_day}")
print(f"total_page_num : {total_page_num}")
    date_ranges = generate_date_range(start_day, end_day, date_interval)
    for date_range in date_ranges:
        url = get_url(keyword, date_range[0], date_range[1], 1)
        total_page_num = get_total_page(url)
        # Split the pages for this date range into blocks of `page_step`; each block gets its own CSV
        task_pool = range_of_ranges(1, total_page_num, page_step)
        for task in task_pool:
            def parallel_scraping(keyword):
                # Page ranges from range_of_ranges are inclusive on both ends, hence task[1] + 1
                urls = Parallel(n_jobs=-1)(delayed(get_url)(keyword, date_range[0], date_range[1], page) for page in range(task[0], task[1] + 1))
                results = Parallel(n_jobs=browser_thread_count)(delayed(scrap_agent)(url) for url in urls)
                return results
            ret = parallel_scraping(keyword)
            merged_result = merge_dicts(ret)
            out_df = pd.DataFrame.from_dict(merged_result)
            os.makedirs(f"blog/{keyword}", exist_ok=True)
            out_df.to_csv(f"blog/{keyword}/{date_range[0]}-{date_range[1]}%{task[0]}-{task[1]}.csv", index=False)
if __name__ == "__main__":
    # TODO: start_page_num is currently not applied, so scraping always starts from page 1
naver_blog_scrapper("구미 송정동", "2023-02-05", "2023-10-31", 100, 50, 1, 12)
naver_blog_scrapper("구미 비산동", "2022-01-01", "2023-10-31", 100, 50, 1, 12)
naver_blog_scrapper("도량동", "2023-02-05", "2023-10-31", 100, 50, 1, 12)
naver_blog_scrapper("공단동", "2022-07-21", "2023-10-31", 100, 50, 1, 12)
naver_blog_scrapper("광평동", "2022-01-01", "2023-10-31", 100, 50, 1, 12)
naver_blog_scrapper("상모사곡동", "2022-01-01", "2023-10-31", 100, 50, 1, 12)
naver_blog_scrapper("임오동", "2022-01-01", "2023-10-31", 100, 50, 1, 12)
naver_blog_scrapper("인동동", "2022-01-01", "2023-10-31", 100, 50, 1, 12)
naver_blog_scrapper("진미동", "2022-01-01", "2023-10-31", 100, 50, 1, 12)
naver_blog_scrapper("양포동", "2022-01-01", "2023-10-31", 100, 50, 1, 12)