# iBook/booklist_parse.py
#
# Repository viewer metadata: 324 lines, 15 KiB, Python.
#
# Viewer warning: this file contains Unicode characters that might be
# confused with other characters (for example full-width punctuation in
# comments and strings). If that is intentional, the warning can be ignored.

import datetime
import os
import plistlib
import sqlite3
from collections import defaultdict
from contextlib import closing

import config
class BookListManager:
    """Load book metadata and derive per-book reading-time statistics.

    Book metadata is read from a property list with a top-level ``Books``
    array; last-open and finished dates are read from the
    ``ZBKLIBRARYASSET`` table of the library SQLite database. The parsed
    metadata and the last-open map are cached on the instance after the
    first successful load.
    """

    # Timestamps in the library database are seconds relative to this date.
    # NOTE(review): values are converted as naive datetimes with no time-zone
    # adjustment, while "today" is taken from the local clock — confirm that
    # the day boundaries line up as intended.
    _APPLE_EPOCH = datetime.datetime(2001, 1, 1)

    # Two neighbouring notes further apart than this many minutes are not
    # counted as continuous reading.
    _MAX_NOTE_GAP_MINUTES = 180

    def __init__(self, plist_path=None, db_path=None):
        """Create a manager.

        Args:
            plist_path: Path of the books property list. Falls back to
                ``config.LOCAL_BOOKS_PLIST`` when empty.
            db_path: Path of the library SQLite database. Falls back to
                ``config.LOCAL_LIBRARY_DB`` when empty.
        """
        self.plist_path = plist_path or config.LOCAL_BOOKS_PLIST
        self.db_path = db_path or config.LOCAL_LIBRARY_DB
        self._booksinfo = None
        self._books_open = None

    # ---------------- internal helpers ----------------
    @classmethod
    def _from_apple_timestamp(cls, timestamp):
        """Convert seconds-since-2001 to a naive datetime, or None if unusable."""
        try:
            return cls._APPLE_EPOCH + datetime.timedelta(seconds=timestamp)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _group_notes_by_day(notes):
        """Group note creation datetimes by calendar day.

        Notes whose ``creationdate`` is missing or not in
        ``%Y-%m-%d %H:%M:%S`` format are skipped.

        Returns:
            Mapping ``{date: [datetime, ...]}``.
        """
        day_notes = defaultdict(list)
        for note in notes.values():
            try:
                created = datetime.datetime.strptime(
                    note.get('creationdate'), '%Y-%m-%d %H:%M:%S')
            except (AttributeError, TypeError, ValueError):
                continue
            day_notes[created.date()].append(created)
        return day_notes

    @classmethod
    def _day_readtime(cls, times, opened, read_time_day, read_time_open_day):
        """Estimate the reading minutes of one day for one book.

        Args:
            times: Creation datetimes of the notes taken that day.
            opened: Whether the book's last-open date falls on that day
                (only consulted when there are no notes).
            read_time_day: Minutes credited when notes exist but no usable
                gap between them can be measured.
            read_time_open_day: Minutes credited for a day with no notes on
                which the book was opened.

        Returns:
            Estimated minutes (0 when there are no notes and no open).
        """
        if not times:
            return read_time_open_day if opened else 0
        if len(times) == 1:
            return read_time_day
        ordered = sorted(times)
        total_minutes = 0
        for earlier, later in zip(ordered, ordered[1:]):
            gap = (later - earlier).total_seconds() / 60
            # Only gaps up to the session limit count as reading time.
            if 0 < gap <= cls._MAX_NOTE_GAP_MINUTES:
                total_minutes += int(gap)
        return total_minutes if total_minutes > 0 else read_time_day

    def _add_readtime_stats(self, booksinfo):
        """Add ``readtime30d``, ``readtime12m`` and ``readtime_year`` per book.

        * ``readtime30d``: 30 values in minutes, index 0 is today and
          index 29 is 29 days ago.
        * ``readtime12m``: 12 values in minutes for the current year,
          index 0 is January.
        * ``readtime_year``: total minutes from 1 January through today.

        Raises whatever the annotation source or ``config`` lookup raises;
        the caller treats the whole step as best-effort.
        """
        # Imported here, as the original code did, so that a missing
        # annotation module only disables the statistics.
        from annotationdata import AnnotationManager

        # Minutes credited for a day that has notes but no measurable gap.
        read_time_day = getattr(config, 'READ_TIME_DAY', 60)
        # Minutes credited for a day without notes on which the book was opened.
        read_time_open_day = getattr(config, 'READ_TIME_OPEN_DAY', 30)

        today = datetime.datetime.now().date()
        first_day = datetime.date(today.year, 1, 1)
        one_day = datetime.timedelta(days=1)
        annotations = AnnotationManager().get_annotations()
        books_open = self.get_books_last_open()

        for bk_id, info in booksinfo.items():
            day_notes = self._group_notes_by_day(annotations.get(bk_id, {}))

            # The open date is the same for every day examined, so resolve it
            # once per book; an unusable timestamp just means "not opened".
            last_open_ts = books_open.get(bk_id, {}).get('last_open')
            open_dt = self._from_apple_timestamp(last_open_ts) if last_open_ts else None
            open_day = open_dt.date() if open_dt is not None else None

            info['readtime30d'] = [
                self._day_readtime(
                    day_notes.get(today - offset * one_day, []),
                    today - offset * one_day == open_day,
                    read_time_day,
                    read_time_open_day,
                )
                for offset in range(30)
            ]

            readtime12m = [0] * 12
            readtime_year = 0
            day = first_day
            while day <= today:
                minutes = self._day_readtime(
                    day_notes.get(day, []), day == open_day,
                    read_time_day, read_time_open_day)
                readtime12m[day.month - 1] += minutes
                readtime_year += minutes
                day += one_day
            info['readtime12m'] = readtime12m
            info['readtime_year'] = readtime_year

    # ---------------- public API ----------------
    def get_books_info(self):
        """Return metadata (plus reading statistics when available) per book.

        The result maps ``BKGeneratedItemId`` to a dict with the keys
        ``displayname``, ``author``, ``type``, ``bookid``, ``itemname``,
        ``path``, ``date`` and ``updatedate``. When the reading statistics
        can be computed each dict also gets ``readtime30d``, ``readtime12m``
        and ``readtime_year``; if that step fails a warning is printed and
        the metadata is still returned. The result is cached.

        Raises:
            OSError / plistlib errors if the property list cannot be read.
        """
        if self._booksinfo is not None:
            return self._booksinfo

        booksinfo = defaultdict(dict)
        with open(self.plist_path, 'rb') as f:
            plist_data = plistlib.load(f)
        for book in plist_data.get('Books', []):
            bk_id = book.get('BKGeneratedItemId')
            if not bk_id:
                continue
            booksinfo[bk_id] = {
                'displayname': book.get('BKDisplayName', ''),
                'author': book.get('artistName', ''),
                'type': book.get('BKBookType', ''),
                'bookid': bk_id,
                'itemname': book.get('itemName', ''),
                'path': book.get('path', ''),
                'date': book.get('BKInsertionDate', ''),
                'updatedate': book.get('updateDate', ''),
            }

        # Statistics are best-effort: any failure is reported and the plain
        # metadata is still returned.
        try:
            self._add_readtime_stats(booksinfo)
        except Exception as e:
            print(f'警告: 统计readtime30d失败: {e}')

        self._booksinfo = booksinfo
        return booksinfo

    def get_books_last_open(self):
        """Return ``{asset_id: {'last_open': timestamp}}`` from the library DB.

        Only rows with a non-null last-open date and a non-empty asset id
        are included. A missing database file yields an empty mapping that
        is not cached; a database error prints a warning and yields (and
        caches) an empty mapping.
        """
        if self._books_open is not None:
            return self._books_open

        books_open = defaultdict(dict)
        if not os.path.exists(self.db_path):
            return books_open

        try:
            # closing() guarantees the connection is released even when the
            # query fails.
            with closing(sqlite3.connect(self.db_path)) as conn:
                rows = conn.execute('''
                    SELECT ZASSETID, zlastopendate
                    FROM ZBKLIBRARYASSET
                    WHERE zlastopendate IS NOT NULL
                ''').fetchall()
        except sqlite3.Error as e:
            print(f'警告: 读取BKLibrary.sqlite失败: {e}')
        else:
            for asset_id, last_open in rows:
                if asset_id:
                    books_open[asset_id] = {'last_open': last_open}

        self._books_open = books_open
        return books_open

    def get_total_readtime(self, days=30):
        """Return the summed reading minutes of all books for recent days.

        Args:
            days: Length of the returned list. Per-book data only covers
                30 days, so entries beyond index 29 stay 0.

        Returns:
            List of length ``days``; index 0 is today, index 1 yesterday,
            and so on.
        """
        booksinfo = self.get_books_info()
        total = [0] * days
        for info in booksinfo.values():
            for i, minutes in zip(range(days), info.get('readtime30d', [])):
                total[i] += minutes
        return total

    def get_total_readtime_year(self):
        """Return the reading minutes of all books accumulated this year."""
        booksinfo = self.get_books_info()
        return sum(info.get('readtime_year', 0) for info in booksinfo.values())

    def get_total_readtime12m(self):
        """Return this year's reading minutes of all books per month.

        Returns:
            List of 12 values; index 0 is January.
        """
        booksinfo = self.get_books_info()
        total = [0] * 12
        for info in booksinfo.values():
            for i, minutes in zip(range(12), info.get('readtime12m', [])):
                total[i] += minutes
        return total

    # ---------------- finished books (current year) ----------------
    def get_finished_books_this_year(self):
        """Return the books finished in the current year, newest first.

        A book qualifies when ``ZISFINISHED = 1`` and ``ZDATEFINISHED``
        falls in the current year.

        Returns:
            List of ``(asset_id, info, finished_datetime)`` tuples, where
            ``info`` is the metadata dict from :meth:`get_books_info` or an
            empty dict when the asset is not in the property list (callers
            that need a title should fall back to ``asset_id``). The list is
            empty when the database is missing or cannot be queried.
        """
        booksinfo = self.get_books_info()
        year = datetime.datetime.now().year
        results = []
        if not os.path.exists(self.db_path):
            return results

        try:
            with closing(sqlite3.connect(self.db_path)) as conn:
                rows = conn.execute("""
                    SELECT ZASSETID, ZDATEFINISHED, ZISFINISHED
                    FROM ZBKLIBRARYASSET
                    WHERE ZISFINISHED=1 AND ZDATEFINISHED IS NOT NULL
                """).fetchall()
        except sqlite3.Error as e:
            print(f'警告: 查询已读完书籍失败: {e}')
            return results

        # Debug output stays enabled unless the instance sets
        # `_debug_finished_books` to a false value.
        debug = getattr(self, '_debug_finished_books', True)
        if debug:
            print(f"[debug finished] raw_rows={len(rows)} (ZISFINISHED=1 & ZDATEFINISHED not null)")

        for asset_id, finished_ts, _flag in rows:
            if not asset_id or finished_ts is None:
                continue
            finished_dt = self._from_apple_timestamp(finished_ts)
            # Rows with an unusable timestamp are skipped, not fatal.
            if finished_dt is None or finished_dt.year != year:
                continue
            results.append((asset_id, booksinfo.get(asset_id, {}), finished_dt))

        if debug:
            print(f"[debug finished] after year filter={len(results)}, year={year}")
            if results:
                print("[debug finished] sample asset_ids:", ','.join(str(r[0]) for r in results[:5]))

        # Most recently finished first.
        results.sort(key=lambda x: x[2], reverse=True)
        return results
