Yoloで画像分析を実行する(書記実装)

This commit is contained in:
ry.yamafuji 2025-09-12 08:48:34 +09:00
parent ce369e2fa3
commit 95d16825dc
27 changed files with 908 additions and 3 deletions

1
.gitignore vendored
View File

@ -15,7 +15,6 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/

View File

@ -2,6 +2,8 @@
画像認識及び検知、セクションなどの視覚研究ラボ
![APP_TOP](docs/images/app01.png)
## Exec
```bat
@ -21,3 +23,13 @@ pip install -r requirements.txt
```sh
python -m pip install --upgrade --extra-index-url https://PySimpleGUI.net/install PySimpleGUI
```
## Models
### 物体検知
#### **Yolo Ver8**
* 画像認識が既に揃っており、簡単に物体検知が実装できます
* 処理が軽く、リアルタイムでも可能です

BIN
docs/images/app01.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 589 KiB

View File

@ -1,5 +1,11 @@
# 画像解析ツール
## 機能
* 画像・動画プレビュー機能
* ドラッグ&リリース対応
* 物体検知機能
## Library
@ -49,3 +55,19 @@
* 実行ファイルサイズが大きくなりがち(PyInstallerで数十MB以上)
* インストールや依存関係が重い(環境によってバージョン注意)
* モバイル対応は限定的(基本はデスクトップ専用)
## モデルの選定
### 物体認識&検出
* 商用や配布を考えるなら YOLOv8 / SSD / Faster R-CNN / Mask R-CNN / DETR
| モデル | 特徴 | 速度 | 精度 | 向いてる場面 |
| ---------------- | -------------------------------- | ---- | ---- | ------------------------ |
| **YOLO** | 一回で検出、軽量&高速 | ◎ | ◎ | リアルタイム監視 |
| **SSD** | シンプルで軽量 | ○ | ○ | モバイル端末 |
| **Faster R-CNN** | 二段階、高精度 | △ | ◎ | 精度最優先(医療、研究) |
| **Mask R-CNN** | 輪郭まで検出(セグメンテーション) | △ | ◎ | 領域抽出、背景切り抜き |
| **DETR** | Transformerベース | △ | ◎ | 最新研究、拡張性 |

BIN
docs/sample/car.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

110
docs/yolo.md Normal file
View File

@ -0,0 +1,110 @@
### YOLOv8 のクラスについて
* YOLOv8 は **学習済みの重み**によって、検出できるクラス(カテゴリ)が決まります。
* 公式で配布されている `yolov8n.pt`, `yolov8s.pt` などは **COCO データセット (COCO-2017, 80クラス)** で学習されています。
* つまり「何を検知できるか」は **モデルに埋め込まれているクラスラベルリスト**に依存します。
### 🔹 COCO の代表的なクラス(抜粋)
* 人物: `person`
* 車両: `car`, `truck`, `bus`, `motorbike`, `bicycle`
* 動物: `dog`, `cat`, `horse` など
* 家具・物体: `chair`, `sofa`, `tv`, `cell phone` など
(全部で 80 種類)
### ✅ `allowed_classes` の意味
* これは **あなた側のフィルタ**です。
* YOLO は 80 クラスを全部返してきますが、その中で「車両だけ欲しい」ときに
```python
allowed_classes = {"car", "truck", "bus", "motorbike", "bicycle"}
```
としてフィルタリングします。
### 🔹 例
```python
detections = self.detector.detect(img)
detections = [d for d in detections if d.label in allowed_classes]
```
---
## 転移学習
YOLOv8 は **転移学習 (fine-tuning)** をサポートしていて、
COCOの80クラスとは別に、自分で定義したクラスで再学習させられます。
たとえば「細菌」や「顕微鏡画像の特定形状」なども検出対象にできます。
## 新しいクラスを学習する流れ(YOLOv8)
1. **データセットの用意**
* 画像ファイル(`.jpg`, `.png` など)
* アノテーション(バウンディングボックス付きラベル)を YOLO形式で準備
```
<class_id> <x_center> <y_center> <width> <height>
```
(座標は画像サイズで 01 に正規化)
* ディレクトリ構造の例:
```
dataset/
├─ images/
│ ├─ train/
│ └─ val/
└─ labels/
├─ train/
└─ val/
```
1. **データ設定ファイルを作成**
* `bacteria.yaml` のような YAML を書く
```yaml
path: dataset
train: images/train
val: images/val
nc: 1 # クラス数
names: ['bacteria']
```
1. **学習を実行**
```bash
yolo detect train data=bacteria.yaml model=yolov8n.pt epochs=50 imgsz=640
```
* `yolov8n.pt` は小さいモデル(転移学習の出発点)
* `epochs=50` は繰り返し回数(データ量次第で増減)
* `imgsz=640` は入力画像サイズ
1. **学習済みモデルを使う**
```bash
yolo detect predict model=runs/detect/train/weights/best.pt source=your_test_image.jpg
```
* この `best.pt` が「細菌クラス専用モデル」になります
---
### 応用ポイント
* **複数クラス**も可能(例: `['bacteria', 'fungus', 'virus']`)
* **COCO + 自作データで再学習** → 汎用物体検知 + 特殊対象の両立もできる
* **顕微鏡画像**のように特殊なデータでも、十分なアノテーションがあれば対応可能
### 🔹 注意点
* 学習には **GPU環境**(Colab, Kaggle, 自前マシンなど)が必要
* データが少ない場合は、**転移学習**(小さいデータで fine-tuning)が特に効果的
* 正確なアノテーション(ラベル付け)が精度に直結します

View File

@ -39,7 +39,7 @@ class MainWindow(QWidget):
right_layout.addWidget(btn_quit)
right_layout.addStretch()
# --- メインレイアウト(横並び) ---
# --- メインレイアウト(横並び) ---
main_layout = QHBoxLayout()
main_layout.addLayout(left_layout, 1)
main_layout.addLayout(right_layout, 1)

View File

@ -1,2 +1,4 @@
ultralytics
opencv-python
PySide6
PySide6
numpy

18
src/app.py Normal file
View File

@ -0,0 +1,18 @@
import sys
from PySide6.QtWidgets import QApplication
from lib.custom_logger import get_logger
logger = get_logger()
logger.setLevel("DEBUG")
from app_status import AppStatus
from gui.windows.main_win import MainWindow
if __name__ == "__main__":
app = QApplication(sys.argv)
AppStatus()
w = MainWindow()
logger.info("Starting application")
w.show()
sys.exit(app.exec())

94
src/app_status.py Normal file
View File

@ -0,0 +1,94 @@
import cv2
from lib.singleton import Singleton
class AppStatus(Singleton):
"""アプリケーションの状態を管理するシングルトンクラス"""
def __init__(self):
if hasattr(self, '_initialized') and self._initialized:
return # すでに初期化済みなら何もしない
self.status = {}
self._initialized = True
def reset(self):
"""状態をリセット"""
self.status.clear()
def set_status(self, key, value):
self.status[key] = value
def get_status(self, key, default=None):
return self.status.get(key, default)
@property
def is_video(self)-> bool:
"""入力動画ファイルかどうか判定"""
return self.get_status('is_video', False)
@is_video.setter
def is_video(self, value: bool):
self.set_status('is_video', value)
@property
def current_media_path(self) -> str:
"""現在のメディアファイルのパス"""
return self.get_status('current_media_path', "")
@current_media_path.setter
def current_media_path(self, path: str):
self.set_status('current_media_path', path)
@property
def original_image(self)-> cv2.typing.MatLike :
"""元画像(処理用データの基準)"""
return self.get_status('original_image', None)
@original_image.setter
def original_image(self, img: cv2.typing.MatLike):
self.set_status('original_image', img)
@property
def processed_image(self)-> cv2.typing.MatLike :
"""処理後の画像"""
return self.get_status('processed_image', None)
@processed_image.setter
def processed_image(self, img: cv2.typing.MatLike):
self.set_status('processed_image', img)
@property
def processed_scale(self) -> float:
"""画像のスケール"""
return self.get_status('processed_scale', 1.0)
@processed_scale.setter
def processed_scale(self, value: float):
self.set_status('processed_scale', value)
@property
def detected_objects(self) -> list:
"""検出結果のリスト"""
return self.get_status('detected_objects', [])
@detected_objects.setter
def detected_objects(self, dets: list):
self.set_status('detected_objects', dets)
@property
def detection_count(self) -> int:
"""検出結果の数"""
return len(self.detected_objects) if self.detected_objects else 0
@property
def overlay_image(self) -> cv2.typing.MatLike:
"""オーバーレイ画像"""
return self.get_status('overlay_image', None)
@overlay_image.setter
def overlay_image(self, img: cv2.typing.MatLike):
self.set_status('overlay_image', img)

11
src/domain/type.py Normal file
View File

@ -0,0 +1,11 @@
from dataclasses import dataclass
@dataclass(frozen=True)
class Box:
x1: int; y1: int; x2: int; y2: int
@dataclass(frozen=True)
class Detection:
label: str
score: float
box: Box

View File

@ -0,0 +1,155 @@
import os
import numpy as np
import cv2
from PySide6.QtCore import Qt
from pathlib import Path
from PySide6.QtGui import QDragEnterEvent, QDropEvent
from PySide6.QtWidgets import (
QWidget,QLabel,QPushButton,
QVBoxLayout,QHBoxLayout,
QFileDialog, QMessageBox
)
from app_status import AppStatus
from pipeline.app_pipeline import ImagePipeline
from lib.common_widget import bgr_to_qpixmap
IMAGE_EXT = {".jpg", ".jpeg", ".png", ".bmp", ".gif"}
VIDEO_EXT = {".mp4", ".mov", ".avi", ".mkv", ".m4v"}
from lib.custom_logger import get_logger
logger = get_logger()
class MediaViewer(QWidget):
"""画像と動画を共通で扱う左側ビューワ"""
def __init__(self, parent=None):
super().__init__(parent)
self.status = AppStatus()
# ボタンバー
self.btn_bar = QHBoxLayout()
self.btn_open = QPushButton("Open")
self.btn_play = QPushButton("Play")
self.btn_pause = QPushButton("Pause")
self.btn_stop = QPushButton("Stop")
self.btn_clear = QPushButton("Clear")
self.btn_analyze = QPushButton("Analyze")
self.update_button_state()
for b in (
self.btn_open, self.btn_play, self.btn_pause, self.btn_stop,
self.btn_clear, self.btn_analyze):
self.btn_bar.addWidget(b)
self.btn_bar.addStretch()
self.btn_open.clicked.connect(self.open_file)
self.label = QLabel("Drop files here\n(画像/動画)")
self.label.setAlignment(Qt.AlignCenter)
self.label.setStyleSheet("background:#222;color:#aaa;border:1px solid #444;")
self.label.setMinimumSize(640, 360)
lay = QVBoxLayout(self)
lay.addLayout(self.btn_bar)
lay.addWidget(self.label, 1)
# events
self.btn_analyze.clicked.connect(self.on_analyze_view)
def load(self, path: str):
"""画像または動画を読み込む"""
self.status.current_media_path = path
suffix = Path(path).suffix.lower()
if suffix in IMAGE_EXT:
logger.info(f"Loading image: {path}")
self.status.is_video = False
# numpy 画像読み込み(ファイルをバイナリuint8)で読み込む
data = np.fromfile(path, dtype=np.uint8)
# OpenCVのデコーダに渡して画像を復元
# (cv2.IMREAD_COLOR で3チャンネル=BGR,8bit/チャネル (uint8)
self.status.original_image = cv2.imdecode(data, cv2.IMREAD_COLOR)
if self.status.original_image is None:
logger.error(f"Failed to decode image: {path}")
# アラート表示
QMessageBox.critical(self, "Error", f"画像を読み込めませんでした:\n{path}")
return
# 画像をQPixmapに変換して表示 アスペクト比を維持してリサイズ
self.label.setPixmap(bgr_to_qpixmap(self.status.original_image).scaled(
self.label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation))
self.update_button_state()
# 画像解析パイプラインを実行
pipeline = ImagePipeline()
pipeline.run()
elif suffix in VIDEO_EXT:
logger.info(f"Loading video: {path}")
self.status.is_video = True
else:
logger.warning(f"Unsupported media type: {path}")
self.label.setText("未対応の形式です")
def on_analyze_view(self):
# 画像をoverlay表示
if self.status.overlay_image is None:
logger.warning("No overlay image to display")
# アラート表示
QMessageBox.critical(self, "Error", f"解析画像を読み込めませんでした")
return
# 画像をQPixmapに変換して表示 アスペクト比を維持してリサイズ
self.label.setPixmap(bgr_to_qpixmap(self.status.overlay_image).scaled(
self.label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation))
def open_file(self):
"""ファイル選択ダイアログを開く"""
path, _ = QFileDialog.getOpenFileName(self, "Open media", "",
"Media (*.jpg *.jpeg *.png *.bmp *.gif *.mp4 *.mov *.avi *.mkv *.m4v)")
if path:
self.load(path)
def update_button_state(self):
"""ボタンの有効/無効を更新"""
if not self.status.current_media_path:
logger.debug("No media loaded, disabling buttons")
self.btn_open.setEnabled(True)
self.btn_play.setEnabled(False)
self.btn_pause.setEnabled(False)
self.btn_stop.setEnabled(False)
self.btn_clear.setEnabled(False)
self.btn_analyze.setEnabled(False)
return
if not self.status.is_video:
logger.debug("image loaded, disabling video buttons")
self.btn_open.setEnabled(True)
self.btn_play.setEnabled(False)
self.btn_pause.setEnabled(False)
self.btn_stop.setEnabled(False)
self.btn_clear.setEnabled(True)
self.btn_analyze.setEnabled(True)
# ---- D&D ----
# mainwindowにも処理が必要
def dragEnterEvent(self, e: QDragEnterEvent):
if e.mimeData().hasUrls():
e.acceptProposedAction()
def dropEvent(self, e: QDropEvent):
urls = e.mimeData().urls()
if not urls:
return
path = urls[0].toLocalFile()
if os.path.isfile(path):
self.load(path)

View File

@ -0,0 +1,6 @@
from PySide6.QtWidgets import QWidget
class RightPanel(QWidget):
def __init__(self, parent=None):
super().__init__(parent)

View File

@ -0,0 +1,76 @@
import os
from PySide6.QtGui import QAction,QDragEnterEvent, QDropEvent
from PySide6.QtWidgets import QMainWindow,QSplitter
from gui.widgets.media_viewr import MediaViewer
from gui.widgets.right_panel import RightPanel
from lib.custom_logger import get_logger
logger = get_logger()
class MainWindow(QMainWindow):
"""アプリメインウィンドウ"""
def __init__(self):
super().__init__()
self.setWindowTitle("CV Studio (Prototype)")
self.resize(1200, 700)
logger.info("MainWindow initialized")
# メニューを構築
self._build_menu()
# 半々に分割
self.viewer = MediaViewer()
self.right = RightPanel()
splitter = QSplitter()
splitter.addWidget(self.viewer)
splitter.addWidget(self.right)
splitter.setSizes([self.width()//2, self.width()//2])
self.setCentralWidget(splitter)
def _build_menu(self):
# File
menu_file = self.menuBar().addMenu("File")
open_act = QAction("Open...", self)
# open_act.triggered.connect(self.viewer.open_file)
menu_file.addAction(open_act)
# hr
menu_file.addSeparator()
# exit
exit_act = QAction("Exit", self)
exit_act.setShortcut("Ctrl+Q")
exit_act.triggered.connect(self.on_exit)
menu_file.addAction(exit_act)
logger.info("Main Window Menu built.")
# events----
def on_exit(self):
"""アプリ終了アクション"""
logger.info("Exit action triggered")
self.close() # QMainWindow.close() を呼ぶとアプリ終了
# ---- D&D ----
# ルートウィンドウにもD&D(フォルダから直接落とせるように)
def dragEnterEvent(self, e: QDragEnterEvent):
if e.mimeData().hasUrls():
e.acceptProposedAction()
def dropEvent(self, e: QDropEvent):
urls = e.mimeData().urls()
if urls:
path = urls[0].toLocalFile()
if os.path.isfile(path):
self.viewer.load(path)

View File

@ -0,0 +1,171 @@
import numpy as np
import cv2
from typing import Iterable, Optional, Dict, Tuple, List
from jobs.job_base import JobBase
from domain.type import Detection,Box
from ultralytics import YOLO
class JobDetectObjectsForCar(JobBase):
"""
駐車場の画像から物体検出を行うジョブ
- YOLOv8を使用して車両を検出する
- 検出結果はstatus.detected_objectsに格納
"""
# モデルはクラス変数で共有(初回だけDL/ロード)
_model: Optional[YOLO] = None
_names: Optional[Iterable[str]] = None
# デフォルト設定
ALLOWED_CLASSES = {"car", "truck", "bus", "motorbike", "bicycle"}
def __init__(
self,
weights: str = "yolov8n.pt",
conf: float = 0.35,
iou: float = 0.5,
allowed_classes: Optional[Iterable[str]] = None,
roi: Optional[Dict[str, float]] = None, # {"top":0~1, "bottom":0~1, "left":0~1, "right":0~1}
):
"""
Args:
conf (float): 信頼度の閾値(物体を正しく検知できたと判定する最低限の確率値)
iou (float): NMSのIoU閾値(重複する検出を抑制するための閾値)
allowed_classes (Optional[Iterable[str]]): 検出対象とするクラスのリスト(デフォルトは車両関連のみ)
roi (Optional[Dict[str, float]]): 関心領域(ROI)の指定画像の一部だけを検出対象にする場合に使用
Notes:
- confは0.0-1.0(: 0.35 35% 以上の確信度)
- 小さくすると 検知数が増える(でも誤検知も増える)
- 大きくすると 精度が高いものだけ残る(でも見逃しも増える)
- 駐車場で車両を数えるなら 0.3-0.5くらいが実用的
- iouは0.0-1.0(: 0.5 50% 以上重なっていると同じ物体とみなす)
- 小さくすると 似た位置にある物体を別々に検知しやすい(でも誤検知も増える)
- 大きくすると 似た位置にある物体をまとめて検知する(でも見逃しも増える)
- 一般的に 0.45-0.6 が推奨駐車場のように車が並ぶ環境だと 0.5くらいがバランス良いです
- allowed_classes を指定しない場合全クラスが検出対象になる
-
- roiは画像の一部だけを検出対象にする場合に使用座標は0.0-1.0の割合で指定
- : {"top":0.2, "bottom":0.8, "left":0.1, "right":0.9} は画像の中央部分を指定
- Noneの場合画像全体が検出対象になる
- キャッシュについて
- アプリ起動中はキャッシュされる(速い)
- アプリを終了再起動すると再ロードが必要(数秒で終わる)
- Ultralytics ~/.cache/ultralytics/ に保存します
"""
super().__init__(name=self.__class__.__name__)
self.description = "Detect Objects for Car Job"
self.weights = weights
self.conf = float(conf)
self.iou = float(iou)
self.allowed = set(allowed_classes) if allowed_classes else set(self.ALLOWED_CLASSES)
self.roi = roi # 省略可
# ---------- 内部ユーティリティ ----------
@classmethod
def _ensure_model(cls, weights: str):
if cls._model is None:
cls._model = YOLO(weights) # 初回は自動DL
# names は list か dict のことがある
cls._names = getattr(cls._model, "names", None)
return cls._model, cls._names
@staticmethod
def _apply_roi(img: np.ndarray, roi: Optional[Dict[str, float]]) -> Tuple[np.ndarray, Tuple[int, int]]:
if not roi:
return img, (0, 0)
h, w = img.shape[:2]
t = int(h * roi.get("top", 0.0))
b = int(h * (1.0 - roi.get("bottom", 0.0)))
l = int(w * roi.get("left", 0.0))
r = int(w * (1.0 - roi.get("right", 0.0)))
t = max(0, min(t, h-1)); b = max(t+1, min(b, h))
l = max(0, min(l, w-1)); r = max(l+1, min(r, w))
return img[t:b, l:r].copy(), (l, t)
@staticmethod
def _offset_boxes(dets: List[Detection], off: Tuple[int, int]) -> List[Detection]:
if off == (0, 0):
return dets
ox, oy = off
out: List[Detection] = []
for d in dets:
b = d.box
out.append(Detection(d.label, d.score, Box(b.x1+ox, b.y1+oy, b.x2+ox, b.y2+oy)))
return out
def execute(self):
self.logger.info(f"{self.name} execute started")
# ここにYOLOv8を使用した物体検出のコードを実装
# 検出結果をself.status.detected_objectsに格納
self.logger.info(f"{self.name} execute finished")
pass # 実装は後で追加
@staticmethod
def _draw(image_bgr: np.ndarray, dets: List[Detection]) -> np.ndarray:
out = image_bgr.copy()
for d in dets:
x1, y1, x2, y2 = d.box.x1, d.box.y1, d.box.x2, d.box.y2
cv2.rectangle(out, (x1, y1), (x2, y2), (0, 200, 0), 2)
cv2.putText(out, f"{d.label} {d.score:.2f}", (x1, max(0, y1 - 6)),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 200, 0), 2)
# バナー
cv2.rectangle(out, (10, 10), (260, 52), (0, 0, 0), -1)
cv2.putText(out, f"Count: {len(dets)}", (18, 40),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
return out
# ---------- 実行本体 ----------
def execute(self):
self.logger.info(f"{self.name} execute started")
# 1) 入力画像の取得Standardize後を推奨
img_bgr: Optional[np.ndarray] = self.status.processed_image
if img_bgr is None:
# 標準化前でも original_image があれば一応通す
img_bgr = self.status.original_image
if img_bgr is None:
self.logger.error("No image found in status (std_image_bgr/original_image)")
return
# 2) モデルの用意(初回だけロード)
model, names = self._ensure_model(self.weights)
# 3) ROI任意
roi_img, offset = self._apply_roi(img_bgr, self.roi)
# 4) 推論UltralyticsはRGB想定なので[..., ::-1]でOK
res = model.predict(roi_img[..., ::-1], conf=self.conf, iou=self.iou, verbose=False)[0]
names = model.model.names if hasattr(model, "model") and hasattr(model.model, "names") else model.names
# 5) 後処理(クラスフィルタ → Detectionへ詰め替え
dets: List[Detection] = []
for b in res.boxes:
cls_id = int(b.cls[0])
label = names[cls_id] if isinstance(names, (list, tuple)) else names.get(cls_id, str(cls_id))
if label not in self.allowed:
continue
x1, y1, x2, y2 = map(int, b.xyxy[0])
conf = float(b.conf[0])
dets.append(Detection(label=label, score=conf, box=Box(x1, y1, x2, y2)))
# ROIオフセットを元画像座標に戻す
dets = self._offset_boxes(dets, offset)
# 6) 可視化画像を生成
overlay = self._draw(img_bgr, dets)
# 7) statusへ保存
self.status.detected_objects = dets
self.status.overlay_image = overlay
self.logger.info(f"detected={len(dets)}, conf>={self.conf}, iou={self.iou}, allowed={sorted(self.allowed)}")
self.logger.info(f"{self.name} execute finished")

0
src/jobs/__init__.py Normal file
View File

16
src/jobs/job_base.py Normal file
View File

@ -0,0 +1,16 @@
from lib.custom_logger import get_logger
from app_status import AppStatus
class JobBase():
"""ジョブの基底クラス"""
def __init__(self, name="JobBase"):
self.logger = get_logger()
self.name = name
self.status = AppStatus()
self.logger.info(f"{self.name} initialized")
def execute(self):
"""ジョブの実行"""
self.logger.info(f"{self.name} execute called")
raise NotImplementedError("Subclasses must implement this method")

View File

@ -0,0 +1,38 @@
from jobs.job_base import JobBase
import cv2
import numpy as np
class JobStandardizeFormat(JobBase):
"""
画像ファイルのフォーマットを標準化するジョブ
- YOLOv8は内部でリサイズ/正規化
- ここでは必要に応じて色空間変換やチャネル数の調整を行う
- サイズ上限表示/処理負荷を抑えるため 長辺 max=12801920px に縮小(YOLO入力は内部で処理するの
"""
def __init__(self,max_side:int=1920):
super().__init__(name=self.__class__.__name__)
self.description = "Standardize Image Format Job"
self.max_side = max_side
def _cap_max_side(self, img: np.ndarray) -> tuple[np.ndarray, float]:
h, w = img.shape[:2]
self. logger.debug(f"Original image size: {w}x{h}")
side = max(h, w)
if self.max_side and side > self.max_side:
scale = self.max_side / side
img = cv2.resize(img, (int(w*scale), int(h*scale)), interpolation=cv2.INTER_AREA)
return img, float(scale)
return img, 1.0
def execute(self):
self.logger.info(f"{self.name} execute started")
# オリジナル画像から検証用の画像を取得
img, scale = self._cap_max_side(self.status.original_image)
self.status.processed_image = img
self.status.processed_scale = scale
self.logger.info(f"Image resized with scale {scale:.3f}")
# Yoloが実行するので基本的には何もしない
self.logger.warning(f"Yolo8 {img.shape} , auto resize/normalize internally")

10
src/lib/__init__.py Normal file
View File

@ -0,0 +1,10 @@
"""
This module provides the pengent library.
"""
from .custom_logger import get_logger, CustomLogger
__all__ = [
"get_logger",
"CustomLogger",
]

2
src/lib/common.py Normal file
View File

@ -0,0 +1,2 @@
import re

16
src/lib/common_widget.py Normal file
View File

@ -0,0 +1,16 @@
import cv2
import numpy as np
from PySide6.QtGui import QImage,QPixmap
def bgr_to_qpixmap(bgr):
"""BGR形式の画像をQPixmapに変換する
Notes:
- OpenCVはBGR形式QtはRGB形式を使うため変換が必要
- 画像のサイズやフォーマットに応じて適切にスケーリングする必要がある
"""
rgb: np.ndarray = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
h, w, ch = rgb.shape
qimg = QImage(rgb.data, w, h, ch*w, QImage.Format.Format_RGB888)
return QPixmap.fromImage(qimg)

56
src/lib/custom_logger.py Normal file
View File

@ -0,0 +1,56 @@
import logging
import functools
from .singleton import Singleton
class CustomLogger(Singleton):
"""
Singleton logger class that initializes a logger with a specified name and log file.
It provides a method to log entry and exit of functions.
"""
def __init__(self, name='main', log_file=None, level=logging.INFO):
if hasattr(self, '_initialized') and self._initialized:
return # すでに初期化済みなら何もしない
# self.logger.setLevel(level)
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
self.logger.propagate = False
formatter = logging.Formatter(
'%(asctime)s %(levelname)s [%(filename)s:%(lineno)3d]: %(message)s'
)
# Console handler
ch = logging.StreamHandler()
ch.setFormatter(formatter)
self.logger.addHandler(ch)
# File handler
if log_file:
fh = logging.FileHandler(log_file, encoding='utf-8')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
self._initialized = True
def get_logger(self):
return self.logger
def log_entry_exit(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
self.logger.info(f"Enter: {func.__qualname__}")
result = func(*args, **kwargs)
self.logger.info(f"Exit: {func.__qualname__}")
return result
return wrapper
def get_logger(name='main', log_file=None, level=logging.INFO):
custom_logger = CustomLogger(name, log_file, level)
return custom_logger.get_logger()

View File

@ -0,0 +1,40 @@
# infra/detectors/yolo_v8.py
import cv2
from typing import Iterable, List, Optional, Set
from ultralytics import YOLO
from types.detect import Detection, Box
class YoloV8Detector:
def __init__(self, weights: str = "yolov8n.pt", conf: float = 0.35, iou: float = 0.5,
allowed_classes: Optional[Iterable[str]] = None):
self.model = YOLO(weights) # 初回は自動DL
self.conf = conf
self.iou = iou
self.allowed: Optional[Set[str]] = set(allowed_classes) if allowed_classes else None
# names は list or dict のことがある
self.names = getattr(self.model, "names", None)
def detect(self, image_bgr) -> List[Detection]:
# UltralyticsはRGB想定
res = self.model.predict(image_bgr[..., ::-1], conf=self.conf, iou=self.iou, verbose=False)[0]
names = self.model.model.names if hasattr(self.model, "model") and hasattr(self.model.model, "names") else self.model.names
dets: List[Detection] = []
for b in res.boxes:
cls_id = int(b.cls[0])
label = names[cls_id] if isinstance(names, (list, tuple)) else names.get(cls_id, str(cls_id))
if self.allowed and label not in self.allowed:
continue
x1, y1, x2, y2 = map(int, b.xyxy[0])
score = float(b.conf[0])
dets.append(Detection(label=label, score=score, box=Box(x1, y1, x2, y2)))
return dets
@staticmethod
def draw(image_bgr, detections: Iterable[Detection]):
out = image_bgr.copy()
for d in detections:
x1, y1, x2, y2 = d.box.x1, d.box.y1, d.box.x2, d.box.y2
cv2.rectangle(out, (x1, y1), (x2, y2), (0, 200, 0), 2)
cv2.putText(out, f"{d.label} {d.score:.2f}", (x1, max(0, y1 - 6)),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 200, 0), 2)
return out

20
src/lib/singleton.py Normal file
View File

@ -0,0 +1,20 @@
"""Singleton pattern implementation in Python.
This implementation is thread-safe and ensures that only one instance of the class is created.
Singleton が提供するのは同じインスタンスを返す仕組み
* __init__() は毎回呼ばれる(多くの人が意図しない動作)
* __init__の2回目は_initialized というフラグは 使う側で管理する必要がある
"""
import threading
class Singleton(object):
_instances = {}
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if cls not in cls._instances:
with cls._lock:
if cls not in cls._instances: # ダブルチェック
cls._instances[cls] = super(Singleton, cls).__new__(cls)
return cls._instances[cls]

View File

@ -0,0 +1,12 @@
from pipeline.pipeline_base import PipelineBase
from jobs.job_standardize_format import JobStandardizeFormat
from jobs.Job_detect_objects_for_car import JobDetectObjectsForCar
class ImagePipeline(PipelineBase):
"""画像解析用のパイプライン"""
def __init__(self):
super().__init__()
self.logger.info("ImagePipeline initialized")
# ジョブを追加
self.add_job(JobStandardizeFormat())
self.add_job(JobDetectObjectsForCar())

View File

@ -0,0 +1,19 @@
from typing import List
from jobs.job_base import JobBase
from app_status import AppStatus
from lib.custom_logger import get_logger
logger = get_logger()
class PipelineBase:
"""Pipelineの基本クラス"""
def __init__(self):
self.jobs:List[JobBase] = []
self.logger = get_logger()
self.status = AppStatus()
def add_job(self, job: JobBase):
self.jobs.append(job)
def run(self):
for job in self.jobs:
job.execute()

BIN
src/yolov8n.pt Normal file

Binary file not shown.