324 lines
15 KiB
Python
324 lines
15 KiB
Python
|
||
import config
|
||
import plistlib
|
||
import sqlite3
|
||
import os
|
||
from collections import defaultdict
|
||
|
||
class BookListManager:
|
||
|
||
def __init__(self, plist_path=None, db_path=None):
|
||
self.plist_path = plist_path or config.LOCAL_BOOKS_PLIST
|
||
self.db_path = db_path or config.LOCAL_LIBRARY_DB
|
||
self._booksinfo = None
|
||
self._books_open = None
|
||
|
||
def get_books_info(self):
|
||
if self._booksinfo is not None:
|
||
return self._booksinfo
|
||
booksinfo = defaultdict(dict)
|
||
with open(self.plist_path, 'rb') as f:
|
||
plist_data = plistlib.load(f)
|
||
for book in plist_data.get('Books', []):
|
||
bk_id = book.get('BKGeneratedItemId')
|
||
if not bk_id:
|
||
continue
|
||
booksinfo[bk_id] = {
|
||
'displayname': book.get('BKDisplayName', ''),
|
||
'author': book.get('artistName', ''),
|
||
'type': book.get('BKBookType', ''),
|
||
'bookid': bk_id,
|
||
'itemname': book.get('itemName', ''),
|
||
'path': book.get('path', ''),
|
||
'date': book.get('BKInsertionDate',''),
|
||
'updatedate': book.get('updateDate','')
|
||
}
|
||
# 统计每本书最近30天每天的阅读时长
|
||
try:
|
||
from annotationdata import AnnotationManager
|
||
import datetime
|
||
# 每天最小阅读时长(有笔记)
|
||
READ_TIME_DAY = getattr(config, 'READ_TIME_DAY', 60) # 单位:分钟
|
||
# 无笔记但当天有打开书籍时的阅读时长
|
||
READ_TIME_OPEN_DAY = getattr(config, 'READ_TIME_OPEN_DAY', 30) # 单位:分钟
|
||
today = datetime.datetime.now().date()
|
||
manager = AnnotationManager()
|
||
annotations = manager.get_annotations()
|
||
# 获取所有书籍的打开时间(ZLASTOPENDATE),单位为苹果时间戳
|
||
books_open = self.get_books_last_open()
|
||
this_year = today.year
|
||
for bk_id in booksinfo:
|
||
notes = annotations.get(bk_id, {})
|
||
day_notes = {}
|
||
# 收集每本书所有笔记的创建时间,按天分组
|
||
# day_notes: {date对象: [datetime对象, ...]},便于后续统计每天的阅读行为
|
||
for uuid, note in notes.items():
|
||
raw_date = note.get('creationdate')
|
||
try:
|
||
dt = datetime.datetime.strptime(raw_date, '%Y-%m-%d %H:%M:%S')
|
||
day = dt.date()
|
||
if day not in day_notes:
|
||
day_notes[day] = []
|
||
day_notes[day].append(dt)
|
||
except Exception:
|
||
pass
|
||
# 获取该书的打开时间戳(ZLASTOPENDATE),用于判断无笔记时是否有打开过书籍
|
||
open_info = books_open.get(bk_id, {})
|
||
last_open_ts = open_info.get('last_open')
|
||
# 生成最近30天的阅读时长列表(readtime30d),索引0为今天,索引29为30天前
|
||
readtime30d = []
|
||
for i in range(30):
|
||
day = today - datetime.timedelta(days=i)
|
||
times = day_notes.get(day, [])
|
||
# 统计当天阅读时长
|
||
if not times:
|
||
# 没有笔记,判断当天是否有打开过书籍
|
||
opened = False
|
||
if last_open_ts:
|
||
open_dt = datetime.datetime(2001, 1, 1) + datetime.timedelta(seconds=last_open_ts)
|
||
if open_dt.date() == day:
|
||
opened = True
|
||
# 无笔记但当天有打开书籍,阅读时间设为READ_TIME_OPEN_DAY
|
||
readtime = READ_TIME_OPEN_DAY if opened else 0
|
||
elif len(times) == 1:
|
||
# 只有一条笔记,设为最小阅读时长
|
||
readtime = READ_TIME_DAY
|
||
else:
|
||
# 多条笔记,统计相邻笔记时间差(仅累加小于3小时的部分,单位分钟)
|
||
times_sorted = sorted(times)
|
||
total_minutes = 0
|
||
for idx in range(1, len(times_sorted)):
|
||
delta = (times_sorted[idx] - times_sorted[idx-1]).total_seconds() / 60
|
||
# 只统计相邻笔记间隔小于等于180分钟的部分
|
||
if 0 < delta <= 180:
|
||
total_minutes += int(delta)
|
||
# 如果没有有效时间差,则用最小阅读时长
|
||
readtime = total_minutes if total_minutes > 0 else READ_TIME_DAY
|
||
readtime30d.append(readtime)
|
||
# 保存每本书的30天阅读时长列表
|
||
booksinfo[bk_id]['readtime30d'] = readtime30d
|
||
|
||
# 新增:统计今年每月和全年阅读时长(遍历今年每一天,更精确)
|
||
# readtime12m: 今年每月阅读时长列表(索引0为1月,索引11为12月)
|
||
# readtime_year: 今年总阅读时长(分钟)
|
||
readtime12m = [0] * 12
|
||
readtime_year = 0
|
||
first_day = datetime.date(this_year, 1, 1)
|
||
days_in_year = (today - first_day).days + 1
|
||
for i in range(days_in_year):
|
||
day = first_day + datetime.timedelta(days=i)
|
||
times = day_notes.get(day, [])
|
||
# 统计当天阅读时长,逻辑与readtime30d一致
|
||
if not times:
|
||
# 无笔记,判断当天是否有打开过书籍
|
||
opened = False
|
||
if last_open_ts:
|
||
open_dt = datetime.datetime(2001, 1, 1) + datetime.timedelta(seconds=last_open_ts)
|
||
if open_dt.date() == day:
|
||
opened = True
|
||
# 无笔记但当天有打开书籍,阅读时间设为READ_TIME_OPEN_DAY
|
||
readtime = READ_TIME_OPEN_DAY if opened else 0
|
||
elif len(times) == 1:
|
||
# 只有一条笔记,设为最小阅读时长
|
||
readtime = READ_TIME_DAY
|
||
else:
|
||
# 多条笔记,统计相邻笔记时间差(仅累加小于3小时的部分,单位分钟)
|
||
times_sorted = sorted(times)
|
||
total_minutes = 0
|
||
for idx in range(1, len(times_sorted)):
|
||
delta = (times_sorted[idx] - times_sorted[idx-1]).total_seconds() / 60
|
||
# 只统计相邻笔记间隔小于等于180分钟的部分
|
||
if 0 < delta <= 180:
|
||
total_minutes += int(delta)
|
||
# 如果没有有效时间差,则用最小阅读时长
|
||
readtime = total_minutes if total_minutes > 0 else READ_TIME_DAY
|
||
# 按月累计到readtime12m
|
||
readtime12m[day.month-1] += readtime
|
||
# 全年累计到readtime_year
|
||
readtime_year += readtime
|
||
# 保存到booksinfo
|
||
booksinfo[bk_id]['readtime12m'] = readtime12m
|
||
booksinfo[bk_id]['readtime_year'] = readtime_year
|
||
except Exception as e:
|
||
print(f'警告: 统计readtime30d失败: {e}')
|
||
self._booksinfo = booksinfo
|
||
return booksinfo
|
||
|
||
def get_books_last_open(self):
|
||
if self._books_open is not None:
|
||
return self._books_open
|
||
books_open = defaultdict(dict)
|
||
if not os.path.exists(self.db_path):
|
||
return books_open
|
||
try:
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
cursor.execute(''' SELECT ZASSETID, zlastopendate FROM ZBKLIBRARYASSET WHERE zlastopendate IS NOT NULL ''')
|
||
rows = cursor.fetchall()
|
||
for row in rows:
|
||
asset_id, last_open = row
|
||
if asset_id:
|
||
books_open[asset_id] = {
|
||
'last_open': last_open
|
||
}
|
||
conn.close()
|
||
except Exception as e:
|
||
print(f'警告: 读取BKLibrary.sqlite失败: {e}')
|
||
self._books_open = books_open
|
||
return books_open
|
||
|
||
def get_total_readtime(self, days=30):
|
||
"""
|
||
获取最近days天每天所有书籍的总阅读时间(分钟),返回长度为days的列表。
|
||
列表第0项为今天,第1项为昨天,依次类推。
|
||
"""
|
||
booksinfo = self.get_books_info()
|
||
total = [0] * days
|
||
for info in booksinfo.values():
|
||
readtime30d = info.get('readtime30d', [])
|
||
for i in range(min(days, len(readtime30d))):
|
||
total[i] += readtime30d[i]
|
||
return total
|
||
|
||
def get_total_readtime_year(self):
|
||
"""
|
||
获取全年所有书的累计阅读时间(分钟)。
|
||
"""
|
||
booksinfo = self.get_books_info()
|
||
total = 0
|
||
for info in booksinfo.values():
|
||
total += info.get('readtime_year', 0)
|
||
return total
|
||
|
||
def get_total_readtime12m(self):
|
||
"""
|
||
获取全年所有书的月度累计阅读时间(长度12的列表,单位:分钟)。
|
||
"""
|
||
booksinfo = self.get_books_info()
|
||
total = [0] * 12
|
||
for info in booksinfo.values():
|
||
readtime12m = info.get('readtime12m', [0]*12)
|
||
for i in range(12):
|
||
total[i] += readtime12m[i]
|
||
return total
|
||
|
||
# ---------------- 已读完书籍 (本年度) ----------------
|
||
def get_finished_books_this_year(self):
|
||
"""返回本年度读完的书籍列表 [(asset_id, info, finished_date_ts), ...]
|
||
|
||
依据 ZISFINISHED=1 且 ZDATEFINISHED 在今年内。
|
||
如果 plist 信息缺少显示名则回退 asset_id。
|
||
"""
|
||
booksinfo = self.get_books_info()
|
||
import datetime, sqlite3
|
||
year = datetime.datetime.now().year
|
||
results = []
|
||
if not os.path.exists(self.db_path):
|
||
return results
|
||
try:
|
||
conn = sqlite3.connect(self.db_path)
|
||
cur = conn.cursor()
|
||
# ZDATEFINISHED: Apple CoreData 时间戳(相对 2001-01-01 秒)
|
||
cur.execute("""
|
||
SELECT ZASSETID, ZDATEFINISHED, ZISFINISHED
|
||
FROM ZBKLIBRARYASSET
|
||
WHERE ZISFINISHED=1 AND ZDATEFINISHED IS NOT NULL
|
||
""")
|
||
rows = cur.fetchall()
|
||
conn.close()
|
||
# 调试:原始满足完成条件的行数
|
||
try:
|
||
if getattr(self, '_debug_finished_books', True):
|
||
print(f"[debug finished] raw_rows={len(rows)} (ZISFINISHED=1 & ZDATEFINISHED not null)")
|
||
except Exception:
|
||
pass
|
||
except Exception as e:
|
||
print(f'警告: 查询已读完书籍失败: {e}')
|
||
return results
|
||
base = datetime.datetime(2001,1,1)
|
||
for asset_id, finished_ts, flag in rows:
|
||
try:
|
||
if not asset_id or finished_ts is None:
|
||
continue
|
||
finished_dt = base + datetime.timedelta(seconds=finished_ts)
|
||
if finished_dt.year != year:
|
||
continue
|
||
info = booksinfo.get(asset_id, {})
|
||
results.append((asset_id, info, finished_dt))
|
||
except Exception:
|
||
pass
|
||
try:
|
||
if getattr(self, '_debug_finished_books', True):
|
||
print(f"[debug finished] after year filter={len(results)}, year={year}")
|
||
if results:
|
||
print("[debug finished] sample asset_ids:", ','.join(r[0] for r in results[:5]))
|
||
except Exception:
|
||
pass
|
||
# 按完成时间倒序
|
||
results.sort(key=lambda x: x[2], reverse=True)
|
||
return results
|
||
|
||
if __name__ == '__main__':
|
||
manager = BookListManager()
|
||
booksinfo = manager.get_books_info()
|
||
|
||
manager = BookListManager()
|
||
booksinfo = manager.get_books_info()
|
||
from pprint import pprint
|
||
print("\n【前三条示例】")
|
||
for k, v in list(booksinfo.items())[:3]:
|
||
print(f"{k}:")
|
||
pprint(v, sort_dicts=False, indent=2)
|
||
print('-' * 60)
|
||
|
||
print("\n【最近打开时间示例】")
|
||
books_open = manager.get_books_last_open()
|
||
import datetime
|
||
for k, v in list(books_open.items())[:3]:
|
||
ts = v['last_open']
|
||
dt = datetime.datetime(2001, 1, 1) + datetime.timedelta(seconds=ts)
|
||
print(f"{k}: {dt} (timestamp: {ts})")
|
||
|
||
# 展示最近5天有阅读行为的书籍的readtime30d信息
|
||
print("\n【最近5天有阅读行为的书籍的readtime30d信息】")
|
||
books_with_recent_reading = []
|
||
for book_id, book in booksinfo.items():
|
||
readtime30d = book.get('readtime30d', [])
|
||
# 最近5天(含今天)有阅读行为
|
||
if len(readtime30d) >= 5 and any(rt > 0 for rt in readtime30d[:5]):
|
||
books_with_recent_reading.append((book_id, book))
|
||
for book_id, book in books_with_recent_reading:
|
||
print(f"书名: {book.get('displayname', book_id)}")
|
||
print(f"readtime30d: {book.get('readtime30d', [])}")
|
||
print('-' * 60)
|
||
|
||
# 测试每本书今年每月和年总阅读时长
|
||
print("\n【每本书今年每月阅读时长(分钟)和年总阅读时长】")
|
||
for k, v in booksinfo.items():
|
||
print(f"书名: {v.get('displayname', k)}")
|
||
print(f"readtime12m: {v.get('readtime12m', [])}")
|
||
print(f"readtime_year: {v.get('readtime_year', 0)} 分钟")
|
||
print('-' * 60)
|
||
|
||
# 测试get_total_readtime,天数可自定义
|
||
print("\n【最近7天每天所有书籍总阅读时间(分钟)】")
|
||
total_readtime7d = manager.get_total_readtime(days=7)
|
||
for i, mins in enumerate(total_readtime7d):
|
||
if i == 0:
|
||
label = "今天"
|
||
elif i == 1:
|
||
label = "昨天"
|
||
else:
|
||
label = f"{i}天前"
|
||
print(f"{label}: {mins} 分钟")
|
||
|
||
# 测试全年总阅读时间
|
||
print("\n【全年所有书的累计阅读时间(分钟)】")
|
||
total_readtime_year = manager.get_total_readtime_year()
|
||
print(f"全年总阅读时间: {total_readtime_year} 分钟")
|
||
|
||
# 测试全年每月累计阅读时间
|
||
print("\n【全年所有书的月度累计阅读时间(分钟)】")
|
||
total_readtime12m = manager.get_total_readtime12m()
|
||
for month, mins in enumerate(total_readtime12m, start=1):
|
||
print(f"{month}月: {mins} 分钟") |