首页 > 代码库 > gstreamer的collectpad源码分析

gstreamer的collectpad源码分析

1. 背景:

        gstreamer的collectpad是一类特殊的pad,这类pad工作于收集模式,用于管理控制若干个pad组成的pad集合的数据同步处理。大部分的合成器(muxer)均使用collectpad来收集音视频数据,并根据可重载的收集条件判断函数对不同pad之间的数据进行处理(或同步)。

        由于collectpad中大部分处理函数均可重载(set_func),因此本文只讨论默认的处理函数。

2. 默认流程:

        collectpad的简单流程如下图:

技术分享

        不同的pad工作与不同的线程中,当某一个pad有数据到来时,会对所有pad进行判断,看看是否可以满足收集条件,如果满足收集条件就向对应的element推送数据。如果不满足收集条件,就会将该线程挂起,等待其他线程的数据。

        当某个pad处于挂起时,其他pad收到数据后,一样会对收集条件进行判断,如果满足条件,会将所有pad的数据推送至element,同时广播条件变量,唤醒所有挂起中的其他pad(线程)。

        简单的函数调用关系如下:

技术分享

3. 数据结构:

        数据结构如下:一个_GstCollectPads中维护了一个_GstCollectData的链表,每个pad对应一个_GstCollectData,其中记录了pad中的数据的时间戳,buffer,已经对应pad的状态(如锁、等待等标志位),GstCollectPadsPrivate中则记录了collectpad中注册的各种事件回调函数,这里的回调函数都有接口可以进行重载。此外,GstCollectPadsPrivate还维护了线程间同步用的锁和条件变量。

/**
 * GstCollectPads:
 * @data: (element-type GstBase.CollectData): #GList of #GstCollectData managed
 *   by this #GstCollectPads.
 *
 * Collectpads object.
 */
struct _GstCollectPads {
  /* 基类。  */
  GstObject      object;

  /*< public >*/ /* with LOCK and/or STREAM_LOCK */
  /* 所有PAD的集合。  */
  /*
    * GstCollectData:
    * @collect: owner #GstCollectPads
    * @pad: #GstPad managed by this data
    * @buffer: currently queued buffer.
    * @pos: position in the buffer
    * @segment: last segment received.
    * @dts: the signed version of the DTS converted to running time. To access
    *       this memeber, use %GST_COLLECT_PADS_DTS macro. (Since 1.6)
    *
    * Structure used by the collect_pads.
    struct _GstCollectData
    {
      /* with STREAM_LOCK of @collect */
      /* 指向回collectpad。  */
      GstCollectPads        *collect;
      GstPad                *pad;
      GstBuffer             *buffer;
      guint                  pos;
      GstSegment             segment;
    
      /*< private >*/
      /* state: bitfield for easier extension;
       * eos, flushing, new_segment, waiting */
      GstCollectPadsStateFlags    state;
    
      GstCollectDataPrivate *priv;
    
      union {
        struct {
          /*< public >*/
          gint64 dts;
          /*< private >*/
        } abi;
        gpointer _gst_reserved[GST_PADDING];
      } ABI;
    };
   */
  GSList        *data;                  /* list of CollectData items */

  /*< private >*/
  GRecMutex      stream_lock;          /* used to serialize collection among several streams */

  GstCollectPadsPrivate *priv;

  gpointer _gst_reserved[GST_PADDING];
};

4. 代码分析:

4.1 主入口函数:

        主入口函数gst_collect_pads_chain,不同pad工作于不同线程中。代码分析如下:

/* For each buffer we receive we check if our collected condition is reached
 * and if so we call the collected function. When this is done we check if
 * data has been unqueued. If data is still queued we wait holding the stream
 * lock to make sure no EOS event can happen while we are ready to be
 * collected 
 */
