# iBook/backup/parseweb.py
#
# NOTE: this file intentionally contains CJK text and full-width punctuation
# (e.g. '：' in book titles and info-line parsing); do not "normalize" those
# characters — several string splits depend on them.
#########################################################
## @file : parseweb.py (refactored)
## @desc : Douban & Amazon book spider (Douban primary)
#########################################################
import requests
import re
import os
import logging
from collections import defaultdict
from html import unescape
from urllib.parse import quote

# Root logger writing to a local file named 'log'; the handler guard keeps
# repeated imports/reloads from stacking duplicate file handlers.
logger = logging.getLogger()
if not logger.handlers:
    logger.addHandler(logging.FileHandler('log'))
logger.setLevel(logging.INFO)

# 1 = scrape Douban, 0 = scrape Amazon.cn; fixed at import time.
ISDOUBAN = 1
# Local cache directory for downloaded cover images.
IMGPATH = './downimg'
# URL prefix a result id (Douban sid / Amazon ASIN) is appended to.
LINKPREF = 'https://book.douban.com/subject/' if ISDOUBAN else 'https://www.amazon.cn/s?k='

# Shared request headers; Host/Referer are rewritten in Amazon mode below.
mheaders = {
    'Host': 'www.douban.com',
    'Referer': 'https://www.douban.com',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0 Safari/537.36'
}

# Query-string template; the 'bkname_xxx'/'bookname_xxx' placeholders are
# overwritten with the real book name by grab_book_info().
# Fixed: 'Host' entries removed — Host is an HTTP header (already set in
# mheaders), not a query parameter.
mparams = {}
murl = ''
if ISDOUBAN:
    mparams['search_text'] = 'bkname_xxx'
    mparams['cat'] = '1001'  # Douban search category: books
    mparams['k'] = 'bookname_xxx'
    murl = 'https://search.douban.com/book/subject_search'
else:
    mheaders['Host'] = 'www.amazon.cn'
    mheaders['Referer'] = 'https://www.amazon.cn'
    mparams['k'] = 'bkname_xxx'
    mparams['i'] = 'stripbooks'  # restrict search to the books department
    mparams['__mk_zh_CN'] = '亚马逊网站'  # fixed: key had a stray '=' ('__mk_zh_CN=')
    mparams['ref'] = 'nb_sb_noss'  # fixed: was 'reg'; Amazon's referrer tag is 'ref'
    murl = 'https://www.amazon.cn/s'

# Sample titles used for manual spider runs.
TEST_BOOKS = [
    '24堂财富课','甲骨文','庆余年(精校版)','商君书-中华经典名著全本全注全译丛书','苏世民:我的经验与教训',
    '杨伯峻_论语译注','小窗幽记','少年凯歌','投资要义','白鱼解字','历史的巨镜','货币的教训','钱从哪里来',
    '中国古代简史','罗马人的故事(套装共15册)','改变心理学的40项研究','如何假装懂音乐','管子(上下册)',
    '投资中最简单的事','薛兆丰经济学讲义','枪炮、病菌与钢铁:人类社会的命运','中央帝国的哲学密码','新编说文解字大全集',
    '市场的逻辑(增订本)','金融的本质:伯南克四讲美联储','从零开始学写作','中国国家治理的制度逻辑','中国为什么有前途','日本的世界观'
]
class BookInfoSpider:
    """Scrapes book metadata from search-result pages of Douban (when the
    module-level ISDOUBAN flag is truthy) or Amazon.cn otherwise."""

    # Pre-compiled line matchers for the grab_book_info() state machine.
    # Which set exists is decided once, at class-creation time, by ISDOUBAN.
    # NOTE(review): these patterns target a specific generation of each
    # site's server-rendered markup — verify against the current page source.
    if ISDOUBAN:
        # Result row: captures subject id, title, and cover-image URL.
        re_bn = re.compile(r'class="nbg.+?sid: (\d+?),.+?title="(.+?)".+?img src="(.+?)"')
        # "allstarNN" rating class; NN == '00' appears to mean unrated.
        re_star = re.compile(r'^<span class="allstar(\d+)"></span>')
        # Numeric rating, e.g. "8.4".
        re_score = re.compile(r'class="rating_nums">(.+?)<')
        # Rating count: "(N人评价)".
        re_ratenum = re.compile(r'^<span>\((\d+)人评价\)</span>')
        # Cast line: "author / ... / publisher / year".
        re_author = re.compile(r'class="subject-cast">(.+?)<')
        # One-paragraph blurb; the closing </p> is optional.
        re_description = re.compile(r'^<p>(.+?)(</p>){0,1}$')
    else:
        # Result-card opener: captures the ASIN.
        re_asin = re.compile(r'^<div data-asin="(.+?)" data-index')
        # Cover-image URL.
        re_img = re.compile(r'^<img src="(.+?)"$')
        # Title taken from the image alt attribute.
        re_bn = re.compile(r'^alt="(.+?)"$')
        # Author row; the full match is re-split by grab_book_info().
        re_author = re.compile(r'^<div class=.+auto"><\/span>.+$')
        # Star rating from the aria-label text.
        re_rate = re.compile(r'^<span aria-label="(.+?)">$')
        # End-of-card marker used to reset the state machine.
        re_end = re.compile(r'^<span class="a-letter-space"><\/span><\/div><\/div>')
