産業用UVCカメラのすすめ 産業用UVCカメラのすすめ

※画面を横にするとパソコン版のレイアウトでご覧いただけます。正しく表示されていない場合は横向きでご覧ください。

tiscamera用GStreamerのラッパーファイル(TIS.py)

TIS.pyは画像を表示するためのライブラリGStreamerを簡単に使用するために、GStreamerをラップしたpython用の中間ファイルです。TIS.pyを使ってプログラムを作成することでコードを簡略化でき、保守性の高いコードを作成することができます。

例えば、TIS.pyのファイルがない場合には下記のようにGstreamerのパイプラインのコマンドなど意識しながらカメラをオープンしないといけません。

TIS.pyなしの場合

gi.require_version("Gst", "1.0")
from gi.repository import Gst

Gst.init(sys.argv)  
Gst.debug_set_default_threshold(Gst.DebugLevel.WARNING)

serial = None

pipeline = Gst.parse_launch("tcambin name=bin "
                           " ! videoconvert"
                           " ! ximagesink sync=false")

camera = pipeline.get_by_name("bin")
if serial is not None:
   camera.set_property("serial", serial)

pipeline.set_state(Gst.State.PLAYING)

TIS.pyのファイルを読み込むことで下記のようにコードを簡略化してカメラをオープンすることができます。

TIS.pyありの場合

import TIS

Tis = TIS.TIS()
Tis.openDevice("10710286", 640, 480, "30/1", TIS.SinkFormats.BGRA,True)
Tis.Start_pipeline() 

tiscameraのバージョンについて

自分がどのバージョンを使っているかはターミナル画面にて「tcam-ctrl --version」を入力してください。
下記のように【Tcam】の欄にバージョンが表示されます。下記の場合は1.0.0がtiscameraのバージョンです。

コード全体

#### 解説1
import time
from collections import namedtuple

import gi
import re
import numpy
from enum import Enum
gi.require_version("Gst", "1.0")
gi.require_version("Tcam", "1.0")

from gi.repository import GLib, GObject, Gst, Tcam

DeviceInfo = namedtuple("DeviceInfo", "status name identifier connection_type")
CameraProperty = namedtuple("CameraProperty", "status value min max default step type flags category group")

#### 解説2
class SinkFormats(Enum):
    GRAY8 = 0
    GRAY16_LE = 1
    BGRA = 2

    def toString(pf):
        if( pf == SinkFormats.GRAY16_LE ):
            return "GRAY16_LE"

        if( pf == SinkFormats.GRAY8 ):
            return "GRAY8"

        if( pf == SinkFormats.BGRA ):
            return "BGRx"

        return "BGRx"

    def fromString(pf):
        if( pf == "GRAY16_LE"):
            return SinkFormats.GRAY16_LE
        if( pf == "GRAY8"):
            return SinkFormats.GRAY8
      
        return SinkFormats.BGRA

#### 解説3
#### 解説3-1
class TIS:
    'The Imaging Source Camera'

    def __init__(self):
        ''' Constructor
        :return: none
        '''
        Gst.init([]) 
        self.serialnumber = ""
        self.height = 0
        self.width = 0
        self.framerate="15/1"
        self.livedisplay = True
        self.sinkformat = SinkFormats.BGRA
        self.sample = None
        self.samplelocked = False
        self.newsample = False
        self.img_mat = None
        self.ImageCallback = None
        self.pipeline = None
        self.source = None

#### 解説3-2
    def openDevice(self,serial, width, height, framerate, sinkformat: SinkFormats, showvideo: bool):
        ''' Inialize a device, e.g. camera.
        :param serial: Serial number of the camera to be used.
        :param width: Width of the wanted video format
        :param height: Height of the wanted video format
        :param framerate: Numerator of the frame rate. /1 is added automatically
        :param color: True = 8 bit color, False = 8 bit mono. ToDo: Y16
        :return: none
        '''
        self.serialnumber = serial
        self.height = height
        self.width = width
        self.framerate = framerate
        self.sinkformat = sinkformat
        self.livedisplay = showvideo
        self._createPipeline()
        self.source.set_property("serial", self.serialnumber)
        self.pipeline.set_state(Gst.State.READY)
        self.pipeline.get_state(40000000)

#### 解説3-3
    def _createPipeline(self):
        p = 'tcambin name=source ! capsfilter name=caps'
        if self.livedisplay is True:
            p += " ! tee name=t"
            p += " t. ! queue ! videoconvert ! ximagesink"
            p += " t. ! queue ! appsink name=sink"
        else:
            p += ' ! appsink name=sink'

        print(p)
        try:
            self.pipeline = Gst.parse_launch(p)
        except GLib.Error as error:
            print("Error creating pipeline: {0}".format(error))
            raise

        # Quere the source module.
        self.source = self.pipeline.get_by_name("source")

        # Query a pointer to the appsink, so we can assign the callback function.
        appsink = self.pipeline.get_by_name("sink")
        appsink.set_property("max-buffers",5)
        appsink.set_property("drop",1)
        appsink.set_property("emit-signals",1)
        appsink.connect('new-sample', self.on_new_buffer)

#### 解説3-4
    def on_new_buffer(self, appsink):
        self.newsample = True
        if self.samplelocked is False:
            try:
                self.sample = appsink.get_property('last-sample')
                if self.ImageCallback is not None:
                    self.__convert_sample_to_numpy()
                    self.ImageCallback(self, *self.ImageCallbackData);

            except GLib.Error as error:
                print("Error on_new_buffer pipeline: {0}".format(error))
                raise
        return False

