import os import frontmatter from wordpress_xmlrpc import Client, WordPressPost from wordpress_xmlrpc.methods.posts import GetPosts, NewPost from datetime import datetime # 配置部分 hexo_posts_dir = '/home/songtianlun/Sync/Develop/frytea/source/_posts' # Hexo 文章目录 wordpress_url = "https://blog.lnf1.skybyte.me/xmlrpc.php" wordpress_username = 'songtianlun' wordpress_password = 'sotilu,WP2024' # 初始化 WordPress 客户端 wp = Client(wordpress_url, wordpress_username, wordpress_password) # 上传统计信息 total_posts = 0 successful_posts = 0 failed_posts = 0 duplicate_posts = [] def get_all_wp_posts(): print("Fetching existing posts from WordPress...") all_posts = [] offset = 0 increment = 20 # 每次获取的文章数量,可以调整 while True: batch = wp.call(GetPosts({'number': increment, 'offset': offset})) if not batch: break all_posts.extend(batch) offset += increment print(f"Total existing posts fetched: {len(all_posts)}") return {post.title: post.id for post in all_posts} def scan_directory_for_posts(directory, category_prefix=""): posts = [] post_count = 1 for root, dirs, files in os.walk(directory): category = category_prefix + os.path.basename(root) # 使用文件夹作为类别 for file in files: if file.endswith('.md'): file_path = os.path.join(root, file) post_title, post_data = process_md_file(file_path) if post_title and post_data: posts.append((post_title, post_data, category, post_count)) post_count += 1 return posts def process_md_file(file_path): with open(file_path, 'r', encoding='utf-8') as f: post_data = frontmatter.load(f) # 构造 WordPress 文章 post_title = post_data['title'] if 'title' in post_data else os.path.basename(file_path) return post_title, post_data def create_wordpress_post(post_title, post_data, category): # 构造 WordPress 文章 post = WordPressPost() post.title = post_title post.content = post_data.content post.terms_names = { 'category': [category], 'post_tag': post_data['tags'] if 'tags' in post_data else [], } if 'date' in post_data: post.date = datetime.strptime(str(post_data['date']), '%Y-%m-%d %H:%M:%S') post.post_status = 'publish' # 或 'draft' 保存为草稿 return post def upload_post_with_retries(post, post_number, retries=3): global successful_posts, failed_posts attempt = 0 while attempt < retries: try: post_id = wp.call(NewPost(post)) print(f"Post {post_number}: '{post.title}' - Upload successful. WordPress ID: {post_id}") successful_posts += 1 return True except Exception as e: attempt += 1 print(f"Post {post_number}: '{post.title}' - Attempt {attempt} failed with error: {e}") if attempt == retries: print(f"Post {post_number}: '{post.title}' - Failed to upload after {retries} attempts.") failed_posts += 1 return False def upload_posts_to_wordpress(posts_to_upload): global total_posts total_posts = len(posts_to_upload) for post, post_number in posts_to_upload: try: upload_post_with_retries(post, post_number) except Exception: print(f"Post {post_number}: '{post.title}' - Failed permanently.") if __name__ == "__main__": existing_wp_posts = get_all_wp_posts() posts = scan_directory_for_posts(hexo_posts_dir) posts_to_upload = [] for post_title, post_data, category, post_number in posts: if post_title in existing_wp_posts: print(f"Post '{post_title}' skipped: already exists.") duplicate_posts.append((post_title, post_number)) else: post = create_wordpress_post(post_title, post_data, category) posts_to_upload.append((post, len(posts_to_upload) + 1)) upload_posts_to_wordpress(posts_to_upload) print("Migration completed!") print(f"Total posts processed: {total_posts}") print(f"Successfully uploaded: {successful_posts}") print(f"Failed uploads: {failed_posts}") if duplicate_posts: print("\nDuplicate posts found:") for title, post_number in duplicate_posts: print(f"Title: {title} - File Number: {post_number}") else: print("\nNo duplicate posts found.")