def _douban_suggest(self, query: str):
    """Query Douban's book suggest endpoint for *query*.

    Returns the decoded JSON list (dicts with at least 'id', 'title',
    'pic' per grab_book_info_new's usage); [] on any failure.
    """
    session = requests.Session()
    session.headers.update(mheaders)
    # Warm-up hit on the homepage so the session carries cookies; best-effort.
    try:
        session.get('https://www.douban.com/', timeout=10)
    except Exception:  # fixed: bare `except:` also trapped SystemExit/KeyboardInterrupt
        pass
    url = f'https://book.douban.com/j/subject_suggest?q={quote(query)}'
    try:
        r = session.get(url, timeout=8)
        if r.status_code == 200:
            # .json() may raise on a non-JSON body; handled below.
            return r.json()
    except Exception as e:
        logger.debug(f"suggest error {query}: {e}")
    return []
def _fetch_subject(self, sid: str):
    """Fetch the Douban subject page for *sid*; return '' on any failure."""
    page_url = f'https://book.douban.com/subject/{sid}/'
    try:
        resp = requests.get(page_url, headers=mheaders, timeout=10)
    except Exception as e:
        logger.debug(f'subject fetch {sid} error: {e}')
        return ''
    # Non-200 responses are treated the same as a failed fetch.
    return resp.text if resp.status_code == 200 else ''
def _extract_description(self, html: str) -> str:
if not html:
return ''
m = re.search(r'<meta property="og:description" content="(.*?)"', html, re.S)
if m:
return unescape(m.group(1).strip())
m = re.search(r'<div class="intro">(.*?)</div>', html, re.S)
if m:
raw = m.group(1)
ps = re.findall(r'<p>(.*?)</p>', raw, re.S)
txt = '\n'.join(unescape(re.sub(r'<.*?>','', p)).strip() for p in ps)
if txt:
return txt
return ''
def grab_book_info_new(self, bookname: str):
    """Look up *bookname* via the Douban suggest API (Douban mode only).

    Picks the best-matching suggestion, scrapes its subject page for
    description/author/publisher/year, and returns [bookname, bkinfo]
    (bkinfo maps sid -> field dict), or None when disabled / no hits.
    """
    if ISDOUBAN != 1:
        return None
    hits = self._douban_suggest(bookname)
    if not hits:
        return None

    def relevance(entry):
        # Substring containment dominates; otherwise score by shared chars.
        title = entry.get('title', '')
        if bookname in title:
            return 3 * len(bookname) / max(len(title), 1)
        if title in bookname:
            return 2 * len(title) / max(len(bookname), 1)
        return len(set(bookname) & set(title)) / max(len(title), 1)

    # max() returns the first maximal entry, matching a stable
    # reverse-sort followed by taking element 0.
    best = max(hits, key=relevance)
    sid = str(best.get('id'))
    page = self._fetch_subject(sid)
    desc = self._extract_description(page)

    author = publisher = publishing = ''
    info_match = re.search(r'<div id="info">(.*?)</div>', page, re.S)
    if info_match:
        # Turn <br> into newlines, drop remaining tags, then scan rows.
        plain = re.sub(r'<br\s*/?>', '\n', info_match.group(1))
        plain = re.sub(r'<.*?>', '', plain)
        for row in (piece.strip() for piece in plain.split('\n')):
            if not row:
                continue
            if row.startswith('作者') and not author:
                author = row.split(':', 1)[-1].strip()
            elif row.startswith('出版社') and not publisher:
                publisher = row.split(':', 1)[-1].strip()
            elif re.search(r'出版年', row) and not publishing:
                publishing = row.split(':', 1)[-1].strip()

    bkinfo = defaultdict(dict)
    record = bkinfo[sid]
    record['link'] = f'https://book.douban.com/subject/{sid}'
    record['bookname'] = best.get('title', '')
    record['img'] = best.get('pic', '')
    # Optional fields are only stored when non-empty.
    for key, value in (('description', desc), ('author', author),
                       ('publisher', publisher), ('publishing', publishing)):
        if value:
            record[key] = value
    return [bookname, bkinfo]
