A Genstage Tutorial And Reflection
Writing A Job Runner (In Elixir) (Again) (10 years later)
Ten years ago, I wrote a job runner in Elixir after some inspiration from José Valim.
This is an update on that post.
Almost no code has changed, but I have written it up more thoroughly and added more detail.
I find it wildly amusing that it held up this well, and I felt like re-sharing it with everyone to see if someone with fresh eyes might get some enjoyment out of it or learn a bit from it.
I also take things quite a bit further.
Who is this for?
Are you curious?
If you know a little bit of Elixir, this is a great “levelling up” piece.
If you’re seasoned, it might be fun to implement if you have not.
If you don’t know Elixir, it will hopefully be an interesting case study and sales pitch.
Anyone with a Claude or OpenAI subscription can easily follow along, even knowing no Elixir.
Work?
Applications must do work. This is typical of just about any program that reaches a sufficient size. In order to do that work, it’s sometimes desirable to have it happen elsewhere. If you have built software, you have probably needed a background job.
In this situation, you are fundamentally using code to run other code. Erlang has a nice format for this, called the Erlang term format: it can store data, including function calls, in a way that can be passed around and run by other nodes. We are going to examine doing this in Elixir with “tools in the shed”. We will have a single dependency, gen_stage, which is built and maintained by the language’s creator, José Valim.
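To make that concrete, here is a quick sketch of the round trip - any Erlang term, including a {module, function, args} tuple describing work to do, can be turned into bytes and back:
# Serialize a function call to bytes, then restore and run it.
payload = :erlang.term_to_binary({IO, :puts, ["hello from a stored job"]})

{module, function, args} = :erlang.binary_to_term(payload)
apply(module, function, args)
# prints "hello from a stored job"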
For beginners, we will first cover a bit about Elixir and what it offers that might make this appealing.
The Landscape of Job Processing
In Ruby, you might reach for Sidekiq. It’s battle-tested, using Redis for storage and threads for concurrency. Jobs are JSON objects, workers pull from queues, and if something crashes, you hope your monitoring catches it. It works well until you need to scale beyond a single Redis instance or handle complex job dependencies.
Python developers often turn to Celery. It’s more distributed by design, supporting multiple brokers and result backends. But the complexity shows - you’re configuring RabbitMQ, dealing with serialization formats, and debugging issues across multiple moving parts. When a worker dies mid-job, recovery depends on how well you’ve configured acknowledgments and retries.
Go developers might use machinery or asynq, leveraging goroutines for concurrency. The static typing helps catch errors early, but you’re still manually managing worker pools and carefully handling panics to prevent the whole process from dying.
Each solution reflects its language’s strengths and limitations. They all converge on similar patterns: a persistent queue, worker processes, and lots of defensive programming. What if the language itself provided better primitives for this problem?
Thinking About Job Runners, Producers, Consumers, and Events
The Architecture of Work
At its core, a job runner is a meta concept. It is code that runs code. There will always be work to be done in any system that has users. But ensuring that work gets done when it cannot be handled in a blocking, synchronous manner (when you don’t have the luxury of awaiting results) is where the difficulty lies. The devil is in these details. How do you handle failure? What is the plan when a burst of work could overwhelm the worker pool? We’ll seek out answers to these questions as we do this dive.
GenStage answers the questions we have asked so far, in general, with demand-driven architecture. Instead of pushing work out, workers pull when they are ready. This inversion becomes a very elegant abstraction in practice.
Understanding Producer-Consumer Patterns
The producer-consumer pattern isn’t unique to Elixir. It’s a fundamental pattern in distributed systems:
In Apache Spark, RDDs (Resilient Distributed Datasets) flow through transformations. Each transformation is essentially a consumer of the previous stage and a producer for the next. Spark handles backpressure through its task scheduler - if executors are busy, new tasks wait.
In Kafka Streams, topics act as buffers between producers and consumers. Consumers track their offset, pulling messages at their own pace. The broker handles persistence and replication.
In Go channels, goroutines communicate through typed channels. A goroutine blocks when sending to a full channel or receiving from an empty one. This provides natural backpressure but requires careful capacity planning.
GenStage takes a different approach. There are no intermediate buffers or brokers. Producers and consumers negotiate directly:
- Consumer asks producer for work (specifying how much it can handle)
- Producer responds with up to that many events
- Consumer processes events and asks for more
This creates a pull-based system with automatic flow control. No queues filling up, no brokers to manage, no capacity planning. The system self-regulates based on actual processing speed.
What We’re Actually Building
Why Elixir Works for Job Processing
Processes are the unit of concurrency. Not threads, not coroutines - processes. Each process has its own heap, runs concurrently, and can’t corrupt another’s memory. Starting one is measured in microseconds and takes about 2KB of memory. You don’t manage a pool of workers; you spawn a process per job.
Failure is isolated by default. When a process crashes, it dies alone. No corrupted global state, no locked mutexes, no zombie threads. The supervisor sees the death, logs it, and starts a fresh process. Your job processor doesn’t need defensive try-catch blocks everywhere - it needs a good supervision tree.
Message passing is the only way to communicate. No shared memory means no locks, no race conditions, no memory barriers. A process either receives a message or it doesn’t. This constraint simplifies concurrent programming dramatically - you can reason about each process in isolation.
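If you’ve never seen the primitives, here is the whole toolkit in a few lines - nothing job-runner-specific yet, just a spawned process and two messages:
# Spawn an isolated process and communicate with it purely by message passing.
parent = self()

worker =
  spawn(fn ->
    receive do
      {:work, n} -> send(parent, {:done, n * 2})
    end
  end)

send(worker, {:work, 21})

receive do
  {:done, result} -> IO.puts("worker replied with #{result}")
end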
The scheduler handles fairness. The BEAM VM runs its own scheduler, preemptively switching between processes every 2000 reductions. One process can’t starve others by hogging the CPU. This is why Phoenix can handle millions of WebSocket connections - each connection is just another lightweight process.
Distribution is built-in. Connect nodes with one function call. Send messages across the network with the same syntax as local messages. The Erlang Term Format serializes any data structure, including function references. Your job queue can span multiple machines without changing the core logic.
Hot code reloading works. Deploy new code without stopping the system. The BEAM can run two versions of a module simultaneously, migrating processes gracefully. Your job processor can be upgraded while it’s processing jobs.
Introspection is exceptional. Connect to a running system and inspect any process. See its message queue, memory usage, current function. The observer GUI shows your entire system’s health in real-time. When production misbehaves, you can debug it live.
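A taste of that introspection from iex (the observer GUI needs the :wx application, which ships with most Erlang installs):
# Ask any live process about its mailbox, memory footprint, and what it is running.
Process.info(self(), [:message_queue_len, :memory, :current_function])

# Open the graphical system overview.
:observer.start()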
These aren’t features bolted on top - they’re fundamental to how the BEAM VM works. When you build a job processor in Elixir, you’re not fighting the language to achieve reliability and concurrency. You’re using it as designed.
Our job runner will have three core components:
Producers - These generate or fetch work. In our case, they’ll pull jobs from a database table. A producer doesn’t decide who gets work - it simply responds to demand. When a consumer asks for 10 jobs, the producer queries the database for 10 unclaimed jobs and returns them.
Consumers - These execute jobs. Each consumer is a separate Elixir process, isolated from others. When a consumer is ready for work, it asks its producer for events. After processing, it asks for more. If a consumer crashes while processing a job, only that job is affected.
Events - The unit of work flowing through the system. In GenStage, everything is an event. For our job runner, an event is a job to be executed. Events flow from producers to consumers based on demand, never faster than consumers can handle.
The Beauty of Modeling Everything as Events
When you model work as events, powerful patterns emerge:
Composition - You can chain stages together. A consumer can also be a producer for another stage. Want to add a step that enriches jobs before execution? Insert a producer-consumer between your current stages.
Fan-out/Fan-in - One producer can feed multiple consumers (fan-out). Multiple producers can feed one consumer (fan-in). The demand mechanism ensures fair distribution.
Buffering - Need a buffer? Add a producer-consumer that accumulates events before passing them on. The buffer only fills as fast as downstream consumers can drain it.
Filtering - A producer-consumer can selectively forward events. Only want to process high-priority jobs? Filter them in a middle stage.
Event Flow Pipeline: Social Media Processing
[BlueSky] ──┐
[Twitter] ──┼──→ [Producer] ═══→ [ProducerConsumer] ═══→ [Consumer] ──→ [Database]
[TikTok] ──┘ (transformation)
Flow: Social media posts → Producer → Transformation → Consumer → Storage
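The transformation box in that diagram is GenStage’s third role, a producer_consumer. As a sketch (the module and field names here are illustrative, not something we build later), it consumes upstream events, reshapes them, and emits the results downstream:
defmodule SocialMedia.Normalizer do
  use GenStage

  def start_link(_opts) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    # Consumer of the raw-post producer, and producer for the storage consumer.
    {:producer_consumer, :ok, subscribe_to: [SocialMedia.RawPosts]}
  end

  def handle_events(posts, _from, state) do
    # Transform each upstream event and emit the results to whoever subscribes to us.
    normalized = Enum.map(posts, &Map.put(&1, :normalized_at, DateTime.utc_now()))
    {:noreply, normalized, state}
  end
end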
Why This Matters for Job Processing
Traditional job processors push jobs into queues. Workers poll these queues, hoping to grab work. This creates several problems:
- Queue overflow - Producers can overwhelm the queue if consumers are slow
- Unfair distribution - Fast workers might grab all the work
- Visibility - Hard to see where bottlenecks are
- Error handling - What happens to in-flight jobs when a worker dies?
GenStage’s demand-driven model solves these elegantly:
- No overflow - Producers only generate what’s demanded
- Fair distribution - Each consumer gets what it asks for
- Clear bottlenecks - Slow stages naturally build up demand
- Clean errors - Crashed consumers simply stop demanding; their work remains unclaimed
This isn’t theoretical. Telecom systems have used these patterns for decades. When you make a phone call, switches don’t push calls through the network - each hop pulls when ready. This prevents network overload even during disasters when everyone tries to call at once.
We’re applying the same battle-tested patterns to job processing. The result is a system that’s naturally resilient, self-balancing, and surprisingly simple to reason about.
Ready to see how this translates to code? Let’s build our first producer.
Building the Foundation
Step 1: Creating Your Phoenix Project
Let’s start fresh with a new Phoenix project. Open your terminal and run:
mix phx.new job_processor --no-dashboard --no-mailer
cd job_processor
We’re keeping it lean - no dashboard or mailer for now. When prompted to install dependencies, say yes.
Why Phoenix? We’re not building a web app, but Phoenix gives us:
- A supervision tree already set up
- Configuration management
- A database connection (Ecto)
- LiveView for our monitoring dashboard (later)
Think of Phoenix as our application framework, not just a web framework.
Step 2: Adding GenStage
Open mix.exs and add GenStage to your dependencies:
defp deps do
[
{:phoenix, "~> 1.7.12"},
{:phoenix_ecto, "~> 4.5"},
{:ecto_sql, "~> 3.11"},
{:postgrex, ">= 0.0.0"},
{:phoenix_html, "~> 4.1"},
{:phoenix_live_reload, "~> 1.2", only: :dev},
{:phoenix_live_view, "~> 0.20.14"},
{:telemetry_metrics, "~> 0.6"},
{:telemetry_poller, "~> 1.0"},
{:jason, "~> 1.2"},
{:bandit, "~> 1.2"},
# Add this line
{:gen_stage, "~> 1.2"}
]
end
Now fetch the dependency:
mix deps.get
That’s it. One dependency. GenStage is maintained by the Elixir core team, so it follows the same design principles as the language itself.
Step 3: Understanding GenStage’s Mental Model
Before we write code, let’s cement the mental model. GenStage orchestrates three types of processes:
Producers emit events. They don’t push events anywhere - they hold them until a consumer asks. Think of a producer as a lazy river of data. The water (events) only flows when someone downstream opens a valve (demands).
Consumers receive events. They explicitly ask producers for a specific number of events. This is the key insight: consumers control the flow rate, not producers.
Producer-Consumers do both. They receive events from upstream, transform them, and emit to downstream. Perfect for building pipelines.
Every GenStage process follows this lifecycle:
- Start and connect to other stages
- Consumer sends demand upstream
- Producer receives demand and emits events
- Consumer receives and processes events
- Repeat from step 2
The demand mechanism is what makes this special. In a traditional queue, you might have:
# Traditional approach - producer decides when to push
loop do
job = create_job()
Queue.push(job) # What if queue is full?
end
With GenStage:
# GenStage approach - consumer decides when to pull
def handle_demand(demand, state) do
jobs = create_jobs(demand) # Only create what's asked for
{:noreply, jobs, state}
end
The consumer is in control. It’s impossible to overwhelm a consumer because it only gets what it asked for.
Step 4: Creating the Producer
Now for the meat of it. Let’s build a producer that understands our job processing needs. Create a new file at lib/job_processor/producer.ex:
defmodule JobProcessor.Producer do
use GenStage
require Logger
@doc """
Starts the producer with an initial state.
The state can be anything, but we'll use a counter to start simple.
"""
def start_link(initial \\ 0) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
@impl true
def init(counter) do
Logger.info("Producer starting with counter: #{counter}")
{:producer, counter}
end
@impl true
def handle_demand(demand, state) do
Logger.info("Producer received demand for #{demand} events")
# Generate events to fulfill demand
events = Enum.to_list(state..(state + demand - 1))
# Update our state
new_state = state + demand
# Return events and new state
{:noreply, events, new_state}
end
end
Let’s dissect this line by line:
use GenStage - This macro brings in the GenStage behavior. It’s like use GenServer but for stages. It requires us to implement certain callbacks.
start_link/1 - Standard OTP pattern. We name the process after its module so we can find it easily. In production, you might want multiple producers, so you’d make the name configurable.
init/1 - The crucial part: {:producer, counter}. The first element declares this as a producer. The second is our initial state. GenStage now knows this process will emit events when asked.
handle_demand/2 - The heart of a producer. This callback fires when consumers ask for events. The arguments are:
- demand - How many events the consumer wants
- state - Our current state
The return value {:noreply, events, new_state} means:
- :noreply - We’re responding to demand, not a synchronous call
- events - The list of events to emit (must be a list)
- new_state - Our updated state
The Demand Buffer
Here’s something subtle but important: GenStage maintains an internal demand buffer. If multiple consumers ask for events before you can fulfill them, GenStage aggregates the demand.
For example:
- Consumer A asks for 10 events
- Consumer B asks for 5 events
- Your handle_demand/2 receives demand for 15 events
This batching is efficient and prevents your producer from being called repeatedly for small demands.
What if You Can’t Fulfill Demand?
Sometimes you can’t produce as many events as demanded. That’s fine:
def handle_demand(demand, state) do
available = calculate_available_work()
if available >= demand do
events = fetch_events(demand)
{:noreply, events, state}
else
# Can only partially fulfill demand
events = fetch_events(available)
{:noreply, events, state}
end
end
GenStage tracks unfulfilled demand. If you return fewer events than demanded, it remembers. The next time you have events available, you can emit them even without new demand:
def handle_info(:new_data_available, state) do
events = fetch_available_events()
{:noreply, events, state}
end
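A related knob worth knowing about: events you emit beyond current demand land in GenStage’s internal producer buffer, whose size you can set when declaring the stage (a sketch; 10,000 is the documented default for regular producers):
def init(counter) do
  # Keep at most 5_000 events that were emitted without matching demand;
  # with the default settings, the oldest events are discarded on overflow.
  {:producer, counter, buffer_size: 5_000}
end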
Producer Patterns
Our simple counter producer is just the beginning. Real-world producers follow several patterns:
Database Polling Producer:
def handle_demand(demand, state) do
jobs = Repo.all(
from j in Job,
where: j.status == "pending",
limit: ^demand,
lock: "FOR UPDATE SKIP LOCKED"
)
job_ids = Enum.map(jobs, & &1.id)
Repo.update_all(
from(j in Job, where: j.id in ^job_ids),
set: [status: "processing"]
)
{:noreply, jobs, state}
end
Rate-Limited Producer:
def handle_demand(demand, %{rate_limit: limit} = state) do
now = System.monotonic_time(:millisecond)
time_passed = now - state.last_emit
allowed = min(demand, div(time_passed * limit, 1000))
if allowed > 0 do
events = generate_events(allowed)
{:noreply, events, %{state | last_emit: now}}
else
# Schedule retry
Process.send_after(self(), :retry_demand, 100)
{:noreply, [], state}
end
end
Buffering Producer:
def handle_demand(demand, %{buffer: buffer} = state) do
{to_emit, remaining} = Enum.split(buffer, demand)
if length(to_emit) < demand do
# Buffer exhausted, try to refill
new_events = fetch_more_events()
all_events = to_emit ++ new_events
{to_emit_now, to_buffer} = Enum.split(all_events, demand)
{:noreply, to_emit_now, %{state | buffer: to_buffer}}
else
{:noreply, to_emit, %{state | buffer: remaining}}
end
end
Testing Your Producer
Let’s make sure our producer works. Create test/job_processor/producer_test.exs:
defmodule JobProcessor.ProducerTest do
  use ExUnit.Case

  alias JobProcessor.Producer

  test "producer emits events on demand" do
    producer = start_supervised!({Producer, 0})

    # GenStage.stream/1 subscribes to the producer and pulls events lazily.
    events = [{producer, max_demand: 5}] |> GenStage.stream() |> Enum.take(5)

    # We should receive the first 5 events (0 through 4)
    assert events == [0, 1, 2, 3, 4]
  end

  test "producer maintains state across demands" do
    producer = start_supervised!({Producer, 10})

    # A small max_demand forces several demand cycles; the counter keeps
    # incrementing across them instead of resetting.
    events = [{producer, max_demand: 3}] |> GenStage.stream() |> Enum.take(6)

    assert events == [10, 11, 12, 13, 14, 15]
  end
end
Run the tests with mix test.
The Power of Stateful Producers
Our producer maintains state - a simple counter. But state can be anything:
- A database connection for polling
- A buffer of pre-fetched events
- Rate limiting information
- Metrics and telemetry data
Because each producer is just an Erlang process, it’s isolated. If one producer crashes, others continue. The supervisor restarts the crashed producer with a fresh state.
This is different from thread-based systems where shared state requires locks. Each producer owns its state exclusively. No locks, no race conditions, no defensive programming.
What We’ve Built
Our producer is deceptively simple, but it demonstrates core principles:
- Demand-driven - Only produces when asked
- Stateful - Maintains its own isolated state
- Supervised - Can crash and restart safely
- Testable - Easy to verify behavior
In the next section, we’ll build consumers that process these events. But the producer is the foundation - it controls the flow of work through our system.
Building A Consumer
Now that we have a producer emitting events, we need something to consume them. This is where consumers come in - they’re the workers that actually process the events flowing through our system.
But here’s the beautiful thing about GenStage consumers: they’re not passive recipients waiting for work to be thrown at them. They’re active participants in the flow control. A consumer decides how much work it can handle and explicitly asks for that amount. No more, no less.
Think about how this changes the dynamics. In a traditional message queue, producers blast messages into a queue, hoping consumers can keep up. If consumers fall behind, the queue grows. If consumers are faster than expected, they sit idle waiting for work. It’s a constant balancing act with lots of manual tuning.
GenStage flips this completely. Consumers know their own capacity better than anyone else. They know if they’re currently processing a heavy job, if they’re running low on memory, or if they’re about to restart. So they ask for exactly what they can handle right now.
The Consumer’s Lifecycle
A GenStage consumer follows a simple but powerful lifecycle:
- Subscribe - Connect to one or more producers
- Demand - Ask for a specific number of events
- Receive - Get events from producers (never more than requested)
- Process - Handle each event
- Repeat - Ask for more events when ready
The key insight is step 4: processing happens between demands. The consumer processes its current batch completely before asking for more. This creates natural backpressure - slow consumers automatically reduce the flow rate.
Building Our First Consumer
Let’s build a consumer that processes the events from our producer. Create a new file at lib/job_processor/consumer.ex:
defmodule JobProcessor.Consumer do
use GenStage
require Logger
@doc """
Starts the consumer.
Like producers, consumers are just GenServer-like processes.
The state can be anything you need for processing.
"""
def start_link(opts \\ []) do
GenStage.start_link(__MODULE__, opts)
end
@impl true
def init(opts) do
# The key difference: we declare ourselves as a :consumer
# and specify which producer(s) to subscribe to
{:consumer, opts, subscribe_to: [JobProcessor.Producer]}
end
@impl true
def handle_events(events, _from, state) do
Logger.info("Consumer received #{length(events)} events")
# Process each event
for event <- events do
process_event(event, state)
end
# Always return {:noreply, [], state} for consumers
# The empty list means we don't emit any events (we're not a producer)
{:noreply, [], state}
end
defp process_event(event, state) do
# For now, just log what we received
Logger.info("Processing event: #{event}")
IO.inspect({self(), event, state}, label: "Consumer processed")
end
end
Understanding the Consumer Architecture
Let’s break down what makes this consumer work:
use GenStage - Just like producers, consumers use the GenStage behavior. But the callbacks they implement are different.
init/1 returns {:consumer, state, options} - The crucial difference from producers. The first element declares this process as a consumer. The subscribe_to option tells GenStage which producers to connect to.
handle_events/3 instead of handle_demand/2 - Consumers implement handle_events/3, which receives:
- events - The list of events to process
- from - Which producer sent these events (usually ignored)
- state - The consumer’s current state
The return value {:noreply, [], state} - Consumers don’t emit events (that’s producers’ job), so the events list is always empty. They just process and update their state.
The Magic of Subscription
Notice the subscribe_to: [JobProcessor.Producer] option. This does several important things:
Automatic connection - GenStage handles finding and connecting to the producer. No manual process linking or monitoring.
Automatic demand - The consumer automatically asks the producer for events. By default, it requests batches of up to 1000 events, but you can tune this.
Fault tolerance - If the producer crashes and restarts, the consumer automatically reconnects. If the consumer crashes, it doesn’t take down the producer.
Flow control - The consumer won’t receive more events than it asks for. If it’s slow processing the current batch, no new events arrive until it’s ready.
Tuning Consumer Demand
You can control how many events a consumer requests at once:
def init(opts) do
{:consumer, opts,
subscribe_to: [
{JobProcessor.Producer, min_demand: 5, max_demand: 50}
]}
end
min_demand - Don’t ask for more events until we have fewer than this many
max_demand - Never ask for more than this many events at once
This creates a buffering effect. The consumer will receive events in batches between min_demand and max_demand, giving you control over throughput vs. latency tradeoffs.
For job processing, you might want smaller batches to reduce memory usage:
subscribe_to: [
{JobProcessor.Producer, min_demand: 1, max_demand: 10}
]
Or larger batches for higher throughput:
subscribe_to: [
{JobProcessor.Producer, min_demand: 100, max_demand: 1000}
]
Why This Design Matters
The producer-consumer subscription model solves several classic distributed systems problems:
Backpressure - Slow consumers naturally slow down the entire pipeline. No queues overflow, no memory explosions.
Dynamic scaling - Add more consumers and they automatically start receiving events. Remove consumers and the remaining ones pick up the slack.
Fault isolation - A crashing consumer doesn’t affect others. A crashing producer can be restarted without losing in-flight work.
Observable performance - You can see exactly where bottlenecks are by monitoring demand patterns. High accumulated demand = bottleneck downstream.
Consumer Patterns
Real-world consumers follow several common patterns:
Database Writing Consumer:
def handle_events(events, _from, state) do
# Batch insert for efficiency
records = Enum.map(events, &transform_event/1)
Repo.insert_all(MyTable, records)
{:noreply, [], state}
end
HTTP API Consumer:
def handle_events(events, _from, state) do
for event <- events do
case HTTPoison.post(state.webhook_url, Jason.encode!(event)) do
{:ok, %{status_code: 200}} -> :ok
{:error, reason} -> Logger.error("Webhook failed: #{inspect(reason)}")
end
end
{:noreply, [], state}
end
File Processing Consumer:
def handle_events(events, _from, state) do
for event <- events do
file_path = "/tmp/processed_#{event.id}.json"
File.write!(file_path, Jason.encode!(event))
end
{:noreply, [], state}
end
Error Handling in Consumers
What happens when event processing fails? In traditional queue systems, you need complex retry logic, dead letter queues, and careful state management.
With GenStage consumers, the supervision tree does most of the heavy lifting: a crashed consumer is restarted by its supervisor and simply re-subscribes. One thing to be clear about: GenStage itself has no acknowledgements, so events that were already handed to the crashed consumer are not re-delivered by the producer. Durability comes from whatever owns the work - in the job system we build later, jobs live in a database, so unfinished work can be detected and re-queued.
For more sophisticated error handling, you can catch exceptions:
def handle_events(events, _from, state) do
for event <- events do
try do
process_event(event)
rescue
e ->
Logger.error("Failed to process event #{event.id}: #{inspect(e)}")
# Could send to dead letter queue, retry later, etc.
end
end
{:noreply, [], state}
end
But often, letting the process crash and restart is the right approach. It’s simple, it clears any corrupted state, and the supervisor handles the restart automatically.
Wiring It Together
Now we have both pieces: a producer that emits events and a consumer that processes them. But they’re just modules sitting in files. We need to start them as processes and connect them.
This is where OTP’s supervision trees shine. We’ll add both processes to our application’s supervision tree, and OTP will ensure they start in the right order and restart if they crash.
Open lib/job_processor/application.ex and modify the start/2 function:
def start(_type, _args) do
children = [
# Start the Producer first
JobProcessor.Producer,
# Then start the Consumer
# The consumer will automatically connect to the producer
JobProcessor.Consumer,
# Other children like Ecto, Phoenix endpoint, etc.
JobProcessorWeb.Endpoint
]
opts = [strategy: :one_for_one, name: JobProcessor.Supervisor]
Supervisor.start_link(children, opts)
end
That’s it! The supervision tree will:
- Start the producer
- Start the consumer
- The consumer automatically subscribes to the producer
- Events start flowing immediately
Why This Supervision Strategy Works
The :one_for_one strategy means that if one process crashes, only that process is restarted. This is perfect for our producer-consumer setup:
Producer crashes - With the default subscription mode (cancel: :permanent), the consumer exits along with it; the supervisor restarts both, and the fresh consumer automatically re-subscribes to the new producer.
Consumer crashes - The producer keeps running, just stops emitting events. When the supervisor restarts the consumer, it reconnects and processing resumes.
This is fault isolation in action. Problems in one part of the system don’t cascade to other parts.
Testing the Connection
Let’s see our producer and consumer working together. Start the application:
mix phx.server
You should see logs showing the consumer processing events from the producer. Each event will be displayed with the process ID, event number, and state - something like this:
Consumer processed: {#PID<0.234.0>, 0, []}
Consumer processed: {#PID<0.234.0>, 1, []}
Consumer processed: {#PID<0.234.0>, 2, []}
Consumer processed: {#PID<0.234.0>, 3, []}
Consumer processed: {#PID<0.234.0>, 4, []}
...
Notice something important: the same PID processes every event. This is because we have a single consumer. Our counter increments predictably from 0, 1, 2, 3, 4… and all events flow to the same process.
The Single Consumer Scenario
With one consumer, we get:
- Predictable ordering - Events are processed in the exact order they’re generated
- Sequential processing - Each event is fully processed before the next one begins
- Simple state management - Only one process to reason about
- Potential bottleneck - If processing is slow, the entire pipeline slows down
Single Consumer Pattern: Sequential Processing
[Producer] ──→ ⓪ ──→ ① ──→ ② ──→ ③ ──→ ④ ──→ ⑤ ──→ [Consumer]
(Emits 0,1,2,3,4...) (Processes Sequentially)
Timeline:
t0: ████████ Process Event 0
t1: ████████ Process Event 1
t2: ████████ Process Event 2
t3: ░░░░░░░░░░░░░░░░░░░ Events 3,4,5... waiting
Key Characteristics:
✓ Predictable ordering - Events processed in exact sequence
✓ Sequential processing - One event completes before next begins
✓ Simple state management - Single process to track
⚠ Potential bottleneck - Slow processing blocks entire pipeline
This is perfect for scenarios where order matters or when you’re just getting started. But what happens when we add more consumers?
Scaling to Multiple Consumers
Let’s see what happens with multiple consumers. Add this to your supervision tree in lib/job_processor/application.ex:
def start(_type, _args) do
children = [
# Start the Producer first
JobProcessor.Producer,
# Start multiple consumers (each child needs a unique id in the supervisor)
Supervisor.child_spec({JobProcessor.Consumer, []}, id: :consumer_1),
Supervisor.child_spec({JobProcessor.Consumer, []}, id: :consumer_2),
Supervisor.child_spec({JobProcessor.Consumer, []}, id: :consumer_3),
# Other children
JobProcessorWeb.Endpoint
]
opts = [strategy: :one_for_one, name: JobProcessor.Supervisor]
Supervisor.start_link(children, opts)
end
Now restart your application and watch the logs:
Consumer processed: {#PID<0.234.0>, 0, []}
Consumer processed: {#PID<0.234.0>, 1, []}
Consumer processed: {#PID<0.235.0>, 2, []}
Consumer processed: {#PID<0.236.0>, 3, []}
Consumer processed: {#PID<0.234.0>, 4, []}
Consumer processed: {#PID<0.235.0>, 5, []}
...
Notice the different PIDs! Events are now distributed across multiple consumer processes. The distribution depends on which consumer asks for work first and how fast each consumer processes its events.
Multiple Consumer Pattern: Parallel Processing with Load Balancing
┌──→ Consumer 1 (PID <0.234.0>) ─→ Events: 0, 1, 4...
│ ↑ demand
[Producer] ─────────┼──→ Consumer 2 (PID <0.235.0>) ─→ Events: 2, 5, 8...
(First-Come- │ ↑ demand
First-Served) └──→ Consumer 3 (PID <0.236.0>) ─→ Events: 3, 6, 7...
↑ demand
Timeline: Parallel Processing
Consumer 1: ██████████ Event 0 ████████ Event 1 ████████████ Event 4
Consumer 2: ████████████ Event 2 ██████████ Event 5
Consumer 3: ████████ Event 3 ████████████ Event 6
✓ Benefits: ⚠ Challenges:
• Higher throughput - parallel • No ordering guarantees
• Fault tolerance - others continue • Shared resource contention
• Natural load balancing • Debugging complexity
• Better resource utilization • Potential race conditions
Understanding Event Distribution
GenStage’s default dispatcher (DemandDispatcher) uses a “first-come, first-served” approach:
- Consumer A finishes its current batch and asks for 10 more events
- Producer sends events 0-9 to Consumer A
- Consumer B asks for 10 events
- Producer sends events 10-19 to Consumer B
- Consumer A finishes and asks for more, gets events 20-29
This creates natural load balancing - faster consumers get more work. If Consumer A is processing heavy jobs slowly, Consumer B and C will pick up the slack.
The Trade-offs
Benefits of Multiple Consumers:
- Throughput - More work gets done in parallel
- Fault tolerance - If one consumer crashes, others continue
- Natural load balancing - Fast consumers get more work
- Resource utilization - Better use of multi-core systems
Challenges:
- No ordering guarantees - Event 5 might finish before event 3
- Shared resources - Multiple consumers might compete for database connections
- Debugging complexity - Multiple processes to track
Different Distribution Strategies
You can change how events are distributed by modifying the producer’s dispatcher. Add this to your producer’s init/1 function:
def init(counter) do
Logger.info("Producer starting with counter: #{counter}")
{:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
end
Now restart and watch what happens:
Consumer processed: {#PID<0.234.0>, 0, []}
Consumer processed: {#PID<0.235.0>, 0, []}
Consumer processed: {#PID<0.236.0>, 0, []}
Consumer processed: {#PID<0.234.0>, 1, []}
Consumer processed: {#PID<0.235.0>, 1, []}
Consumer processed: {#PID<0.236.0>, 1, []}
...
With BroadcastDispatcher, every consumer receives every event! This is useful for scenarios like:
- Multiple consumers writing to different databases
- One consumer processing events, another collecting metrics
- Broadcasting notifications to multiple systems
BroadcastDispatcher: Every Consumer Receives Every Event
┌─→ Database Writer (PID <0.234.0>) ─→ Events: 0, 1, 2, 3...
│
[Producer] ═══━━━━━━┼─→ Metrics Collector (PID <0.235.0>) ─→ Events: 0, 1, 2, 3...
(Broadcasting) │
└─→ Notification Service (PID <0.236.0>) ─→ Events: 0, 1, 2, 3...
Timeline: All Consumers Process Same Events Simultaneously
Database Writer: ████████ Event 0 ████████ Event 1 ████████ Event 2
Metrics Collector: ████████ Event 0 ████████ Event 1 ████████ Event 2
Notification Service:████████ Event 0 ████████ Event 1 ████████ Event 2
🔄 Broadcasting Use Cases:
• Multiple databases - Each consumer writes to different database
• Parallel processing - One processes data, another collects metrics
• Notification fanout - Broadcasting alerts to multiple services
• Audit trails - Simultaneous logging to multiple destinations
Key Differences from Load Balancing:
✓ Every consumer gets EVERY event (no distribution)
✓ Perfect for parallel processing different aspects
✓ Higher total throughput but more resource usage
⚠ N times more processing (N = number of consumers)
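One extra trick with BroadcastDispatcher: each subscriber can pass a selector so it only opts into part of the broadcast stream. A small sketch of a consumer subscribing that way:
def init(opts) do
  # With BroadcastDispatcher upstream, this consumer only sees even-numbered events.
  {:consumer, opts,
   subscribe_to: [
     {JobProcessor.Producer, selector: fn event -> rem(event, 2) == 0 end}
   ]}
end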
But we’re still just processing numbers. In the next section, we’ll replace our simple counter with a real job processing system that can execute arbitrary code.
From Toy Examples to Real Job Processing
We’ve built a solid foundation with our producer-consumer setup, but we’re still just processing incrementing numbers. That’s useful for understanding the mechanics, but real job processing needs persistent storage, job queuing, and the ability to execute arbitrary code.
This is where things get interesting. We’re going to transform our simple counter into a full job processing system that can serialize function calls, store them in a database, and execute them across multiple workers. Think of it as building your own mini-Sidekiq, but with GenStage’s elegant backpressure handling.
Why We Need a Database
Right now, our producer generates events from memory (a simple counter). But real job processors need persistence for several reasons:
Durability - Jobs shouldn’t disappear if the system restarts. When you queue a job to send an email, you expect it to survive server reboots.
Coordination - Multiple producer processes might be running across different servers. They need a shared source of truth for what work exists.
Status tracking - Jobs have lifecycles: queued, running, completed, failed. You need to track this state somewhere.
Debugging and monitoring - When jobs fail, you need to see what went wrong and potentially retry them.
The database becomes our job queue’s persistent storage layer, but GenStage handles all the flow control and distribution logic.
Setting Up Our Job Storage
Since we’re using Phoenix, we already have Ecto configured. But we need to set up our job storage table. The beauty of Elixir’s job processing is that we can serialize entire function calls as binary data using the Erlang Term Format.
Let’s create a migration for our jobs table:
mix ecto.gen.migration create_jobs
Now edit the migration file:
defmodule JobProcessor.Repo.Migrations.CreateJobs do
use Ecto.Migration
def change do
create table(:jobs) do
add :status, :string, null: false, default: "queued"
add :payload, :binary, null: false
add :attempts, :integer, default: 0
add :max_attempts, :integer, default: 3
# The _usec variants let us store DateTime.utc_now() (which carries microseconds) directly
add :scheduled_at, :utc_datetime_usec
add :started_at, :utc_datetime_usec
add :completed_at, :utc_datetime_usec
add :error_message, :text
timestamps()
end
create index(:jobs, [:status])
create index(:jobs, [:scheduled_at])
create index(:jobs, [:status, :scheduled_at])
end
end
This gives us a robust job storage system:
- status - Track job lifecycle (queued, running, completed, failed)
- payload - The serialized function call
- attempts/max_attempts - Retry logic
- scheduled_at - Support for delayed jobs
- Timestamps - Monitor performance and debug issues
Run the migration:
mix ecto.migrate
Modeling Jobs
Let’s create an Ecto schema for our jobs. Create lib/job_processor/job.ex:
defmodule JobProcessor.Job do
use Ecto.Schema
import Ecto.Changeset
schema "jobs" do
field :status, :string, default: "queued"
field :payload, :binary
field :attempts, :integer, default: 0
field :max_attempts, :integer, default: 3
field :scheduled_at, :utc_datetime_usec
field :started_at, :utc_datetime_usec
field :completed_at, :utc_datetime_usec
field :error_message, :string
timestamps()
end
def changeset(job, attrs) do
job
|> cast(attrs, [:status, :payload, :attempts, :max_attempts,
:scheduled_at, :started_at, :completed_at, :error_message])
|> validate_required([:payload])
|> validate_inclusion(:status, ["queued", "running", "completed", "failed"])
end
@doc """
Serialize a function call into a job payload.
This is where the magic happens - we can serialize any module, function,
and arguments into binary data that can be stored and executed later.
"""
def encode_job(module, function, args) do
{module, function, args} |> :erlang.term_to_binary()
end
@doc """
Deserialize a job payload back into a function call.
"""
def decode_job(payload) do
:erlang.binary_to_term(payload)
end
end
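A quick round trip in iex shows what actually lands in that payload column:
payload = JobProcessor.Job.encode_job(IO, :puts, ["hello"])
# payload is an opaque binary, ready for the :binary column

JobProcessor.Job.decode_job(payload)
# => {IO, :puts, ["hello"]}
One caveat worth keeping in mind: :erlang.binary_to_term/1 should only ever be fed trusted data, which is fine here because we wrote the payload ourselves.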
Building the Job Queue Interface
Now we need an interface for interacting with jobs. This is where we abstract the database operations and provide a clean API for enqueueing and processing jobs. Create lib/job_processor/job_queue.ex:
defmodule JobProcessor.JobQueue do
import Ecto.Query
alias JobProcessor.{Repo, Job}
@doc """
Enqueue a job for processing.
This is the public API that applications use to submit work.
"""
def enqueue(module, function, args, opts \\ []) do
payload = Job.encode_job(module, function, args)
attrs = %{
payload: payload,
max_attempts: Keyword.get(opts, :max_attempts, 3),
scheduled_at: Keyword.get(opts, :scheduled_at, DateTime.utc_now())
}
%Job{}
|> Job.changeset(attrs)
|> Repo.insert()
end
@doc """
Fetch available jobs for processing.
This is called by our GenStage producer to get work.
Uses FOR UPDATE SKIP LOCKED to avoid race conditions.
"""
def fetch_jobs(limit) do
now = DateTime.utc_now()
Repo.transaction(fn ->
# Find available jobs
job_ids =
from(j in Job,
where: j.status == "queued" and j.scheduled_at <= ^now,
limit: ^limit,
select: j.id,
lock: "FOR UPDATE SKIP LOCKED"
)
|> Repo.all()
# Mark them as running and return the full job data.
# A select on the update query makes Postgres RETURNING hand the rows back to us.
{count, jobs} =
from(j in Job,
where: j.id in ^job_ids,
select: %{id: j.id, payload: j.payload, attempts: j.attempts, max_attempts: j.max_attempts}
)
|> Repo.update_all(set: [status: "running", started_at: DateTime.utc_now()])
{count, jobs}
end)
end
@doc """
Mark a job as completed successfully.
"""
def complete_job(job_id) do
from(j in Job, where: j.id == ^job_id)
|> Repo.update_all(
set: [status: "completed", completed_at: DateTime.utc_now()]
)
end
@doc """
Mark a job as failed and handle retry logic.
"""
def fail_job(job_id, error_message, attempts \\ 1) do
job = Repo.get!(Job, job_id)
if attempts >= job.max_attempts do
# Permanently failed
from(j in Job, where: j.id == ^job_id)
|> Repo.update_all(
set: [
status: "failed",
error_message: error_message,
attempts: attempts,
completed_at: DateTime.utc_now()
]
)
else
# Retry later
retry_at = DateTime.add(DateTime.utc_now(), 60 * attempts, :second)
from(j in Job, where: j.id == ^job_id)
|> Repo.update_all(
set: [
status: "queued",
error_message: error_message,
attempts: attempts,
scheduled_at: retry_at
]
)
end
end
end
The Power of FOR UPDATE SKIP LOCKED
Notice that crucial line: lock: "FOR UPDATE SKIP LOCKED". This is a PostgreSQL feature that’s essential for job processing systems.
Here’s what happens without it:
- Consumer A queries for jobs, gets job #123
- Consumer B queries for jobs, gets the same job #123
- Both consumers try to process job #123 simultaneously
- Chaos ensues
With FOR UPDATE SKIP LOCKED:
- Consumer A queries for jobs, locks job #123
- Consumer B queries for jobs, skips locked job #123, gets job #124
- Each job is processed exactly once
- No race conditions, no duplicate processing
This is why PostgreSQL (and similar databases) are preferred for job processing systems. The database handles the coordination for us.
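You can convince yourself of this from iex once some jobs are queued: two concurrent fetch_jobs/1 calls should never hand back the same job (a small sketch, not part of the test suite):
# Two "workers" fetching at the same time receive disjoint sets of jobs.
tasks = for _ <- 1..2, do: Task.async(fn -> JobProcessor.JobQueue.fetch_jobs(5) end)
[{:ok, {_, jobs_a}}, {:ok, {_, jobs_b}}] = Task.await_many(tasks)

MapSet.disjoint?(MapSet.new(jobs_a, & &1.id), MapSet.new(jobs_b, & &1.id))
# => true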
Updating Our Producer
Now we can update our producer to fetch real jobs from the database instead of generating counter events. Update lib/job_processor/producer.ex:
defmodule JobProcessor.Producer do
use GenStage
require Logger
alias JobProcessor.JobQueue
def start_link(_opts) do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
def init(:ok) do
Logger.info("Job Producer starting")
{:producer, %{}, dispatcher: GenStage.DemandDispatcher}
end
@impl true
def handle_demand(demand, state) when demand > 0 do
Logger.info("Producer received demand for #{demand} jobs")
case JobQueue.fetch_jobs(demand) do
{:ok, {count, jobs}} when count > 0 ->
Logger.info("Fetched #{count} jobs from database")
{:noreply, jobs, state}
{:ok, {0, []}} ->
# No jobs available, schedule a check for later
Process.send_after(self(), :check_for_jobs, 1000)
{:noreply, [], state}
{:error, reason} ->
Logger.error("Failed to fetch jobs: #{inspect(reason)}")
{:noreply, [], state}
end
end
@impl true
def handle_info(:check_for_jobs, state) do
# This allows us to produce events even when there's no pending demand
# if jobs become available
case JobQueue.fetch_jobs(10) do
{:ok, {count, jobs}} when count > 0 ->
{:noreply, jobs, state}
_ ->
Process.send_after(self(), :check_for_jobs, 1000)
{:noreply, [], state}
end
end
end
Understanding the Producer’s Evolution
Our producer has evolved significantly:
Database-driven - Instead of generating events from memory, we fetch them from persistent storage
Handles empty queues gracefully - When no jobs are available, we schedule a check for later instead of blocking
Error handling - Database operations can fail, so we handle those cases
Polling mechanism - The :check_for_jobs message lets us produce events even when there’s no pending demand
This polling approach works well for most job processing systems. For higher throughput systems, you could use PostgreSQL’s LISTEN/NOTIFY to get push notifications when new jobs arrive.
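For reference, that LISTEN/NOTIFY variant could look roughly like this inside the producer - a sketch using Postgrex.Notifications, assuming you have added a Postgres trigger that issues NOTIFY on insert (the channel name here is made up):
# In init/1: subscribe to a Postgres channel instead of relying only on polling.
def init(:ok) do
  {:ok, notifications} = Postgrex.Notifications.start_link(JobProcessor.Repo.config())
  Postgrex.Notifications.listen(notifications, "jobs_inserted")
  {:producer, %{}}
end

# Postgrex delivers notifications as plain messages to the subscribing process.
def handle_info({:notification, _pid, _ref, "jobs_inserted", _payload}, state) do
  case JobProcessor.JobQueue.fetch_jobs(10) do
    {:ok, {count, jobs}} when count > 0 -> {:noreply, jobs, state}
    _ -> {:noreply, [], state}
  end
end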
Updating Our Consumer
Now our consumer needs to execute real job payloads instead of just logging numbers. Update lib/job_processor/consumer.ex:
defmodule JobProcessor.Consumer do
use GenStage
require Logger
alias JobProcessor.{Job, JobQueue}
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
@impl true
def init(opts) do
{:consumer, opts, subscribe_to: [JobProcessor.Producer]}
end
@impl true
def handle_events(jobs, _from, state) do
Logger.info("Consumer received #{length(jobs)} jobs")
for job <- jobs do
execute_job(job)
end
{:noreply, [], state}
end
defp execute_job(%{id: job_id, payload: payload, attempts: attempts}) do
try do
{module, function, args} = Job.decode_job(payload)
Logger.info("Executing job #{job_id}: #{module}.#{function}")
# Execute the job
result = apply(module, function, args)
# Mark as completed
JobQueue.complete_job(job_id)
Logger.info("Job #{job_id} completed successfully")
result
rescue
error ->
error_message = Exception.format(:error, error, __STACKTRACE__)
Logger.error("Job #{job_id} failed: #{error_message}")
# Mark as failed (with retry logic)
JobQueue.fail_job(job_id, error_message, attempts + 1)
end
end
end
The Magic of Code Serialization
The real power of this system is in those two lines:
{module, function, args} = Job.decode_job(payload)
result = apply(module, function, args)
We’re deserializing a function call that was stored as binary data and executing it. This means you can queue any function call:
# Send an email
JobQueue.enqueue(MyApp.Mailer, :send_welcome_email, [user_id: 123])
# Process an image
JobQueue.enqueue(MyApp.ImageProcessor, :resize_image, ["/path/to/image.jpg", 300, 200])
# Call an API
JobQueue.enqueue(MyApp.ApiClient, :sync_user_data, [user_id: 456])
# Even complex data structures
JobQueue.enqueue(MyApp.ReportGenerator, :generate_report, [%{
user_id: 789,
date_range: Date.range(~D[2024-01-01], ~D[2024-01-31]),
format: :pdf
}])
Each of these becomes a row in the database, gets picked up by our GenStage producer, distributed to available consumers, and executed. The serialization handles all the complex data structures automatically.
What We’ve Built
We now have a complete job processing system with:
- Persistent storage - Jobs survive restarts
- Automatic retries - Failed jobs are retried with exponential backoff
- Concurrent processing - Multiple consumers process jobs in parallel
- Backpressure handling - GenStage ensures consumers aren’t overwhelmed
- Race condition prevention - Database locking ensures each job runs exactly once
- Delayed jobs - Support for scheduling jobs to run later
- Error tracking - Failed jobs are logged with error messages
And the beautiful part? GenStage handles all the complex coordination. We just focus on the business logic of our jobs.
Testing Our Job System
Let’s create a simple job to test our system. Add this to lib/job_processor/test_job.ex:
defmodule JobProcessor.TestJob do
require Logger
def hello(name) do
Logger.info("Hello, #{name}!")
Process.sleep(1000) # Simulate some work
"Greeted #{name}"
end
def failing_job do
Logger.info("This job will fail...")
raise "Intentional failure for testing"
end
def heavy_job(duration_ms) do
Logger.info("Starting heavy job for #{duration_ms}ms")
Process.sleep(duration_ms)
Logger.info("Heavy job completed")
"Completed heavy work"
end
end
Now you can queue jobs from the console:
iex -S mix
# Queue a simple job
JobProcessor.JobQueue.enqueue(JobProcessor.TestJob, :hello, ["World"])
# Queue a failing job (to test retry logic)
JobProcessor.JobQueue.enqueue(JobProcessor.TestJob, :failing_job, [])
# Queue multiple jobs to see parallel processing
for i <- 1..10 do
JobProcessor.JobQueue.enqueue(JobProcessor.TestJob, :hello, ["Person #{i}"])
end
Watch the logs to see jobs being processed, failures being retried, and the natural load balancing across multiple consumers.
We’ve transformed our simple counter example into a production-ready job processing system. The core GenStage concepts remained the same, but now we’re processing real work with persistence, error handling, and retry logic.
Bringing It All Together: From Tutorial to Production
Our tutorial system works well, but production systems need additional sophistication. Here’s where you’d take this next:
Multiple Job Types with Dedicated Queues
Real applications have different types of work with different characteristics:
# High-priority user-facing jobs
JobQueue.enqueue(:email_queue, Mailer, :send_welcome_email, [user_id])
# Background data processing
JobQueue.enqueue(:analytics_queue, Analytics, :process_events, [batch_id])
# Heavy computational work
JobQueue.enqueue(:ml_queue, ModelTrainer, :train_model, [dataset_id])
Each queue gets its own producer, consumer pool, and configuration (note that enqueue/4 above grows a leading queue-name argument - an extension of the three-argument API we built earlier):
defmodule JobProcessor.QueueSupervisor do
use Supervisor
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
def init(_init_arg) do
children = [
# Email queue - fast, lightweight
queue_spec(:email_queue, max_consumers: 5, max_demand: 1),
# Analytics queue - batch processing
queue_spec(:analytics_queue, max_consumers: 3, max_demand: 100),
# ML queue - heavy computation
queue_spec(:ml_queue, max_consumers: 1, max_demand: 1)
]
Supervisor.init(children, strategy: :one_for_one)
end
defp queue_spec(queue_name, opts) do
%{
id: :"#{queue_name}_supervisor",
start: {JobProcessor.QueueManager, :start_link, [queue_name, opts]},
type: :supervisor
}
end
end
Dynamic Consumer Scaling
Scale consumers based on queue depth and system load:
defmodule JobProcessor.AutoScaler do
use GenServer
def init(queue_name) do
schedule_check()
{:ok, %{queue: queue_name, consumers: [], target_consumers: 2}}
end
def handle_info(:check_scaling, state) do
queue_depth = JobQueue.queue_depth(state.queue)
current_consumers = length(state.consumers)
target = calculate_target_consumers(queue_depth, current_consumers)
new_state =
cond do
target > current_consumers -> scale_up(state, target - current_consumers)
target < current_consumers -> scale_down(state, current_consumers - target)
true -> state
end
schedule_check()
{:noreply, new_state}
end
defp calculate_target_consumers(queue_depth, current) do
cond do
queue_depth > 1000 -> min(current + 2, 10)
queue_depth > 100 -> min(current + 1, 10)
queue_depth < 10 -> max(current - 1, 1)
true -> current
end
end
# scale_up/2, scale_down/2, and JobQueue.queue_depth/1 are left as sketches;
# schedule_check/0 just re-arms the periodic check.
defp schedule_check do
Process.send_after(self(), :check_scaling, 5_000)
end
end
Worker Registries and Health Monitoring
Track worker health and performance:
defmodule JobProcessor.WorkerRegistry do
use GenServer
require Logger
def start_link(_) do
# Keep a :workers key in state from the start so %{state | workers: ...} works below.
GenServer.start_link(__MODULE__, %{workers: %{}}, name: __MODULE__)
end
def init(state), do: {:ok, state}
def register_worker(queue, pid, metadata \\ %{}) do
GenServer.cast(__MODULE__, {:register, queue, pid, metadata})
end
def get_workers(queue) do
GenServer.call(__MODULE__, {:get_workers, queue})
end
def get_worker_stats do
GenServer.call(__MODULE__, :get_stats)
end
def handle_cast({:register, queue, pid, metadata}, state) do
Process.monitor(pid)
worker_info = %{
pid: pid,
queue: queue,
started_at: DateTime.utc_now(),
jobs_processed: 0,
last_job_at: nil,
metadata: metadata
}
new_workers = Map.put(state.workers, pid, worker_info)
{:noreply, %{state | workers: new_workers}}
end
def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
Logger.warning("Worker #{inspect(pid)} died: #{inspect(reason)}")
new_workers = Map.delete(state.workers, pid)
{:noreply, %{state | workers: new_workers}}
end
end
Advanced Error Handling
Circuit breakers for failing job types:
defmodule JobProcessor.CircuitBreaker do
use GenServer
def should_process_job?(job_type) do
GenServer.call(__MODULE__, {:should_process, job_type})
end
def record_success(job_type) do
GenServer.cast(__MODULE__, {:success, job_type})
end
def record_failure(job_type, error) do
GenServer.cast(__MODULE__, {:failure, job_type, error})
end
def handle_call({:should_process, job_type}, _from, state) do
circuit_state = Map.get(state.circuits, job_type, :closed)
case circuit_state do
:closed -> {:reply, true, state}
:open ->
if circuit_should_retry?(state, job_type) do
{:reply, true, transition_to_half_open(state, job_type)}
else
{:reply, false, state}
end
:half_open -> {:reply, true, state}
end
end
end
Dead Letter Queues
Handle permanently failed jobs:
defmodule JobProcessor.DeadLetterQueue do
def handle_permanent_failure(job, final_error) do
dead_job = %{
original_job: job,
failed_at: DateTime.utc_now(),
final_error: final_error,
attempt_history: job.attempt_history || [],
forensics: collect_forensics(job)
}
Repo.insert(%DeadJob{data: dead_job})
JobProcessor.Notifications.send_dead_letter_alert(dead_job)
end
defp collect_forensics(job) do
%{
system_load: :erlang.statistics(:run_queue),
memory_usage: :erlang.memory(),
queue_depths: JobQueue.all_queue_depths(),
recent_errors: JobProcessor.ErrorTracker.recent_errors(job.module)
}
end
end
Observability
Comprehensive monitoring with telemetry:
defmodule JobProcessor.Telemetry do
def setup do
events = [
[:job_processor, :job, :start],
[:job_processor, :job, :stop],
[:job_processor, :job, :exception],
[:job_processor, :queue, :depth]
]
:telemetry.attach_many("job-processor-metrics", events, &handle_event/4, nil)
end
def handle_event([:job_processor, :job, :stop], measurements, metadata, _config) do
JobProcessor.Metrics.record_job_duration(metadata.queue, measurements.duration)
JobProcessor.Metrics.increment_jobs_completed(metadata.queue)
JobProcessor.Metrics.record_job_success(metadata.module, metadata.function)
end
# Catch-all so the other attached events don't crash (and detach) this handler.
def handle_event(_event, _measurements, _metadata, _config), do: :ok
end
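On the emitting side, the consumer could wrap job execution in :telemetry.span/3, which fires exactly the :start, :stop, and :exception events listed above (a sketch, not wired into the earlier consumer code; the :queue value is a placeholder):
defp execute_with_telemetry(job) do
  {module, function, args} = JobProcessor.Job.decode_job(job.payload)
  metadata = %{queue: :default, module: module, function: function}

  :telemetry.span([:job_processor, :job], metadata, fn ->
    result = apply(module, function, args)
    # span/3 expects {result, stop_metadata}
    {result, metadata}
  end)
end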
GenStage’s demand-driven architecture naturally handles backpressure, load balancing, and fault isolation. These production patterns build on that foundation, giving you the tools to run job processing at scale. The same principles that made our tutorial system work - processes, supervision, and message passing - scale to enterprise deployments.
Looking Back, Moving Forward
Ten years later, I’m struck by how well this design has aged. The fundamentals haven’t changed because they were right from the beginning: isolated processes, demand-driven flow control, and supervision trees create systems that are both simple to reason about and robust under pressure.
What has changed is the ecosystem around these patterns. Tools like Broadway have formalized many of the production patterns we outlined. LiveView has shown how the same message-passing principles work for interactive applications. The BEAM’s reputation for building resilient systems has only grown stronger.
If you’re building distributed systems, the lessons from this tutorial extend far beyond job processing. Whether you’re handling WebSocket connections, processing streaming data, or coordinating microservices, the same patterns apply: pull don’t push, isolate failures, and let the system self-regulate.
The beauty of GenStage isn’t in its complexity—it’s in its simplicity. When your job processor starts handling millions of tasks without breaking a sweat, remember that it’s built on the same simple idea we started with: consumers asking producers for exactly what they can handle, one event at a time.