#### 解説3-5
    def setSinkFormat(self, sf: SinkFormats):
        self.sinkformat = sf

#### 解説3-6
    def showLive(self, show: bool):
        self.livedisplay = show

#### 解説3-7
    def _setcaps(self):
        """ 
        Set pixel and sink format and frame rate
        """
        caps = Gst.Caps.new_empty()
        videoformat = 'video/x-raw,format=%s,width=%d,height=%d,framerate=%s' % ( SinkFormats.toString(self.sinkformat),self.width,self.height,self.framerate,)
        structure = Gst.Structure.new_from_string(videoformat)

        caps.append_structure(structure)
        
        capsfilter = self.pipeline.get_by_name("caps")
        capsfilter.set_property("caps", caps)

#### 解説3-8
    def Start_pipeline(self):
        """
        Start the pipeline, so the video runs
        """
        try:
            self._setcaps()
            self.pipeline.set_state(Gst.State.PLAYING)
            error = self.pipeline.get_state(5000000000) 
            if error[1] != Gst.State.PLAYING:
                print("Error starting pipeline. {0}".format("") )    
                return False

        except: # GError as error:
            print("Error starting pipeline: {0}".format("unknown too"))
            raise
        return True

#### 解説3-9
    def __convert_sample_to_numpy(self):
        ''' Convert a GStreamer sample to a numpy array
            Sample code from https://gist.github.com/cbenhagen/76b24573fa63e7492fb6#file-gst-appsink-opencv-py-L34

            The result is in self.img_mat.
        :return:
        '''
        self.samplelocked = True
        buf = self.sample.get_buffer()
        caps = self.sample.get_caps()
        mem = buf.get_all_memory()
        success, info = mem.map(Gst.MapFlags.READ)
        if success:
            data = info.data
            mem.unmap(info)
                
            bpp = 4
            dtype = numpy.uint8
            
            if( caps.get_structure(0).get_value('format') == "BGRx" ):
                bpp = 4

            if(caps.get_structure(0).get_value('format') == "GRAY8" ):
                bpp = 1

            if(caps.get_structure(0).get_value('format') == "GRAY16_LE" ):
                bpp = 1
                dtype = numpy.uint16

            self.img_mat = numpy.ndarray(
                (caps.get_structure(0).get_value('height'),
                caps.get_structure(0).get_value('width'),
                bpp),
                buffer=data,
                dtype=dtype)
            self.newsample = False
            self.samplelocked = False

#### 解説3-10
    def wait_for_image(self, timeout):
        ''' Wait for a new image with timeout
        :param timeout: wait time in second, should be a float number
        :return:
        '''
        tries = 10
        while tries > 0 and not self.newsample:
            tries -= 1
            time.sleep(float(timeout) / 10.0)

#### 解説3-11
    def Snap_image(self, timeout):
        '''
        Snap an image from stream using a timeout.
        :param timeout: wait time in second, should be a float number. Not used
        :return: bool: True, if we got a new image, otherwise false.
        '''
        if self.ImageCallback is not None:
            print("Snap_image can not be called, if a callback is set.")
            return False

        self.wait_for_image(timeout)
        if self.sample is not None and self.newsample:
            self.__convert_sample_to_numpy()
            return True

        return False

#### 解説3-12
    def Get_image(self):
        return self.img_mat

#### 解説3-13
    def Stop_pipeline(self):
        self.pipeline.set_state(Gst.State.PAUSED)
        self.pipeline.set_state(Gst.State.READY)
        
#### 解説3-14
    def List_Properties(self):
        property_names = self.source.get_tcam_property_names()

        for name in property_names:
            try:
                base = self.source.get_tcam_property(name)
                print("{}\t{}".format(base.get_display_name(),
                                      name
                                        ))
            except Exception as error:
                raise Exception( name + " : " + error.message )

#### 解説3-15
    def get_source(self):
        return self.source

    def Get_Property(self, PropertyName):
        try:
            baseproperty = self.source.get_tcam_property(PropertyName)
            val = baseproperty.get_value()
            return val

        except GLib.Error as error:
            raise Exception(PropertyName + " : " + error.message )
            return 0
        return 0

#### 解説3-16
    def Set_Property(self, PropertyName, value):

        try:
            baseproperty = self.source.get_tcam_property(PropertyName)
            baseproperty.set_value(value)
        except GLib.Error as error:
            raise Exception(PropertyName + " : " + error.message )


#### 解説3-17
    def Set_Image_Callback(self, function, *data):
        self.ImageCallback = function
        self.ImageCallbackData = data

#### 解説3-18
    def selectDevice(self):
        ''' Select a camera, its video format and frame rate
        :return: True on success, False on nothing selected
        '''
        monitor = Gst.DeviceMonitor.new()
        monitor.add_filter("Video/Source/tcam")
        serials = []
        i = 0
        for device in monitor.get_devices():
            struc = device.get_properties()
            i +=1
            print("{} : Model: {} Serial: {} {} ".format(i, 
                                                         struc.get_string("model"),
                                                         struc.get_string("serial"),
                                                         struc.get_string("type")
                                                        ))

            serials.append("{}-{}".format(struc.get_string("serial"),
                                          struc.get_string("type")
                                          ))

        if i > 0:
            i = int(input("Select : "))
            if i == 0:
                return False

            self.serialnumber = serials[i-1]
            print(self.serialnumber)
            
            return self.selectFormat()

        return False

