6台のFPD-Link IIIカメラで同時に画像保存
概要
このサンプルプログラムは、NVIDIA Jetson Xavier NX内蔵のキャリアボード「CBH 6-JX-FPD」に最大6台のFPD-Link IIIカメラを接続し、外部トリガ入力を検出したタイミングで各カメラの画像を取得・ディスクに保存します。
【ハードウェア構成】
サンプルプログラム
モバイルサイトではご利用いただけません。
検証デバイス
対象ハードウェア | CBH 6-JX-FPD(NVIDIA Jetson Xavier NX JetPackインストール済) |
---|---|
検証時に使用したデバイス |
・FPD-Link IIIカメラ(DFK36CR0234-I67)×6 ・FAKRA IP67ケーブル1m(CA-FAKRA-IP67-1m)×6 ・ミニレンズ(TBN 6 5MP)×6 ■お客様でご用意していただくもの ・HDMIケーブル・マウス・キーボード・モニタ |
デバイスのセットアップ
カメラとCBH 6-JX-FPDをFAKRAケーブルで接続し、CBH 6-JX-FPDのIN1端子に外部トリガ(ジャンパーピン)を配線します。
※ここではテスト用にボタンスイッチを接続していますが、本端子は3.3V~36V(駆動電流 5mA~12mA)の幅広い電圧に対応しています(リファレンスマニュアル P.9参照)。
その他、I2CやCANやRS232などに対応しています。
外部トリガについて
キャリアボードCBH 6-JX-FPDには、外部機器からの信号を受けてカメラの撮影タイミングを同期するための「GPIn(外部トリガ入力)」が用意されています。外部トリガを入力するキャリアボードの入力ピンとFPD-Link IIIカメラとのプログラムによる接続は、tis-board-ctrl(あらかじめインストールされているコマンドラインツール)を使う事ができます。
サンプルプログラムについて
このプログラムは、まず tis-board-ctrlコマンドでGPIOをカメラのポートに割り当てたあと、6台のカメラをシリアル番号指定で開き、解像度やゲイン、露光時間、ホワイトバランスなどの設定を行います。
キャリアボードが外部トリガ信号を受け取るごとにカメラに対してトリガを発行し、そのトリガを受けたカメラは1枚フレームを出力します。キャリアボードがフレームを受け取るとコールバックが走り、別スレッドでそのフレームをファイルに保存します。指定した秒数(ここでは1秒)以内に画像が来なければ、カメラのパイプラインを再起動しています。もしカメラがハングアップをしていた場合は、パイプラインの再起動でデバイスが再度起動します。
コード全体
import sys
import cv2
import numpy as np
import os
import TIS
import time
import datetime
import threading
import atexit
import subprocess
# 実行したいコマンドをリスト化(引数ごとに分割)
commands = [
['tis-board-ctrl', 'DESER1_GPIO1', 'BOARD_GPIN1'],
['tis-board-ctrl', 'DESER1_GPIO3', 'BOARD_GPIN1'],
['tis-board-ctrl', 'DESER2_GPIO1', 'BOARD_GPIN1'],
['tis-board-ctrl', 'DESER3_GPIO1', 'BOARD_GPIN1'],
['tis-board-ctrl', 'DESER4_GPIO1', 'BOARD_GPIN1'],
['tis-board-ctrl', 'DESER4_GPIO3', 'BOARD_GPIN1'],
]
for cmd in commands:
try:
# コマンド実行(終了コードチェック付き)
result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(f"OK: {' '.join(cmd)}")
print(" stdout:", result.stdout.strip())
except subprocess.CalledProcessError as e:
print(f"Error: {' '.join(cmd)} (code={e.returncode})")
print(" stderr:", e.stderr.strip())
# 使用するカメラの台数
num_camera = 6
# 画像サイズ
width = 1920
height = 1200
# 保存先のベースディレクトリ
save_dir_base = os.getcwd()
# カメラごとの一時データ管理クラス
class CustomData:
def __init__(self, camera_number):
self.imagecounter = 0
self.busy = False
self.camera_number = camera_number
self.image_received = False
# 新しい画像が届いたときの処理(コールバック関数)
def on_new_image(tis, userdata):
userdata.imagecounter += 1
userdata.image_received = True # 画像受信フラグを立てる
print(f"[Cam{userdata.camera_number}] 画像受信")
image = tis.get_image()
thread = threading.Thread(target=save_image, args=(image, userdata))
thread.start()
# 画像保存処理
def save_image(image, userdata):
now = datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')[:-2]
filename = save_dir[userdata.camera_number] + "/image_{:05}_".format(userdata.imagecounter) + now + ".png"
cv2.imwrite(filename, image)
#cv2.imshow(filename,image)
# 終了処理(全カメラ停止)
def cleanup_function():
print("終了処理を実行します")
for i in range(num_camera):
Tis[i].set_property("TriggerMode", "Off")
Tis[i].stop_pipeline()
atexit.register(cleanup_function)
# カメラと保存ディレクトリの初期化
print("カメラ初期化中...")
save_dir = []
Tis = []
CD = []
now_dir = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
for i in range(num_camera):
Tis.append(TIS.TIS())
save_path = os.path.join(save_dir_base, now_dir, str(i))
save_dir.append(save_path)
os.makedirs(save_path, exist_ok=True)
print(f"カメラ{i} オープン")
serial = [
"03520908",
"04420304",
"02520628",
"02520630",
"02520631",
"04420305"
][i]
Tis[i].open_device(serial, width, height, "5/1", TIS.SinkFormats.BGRA, False)
# CustomData作成
CD.append(CustomData(i))
CD[i].busy = True
Tis[i].set_property("TriggerMode", "Off")
Tis[i].set_image_callback(on_new_image, CD[i])
Tis[i].start_pipeline()
try:
# カメラ設定(ホワイトバランスなど)
Tis[i].set_property("GainAuto", "Off")
Tis[i].set_property("ExposureAuto", "Off")
Tis[i].set_property("BalanceWhiteAuto", "Off")
Tis[i].set_property("BalanceWhiteAuto", "Off")
Tis[i].set_property("BalanceWhiteRed", 1.0)
Tis[i].set_property("BalanceWhiteGreen", 1.0)
Tis[i].set_property("BalanceWhiteBlue", 1.0)
Tis[i].set_property("ExposureAuto", "Off")
Tis[i].set_property("ExposureTime", 1000)
Tis[i].set_property("GainAuto", "Continuous")
Tis[i].set_property("BalanceWhiteAuto", "Continuous")
except Exception as error:
print(f"[Cam{i}] 設定エラー:", error)
Tis[i].set_property("TriggerMode", "On")
CD[i].busy = False
print("カメラ初期化完了")
#別スレッドで定期的にトリガー送信と再起動監視を実行
def trigger_loop():
while True:
# 受信状況をチェックし、必要に応じてストリームを再起動する
for i in range(num_camera):
# 画像が受信されていない場合
if not CD[i].image_received:
print(f"[Cam{i}] 画像を受信していません → 再起動中")
try:
# ストリームを停止
Tis[i].stop_pipeline()
time.sleep(0.1) # 少し待機
# ストリームを再開
Tis[i].start_pipeline()
time.sleep(0.1) # 少し待機
# トリガーモードをオンに設定
Tis[i].set_property("TriggerMode", "On")
except Exception as e:
# 再起動に失敗した場合、エラーメッセージを表示
print(f"[Cam{i}] 再起動に失敗しました: {e}")
# 画像受信フラグをリセット
CD[i].image_received = False
# 次のトリガーまで1秒待機
time.sleep(1)
# トリガーループを別スレッドで起動
threading.Thread(target=trigger_loop, daemon=True).start()
# メインスレッドはユーザー入力を待機(qで終了)
while True:
key = input("q:終了\n>> ")
if key.strip().lower() == "q":
break
print("プログラム終了")
モジュールのインポートと基本設定
import sys # Pythonインタプリタや実行環境に関する情報を取得するため
import cv2 # OpenCVライブラリ。画像読み書きや処理に使用
import numpy as np # 数値計算ライブラリ。画像処理の配列操作などで利用
import os # ファイル/ディレクトリ操作用
import TIS # IC Imaging Control の Pythonラッパー
import time # 時刻操作(sleepや現在時刻取得)に使用
import datetime # 日付時刻フォーマット用
import threading # マルチスレッドを用いた非同期処理用
import atexit # プログラム終了時のクリーンアップ登録用
import subprocess # 外部コマンド実行用
この部分では、プログラムで必要となる各種モジュールを読み込んでいます。
トリガー設定コマンドを一括実行
# 実行したいコマンド群をリスト化。各引数は要素ごとに分割して記述。
commands = [
['tis-board-ctrl', 'DESER1_GPIO1', 'BOARD_GPIN1'],
['tis-board-ctrl', 'DESER1_GPIO3', 'BOARD_GPIN1'],
['tis-board-ctrl', 'DESER2_GPIO1', 'BOARD_GPIN1'],
['tis-board-ctrl', 'DESER3_GPIO1', 'BOARD_GPIN1'],
['tis-board-ctrl', 'DESER4_GPIO1', 'BOARD_GPIN1'],
['tis-board-ctrl', 'DESER4_GPIO3', 'BOARD_GPIN1'],
]
for cmd in commands:
try:
# check=True で、異常終了時に CalledProcessError が発生
result = subprocess.run(
cmd,
check=True,
stdout=subprocess.PIPE, # 標準出力をキャプチャ
stderr=subprocess.PIPE # 標準エラーをキャプチャ
)
print(f"OK: {' '.join(cmd)}")
print(" stdout:", result.stdout.strip())
except subprocess.CalledProcessError as e:
# コマンドがエラーになった場合のエラーハンドリング
print(f"Error: {' '.join(cmd)} (code={e.returncode})")
print(" stderr:", e.stderr.strip())
プログラムはまず commands というリストに、キャリアボードの各 GPIO ピンをトリガ入力に割り当てるためのコマンドを格納します。このリストを順番に処理する for ループ内では、subprocess.run を使って各コマンドを実行し、標準出力と標準エラーをキャプチャします。正常終了したコマンドについては「OK」とともに実行内容をターミナル画面に出力し、トリガ入力設定が正しく完了したかどうかを表示するようにしています。
なお、コマンドが正常終了しなかった場合には例外 (CalledProcessError) が発生し、その場でエラーキャッチしてエラーメッセージと終了コードを表示します。
基本パラメータとデータ管理クラス
# ──── カメラ台数や画像サイズなど基本パラメータ ────
num_camera = 6
width = 1920
height = 1200
save_dir_base = os.getcwd() # 現在の作業ディレクトリをベースに保存
# ──── カメラごとの状態管理用クラス ────
class CustomData:
def __init__(self, camera_number):
self.imagecounter = 0 # 何枚目のフレームかをカウント
self.busy = False # コールバック中かどうか
self.camera_number = camera_number
self.image_received = False # 一度でも画像を受け取ったか
CustomData はスレッド間でカメラ状態を共有するためのオブジェクトです。image_received フラグを使って、一定間隔ごとに通信断の有無をチェックします。
画像受信コールバック
def on_new_image(tis, userdata):
"""TISライブラリから呼ばれるコールバック関数"""
userdata.imagecounter += 1
userdata.image_received = True
print(f"[Cam{userdata.camera_number}] 画像受信")
image = tis.get_image() # バッファから NumPy 画像を取得
# 保存は別スレッドで非同期実行してメイン処理をブロックしない
threading.Thread(target=save_image, args=(image, userdata)).start()
on_new_image は、TISライブラリから新しいフレーム到着時に自動で呼ばれる関数です。
まず、userdata.imagecounterをカウントアップし受信フラグを立て、ターミナル画面に「画像受信」の文字をターミナル画面に出力します。次に tis.get_image() でバッファから NumPy 形式の画像を取得し、save_image を別スレッドで起動してディスク保存を非同期実行しています。
保存処理
def save_image(image, userdata):
"""別スレッドで呼ばれ、画像をディスクへ保存する関数"""
now = datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')[:-2]
# フォルダ + カウンタ + タイムスタンプ でファイル名を生成
filename = save_dir[userdata.camera_number] \
+ f"/image_{userdata.imagecounter:05}_{now}.png"
cv2.imwrite(filename, image)
save_image は呼び出し時の時刻を「YYYYMMDDhhmmssff」形式で取得し、カメラ番号・連番・タイムスタンプを組み合わせたファイル名を生成、cv2.imwrite で画像を PNG 形式で保存します。
終了時のクリーンアップ登録
def cleanup_function():
print("終了処理を実行します")
for i in range(num_camera):
# トリガオフ → パイプライン停止
Tis[i].set_property("TriggerMode", "Off")
Tis[i].stop_pipeline()
# プログラム終了時に必ず呼ばれる
atexit.register(cleanup_function)
atexit.register に渡した関数は、正常終了・例外終了・CTRL+Cのいずれでも実行されます。これで全カメラのシャッター/ストリームが確実に止まり、リソースリークを防ぎます。
カメラ初期化とパイプライン開始
print("カメラ初期化中...")
save_dir = []
Tis = []
CD = []
now_dir = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
for i in range(num_camera):
# TISインスタンス生成
Tis.append(TIS.TIS())
# 保存フォルダ作成
save_path = os.path.join(save_dir_base, now_dir, str(i))
save_dir.append(save_path)
os.makedirs(save_path, exist_ok=True)
print(f"カメラ{i} オープン")
serials = [
"03520908", "04420304", "02520628",
"02520630", "02520631", "04420305"
]
# デバイスをシリアル指定で開き、解像度・フォーマット設定
Tis[i].open_device(
serials[i], width, height, "5/1",
TIS.SinkFormats.BGRA, False
)
# 状態オブジェクトを作成・初期化
CD.append(CustomData(i))
CD[i].busy = True
# トリガモードオフ→コールバック登録→パイプライン開始
Tis[i].set_property("TriggerMode", "Off")
Tis[i].set_image_callback(on_new_image, CD[i])
Tis[i].start_pipeline()
# 各種カメラパラメータ設定
try:
# オート設定OFF → 手動パラメータ指定
Tis[i].set_property("GainAuto", "Off")
Tis[i].set_property("ExposureAuto", "Off")
Tis[i].set_property("BalanceWhiteAuto", "Off")
Tis[i].set_property("BalanceWhiteRed", 1.0)
Tis[i].set_property("BalanceWhiteGreen", 1.0)
Tis[i].set_property("BalanceWhiteBlue", 1.0)
Tis[i].set_property("ExposureTime", 1000)
# 続けて連続オートにもどす例
Tis[i].set_property("GainAuto", "Continuous")
Tis[i].set_property("BalanceWhiteAuto", "Continuous")
except Exception as error:
print(f"[Cam{i}] 設定エラー:", error)
# 最後にトリガをオンにして外部信号待ち状態へ
Tis[i].set_property("TriggerMode", "On")
CD[i].busy = False
print("カメラ初期化完了")
プログラムはまず「カメラ初期化中…」とターミナル画面で表示し、画像保存用のフォルダ一覧とカメラ制御用のリストを準備します。現在時刻を元にフォルダ名を作成し、6台分のループを回します。ループ内では、静止画保存するための保存先フォルダを作り、カメラのシリアル番号を指定してカメラを開きます。次に一度トリガーモードをオフにし、画像受信のコールバックを登録してパイプラインを開始します。ゲインや露光、ホワイトバランスなどを設定し、最後にトリガーモードをオンに戻します。全カメラの準備が終わると「カメラ初期化完了」とターミナル画面で表示されます。
トリガ監視&自動再起動ループ
#別スレッドで定期的にトリガー送信と再起動監視を実行
def trigger_loop():
while True:
# 受信状況をチェックし、必要に応じてストリームを再起動する
for i in range(num_camera):
# 画像が受信されていない場合
if not CD[i].image_received:
print(f"[Cam{i}] 画像を受信していません → 再起動中")
try:
# ストリームを停止
Tis[i].stop_pipeline()
time.sleep(0.1) # 少し待機
# ストリームを再開
Tis[i].start_pipeline()
time.sleep(0.1) # 少し待機
# トリガーモードをオンに設定
Tis[i].set_property("TriggerMode", "On")
except Exception as e:
# 再起動に失敗した場合、エラーメッセージを表示
print(f"[Cam{i}] 再起動に失敗しました: {e}")
# 画像受信フラグをリセット
CD[i].image_received = False
# 次のトリガーまで1秒待機
time.sleep(1)
#スレッドで起動(メイン終了時に自動終了)
threading.Thread(target=trigger_loop, daemon=True).start()
このコードは、カメラからキャリアボードのカメラ接続ポート(Tis[i])へ画像が送信されているかを定期的に監視するためのものです。
trigger_loopという関数を別のスレッドで実行して、キャリアボードのカメラ接続ポート(Tis[i])が画像を受信しているかどうかをチェックし、受信していなければパイプラインのライブスタートのストリームを再起動する処理を行います。
trigger_loop関数は、無限ループの中で(Tis[i])ごとに画像の受信状態を確認します。もし(Tis[i])が画像を受信していない場合、(Tis[i])のストリームを一旦停止させ、少し待機した後、再度ストリームを開始します。
そして、トリガーモードを「オン」に設定して、(Tis[i])が画像を受信できるようにします。万が一、再起動に失敗した場合はエラーメッセージが表示されます。その後、画像の受信フラグをリセットし、次のチェックまで1秒間待機します。このようにして、カメラが適切に動作していない場合に自動的にキャリアボードのカメラ接続ポート(Tis[i])のパイプラインを再起動し、安定した画像受信を確保することができます。
最後に、threading.Threadを使ってこのtrigger_loop関数を別スレッドで実行します。これにより、メインスレッドが終了してもこの監視処理は継続して実行されます。daemon=Trueと設定することで、プログラムが終了する際にスレッドが自動的に終了するようになります。
メインループ(ユーザー終了待機)
# メインスレッドはキー入力を待つだけ
while True:
key = input("q:終了\n>> ")
if key.strip().lower() == "q":
break
print("プログラム終了")
メインスレッドではユーザーの入力を待つだけで、qを入力するとループを抜けてプログラムを終了します。これにより、必要な処理がすべて終わった後にユーザーの指示で安全に停止できます。