"""Gmail API mail reader.

Setup:
1. Create a project at https://console.cloud.google.com/.
2. Enable the Gmail API.
3. Create OAuth 2.0 credentials (type "Desktop app"), download them as
   credentials.json and place the file next to this script.
4. The first run opens a browser for authorisation; token.json is then
   generated automatically.
"""
import os
import base64
import json
from datetime import datetime
from email.utils import parsedate_to_datetime

try:
    from google.auth.transport.requests import Request
    from google.oauth2.credentials import Credentials
    from google_auth_oauthlib.flow import InstalledAppFlow
    from googleapiclient.discovery import build
except ImportError as exc:
    # Defer the failure to get_service() so the pure helpers below stay
    # importable and the user gets an actionable message, not a traceback.
    Request = Credentials = InstalledAppFlow = build = None
    _IMPORT_ERROR = exc
else:
    _IMPORT_ERROR = None

# Read-only access.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Credential files live in the same directory as this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CREDENTIALS_FILE = os.path.join(BASE_DIR, 'credentials.json')
TOKEN_FILE = os.path.join(BASE_DIR, 'token.json')

# Largest page size requested from users.messages.list in a single call.
_MAX_PAGE_SIZE = 500


def _save_token(creds):
    """Write *creds* to TOKEN_FILE for reuse on the next run."""
    # The token contains a refresh token, so create the file owner-only.
    # The mode only takes effect when the file is newly created.
    fd = os.open(TOKEN_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w', encoding='utf-8') as f:
        f.write(creds.to_json())


def get_service():
    """Return an authorised Gmail API service object.

    A stored token is reused when valid and refreshed when expired;
    otherwise the local-server OAuth flow is run. The resulting token is
    saved to TOKEN_FILE.

    Returns:
        The service built by ``build('gmail', 'v1', ...)``, or ``None`` when
        the Google client libraries or credentials.json are missing (a
        message is printed in both cases).
    """
    if _IMPORT_ERROR is not None:
        print(f"❌ 缺少依赖库: {_IMPORT_ERROR.name or _IMPORT_ERROR}")
        print("请先运行: pip install google-api-python-client "
              "google-auth-httplib2 google-auth-oauthlib")
        return None

    creds = None
    # Try to load an existing token.
    if os.path.exists(TOKEN_FILE):
        creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)

    # Token missing, invalid or expired: refresh or re-authorise.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            if not os.path.exists(CREDENTIALS_FILE):
                print(f"❌ 找不到 {CREDENTIALS_FILE}")
                print("请从 Google Cloud Console 下载 OAuth 凭据文件,命名为 credentials.json 放到本目录")
                return None
            flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
            creds = flow.run_local_server(port=0)
        _save_token(creds)

    return build('gmail', 'v1', credentials=creds)


def _b64url_decode(data):
    """Decode URL-safe base64, tolerating missing ``=`` padding."""
    if isinstance(data, str):
        data = data.encode('ascii')
    return base64.urlsafe_b64decode(data + b'=' * (-len(data) % 4))


def _walk_parts(payload):
    """Yield *payload* and every nested part, depth-first in document order."""
    yield payload
    for part in payload.get('parts') or []:
        yield from _walk_parts(part)


def _part_data(part):
    """Return the inline base64 body data of *part*, or a falsy value."""
    return (part.get('body') or {}).get('data')


def decode_body(payload):
    """Extract the message body as text, preferring plain text.

    If *payload* itself carries body data it is returned directly.
    Otherwise the whole part tree is searched for the first ``text/plain``
    part with data, then the first ``text/html`` part, then any part with
    data. Searching the full tree (not just direct children) keeps a
    top-level HTML part from winning over a nested plain-text part.

    Returns:
        The decoded text (UTF-8, undecodable bytes dropped), or ``None``
        when no part has body data.
    """
    own_data = _part_data(payload)
    if own_data:
        return _b64url_decode(own_data).decode('utf-8', errors='ignore')

    parts = list(_walk_parts(payload))
    for wanted in ('text/plain', 'text/html', None):
        for part in parts:
            if wanted is not None and part.get('mimeType') != wanted:
                continue
            data = _part_data(part)
            if data:
                return _b64url_decode(data).decode('utf-8', errors='ignore')
    return None


def get_header(headers, name):
    """Return the value of header *name* (case-insensitive), or ``''``.

    *headers* is a list of ``{'name': ..., 'value': ...}`` dicts.
    """
    wanted = name.lower()
    return next((h['value'] for h in headers if h['name'].lower() == wanted), '')


