# pip install requests requests-oauthlib
"""Example script: log in to X with the project's XScraper / Playwright and reuse the saved state.

Running the file as a script executes ``run_headless()`` followed by
``use_saved_state_headless()``.  The two "first time" helpers
(``first_time_login`` and ``save_state_once``) open a visible browser and are
meant to be run by hand once, to create the ``x_cookies.json`` state file.
"""
import asyncio
import os
import sys
from pathlib import Path

# Make the sibling "src" directory importable before any project import below.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))

from dotenv import load_dotenv
from playwright.async_api import async_playwright, TimeoutError

# Load .env before importing project modules, same order as the original script.
load_dotenv(".env")

from lib.custom_logger import get_logger

# NOTE(review): 10 presumably corresponds to DEBUG — confirm against get_logger.
logger = get_logger(level=10)

from providers.sns.x_sns_scraper import XScraper

# Path of the Playwright storage-state file shared by the helpers below.
STATE = "x_cookies.json"


async def _prompt(message: str) -> str:
    """Wait for the user to press Enter without blocking the event loop.

    ``input()`` is run in the default executor so the loop stays free to
    service Playwright while the user works in the browser window.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, input, message)


async def first_time_login() -> None:
    """Open a visible browser via XScraper, wait for a manual login, then save state.

    The scraper is stopped even if one of the intermediate steps raises.
    """
    bot = XScraper(storage_state="x_cookies.json", headless=False, slow_mo=50)
    await bot.start()
    try:
        await bot.login_manual()
        await _prompt("ログイン完了後に Enter を押してください...")
        ok = await bot.is_logged_in()
        print("Logged in?", ok)
        await bot.save_state()
    finally:
        await bot.stop()


# To create the state file through XScraper, run once by hand:
# asyncio.run(first_time_login())


async def run_headless() -> None:
    """Start XScraper headless with the saved state, run one search and log the results.

    The scraper is stopped even if the login check or the search raises.
    """
    bot = XScraper(storage_state="x_cookies.json", headless=True)
    await bot.start()
    try:
        print("already logged in?", await bot.is_logged_in())

        # Put the real work here (search / conversation fetching etc.,
        # to be implemented in a later step).
        items = await bot.search_tweets("OpenAI lang:ja -is:retweet", 30)
        logger.info(f"Found {len(items)} tweets")
        for tweet in items:
            logger.info(f"- {tweet['id']}: {tweet['text']}")
    finally:
        await bot.stop()


# async def example_get_tweet_scraper():
#     bot = XScraper(storage_state="x_cookies.json", headless=False, slow_mo=100)
#     await bot.start()
#     # First run only: log in manually and save the cookies
#     # await bot.login_manual()
#     # await asyncio.sleep(240)  # wait 60 seconds
#     # Collect via search
#     res = await bot.search_live("OpenAI lang:ja -is:retweet", scroll_secs=6)
#     print("search tweets:", len(res))
#     if res:
#         print(res[0])
#     await bot.stop()
# asyncio.run(example_get_tweet_scraper())


async def save_state_once() -> None:
    """Log in by hand in a visible Chromium window and write the storage state to STATE.

    After the user confirms, the home page is opened and the account-menu
    element is awaited (15 s timeout) as a login check; the state file is
    written only if that check passes.
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=False, slow_mo=50)
        try:
            ctx = await browser.new_context()
            page = await ctx.new_page()
            await page.goto("https://x.com/login", wait_until="domcontentloaded")
            await _prompt("ログインを完了したら Enter...")
            # Being able to open the home page is the login check; save only after it passes.
            await page.goto("https://x.com/home", wait_until="domcontentloaded")
            await page.wait_for_selector('[aria-label="Account menu"]', timeout=15000)
            await ctx.storage_state(path=STATE)  # the state is saved here
        finally:
            # Closing the browser also closes the context created from it.
            await browser.close()


async def use_saved_state_headless() -> None:
    """Open the X home page in headless Chromium using the state stored in STATE."""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            ctx = await browser.new_context(storage_state=STATE)
            page = await ctx.new_page()
            await page.goto("https://x.com/home", wait_until="domcontentloaded")
            # If a login prompt shows up here, the saved state is not being applied.
        finally:
            await browser.close()


# To create the state file with plain Playwright, run once by hand:
# asyncio.run(save_state_once())


def main() -> None:
    """Run the headless XScraper example, then the plain-Playwright state check."""
    asyncio.run(run_headless())
    asyncio.run(use_saved_state_headless())


if __name__ == "__main__":
    main()