import config import plistlib import sqlite3 import os from collections import defaultdict class BookListManager: def __init__(self, plist_path=None, db_path=None): self.plist_path = plist_path or config.LOCAL_BOOKS_PLIST self.db_path = db_path or config.LOCAL_LIBRARY_DB self._booksinfo = None self._books_open = None def get_books_info(self): if self._booksinfo is not None: return self._booksinfo booksinfo = defaultdict(dict) with open(self.plist_path, 'rb') as f: plist_data = plistlib.load(f) for book in plist_data.get('Books', []): bk_id = book.get('BKGeneratedItemId') if not bk_id: continue booksinfo[bk_id] = { 'displayname': book.get('BKDisplayName', ''), 'author': book.get('artistName', ''), 'type': book.get('BKBookType', ''), 'bookid': bk_id, 'itemname': book.get('itemName', ''), 'path': book.get('path', ''), 'date': book.get('BKInsertionDate',''), 'updatedate': book.get('updateDate','') } # 统计每本书最近30天每天的阅读时长 try: from annotationdata import AnnotationManager import datetime # 每天最小阅读时长(有笔记) READ_TIME_DAY = getattr(config, 'READ_TIME_DAY', 60) # 单位:分钟 # 无笔记但当天有打开书籍时的阅读时长 READ_TIME_OPEN_DAY = getattr(config, 'READ_TIME_OPEN_DAY', 30) # 单位:分钟 today = datetime.datetime.now().date() manager = AnnotationManager() annotations = manager.get_annotations() # 获取所有书籍的打开时间(ZLASTOPENDATE),单位为苹果时间戳 books_open = self.get_books_last_open() this_year = today.year for bk_id in booksinfo: notes = annotations.get(bk_id, {}) day_notes = {} # 收集每本书所有笔记的创建时间,按天分组 for uuid, note in notes.items(): raw_date = note.get('creationdate') try: dt = datetime.datetime.strptime(raw_date, '%Y-%m-%d %H:%M:%S') day = dt.date() if day not in day_notes: day_notes[day] = [] day_notes[day].append(dt) except Exception: pass # 获取该书的打开时间戳(ZLASTOPENDATE),用于判断无笔记时是否有打开过书籍 open_info = books_open.get(bk_id, {}) last_open_ts = open_info.get('last_open') # 生成最近30天的阅读时长列表 readtime30d = [] for i in range(30): day = today - datetime.timedelta(days=i) times = day_notes.get(day, []) if not times: opened = False if last_open_ts: open_dt = datetime.datetime(2001, 1, 1) + datetime.timedelta(seconds=last_open_ts) if open_dt.date() == day: opened = True readtime = READ_TIME_OPEN_DAY if opened else 0 elif len(times) == 1: readtime = READ_TIME_DAY else: times_sorted = sorted(times) total_minutes = 0 for idx in range(1, len(times_sorted)): delta = (times_sorted[idx] - times_sorted[idx-1]).total_seconds() / 60 if 0 < delta <= 180: total_minutes += int(delta) readtime = total_minutes if total_minutes > 0 else READ_TIME_DAY readtime30d.append(readtime) booksinfo[bk_id]['readtime30d'] = readtime30d # 新增:统计今年每个月的阅读时长和年总阅读时长(遍历今年每一天) readtime12m = [0] * 12 # 今年每月阅读时长 readtime_year = 0 # 今年总阅读时长 first_day = datetime.date(this_year, 1, 1) days_in_year = (today - first_day).days + 1 for i in range(days_in_year): day = first_day + datetime.timedelta(days=i) times = day_notes.get(day, []) if not times: opened = False if last_open_ts: open_dt = datetime.datetime(2001, 1, 1) + datetime.timedelta(seconds=last_open_ts) if open_dt.date() == day: opened = True readtime = READ_TIME_OPEN_DAY if opened else 0 elif len(times) == 1: readtime = READ_TIME_DAY else: times_sorted = sorted(times) total_minutes = 0 for idx in range(1, len(times_sorted)): delta = (times_sorted[idx] - times_sorted[idx-1]).total_seconds() / 60 if 0 < delta <= 180: total_minutes += int(delta) readtime = total_minutes if total_minutes > 0 else READ_TIME_DAY readtime12m[day.month-1] += readtime readtime_year += readtime booksinfo[bk_id]['readtime12m'] = readtime12m booksinfo[bk_id]['readtime_year'] = readtime_year except Exception as e: print(f'警告: 统计readtime30d失败: {e}') self._booksinfo = booksinfo return booksinfo def get_books_last_open(self): if self._books_open is not None: return self._books_open books_open = defaultdict(dict) if not os.path.exists(self.db_path): return books_open try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' SELECT ZASSETID, zlastopendate FROM ZBKLIBRARYASSET WHERE zlastopendate IS NOT NULL ''') rows = cursor.fetchall() for row in rows: asset_id, last_open = row if asset_id: books_open[asset_id] = { 'last_open': last_open } conn.close() except Exception as e: print(f'警告: 读取BKLibrary.sqlite失败: {e}') self._books_open = books_open return books_open def get_total_readtime(self, days=30): """ 获取最近days天每天所有书籍的总阅读时间(分钟),返回长度为days的列表。 列表第0项为今天,第1项为昨天,依次类推。 """ booksinfo = self.get_books_info() total = [0] * days for info in booksinfo.values(): readtime30d = info.get('readtime30d', []) for i in range(min(days, len(readtime30d))): total[i] += readtime30d[i] return total def get_total_readtime_year(self): """ 获取全年所有书的累计阅读时间(分钟)。 """ booksinfo = self.get_books_info() total = 0 for info in booksinfo.values(): total += info.get('readtime_year', 0) return total def get_total_readtime12m(self): """ 获取全年所有书的月度累计阅读时间(长度12的列表,单位:分钟)。 """ booksinfo = self.get_books_info() total = [0] * 12 for info in booksinfo.values(): readtime12m = info.get('readtime12m', [0]*12) for i in range(12): total[i] += readtime12m[i] return total if __name__ == '__main__': manager = BookListManager() booksinfo = manager.get_books_info() manager = BookListManager() booksinfo = manager.get_books_info() from pprint import pprint print("\n【前三条示例】") for k, v in list(booksinfo.items())[:3]: print(f"{k}:") pprint(v, sort_dicts=False, indent=2) print('-' * 60) print("\n【最近打开时间示例】") books_open = manager.get_books_last_open() import datetime for k, v in list(books_open.items())[:3]: ts = v['last_open'] dt = datetime.datetime(2001, 1, 1) + datetime.timedelta(seconds=ts) print(f"{k}: {dt} (timestamp: {ts})") # 展示最近5天有阅读行为的书籍的readtime30d信息 print("\n【最近5天有阅读行为的书籍的readtime30d信息】") books_with_recent_reading = [] for book_id, book in booksinfo.items(): readtime30d = book.get('readtime30d', []) # 最近5天(含今天)有阅读行为 if len(readtime30d) >= 5 and any(rt > 0 for rt in readtime30d[:5]): books_with_recent_reading.append((book_id, book)) for book_id, book in books_with_recent_reading: print(f"书名: {book.get('displayname', book_id)}") print(f"readtime30d: {book.get('readtime30d', [])}") print('-' * 60) # 测试每本书今年每月和年总阅读时长 print("\n【每本书今年每月阅读时长(分钟)和年总阅读时长】") for k, v in booksinfo.items(): print(f"书名: {v.get('displayname', k)}") print(f"readtime12m: {v.get('readtime12m', [])}") print(f"readtime_year: {v.get('readtime_year', 0)} 分钟") print('-' * 60) # 测试get_total_readtime,天数可自定义 print("\n【最近7天每天所有书籍总阅读时间(分钟)】") total_readtime7d = manager.get_total_readtime(days=7) for i, mins in enumerate(total_readtime7d): if i == 0: label = "今天" elif i == 1: label = "昨天" else: label = f"{i}天前" print(f"{label}: {mins} 分钟") # 测试全年总阅读时间 print("\n【全年所有书的累计阅读时间(分钟)】") total_readtime_year = manager.get_total_readtime_year() print(f"全年总阅读时间: {total_readtime_year} 分钟") # 测试全年每月累计阅读时间 print("\n【全年所有书的月度累计阅读时间(分钟)】") total_readtime12m = manager.get_total_readtime12m() for month, mins in enumerate(total_readtime12m, start=1): print(f"{month}月: {mins} 分钟")