static GstFlowReturn
gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
  GstCollectData *data;
  GstCollectPads *pads;
  GstFlowReturn ret;
  GstBuffer **buffer_p;
  guint32 cookie;

  GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));

  /* some magic to get the managing collect_pads */
  GST_OBJECT_LOCK (pad);
  data = (GstCollectData *) gst_pad_get_element_private (pad);
  if (G_UNLIKELY (data == NULL))
    goto no_data;
  ref_data (data);
  GST_OBJECT_UNLOCK (pad);

  pads = data->collect;

  GST_COLLECT_PADS_STREAM_LOCK (pads);
  /* 状态判断。  */
  /* if not started, bail out */
  if (G_UNLIKELY (!pads->priv->started))
    goto not_started;
  /* check if this pad is flushing */
  if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
              GST_COLLECT_PADS_STATE_FLUSHING)))
    goto flushing;
  /* pad was EOS, we can refuse this data */
  if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
              GST_COLLECT_PADS_STATE_EOS)))
    goto eos;

  /* see if we need to clip */
  /* 数据前处理。  */
  if (pads->priv->clip_func) {
    GstBuffer *outbuf = NULL;
    ret =
        pads->priv->clip_func (pads, data, buffer, &outbuf,
        pads->priv->clip_user_data);
    buffer = outbuf;

    if (G_UNLIKELY (outbuf == NULL))
      goto clipped;

    if (G_UNLIKELY (ret == GST_FLOW_EOS))
      goto eos;
    else if (G_UNLIKELY (ret != GST_FLOW_OK))
      goto error;
  }

  GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
      GST_DEBUG_PAD_NAME (pad));

  /* One more pad has data queued */
  // 如果当前collectpad处于WAITING状态会将queuedpads增加
  if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
    pads->priv->queuedpads++;
  buffer_p = &data->buffer;
  gst_buffer_replace (buffer_p, buffer);

  /* update segment last position if in TIME */
  /* 更新当前pad上对应的时间信息,后续用于重新计算等待状态需要用到。  */
  if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
    GstClockTime timestamp;

    timestamp = GST_BUFFER_DTS_OR_PTS (buffer);

    if (GST_CLOCK_TIME_IS_VALID (timestamp))
      data->segment.position = timestamp;
  }

  /* While we have data queued on this pad try to collect stuff */
  do {
    /* Check if our collected condition is matched and call the collected
     * function if it is */
    /* 主要处理函数,判断收集条件是否满足,后续分析。  */
    ret = gst_collect_pads_check_collected (pads);
    /* when an error occurs, we want to report this back to the caller ASAP
     * without having to block if the buffer was not popped */
    /* 数据流处理异常,进入异常处理分支。  */
    if (G_UNLIKELY (ret != GST_FLOW_OK))
      goto error;

    /* data was consumed, we can exit and accept new data */
	/* 当buffer在check_collected函数中被消费,会在其中减少引用次数,释放buffer。
	 * 数据被处理后退出循环,等待下一次buffer到来调用chain函数。  */
    if (data->buffer == NULL)
      break;

    /* 数据未被处理,未满足数据收集条件,本pad对应线程将进行唤醒等待。  */
    /* Having the _INIT here means we don't care about any broadcast up to here
     * (most of which occur with STREAM_LOCK held, so could not have happened
     * anyway).  We do care about e.g. a remove initiated broadcast as of this
     * point.  Putting it here also makes this thread ignores any evt it raised
     * itself (as is a usual WAIT semantic).
     */
    GST_COLLECT_PADS_EVT_INIT (cookie);

    /* pad could be removed and re-added */
    unref_data (data);
    GST_OBJECT_LOCK (pad);
    if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
      goto pad_removed;
    ref_data (data);
    GST_OBJECT_UNLOCK (pad);

    GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
        GST_DEBUG_PAD_NAME (pad));

    /* wait to be collected, this must happen from another thread triggered
     * by the _chain function of another pad. We release the lock so we
     * can get stopped or flushed as well. We can however not get EOS
     * because we still hold the STREAM_LOCK.
     */
    /* 等待条件变量被唤醒。  */
    GST_COLLECT_PADS_STREAM_UNLOCK (pads);
    GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
    GST_COLLECT_PADS_STREAM_LOCK (pads);

    GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));

	/* 唤醒后的状态判断。  */
    /* after a signal, we could be stopped */
    if (G_UNLIKELY (!pads->priv->started))
      goto not_started;
    /* check if this pad is flushing */
    if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
                GST_COLLECT_PADS_STATE_FLUSHING)))
      goto flushing;
  }
  while (data->buffer != NULL);

unlock_done:
  GST_COLLECT_PADS_STREAM_UNLOCK (pads);
  /* data is definitely NULL if pad_removed goto was run. */
  if (data)
    unref_data (data);
  if (buffer)
    gst_buffer_unref (buffer);
  return ret;

