
import random
import re
import regex
import time
import math
import pandas as pd
import os
import bs4
from datetime import datetime, timedelta
from selenium import webdriver
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service as ChromeService
from tqdm import tqdm
HEADER = {"User-Agent": "Mozilla/119.0 (Windows NT 10.0; Win64; x64) Chrome/98.0.4758.102"}

# Function to remove tags from an HTML fragment and return only its visible text
def remove_tags(html):
    # Parse the HTML content
    soup = bs4.BeautifulSoup(html, "html.parser")
    # Drop <style> and <script> blocks entirely
    for data in soup(['style', 'script']):
        data.decompose()
    # Return the remaining text content, whitespace-normalized
    return ' '.join(soup.stripped_strings)
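
# A quick illustration of remove_tags (not called on its own by the scraper):
#   remove_tags('<div><script>track()</script><p>Hello <b>world</b></p></div>')
# returns 'Hello world' -- script/style blocks are dropped and only the
# visible text is kept, joined by single spaces.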

def get_url(keyword, page_num):
    t_rand = 1702255000000 + random.randint(0, 999999)
    url = f"https://section.cafe.naver.com/ca-fe/home/search/articles?q={keyword}&t={t_rand}&em=1&p={page_num}"
    return url
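
# For illustration, get_url("구미시", 3) produces a URL of the form
#   https://section.cafe.naver.com/ca-fe/home/search/articles?q=구미시&t=1702255xxxxxx&em=1&p=3
# where t is the randomized cache-busting value and p is the result-page number.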

def get_total_page(url):
    # Resolve the Chrome WebDriver install path
    chrome_path = ChromeDriverManager().install()
    # Configure the WebDriver
    options = webdriver.ChromeOptions()
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    options.add_argument('headless')
    options.add_argument('window-size=1920x1080')
    options.add_argument("disable-gpu")
    # options.add_argument("user-agent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko")
    # Initialize the WebDriver
    driver = webdriver.Chrome(service=ChromeService(chrome_path), options=options)
    driver.get(url)
    time.sleep(3)
    # ".sub_text" holds the total result count; 12 results are shown per page
    total_count = driver.find_element(By.CSS_SELECTOR, ".sub_text")
    total_count = total_count.text
    driver.quit()
    total_page = math.ceil(int(total_count.replace(',', '')) / 12)
    return total_page
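
# The page count assumes 12 results per search page, as the division above
# implies: e.g. a ".sub_text" value of "1,234" gives ceil(1234 / 12) = 103 pages.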

def scrap_agent(url):
    # Resolve the Chrome WebDriver install path
    chrome_path = ChromeDriverManager().install()
    # Configure the WebDriver
    options = webdriver.ChromeOptions()
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    options.add_argument('headless')
    options.add_argument('window-size=1920x1080')
    options.add_argument("disable-gpu")
    # options.add_argument("user-agent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko")
    # Initialize the WebDriver
    driver = webdriver.Chrome(service=ChromeService(chrome_path), options=options)
    driver.get(url)
    time.sleep(2)
    # Grab the rendered result list, then pretty-print the HTML so each
    # article block can be isolated with a line-anchored regex
    single_page_search_result = driver.find_elements(By.CSS_SELECTOR, ".item_list")
    search_result_plain_html = single_page_search_result[0].get_attribute('innerHTML')
    driver.quit()
    formatter = bs4.formatter.HTMLFormatter(indent=1)
    search_result_plain_html_beautified = bs4.BeautifulSoup(search_result_plain_html, 'html.parser').prettify(formatter=formatter)
    search_result_contents = regex.findall(r' <div class="article_item_wrap">(.*?)\n <\/div>', search_result_plain_html_beautified, re.DOTALL)
    ret = {
        "url": [],
        "title": [],
        "cafe_name": [],
        "post_date": [],
        "text": []
    }
    for content in search_result_contents:
        content = content.replace("\n", "")
        # Extract the post URL and the individual fields of each result item
        href = regex.search(r"https:\/\/cafe\.naver\.com\/[\uAC00-\uD7AFa-zA-Z0-9_]+\?iframe_url\=[\uAC00-\uD7AFa-zA-Z0-9\.\%_]+", content, re.DOTALL)
        title = regex.search(r'<strong class="title">(.*?)<\/strong>', content)
        text = regex.search(r'<p class="text">(.*?)<\/p>', content)
        cafe_name = regex.search(r'<span class="cafe_name">(.*?)<\/span>', content)
        post_date = regex.search(r'<span class="date">(.*?)<\/span>', content)
        href = remove_tags(href.group(0))
        title = remove_tags(title.group(1))
        text = remove_tags(text.group(1))
        cafe_name = remove_tags(cafe_name.group(1))
        post_date = remove_tags(post_date.group(1))
        ret["url"].append(href)
        ret["title"].append(title)
        ret["cafe_name"].append(cafe_name)
        ret["post_date"].append(post_date)
        ret["text"].append(text)
    return ret
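
# Each call to scrap_agent returns one page of results in column-oriented form,
# roughly (the values below are made-up placeholders):
#   {"url": ["https://cafe.naver.com/...", ...],
#    "title": ["...", ...],
#    "cafe_name": ["...", ...],
#    "post_date": ["...", ...],
#    "text": ["...", ...]}
# which is the shape merge_dicts() and pd.DataFrame.from_dict() expect.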

def merge_dicts(dict_list):
    # Initialize the result dictionary with empty lists
    result = {
        "url": [],
        "title": [],
        "cafe_name": [],
        "post_date": [],
        "text": []
    }
    # Iterate through each dictionary and merge the lists
    for d in dict_list:
        for key in result.keys():
            result[key].extend(d.get(key, []))
    return result
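
# Illustrative only: merging two per-page results concatenates each column, e.g.
#   merge_dicts([{"url": ["a"], "title": ["t1"], "cafe_name": ["c"], "post_date": ["d"], "text": ["x"]},
#                {"url": ["b"], "title": ["t2"], "cafe_name": ["c"], "post_date": ["d"], "text": ["y"]}])
# yields {"url": ["a", "b"], "title": ["t1", "t2"], ...} with every list doubled up in order.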

def range_of_ranges(start, end, step):
    ranges = []
    current_start = start
    while current_start <= end:
        # Calculate the inclusive end of the current range
        current_end = min(current_start + step - 1, end)
        # Append the current range as a [start, end] pair
        ranges.append([current_start, current_end])
        # Advance the start for the next range
        current_start += step
    return ranges
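
# Example: range_of_ranges(1, 250, 100) -> [[1, 100], [101, 200], [201, 250]],
# i.e. inclusive [start, end] blocks that the main loop below scrapes one at a time.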

if __name__ == "__main__":
    from joblib import Parallel, delayed

    keyword = "구미시"  # "Gumi-si", the search keyword
    url = get_url(keyword, 1)
    total_page_num = get_total_page(url)

    def parallel_scraping(keyword, scope):
        # Build the page URLs for this block, then scrape them in parallel;
        # scope is an inclusive [start, end] pair, hence the +1 on the range
        urls = Parallel(n_jobs=-1)(delayed(get_url)(keyword, i) for i in range(scope[0], scope[1] + 1))
        results = Parallel(n_jobs=12)(delayed(scrap_agent)(url) for url in urls)
        return results

    # Process the result pages in blocks of 100 and write one CSV per block
    for scope in range_of_ranges(1, total_page_num, 100):
        ret = parallel_scraping(keyword, scope)
        merged_result = merge_dicts(ret)
        out_df = pd.DataFrame.from_dict(merged_result)
        os.makedirs(f"cafe/{keyword}", exist_ok=True)
        out_df.to_csv(f"cafe/{keyword}/{scope[0]}-{scope[1]}.csv", index=False)
# cafe_url_scrapper()