#### 解説3-19
    def selectFormat(self):
        '''
        '''
        formats = self.createFormats()
        i = 0
        f = [] 
        for key, value in formats.items():
            f.append(key)
            i = i + 1
            print("{}: {}".format(i, key))

        i = int(input("Select : "))
        if i == 0:
            return False

        formatindex = f[i-1]
        i = 0
        for res in formats[formatindex].res_list:
            i = i + 1
            print("{}:  {}x{}".format(i, res.width, res.height))

        i = int(input("Select : "))
        if i == 0:
            return False


        width = formats[formatindex].res_list[i-1].width
        height = formats[formatindex].res_list[i-1].height
        o = 0
        for rate in formats[formatindex].res_list[i-1].fps :
            o += 1
            print("{}:  {}".format(o, rate))

        framerate = formats[formatindex].res_list[i-1].fps[o-1] 
        o = int(input("Select : "))
        if o == 0:
            return False

        framerate = formats[formatindex].res_list[i-1].fps[o-1] 
        #print(format,width,height,framerate )
        self.openDevice(self.serialnumber, width, height, framerate, SinkFormats.BGRA, True)
        return True

#### 解説3-20
    def createFormats(self):
        source = Gst.ElementFactory.make("tcambin")
        source.set_property("serial", self.serialnumber)
        
        source.set_state(Gst.State.READY)

        caps = source.get_static_pad("src").query_caps()
        format_dict = {}

        for x in range(caps.get_size()):
            structure = caps.get_structure(x)
            name = structure.get_name()
            try:
                videoformat = structure.get_value("format")

                if videoformat not in format_dict:
                    format_dict[videoformat] = FmtDesc(name, videoformat)
                    

                width = structure.get_value("width")
                height = structure.get_value("height")       

                rates = self.get_framerates(structure)
                tmprates = []

                for rate in rates:
                    tmprates.append(str(rate))

                format_dict[videoformat].res_list.append(ResDesc(width, height, tmprates))      

            except:
                print("Except")
                pass

        source.set_state(Gst.State.NULL)
        source.set_property("serial", "")
        source = None

        return format_dict

#### 解説3-21
    def get_framerates(self, fmt):
        try:
            tmprates = fmt.get_value("framerate")
            if type(tmprates) == Gst.FractionRange:
                # A range is given only, so create a list of frame rate in 10 fps steps:
                rates = []
                rates.append("{0}/{1}".format(int(tmprates.start.num),int(tmprates.start.denom)))
                r = int((tmprates.start.num + 10) / 10) * 10
                while r < (tmprates.stop.num / tmprates.stop.denom ):
                    rates.append("{0}/1".format(r))
                    r += 10

                rates.append("{0}/{1}".format(int(tmprates.stop.num),int(tmprates.stop.denom)))
            else:
                rates = tmprates

        except TypeError:
            # Workaround for missing GstValueList support in GI
            substr = fmt.to_string()[fmt.to_string().find("framerate="):]
            # try for frame rate lists
            field, values, remain = re.split("{|}", substr, maxsplit=3)
            rates = [x.strip() for x in values.split(",")]
        return rates
#### 解説3-22
    def execute_command(self, PropertyName ):
        try:
            baseproperty = self.source.get_tcam_property(PropertyName)
            baseproperty.set_command()
        except GLib.Error as error:
            raise Exception( PropertyName + " : " + error.message )


#### 解説4
class ResDesc:
    """"""
    def __init__(self,                 
                 width: int,
                 height: int,
                 fps: list):
        self.width = width
        self.height = height
        self.fps = fps

#### 解説5
#### 解説5-1
class FmtDesc:
    """"""

    def __init__(self,
                 name: str = "",
                 fmt: str = ""):
        self.name = name
        self.fmt = fmt
        self.res_list = []

#### 解説5-2
    def get_name(self):
        if self.name == "image/jpeg":
            return "jpeg"
        else:
            return self.fmt

#### 解説5-3
    def get_resolution_list(self):

        res_list = []

        for entry in self.res_list:
            res_list.append(entry.resolution)

        return res_list

#### 解説5-4
    def get_fps_list(self, resolution: str):

        for entry in self.res_list:
            if entry.resolution == resolution:
                return entry.fps

#### 解説5-5
    def generate_caps_string(self, resolution: str, fps: str):
        if self.name == "image/jpeg":
            return "{},width={},height={},framerate={}".format(self.name,
                                                               resolution.split('x')[0],
                                                               resolution.split('x')[1],
                                                               fps)
        else:
            return "{},format={},width={},height={},framerate={}".format(self.name,
                                                                         self.fmt,
                                                                         resolution.split('x')[0],
                                                                         resolution.split('x')[1],
                                                                         fps)

解説1:importで宣言する

import time
from collections import namedtuple

import gi
import re
import numpy
from enum import Enum
gi.require_version("Gst", "1.0")
gi.require_version("Tcam", "1.0") #version 1.0のみ対応

from gi.repository import GLib, GObject, Gst, Tcam

#namedtupleでクラスを作成する
DeviceInfo = namedtuple("DeviceInfo", "status name identifier connection_type")
CameraProperty = namedtuple("CameraProperty", "status value min max default step type flags category group")