def grab_book_info(self, mbkn: str):
    """Search for book *mbkn* and scrape the search-results page.

    Feeds the raw HTML through a line-oriented state machine (Douban or
    Amazon flavour, per ISDOUBAN), collecting one record per result row.

    Returns [mbkn, bkinfo] where bkinfo maps result id (Douban sid /
    Amazon ASIN) to a dict of scraped fields; bkinfo is empty when the
    request fails or nothing matches.
    """
    # Inject the query into the pre-built parameter template.
    if ISDOUBAN == 1:
        mparams['search_text'] = mbkn
        mparams['k'] = mbkn
    else:
        mparams['k'] = mbkn
    session = requests.Session()
    session.headers.update(mheaders)
    # Warm-up request so the session holds cookies before searching.
    # NOTE(review): this hits douban.com even in Amazon mode — looks like
    # an oversight, kept for behavioural compatibility.
    try:
        session.get('https://www.douban.com/', timeout=10)
    except Exception:  # fixed: was a bare `except:` (trapped SystemExit etc.)
        pass
    try:
        r = session.get(murl, params=mparams, timeout=10)
    except Exception as e:
        logger.debug(f'request error {mbkn}: {e}')
        return [mbkn, defaultdict(dict)]
    if r.status_code != 200:
        # Log and fall through: parsing an error page simply yields no hits.
        logger.debug(f'status {r.status_code} for {mbkn}')
    bkinfo = defaultdict(dict)
    sid = None
    stat = None
    resp = r.text
    if ISDOUBAN == 1:
        # Douban states: SID -> STAR -> [SCORE -> RATENUM] -> AUTHOR
        #                -> DESCRIPTION -> back to SID.
        stat = 'SID'
        for line in resp.split('\n'):
            line = line.strip()
            if not line:
                continue
            if stat == 'SID':
                ret = re.search(self.re_bn, line)
                if ret:
                    sid = ret.group(1)
                    # fixed: was os.path.join(LINKPREF, sid) — URLs are not
                    # filesystem paths (os.path.join breaks on Windows).
                    bkinfo[sid]['link'] = LINKPREF + sid
                    bkinfo[sid]['bookname'] = ret.group(2)
                    bkinfo[sid]['img'] = ret.group(3)
                    stat = 'STAR'
                    continue
            elif stat == 'STAR':
                ret = re.search(self.re_star, line)
                if ret:
                    star = ret.group(1)
                    if star == '00':
                        # allstar00 = unrated: no score/ratenum lines follow.
                        stat = 'AUTHOR'
                    elif star.isdigit() and int(star) > 0:
                        stat = 'SCORE'
            elif stat == 'SCORE':
                ret = re.search(self.re_score, line)
                if ret:
                    bkinfo[sid]['score'] = ret.group(1)
                    stat = 'RATENUM'
                    continue
            elif stat == 'RATENUM':
                ret = re.search(self.re_ratenum, line)
                if ret:
                    bkinfo[sid]['ratenum'] = ret.group(1)
                    stat = 'AUTHOR'
                    continue
            elif stat == 'AUTHOR':
                ret = re.search(self.re_author, line)
                if ret:
                    # "author / ... / publisher / year": peel the last two
                    # fields off and re-join the rest as the author string.
                    tt = ret.group(1).split(' / ')
                    if len(tt) >= 3:
                        *author, bkinfo[sid]['publisher'], bkinfo[sid]['publishing'] = tt
                        bkinfo[sid]['author'] = '/'.join(author)
                    else:
                        bkinfo[sid]['author'] = ret.group(1)
                    stat = 'DESCRIPTION'
                    continue
            elif stat == 'DESCRIPTION':
                ret = re.search(self.re_description, line)
                if ret:
                    bkinfo[sid]['description'] = ret.group(1).strip()
                    stat = 'SID'
                    continue
    else:  # AMAZON
        # Amazon states: ASIN -> IMG -> BOOKNAME -> AUTHOR <-> RATE, with
        # re_end resetting to ASIN at each result-card boundary.
        stat = 'ASIN'
        for line in resp.split('\n'):
            line = line.strip()
            if not line:
                continue
            if stat == 'ASIN':
                ret = re.search(self.re_asin, line)
                if ret:
                    sid = ret.group(1)
                    # fixed: os.path.join spliced a spurious separator after
                    # the '...?k=' prefix; plain concatenation is correct.
                    bkinfo[sid]['link'] = LINKPREF + ret.group(1)
                    stat = 'IMG'
                    continue
            elif stat == 'IMG':
                ret = re.search(self.re_img, line)
                if ret:
                    bkinfo[sid]['img'] = ret.group(1)
                    stat = 'BOOKNAME'
                    continue
            elif stat == 'BOOKNAME':
                ret = re.search(self.re_bn, line)
                if ret:
                    # Keep the title text before the first '(' or whitespace
                    # (drops subtitle/series suffixes).
                    bkname = re.split(r'[(\s]', ret.group(1).strip())[0]
                    bkinfo[sid]['bookname'] = bkname
                    stat = 'AUTHOR'
                    continue
            elif stat == 'AUTHOR':
                ret = re.search(self.re_author, line)
                if ret:
                    author = ','.join(re.split(r'<span.+?auto">|<\/span', ret.group(0))[3::4])
                    bkinfo[sid]['author'] = author
                    stat = 'RATE'
                    continue
            elif stat == 'RATE':
                ret = re.search(self.re_rate, line)
                if ret:
                    bkinfo[sid]['rate'] = ret.group(1).split(' ')[0]
                    stat = 'AUTHOR'
                    continue
            # Card terminator: begin scanning for the next result.
            if re.search(self.re_end, line):
                stat = 'ASIN'
    return [mbkn, bkinfo]
