Pipe and Filter

Problem

Many problems involve the application of a set of transformations to a stream of data. How do you support this type of problem with scalable solutions that non-specialists can program?

Context

Consider a stream of input data, such as a signal received from an antenna, a flow of network packets, a segment of video, or a piece of program code. The goal is to apply a set of operations or computations (the filters) in a particular order (or partial order) to the input data, which arrives through a pipe, to produce a transformed output.

Each operation is applied to a few units of the input data stream (tokens) at a time; a token could be a segment of signal from the antenna, a network packet, a frame of video, or a line of code. The operations are independent and side-effect free, i.e. the state in one operation does not depend on the state in any other operation. Each operation reads a stream of input data, applies a transformation that depends only on local state (and occasionally on some shared read-only state), and produces a set of output data.
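As a minimal sketch of the filter contract described above, the following Python uses generators as filters and plain iterables as pipes. The names `map_filter`, `drop_blank`, and `pipeline` are hypothetical helpers introduced only for illustration, and lines of text stand in for tokens.

```python
from typing import Callable, Iterable, Iterator


def map_filter(fn: Callable[[str], str]) -> Callable[[Iterable[str]], Iterator[str]]:
    """Wrap a pure per-token function as a filter over a token stream."""
    def run(tokens: Iterable[str]) -> Iterator[str]:
        for token in tokens:
            yield fn(token)  # depends only on the token itself
    return run


def drop_blank(tokens: Iterable[str]) -> Iterator[str]:
    """Filter that may emit fewer tokens than it reads."""
    for token in tokens:
        if token.strip():
            yield token


def pipeline(source: Iterable[str], *filters) -> Iterable[str]:
    """Connect filters in order; each one only sees the stream before it."""
    stream = source
    for f in filters:
        stream = f(stream)
    return stream


lines = ["  hello ", "", "World  "]
result = list(pipeline(lines, drop_blank, map_filter(str.strip), map_filter(str.upper)))
print(result)  # ['HELLO', 'WORLD']
```

No filter here refers to its neighbors by name, so any of them can be replaced or reordered as long as the token format still matches.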

The structure of the concurrency is highly regular, so the resulting solution should take advantage of this regularity to allow application domain experts to implement highly scalable programs based on this pattern.

Forces

This pattern allows the simple composition of individual filters, where the ordering constraints are explicitly expressed and system properties can be derived. Filters should be designed so that they can be reused, replaced, and enhanced as long as their input and output data formats match.

The efficient execution of a pipe-and-filter system depends on a steady flow of data through the system that maximally utilizes computation resources. Scheduling must balance the load on all available resources.

The size of the token defines the granularity of the computation.

When the tokens are too small, the overhead of managing a filter computation (read-process-write) could be overwhelming.

When the tokens are too large, buffering requirements on the pipes between the filters could be greater than necessary.

The number of filters should be large enough to allow concurrency to be exploited, yet small enough to minimize the pipeline’s fill/drain time and overhead of transferring data between successive stages.

Each filter stage should require about the same computational effort, since the slowest stage is the bottleneck that limits overall throughput.
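The effect of an unbalanced stage can be worked through with a small idealized model. The sketch below assumes one processing element per stage, a fixed cost per token in each stage, and no transfer overhead on the pipes; the function name `pipeline_total_time` and the stage costs are made up for illustration.

```python
def pipeline_total_time(stage_times, num_tokens):
    """Time to push num_tokens (at least 1) through a linear pipeline.

    Idealized assumptions: one processing element per stage, every token
    costs the same in a given stage, and pipes add no transfer overhead.
    """
    fill_time = sum(stage_times)   # latency of the first token
    bottleneck = max(stage_times)  # interval between later outputs
    return fill_time + (num_tokens - 1) * bottleneck


# Two hypothetical three-stage pipelines with the same total work per token.
balanced = pipeline_total_time([2, 2, 2], num_tokens=100)
unbalanced = pipeline_total_time([1, 1, 4], num_tokens=100)
print(balanced, unbalanced)  # 204 402
```

Both pipelines do six units of work per token, but under these assumptions the unbalanced one takes roughly twice as long because its slowest stage sets the output rate.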

Solution

The application should be organized as a series of computation tasks corresponding to the filters, connected by dependencies corresponding to the pipes. The tasks can be seen as vertices in a task graph, and the pipes carrying information from one task to another can be seen as directed edges in the task graph. For a pair of tasks where one is in the transitive fan-in or transitive fan-out of the other, the two must be executed in the order given by the transitive dependency; when neither is in the transitive fan-in or fan-out of the other, they can be executed in parallel.
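The fan-in/fan-out rule above can be sketched as a reachability test on the task graph. The graph below is a hypothetical decoder-like example, and `reachable` and `can_run_in_parallel` are illustrative names; the check applies to tasks working on the same token.

```python
def reachable(graph, start):
    """Return every task in the transitive fan-out of `start`."""
    seen = set()
    stack = list(graph.get(start, ()))
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(graph.get(node, ()))
    return seen


def can_run_in_parallel(graph, a, b):
    """True when neither task is in the transitive fan-in/out of the other."""
    return a != b and b not in reachable(graph, a) and a not in reachable(graph, b)


# Hypothetical task graph: each key maps a task to its direct successors.
graph = {
    "parse": ["idct_luma", "idct_chroma"],
    "idct_luma": ["render"],
    "idct_chroma": ["render"],
    "render": [],
}

print(can_run_in_parallel(graph, "idct_luma", "idct_chroma"))  # True
print(can_run_in_parallel(graph, "parse", "render"))           # False
```

Being in task A's transitive fan-in is the same as having A in one's own transitive fan-out, so checking fan-out in both directions covers both cases.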

Parallelism for tasks along a linear branch in the task graph can be exploited using the pipelining pattern; parallelism for tasks along parallel branches can be exploited using the task parallelism pattern.

The filters of this pattern should be allocated to processing elements to maximize the usage of the available processing elements, while respecting the dependencies imposed by pipes. To ease load-balancing problems, large tasks should be separated into multiple constituent steps; to reduce overhead, lightly-loaded consecutive tasks should be combined into a single task.
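One way to picture this allocation is to give each filter its own worker and connect the workers with bounded queues. The sketch below is an assumption-laden illustration, not a tuned implementation: `stage`, `fuse`, and `run_pipeline` are hypothetical names, threads stand in for processing elements, and the filters are trivial arithmetic. In standard CPython builds, threads do not execute CPU-bound filters in parallel, so the example shows only the structure.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def stage(fn, inbox, outbox):
    """Run one filter: read a token, transform it, write the result."""
    while True:
        token = inbox.get()
        if token is _DONE:
            outbox.put(_DONE)  # pass the end-of-stream marker downstream
            return
        outbox.put(fn(token))


def fuse(f, g):
    """Combine two lightly loaded consecutive filters into a single task."""
    return lambda token: g(f(token))


def run_pipeline(tokens, fns, pipe_capacity=4):
    """Give each filter its own thread, linked by bounded queues (the pipes)."""
    pipes = [queue.Queue(maxsize=pipe_capacity) for _ in range(len(fns) + 1)]
    workers = [
        threading.Thread(target=stage, args=(fn, pipes[i], pipes[i + 1]))
        for i, fn in enumerate(fns)
    ]

    def feed():
        for token in tokens:
            pipes[0].put(token)
        pipes[0].put(_DONE)

    workers.append(threading.Thread(target=feed))
    for w in workers:
        w.start()

    results = []
    while True:
        out = pipes[-1].get()
        if out is _DONE:
            break
        results.append(out)
    for w in workers:
        w.join()
    return results


add_one = lambda x: x + 1
double = lambda x: x * 2
minus_three = lambda x: x - 3

three_stage = run_pipeline(range(5), [add_one, double, minus_three])
two_stage = run_pipeline(range(5), [fuse(add_one, double), minus_three])
print(three_stage)  # [-1, 1, 3, 5, 7]
```

Fusing `add_one` and `double` removes one pipe and one worker without changing the output, which is the trade described above: fewer transfers at the cost of less available concurrency.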

There is usually a non-trivial amount of data passed between filtering tasks along the pipes. When executed on any particular platform, some orders of executing the concurrent filtering tasks will require less storage along the pipes, or less latency from the time a piece of data enters the application to the time it exits. Selecting a schedule that minimizes latency or buffering requirements is the scheduling problem for a pipe-and-filter application.
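To make the buffering side of this trade-off concrete, the following sketch counts the peak occupancy of a single pipe under two firing orders. It assumes a single producer and a single consumer that each move one token per firing; `peak_buffer` and the schedule strings are hypothetical.

```python
def peak_buffer(schedule):
    """Peak number of tokens held in the pipe between two filters.

    `schedule` is a string of firings: 'P' means the producer writes one
    token to the pipe, 'C' means the consumer reads one token from it.
    """
    occupancy = 0
    peak = 0
    for step in schedule:
        if step == "P":
            occupancy += 1
        elif step == "C":
            if occupancy == 0:
                raise ValueError("consumer fired on an empty pipe")
            occupancy -= 1
        else:
            raise ValueError(f"unknown firing {step!r}")
        peak = max(peak, occupancy)
    return peak


batch_schedule = "P" * 8 + "C" * 8  # run the producer to completion first
pairwise_schedule = "PC" * 8        # alternate producer and consumer

print(peak_buffer(batch_schedule))     # 8
print(peak_buffer(pairwise_schedule))  # 1
```

Both schedules process the same eight tokens, but the pairwise order needs storage for only one token on the pipe.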

Invariant

Pre-condition: A stream of data in a particular format, and a set of side-effect-free filter functions applied in sequence. The sequence is defined by a directed acyclic graph and need not be linear.

Invariant:

  1. Each filter accepts input data in a particular input format, and will produce output in its own output format (which can be different from the input format).
  2. Each filter produces the same result independent of time or non-local state. Sometimes global read-only state can be involved.
  3. A filter is not aware of the identity of upstream or downstream filters, and thus cannot invoke or control another filter (note: this distinguishes the pattern from data abstraction and object-oriented programming).

Post-condition: A stream of data in a particular format.

Examples

A typical media application is video decoding. A video decoder reads in a stream of encoded video signals and produces a stream of decoded video frames through stages of computation. The stages, or filters, usually include variable-length decoding, inverse quantization, inverse DCT (discrete cosine transform), motion compensation, and frame rendering. These stages are connected by pipes that buffer portions of a video sequence, such as a video frame or a macroblock. Motion compensation and frame rendering are stages that require a significant amount of computation, and are usually partitioned into finer tasks that operate on different luminance or chrominance channels or on subsets of macroblocks. The partitions provide task parallelism in the form of concurrent branches of tasks, such as IDCT for different channels or IDCT for different macroblocks. Scheduling for minimal latency could involve load-balancing work across multiple processing elements (PEs) such that all tasks for a frame complete execution before moving on to the next frame. Scheduling for minimal buffer requirements would execute each producer filter and its consumer filter pairwise as much as possible to reduce intermediate state.

In networking, languages such as Click and NP-Click can be used to compose a pipe-and-filter system. These languages include a pre-defined library of filter components. The components are connected together with pipes (network queues) that buffer packets to compensate for communication overhead. Filters perform operations such as packet checks (header checksum check), packet updates (update checksum), route table lookups, and packet shaping. There is typically a packet payload store so that packet bodies do not have to be transmitted from filter to filter; only packet headers are transmitted through the pipes and operated upon by filters. Filters typically operate in parallel on the stream of incoming packet headers. When executed on a parallel platform such as a network processor, the filters are scheduled (both allocated to the parallel cores and ordered in time) in such a way as to maximize the usage of the parallel cores. This usually involves load-balancing the filters in an attempt either to achieve the best throughput of the system (subject to buffering limitations on the device) or to meet Quality of Service (QoS) requirements on latency.

Known uses

A well-known implementation of this pattern is the Unix operating system’s support for the pipe operator “|”, which connects processes that read from the standard input stream and write to the standard output stream. Compilation tool flows for programming languages (consisting of lexical analysis, syntactic analysis, semantic analysis, and code generation) and for hardware designs (where the CAD tool flow consists of RTL synthesis, logic optimization, physical design, and layout) are examples of linear pipelines. Using dataflow and graphical languages such as Click, Simulink, or LabVIEW, non-expert programmers typically assemble complex systems by connecting black-box components with arrows or wires that indicate a pipeline structure. These structures can be linear or a general directed graph.

Related Patterns

  • Pipeline
  • Task parallelism

Authors

  • Jike Chong, Arlo Faria, Satish Nadathur, Youngmin Yi