Pythonで下記のライブラリを使用するためにimportを使います。

time プログラム内で経過時間を測定するために使用します。
namedtuple データを格納するだけのクラスを作るために宣言します。
gi Python APIを使用します。
re 文字列の書式チェックなど正規表現を行うために使用する。
numpy Pythonでの機械学習の計算をより速く、効率的に行えるようにするために使用します。
Enum 決められた複数の選択肢から値を指定する際に利用します。

tiscameraのバージョンが1.0以上であるかどうか確認するためにgi.require_version("Tcam", "1.0")でバージョンを確認し、バージョンが1.0未満の場合はプログラムが実行されないようにしています。

解説2:使用できるカラーフォーマットを列挙型で宣言

class SinkFormats(Enum):
    GRAY8 = 0
    GRAY16_LE = 1
    BGRA = 2

    def toString(pf):
        if( pf == SinkFormats.GRAY16_LE ):
            return "GRAY16_LE"

        if( pf == SinkFormats.GRAY8 ):
            return "GRAY8"

        if( pf == SinkFormats.BGRA ):
            return "BGRx"

        return "BGRx"

    def fromString(pf):
        if( pf == "GRAY16_LE"):
            return SinkFormats.GRAY16_LE
        if( pf == "GRAY8"):
            return SinkFormats.GRAY8
      
        return SinkFormats.BGRA

与えられた引数の値によって、GRAY8(モノクロ8bit)、GRAY16_LE(モノクロ16bit)、BGRA(カラー8bit)のいずれに対応しているのか場合分けし、toStringの場合はプロパティ値から文字列を返し、fromStringの場合は文字列からプロパティ値を返すようにしています。

解説3:カメラ制御用クラスの定義

解説3-1:カメラの初期化

class TIS
    def __init__(self):
        Gst.init([]) 
        self.serialnumber = ""
        self.height = 0
        self.width = 0
        self.framerate="15/1"
        self.livedisplay = True
        self.sinkformat = SinkFormats.BGRA
        self.sample = None
        self.samplelocked = False
        self.newsample = False
        self.img_mat = None
        self.ImageCallback = None
        self.pipeline = None

カメラで扱う撮影機能や露光時間などの設定をするためのコードをまとめて「オブジェクト」として扱う(オブジェクト指向として扱う)ために、オブジェクトを生成する時のひな型としてTISクラスを宣言します。

上記でオブジェクトが扱う変数などの初期化を行います(コンストラクタ関数)。クラス名を呼び出すことで、自動的に上記の関数が呼び出されカメラのシリアル番号・解像度・フレームレート・表示の有無・カラーフォーマット・コールバック関数の有無・パイプラインに関して初期化しています。

解説3-2:カメラデバイスを開く


    def openDevice(self,serial, width, height, framerate, sinkformat: SinkFormats, showvideo: bool):
        self.serialnumber = serial
        self.height = height
        self.width = width
        self.framerate = framerate
        self.sinkformat = sinkformat
        self.livedisplay = showvideo
        self._createPipeline() #解説3-3で説明
        self.source.set_property("serial", self.serialnumber)
        self.pipeline.set_state(Gst.State.READY)

カメラをオープンする関数です。カメラをオープンするために下記の引数を与えます。

  • シリアル番号
  • 横の解像度
  • 縦の解像度
  • フレームレート
  • カラーフォーマット
  • 画面上にライブ表示するか(TrueあるいはFalse)

例えば、下記のように指定することでGstreamerのパイプラインを気にすることなく、カメラを開くことができます。

例:

Tis.openDevice("34819936-v4l2", 640, 480, "30/1",TIS.SinkFormats.BGRA, True)

解説3-3:パイプラインを作成する

    def _createPipeline(self):
        p = 'tcambin name=source ! capsfilter name=caps'
        if self.livedisplay is True:
            p += " ! tee name=t"
            p += " t. ! queue ! videoconvert ! ximagesink"
            p += " t. ! queue ! appsink name=sink"
        else:
            p += ' ! appsink name=sink'

        print(p)
        try:
            self.pipeline = Gst.parse_launch(p)
        except GLib.Error as error:
            print("Error creating pipeline: {0}".format(error))
            raise

        # Gstreamerのsourceエレメントを参照します
        self.source = self.pipeline.get_by_name("source")
        # Gstreamerのsinkエレメントを参照します
        self.appsink = self.pipeline.get_by_name("sink")

        # コールバック関数を割り当てることができるように、appsinkへのポインタを参照します。
        self.appsink.set_property("max-buffers",5)
        self.appsink.set_property("drop",1)
        self.appsink.set_property("emit-signals",1)
        self.appsink.connect('new-sample', self.on_new_buffer) ##解説3.4で説明

解説3-2内で指定したカメラのパラメータ(シリアル番号や解像度)に従ってパイプラインを作成します。

まず、ライブ表示する場合には下記のコマンドラインをGst.parse_launchでセットします。シンクにはappsinkを指定します。このAppsinkは、アプリケーションにパイプライン内のGStreamerデータのハンドルを取得させるためのさまざまなメソッドをサポートするシンクプラグインです。capsにはカラーフォーマット・解像度・フレームレートが設定され、解説3-8のパイプラインを開始時にセットされます。

tcambin name=source ! capsfilter name=caps ! tee name=t t. ! queue ! videoconvert ! ximagesink t. ! queue ! appsink name=sink