/* 异常状态处理。  */
pad_removed:
  {
    GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
    GST_OBJECT_UNLOCK (pad);
    ret = GST_FLOW_NOT_LINKED;
    goto unlock_done;
  }
  /* ERRORS */
no_data:
  {
    GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
    GST_OBJECT_UNLOCK (pad);
    gst_buffer_unref (buffer);
    return GST_FLOW_NOT_LINKED;
  }
not_started:
  {
    GST_DEBUG ("not started");
    gst_collect_pads_clear (pads, data);
    ret = GST_FLOW_FLUSHING;
    goto unlock_done;
  }
flushing:
  {
    GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
    gst_collect_pads_clear (pads, data);
    ret = GST_FLOW_FLUSHING;
    goto unlock_done;
  }
eos:
  {
    /* we should not post an error for this, just inform upstream that
     * we don't expect anything anymore */
    GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
    ret = GST_FLOW_EOS;
    goto unlock_done;
  }
clipped:
  {
    GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
    ret = GST_FLOW_OK;
    goto unlock_done;
  }
error:
  {
    /* we print the error, the element should post a reasonable error
     * message for fatal errors */
    GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
    gst_collect_pads_clear (pads, data);
    goto unlock_done;
  }
}
4.2 框架上的收集条件判断

        在check函数,首先对collectpads上面的pad状态进行检查,只有当有数据的pads和总的pads数满足一定条件时候,才会执行第二重的收集条件判断。函数为gst_collect_pads_check_collected,代码分析如下:

static GstFlowReturn
gst_collect_pads_check_collected (GstCollectPads * pads)
{
  GstFlowReturn flow_ret = GST_FLOW_OK;
  GstCollectPadsFunction func;
  gpointer user_data;

  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);

  /* 获取回调数据。  */
  GST_OBJECT_LOCK (pads);
  func = pads->priv->func;
  user_data = http://www.mamicode.com/pads->priv->user_data;>4.3 默认的第二重收集条件判断

        第二重收集条件的函数是可以进行重载的,可以使用gst_collect_pads_set_function进行设置,这里只分析默认的函数gst_collect_pads_default_collected。

        代码分析如下:

/*
 * Default collect callback triggered when #GstCollectPads gathered all data.
 *
 * Called with STREAM_LOCK.
 */
static GstFlowReturn
gst_collect_pads_default_collected (GstCollectPads * pads, gpointer user_data)
{
  GstCollectData *best = NULL;
  GstBuffer *buffer;
  GstFlowReturn ret = GST_FLOW_OK;
  GstCollectPadsBufferFunction func;
  gpointer buffer_user_data;

  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);

  /* 获取回调数据。  */
  GST_OBJECT_LOCK (pads);
  func = pads->priv->buffer_func;
  buffer_user_data = http://www.mamicode.com/pads->priv->buffer_user_data;>

        注意,这里如果对某些函数进行重载或者设置了非等待状态,有两个潜在的异常流程。

4.3.1 异常流程1:

        当使用默认的时间比较函数,且设置了非等待状态的pad有数据到来时,在函数gst_collect_pads_recalculate_waiting,当earliest_data检测到本PAD时,这时候时间戳应该是相等的,但是这时候如果处于非等待状态,无论是否加锁最后都会返回TRUE,这时候gst_collect_pads_default_collected函数中的第一个判断总会直接返回GST_FLOW_OK,但是实际并没有弹出任何buffer,但是gst_collect_pads_check_collected的循环条件并没有改变,导致这个线程会一直在这里循环。如果其他pad没有数据进入,则这个pad会进入死循环。

4.3.2 异常流程2:

        当所有的pads都设置了non-waiting状态,则在框架的收集条件判断函数gst_collect_pads_check_collected中的pads数量比较循环总是成立,且所有pads数据弹出时都不会减少当前的queuedpad数,因此当有一个buffer弹出后,会持续弹出所有buffer,当buffer为空时,循环条件依旧成立,在处理空buffer时,认为这个pad已经进入了EOS状态,从而导致异常。异常流程如下图:

技术分享

4.4 寻找最优的可用buffer和pad

        这个函数流程比较简单,就是遍历collectpads中的所有pad,并和earliest_time进行比较,寻找最早的时间点的buffer。

        这里涉及到时间比较的函数,这里的默认时间比较函数比较简单,就是单纯判断时间点的大小,相等返回0,第一个时间点大于第二个返回1,小于返回-1。