def list_emails(service, query='', label_ids=None, max_results=10):
    """List message stubs matching the given filters.

    Args:
        service: Gmail API service object.
        query: Gmail search syntax, e.g. 'from:xxx@gmail.com',
            'subject:report', 'is:unread'.
        label_ids: Label filter, e.g. ['INBOX'] or ['UNREAD'].
        max_results: Maximum number of messages to return. Values above
            one API page are fetched by following ``nextPageToken``.

    Returns:
        A list of at most *max_results* items from the API's ``messages``
        field (empty when nothing matches).
    """
    params = {'userId': 'me'}
    if query:
        params['q'] = query
    if label_ids:
        params['labelIds'] = label_ids

    messages = []
    page_token = None
    while len(messages) < max_results:
        params['maxResults'] = min(max_results - len(messages), _MAX_PAGE_SIZE)
        if page_token:
            params['pageToken'] = page_token
        results = service.users().messages().list(**params).execute()
        messages.extend(results.get('messages', []))
        page_token = results.get('nextPageToken')
        if not page_token:
            break
    return messages[:max_results]


def read_email(service, msg_id):
    """Fetch one message and return its main fields as a dict.

    Returns:
        A dict with keys ``id``, ``subject``, ``from``, ``to``, ``date``,
        ``body``, ``labels`` and ``snippet``. ``date`` is formatted as
        ``YYYY-MM-DD HH:MM:SS`` when the Date header parses, otherwise the
        raw header value is kept.
    """
    msg = service.users().messages().get(userId='me', id=msg_id, format='full').execute()
    payload = msg.get('payload') or {}
    headers = payload.get('headers') or []

    date_str = get_header(headers, 'Date')
    try:
        date_str = parsedate_to_datetime(date_str).strftime('%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError, IndexError, OverflowError):
        # Best effort: an empty or malformed Date header is shown as-is.
        pass

    return {
        'id': msg_id,
        'subject': get_header(headers, 'Subject') or '(无主题)',
        'from': get_header(headers, 'From'),
        'to': get_header(headers, 'To'),
        'date': date_str,
        'body': decode_body(payload) or '(无正文)',
        'labels': msg.get('labelIds', []),
        'snippet': msg.get('snippet', ''),
    }


def _safe_filename(filename):
    """Reduce an attachment name from a mail to a bare file name.

    The name is sender-controlled, so directory components (either slash
    style) are stripped to keep the file inside the save directory.
    """
    name = os.path.basename(filename.replace('\\', '/')).strip()
    if name in ('', '.', '..'):
        return 'attachment'
    return name


def get_attachments(service, msg_id, save_dir=None):
    """Download every attachment of a message to *save_dir*.

    Args:
        service: Gmail API service object.
        msg_id: ID of the message to inspect.
        save_dir: Target directory; defaults to ``attachments`` next to
            this script. It is created only when something is saved.

    Returns:
        A list of ``{'filename', 'path', 'size'}`` dicts, one per saved
        file. ``filename`` is the sanitised name actually used on disk.
    """
    msg = service.users().messages().get(userId='me', id=msg_id, format='full').execute()
    if save_dir is None:
        save_dir = os.path.join(BASE_DIR, 'attachments')

    attachments = []
    # The root payload is included so a single-part message that is itself
    # an attachment is not missed.
    for part in _walk_parts(msg.get('payload') or {}):
        body = part.get('body') or {}
        attachment_id = body.get('attachmentId')
        if not (part.get('filename') and attachment_id):
            continue

        att = service.users().messages().attachments().get(
            userId='me', messageId=msg_id, id=attachment_id
        ).execute()
        data = _b64url_decode(att['data'])

        filename = _safe_filename(part['filename'])
        os.makedirs(save_dir, exist_ok=True)
        filepath = os.path.join(save_dir, filename)
        with open(filepath, 'wb') as f:
            f.write(data)

        attachments.append({'filename': filename, 'path': filepath, 'size': len(data)})
        print(f" 📎 已保存附件: {filename} ({len(data)} bytes)")

    return attachments


# ============ Usage examples ============
def main():
    """Print a summary of the five most recent inbox messages."""
    service = get_service()
    if not service:
        raise SystemExit(1)

    print("=" * 60)
    print("📬 Gmail 邮件读取")
    print("=" * 60)

    # --- Example 1: read the 5 most recent inbox messages ---
    print("\n📥 收件箱最近 5 封邮件:\n")
    messages = list_emails(service, label_ids=['INBOX'], max_results=5)
    for m in messages:
        email_data = read_email(service, m['id'])
        print(f"📧 主题: {email_data['subject']}")
        print(f" 发件人: {email_data['from']}")
        print(f" 日期: {email_data['date']}")
        print(f" 摘要: {email_data['snippet'][:80]}...")
        print()

    # --- Example 2: search for specific mail (uncomment to use) ---
    # messages = list_emails(service, query='subject:report is:unread', max_results=5)

    # --- Example 3: read the full body ---
    # if messages:
    #     email_data = read_email(service, messages[0]['id'])
    #     print(f"\n完整正文:\n{email_data['body']}")

    # --- Example 4: download attachments ---
    # if messages:
    #     get_attachments(service, messages[0]['id'])


if __name__ == '__main__':
    main()