ライブ表示しない場合には下記のコマンドラインをGst.parse_launchでセットします。

tcambin name=source ! capsfilter name=caps ! appsink name=sink

このとき、Gstreamerのsource側に「name=source」、sink側に名称を付けて、 下記のようにget_by_nameを使ってプロパティ値と紐づけることでPythonでもGstreamerのパイプラインを制御することが可能です。

# Gstreamerのsourceエレメントを参照します
        self.source = self.pipeline.get_by_name("source")
        # Gstreamerのsinkエレメントを参照します
        self.appsink = self.pipeline.get_by_name("sink")

なお、appsinkでカメラから画像を送る際にPC内のバッファメモリを使用します。画像を受け取った後のプログラムの動作が遅くなっているときには大量のメモリを消費しますので、”self.appsink.set_property("max-buffers",5)”のようにmax-buffersのサイズを制限します。なお、最大のバッファメモリサイズに達したときに古いバッファを捨てるために"self.appsink.set_property("drop",1)"のように指定します(設定しないとバッファメモリ使用せずに送られてくる画像を拒否されます)。また、emit-signals プロパティを"self.appsink.set_property("emit-signals",1)"のように設定すると、カメラから送られてくる画像を取得できるときに、コールバック関数を呼び出されるようになります。最後に"self.appsink.connect('new-sample', self.on_new_buffer)"で使用するコールバック関数[on_new_buffer]を定義することでパイプラインの設定が完了します。

ワンポイントアドバイス

パイプラインの書き換え(Nvidiaのハードウェアエンコードをしたい場合)

def _create_pipeline(self, conversion: str, showvideo: bool):
    if conversion and not conversion.strip().endswith("!"):
        conversion += " !"

    p = 'tcambin name=source num-buffers=600 ! capsfilter name=caps'
   
    if showvideo:
    p += " ! tee name=t"
        p += " t. ! queue ! nvvidconv ! nveglglessink"
        p += " t. ! queue ! nvvidconv ! nvv4l2h265enc" \
            " ! h265parse !  qtmux ! filesink location=Videoh265HW.mp4 name=sink"
    else:
        p += " ! nvvidconv ! nvv4l2h265enc" \
            " ! h265parse !  qtmux ! filesink location=Videoh265HW.mp4 name=sink"


    print(p)
    try:
        self.pipeline = Gst.parse_launch(p)
    except GLib.Error as error:
        print("Error creating pipeline: {0}".format(error))
        raise

    # Quere the source module.
    self.source = self.pipeline.get_by_name("source")

    # Query a pointer to the appsink, so we can assign the callback function.
    #appsink = self.pipeline.get_by_name("sink")
    #appsink.set_property("max-buffers", 5)
    #appsink.set_property("drop", True)
    #appsink.set_property("emit-signals", True)
    #appsink.set_property("enable-last-sample", True)
    #appsink.connect('new-sample', self.__on_new_buffer)
    self.appsink = self.pipeline.get_by_name("sink")

上記のように変数pを置き換えることでハードウェアエンコード・録画用のパイプラインに変更することができます。なお、変数pのパイプラインはそれぞれ下記を示しています。

プラグイン/エレメント 説明
tcambin TISが提供しているGstreamerプラグインで、tiscameraでサポートされているすべてのGstreamerの要素をラップしオールインワンで簡単に処理できるようにしています。
capsfilter name=caps ビデオフォーマットの幅、ビデオフォーマットの高さ、文字列としてフレームレート、カラーフォーマットの指定をします。
tee name=t t. データを複数のパッドに分割しパイプラインを複数に分けます。tee を使った時には、queue を使う必要があります。name=tはTeeの代わりにtで宣言しており、teeの代わりにtと省略するために使用しています。
nvvidconv H.265の変換処理するためのフォーマット変換、スケーリング処理です。
nveglglessink nvidiaのウィンドウ表示用のsinkです。
nvv4l2h265enc プロパティ:iframeinterval
圧縮されていないフレームが差し込まれる間隔。Iframeintervalの値が小さいほど、動画の早送り・巻き戻しがサクサクできる(ただしファイルサイズが大きくなる)
h265parse MP4コンテナに格納するためにH.265ストリームを変換します。
qtmux MP4コンテナに格納する場合、qtmuxエレメントを使用します。
filesink ファイルへの書き込みをします。 location=Video_h265HW.mp4でファイル名を指定しています。

ハードウェアエンコードの詳細については下記を参照してください。
https://www.argocorp.com/UVC_camera/Jetson_hardware_encoder.html
MP4の動画を保存するためには、解説3-8のワンポイントアドバイスも変更していただく必要があります。

解説3-4:コールバック関数の定義

    def on_new_buffer(self, appsink):
        self.newsample = True
        if self.samplelocked is False:
            try:
                self.sample = appsink.get_property('last-sample')
                if self.ImageCallback is not None:
                    self.__convert_sample_to_numpy() ## 解説3-9で説明
                    self.ImageCallback(self, *self.ImageCallbackData);

            except GLib.Error as error:
                print("Error on_new_buffer pipeline: {0}".format(error))
                raise
        return False

コールバック関数を定義しています。__convert_sample_to_numpy関数(解説3-9で詳細について説明)で処理をしていなければ、画像を保持するようにしています。

解説3-5:カラーフォーマットの定義

    def setSinkFormat( self, sf: SinkFormats):
        self.sinkformat = sf