if __name__ == '__main__':
    # Manual smoke test: load the library once and print samples of every
    # statistic the manager exposes.
    import datetime
    from pprint import pprint

    # Build the manager and parse the library a single time; the result is
    # cached on the instance for the aggregate calls below.
    manager = BookListManager()
    booksinfo = manager.get_books_info()

    # First three metadata records.
    print("\n【前三条示例】")
    for k, v in list(booksinfo.items())[:3]:
        print(f"{k}:")
        pprint(v, sort_dicts=False, indent=2)
        print('-' * 60)

    # First three last-open timestamps, shown as datetime and raw value.
    print("\n【最近打开时间示例】")
    books_open = manager.get_books_last_open()
    for k, v in list(books_open.items())[:3]:
        ts = v['last_open']
        dt = datetime.datetime(2001, 1, 1) + datetime.timedelta(seconds=ts)
        print(f"{k}: {dt} (timestamp: {ts})")

    # Books with any reading activity in the last five days (today included).
    print("\n【最近5天有阅读行为的书籍的readtime30d信息】")
    books_with_recent_reading = [
        (book_id, book)
        for book_id, book in booksinfo.items()
        if len(book.get('readtime30d', [])) >= 5
        and any(rt > 0 for rt in book.get('readtime30d', [])[:5])
    ]
    for book_id, book in books_with_recent_reading:
        print(f"书名: {book.get('displayname', book_id)}")
        print(f"readtime30d: {book.get('readtime30d', [])}")
        print('-' * 60)

    # Per-book monthly and yearly reading minutes for the current year.
    print("\n【每本书今年每月阅读时长(分钟)和年总阅读时长】")
    for k, v in booksinfo.items():
        print(f"书名: {v.get('displayname', k)}")
        print(f"readtime12m: {v.get('readtime12m', [])}")
        print(f"readtime_year: {v.get('readtime_year', 0)} 分钟")
        print('-' * 60)

    # Daily totals over all books; the window length is configurable.
    print("\n【最近7天每天所有书籍总阅读时间分钟")
    total_readtime7d = manager.get_total_readtime(days=7)
    for i, mins in enumerate(total_readtime7d):
        if i == 0:
            label = "今天"
        elif i == 1:
            label = "昨天"
        else:
            label = f"{i}天前"
        print(f"{label}: {mins} 分钟")

    # Year-to-date total over all books.
    print("\n【全年所有书的累计阅读时间(分钟)】")
    total_readtime_year = manager.get_total_readtime_year()
    print(f"全年总阅读时间: {total_readtime_year} 分钟")

    # Monthly totals over all books.
    print("\n【全年所有书的月度累计阅读时间(分钟)】")
    total_readtime12m = manager.get_total_readtime12m()
    for month, mins in enumerate(total_readtime12m, start=1):
        print(f"{month}月: {mins} 分钟")