Files
xhsautopublisher/backup/parse_markdown_file.py.bk
2025-09-05 17:20:14 +08:00

243 lines
9.4 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#########################################################
## @file : parse_markdown_file.py
## @desc : parse hugo markdown file
## @create : 2025/6/22
## @author : Chengandoubao AI
## @email : douboer@gmail.com
#########################################################
import logging
import json
from collections import defaultdict
import re
from pathlib import Path
# 配置日志
def setup_logger(log_file=None):
    """Return the shared ``markdown_parser`` logger, configuring it on demand.

    The function is idempotent: calling it repeatedly (for example once per
    ``MarkdownParser`` instance) never attaches a second console handler or a
    second file handler for the same path, so log lines are not duplicated.

    Args:
        log_file: Optional path of a log file. When given, a UTF-8 file
            handler at DEBUG level is attached in addition to the console
            handler.

    Returns:
        The configured ``logging.Logger`` named ``'markdown_parser'``.
    """
    # Local import keeps this function self-contained; the module header
    # does not import ``os``.
    import os

    logger = logging.getLogger('markdown_parser')
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # FileHandler subclasses StreamHandler, so exclude it when looking for an
    # already-attached console handler.
    has_console = any(
        isinstance(handler, logging.StreamHandler)
        and not isinstance(handler, logging.FileHandler)
        for handler in logger.handlers
    )
    if not has_console:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    if log_file:
        # FileHandler stores the absolute path in ``baseFilename``; compare
        # against the same normalisation to detect an existing handler.
        target = os.path.abspath(os.fspath(log_file))
        already_attached = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == target
            for handler in logger.handlers
        )
        if not already_attached:
            # Explicit UTF-8: the log messages contain non-ASCII text and the
            # platform default encoding may not be able to represent it.
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    # The file handler accepts DEBUG records, but they only reach it if the
    # logger itself lets them through. The console handler filters at INFO,
    # so console output is unchanged.
    has_file = any(isinstance(handler, logging.FileHandler) for handler in logger.handlers)
    logger.setLevel(logging.DEBUG if has_file else logging.INFO)
    return logger