この関数を使って、カラーフォーマットをGRAY8(モノクロ8bit)、GRAY16_LE(モノクロ16bit)、BGRA(カラー8bit)のいずれにセットします。

解説3-6:画面上にライブ表示するかの定義

    def showLive( self, show: bool):
        self.livedisplay = show

この関数を使って、画面上にカメラから取得した画像を表示するかどうかをセットします。

解説3-7:解像度・フレームレート・カラーフォーマットの定義

    def _setcaps(self):
        caps = Gst.Caps.new_empty()
        format = 'video/x-raw,format=%s,width=%d,height=%d,framerate=%s' % ( SinkFormats.toString(self.sinkformat),self.width,self.height,self.framerate,)
        structure = Gst.Structure.new_from_string(format)

        caps.append_structure(structure)
        
        capsfilter = self.pipeline.get_by_name("caps")
        capsfilter.set_property("caps", caps)

Gstremaerのパイプラインに解像度・フレームレート・カラーフォーマットを設定するための関数です。パイプラインがスタートされる前に呼び出されます(詳細は解説3-8を参照)。

解説3-8:パイプラインを開始

    def Start_pipeline(self):
        try:
            self._setcaps()
            self.pipeline.set_state(Gst.State.PLAYING)
            error = self.pipeline.get_state(5000000000) 
            if error[1] != Gst.State.PLAYING:
                print("Error starting pipeline. {0}".format("") )    
                return False

        except: # GError as error:
            print("Error starting pipeline: {0}".format("unknown too"))
            raise
        return True

解説3-7で設定した解像度・フレームレート・カラーフォーマットをセットして、解説3-3で設定したパイプラインを開始します。

get_stateでタイムアウトの時間をナノ秒単位で設定します。ここではタイムアウトを5,000,000,000ナノ秒(=5秒)に設定しています。

ワンポイントアドバイス

EOSイベントの追加(Nvidiaのハードウェアエンコードをしたい場合)

def start_pipeline(self):
   """
   Start the pipeline, so the video runs
   """
   self._setcaps()
   ret = self.pipeline.set_state(Gst.State.PLAYING)
   
   #↓変更点
   print("Waiting time:10seconds")
   time.sleep(10)
   #EOS(ストリーム終了メッセージ)を送る
   self.pipeline.send_event(Gst.Event.new_eos())
   #パイプラインから送られてくるメッセージを受け取る
   self.pipeline.get_bus()
   time.sleep(1)
   #↑変更点
   error = self.pipeline.get_state(5000000000)
   if error[1] != Gst.State.PLAYING:
       print("Error starting pipeline. {0}".format(""))
       
   return False
  return True

EOS(End of Stream)イベントはパイプラインに対して、動画ストリームが終了したときに各エレメントからアプリケーションにメッセージを送るように命令しています。アプリケーション側でパイプラインからメッセージを受け取り、パイプラインを適切なタイミングで停止するようにしています。

ライブ表示などのアプリケーションの場合にはEOSイベントは必要ありませんが、FilesinkでMP4動画を保存する(パイプラインを終了した後にヘッダー情報を更新するような)アプリケーション場合、EOSイベントを使用する必要があります。EOSイベントを使用しない場合にはMP4ファイルは壊れた状態で保存されてしまいます。

保存時間はライブスタートしてからEOSイベントを呼び出す時間までの間となるので上記のようにtime.sleepで動画の保存時間を指定してください。

解説3-9:画像をnumpy配列に変換する

    def __convert_sample_to_numpy(self):
        self.samplelocked = True
        buf = self.sample.get_buffer()
        caps = self.sample.get_caps()
        mem = buf.get_all_memory()
        success, info = mem.map(Gst.MapFlags.READ)
        if success:
            data = info.data
            mem.unmap(info)
                
            bpp = 4
            dtype = numpy.uint8
            bla = caps.get_structure(0).get_value('height')
            if( caps.get_structure(0).get_value('format') == "BGRx" ):
                bpp = 4

            if(caps.get_structure(0).get_value('format') == "GRAY8" ):
                bpp = 1

            if(caps.get_structure(0).get_value('format') == "GRAY16_LE" ):
                bpp = 1
                dtype = numpy.uint16

            self.img_mat = numpy.ndarray(
                (caps.get_structure(0).get_value('height'),
                caps.get_structure(0).get_value('width'),
                bpp),
                buffer=data,
                dtype=dtype)
            self.newsample = False
            self.samplelocked = False

取得した画像のバッファにアクセスするには、get_all_memoryでメモリブロックを取得した後にmapで最初にバッファのアドレスを取得します。なおmapは、指定したsize分のデータが書き込まれたアドレスを返します。メモリを使い終わったらunmapで開放するようにしてください。

次にカラーフォーマットによって1ピクセル当たりのデータ量を計算しています。データ量はそれぞれ下記の通りです。

GRAY8 モノクロ8bit(1Byte)
GRAY16_LE モノクロ16bit(2Byteだが16 ビット符号なし(uint16)として処理するため、uint16の1Byteとして処理)
BGRx カラー8bit(4Byte RGBAそれぞれ8bit)

最後にMat形式で画像を格納するためにnumpy配列に変換しています。

解説3-10:画像が送られてくるのを待つ

    def wait_for_image(self,timeout):
        tries = 10
        while tries > 0 and not self.newsample:
            tries -= 1
            time.sleep(float(timeout) / 10.0)

