Pads driving the pipeline

A sinkpad that is assigned to operate in pull-based mode, on an element none of whose sourcepads operate in pull-based mode (or which has no sourcepads), can start a task that will drive the pipeline dataflow. Within this task function, the element has random access over all of its sinkpads, and pushes data over its sourcepads. This can be useful for several different kinds of elements, for example demuxers and parsers that need random access to their input.

In order to start this task, you will need to create it in the activation function.


#include "filter.h"
#include <string.h>

/* Forward declarations of the sink pad callbacks and of the task function
 * that are defined further down in this file. */

/* Activation callback installed on the sink pad. */
static gboolean	gst_my_filter_activate	(GstPad      * pad);
/* Pull-mode (de)activation callback installed on the sink pad. */
static gboolean	gst_my_filter_activate_pull (GstPad  * pad,
					 gboolean      active);
/* Task function started on the sink pad when pull mode is activated. */
static void	gst_my_filter_loop	(GstMyFilter * filter);

/* NOTE(review): presumably expands to the GObject type boilerplate for
 * GstMyFilter as a subclass of GstElement - confirm against the GStreamer
 * headers. */
GST_BOILERPLATE (GstMyFilter, gst_my_filter, GstElement, GST_TYPE_ELEMENT);



/*
 * Initialization function for a GstMyFilter.
 *
 * The parts marked [..] are elided in this excerpt; the visible part
 * installs the two activation callbacks on the filter's sink pad.
 */
static void
gst_my_filter_init (GstMyFilter * filter)
{

[..]

  /* Called when the sink pad is activated; decides whether pull mode is used. */
  gst_pad_set_activate_function (filter->sinkpad, gst_my_filter_activate);
  /* Called when pull mode is switched on or off; starts/stops the task. */
  gst_pad_set_activatepull_function (filter->sinkpad,
      gst_my_filter_activate_pull);


[..]
}

[..]

/*
 * Activation callback for the sink pad.
 *
 * Activates the pad in pull mode when gst_pad_check_pull_range() reports
 * that pull-range access is possible; otherwise activation fails.
 *
 * Returns the result of gst_pad_activate_pull(), or FALSE when pull-range
 * access is not available.
 */
static gboolean
gst_my_filter_activate (GstPad * pad)
{
  /* This element only works in pull mode, so there is no fallback. */
  if (!gst_pad_check_pull_range (pad)) {
    return FALSE;
  }

  return gst_pad_activate_pull (pad, TRUE);
}

/*
 * Pull-mode (de)activation callback for the sink pad.
 *
 * pad:    the sink pad being (de)activated.
 * active: TRUE to start the streaming task, FALSE to stop it.
 *
 * Returns the result of gst_pad_start_task() or gst_pad_stop_task().
 */
static gboolean
gst_my_filter_activate_pull (GstPad * pad, gboolean active)
{
  GstMyFilter *self = GST_MY_FILTER (GST_OBJECT_PARENT (pad));

  /* Deactivation: just stop the task running on this pad. */
  if (!active) {
    return gst_pad_stop_task (pad);
  }

  /* Activation: restart reading from the beginning and launch the loop. */
  self->offset = 0;
  return gst_pad_start_task (pad, (GstTaskFunction) gst_my_filter_loop, self);
}
    

Once started, your task has full control over input and output. The simplest case of a task function is one that reads input and pushes it unchanged over its source pad. That is not very useful in itself, but it provides more flexibility than the chain-based case that we've been looking at so far.


/* Number of bytes requested from upstream per loop iteration. */
#define BLOCKSIZE 2048

/*
 * Task function driving the dataflow in pull mode.
 *
 * Each invocation pulls up to BLOCKSIZE bytes from the sink pad at
 * filter->offset and pushes the resulting buffer over the source pad.
 * On success the offset is advanced and the function returns, waiting to
 * be called again. When the end of the input is reached an EOS event is
 * pushed downstream; on EOS or on any failure the task is paused.
 *
 * filter: the element whose sink pad runs this task.
 */
static void
gst_my_filter_loop (GstMyFilter * filter)
{
  GstFlowReturn ret;
  gint64 len;                   /* gst_pad_query_duration() expects a gint64 * */
  guint size;
  GstFormat fmt = GST_FORMAT_BYTES;
  GstBuffer *buf = NULL;

  if (!gst_pad_query_duration (filter->sinkpad, &fmt, &len)) {
    GST_DEBUG_OBJECT (filter, "failed to query duration, pausing");
    goto stop;
  }

  /* A negative length means "unknown"; in that case keep pulling and let
   * gst_pad_pull_range() report the end of the stream.
   * NOTE(review): assumes filter->offset is an unsigned 64-bit byte offset,
   * as passed to gst_pad_pull_range() below - confirm in filter.h. */
  if (len >= 0 && filter->offset >= (guint64) len) {
    GST_DEBUG_OBJECT (filter, "at end of input, sending EOS, pausing");
    gst_pad_push_event (filter->srcpad, gst_event_new_eos ());
    goto stop;
  }

  /* now, read BLOCKSIZE bytes from byte offset filter->offset */
  ret = gst_pad_pull_range (filter->sinkpad, filter->offset,
      BLOCKSIZE, &buf);

  if (ret != GST_FLOW_OK) {
    GST_DEBUG_OBJECT (filter, "pull_range failed: %s", gst_flow_get_name (ret));
    goto stop;
  }

  /* Remember how much was actually read: the buffer may be shorter than
   * BLOCKSIZE, and it must not be touched once it has been pushed. */
  size = GST_BUFFER_SIZE (buf);

  /* now push buffer downstream */
  ret = gst_pad_push (filter->srcpad, buf);

  buf = NULL; /* gst_pad_push() took ownership of buffer */

  if (ret != GST_FLOW_OK) {
    GST_DEBUG_OBJECT (filter, "pad_push failed: %s", gst_flow_get_name (ret));
    goto stop;
  }

  /* everything is fine, advance by the number of bytes actually pushed and
   * wait for us to be called again */
  filter->offset += size;
  return;

stop:
  GST_DEBUG_OBJECT (filter, "pausing task");
  gst_pad_pause_task (filter->sinkpad);
}