diff --git a/examples/example_fin.py b/examples/example_fin.py
new file mode 100644
index 0000000..32e43f8
--- /dev/null
+++ b/examples/example_fin.py
@@ -0,0 +1,48 @@
+import sys
+import os
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
+
+from lib.custom_logger import get_logger
+logger = get_logger(level=10)
+
+from dotenv import load_dotenv
+load_dotenv()
+
+from providers.tools.api_j_quants import ApiJQuants
+
+
+def example():
+    # Work with listed-company information
+    client = ApiJQuants()
+
+    # Search the listed-company master
+    # result = client.search_companies("トヨタ")
+    # logger.info(f"Found {len(result)} companies")
+    # dic = result.to_dict(orient="records")
+    # logger.info(f"First company: {dic if len(dic) > 0 else 'N/A'}")
+
+    # Fetch daily price quotes for a ticker
+    # df = client.get_price_daily_quotes(
+    #     code="7203",
+    #     start_date="2025-05-01",
+    #     end_date="2025-05-31",
+    #     save_csv="./data/fin/silver/y=2025/m=05/7203_price_daily_quotes_2025-06.csv",
+    # )
+    # logger.info(f"Got {len(df)} rows of daily price data")
+
+    # Fetch the latest financial statements
+    # df = client.get_fins_statements(
+    #     code="7203",
+    #     save_csv="./data/fin/silver/y=2025/m=09/7203_fins_statements_2025-09-16.csv",
+    # )
+    # logger.info(f"Got {len(df)} rows of financial statement data")
+
+    # Check upcoming financial results announcement dates
+    df = client.get_fins_announcement(
+        save_csv="./data/fin/silver/y=2025/m=09/fins_announcement_2025-09-16.csv",
+    )
+    logger.info(f"Got {len(df)} rows of announcement data")
+
+
+example()
\ No newline at end of file
diff --git a/examples/example_news.py b/examples/example_news.py
new file mode 100644
index 0000000..2d04c9a
--- /dev/null
+++ b/examples/example_news.py
@@ -0,0 +1,48 @@
+import sys
+import os
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
+
+from lib.custom_logger import get_logger
+logger = get_logger(level=10)
+
+from dotenv import load_dotenv
+load_dotenv()
+
+from providers.tools.api_news_api_org import ApiNewsAPIOrg
+
+
+def example_news():
+    api = ApiNewsAPIOrg()
+    res = api.get_news(
+        query="technology",  # search keyword
+        from_date="2025-09-10",
+    )
+    for article in res["articles"]:
+        # print(article)
+        logger.info(f"Title: {article['title']} URL: {article['url']}\ndescription: {article['description']}")
+
+
+def example_headline():
+    api = ApiNewsAPIOrg()
+    res = api.get_headline_news(
+        query=None,
+        country="jp",
+        category=None,
+    )
+    for article in res["articles"]:
+        # print(article)
+        logger.info(f"Title: {article['title']} URL: {article['url']}\ndescription: {article['description']}")
+
+
+def example_source():
+    api = ApiNewsAPIOrg()
+    res = api.get_sources(
+        category=None,
+        language="ja",
+        country=None,
+    )
+    for source in res["sources"]:
+        # print(source)
+        logger.info(f"Name: {source['name']} URL: {source['url']}\ndescription: {source['description']}")
+
+
+example_source()
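+
+
+# Paging sketch, kept commented out like the examples above. pageSize and page are
+# existing keyword arguments of ApiNewsAPIOrg.get_news; the query and page range here
+# are only illustrative.
+# def example_news_paged():
+#     api = ApiNewsAPIOrg()
+#     for page in range(1, 3):
+#         res = api.get_news(query="technology", pageSize=20, page=page)
+#         logger.info(f"page {page}: {len(res['articles'])} articles")
+
+# example_news_paged()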
diff --git a/examples/example_sns.py b/examples/example_sns.py
index 62d9548..d46ce3e 100644
--- a/examples/example_sns.py
+++ b/examples/example_sns.py
@@ -13,10 +13,12 @@ load_dotenv(".env")
 from lib.custom_logger import get_logger
 logger = get_logger(level=10)
 
-from providers.sns.api_sns_x import APISNSX
+from providers.sns.api_sns_x import ApiSnsX
+# from providers.sns.api_youtube import ApiYoutube
+from providers.sns.api_youtube_downloader import ApiYoutubeDownloader
 
 def example_get_tweet():
-    items = APISNSX.search_recent_tweets(
+    items = ApiSnsX.search_recent_tweets(
         query="OpenAI lang:ja -is:retweet",
         max_results=10
     )
@@ -25,5 +27,49 @@ def example_get_tweet():
         logger.info(f"- {tweet['id']}: {tweet['text']}")
 
 
+# example_get_tweet()
-example_get_tweet()
\ No newline at end of file
+# def example_get_youtube():
+    # client = ApiYoutube()
+
+    # Get the list of video categories
+    # items = client.get_categories()
+    # logger.info(f"Found {len(items)} categories")
+    # for item in items:
+    #     logger.info(f"- {item['id']}: {item['title']}")
+
+    # Get the most popular videos
+    # items = client.get_most_popular(
+    #     region_code="JP",
+    #     video_category_id="25",
+    #     max_results=10,
+    # )
+    # logger.info(f"Found {len(items)} popular videos.")
+    # for item in items:
+    #     logger.info(f"- {item['id']}: {item['snippet']['title']}")
+
+    # Fetch details for specific video IDs
+    # items = client.get_videos_by_ids(
+    #     video_ids=["zXJ31wzT3Vo"],
+    # )
+    # for item in items:
+    #     logger.info(f"- {item['id']}: {item['snippet']['title']}")
+
+    # Rank the most popular categories
+    # items = client.rank_popular_categories(
+    #     region_code="JP",
+    # )
+    # for item in items:
+    #     logger.info(f"- {item}")
+
+# example_get_youtube()
+
+def example_youtube_downloader():
+    video_url = "https://www.youtube.com/watch?v=mnNwcWzc510"
+    client = ApiYoutubeDownloader()
+    audio_path = client.download_audio(
+        video_url=video_url,
+    )
+    logger.info(f"Downloaded audio to: {audio_path}")
+
+example_youtube_downloader()
\ No newline at end of file
diff --git a/examples/example_sns_scraper.py b/examples/example_sns_scraper.py
index 3ae6ee7..5fda1f4 100644
--- a/examples/example_sns_scraper.py
+++ b/examples/example_sns_scraper.py
@@ -38,56 +38,19 @@ async def run_headless():
     await bot.stop()
 
 
-asyncio.run(run_headless())
+# asyncio.run(run_headless())
 
-# async def example_get_tweet_scraper():
-#     bot = XScraper(storage_state="x_cookies.json", headless=False, slow_mo=100)
-#     await bot.start()
+async def example_get_trend():
+    bot = XScraper(storage_state="x_cookies.json", headless=True)
+    await bot.start()
+    try:
+        trends = await bot.get_trends(limit=10)
+        for t in trends:
+            print(t["rank"], t["name"], t["tweet_count"], t["url"])
+    finally:
+        await bot.stop()
 
-#     # 初回だけ:手動ログインして Cookie を保存
-#     # await bot.login_manual()
-#     # await asyncio.sleep(240) # 60秒待つ
+asyncio.run(example_get_trend())
 
-#     # 検索で収集
-#     res = await bot.search_live("OpenAI lang:ja -is:retweet", scroll_secs=6)
-#     print("search tweets:", len(res))
-#     if res:
-#         print(res[0])
-
-#     await bot.stop()
-
-
-# asyncio.run(example_get_tweet_scraper())
-
-
-from pathlib import Path
-from playwright.async_api import async_playwright, TimeoutError
-STATE = "x_cookies.json"
-
-async def save_state_once():
-    async with async_playwright() as p:
-        browser = await p.chromium.launch(headless=False, slow_mo=50)
-        ctx = await browser.new_context()
-        page = await ctx.new_page()
-        await page.goto("https://x.com/login", wait_until="domcontentloaded")
-        input("ログインを完了したら Enter...")
-        # ホームが開ける=ログイン確認してから保存
-        await page.goto("https://x.com/home", wait_until="domcontentloaded")
-        await page.wait_for_selector('[aria-label="Account menu"]', timeout=15000)
-        await ctx.storage_state(path=STATE)  # ★ここで保存
-        await ctx.close(); await browser.close()
-
-async def use_saved_state_headless():
-    async with async_playwright() as p:
-        browser = await p.chromium.launch(headless=True)
-        ctx = await browser.new_context(storage_state=STATE)
-        page = await ctx.new_page()
-        await page.goto("https://x.com/home", wait_until="domcontentloaded")
-        # ここでログイン要求が出るなら state が効いていない
-
-
-# save_state_once()
-# asyncio.run(save_state_once())
-asyncio.run(use_saved_state_headless())
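+
+# Follow-up sketch: persist the scraped trends to CSV. The column names simply mirror
+# the dicts returned by XScraper.get_trends(); pandas is assumed to be available in
+# this environment, and the output path is illustrative.
+# async def example_save_trends():
+#     import pandas as pd
+#     bot = XScraper(storage_state="x_cookies.json", headless=True)
+#     await bot.start()
+#     try:
+#         trends = await bot.get_trends(limit=20)
+#     finally:
+#         await bot.stop()
+#     pd.DataFrame(trends).to_csv("trends.csv", index=False, encoding="utf-8-sig")
+
+# asyncio.run(example_save_trends())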
diff --git a/requirements.txt b/requirements.txt
index 22dc4de..16c589a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,4 +26,12 @@ selectolax
 playwright==1.52.0
 
 # SNS(X)
-requests-oauthlib
\ No newline at end of file
+requests-oauthlib
+
+# jpx-jquants
+jquants-api-client
+
+# youtube
+google-api-python-client
+# downloader
+yt-dlp
\ No newline at end of file
diff --git a/src/providers/sns/api_sns_x.py b/src/providers/sns/api_sns_x.py
index bfb67b5..6fb2346 100644
--- a/src/providers/sns/api_sns_x.py
+++ b/src/providers/sns/api_sns_x.py
@@ -6,7 +6,7 @@ from lib.custom_logger import get_logger
 logger = get_logger()
 
 
-class APISNSX:
+class ApiSnsX:
     """X (formerly Twitter) API interaction class.
 
 
diff --git a/src/providers/sns/api_youtube.py b/src/providers/sns/api_youtube.py
new file mode 100644
index 0000000..6ef787d
--- /dev/null
+++ b/src/providers/sns/api_youtube.py
@@ -0,0 +1,187 @@
+import os
+import time
+from typing import Optional, List, Iterable, Tuple, Dict
+from googleapiclient.errors import HttpError
+from datetime import datetime, timedelta, timezone
+from googleapiclient.discovery import build
+
+
+from lib.custom_logger import get_logger
+logger = get_logger()
+
+class ApiYoutube:
+    """YouTube Data API interaction class.
+
+    Notes:
+        - Uses the YouTube Data API to fetch video and channel information
+        - Requires `pip install google-api-python-client`
+    """
+    YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
+
+    def __init__(self, api_key: Optional[str] = None, cache_discovery: bool = True):
+        self.api_key = api_key or self.YOUTUBE_API_KEY
+        if not self.api_key:
+            raise ValueError("YOUTUBE_API_KEY is not set. Set it as an environment variable or pass the api_key argument.")
+        # Caching the discovery document is optional
+        self.yt = build("youtube", "v3", developerKey=self.api_key, cache_discovery=cache_discovery)
+
+    # ===== Basic utilities =====
+    @staticmethod
+    def _sleep_backoff(i: int):
+        """Exponential-backoff sleep (capped at roughly 16 seconds)."""
+        time.sleep(min(2 ** i, 16))
+
+    @staticmethod
+    def _to_iso8601(dt: datetime) -> str:
+        """Convert to a UTC ISO 8601 string."""
+        if dt.tzinfo is None:
+            dt = dt.replace(tzinfo=timezone.utc)
+        return dt.astimezone(timezone.utc).isoformat()
+
+    # ===== Shared paging helper for list endpoints =====
+    def _paged_call(self, func, **kwargs) -> Iterable[dict]:
+        """Follow pageToken automatically and yield items."""
+        page_token = None
+        i = 0
+        while True:
+            try:
+                resp = func(pageToken=page_token, **kwargs).execute()
+            except HttpError as e:
+                # Retry with exponential backoff on rate-limit / transient errors
+                status = getattr(e, "status_code", None) or getattr(e.resp, "status", None)
+                if status in (403, 429, 500, 503):
+                    logger.warning(f"HTTP {status} / quota or transient error. retrying... 
({i})") + self._sleep_backoff(i) + i += 1 + if i > 5: + logger.error("再試行回数オーバー") + raise + continue + raise + + for it in resp.get("items", []): + yield it + page_token = resp.get("nextPageToken") + if not page_token: + break + + # ===== 機能 ===== + def get_categories(self, region_code: str = "JP") -> list[dict]: + """地域別の動画カテゴリ一覧を返す""" + try: + resp = self.yt.videoCategories().list( + part="snippet", + regionCode=region_code + ).execute() + except HttpError as e: + raise + + out = [] + for it in resp.get("items", []): + out.append({ + "id": it.get("id"), + "title": it.get("snippet", {}).get("title"), + "assignable": it.get("snippet", {}).get("assignable", False), + }) + return out + + def get_most_popular( + self, + region_code: str = "JP", + video_category_id: Optional[str] = None, + max_results: int = 50, + parts: str = "snippet,statistics,contentDetails" + ) -> List[dict]: + """公式“人気(mostPopular)”を取得。カテゴリ絞り込み可。""" + kwargs = dict( + part=parts, + chart="mostPopular", + regionCode=region_code, + maxResults=min(max_results, 50) # API制約 + ) + if video_category_id: + kwargs["videoCategoryId"] = video_category_id + + items = list(self._paged_call(self.yt.videos().list, **kwargs)) + return items + + def get_videos_by_ids( + self, + video_ids: List[str], + parts: str = "snippet,statistics,contentDetails" + ) -> List[dict]: + """動画ID配列に対して詳細・統計をまとめて取得。50件ずつに分割。""" + out: List[dict] = [] + chunk = 50 + for i in range(0, len(video_ids), chunk): + sub = video_ids[i:i+chunk] + try: + resp = self.yt.videos().list(part=parts, id=",".join(sub)).execute() + out.extend(resp.get("items", [])) + except HttpError as e: + logger.exception(f"videos.list 失敗: {e}") + raise + return out + + def rank_popular_categories( + self, + region_code: str = "JP", + sample_size: int = 200, + ) -> List[Tuple[str, Dict[str, int]]]: + """mostPopular をページングで収集し、カテゴリごとの + - 件数(動画数) + - 総再生数(viewCount 合計) + を集計してランキング化。 + + 戻り値: [(category_id, {"count": n, "views": total}), ...] 
を views で降順
+        """
+        collected: List[dict] = []
+        # 50 items per page; keep fetching until sample_size is reached
+        per_page = 50
+        fetched = 0
+        page_token = None
+        i = 0
+        while fetched < sample_size:
+            try:
+                resp = self.yt.videos().list(
+                    part="snippet,statistics",
+                    chart="mostPopular",
+                    regionCode=region_code,
+                    maxResults=min(per_page, sample_size - fetched),
+                    pageToken=page_token
+                ).execute()
+            except HttpError as e:
+                status = getattr(e, "status_code", None) or getattr(e.resp, "status", None)
+                if status in (403, 429, 500, 503):
+                    logger.warning(f"quota/transient error: retry {i}")
+                    self._sleep_backoff(i)
+                    i += 1
+                    if i > 5:
+                        raise
+                    continue
+                raise
+
+            items = resp.get("items", [])
+            collected.extend(items)
+            fetched += len(items)
+            page_token = resp.get("nextPageToken")
+            if not page_token or len(items) == 0:
+                break
+
+        # Aggregate per category
+        category_stats: Dict[str, Dict[str, int]] = {}
+        for it in collected:
+            cat = it.get("snippet", {}).get("categoryId") or "unknown"
+            views = int(it.get("statistics", {}).get("viewCount", 0))
+            if cat not in category_stats:
+                category_stats[cat] = {"count": 0, "views": 0}
+            category_stats[cat]["count"] += 1
+            category_stats[cat]["views"] += views
+
+        # Sort by views, descending
+        ranked = sorted(category_stats.items(), key=lambda kv: kv[1]["views"], reverse=True)
+        return ranked
\ No newline at end of file
diff --git a/src/providers/sns/api_youtube_downloader.py b/src/providers/sns/api_youtube_downloader.py
new file mode 100644
index 0000000..638ad48
--- /dev/null
+++ b/src/providers/sns/api_youtube_downloader.py
@@ -0,0 +1,84 @@
+import os
+import yt_dlp
+
+from lib.custom_logger import get_logger
+logger = get_logger()
+
+class ApiYoutubeDownloader:
+    """
+    YouTube video download helper.
+
+    Notes:
+        - Uses the yt-dlp library to download YouTube videos
+        - Requires `pip install yt-dlp`
+        - Mind copyright when using this class
+    """
+
+    @classmethod
+    def download_video(cls, video_url: str, output_dir: str = "downloads"):
+        """
+        Download a YouTube video.
+
+        Args:
+            video_url (str): URL or video ID of the YouTube video to download
+            output_dir (str): Directory to save the downloaded video. Defaults to "downloads"
+        Returns:
+            str: File path of the downloaded video
+        """
+        logger.info(f"Downloading video from URL: {video_url}")
+        os.makedirs(output_dir, exist_ok=True)
+
+        if not video_url.startswith("http"):
+            video_url = f"https://www.youtube.com/watch?v={video_url}"
+
+        ydl_opts = {
+            "outtmpl": os.path.join(output_dir, "%(title)s [%(id)s].%(ext)s"),
+            # Merge best video + best audio; re-encode to a fixed 'mp4' if needed
+            "format": "bv*+ba/b",
+            "merge_output_format": "mp4",
+            "noprogress": False,
+            "quiet": False,
+            "restrictfilenames": True,
+        }
+        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+            # Extract metadata and download in one pass, then derive the output path
+            info = ydl.extract_info(video_url, download=True)
+            return ydl.prepare_filename(info)
+
+    @classmethod
+    def download_audio(cls, video_url: str, output_dir: str = "downloads") -> str:
+        """Save the best audio track, converted to mp3 by the post-processor."""
+        os.makedirs(output_dir, exist_ok=True)
+        if not video_url.startswith("http"):
+            video_url = f"https://www.youtube.com/watch?v={video_url}"
+
+        ydl_opts = {
+            "outtmpl": os.path.join(output_dir, "%(title)s [%(id)s].%(ext)s"),
+            # Prefer the audio-only track
+            "format": "bestaudio/best",
+            "postprocessors": [
+                {
+                    "key": "FFmpegExtractAudio",
+                    "preferredcodec": "mp3",
+                    "preferredquality": "192",  # 128/192/320 kbps
+                }
+            ],
+            # 44.1 kHz / stereo for compatibility
+            "postprocessor_args": ["-ar", "44100", "-ac", "2"],
+            "prefer_ffmpeg": True,
+            "restrictfilenames": True,
+            "noprogress": False,
+            "quiet": False,
+        }
+
+        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+            info = ydl.extract_info(video_url, download=True)
+            # Resolve the final file path defensively
+            path = (
+                (info.get("requested_downloads") or [{}])[0].get("filepath")
+                or info.get("filepath")
+                or info.get("_filename")
+            )
+            return path
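+
+
+# Note: the FFmpegExtractAudio post-processor above requires ffmpeg to be available on
+# PATH. Minimal usage sketch ("VIDEO_ID" is a placeholder):
+#
+# if __name__ == "__main__":
+#     audio_path = ApiYoutubeDownloader.download_audio("VIDEO_ID")
+#     logger.info(f"Saved audio to: {audio_path}")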
diff --git a/src/providers/sns/x_sns_scraper.py b/src/providers/sns/x_sns_scraper.py
index 5911778..a1b5d0e 100644
--- a/src/providers/sns/x_sns_scraper.py
+++ b/src/providers/sns/x_sns_scraper.py
@@ -8,6 +8,9 @@ from urllib.parse import quote
 # TWEET_RX = re.compile(r"/i/api/graphql/.+/(TweetDetail|TweetResultByRestId|ConversationTimeline)")
 TWEET_RX = re.compile(r"/i/api/graphql/.+/(TweetDetail|TweetResultByRestId|ConversationTimeline|SearchTimeline)")
+TREND_COUNT_RX = re.compile(r"(?P<num>(?:\d{1,3}(?:,\d{3})+|\d+(?:\.\d+)?))(?:\s*[KkMm万億])?|\d+件の投稿")
+
+
 def _sg(d, path, default=None):
     cur = d
@@ -152,6 +155,185 @@ async def _fill_with_scroll(page, base_list, limit, tries=5):
     out = list(items.values()); out.sort(key=k, reverse=True)
     return out[:limit]
 
+# Parse trend post counts into integers
+def _parse_count(text: str) -> int | None:
+    """
+    Roughly convert '12.3K posts' / '7,654 posts' / '1.2M posts' / '2.3万 件の投稿' / '12件の投稿' to an int.
+    Wording and locale vary a lot, so return None when the value cannot be parsed.
+    """
+    if not text:
+        return None
+    t = text.replace("\u202f", " ").replace("\xa0", " ")
+    m = TREND_COUNT_RX.search(t)
+    if not m:
+        return None
+    raw = m.group(0)
+    # Japanese 万 (10^4) / 億 (10^8) suffixes
+    if "万" in raw:
+        try:
+            num = float(re.sub(r"[^\d\.]", "", raw))
+            return int(num * 10_000)
+        except Exception:
+            return None
+    if "億" in raw:
+        try:
+            num = float(re.sub(r"[^\d\.]", "", raw))
+            return int(num * 100_000_000)
+        except Exception:
+            return None
+    # English K/M suffixes
+    if re.search(r"[Kk]\b", raw):
+        try:
+            num = float(re.sub(r"[^\d\.]", "", raw))
+            return int(num * 1_000)
+        except Exception:
+            return None
+    if re.search(r"[Mm]\b", raw):
+        try:
+            num = float(re.sub(r"[^\d\.]", "", raw))
+            return int(num * 1_000_000)
+        except Exception:
+            return None
+    # Comma-separated or plain digits, or the Japanese "件の投稿" form
+    try:
+        digits = re.sub(r"[^\d]", "", raw)
+        return int(digits) if digits else None
+    except Exception:
+        return None
+
+async def _scrape_trend_cards(page):
+    await page.wait_for_selector('[data-testid="trend"]', timeout=10_000)
+
+    return await page.evaluate(r"""
+    () => {
+      const cleanup = s => (s || '').replace(/[\u202f\xa0]/g, ' ').trim();
+      const isCountText = t => /posts|件の投稿/.test(t);
+      const isPureDigits = t => /^\d+$/.test((t||'').trim());
+      const isDot = t => (t||'').trim() === '·';
+      const isLabelish = t => /Trending|トレンド|ニュース|エンタメ|スポーツ|政治/i.test(t);
+      const stripDotParts = t => {
+        const parts = (t || '').split('·').map(p => cleanup(p)).filter(Boolean);
+        const good = parts.filter(p => !isPureDigits(p) && !isCountText(p) && !isLabelish(p));
+        return good.join(' ').trim() || t;
+      };
+      const absolutize = href => {
+        if (!href) return null;
+        if (/^https?:/i.test(href)) return href;
+        return href.startsWith('/') ? 
('https://x.com' + href) : ('https://x.com/' + href); + }; + + const pickAnchor = el => { + // あれば a[href] を使う(地域やUIによって付くこともある) + const sel = [ + 'a[href*="/hashtag/"]', + 'a[href*="/search?"]', + 'a[href^="/i/events/"]', + 'a[href]' + ]; + for (const s of sel) { + const a = el.querySelector(s); + if (a) return a; + } + return null; + }; + + const nameFromHref = href => { + try { + const u = new URL(href); + if (/\/hashtag\//.test(u.pathname)) { + const tag = u.pathname.split('/').pop(); + if (tag) return '#' + decodeURIComponent(tag); + } + if (u.pathname === '/search' && u.searchParams.has('q')) { + const q = u.searchParams.get('q') || ''; + return decodeURIComponent(q); + } + } catch (_) {} + return null; + }; + + const titleFromSpans = el => { + // タイトル候補(#〜 を最優先) + const spans = Array.from(el.querySelectorAll('span')).map(s => cleanup(s.textContent)).filter(Boolean); + + // 1) まず #ハッシュタグ + const hash = spans.find(t => t.startsWith('#') && !isLabelish(t)); + if (hash) return hash; + + // 2) 見出しロールの短文 + const heading = el.querySelector('[role="heading"]'); + if (heading) { + const hs = Array.from(heading.querySelectorAll('span')).map(s => cleanup(s.textContent)).filter(Boolean); + const h = hs.find(t => !isLabelish(t) && !isPureDigits(t) && !isCountText(t) && !isDot(t) && t.length <= 80); + if (h) return stripDotParts(h); + } + + // 3) span 全体から拾う + const cand = spans.find(t => + !isLabelish(t) && !isPureDigits(t) && !isCountText(t) && !isDot(t) && t.length <= 80 + ); + return cand ? stripDotParts(cand) : null; + }; + + const makeUrlFromName = name => { + if (!name) return null; + if (name.startsWith('#')) { + const tag = name.slice(1); + return 'https://x.com/hashtag/' + encodeURIComponent(tag); + } + return 'https://x.com/search?q=' + encodeURIComponent(name) + '&src=trend_click'; + }; + + const parseCount = text => { + if (!text) return null; + const t = text.replace(/\u202f|\xa0/g, ' '); + const m = (t.match(/(\d{1,3}(?:,\d{3})+|\d+(?:\.\d+)?)(?:\s*([KkMm]|万|億))?/) || [])[0]; + if (!m) return null; + if (/件の投稿/.test(t)) { + const d = (t.match(/\d[\d,]*/)||[''])[0].replace(/,/g,''); + return d ? parseInt(d,10) : null; + } + const num = parseFloat(m.replace(/,/g,'')); + if (/[Kk]\b/.test(m)) return Math.round(num * 1_000); + if (/[Mm]\b/.test(m)) return Math.round(num * 1_000_000); + if (/万/.test(m)) return Math.round(num * 10_000); + if (/億/.test(m)) return Math.round(num * 100_000_000); + return Math.round(num); + }; + + const cards = Array.from(document.querySelectorAll('[data-testid="trend"]')); + const out = []; + let rank = 1; + + for (const el of cards) { + const a = pickAnchor(el); + const href = a ? a.getAttribute('href') : null; + const urlFromA = absolutize(href); + const spans = Array.from(el.querySelectorAll('span')).map(s => cleanup(s.textContent)).filter(Boolean); + const countText = spans.find(t => /posts|件の投稿/.test(t)) || null; + + // name は URL優先→DOM + let name = urlFromA ? nameFromHref(urlFromA) : null; + if (!name) name = titleFromSpans(el); + + // URL は name から生成( が無くてもOK) + const url = urlFromA || makeUrlFromName(name); + const search_url = name ? 
('https://x.com/search?q=' + encodeURIComponent(name) + '&src=trend_click') : null; + + out.push({ + name, + rank: rank++, + url, + search_url, + tweet_count_text: countText, + tweet_count: parseCount(countText || '') + }); + } + return out; + } + """) + + class XScraper: """ @@ -255,3 +437,39 @@ class XScraper: url = f"https://x.com/{username.lstrip('@')}" first = await _goto_and_scrape(self.page, url) return await _fill_with_scroll(self.page, first, limit) + + async def get_trends(self, limit: int = 10) -> list[dict]: + """ + トレンド一覧を取得(ログイン済み推奨)。 + まず DOM で抜き、見つからない場合は少しスクロールして追加読み込み。 + """ + await self.page.goto("https://x.com/explore/tabs/trending", wait_until="domcontentloaded") + await asyncio.sleep(1.2) # 初期XHR待ち + + # 1回目抽出 + items = await _scrape_trend_cards(self.page) + + # まだ足りなければ軽くスクロール(2回) + tries = 0 + while len(items) < limit and tries < 2: + await self.page.evaluate("window.scrollBy(0, document.body.scrollHeight);") + await asyncio.sleep(1.0) + more = await _scrape_trend_cards(self.page) + # rank重複が出やすいので name/url で重複排除 + seen = {(i.get("name"), i.get("url")) for i in items} + for m in more: + key = (m.get("name"), m.get("url")) + if key not in seen: + items.append(m) + seen.add(key) + tries += 1 + + # tweet_count を数値化 + for it in items: + it["tweet_count"] = _parse_count(it.get("tweet_count_text") or "") + + # rankでソートして切り詰め + items.sort(key=lambda x: x.get("rank") or 9999) + return items[:limit] + + \ No newline at end of file diff --git a/src/providers/tools/api_j_quants.py b/src/providers/tools/api_j_quants.py new file mode 100644 index 0000000..6bb48d5 --- /dev/null +++ b/src/providers/tools/api_j_quants.py @@ -0,0 +1,253 @@ +import os +import re +import unicodedata +import jquantsapi +import pandas as pd + +from lib.custom_logger import get_logger +logger = get_logger() + +# 依存関係版(曖昧一致/かな変換が必要なら) +try: + import jaconv + from rapidfuzz import fuzz + _HAS_FUZZ = True +except Exception: + # 依存が未導入でも部分一致/正規表現は動かす + logger.warning("jaconv, rapidfuzz not found. 
fuzzy search is disabled.") + _HAS_FUZZ = False + + +def _norm_basic(s: str) -> str: + """依存なしの基本正規化(全角→半角、大小無視)""" + if s is None: + return "" + return unicodedata.normalize("NFKC", str(s)).casefold() + + +def _norm_full(s: str) -> str: + """依存ありの強力正規化(かな/カナも揃える)""" + if s is None: + return "" + s = unicodedata.normalize("NFKC", str(s)) + if 'jaconv' in globals(): + s = jaconv.kata2hira(jaconv.z2h(s, kana=True, digit=True, ascii=True)) + return s.casefold() + + +class ApiJQuants: + """ + J-Quants の銘柄マスタ取得・検索 + """ + JPX_JQUANTS_REFRESH_TOKEN = os.getenv("JPX_JQUANTS_REFRESH_TOKEN") + CSV_FILE_PATH = os.getenv("JPX_JQUANTS_LIST_DATA_PATH", "data/fin/bronze/listed_info.csv") + + def __init__(self, csv_file: str | None = None): + self.cli = jquantsapi.Client(refresh_token=self.JPX_JQUANTS_REFRESH_TOKEN) + self.csv_file = csv_file or self.CSV_FILE_PATH + self._listed_df: pd.DataFrame | None = None + + # --- 公開API ---------------------------------------------------- + + @classmethod + def get_listed_info(cls, save_csv: str | None = None) -> pd.DataFrame: + """ + 銘柄一覧の取得(J-Quantsから)。必要ならCSV保存も。 + Returns: pd.DataFrame + """ + client = jquantsapi.Client(refresh_token=cls.JPX_JQUANTS_REFRESH_TOKEN) + df = client.get_listed_info().copy() + + if save_csv: + os.makedirs(os.path.dirname(save_csv), exist_ok=True) + df.to_csv(save_csv, index=False, encoding="utf-8-sig") + logger.info(f"銘柄一覧をCSVに保存しました: {save_csv}") + + return df + + @classmethod + def get_price_daily_quotes( + cls, + code: str, + start_date: str = None, + end_date: str = None, + save_csv:str = None, + ) -> pd.DataFrame: + """ + 指定銘柄の日次株価情報を取得 + + Args: + code (str): 銘柄コード(4桁) + start_date (str): 取得開始日 (YYYY-MM-DD) + end_date (str): 取得終了日 (YYYY-MM-DD) + + Returns: pd.DataFrame + """ + client = jquantsapi.Client(refresh_token=cls.JPX_JQUANTS_REFRESH_TOKEN) + df:pd.DataFrame = client.get_prices_daily_quotes(code=code, from_yyyymmdd=start_date, to_yyyymmdd=end_date).copy() + if save_csv: + os.makedirs(os.path.dirname(save_csv), exist_ok=True) + df.to_csv(save_csv, index=False, encoding="utf-8-sig") + logger.info(f"{code}の日次株価情報をCSVに保存しました: {save_csv}") + + return df + + + + @classmethod + def get_fins_statements( + cls, + code: str, + priod_date: str = None, + save_csv:str = None, + ) -> pd.DataFrame: + """ + 直近の財務諸表を取得する + + Args: + code (str): 銘柄コード(4桁) + priod_date (str): 取得対象の決算日 (YYYYMMDD)。Noneの場合はその銘柄のすべての財務諸表(過去の四半期・年度ごと)が返ります + + Returns: pd.DataFrame + """ + client = jquantsapi.Client(refresh_token=cls.JPX_JQUANTS_REFRESH_TOKEN) + df:pd.DataFrame = client.get_fins_statements(code=code, data_yyyymmdd=priod_date).copy() + if save_csv: + os.makedirs(os.path.dirname(save_csv), exist_ok=True) + df.to_csv(save_csv, index=False, encoding="utf-8-sig") + logger.info(f"{code}の財務諸表をCSVに保存しました: {save_csv}") + return df + + + @classmethod + def get_fins_announcement( + cls, + save_csv:str = None, + ) -> pd.DataFrame: + """ + 指定銘柄の有価証券報告書等の提出予定情報を取得 + + Args: + # code (str): 銘柄コード(4桁) + # start_date (str): 取得開始日 (YYYY-MM-DD) + # end_date (str): 取得終了日 (YYYY-MM-DD) + save_csv (str): 保存先のCSVファイルパス + + Returns: pd.DataFrame + """ + client = jquantsapi.Client(refresh_token=cls.JPX_JQUANTS_REFRESH_TOKEN) + df:pd.DataFrame = client.get_fins_announcement().copy() + if save_csv: + os.makedirs(os.path.dirname(save_csv), exist_ok=True) + df.to_csv(save_csv, index=False, encoding="utf-8-sig") + logger.info(f"有価証券報告書等の提出予定情報をCSVに保存しました: {save_csv}") + return df + + + + + + + def load_listed(self, force: bool = False) -> pd.DataFrame: + """ + 
銘柄一覧を読み込み(CSVがあればCSV、なければAPI)。正規化列もここで必ず作る。 + """ + if self._listed_df is None or force: + if os.path.exists(self.csv_file) and not force: + df = pd.read_csv(self.csv_file, dtype=str) # コード等の先頭ゼロ欠落対策で文字列 + else: + df = self.get_listed_info(save_csv=self.csv_file) + + df = self._prepare_norm_columns(df) + self._listed_df = df + + return self._listed_df + + def search_companies( + self, + q: str, + mode: str = "partial", # "partial" | "regex" | "fuzzy" + fields: list[str] = None, + limit: int = 50, + threshold: int = 70, # fuzzy のしきい値 + ) -> pd.DataFrame: + """ + 銘柄一覧から企業を検索する + + Args: + q (str): 検索クエリ + mode (str): 検索モード + - "partial": 部分一致(正規化して大小/全半/カナ差異を吸収) + - "regex": 正規表現(高度なパターン検索) + - "fuzzy": あいまい一致(表記ゆれやタイポに強い) + fields (list[str] | None): 検索対象のフィールド + 例: ["_CompanyName_norm", "_CompanyNameEnglish_norm", "_SectorName_norm", "_Code_norm"] + None の場合は上記をデフォルトとする + limit (int): 取得する最大件数 + threshold (int): fuzzy モードで採用するスコアのしきい値 (0–100) + + Returns: + pd.DataFrame: 検索結果の DataFrame を返す + """ + df = self.load_listed() + if fields is None: + fields = ["_CompanyName_norm", "_CompanyNameEnglish_norm", "_SectorName_norm", "_Code_norm"] + + q_norm = _norm_full(q) + + if mode == "partial": + mask = pd.Series(False, index=df.index) + for col in fields: + if col in df.columns: + mask = mask | df[col].str.contains(re.escape(q_norm), na=False) + return df[mask].head(limit).copy() + + if mode == "regex": + pattern = re.compile(q, flags=re.IGNORECASE) + mask = pd.Series(False, index=df.index) + for raw_col in ["Code", "CompanyName", "CompanyNameEnglish", "SectorName"]: + if raw_col in df.columns: + mask = mask | df[raw_col].astype(str).str.contains(pattern, na=False) + return df[mask].head(limit).copy() + + if mode == "fuzzy": + if not _HAS_FUZZ: + raise RuntimeError("fuzzy 検索には 'rapidfuzz' と 'jaconv' のインストールが必要です。") + key_series = ( + df.get("_CompanyName_norm", "").astype(str) + " " + + df.get("_CompanyNameEnglish_norm", "").astype(str) + " " + + df.get("_SectorName_norm", "").astype(str) + " " + + df.get("_Code_norm", "").astype(str) + ) + scores = key_series.map(lambda s: fuzz.token_set_ratio(q_norm, s)) + hit = ( + df.assign(_score=scores) + .query("_score >= @threshold") # threshold を利用 + .sort_values("_score", ascending=False) + ) + return hit.head(limit).copy() + + raise ValueError("mode must be one of: partial | regex | fuzzy") + + # ===== 内部ユーティリティ ===== + + def _prepare_norm_columns(self, df: pd.DataFrame) -> pd.DataFrame: + """検索に使う正規化列を作成。API取得でもCSV読込でも必ず通す。""" + df = df.copy() + # 文字列化&欠損埋め + for col in ["Code", "CompanyName", "CompanyNameEnglish", "SectorName", "MarketCode"]: + if col in df.columns: + df[col] = df[col].astype(str).fillna("") + + norm = _norm_full if _HAS_FUZZ else _norm_basic + if "Code" in df.columns: + df["_Code_norm"] = df["Code"].map(norm) + if "CompanyName" in df.columns: + df["_CompanyName_norm"] = df["CompanyName"].map(norm) + if "CompanyNameEnglish" in df.columns: + df["_CompanyNameEnglish_norm"] = df["CompanyNameEnglish"].map(norm) + if "SectorName" in df.columns: + df["_SectorName_norm"] = df["SectorName"].map(norm) + if "MarketCode" in df.columns: + df["_MarketCode_norm"] = df["MarketCode"].map(norm) + return df \ No newline at end of file diff --git a/src/providers/tools/api_news_api_org.py b/src/providers/tools/api_news_api_org.py new file mode 100644 index 0000000..98a9f9b --- /dev/null +++ b/src/providers/tools/api_news_api_org.py @@ -0,0 +1,148 @@ +import requests +import os +from lib.custom_logger import get_logger + + +logger = get_logger() + 
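+# All three endpoints below share the same response contract: NewsAPI.org returns
+# {"status": "ok" | "error", ...} and, on error, a "message" field. A small helper
+# along these lines could factor out the repeated status checks (sketch only, not
+# wired into the class):
+#
+# def _check_response(json_data: dict) -> dict:
+#     if json_data.get("status") != "ok":
+#         raise Exception(f"NewsAPIOrg API Error: {json_data.get('message')}")
+#     return json_data
+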
+class ApiNewsAPIOrg: + """ + NewsAPIOrgを操作するクラス + + Notes: + - NewsAPIOrg APIを使用してニュース記事を取得するためのクラス + - APIキーは環境変数 `NEWS_API_ORG_API_KEY` から取得されます + - 詳細なAPIドキュメントは https://newsdata.io/docs/api を参照してください + """ + + NEWS_API_ORG_API_KEY = os.getenv("NEWS_API_ORG_API_KEY") + + @classmethod + def get_news(cls, + query: str = None, + from_date: str = None, + to_date: str = None, + language: str = "jp", + domains: str = None, + excludeDomains: str = None, + pageSize: int = None, + page: int = None, + sortBy: str = "publishedAt",): + """ + NewsAPIOrgからニュース記事を取得する + 世界中のニュースサイトやブログ(15万以上のソース)に公開された記事を全文検索するためのエンドポイント + + Args: + query (str): 検索クエリ + from_date (str): 取得開始日 (YYYY-MM-DD) + to_date (str): 取得終了日 (YYYY-MM-DD) + language (str): 記事の言語 (例: "jp" 日本語) + domains (str): 取得対象のドメイン (カンマ区切り + excludeDomains (str): 除外するドメイン (カンマ区切り) + sortBy (str): ソート順 (relevancy, popularity, publishedAt) + """ + url = "https://newsapi.org/v2/everything" + params = { + "apikey": cls.NEWS_API_ORG_API_KEY, + "q" : query, + "from": from_date, + "to": to_date, + "language": language, + "domains": domains, + "excludeDomains": excludeDomains, + "pageSize": pageSize, + "page": page, + "sortBy": sortBy, + } + # None値は送らない + params = {k: v for k, v in params.items() if v is not None} + + + + response = requests.get(url,params=params) + response.raise_for_status() + json_data = response.json() + if not json_data.get("status") == "ok": + logger.error(f"NewsAPIOrg API Error: {json_data.get('message')}") + raise Exception(f"NewsAPIOrg API Error: {json_data.get('message')}") + logger.debug(f"NewsAPIOrg API Response: {json_data}") + return json_data + + + + @classmethod + def get_headline_news(cls, + query: str = None, + country: str = "jp", + category: str = "technology", + source: str = None, + pageSize: int = None, + page: int = None, + ): + """ + NewsAPIOrgから最新のニュース記事を取得する + 世界中のニュースサイトから 最新のトップニュース・速報(breaking news) を取得するための API。 + + Args: + query (str): 検索クエリ + country (str): 国コード (例: "jp" 日本) + category (str): カテゴリ (business, entertainment, general, health, science, + sports, technology) + pageSize (int): 取得する記事の最大数 (1-100) + page (int): ページ番号 (1から始まる) + """ + url = "https://newsapi.org/v2/top-headlines" + params = { + "apikey": cls.NEWS_API_ORG_API_KEY, + "q" : query, + "country": country, + "category": category, + "source": source, + "pageSize": pageSize, + "page": page, + } + # None値は送らない + params = {k: v for k, v in params.items() if v is not None} + + response = requests.get(url,params=params) + response.raise_for_status() + json_data = response.json() + if not json_data.get("status") == "ok": + logger.error(f"NewsAPIOrg API Error: {json_data.get('message')}") + raise Exception(f"NewsAPIOrg API Error: {json_data.get('message')}") + logger.debug(f"NewsAPIOrg API Response: {json_data}") + return json_data + + @classmethod + def get_sources(cls, + country: str = "jp", + language: str = "jp", + category: str = "technology", + ): + """ + NewsAPIOrgからニュースソースを取得する + ニュースソース(ニュースサイトやブログ)のリストを取得するためのエンドポイント + + Args: + country (str): 国コード (例: "jp" 日本) + language (str): 記事の言語 (例: "jp" 日本語) + category (str): カテゴリ (business, entertainment, general, health, science, + sports, technology) + """ + url = "https://newsapi.org/v2/top-headlines/sources" + params = { + "apikey": cls.NEWS_API_ORG_API_KEY, + "country": country, + "language": language, + "category": category, + } + # None値は送らない + params = {k: v for k, v in params.items() if v is not None} + response = requests.get(url,params=params) + response.raise_for_status() + 
json_data = response.json()
+        if json_data.get("status") != "ok":
+            logger.error(f"NewsAPIOrg API Error: {json_data.get('message')}")
+            raise Exception(f"NewsAPIOrg API Error: {json_data.get('message')}")
+        logger.debug(f"NewsAPIOrg API Response: {json_data}")
+        return json_data
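+
+
+# Usage sketch (assumes NEWS_API_ORG_API_KEY is set in the environment; the country
+# and category values are only illustrative):
+#
+# if __name__ == "__main__":
+#     headlines = ApiNewsAPIOrg.get_headline_news(country="jp", category="technology")
+#     for article in headlines["articles"]:
+#         logger.info(article["title"])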