解説3-11で画像を取得していないときに待つ時間を指定しています。引数の"timeout"には秒単位の任意の数を指定することができます。この関数内では指定した時間内に10回画像が届いているのか確認しています。例えば、"timeout"の時間が1秒であれば、0.1秒ごとにカメラから画像が送られているのか確認することになります。

解説3-11:コールバック関数で取得した画像を待ち、numpy形式に変換

    def setSinkFormat( self, sf: SinkFormats):
        self.sinkformat = sf

コールバック関数で取得した画像が来るのを待ち、画像を取得したら解説3-9で説明した__convert_sample_to_numpyを呼び出してMat形式の画像を取得します。

解説3-12:Mat形式を返す

     def Get_image(self):
        return self.img_mat

Mat形式の画像を返します。

解説3-13:パイプラインを停止

    def Stop_pipeline(self):
        self.pipeline.set_state(Gst.State.PAUSED)
        self.pipeline.set_state(Gst.State.READY)

パイプラインを停止します。

解説3-14:プロパティの一覧をすべて表示する

    def List_Properties(self):
        property_names = self.source.get_tcam_property_names()

        for name in property_names:
            try:
                base = self.source.get_tcam_property(name)
                print("{}\t{}".format(base.get_display_name(),
                                      name
                                        ))
            except Exception as error:
                raise Exception( name + " : " + error.message )

get_tcam_property_namesを使ってカメラにあるプロパティ値すべてをターミナル画面上に表示します。
version1.0.0より古いバージョンであれば、get_display_nameの機能がありませんので機能しません。

解説3-15:指定したプロパティの値を取得する

    def get_source(self):
        return self.source

    def Get_Property(self, PropertyName):
        try:
            baseproperty = self.source.get_tcam_property(PropertyName)
            val = baseproperty.get_value()
            return val

        except Exception as error:
            raise Exception( PropertyName + " : " + error.message )

引数"PropertyName"で指定したプロパティ値を返します。
コマンドラインにて「tcam-ctrl -p 」を実行することで取得できるプロパティ値を取得できます。

また、キャプチャソフトウェアtcam-captureを使用することでも「info」->「state」タブにて現在使用している設定している一覧を取得することもできます。

解説3-16:指定したプロパティの値を設定する

    def Set_Property(self, PropertyName, value):
        try:
            baseproperty = self.source.get_tcam_property(PropertyName)
            baseproperty.set_value(value)
        except GLib.Error as error:
            raise Exception( PropertyName + " : " + error.message )

引数"PropertyName"で指定したプロパティ値を設定します。解説3-15と同様にコマンドラインにて「tcam-ctrl -p」を実行することで設定できるプロパティ値を取得できます。

解説3-17:コールバック関数をセット

    def Set_Image_Callback(self, function, *data):
        self.ImageCallback = function
        self.ImageCallbackData = data

下記のように第一引数にはコールバック関数の関数名、第二引数にはユーザークラスを設定することでコールバック関数とコールバック関数に紐づいたユーザー変数を定義することができます。

例:

Tis.Set_Image_Callback(on_new_image, CD)

解説3-18:PCに接続しているカメラを選択する

    def selectDevice(self):
        monitor = Gst.DeviceMonitor.new()
        monitor.add_filter("Video/Source/tcam")
        serials = []
        i = 0
        for device in monitor.get_devices():
            struc = device.get_properties()
            i +=1
            print("{} : Model: {} Serial: {} {} ".format(i, 
                                                        struc.get_string("model"),
                                                        struc.get_string("serial"),
                                                        struc.get_string("type")
                                                        ))

            serials.append("{}-{}".format(struc.get_string("serial"),
                                        struc.get_string("type")
                                        ))

        if i > 0:
            i = int(input("Select : "))
            if i == 0:
                return False

            self.serialnumber = serials[i-1]
            print(self.serialnumber)
            
            return self.selectFormat() ##解説3-19で説明

        return False

"monitor.add_filter("Video/Source/tcam")"で"Video/Source/tcam"に接続されているTheImagingSource社のデバイスを取得します。その後、接続されているデバイスをすべてコマンドラインで表示し、番号を指定することで任意のデバイスを開くことができます。デバイスを開くことができたら解説3-19のselectFormatにデバイス情報を渡してカメラの解像度・フレームレートなどをコマンドラインに表示されているリストから指定します。なお、version1.0.0より古いバージョンであれば、DeviceMonitorの機能がありませんので機能しません。

解説3-19:カメラのカラーフォーマット・解像度・フレームレートを一覧から選択

    def selectFormat(self):
        formats = self.createFormats() ##解説3-20で説明
        i = 0
        f =[] 
        for key,value in formats.items():
            f.append(key)
            i = i + 1
            print("{}: {}".format(i, key))

        i = int(input("Select : "))
        if i == 0:
            return False

        format = f[i-1] 
        i = 0
        for res in formats[format].res_list:
            i = i + 1
            print("{}:  {}x{}".format(i, res.width,res.height))

        i = int(input("Select : "))
        if i == 0:
            return False


        width=formats[format].res_list[i-1].width
        height =formats[format].res_list[i-1].height
        o = 0
        for rate in formats[format].res_list[i-1].fps :
            o += 1
            print("{}:  {}".format(o, rate))

        framerate = formats[format].res_list[i-1].fps[o-1] 
        o = int(input("Select : "))
        if o == 0:
            return False

        framerate = formats[format].res_list[i-1].fps[o-1] 
        #print(format,width,height,framerate )
        self.openDevice(self.serialnumber, width, height, framerate, SinkFormats.BGRA, True)
        return True