class MarkdownParser:
    """Parse a Hugo Markdown file into front-matter metadata and XHS content.

    The result of :meth:`parse_markdown_file` is a mapping keyed by the file
    name whose value holds the front-matter fields plus two derived lists:
    ``content`` (paragraphs that follow a ``<!--xhs-->`` marker) and
    ``images`` (image paths referenced inside those paragraphs).
    """

    # Marker line that opens a paragraph to be extracted.
    _XHS_MARKER = '<!--xhs-->'
    # Front-matter fields that are always normalised to a list.
    _LIST_FIELDS = ('tags', 'xhstags', 'categories')
    # Markdown image ``![alt](path)`` and wiki-style embed ``![[path]]``.
    _MD_IMAGE_RE = re.compile(r'!\[.*?\]\((.*?)\)')
    _WIKI_IMAGE_RE = re.compile(r'!\[\[(.*?)\]\]')
    # Leading ``/img/`` or ``img/`` prefix that is removed from image paths.
    _IMG_PREFIX_RE = re.compile(r'^(/?img/)')

    def __init__(self, log_file=None):
        """Create a parser that logs through ``setup_logger(log_file)``."""
        self.logger = setup_logger(log_file)
        self.logger.info("MarkdownParser initialized")

    @staticmethod
    def _unquote(value):
        """Return *value* without one pair of matching surrounding quotes."""
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            return value[1:-1]
        return value

    def parse_markdown_file(self, file_path):
        """Parse one Markdown file and return its metadata and XHS content.

        Args:
            file_path: Path (``str`` or ``Path``) of the Markdown file.

        Returns:
            A ``defaultdict(dict)`` with a single entry keyed by the file
            name. The entry contains the front-matter fields, plus
            ``content`` and ``images`` lists.

        Raises:
            FileNotFoundError: If *file_path* does not exist.
            ValueError: If *file_path* exists but is not a regular file.
            Exception: Any error raised while reading or parsing is logged
                with its traceback and re-raised unchanged.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            self.logger.error("文件不存在: %s", file_path)
            raise FileNotFoundError(f"文件不存在: {file_path}")
        if not file_path.is_file():
            self.logger.error("不是有效的文件: %s", file_path)
            raise ValueError(f"不是有效的文件: {file_path}")

        filename = file_path.name
        xhsdata = defaultdict(dict)
        try:
            self.logger.info("开始解析文件: %s", filename)
            # 'utf-8-sig' transparently drops a leading BOM; with plain
            # 'utf-8' the BOM stays glued to the opening '---' and the front
            # matter is not recognised.
            with open(file_path, 'r', encoding='utf-8-sig') as file:
                lines = file.readlines()

            metadata = self._parse_metadata(lines)
            self._process_tags_categories(metadata, xhsdata, filename)
            self._process_image_path(metadata, xhsdata, filename)

            # Copy the remaining fields without overwriting the normalised
            # values stored by the two steps above.
            entry = xhsdata[filename]
            for key, value in metadata.items():
                entry.setdefault(key, value)

            self._parse_content_images(lines, xhsdata, filename)
        except Exception:
            # Single logging point for parse failures; the private helpers
            # let exceptions propagate so each error is reported once.
            self.logger.exception("解析文件时发生错误: %s", file_path)
            raise

        self.logger.info("文件解析完成: %s", filename)
        return xhsdata

    def _parse_metadata(self, lines):
        """Extract the front matter delimited by ``---`` lines.

        Only a small YAML subset is understood: ``key: value`` scalars and
        ``key:`` followed by ``- item`` lines (any indentation). Matching
        surrounding quotes on scalars and list items are removed. Inline
        ``[a, b]`` values are returned as raw strings.

        Args:
            lines: The file content as a list of lines.

        Returns:
            A dict mapping each key to a ``str`` or a ``list`` of ``str``.
        """
        metadata = {}
        current_key = None
        current_list = []
        in_front_matter = False

        for raw_line in lines:
            if raw_line.strip() == '---':
                if in_front_matter:
                    break  # closing delimiter
                in_front_matter = True
                continue
            if not in_front_matter:
                continue

            line = raw_line.rstrip('\n')

            # List item: detect on the left-stripped text so that the result
            # does not depend on the exact indent width, and slice off
            # exactly the "- " marker.
            item_text = line.lstrip()
            if item_text.startswith('- '):
                if current_key:
                    current_list.append(self._unquote(item_text[2:].strip()))
                continue

            if ':' in line:
                # A new key ends the list collected for the previous key.
                if current_key and current_list:
                    metadata[current_key] = current_list
                    current_list = []

                key, _, value = line.partition(':')
                key = key.strip()
                value = value.strip()

                if value == '':
                    # Empty value: list items may follow on the next lines.
                    current_key = key
                    current_list = []
                else:
                    metadata[key] = self._unquote(value)
                    current_key = None

        # Flush a list that runs up to the closing delimiter / end of input.
        if current_key and current_list:
            metadata[current_key] = current_list

        self.logger.debug("解析元数据完成: %s", metadata)
        return metadata

    def _process_tags_categories(self, metadata, xhsdata, filename):
        """Store ``tags``, ``xhstags`` and ``categories`` as lists.

        A string of the form ``[a, "b"]`` is split on commas; any other
        string becomes a one-element list; list values are stored as they
        are. Fields absent from *metadata* are left untouched.
        """
        for field in self._LIST_FIELDS:
            if field not in metadata:
                continue
            value = metadata[field]
            if isinstance(value, str):
                if value.startswith('[') and value.endswith(']'):
                    parts = value[1:-1].replace('"', '').split(',')
                    cleaned = (self._unquote(part.strip()) for part in parts)
                    value = [tag for tag in cleaned if tag]
                else:
                    value = [value]
            xhsdata[filename][field] = value
            self.logger.debug("处理 %s: %s", field, value)

    def _process_image_path(self, metadata, xhsdata, filename):
        """Store the ``image`` field without a leading ``/img/`` or ``img/``.

        A non-string value (``image:`` written as a YAML list) is not
        rewritten here; a warning is logged and the value is left for the
        caller's generic metadata copy.
        """
        if 'image' not in metadata:
            return
        image_path = metadata['image']
        if not isinstance(image_path, str):
            self.logger.warning("image field is not a string, left unchanged: %r", image_path)
            return
        clean_image = self._IMG_PREFIX_RE.sub('', image_path)
        xhsdata[filename]['image'] = clean_image
        self.logger.debug("处理图片路径: %s -> %s", image_path, clean_image)

    def _parse_content_images(self, lines, xhsdata, filename):
        """Collect XHS paragraphs and the images referenced inside them.

        A paragraph starts on the line after a ``<!--xhs-->`` marker and ends
        at the first blank line, at the next marker, or at end of input. Its
        lines are joined with single spaces after image markup is removed.
        Results are written to ``xhsdata[filename]['content']`` and
        ``xhsdata[filename]['images']``.
        """
        entry = xhsdata[filename]
        content = entry['content'] = []
        images = entry['images'] = []
        in_xhs_section = False
        paragraph = []

        def flush(label="添加段落"):
            # Move the accumulated lines into ``content`` as one paragraph.
            if paragraph:
                content.append(' '.join(paragraph))
                paragraph.clear()
                self.logger.debug("%s: %s...", label, content[-1][:30])

        for line in lines:
            stripped = line.strip()

            if stripped == self._XHS_MARKER:
                # A new marker closes the paragraph in progress, if any.
                if in_xhs_section:
                    flush()
                in_xhs_section = True
                continue

            if not in_xhs_section:
                continue

            if stripped == '':
                # Blank line terminates the current XHS paragraph.
                flush()
                in_xhs_section = False
                continue

            matches = self._MD_IMAGE_RE.findall(line)
            matches.extend(self._WIKI_IMAGE_RE.findall(line))
            for match in matches:
                clean_match = self._IMG_PREFIX_RE.sub('', match)
                images.append(clean_match)
                self.logger.debug("提取图片: %s", clean_match)

            # Keep only the text that remains once image markup is removed.
            text = self._MD_IMAGE_RE.sub('', line).strip()
            text = self._WIKI_IMAGE_RE.sub('', text).strip()
            if text:
                paragraph.append(text)

        # Input ended while still inside a paragraph.
        if in_xhs_section:
            flush("添加最后一个段落")
# 示例使用
if __name__ == "__main__":
    # Demo run: parse a sample file and pretty-print the result as JSON.
    # Any failure is reported on stdout instead of propagating.
    parser = MarkdownParser(log_file='markdown_parser.log')
    try:
        file_path = 'markdown/test.md'
        rendered = json.dumps(
            parser.parse_markdown_file(file_path),
            indent=2,
            ensure_ascii=False,
        )
        print(rendered)
    except Exception as exc:
        print(f"程序运行出错: {exc}")