#########################################################
## @file   : parse_markdown_file.py
## @desc   : parse hugo markdown file
## @create : 2025/6/22
## @author : Chengan,doubao AI
## @email  : douboer@gmail.com
#########################################################

import logging
import json
import os
from collections import defaultdict
import re
from pathlib import Path

# Leading "/img/" or "img/" prefix that is removed from image paths.
_IMG_PREFIX_RE = re.compile(r'^(/?img/)')
# Standard Markdown image: ![alt](path)
_MD_IMAGE_RE = re.compile(r'!\[.*?\]\((.*?)\)')
# Wiki-style image embed: ![[path]]
_WIKI_IMAGE_RE = re.compile(r'!\[\[(.*?)\]\]')
# Same two patterns without capture groups, used to delete the markup.
_MD_IMAGE_STRIP_RE = re.compile(r'!\[.*?\]\(.*?\)')
_WIKI_IMAGE_STRIP_RE = re.compile(r'!\[\[.*?\]\]')


# Logging configuration
def setup_logger(log_file=None):
    """Return the shared 'markdown_parser' logger, configuring it if needed.

    A console handler (level INFO) is attached once. When *log_file* is
    given, a file handler (level DEBUG) for that path is attached once too.
    Calling this function repeatedly does not stack duplicate handlers.

    Args:
        log_file: Optional path of a log file that receives DEBUG output.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger('markdown_parser')
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # FileHandler subclasses StreamHandler, hence the exact type check.
    has_console = any(
        type(handler) is logging.StreamHandler for handler in logger.handlers
    )
    if not has_console:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    if log_file:
        target = os.path.abspath(os.fspath(log_file))
        has_file = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == target
            for handler in logger.handlers
        )
        if not has_file:
            file_handler = logging.FileHandler(log_file)
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)
        # The logger itself must let DEBUG records through, otherwise the
        # DEBUG file handler never sees them. The console handler still
        # filters at INFO, so console output is unaffected.
        logger.setLevel(logging.DEBUG)
    elif logger.level == logging.NOTSET:
        logger.setLevel(logging.INFO)

    return logger


class MarkdownParser:
    """Parse a Hugo Markdown file into front-matter metadata and XHS content."""

    def __init__(self, log_file=None, xhs_marker=''):
        """Create a parser.

        Args:
            log_file: Optional log file path passed to ``setup_logger``.
            xhs_marker: Stripped line content that starts an XHS section.
                The default ``''`` means a blank line starts (and keeps
                open) a section.
                NOTE(review): the marker literal looks like it may have
                been lost from the source (possibly an HTML comment tag);
                confirm the intended value and pass it here.
        """
        self.xhs_marker = xhs_marker
        self.logger = setup_logger(log_file)
        self.logger.info("MarkdownParser initialized")

    def parse_markdown_file(self, file_path):
        """Parse a Markdown file and extract metadata plus XHS content.

        Args:
            file_path: Path (str or ``Path``) of the Markdown file.

        Returns:
            A ``defaultdict(dict)`` keyed by the file name. The inner dict
            holds the front-matter keys plus ``content`` (list of paragraph
            strings) and ``images`` (list of image paths).

        Raises:
            FileNotFoundError: If *file_path* does not exist.
            ValueError: If *file_path* is not a regular file.
        """
        xhsdata = defaultdict(dict)

        try:
            # Validate the path before reading.
            file_path = Path(file_path)
            if not file_path.exists():
                self.logger.error(f"文件不存在: {file_path}")
                raise FileNotFoundError(f"文件不存在: {file_path}")

            if not file_path.is_file():
                self.logger.error(f"不是有效的文件: {file_path}")
                raise ValueError(f"不是有效的文件: {file_path}")

            filename = file_path.name
            self.logger.info(f"开始解析文件: {filename}")

            with open(file_path, 'r', encoding='utf-8') as file:
                lines = file.readlines()

            # Front-matter metadata.
            metadata = self._parse_metadata(lines)

            # Normalised tag/category lists and cleaned cover image path.
            self._process_tags_categories(metadata, xhsdata, filename)
            self._process_image_path(metadata, xhsdata, filename)

            # Copy the remaining metadata without overriding processed keys.
            for key, value in metadata.items():
                if key not in xhsdata[filename]:
                    xhsdata[filename][key] = value

            # Body paragraphs and embedded images.
            self._parse_content_images(lines, xhsdata, filename)

            self.logger.info(f"文件解析完成: {filename}")
            return xhsdata

        except Exception:
            self.logger.exception(f"解析文件时发生错误: {file_path}")
            raise

    def _parse_metadata(self, lines):
        """Parse the front-matter block delimited by ``---`` lines.

        Supports ``key: value`` pairs and ``key:`` followed by ``- item``
        list entries at any indentation. Values are kept as raw strings.

        Args:
            lines: All lines of the file, as returned by ``readlines()``.

        Returns:
            Dict mapping each key to a string or a list of strings.
        """
        metadata = {}
        current_key = None
        current_list = []
        in_front_matter = False

        try:
            for line in lines:
                if line.strip() == '---':
                    if in_front_matter:
                        break  # closing delimiter
                    in_front_matter = True
                    continue

                if not in_front_matter:
                    continue

                line = line.rstrip('\n')

                # List item: a dash after any indentation. Slicing after
                # the dash (not at a fixed column) keeps the item intact
                # whatever the indentation width is.
                stripped = line.lstrip()
                if stripped.startswith('- '):
                    if current_key:
                        current_list.append(stripped[2:].strip())
                    continue

                # Key/value pair.
                if ':' in line:
                    # Store the list collected for the previous key.
                    if current_key and current_list:
                        metadata[current_key] = current_list
                        current_list = []

                    key, _, value = line.partition(':')
                    key = key.strip()
                    value = value.strip()

                    if value == '':
                        # Empty value: list items may follow.
                        current_key = key
                        current_list = []
                    else:
                        metadata[key] = value
                        current_key = None

            # Store a list that runs up to the end of the front matter.
            if current_key and current_list:
                metadata[current_key] = current_list

            self.logger.debug(f"解析元数据完成: {metadata}")
            return metadata

        except Exception:
            self.logger.exception("解析元数据时发生错误")
            raise

    def _process_tags_categories(self, metadata, xhsdata, filename):
        """Copy ``tags``, ``xhstags`` and ``categories`` into *xhsdata* as lists.

        A string of the form ``[a, "b"]`` is split on commas with double
        quotes removed; any other string becomes a one-element list; values
        that are not strings are stored unchanged.
        """
        try:
            for field in ['tags', 'xhstags', 'categories']:
                if field not in metadata:
                    continue

                value = metadata[field]
                if isinstance(value, str):
                    # Inline "[tag1,tag2]" form.
                    if value.startswith('[') and value.endswith(']'):
                        parts = value[1:-1].replace('"', '').split(',')
                        value = [tag.strip() for tag in parts if tag.strip()]
                    else:
                        value = [value]

                xhsdata[filename][field] = value
                self.logger.debug(f"处理 {field}: {value}")

        except Exception:
            self.logger.exception("处理标签/分类时发生错误")
            raise

    def _process_image_path(self, metadata, xhsdata, filename):
        """Store the ``image`` metadata value without a leading /img/ or img/."""
        try:
            if 'image' in metadata:
                image_path = metadata['image']
                clean_image = _IMG_PREFIX_RE.sub('', image_path)
                xhsdata[filename]['image'] = clean_image
                self.logger.debug(f"处理图片路径: {image_path} -> {clean_image}")

        except Exception:
            self.logger.exception("处理图片路径时发生错误")
            raise

    def _parse_content_images(self, lines, xhsdata, filename):
        """Collect XHS paragraphs and image paths into *xhsdata*.

        A line whose stripped text equals ``self.xhs_marker`` starts a
        section. Inside a section a blank line ends it; every other line
        contributes its image paths to ``images`` and its remaining text to
        the current paragraph. Lines of one paragraph are joined with a
        single space.
        """
        try:
            content = xhsdata[filename]['content'] = []
            images = xhsdata[filename]['images'] = []
            # Instances built without __init__ fall back to the default.
            marker = getattr(self, 'xhs_marker', '')

            in_xhs_section = False
            current_paragraph = []

            for line in lines:
                stripped = line.strip()

                if stripped == marker:
                    # New marker: close the running paragraph, open a section.
                    if current_paragraph and in_xhs_section:
                        content.append(' '.join(current_paragraph))
                        current_paragraph = []
                        self.logger.debug(f"添加段落: {content[-1][:30]}...")
                    in_xhs_section = True
                    continue

                if not in_xhs_section:
                    continue

                # A blank line ends the section. With the default marker
                # ('') blank lines are consumed by the branch above.
                if stripped == '':
                    if current_paragraph:
                        content.append(' '.join(current_paragraph))
                        current_paragraph = []
                        self.logger.debug(f"添加段落: {content[-1][:30]}...")
                    in_xhs_section = False
                    continue

                # Image paths, Markdown form first, then wiki form.
                image_matches = _MD_IMAGE_RE.findall(line)
                image_matches.extend(_WIKI_IMAGE_RE.findall(line))
                for match in image_matches:
                    clean_match = _IMG_PREFIX_RE.sub('', match)
                    images.append(clean_match)
                    self.logger.debug(f"提取图片: {clean_match}")

                # Text left after removing the image markup.
                text = _MD_IMAGE_STRIP_RE.sub('', line).strip()
                text = _WIKI_IMAGE_STRIP_RE.sub('', text).strip()
                if text:
                    current_paragraph.append(text)

            # Paragraph still open at end of file.
            if current_paragraph and in_xhs_section:
                content.append(' '.join(current_paragraph))
                self.logger.debug(f"添加最后一个段落: {content[-1][:30]}...")

        except Exception:
            self.logger.exception("解析内容和图片时发生错误")
            raise


# Example usage
if __name__ == "__main__":
    parser = MarkdownParser(log_file='markdown_parser.log')
    try:
        file_path = 'markdown/test.md'
        result = parser.parse_markdown_file(file_path)
        print(json.dumps(result, indent=2, ensure_ascii=False))
    except Exception as e:
        print(f"程序运行出错: {e}")