選択可能なカラーフォーマット・解像度・フレームレートをコマンドラインに表示して、input関数を使って指定できるようにします。それぞれの項目を指定した後に、解説3-2で説明したopenDeviceでカメラをオープンしています。

解説3-20:使用できるカメラのカラーフォーマット・解像度・フレームレートを取得

    def createFormats(self):
        source = Gst.ElementFactory.make("tcambin")
        source.set_property("serial", self.serialnumber)
        
        source.set_state(Gst.State.READY)

        caps = source.get_static_pad("src").query_caps()
        format_dict = {}

        for x in range(caps.get_size()):
            structure = caps.get_structure(x)
            name = structure.get_name()
            try:
                format = structure.get_value("format")

                if format not in format_dict:
                    format_dict[format] = FmtDesc(name, format)
                    

                width = structure.get_value("width")
                height = structure.get_value("height")       

                rates = self.get_framerates(structure)
                r = []

                for rate in rates:
                    r.append(str(rate))

                format_dict[format].res_list.append(ResDesc(width,height,r))     ## 解説4にて説明

            except:
                print("Except")
                pass

        source.set_state(Gst.State.NULL)
        source.set_property("serial", "")
        source = None

        return format_dict

解説3-19で設定するために事前にカメラで設定できるカラーフォーマット・解像度・フレームレートを事前に取得しています。

解説3-21:フレームレート一覧を取得

    def get_framerates(self, fmt):
        try:
            tmprates = fmt.get_value("framerate")
            if  type(tmprates) == Gst.FractionRange:
                # 10fpsステップでフレームレートのリストを作成
                rates = []
                rates.append("{0}/{1}".format(int(tmprates.start.num),int(tmprates.start.denom)))
                r = int( (tmprates.start.num + 10) / 10 ) * 10
                while r < (tmprates.stop.num / tmprates.stop.denom ):
                    rates.append("{0}/1".format(r))
                    r += 10

                rates.append("{0}/{1}".format(int(tmprates.stop.num),int(tmprates.stop.denom)))
            else:
                rates = tmprates

        except TypeError:
            substr = fmt.to_string()[fmt.to_string().find("framerate="):]
            field, values, remain = re.split("{|}", substr, maxsplit=3)
            rates = [x.strip() for x in values.split(",")]
        return rates

解像度によって使用できるフレームレートが異なるので、解像度にあったフレームレート一覧を取得しています。

解説3-22:ソフトウエアトリガーをセット

    def execute_command(self, PropertyName ):
        try:
            baseproperty = self.source.get_tcam_property(PropertyName)
            baseproperty.set_command()
        except GLib.Error as error:
            raise Exception( PropertyName + " : " + error.message )

ソフトウエアトリガーを実行するための関数です。set_commandを使用するために宣言しています。

解説4:解像度とフレームレートを設定

class ResDesc:
    def __init__(self,                 
                 width: int,
                 height: int,
                 fps: list):
        self.width = width
        self.height = height
        self.fps = fps

設定できる解像度・フレームレートを一纏めにして扱えるようにクラス化しています。

解像度・フレームレートを一時的に保存するために用意しています。

解説5:カメラの解像度・フレームレートのリスト取得

下記はカメラで使用できる解像度とフレームレートを取得するためのクラスです。カメラは解像度によって使えるフレームレートが変わるので、クラス化して簡単に扱えるようにしています。

解説5-1:リストの初期化

class FmtDesc:
    def __init__(self,
                 name: str = "",
                 fmt: str = ""):
        self.name = name
        self.fmt = fmt
        self.res_list = []

カラーフォーマットとリストを初期化します。

解説5-2:カラーフォーマットのリストを取得する

    def get_name(self):
        if self.name == "image/jpeg":
            return "jpeg"
        else:
            return self.fmt

コマンドラインに表示するためのカラーフォーマットを取得します。

解説5-3:解像度のリストを取得する

    def get_resolution_list(self):
        res_list = []
        for entry in self.res_list:
            res_list.append(entry.resolution)

        return res_list

コマンドラインに表示するための解像度のリストを取得します。

解説5-4:フレームレートのリストを取得する

    def get_fps_list(self, resolution: str):

        for entry in self.res_list:
            if entry.resolution == resolution:
                return entry.fps

コマンドラインに表示するための設定できるフレームレートのリストを取得します。
設定できるフレームレートは解像度によるので条件分岐で設定できるか確認しています。

解説5-5:解像度・フレームレートを文字列に変換

def generate_caps_string(self, resolution: str, fps: str):
        if self.name == "image/jpeg":
            return "{},width={},height={},framerate={}".format(self.name,
                                                               resolution.split('x')[0],
                                                               resolution.split('x')[1],
                                                               fps)
        else:
            return "{},format={},width={},height={},framerate={}".format(self.name,
                                                                         self.fmt,
                                                                         resolution.split('x')[0],
                                                                         resolution.split('x')[1],
                                                                         fps)

formatメソッドを使って文字列の書式設定を行っています。APIから取得できる解像度は「x」が途中に入っているので、splitを使って分割し、横解像度と縦解像度に分けています。例えば、'1920x1080'の場合、width=1920、height=1080と分けてそれぞれの変数に格納して戻り値に渡すことができます。