Add APIs

ry.yamafuji 2025-09-17 07:03:06 +09:00
parent ac9f5d9032
commit 87a225fdad
11 changed files with 1056 additions and 53 deletions

examples/example_fin.py Normal file

@ -0,0 +1,48 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
from lib.custom_logger import get_logger
logger = get_logger(level=10)
from dotenv import load_dotenv
load_dotenv()
from providers.tools.api_j_quants import ApiJQuants
def example():
# Fetch listed-company information
client = ApiJQuants()
# Search the listed-company master
# result = client.search_companies("トヨタ")
# logger.info(f"Found {len(result)} companies")
# dic = result.to_dict(orient="records")
# logger.info(f"First company: {dic if len(dic) > 0 else 'N/A'}")
# Fetch daily price quotes for a ticker
# df = client.get_price_daily_quotes(
# code="7203",
# start_date="2025-05-01",
# end_date="2025-05-31",
# save_csv="./data/fin/silver/y=2025/m=05/7203_price_daily_quotes_2025-06.csv",
# )
# logger.info(f"Got {len(df)} rows of daily price data")
# # Fetch the most recent financial statements
# df = client.get_fins_statements(
# code="7203",
# save_csv="./data/fin/silver/y=2025/m=09/7203_fins_statements_2025-09-16.csv",
# )
# logger.info(f"Got {len(df)} rows of financial statement data")
# Check upcoming financial-statement announcement dates
df = client.get_fins_announcement(
save_csv="./data/fin/silver/y=2025/m=09/fins_announcement_2025-09-16.csv",
)
logger.info(f"Got {len(df)} rows of announcement schedule data")
example()
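# A minimal usage sketch for the commented-out company search above, assuming
# JPX_JQUANTS_REFRESH_TOKEN is set and the listed-info CSV can be created under
# data/fin/bronze/ on first use.
def example_search():
    client = ApiJQuants()
    hits = client.search_companies("トヨタ", mode="partial", limit=5)
    for row in hits.to_dict(orient="records"):
        logger.info(f"{row.get('Code')}: {row.get('CompanyName')}")

# example_search()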

examples/example_news.py Normal file

@ -0,0 +1,48 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
from lib.custom_logger import get_logger
logger = get_logger(level=10)
from dotenv import load_dotenv
load_dotenv()
from providers.tools.api_news_api_org import ApiNewsAPIOrg
def example_news():
api = ApiNewsAPIOrg()
res = api.get_news(
query="technology",  # search query
from_date="2025-09-10",
)
for article in res["articles"]:
# print(article)
logger.info(f"Title: {article['title']} URL: {article['url']}\ndescription: {article['description']}")
def example_headline():
api = ApiNewsAPIOrg()
res = api.get_headline_news(
query=None,
country="jp",
category=None,
)
for article in res["articles"]:
# print(article)
logger.info(f"Title: {article['title']} URL: {article['url']}\ndescription: {article['description']}")
def example_source():
api = ApiNewsAPIOrg()
res = api.get_sources(
category=None,
language="ja",
country=None,
)
for source in res["sources"]:
# print(source)
logger.info(f"Name: {source['name']} URL: {source['url']}\ndescription: {source['description']}")
example_source()
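# A minimal paging sketch using the pageSize/page parameters that get_news already
# exposes; page depth is limited on the free NewsAPI.org plan, so treat the range
# below as illustrative.
def example_news_paged():
    api = ApiNewsAPIOrg()
    for page in range(1, 3):
        res = api.get_news(query="technology", language="en", pageSize=20, page=page)
        logger.info(f"page {page}: {len(res['articles'])} articles")

# example_news_paged()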


@ -13,10 +13,12 @@ load_dotenv(".env")
from lib.custom_logger import get_logger
logger = get_logger(level=10)
from providers.sns.api_sns_x import APISNSX
from providers.sns.api_sns_x import ApiSnsX
# from providers.sns.api_youtube import ApiYoutube
from providers.sns.api_youtube_downloader import ApiYoutubeDownloader
def example_get_tweet():
items = APISNSX.search_recent_tweets(
items = ApiSnsX.search_recent_tweets(
query="OpenAI lang:ja -is:retweet",
max_results=10
)
@ -25,5 +27,49 @@ def example_get_tweet():
logger.info(f"- {tweet['id']}: {tweet['text']}")
# example_get_tweet()
example_get_tweet()
# def example_get_youtube():
# client = ApiYoutube()
# Fetch the category list
# items = client.get_categories()
# logger.info(f"Found {len(items)} categories")
# for item in items:
# logger.info(f"- {item['id']}: {item['title']}")
# Fetch popular videos
# items = client.get_most_popular(
# region_code="JP",
# video_category_id="25",
# max_results=10,
# )
# logger.info(f"Found {len(items)} popular videos.")
# for item in items:
# logger.info(f"- {item['id']}: {item['snippet']['title']}")
# Fetch video details
# items = client.get_videos_by_ids(
# video_ids=["zXJ31wzT3Vo"],
# )
# for item in items:
# logger.info(f"- {item['id']}: {item['snippet']['title']}")
# Rank popular categories
# items = client.rank_popular_categories(
# region_code="JP",
# )
# for item in items:
# logger.info(f"- {item}")
# example_get_youtube()
def example_youtube_downloader():
video_url = "https://www.youtube.com/watch?v=mnNwcWzc510"
client = ApiYoutubeDownloader()
info = client.download_audio(
video_url=video_url,
)
logger.info(f"Downloaded video info: {info}")
example_youtube_downloader()
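# A minimal sketch of a full-video download with the same client; download_video
# merges best video + audio into mp4, so ffmpeg is assumed to be on PATH.
def example_youtube_video_download():
    client = ApiYoutubeDownloader()
    path = client.download_video(video_url="mnNwcWzc510", output_dir="downloads")
    logger.info(f"Downloaded video to: {path}")

# example_youtube_video_download()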


@ -38,56 +38,19 @@ async def run_headless():
await bot.stop()
asyncio.run(run_headless())
# asyncio.run(run_headless())
# async def example_get_tweet_scraper():
# bot = XScraper(storage_state="x_cookies.json", headless=False, slow_mo=100)
# await bot.start()
async def example_get_trend():
bot = XScraper(storage_state="x_cookies.json", headless=True)
await bot.start()
try:
trends = await bot.get_trends(limit=10)
for t in trends:
print(t["rank"], t["name"], t["tweet_count"], t["url"])
finally:
await bot.stop()
# # First run only: log in manually and save the cookies
# # await bot.login_manual()
# # await asyncio.sleep(240)  # wait 240 seconds for the manual login
asyncio.run(example_get_trend())
# # Collect tweets via search
# res = await bot.search_live("OpenAI lang:ja -is:retweet", scroll_secs=6)
# print("search tweets:", len(res))
# if res:
# print(res[0])
# await bot.stop()
# asyncio.run(example_get_tweet_scraper())
from pathlib import Path
from playwright.async_api import async_playwright, TimeoutError
STATE = "x_cookies.json"
async def save_state_once():
async with async_playwright() as p:
browser = await p.chromium.launch(headless=False, slow_mo=50)
ctx = await browser.new_context()
page = await ctx.new_page()
await page.goto("https://x.com/login", wait_until="domcontentloaded")
input("ログインを完了したら Enter...")
# ホームが開ける=ログイン確認してから保存
await page.goto("https://x.com/home", wait_until="domcontentloaded")
await page.wait_for_selector('[aria-label="Account menu"]', timeout=15000)
await ctx.storage_state(path=STATE) # ★ここで保存
await ctx.close(); await browser.close()
async def use_saved_state_headless():
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
ctx = await browser.new_context(storage_state=STATE)
page = await ctx.new_page()
await page.goto("https://x.com/home", wait_until="domcontentloaded")
# If a login prompt appears here, the saved state is not being applied
# save_state_once()
# asyncio.run(save_state_once())
asyncio.run(use_saved_state_headless())
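# A minimal sketch of live-search collection with the saved cookie state, mirroring
# the commented-out example above; search_live is assumed to return a list of tweet
# dicts as shown there.
async def example_search_live():
    bot = XScraper(storage_state=STATE, headless=True)
    await bot.start()
    try:
        res = await bot.search_live("OpenAI lang:ja -is:retweet", scroll_secs=6)
        print("search tweets:", len(res))
    finally:
        await bot.stop()

# asyncio.run(example_search_live())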


@ -27,3 +27,11 @@ playwright==1.52.0
# SNS(X)
requests-oauthlib
# jpx-jquants
jquants-api-client
# youtube
google-api-python-client
#downloader
yt_dlp


@ -6,7 +6,7 @@ from lib.custom_logger import get_logger
logger = get_logger()
class APISNSX:
class ApiSnsX:
"""X (formerly Twitter) API interaction class.


@ -0,0 +1,187 @@
import os
import time
from typing import Optional, List, Iterable, Tuple, Dict
from googleapiclient.errors import HttpError
from datetime import datetime, timedelta, timezone
from googleapiclient.discovery import build
from lib.custom_logger import get_logger
logger = get_logger()
class ApiYoutube:
"""YouTube Data API interaction class.
Notes:
- Wraps the YouTube Data API v3 to fetch video and channel information
- Requires `pip install google-api-python-client`
"""
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
def __init__(self, api_key: Optional[str] = None, cache_discovery: bool = True):
self.api_key = api_key or self.YOUTUBE_API_KEY
if not self.api_key:
raise ValueError("YOUTUBE_API_KEY is not set. Provide it via the environment or the api_key argument.")
# Caching the discovery document is optional
self.yt = build("youtube", "v3", developerKey=self.api_key, cache_discovery=cache_discovery)
# ===== Basic utilities =====
@staticmethod
def _sleep_backoff(i: int):
"""Sleep for exponential backoff (capped at roughly 16 seconds)."""
time.sleep(min(2 ** i, 16))
@staticmethod
def _to_iso8601(dt: datetime) -> str:
"""Convert to a UTC ISO 8601 string."""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat()
# ===== List operations (shared paging helper) =====
def _paged_call(self, func, **kwargs) -> Iterable[dict]:
"""Common helper that follows pageToken automatically and yields items."""
page_token = None
i = 0
while True:
try:
resp = func(pageToken=page_token, **kwargs).execute()
except HttpError as e:
# Retry rate-limit / transient errors with exponential backoff
status = getattr(e, "status_code", None) or getattr(e.resp, "status", None)
if status in (403, 429, 500, 503):
logger.warning(f"HTTP {status} / quota or transient error. retrying... ({i})")
self._sleep_backoff(i)
i += 1
if i > 5:
logger.error("再試行回数オーバー")
raise
continue
raise
for it in resp.get("items", []):
yield it
page_token = resp.get("nextPageToken")
if not page_token:
break
# ===== Features =====
def get_categories(self, region_code: str = "JP") -> list[dict]:
"""Return the video category list for a region."""
try:
resp = self.yt.videoCategories().list(
part="snippet",
regionCode=region_code
).execute()
except HttpError as e:
raise
out = []
for it in resp.get("items", []):
out.append({
"id": it.get("id"),
"title": it.get("snippet", {}).get("title"),
"assignable": it.get("snippet", {}).get("assignable", False),
})
return out
def get_most_popular(
self,
region_code: str = "JP",
video_category_id: Optional[str] = None,
max_results: int = 50,
parts: str = "snippet,statistics,contentDetails"
) -> List[dict]:
"""公式“人気(mostPopular)”を取得。カテゴリ絞り込み可。"""
kwargs = dict(
part=parts,
chart="mostPopular",
regionCode=region_code,
maxResults=min(max_results, 50) # API制約
)
if video_category_id:
kwargs["videoCategoryId"] = video_category_id
items = list(self._paged_call(self.yt.videos().list, **kwargs))
return items
def get_videos_by_ids(
self,
video_ids: List[str],
parts: str = "snippet,statistics,contentDetails"
) -> List[dict]:
"""動画ID配列に対して詳細・統計をまとめて取得。50件ずつに分割。"""
out: List[dict] = []
chunk = 50
for i in range(0, len(video_ids), chunk):
sub = video_ids[i:i+chunk]
try:
resp = self.yt.videos().list(part=parts, id=",".join(sub)).execute()
out.extend(resp.get("items", []))
except HttpError as e:
logger.exception(f"videos.list 失敗: {e}")
raise
return out
def rank_popular_categories(
self,
region_code: str = "JP",
sample_size: int = 200,
) -> List[Tuple[str, Dict[str, int]]]:
"""mostPopular をページングで収集し、カテゴリごとの
- 件数動画数
- 総再生数(viewCount 合計)
を集計してランキング化
戻り値: [(category_id, {"count": n, "views": total}), ...] views で降順
"""
collected: List[dict] = []
# pageごと50件。sample_size まで集める
per_page = 50
fetched = 0
page_token = None
i = 0
while fetched < sample_size:
try:
resp = self.yt.videos().list(
part="snippet,statistics",
chart="mostPopular",
regionCode=region_code,
maxResults=min(per_page, sample_size - fetched),
pageToken=page_token
).execute()
except HttpError as e:
status = getattr(e, "status_code", None) or getattr(e.resp, "status", None)
if status in (403, 429, 500, 503):
logger.warning(f"quota/一時エラー: 再試行 {i}")
self._sleep_backoff(i); i += 1
if i > 5:
raise
continue
raise
items = resp.get("items", [])
collected.extend(items)
fetched += len(items)
page_token = resp.get("nextPageToken")
if not page_token or len(items) == 0:
break
# Aggregate
category_stats: Dict[str, Dict[str, int]] = {}
for it in collected:
cat = it.get("snippet", {}).get("categoryId") or "unknown"
views = int(it.get("statistics", {}).get("viewCount", 0))
if cat not in category_stats:
category_stats[cat] = {"count": 0, "views": 0}
category_stats[cat]["count"] += 1
category_stats[cat]["views"] += views
# Sort by views, descending
ranked = sorted(category_stats.items(), key=lambda kv: kv[1]["views"], reverse=True)
return ranked
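# A minimal usage sketch, assuming YOUTUBE_API_KEY is set: map the ranked category
# ids from rank_popular_categories back to human-readable titles via get_categories.
if __name__ == "__main__":
    client = ApiYoutube()
    titles = {c["id"]: c["title"] for c in client.get_categories(region_code="JP")}
    for cat_id, stats in client.rank_popular_categories(region_code="JP", sample_size=100):
        print(titles.get(cat_id, cat_id), stats["count"], stats["views"])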


@ -0,0 +1,84 @@
import os
import yt_dlp
from lib.custom_logger import get_logger
logger = get_logger()
class ApiYoutubeDownloader:
"""
YouTube video downloader class.
Notes:
- Uses the yt_dlp library to download YouTube videos
- Requires `pip install yt_dlp` (plus ffmpeg on PATH for merging / audio extraction)
- Be mindful of copyright when using this class
"""
@classmethod
def download_video(cls, video_url: str, output_dir: str = "downloads"):
"""
Download a YouTube video.
Args:
video_url (str): URL or video ID of the YouTube video to download
output_dir (str): Directory to save the downloaded file to (default: "downloads")
Returns:
str: File path of the downloaded video
"""
logger.info(f"Downloading video from URL: {video_url}")
os.makedirs(output_dir, exist_ok=True)
if not video_url.startswith("http"):
video_url = f"https://www.youtube.com/watch?v={video_url}"
ydl_opts = {
"outtmpl": os.path.join(output_dir, "%(title)s [%(id)s].%(ext)s"),
# Merge best video + best audio; re-encode to a fixed 'mp4' container if needed
"format": "bv*+ba/b",
"merge_output_format": "mp4",
"noprogress": False,
"quiet": False,
"restrictfilenames": True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([video_url])
return ydl.prepare_filename(ydl.extract_info(video_url, download=False))
@classmethod
def download_audio(cls, video_url: str, output_dir: str = "downloads") -> str:
"""元の音声を変換せず保存(最良の音声トラックをそのまま)"""
os.makedirs(output_dir, exist_ok=True)z
if not video_url.startswith("http"):
video_url = f"https://www.youtube.com/watch?v={video_url}"
ydl_opts = {
"outtmpl": os.path.join(output_dir, "%(title)s [%(id)s].%(ext)s"),
# Prefer the audio-only track
"format": "bestaudio/best",
"postprocessors": [
{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
"preferredquality": "192", # 128/192/320 kbps
}
],
# For compatibility, force 44.1 kHz / stereo
"postprocessor_args": ["-ar", "44100", "-ac", "2"],
"prefer_ffmpeg": True,
"restrictfilenames": True,
"noprogress": False,
"quiet": False,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=True)
# Safely resolve the final output file path
path = (
(info.get("requested_downloads") or [{}])[0].get("filepath")
or info.get("filepath")
or info.get("_filename")
)
return path
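# A metadata-only probe could be sketched along the same lines, using yt_dlp's
# extract_info(download=False) to inspect title/duration before downloading;
# the method name below is illustrative and not part of yt_dlp itself.
@classmethod
def probe(cls, video_url: str) -> dict:
    """Return basic metadata (id, title, duration in seconds) without downloading."""
    if not video_url.startswith("http"):
        video_url = f"https://www.youtube.com/watch?v={video_url}"
    with yt_dlp.YoutubeDL({"quiet": True, "noprogress": True}) as ydl:
        info = ydl.extract_info(video_url, download=False)
    return {"id": info.get("id"), "title": info.get("title"), "duration": info.get("duration")}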


@ -8,6 +8,9 @@ from urllib.parse import quote
# TWEET_RX = re.compile(r"/i/api/graphql/.+/(TweetDetail|TweetResultByRestId|ConversationTimeline)")
TWEET_RX = re.compile(r"/i/api/graphql/.+/(TweetDetail|TweetResultByRestId|ConversationTimeline|SearchTimeline)")
TREND_COUNT_RX = re.compile(r"(?P<num>(?:\d{1,3}(?:,\d{3})+|\d+(?:\.\d+)?))(?:\s*[KkMm万億])?|\d+件の投稿")
def _sg(d, path, default=None):
cur = d
@ -152,6 +155,185 @@ async def _fill_with_scroll(page, base_list, limit, tries=5):
out = list(items.values()); out.sort(key=k, reverse=True)
return out[:limit]
# Numeric parsing for trend post counts
def _parse_count(text: str) -> int | None:
"""
Roughly convert '12.3K posts' / '7,654 posts' / '1.2M posts' / '2.3万 件の投稿' / '12件の投稿' to an integer.
Wording varies a lot across languages, so return None when the text cannot be parsed.
"""
if not text:
return None
t = text.replace("\u202f", " ").replace("\xa0", " ")
m = TREND_COUNT_RX.search(t)
if not m:
return None
raw = m.group(0)
# Handle Japanese 万 / 億 suffixes
if "万" in raw:
try:
num = float(re.sub(r"[^\d\.]", "", raw))
return int(num * 10_000)
except Exception:
return None
if "億" in raw:
try:
num = float(re.sub(r"[^\d\.]", "", raw))
return int(num * 100_000_000)
except Exception:
return None
# English K / M suffixes
if re.search(r"[Kk]\b", raw):
try:
num = float(re.sub(r"[^\d\.]", "", raw))
return int(num * 1_000)
except Exception:
return None
if re.search(r"[Mm]\b", raw):
try:
num = float(re.sub(r"[^\d\.]", "", raw))
return int(num * 1_000_000)
except Exception:
return None
# Comma-separated, plain digits, or '件の投稿' counts
try:
digits = re.sub(r"[^\d]", "", raw)
return int(digits) if digits else None
except Exception:
return None
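# Hedged worked examples of the conversion above, assuming the regex and float
# arithmetic behave as written:
#   _parse_count("12.3K posts")    -> 12300
#   _parse_count("7,654 posts")    -> 7654
#   _parse_count("1.2M posts")     -> 1200000
#   _parse_count("1.2万 件の投稿")  -> 12000
#   _parse_count("12件の投稿")      -> 12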
async def _scrape_trend_cards(page):
await page.wait_for_selector('[data-testid="trend"]', timeout=10_000)
return await page.evaluate(r"""
() => {
const cleanup = s => (s || '').replace(/[\u202f\xa0]/g, ' ').trim();
const isCountText = t => /posts|件の投稿/.test(t);
const isPureDigits = t => /^\d+$/.test((t||'').trim());
const isDot = t => (t||'').trim() === '·';
const isLabelish = t => /Trending|トレンド|ニュース|エンタメ|スポーツ|政治/i.test(t);
const stripDotParts = t => {
const parts = (t || '').split('·').map(p => cleanup(p)).filter(Boolean);
const good = parts.filter(p => !isPureDigits(p) && !isCountText(p) && !isLabelish(p));
return good.join(' ').trim() || t;
};
const absolutize = href => {
if (!href) return null;
if (/^https?:/i.test(href)) return href;
return href.startsWith('/') ? ('https://x.com' + href) : ('https://x.com/' + href);
};
const pickAnchor = el => {
// Use a[href] if present (it appears for some regions / UI variants)
const sel = [
'a[href*="/hashtag/"]',
'a[href*="/search?"]',
'a[href^="/i/events/"]',
'a[href]'
];
for (const s of sel) {
const a = el.querySelector(s);
if (a) return a;
}
return null;
};
const nameFromHref = href => {
try {
const u = new URL(href);
if (/\/hashtag\//.test(u.pathname)) {
const tag = u.pathname.split('/').pop();
if (tag) return '#' + decodeURIComponent(tag);
}
if (u.pathname === '/search' && u.searchParams.has('q')) {
const q = u.searchParams.get('q') || '';
return decodeURIComponent(q);
}
} catch (_) {}
return null;
};
const titleFromSpans = el => {
// Title candidates (prefer #hashtags first)
const spans = Array.from(el.querySelectorAll('span')).map(s => cleanup(s.textContent)).filter(Boolean);
// 1) First, #hashtags
const hash = spans.find(t => t.startsWith('#') && !isLabelish(t));
if (hash) return hash;
// 2) Short text under the heading role
const heading = el.querySelector('[role="heading"]');
if (heading) {
const hs = Array.from(heading.querySelectorAll('span')).map(s => cleanup(s.textContent)).filter(Boolean);
const h = hs.find(t => !isLabelish(t) && !isPureDigits(t) && !isCountText(t) && !isDot(t) && t.length <= 80);
if (h) return stripDotParts(h);
}
// 3) Fall back to any suitable span
const cand = spans.find(t =>
!isLabelish(t) && !isPureDigits(t) && !isCountText(t) && !isDot(t) && t.length <= 80
);
return cand ? stripDotParts(cand) : null;
};
const makeUrlFromName = name => {
if (!name) return null;
if (name.startsWith('#')) {
const tag = name.slice(1);
return 'https://x.com/hashtag/' + encodeURIComponent(tag);
}
return 'https://x.com/search?q=' + encodeURIComponent(name) + '&src=trend_click';
};
const parseCount = text => {
if (!text) return null;
const t = text.replace(/\u202f|\xa0/g, ' ');
const m = (t.match(/(\d{1,3}(?:,\d{3})+|\d+(?:\.\d+)?)(?:\s*([KkMm]|万|億))?/) || [])[0];
if (!m) return null;
if (/件の投稿/.test(t)) {
const d = (t.match(/\d[\d,]*/)||[''])[0].replace(/,/g,'');
return d ? parseInt(d,10) : null;
}
const num = parseFloat(m.replace(/,/g,''));
if (/[Kk]\b/.test(m)) return Math.round(num * 1_000);
if (/[Mm]\b/.test(m)) return Math.round(num * 1_000_000);
if (/万/.test(m)) return Math.round(num * 10_000);
if (/億/.test(m)) return Math.round(num * 100_000_000);
return Math.round(num);
};
const cards = Array.from(document.querySelectorAll('[data-testid="trend"]'));
const out = [];
let rank = 1;
for (const el of cards) {
const a = pickAnchor(el);
const href = a ? a.getAttribute('href') : null;
const urlFromA = absolutize(href);
const spans = Array.from(el.querySelectorAll('span')).map(s => cleanup(s.textContent)).filter(Boolean);
const countText = spans.find(t => /posts|件の投稿/.test(t)) || null;
// name: prefer deriving it from the URL, then fall back to the DOM
let name = urlFromA ? nameFromHref(urlFromA) : null;
if (!name) name = titleFromSpans(el);
// URL: derive it from the name, so it works even without an <a>
const url = urlFromA || makeUrlFromName(name);
const search_url = name ? ('https://x.com/search?q=' + encodeURIComponent(name) + '&src=trend_click') : null;
out.push({
name,
rank: rank++,
url,
search_url,
tweet_count_text: countText,
tweet_count: parseCount(countText || '')
});
}
return out;
}
""")
class XScraper:
"""
@ -255,3 +437,39 @@ class XScraper:
url = f"https://x.com/{username.lstrip('@')}"
first = await _goto_and_scrape(self.page, url)
return await _fill_with_scroll(self.page, first, limit)
async def get_trends(self, limit: int = 10) -> list[dict]:
"""
Fetch the list of trends (being logged in is recommended).
Scrape from the DOM first; if too few items are found, scroll a little to load more.
"""
await self.page.goto("https://x.com/explore/tabs/trending", wait_until="domcontentloaded")
await asyncio.sleep(1.2)  # wait for the initial XHRs
# First extraction pass
items = await _scrape_trend_cards(self.page)
# If still short of the limit, scroll lightly (up to 2 more passes)
tries = 0
while len(items) < limit and tries < 2:
await self.page.evaluate("window.scrollBy(0, document.body.scrollHeight);")
await asyncio.sleep(1.0)
more = await _scrape_trend_cards(self.page)
# Ranks tend to repeat across passes, so dedupe by name/url
seen = {(i.get("name"), i.get("url")) for i in items}
for m in more:
key = (m.get("name"), m.get("url"))
if key not in seen:
items.append(m)
seen.add(key)
tries += 1
# Convert tweet_count to a number
for it in items:
it["tweet_count"] = _parse_count(it.get("tweet_count_text") or "")
# Sort by rank and truncate
items.sort(key=lambda x: x.get("rank") or 9999)
return items[:limit]


@ -0,0 +1,253 @@
import os
import re
import unicodedata
import jquantsapi
import pandas as pd
from lib.custom_logger import get_logger
logger = get_logger()
# Optional dependencies (needed for fuzzy matching / kana conversion)
try:
import jaconv
from rapidfuzz import fuzz
_HAS_FUZZ = True
except Exception:
# Partial-match / regex search still works without these dependencies
logger.warning("jaconv, rapidfuzz not found. fuzzy search is disabled.")
_HAS_FUZZ = False
def _norm_basic(s: str) -> str:
"""Dependency-free basic normalization (width folding via NFKC, case-insensitive)."""
if s is None:
return ""
return unicodedata.normalize("NFKC", str(s)).casefold()
def _norm_full(s: str) -> str:
"""Stronger normalization using the optional deps (also folds katakana to hiragana)."""
if s is None:
return ""
s = unicodedata.normalize("NFKC", str(s))
if 'jaconv' in globals():
s = jaconv.kata2hira(s)  # NFKC already folds width, so only kana folding remains
return s.casefold()
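# Hedged worked examples, assuming jaconv is importable:
#   _norm_basic("ＴＯＹＯＴＡ") -> "toyota"       (NFKC width folding + casefold)
#   _norm_full("トヨタ") == _norm_full("とよた")   (katakana folded to hiragana)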
class ApiJQuants:
"""
Fetch and search the J-Quants listed-company master data.
"""
JPX_JQUANTS_REFRESH_TOKEN = os.getenv("JPX_JQUANTS_REFRESH_TOKEN")
CSV_FILE_PATH = os.getenv("JPX_JQUANTS_LIST_DATA_PATH", "data/fin/bronze/listed_info.csv")
def __init__(self, csv_file: str | None = None):
self.cli = jquantsapi.Client(refresh_token=self.JPX_JQUANTS_REFRESH_TOKEN)
self.csv_file = csv_file or self.CSV_FILE_PATH
self._listed_df: pd.DataFrame | None = None
# --- Public API --------------------------------------------------
@classmethod
def get_listed_info(cls, save_csv: str | None = None) -> pd.DataFrame:
"""
Fetch the listed-company list from J-Quants (optionally saving it to CSV).
Returns: pd.DataFrame
"""
client = jquantsapi.Client(refresh_token=cls.JPX_JQUANTS_REFRESH_TOKEN)
df = client.get_listed_info().copy()
if save_csv:
os.makedirs(os.path.dirname(save_csv), exist_ok=True)
df.to_csv(save_csv, index=False, encoding="utf-8-sig")
logger.info(f"銘柄一覧をCSVに保存しました: {save_csv}")
return df
@classmethod
def get_price_daily_quotes(
cls,
code: str,
start_date: str = None,
end_date: str = None,
save_csv:str = None,
) -> pd.DataFrame:
"""
Fetch daily price quotes for the given ticker.
Args:
code (str): 4-digit ticker code
start_date (str): Start date (YYYY-MM-DD)
end_date (str): End date (YYYY-MM-DD)
save_csv (str): Optional CSV output path
Returns: pd.DataFrame
"""
client = jquantsapi.Client(refresh_token=cls.JPX_JQUANTS_REFRESH_TOKEN)
df:pd.DataFrame = client.get_prices_daily_quotes(code=code, from_yyyymmdd=start_date, to_yyyymmdd=end_date).copy()
if save_csv:
os.makedirs(os.path.dirname(save_csv), exist_ok=True)
df.to_csv(save_csv, index=False, encoding="utf-8-sig")
logger.info(f"{code}の日次株価情報をCSVに保存しました: {save_csv}")
return df
@classmethod
def get_fins_statements(
cls,
code: str,
period_date: str = None,
save_csv: str = None,
) -> pd.DataFrame:
"""
Fetch the financial statements for the given ticker.
Args:
code (str): 4-digit ticker code
period_date (str): Target disclosure date (YYYYMMDD). If None, all of the ticker's statements (per past quarter / fiscal year) are returned.
save_csv (str): Optional CSV output path
Returns: pd.DataFrame
"""
client = jquantsapi.Client(refresh_token=cls.JPX_JQUANTS_REFRESH_TOKEN)
df: pd.DataFrame = client.get_fins_statements(code=code, data_yyyymmdd=period_date).copy()
if save_csv:
os.makedirs(os.path.dirname(save_csv), exist_ok=True)
df.to_csv(save_csv, index=False, encoding="utf-8-sig")
logger.info(f"Saved financial statements for {code} to CSV: {save_csv}")
return df
@classmethod
def get_fins_announcement(
cls,
save_csv:str = None,
) -> pd.DataFrame:
"""
Fetch the scheduled filing dates for securities reports and other disclosures.
Args:
# code (str): 4-digit ticker code
# start_date (str): Start date (YYYY-MM-DD)
# end_date (str): End date (YYYY-MM-DD)
save_csv (str): CSV output path
Returns: pd.DataFrame
"""
client = jquantsapi.Client(refresh_token=cls.JPX_JQUANTS_REFRESH_TOKEN)
df:pd.DataFrame = client.get_fins_announcement().copy()
if save_csv:
os.makedirs(os.path.dirname(save_csv), exist_ok=True)
df.to_csv(save_csv, index=False, encoding="utf-8-sig")
logger.info(f"有価証券報告書等の提出予定情報をCSVに保存しました: {save_csv}")
return df
def load_listed(self, force: bool = False) -> pd.DataFrame:
"""
Load the listed-company list (from CSV if present, otherwise via the API). The normalized search columns are always built here.
"""
if self._listed_df is None or force:
if os.path.exists(self.csv_file) and not force:
df = pd.read_csv(self.csv_file, dtype=str)  # read as strings so leading zeros in codes are preserved
else:
df = self.get_listed_info(save_csv=self.csv_file)
df = self._prepare_norm_columns(df)
self._listed_df = df
return self._listed_df
def search_companies(
self,
q: str,
mode: str = "partial", # "partial" | "regex" | "fuzzy"
fields: list[str] = None,
limit: int = 50,
threshold: int = 70, # fuzzy のしきい値
) -> pd.DataFrame:
"""
Search for companies in the listed-company list.
Args:
q (str): Search query
mode (str): Search mode
- "partial": substring match (normalized to absorb case / width / kana differences)
- "regex": regular-expression match (advanced pattern search)
- "fuzzy": fuzzy match (robust to spelling variants and typos)
fields (list[str] | None): Fields to search.
e.g. ["_CompanyName_norm", "_CompanyNameEnglish_norm", "_SectorName_norm", "_Code_norm"]
If None, the list above is used as the default.
limit (int): Maximum number of rows to return
threshold (int): Minimum score accepted in fuzzy mode (0-100)
Returns:
pd.DataFrame: DataFrame with the search results
"""
df = self.load_listed()
if fields is None:
fields = ["_CompanyName_norm", "_CompanyNameEnglish_norm", "_SectorName_norm", "_Code_norm"]
q_norm = _norm_full(q)
if mode == "partial":
mask = pd.Series(False, index=df.index)
for col in fields:
if col in df.columns:
mask = mask | df[col].str.contains(re.escape(q_norm), na=False)
return df[mask].head(limit).copy()
if mode == "regex":
pattern = re.compile(q, flags=re.IGNORECASE)
mask = pd.Series(False, index=df.index)
for raw_col in ["Code", "CompanyName", "CompanyNameEnglish", "SectorName"]:
if raw_col in df.columns:
mask = mask | df[raw_col].astype(str).str.contains(pattern, na=False)
return df[mask].head(limit).copy()
if mode == "fuzzy":
if not _HAS_FUZZ:
raise RuntimeError("fuzzy search requires 'rapidfuzz' and 'jaconv' to be installed.")
key_series = (
df.get("_CompanyName_norm", "").astype(str) + " " +
df.get("_CompanyNameEnglish_norm", "").astype(str) + " " +
df.get("_SectorName_norm", "").astype(str) + " " +
df.get("_Code_norm", "").astype(str)
)
scores = key_series.map(lambda s: fuzz.token_set_ratio(q_norm, s))
hit = (
df.assign(_score=scores)
.query("_score >= @threshold") # threshold を利用
.sort_values("_score", ascending=False)
)
return hit.head(limit).copy()
raise ValueError("mode must be one of: partial | regex | fuzzy")
# ===== Internal utilities =====
def _prepare_norm_columns(self, df: pd.DataFrame) -> pd.DataFrame:
"""Build the normalized columns used for search. Always applied, whether the data came from the API or from CSV."""
df = df.copy()
# Coerce to string and fill missing values
for col in ["Code", "CompanyName", "CompanyNameEnglish", "SectorName", "MarketCode"]:
if col in df.columns:
df[col] = df[col].astype(str).fillna("")
norm = _norm_full if _HAS_FUZZ else _norm_basic
if "Code" in df.columns:
df["_Code_norm"] = df["Code"].map(norm)
if "CompanyName" in df.columns:
df["_CompanyName_norm"] = df["CompanyName"].map(norm)
if "CompanyNameEnglish" in df.columns:
df["_CompanyNameEnglish_norm"] = df["CompanyNameEnglish"].map(norm)
if "SectorName" in df.columns:
df["_SectorName_norm"] = df["SectorName"].map(norm)
if "MarketCode" in df.columns:
df["_MarketCode_norm"] = df["MarketCode"].map(norm)
return df
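# A minimal usage sketch, assuming JPX_JQUANTS_REFRESH_TOKEN is set; fuzzy mode
# needs rapidfuzz/jaconv, so fall back to partial matching when they are missing.
if __name__ == "__main__":
    api = ApiJQuants()
    mode = "fuzzy" if _HAS_FUZZ else "partial"
    hits = api.search_companies("トヨタ", mode=mode, limit=3)
    for row in hits.to_dict(orient="records"):
        print(row.get("Code"), row.get("CompanyName"), row.get("_score", ""))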


@ -0,0 +1,148 @@
import requests
import os
from lib.custom_logger import get_logger
logger = get_logger()
class ApiNewsAPIOrg:
"""
Client class for NewsAPI.org.
Notes:
- Uses the NewsAPI.org API to fetch news articles
- The API key is read from the `NEWS_API_ORG_API_KEY` environment variable
- See https://newsapi.org/docs for the full API documentation
"""
NEWS_API_ORG_API_KEY = os.getenv("NEWS_API_ORG_API_KEY")
@classmethod
def get_news(cls,
query: str = None,
from_date: str = None,
to_date: str = None,
language: str = "ja",
domains: str = None,
excludeDomains: str = None,
pageSize: int = None,
page: int = None,
sortBy: str = "publishedAt",):
"""
Fetch news articles from NewsAPI.org.
Endpoint for full-text search over articles published on news sites and blogs worldwide (150,000+ sources).
Args:
query (str): Search query
from_date (str): Earliest article date (YYYY-MM-DD)
to_date (str): Latest article date (YYYY-MM-DD)
language (str): Article language as a two-letter ISO-639-1 code
domains (str): Domains to restrict the search to (comma-separated)
excludeDomains (str): Domains to exclude (comma-separated)
pageSize (int): Number of results per page (max 100)
page (int): Page number (starting at 1)
sortBy (str): Sort order (relevancy, popularity, publishedAt)
"""
url = "https://newsapi.org/v2/everything"
params = {
"apikey": cls.NEWS_API_ORG_API_KEY,
"q" : query,
"from": from_date,
"to": to_date,
"language": language,
"domains": domains,
"excludeDomains": excludeDomains,
"pageSize": pageSize,
"page": page,
"sortBy": sortBy,
}
# Do not send None values
params = {k: v for k, v in params.items() if v is not None}
response = requests.get(url,params=params)
response.raise_for_status()
json_data = response.json()
if not json_data.get("status") == "ok":
logger.error(f"NewsAPIOrg API Error: {json_data.get('message')}")
raise Exception(f"NewsAPIOrg API Error: {json_data.get('message')}")
logger.debug(f"NewsAPIOrg API Response: {json_data}")
return json_data
@classmethod
def get_headline_news(cls,
query: str = None,
country: str = "jp",
category: str = "technology",
sources: str = None,
pageSize: int = None,
page: int = None,
):
"""
Fetch the latest top headlines from NewsAPI.org.
Retrieves breaking-news top headlines from news sites around the world.
Args:
query (str): Search query
country (str): Country code (e.g. "jp" for Japan)
category (str): Category (business, entertainment, general, health, science,
sports, technology)
sources (str): Comma-separated source IDs (cannot be combined with country/category)
pageSize (int): Maximum number of articles to return (1-100)
page (int): Page number (starting at 1)
"""
url = "https://newsapi.org/v2/top-headlines"
params = {
"apikey": cls.NEWS_API_ORG_API_KEY,
"q" : query,
"country": country,
"category": category,
"source": source,
"pageSize": pageSize,
"page": page,
}
# Do not send None values
params = {k: v for k, v in params.items() if v is not None}
response = requests.get(url,params=params)
response.raise_for_status()
json_data = response.json()
if not json_data.get("status") == "ok":
logger.error(f"NewsAPIOrg API Error: {json_data.get('message')}")
raise Exception(f"NewsAPIOrg API Error: {json_data.get('message')}")
logger.debug(f"NewsAPIOrg API Response: {json_data}")
return json_data
@classmethod
def get_sources(cls,
country: str = "jp",
language: str = "ja",
category: str = "technology",
):
"""
Fetch news sources from NewsAPI.org.
Endpoint that returns the list of news sources (news sites and blogs) available through the API.
Args:
country (str): Country code (e.g. "jp" for Japan)
language (str): Source language as a two-letter ISO-639-1 code
category (str): Category (business, entertainment, general, health, science,
sports, technology)
"""
url = "https://newsapi.org/v2/top-headlines/sources"
params = {
"apikey": cls.NEWS_API_ORG_API_KEY,
"country": country,
"language": language,
"category": category,
}
# Do not send None values
params = {k: v for k, v in params.items() if v is not None}
response = requests.get(url,params=params)
response.raise_for_status()
json_data = response.json()
if not json_data.get("status") == "ok":
logger.error(f"NewsAPIOrg API Error: {json_data.get('message')}")
raise Exception(f"NewsAPIOrg API Error: {json_data.get('message')}")
logger.debug(f"NewsAPIOrg API Response: {json_data}")
return json_data
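# A minimal usage sketch: top headlines for Japan. NewsAPI.org article objects
# expose 'url' and 'publishedAt' fields, which the examples in examples/example_news.py
# also rely on.
if __name__ == "__main__":
    res = ApiNewsAPIOrg.get_headline_news(country="jp", category="technology", pageSize=5)
    for a in res.get("articles", []):
        print(a.get("publishedAt"), a.get("title"), a.get("url"))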