GStreamer는 본질적으로 다중 스레드이며 스레드로부터 완전히 안전합니다. 대부분의 스레딩 내부는 애플리케이션에서 숨겨져 있으므로 애플리케이션 개발이 더 쉬워집니다. 그러나 어떤 경우에는 애플리케이션이 그 중 일부에 영향을 미치기를 원할 수도 있습니다. GStreamer를 사용하면 애플리케이션이 파이프라인의 일부 부분에서 여러 스레드를 강제로 사용할 수 있습니다. 언제 스레드를 강제로 적용하시겠습니까?를 참조하세요.
GStreamer는 스레드 우선순위나 사용할 스레드 풀 등을 구성할 수 있도록 스레드가 생성될 때 이를 알려줄 수도 있습니다. GStreamer에서 스레드 구성을 참조하세요.
GStreamer에서 스케줄링
GStreamer 파이프라인의 각 element는 스케줄링을 결정합니다. Element는 pad가 push-based 인지 pull-based 인지를 선택할 수 있습니다. 예를 들어 element는 스레드를 시작하여 sink pad에서 pulling을 시작하거나 source pad에서 pushing을 시작하도록 선택할 수 있습니다. Element는 각각 push 및 pull 모드에서 데이터 처리를 위해 업스트림 또는 다운스트림 스레드를 사용하도록 선택할 수도 있습니다. GStreamer는 element가 스케줄링 되도록 선택하는 방법에 대해 어떠한 제한도 두지 않습니다. 자세한 내용은 플러그인 작성자 가이드를 참조하세요.
어떤 경우에도 일부 element는 "스트리밍 스레드"라고 하는 데이터 처리를 위한 스레드를 시작합니다. 스트리밍 스레드 또는 GstTask 객체는 element가 스트리밍 스레드를 만들어야 할 때 GstTaskPool에서 생성됩니다. 다음 섹션에서는 task 및 pool에 대한 알림을 받는 방법을 살펴보겠습니다.
GStreamer에서 스레드 구성
스트리밍 스레드의 상태를 알리기 위해 STREAM_STATUS 메시지가 버스에 게시됩니다. 메시지에서 다음 정보를 얻을 수 있습니다.
- 새 스레드가 생성되려고 하면 GST_STREAM_STATUS_TYPE_CREATE 유형으로 이에 대한 알림을 받게 됩니다. 그러면 GstTask에서 GstTaskPool을 구성할 수 있습니다. 사용자 지정 taskpool은 스트리밍 스레드를 구현하기 위한 task에 대한 사용자 지정 스레드를 제공합니다.
- 사용자 지정 taskpool을 구성하려면 이 메시지를 동기식으로 처리해야 합니다. 이 메시지가 반환될 때 task에 taskpool을 구성하지 않으면 task은 default pool을 사용합니다.
- 스레드가 들어가거나 나갈 때. 이제 스레드 우선순위를 구성할 수 있습니다. 스레드가 삭제되면 알림도 받습니다.
- 스레드가 시작, 일시 중지 및 중지될 때 메시지를 받습니다. 이는 GUI 애플리케이션에서 스트리밍 스레드의 상태를 시각화하는 데 사용될 수 있습니다.
이제 다음 섹션에서 몇 가지 예를 살펴보겠습니다.
스레드의 우선순위 높이기
.----------. .----------.
| fakesrc | | fakesink |
| src->sink |
'----------' '----------'
위의 간단한 파이프라인을 살펴보겠습니다. 스트리밍 스레드의 우선순위를 높이고 싶습니다. Peer fakesink로 push하는 가짜 데이터를 생성하기 위해 스트리밍 스레드를 시작하는 fakesrc element가 있습니다. 우선순위를 변경하는 흐름은 다음과 같습니다.
- READY에서 PAUSED 상태로 전환할 때 fakesrc에는 데이터를 fakesink로 push하기 위한 스트리밍 스레드가 필요합니다. 스트리밍 스레드에 대한 요구 사항을 나타내는 STREAM_STATUS 메시지를 게시합니다.
- 애플리케이션은 동기화 버스 핸들러를 사용하여 STREAM_STATUS 메시지에 반응합니다. 그런 다음 메시지 내부의 GstTask에 사용자 정의 GstTaskPool을 구성합니다. 사용자 정의 taskpool은 스레드 생성을 담당합니다. 이 예에서는 우선순위가 더 높은 스레드를 만듭니다.
- 또는 동기화 메시지가 스레드 컨텍스트에서 호출되므로 스레드 ENTER / LEAVE 알림을 사용하여 현재 스레드의 우선 순위 또는 스케줄링 정책을 변경할 수 있습니다.
첫 번째 단계에서는 task에 구성할 수 있는 사용자 정의 GstTaskPool을 구현해야 합니다. 다음은 pthread를 사용하여 SCHED_RR 실시간 스레드를 생성하는 GstTaskPool 하위 클래스의 구현입니다. 실시간 스레드를 생성하려면 추가 권한이 필요할 수 있습니다.
#include <pthread.h>
typedef struct
{
pthread_t thread;
} TestRTId;
G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);
static void
default_prepare (GstTaskPool * pool, GError ** error)
{
/* we don't do anything here. We could construct a pool of threads here that
* we could reuse later but we don't */
}
static void
default_cleanup (GstTaskPool * pool)
{
}
static gpointer
default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,
GError ** error)
{
TestRTId *tid;
gint res;
pthread_attr_t attr;
struct sched_param param;
tid = g_new0 (TestRTId, 1);
pthread_attr_init (&attr);
if ((res = pthread_attr_setschedpolicy (&attr, SCHED_RR)) != 0)
g_warning ("setschedpolicy: failure: %p", g_strerror (res));
param.sched_priority = 50;
if ((res = pthread_attr_setschedparam (&attr, ¶m)) != 0)
g_warning ("setschedparam: failure: %p", g_strerror (res));
if ((res = pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED)) != 0)
g_warning ("setinheritsched: failure: %p", g_strerror (res));
res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);
if (res != 0) {
g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
"Error creating thread: %s", g_strerror (res));
g_free (tid);
tid = NULL;
}
return tid;
}
static void
default_join (GstTaskPool * pool, gpointer id)
{
TestRTId *tid = (TestRTId *) id;
pthread_join (tid->thread, NULL);
g_free (tid);
}
static void
test_rt_pool_class_init (TestRTPoolClass * klass)
{
GstTaskPoolClass *gsttaskpool_class;
gsttaskpool_class = (GstTaskPoolClass *) klass;
gsttaskpool_class->prepare = default_prepare;
gsttaskpool_class->cleanup = default_cleanup;
gsttaskpool_class->push = default_push;
gsttaskpool_class->join = default_join;
}
static void
test_rt_pool_init (TestRTPool * pool)
{
}
GstTaskPool *
test_rt_pool_new (void)
{
GstTaskPool *pool;
pool = g_object_new (TEST_TYPE_RT_POOL, NULL);
return pool;
}
Taskpool 작성 시 구현해야 할 중요한 기능은 “push” 기능입니다. 구현에서는 지정된 함수를 호출하는 스레드를 시작해야 합니다. 더 복잡한 구현에서는 스레드를 생성하고 삭제하는 것이 항상 가장 빠른 작업은 아니기 때문에 일부 스레드를 pull에 유지하려고 할 수 있습니다.
다음 단계에서는 fakesrc가 원할 때 사용자 지정 taskpool을 실제로 구성해야 합니다. 이를 위해 동기화 핸들러를 사용하여 STREAM_STATUS 메시지를 가로챕니다.
static GMainLoop* loop;
static void
on_stream_status (GstBus *bus,
GstMessage *message,
gpointer user_data)
{
GstStreamStatusType type;
GstElement *owner;
const GValue *val;
GstTask *task = NULL;
gst_message_parse_stream_status (message, &type, &owner);
val = gst_message_get_stream_status_object (message);
/* see if we know how to deal with this object */
if (G_VALUE_TYPE (val) == GST_TYPE_TASK) {
task = g_value_get_object (val);
}
switch (type) {
case GST_STREAM_STATUS_TYPE_CREATE:
if (task) {
GstTaskPool *pool;
pool = test_rt_pool_new();
gst_task_set_pool (task, pool);
}
break;
default:
break;
}
}
static void
on_error (GstBus *bus,
GstMessage *message,
gpointer user_data)
{
g_message ("received ERROR");
g_main_loop_quit (loop);
}
static void
on_eos (GstBus *bus,
GstMessage *message,
gpointer user_data)
{
g_main_loop_quit (loop);
}
int
main (int argc, char *argv[])
{
GstElement *bin, *fakesrc, *fakesink;
GstBus *bus;
GstStateChangeReturn ret;
gst_init (&argc, &argv);
/* create a new bin to hold the elements */
bin = gst_pipeline_new ("pipeline");
g_assert (bin);
/* create a source */
fakesrc = gst_element_factory_make ("fakesrc", "fakesrc");
g_assert (fakesrc);
g_object_set (fakesrc, "num-buffers", 50, NULL);
/* and a sink */
fakesink = gst_element_factory_make ("fakesink", "fakesink");
g_assert (fakesink);
/* add objects to the main pipeline */
gst_bin_add_many (GST_BIN (bin), fakesrc, fakesink, NULL);
/* link the elements */
gst_element_link (fakesrc, fakesink);
loop = g_main_loop_new (NULL, FALSE);
/* get the bus, we need to install a sync handler */
bus = gst_pipeline_get_bus (GST_PIPELINE (bin));
gst_bus_enable_sync_message_emission (bus);
gst_bus_add_signal_watch (bus);
g_signal_connect (bus, "sync-message::stream-status",
(GCallback) on_stream_status, NULL);
g_signal_connect (bus, "message::error",
(GCallback) on_error, NULL);
g_signal_connect (bus, "message::eos",
(GCallback) on_eos, NULL);
/* start playing */
ret = gst_element_set_state (bin, GST_STATE_PLAYING);
if (ret != GST_STATE_CHANGE_SUCCESS) {
g_message ("failed to change state");
return -1;
}
/* Run event loop listening for bus messages until EOS or ERROR */
g_main_loop_run (loop);
/* stop the bin */
gst_element_set_state (bin, GST_STATE_NULL);
gst_object_unref (bus);
g_main_loop_unref (loop);
return 0;
}
이 프로그램은 실시간 스레드를 생성하려면 루트 권한이 필요할 수 있습니다. 스레드를 생성할 수 없으면 상태 변경 기능이 실패하며 위 애플리케이션에서 이를 포착합니다.
파이프라인에 여러 스레드가 있는 경우 여러 STREAM_STATUS 메시지를 받게 됩니다. 애플리케이션 컨텍스트에서 이 스레드의 기능이 무엇인지 파악하려면 스레드를 시작하는 pad 또는 element와 같은 메시지의 소유자를 사용해야 합니다.
언제 스레드를 강제로 적용하시겠습니까?
스레드는 element에 의해 생성되지만 파이프라인에 새 스레드를 강제 적용하려는 목적으로만 파이프라인에 element를 삽입하는 것도 가능합니다.
스레드를 강제로 사용하는데는 여러 가지 이유가 있습니다. 그러나 성능 상의 이유로 모든 element에 대해 하나의 스레드를 사용하고 싶지는 않을 것입니다. 그렇게 하면 약간의 오버헤드가 발생하기 때문입니다. 이제 스레드가 특히 유용할 수 있는 몇 가지 상황을 나열해 보겠습니다.
- 데이터 버퍼링: 예를 들어 네트워크 스트림을 처리하거나 비디오 또는 오디오 카드와 같은 라이브 스트림에서 데이터를 기록할 때 사용합니다. 파이프라인의 다른 곳에서 짧은 중단이 발생해도 데이터 손실이 발생하지 않습니다. queue2를 사용한 네트워크 버퍼링에 대한 Stream buffering도 참조하세요.
- 출력 장치 동기화: 예를 들어 비디오 및 오디오 데이터가 모두 포함된 스트림을 재생할 때. 두 출력 모두에 스레드를 사용하면 독립적으로 실행되고 동기화가 더 좋아집니다.
위에서 우리는 "queue" element를 여러 번 언급했습니다. Queue는 스레드 사용을 강제할 수 있는 스레드 경계 element입니다. 이는 전 세계 대학의 스레딩 수업에서 배운 전통적인 공급자/소비자 모델을 사용하여 수행됩니다. 이를 통해 스레드 간 데이터 처리량을 스레드로부터 안전하게 만드는 수단으로 작동할 뿐만 아니라 버퍼로도 작동할 수 있습니다. Queue에는 특정 용도로 구성할 수 있는 여러 GObject 속성이 있습니다. 예를 들어 element에 대한 하한 및 상한 임계값을 설정할 수 있습니다. 하한 임계값 (기본값: 비활성화)보다 데이터가 적으면 출력이 차단됩니다. 데이터가 상한 임계값보다 많으면 입력을 차단하거나 데이터를 버리도록 구성한 경우 데이터가 버려집니다.
Queue를 사용하려면 (파이프라인에서 두 개의 서로 다른 스레드를 강제로 사용하려면), "queue" element를 생성하고 이를 파이프라인의 일부로 넣기만 하면 됩니다. GStreamer는 내부적으로 모든 스레딩 세부 사항을 처리합니다.
원문: Threads (gstreamer.freedesktop.org)
Threads
Threads GStreamer is inherently multi-threaded, and is fully thread-safe. Most threading internals are hidden from the application, which should make application development easier. However, in some cases, applications may want to have influence on some pa
gstreamer.freedesktop.org
'IT와 개발 > GStreamer Study' 카테고리의 다른 글
Pipeline manipulation (0) | 2024.07.19 |
---|---|
Autoplugging (2) | 2024.07.05 |
Dynamic Controllable Parameters (1) | 2024.06.21 |
Buffering (1) | 2024.06.14 |
Clocks and synchronization in GStreamer (3) | 2024.06.07 |