def filter_spide_book(self, mbkinfo):
    """Pick the scraped entry whose title best matches the queried name.

    *mbkinfo* is the [queried_name, {sid: fields}] pair produced by the
    grab_* methods. Returns {queried_name: best_fields}, or None when the
    input is empty or no entry has a 'bookname'.
    """
    if not mbkinfo:
        return None
    wanted = mbkinfo[0]
    candidates = []
    for fields in mbkinfo[1].values():
        title = fields.get('bookname', '')
        if not title:
            continue
        # Containment in either direction dominates; shared characters
        # act only as a tiny tie-breaker.
        rank = (3 if wanted in title else 0) + (2 if title in wanted else 0)
        rank += len(set(wanted) & set(title)) * 0.01
        candidates.append((rank, fields))
    if not candidates:
        return None
    # max() keeps the first entry on ties, matching the original
    # "strictly greater replaces" loop.
    _, winner = max(candidates, key=lambda pair: pair[0])
    return {wanted: winner}
def down_book_img(self, mbkinfo):
    """Download the cover images referenced by *mbkinfo* into IMGPATH.

    *mbkinfo* maps a book name to its field dict; only entries carrying an
    'img' URL are fetched, and files already on disk are skipped.
    Best-effort: individual failures are logged at debug level.
    """
    if not mbkinfo:
        return
    # fixed: exists()+mkdir() raced between check and create;
    # makedirs(exist_ok=True) is idempotent and handles nested paths.
    os.makedirs(IMGPATH, exist_ok=True)
    for _, fields in mbkinfo.items():
        link = fields.get('img')
        if not link:
            continue
        # Use the URL's last path component as the local file name.
        fname = link.split('/')[-1]
        dest = os.path.join(IMGPATH, fname)
        if os.path.exists(dest):
            continue  # already cached
        try:
            img = requests.get(link, headers=mheaders, timeout=10)
            if img.status_code == 200:
                with open(dest, 'wb') as fp:
                    fp.write(img.content)
        except Exception as e:
            logger.debug(f'download img error {link}: {e}')