#########################################################
## @file    : parse_markdown_file.py
## @desc    : parse hugo markdown file
## @create  : 2025/6/22
## @author  : Chengan,doubao AI
## @email   : douboer@gmail.com
#########################################################

import html
import json
import logging
import re
from collections import defaultdict
from pathlib import Path

try:
    from logger_utils import CommonLogger
except ImportError:
    # The project logger is optional: fall back to stdlib logging so the
    # parser stays importable outside the repository layout.
    CommonLogger = None


class MarkdownParser:
    """Parse a Hugo markdown file into an "xhsdata" mapping and render it.

    The mapping has the shape ``{filename: {field: value}}`` where the fields
    are the front-matter keys plus ``content`` (list of paragraph/heading
    strings), ``images`` (list of image paths) and ``tables`` (list of tables,
    each a list of rows, each row a list of cell strings).

    NOTE(review): in the revision under review every markup literal (the
    section marker and all HTML tags) had been stripped from the file, which
    left an unreachable blank-line branch and an unused ``font_size``. The
    marker and tags below are reconstructions - confirm them against the last
    known-good output before relying on the exact markup.
    """

    # Line that opens an XHS section; the section ends at the next blank line.
    # NOTE(review): reconstructed literal - override on the class or instance
    # if the project uses a different marker.
    XHS_MARKER = "<!--xhs-->"

    # Front-matter fields that are always normalised to a list of strings.
    _LIST_FIELDS = ("tags", "xhstags", "categories")

    # Compiled once: these run for every content line.
    _IMG_PREFIX_RE = re.compile(r"^(/?img/)")
    _MD_IMAGE_RE = re.compile(r"!\[.*?\]\((.*?)\)")
    _WIKI_IMAGE_RE = re.compile(r"!\[\[(.*?)\]\]")
    # A markdown table delimiter cell such as ``---``, ``:---`` or ``:---:``.
    _TABLE_RULE_RE = re.compile(r"^:?-+:?$")

    def __init__(self, log_file=None):
        """Create the parser and its logger.

        Args:
            log_file: Passed to ``CommonLogger`` when that helper is
                importable; ignored (with a warning) by the stdlib fallback.
        """
        if CommonLogger is not None:
            self.logger = CommonLogger(log_file).get_logger()
        else:
            self.logger = logging.getLogger(__name__)
            if log_file is not None:
                self.logger.warning(
                    "logger_utils is unavailable; log_file %r is ignored", log_file
                )
        self.logger.info("MarkdownParser initialized")

    def parse_markdown_file(self, file_path):
        """Parse one markdown file and return its xhsdata mapping.

        Args:
            file_path: Path (``str`` or ``Path``) of the markdown file.

        Returns:
            ``defaultdict(dict)`` keyed by the file's base name.

        Raises:
            FileNotFoundError: ``file_path`` does not exist.
            ValueError: ``file_path`` exists but is not a regular file.
        """
        xhsdata = defaultdict(dict)
        try:
            file_path = Path(file_path)
            if not file_path.exists():
                raise FileNotFoundError(f"文件不存在: {file_path}")
            if not file_path.is_file():
                raise ValueError(f"不是有效的文件: {file_path}")

            filename = file_path.name
            self.logger.info(f"开始解析文件: {filename}")

            # utf-8-sig drops a leading BOM; with plain utf-8 the first line
            # would be '\ufeff---' and the front matter would not be found.
            with file_path.open("r", encoding="utf-8-sig") as handle:
                lines = handle.readlines()

            metadata = self._parse_metadata(lines)
            self._process_tags_categories(metadata, xhsdata, filename)
            self._process_image_path(metadata, xhsdata, filename)

            # Copy the remaining metadata without overwriting normalised fields.
            entry = xhsdata[filename]
            for key, value in metadata.items():
                entry.setdefault(key, value)

            self._parse_content_images(lines, xhsdata, filename)

            self.logger.info(f"文件解析完成: {filename}")
            return xhsdata
        except Exception:
            # Single logging point: the helpers let exceptions propagate so a
            # failure is reported once, with its traceback, and re-raised.
            self.logger.exception(f"解析文件时发生错误: {file_path}")
            raise

    def _parse_metadata(self, lines):
        """Parse the ``---`` delimited front matter into a dict.

        Supports ``key: value`` pairs and block lists (``key:`` followed by
        ``- item`` lines at any indentation). A key with an empty value and no
        list items is omitted. Values are kept as raw strings.

        Args:
            lines: All lines of the file, as returned by ``readlines()``.

        Returns:
            Dict mapping each key to a string or a list of strings.
        """
        metadata = {}
        current_key = None
        current_list = []
        in_front_matter = False

        for raw_line in lines:
            stripped = raw_line.strip()

            if stripped == "---":
                if in_front_matter:
                    break  # closing delimiter
                in_front_matter = True
                continue
            if not in_front_matter:
                continue

            # List item: belongs to the most recent key that had no value.
            if stripped.startswith("- "):
                if current_key:
                    current_list.append(stripped[2:].strip())
                continue

            if ":" in raw_line:
                # A new key ends any list that was being collected.
                if current_key and current_list:
                    metadata[current_key] = current_list
                    current_list = []

                key, _, value = raw_line.partition(":")
                key = key.strip()
                value = value.strip()

                if value == "":
                    # Possibly the start of a block list.
                    current_key = key
                    current_list = []
                else:
                    metadata[key] = value
                    current_key = None

        if current_key and current_list:
            metadata[current_key] = current_list

        self.logger.debug(f"解析元数据完成: {metadata}")
        return metadata

    def _process_tags_categories(self, metadata, xhsdata, filename):
        """Normalise ``tags``/``xhstags``/``categories`` to lists of strings.

        A string of the form ``[a, "b", 'c']`` is split on commas with quotes
        removed; any other string becomes a one-element list; lists are stored
        unchanged.
        """
        for field in self._LIST_FIELDS:
            if field not in metadata:
                continue
            value = metadata[field]
            if isinstance(value, str):
                if value.startswith("[") and value.endswith("]"):
                    pieces = value[1:-1].replace('"', "").split(",")
                    cleaned = (piece.strip().strip("'").strip() for piece in pieces)
                    value = [piece for piece in cleaned if piece]
                else:
                    value = [value]
            xhsdata[filename][field] = value
            self.logger.debug(f"处理 {field}: {value}")

    def _process_image_path(self, metadata, xhsdata, filename):
        """Store ``image`` with a leading ``/img/`` or ``img/`` removed."""
        if "image" not in metadata:
            return
        image_path = metadata["image"]
        if isinstance(image_path, list):
            # ``image:`` written as a block list; clean each entry.
            clean_image = [self._IMG_PREFIX_RE.sub("", item) for item in image_path]
        else:
            clean_image = self._IMG_PREFIX_RE.sub("", image_path)
        xhsdata[filename]["image"] = clean_image
        self.logger.debug(f"处理图片路径: {image_path} -> {clean_image}")

    def _parse_content_images(self, lines, xhsdata, filename):
        """Collect XHS paragraphs, headings, images and tables.

        A section starts at a line equal to ``XHS_MARKER`` and ends at the
        next blank line. Inside a section:

        * a line starting with ``#`` is stored verbatim as its own content
          item, and the lines after it are ignored until the next marker or
          blank line;
        * lines starting with ``|`` are collected as table rows;
        * other lines have image references removed (the paths go to
          ``images``) and are joined with spaces into one paragraph.

        Results are written to ``xhsdata[filename]`` under ``content``,
        ``images`` and ``tables``.
        """
        entry = xhsdata[filename]
        content = entry["content"] = []
        images = entry["images"] = []
        tables = entry["tables"] = []  # table storage

        paragraph = []
        table_rows = []
        in_xhs_section = False
        skip_after_heading = False

        def flush_paragraph(label="添加段落"):
            if paragraph:
                content.append(" ".join(paragraph))
                paragraph.clear()
                self.logger.debug(f"{label}: {content[-1][:30]}...")

        def flush_table():
            if table_rows:
                tables.append(list(table_rows))
                table_rows.clear()

        for line in lines:
            stripped = line.strip()

            if stripped == self.XHS_MARKER:
                # A new marker closes whatever was being collected.
                flush_paragraph()
                flush_table()
                in_xhs_section = True
                skip_after_heading = False
                continue

            if not in_xhs_section:
                continue

            if stripped == "":
                # A blank line ends the section.
                flush_paragraph()
                flush_table()
                in_xhs_section = False
                skip_after_heading = False
                continue

            if stripped.startswith("#"):
                flush_paragraph()
                flush_table()
                content.append(stripped)
                self.logger.debug(f"添加标题: {stripped[:30]}...")
                skip_after_heading = True
                continue

            if skip_after_heading:
                self.logger.debug(f"忽略标题后的内容: {stripped[:30]}...")
                continue

            # Standard markdown images first, then ![[wiki]] style ones.
            for pattern in (self._MD_IMAGE_RE, self._WIKI_IMAGE_RE):
                for match in pattern.findall(line):
                    clean_match = self._IMG_PREFIX_RE.sub("", match)
                    images.append(clean_match)
                    self.logger.debug(f"提取图片: {clean_match}")

            if stripped.startswith("|"):
                table_rows.append(
                    [cell.strip() for cell in stripped.strip("|").split("|")]
                )
                continue

            flush_table()
            text = self._MD_IMAGE_RE.sub("", line).strip()
            text = self._WIKI_IMAGE_RE.sub("", text).strip()
            if text:
                paragraph.append(text)

        flush_paragraph("添加最后一个段落")
        flush_table()

    def render_to_html(self, xhsdata):
        """Render every entry of ``xhsdata`` to one HTML fragment.

        Order per entry: ``xhstitle``, content items (headings sized
        ``24 - 2 * (level - 1)`` px), tables, three line breaks, one
        ``#tag`` line per ``xhstags`` item, then ``xhssign``. All text is
        HTML-escaped.

        NOTE(review): tag names and attributes are reconstructed - see the
        class docstring.

        Args:
            xhsdata: Mapping produced by ``parse_markdown_file``.

        Returns:
            The fragment as a single newline-joined string.
        """
        escape = html.escape
        parts = []

        for data in xhsdata.values():
            if "xhstitle" in data:
                title = escape(str(data["xhstitle"]))
                parts.append(f"<h1>{title}</h1>")

            for item in data.get("content", []):
                if item.startswith("#"):
                    # Level is the length of the leading hash run only, so
                    # hashes inside the title ("C#") are preserved.
                    hashes = len(item) - len(item.lstrip("#"))
                    level = min(hashes, 6)  # HTML defines h1..h6 only
                    title_text = escape(item[hashes:].strip())
                    font_size = 24 - (level - 1) * 2
                    parts.append(
                        f'<h{level} style="font-size: {font_size}px;">'
                        f"{title_text}</h{level}>"
                    )
                else:
                    parts.append(f"<p>{escape(item)}</p>")

            for table in data.get("tables", []):
                parts.append("<table>")
                for row in table:
                    # Skip the markdown delimiter row (| --- | --- |).
                    if row and all(self._TABLE_RULE_RE.match(cell) for cell in row):
                        continue
                    parts.append("<tr>")
                    parts.extend(f"<td>{escape(cell)}</td>" for cell in row)
                    parts.append("</tr>")
                parts.append("</table>")

            # Three empty lines between the content and the tags.
            parts.extend("<br>" for _ in range(3))

            # One tag per line.
            for tag in data.get("xhstags", []):
                parts.append(f"#{escape(str(tag))}<br>")

            if "xhssign" in data:
                sign = escape(str(data["xhssign"]))
                parts.append(f"<br><br><p>{sign}</p>")

        return "\n".join(parts)


# Example usage
if __name__ == "__main__":
    parser = MarkdownParser(log_file='logs/markdown_parser.log')

    try:
        file_path = 'markdown/test.md'
        result = parser.parse_markdown_file(file_path)
        print(json.dumps(result, indent=2, ensure_ascii=False))
        html_output = parser.render_to_html(result)

        # Saved as GBK; characters GBK cannot encode (e.g. emoji) are written
        # as numeric character references instead of raising.
        output_path = Path('temp/test.html')
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with output_path.open('w', encoding='gbk', errors='xmlcharrefreplace') as f:
            f.write(html_output)
    except Exception as e:
        print(f"程序运行出错: {e}")