コールバック関数の設定方法(OpenCVで二値化)
概要
カメラから連続フレームを取得して、OpenCVによる画像処理(上下反転 → グレースケール化 → 二値化 → リサイズ)と画面表示をリアルタイムに行うサンプルプログラムです。
サンプルプログラム
| 利用した開発環境 | Visual Studio™ 2022 |
|---|---|
| 開発環境 | Python 3.9以降 |
| OS | Windows / Linux(Ubuntu) |
| SDK | IC Imaging Control 4 SDK |
| 必要ライブラリ | imagingcontrol4, opencv-python |
| デバイスドライバ | GenICam 対応 TIS カメラ用ドライバ |
| 対応デバイス | TIS USB / GigE カメラ(GenICam対応機種) |
| サンプル(Python) | IC4_image-processing.zip |
| exeファイル アプリケーション |
ー |
| 別途ファイル | ー |
| 関連参照URL | ー |
インストール
pip install imagingcontrol4 opencv-python
実行方法
python IC4_image-processing.py
停止方法
表示ウィンドウを選択した状態でqキーまたはESCキーを押下する。
コード全体
iimport cv2
import imagingcontrol4 as ic4
# スレッド間で画像を共有するためのグローバル変数
shared_image = None
# ===== QueueSink Listener =====
# カメラからフレームが届くたびに、IC4 が下の frames_queued を自動で呼び出す。
class CamListener(ic4.QueueSinkListener):
def sink_connected(self, sink, image_type, min_buffers_required):
# 接続時:フレームを受け取るための“使い回しバッファ”をあらかじめ確保しておく。
sink.alloc_and_queue_buffers(max(min_buffers_required, 5))
return True # False を返すと開始(stream_setup)が失敗する
def sink_disconnected(self, sink):
# 停止時:今回のサンプルでは特に後始末は不要。
return
def frames_queued(self, sink):
# ここがフレーム到着ごとに呼ばれるコールバック
# 出力キューから1枚取り出して、OpenCV で処理を行う。
global shared_image
# 取り出したフレーム(届いていなければ None なので何もしない)
buf = sink.try_pop_output_buffer()
if buf is None:
return
try:
# ImageBufferをNumPy配列として参照する(BGR8 → 高さ×幅×3 の配列)。
# これは“バッファを直接見ているビュー”なので、buf.release() より前に使い切ること。
image = buf.numpy_wrap()
# --- OpenCV 処理 ---
image = cv2.flip(image, 0) # 上下反転(0=上下, 1=左右, -1=両方)
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # カラー → グレースケール
_, mono = cv2.threshold(gray, 80, 255, cv2.THRESH_BINARY) # しきい値80で白黒2値化
view = cv2.resize(mono, (640, 480))
# メインスレッドへ渡すため、メモリを完全に複製する(バッファ解放後のクラッシュ対策)
shared_image = view.copy()
finally:
buf.release()
# ===== ここから実行 =====
if __name__ == "__main__":
# IC4 ライブラリを読み込み後に実行。
with ic4.Library.init_context(api_log_level=ic4.LogLevel.INFO,
log_targets=ic4.LogTarget.STDERR):
try:
# --- カメラを探す。1台も無ければ中断 ---
devs = ic4.DeviceEnum.devices()
if len(devs) < 1:
raise RuntimeError("カメラが見つかりません。1台必要です。")
# --- 先頭のカメラを開く ---
grabber = ic4.Grabber()
grabber.device_open(devs[0])
print(f"[open] cam1: {devs[0].model_name}")
# --- カメラの基本設定を書き込む ---
# ※値はお使いのカメラの仕様(対応解像度など)に合わせて変更してください。
mp = grabber.device_property_map # 設定の読み書き窓口
mp.set_value(ic4.PropId.WIDTH, int(1920)) # 画像幅
mp.set_value(ic4.PropId.HEIGHT, int(1080)) # 画像高さ
mp.set_value(ic4.PropId.ACQUISITION_FRAME_RATE, float(30)) # フレームレート
mp.set_value(ic4.PropId.PIXEL_FORMAT, "BGR8" ) # カラーフォーマット
mp.set_value(ic4.PropId.TRIGGER_MODE, "Off" ) # トリガーモード
mp.set_value(ic4.PropId.GAIN_AUTO, "Off") # ゲイン自動
mp.set_value(ic4.PropId.GAIN, 0) # ゲイン値
mp.set_value(ic4.PropId.EXPOSURE_AUTO, "Off") # 露光自動
mp.set_value(ic4.PropId.EXPOSURE_TIME, 16335) # 露光時間
# --- フレーム受け取りのsink(QueueSink)を作って取得開始 ---
listener = CamListener() # 上で定義したコールバック役
sink = ic4.QueueSink(listener, [ic4.PixelFormat.BGR8]) # 受け付けるフォーマットは BGR8
grabber.stream_setup(sink, setup_option=ic4.StreamSetupOption.ACQUISITION_START)
print("ライブ処理中… 表示ウィンドウを選択して 'q' または ESC で停止します。")
# --- メインループ(描画と停止待ち) ---
while True:
# コールバックで処理された画像があれば表示する
if shared_image is not None:
cv2.imshow("Window", shared_image)
key = cv2.waitKey(10) & 0xFF
if key == ord('q') or key == 27: # 'q' または ESC で終了
break
except ic4.IC4Exception as ex:
print(f"IC4 エラー: {ex.message}")
except Exception as e:
print("エラー:", e)
finally:
# --- エラーや強制終了時でも必ず実行してカメラを解放する ---
if grabber.is_device_open:
grabber.stream_stop() # 取得停止
grabber.device_close() # カメラを閉じる
cv2.destroyAllWindows() # 表示ウィンドウを閉じる
print("停止しました。")
解説
ライブラリの読み込みとグローバル変数の準備
import cv2
import imagingcontrol4 as ic4
shared_image = None
画像処理用に OpenCV(cv2)、カメラ制御用にimagingcontrol4を読み込みます。
コールバック関数とメインスレッドの間で画像を受け渡すためのグローバル変数shared_imageを準備します。
コールバックを受け取るクラスの定義
class CamListener(ic4.QueueSinkListener):
カメラからフレームが届いた瞬間に特定の処理をイベント発火させるため、SDKが提供するQueueSinkListenerを継承した独自のクラスCamListenerを定義します。このクラスの中に、接続時や画像到着時の具体的な挙動を書き込んでいきます。
接続時:バッファの確保(sink_connected)
def sink_connected(self, sink, image_type, min_buffers_required):
sink.alloc_and_queue_buffers(max(min_buffers_required, 5))
return True
カメラから次々に送られてくる画像データを一時的に格納するためのバッファを事前にまとめて確保します。
フレーム到着時の処理(frames_queued)
def frames_queued(self, sink):
# ここがフレーム到着ごとに呼ばれるコールバック
# 出力キューから1枚取り出して、OpenCV で処理を行う。
global shared_image
# 取り出したフレーム(届いていなければNoneなので何もしない)
buf = sink.try_pop_output_buffer()
if buf is None:
return
try:
# ImageBufferをNumPy配列として参照する(BGR8 → 高さ×幅×3 の配列)。
# これは“バッファを直接見ているビュー”なので、buf.release() より前に使い切ること。
image = buf.numpy_wrap()
# --- OpenCV 処理 ---
image = cv2.flip(image, 0) # 上下反転(0=上下, 1=左右, -1=両方)
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # カラー → グレースケール
_, mono = cv2.threshold(gray, 80, 255, cv2.THRESH_BINARY) # しきい値80で白黒2値化
view = cv2.resize(mono, (640, 480))
# メインスレッドへ渡すため、メモリを完全に複製する(バッファ解放後のクラッシュ対策)
shared_image = view.copy()
finally:
buf.release()
frames_queuedはフレームがメモリに入ってきたときに、IC4の別スレッドから発火するイベントで、メインスレッドとは別スレッドの非同期で呼び出されます。
まず、カメラから送られてメモリ内に入ってきた最新の画像バッファ(buf)を1枚取り出します。IC4独自の画像バッファbufを、OpenCVで直接扱えるNumPy配列(縦×横×色チャンネル)の形式に変換します。OpenCVを使って取り出した画像に対して「上下反転(cv2.flip)」「グレースケール化(cv2.cvtColor)」「しきい値80での二値化(cv2.threshold)」を順に処理し、その画像を表示用サイズ(640x480)にリサイズします。OpenCVの画面表示関数はメインスレッド以外から呼び出すと、実行環境によってはフリーズする可能性があるので、この関数内では画面表示を行わず、OpenCVで処理した画像をグローバル変数shared_imageに格納するまでにしておきます。
メイン処理(カメラの初期化)
if __name__ == "__main__":
# IC4 ライブラリを読み込み後に実行。
with ic4.Library.init_context(api_log_level=ic4.LogLevel.INFO,
log_targets=ic4.LogTarget.STDERR):
try:
# --- カメラを探す。1台も無ければ中断 ---
devs = ic4.DeviceEnum.devices()
if len(devs) < 1:
raise RuntimeError("カメラが見つかりません。1台必要です。")
# --- 先頭のカメラを開く ---
grabber = ic4.Grabber()
grabber.device_open(devs[0])
print(f"[open] cam1: {devs[0].model_name}")
# --- カメラの基本設定を書き込む ---
# ※値はお使いのカメラの仕様(対応解像度など)に合わせて変更してください。
mp = grabber.device_property_map # 設定の読み書き窓口
mp.set_value(ic4.PropId.WIDTH, int(1920)) # 画像幅
mp.set_value(ic4.PropId.HEIGHT, int(1080)) # 画像高さ
mp.set_value(ic4.PropId.ACQUISITION_FRAME_RATE, float(30)) # フレームレート
mp.set_value(ic4.PropId.PIXEL_FORMAT, "BGR8" ) # カラーフォーマット
mp.set_value(ic4.PropId.TRIGGER_MODE, "Off" ) # トリガーモード
mp.set_value(ic4.PropId.GAIN_AUTO, "Off") # ゲイン自動
mp.set_value(ic4.PropId.GAIN, 0) # ゲイン値
mp.set_value(ic4.PropId.EXPOSURE_AUTO, "Off") # 露光自動
mp.set_value(ic4.PropId.EXPOSURE_TIME, 16335) # 露光時間
メインスレッドの処理で、まず最初にIC4ライブラリ全体を利用可能にするための初期化処理ic4.Library.init_contextを行います。次に現在PCに接続されているすべての対応カメラ(USB/GigEなど)をic4.DeviceEnum.devices()で検出し、リストとして取得します。次に映像を取得するための中心的なオブジェクトGrabberを生成し、検出されたカメラの先頭の1台(devs[0])を開きます。
カメラを開いたら、カメラのプロパティ(解像度、フレームレート、露光時間、ゲインなど)をdevice_property_mapを経由して設定を変更します。ここでは定義されたID(ic4.PropId)を用いて直感的にカメラ内部のレジスタ値を書き換えることができます。
※設定している値(解像度1920×1080など)はサンプル用の固定値です。接続したカメラがその解像度やフォーマットに対応していない場合はエラー(例外)が発生するため、実機に合わせて適切な値に書き換える必要があります。
ライブ取得の開始
# --- フレーム受け取りのsink(QueueSink)を作って取得開始 ---
listener = CamListener() # 上で定義したコールバック役
sink = ic4.QueueSink(listener, [ic4.PixelFormat.BGR8]) # 受け付けるフォーマットは BGR8
grabber.stream_setup(sink, setup_option=ic4.StreamSetupOption.ACQUISITION_START)
print("ライブ処理中… 表示ウィンドウを選択して 'q' または ESC で停止します。")
# --- メインループ(描画と停止待ち) ---
while True:
# コールバックで処理された画像があれば表示する
if shared_image is not None:
cv2.imshow("Window", shared_image)
key = cv2.waitKey(10) & 0xFF
if key == ord('q') or key == 27: # 'q' または ESC で終了
break
except ic4.IC4Exception as ex:
print(f"IC4 エラー: {ex.message}")
except Exception as e:
print("エラー:", e)
finally:
# --- エラーや強制終了時でも必ず実行してカメラを解放する ---
if grabber.is_device_open:
grabber.stream_stop() # 取得停止
grabber.device_close() # カメラを閉じる
cv2.destroyAllWindows() # 表示ウィンドウを閉じる
print("停止しました。")
カメラからの映像データを受け取るSinkを作成します。事前に定義したコールバッククラスのインスタンス(listener)と、受け取りたいピクセル形式(BGR8)を登録します。grabber.stream_setupで登録したsinkを開いているカメラに結びつけ、ACQUISITION_STARTによってカメラへデータ配信開始のコマンドを送信します。それにより、カメラから画像がメモリに届くたびに、別スレッドのframes_queuedのコールバック関数が自動的に動き始めます。
次にwhileの無限ループで、別スレッド側から最新画像が書き込まれるグローバル変数shared_imageを常に監視し、cv2.imshowを呼び出し、二値化画像を描画・更新します。cv2.waitKey(10) により、10msecの間、ユーザーのキー入力を待ち受けます。キーボードの「q」または「ESC」が押されたことを検知すると、無限ループを脱出します。無限ループを抜けた後には、stream_stopでカメラからの画像配信とコールバックの動作を完全に停止させ、device_closeで占有していたカメラを解放します。その後、cv2.destroyAllWindowsでOpenCVが生成した表示ウィンドウをすべて破棄し、メモリを開放します。