/**
 * gst_collect_pads_find_best_pad:
 * @pads: the collectpads to use
 * @data: returns the collectdata for earliest data
 * @time: returns the earliest available buffertime
 *
 * Find the oldest/best pad, i.e. pad holding the oldest buffer and
 * and return the corresponding #GstCollectData and buffertime.
 *
 * This function should be called with STREAM_LOCK held,
 * such as in the callback.
 */
static void
gst_collect_pads_find_best_pad (GstCollectPads * pads,
    GstCollectData ** data, GstClockTime * time)
{
  GSList *collected;
  GstCollectData *best = NULL;
  GstClockTime best_time = GST_CLOCK_TIME_NONE;

  g_return_if_fail (data != NULL);
  g_return_if_fail (time != NULL);

  /* 遍历所有pads,对所有pads中的数据与当前的earliest_time进行比较,
   * 寻找时间最靠前的buffer及其对应的pad。  */
  for (collected = pads->data; collected; collected = g_slist_next (collected)) {
    GstBuffer *buffer;
    GstCollectData *data = http://www.mamicode.com/(GstCollectData *) collected->data;>3.5 重新计算等待状态函数

        函数gst_collect_pads_recalculate_waiting会根据earliest_time和所有pad上的数据进行比较,计算collectpad是否需要重新进入等待状态,返回TRUE表示从非等待状态进入等待状态。

        这里如果设置了non-waiting状态,则要小心4.3中出现的异常。

/* General overview:
 * - only pad with a buffer can determine earliest_data (and earliest_time)
 * - only segment info determines (non-)waiting state
 * - ? perhaps use _stream_time for comparison
 *   (which muxers might have use as well ?)
 */

/*
 * Function to recalculate the waiting state of all pads.
 *
 * Must be called with STREAM_LOCK.
 *
 * Returns %TRUE if a pad was set to waiting
 * (from non-waiting state).
 */
static gboolean
gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
{
  GSList *collected;
  gboolean result = FALSE;

  /* If earliest time is not known, there is nothing to do. */
  /* 没有数据可以比较。  */
  if (pads->priv->earliest_data =http://www.mamicode.com/= NULL)>

4.6 锁和等待状态

        collectpad提供了接口gst_collect_pads_set_waiting可以给其他组件设置某个pad为等待或者非等待状态。其设置与锁GST_COLLECT_PADS_STATE_LOCKED标志位有关系。

        默认情况下的pad(注意,这里的pad为单独的一个pad,并非整个collectpad,这些状态为单个pad私有,并不是collectpad的属性)均为等待状态,而锁的初始化则根据element调用collect添加pad的函数gst_collect_pads_add_pad的最后一个参数决定。

/**
 * gst_collect_pads_set_waiting:
 * @pads: the collectpads
 * @data: the data to use
 * @waiting: boolean indicating whether this pad should operate
 *           in waiting or non-waiting mode
 *
 * Sets a pad to waiting or non-waiting mode, if at least this pad
 * has not been created with locked waiting state,
 * in which case nothing happens.
 *
 * This function should be called with @pads STREAM_LOCK held, such as
 * in the callback.
 *
 * MT safe.
 */
void
gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
    gboolean waiting)
{
  g_return_if_fail (pads != NULL);
  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
  g_return_if_fail (data != NULL);

  GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
      GST_PAD_NAME (data->pad), waiting,
      GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED));

  /* Do something only on a change and if not locked */
  /* 修改等待状态标志位需要在没有上锁的情况下处理,
   * 可以通过GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED);方式加解锁。
   * 如果设置为非等待,则会把对应的queuedpad自增,当所有pad都处于非等待状态,则框架收集条件总是满足。
   * 可能存在4.3.2的问题。
   */
  if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
      (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
          ! !waiting)) {
    /* Set waiting state for this pad */
    if (waiting)
      GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
    else
      GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_WAITING);
    /* Update number of queued pads if needed */
    if (!data->buffer &&
        !GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS)) {
      if (waiting)
        pads->priv->queuedpads--;
      else
        pads->priv->queuedpads++;
    }

    /* signal waiters because something changed */
    GST_COLLECT_PADS_EVT_BROADCAST (pads);
  }
}



gstreamer的collectpad源码分析