# Distributed Replay Buffer with Flame Runner
This post reviews the replay-buffer example in `examples/rl/replay_buffer`. The example demonstrates how Flame can manage an append-heavy reinforcement-learning data structure by combining `flamepy.core.patch_object`, versioned object reads, and `flamepy.runner` services.
The example is not intended to evaluate an RL algorithm. It uses random Gymnasium actions, so reward quality is outside the scope here. The relevant question is whether the replay-buffer data path can support distributed collection and repeated sampling without forcing every operation through full-buffer copies.
## Executive summary
| Item | Finding |
|---|---|
| Artifact | Distributed replay buffer implemented with flamepy.runner, Flame object cache, and Gymnasium collectors. |
| Primary write path | Collectors append transition batches through flamepy.core.patch_object, so writes send only deltas instead of replacing the full buffer. |
| Primary read path | Replay-buffer service instances use get_object(..., deserializer=...) to materialize state from a base object plus patches. With cached versions, reads can apply only newly observed patch batches. |
| Sampling model | --sample-parallelism N creates N replay-buffer service instances and issues N sample requests, allowing batch sampling work to run in parallel over the same shared replay-buffer object. |
| Differentiation | Flame separates handler and data: multiple handlers can serve parallel sample() calls while the buffer data remains in one shared ObjectRef. |
| Programming model | The example separates handler and data: ReplayBuffer carries behavior and an ObjectRef; the buffer contents live in Flame’s object cache. |
| Performance interpretation | The example run shows incremental distributed reads substantially outperforming forced full reads. Local in-memory execution can still be faster for cheap environments because it avoids Flame runtime overhead. |
## Scope
The implementation under review consists of three files:
| File | Role |
|---|---|
| `main.py` | Creates the Runner, starts collector and replay-buffer services, coordinates collection, and issues sample requests. |
| `collector.py` | Steps a Gymnasium environment, accumulates transitions, and calls `buffer.push(...)`. |
| `replay_buffer.py` | Owns the shared ObjectRef, appends patches, materializes state, merges patches, and samples transitions. |
The example covers data movement and service orchestration. It does not implement policy learning, prioritized replay, storage eviction, or correctness guarantees beyond the shared object-cache semantics used by the example.
## System architecture
The distributed path begins by creating a shared cached object and registering two service types:
```python
with Runner(f"replay-buffer-{env_name.lower()}") as rr:
    buffer = ReplayBuffer(rr, force_full_get=force_full_get)
    buffer_svc = rr.service(buffer, warmup=sample_parallelism)
    collector = rr.service(Collector(env_name), autoscale=True)
```
`ReplayBuffer(rr)` initializes the cache object:

```python
self.buffer_ref = rr.put_object({"transitions": [], "total_added": 0})
```
From that point forward, the replay buffer is represented by two parts:
| Part | Responsibility |
|---|---|
| `ObjectRef` data | Stores the base replay-buffer state plus appended transition patches in Flame's cache. |
| `ReplayBuffer` handler | Provides push, state, sample, merge, and materialization behavior around the shared object reference. |
The collection loop fans out environment work and lets each collector append directly to the shared object:
```python
collect_futures = [
    collector.collect(buffer, steps_per_collection)
    for _ in range(num_collections)
]
collect_results = rr.get(collect_futures)
```
## Finding 1: Delta updates reduce write amplification
The write path is deliberately narrow:
```python
def push(self, transitions: List[dict]) -> None:
    self._patch_object(self.buffer_ref, transitions)
```
Each collector accumulates a batch of transitions locally, then calls `buffer.push(transitions)`. Because `push` uses `patch_object`, the collector writes only the newly produced transitions.
This is the main design advantage for replay-buffer writes. A whole-object update would require a worker or service to read the existing buffer, append locally, and write the full object back. The patch-based path changes the normal append flow to:

```
collector -> ObjectRef patch
```

instead of:

```
collector -> replay-buffer service -> full buffer replacement
```
The replay-buffer service remains important for reads and sampling, but append traffic does not need to be serialized through one service method that owns the entire Python list.
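For concreteness, here is a minimal sketch of that collection loop. It assumes the standard Gymnasium API; the transition field names and the return value are illustrative, not taken from `collector.py`:

```python
import gymnasium as gym

class Collector:
    """Illustrative collector: step an environment, batch transitions, push once."""

    def __init__(self, env_name: str):
        self.env_name = env_name

    def collect(self, buffer, steps: int) -> int:
        env = gym.make(self.env_name)
        obs, _ = env.reset()
        batch = []
        for _ in range(steps):
            action = env.action_space.sample()  # random actions, as in the example
            next_obs, reward, terminated, truncated, _ = env.step(action)
            batch.append({
                "obs": obs,
                "action": action,
                "reward": reward,
                "next_obs": next_obs,
                "done": terminated or truncated,
            })
            obs = next_obs
            if terminated or truncated:
                obs, _ = env.reset()
        buffer.push(batch)  # one patch_object write for the whole batch
        return len(batch)
```

The key property is that the remote write happens once per `collect()` call, on a batch, rather than once per environment step.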
## Finding 2: Incremental reads control replay-buffer read cost
Patch writes only help if readers can avoid repeatedly downloading and replaying the full patch history. The example addresses this with Flame’s versioned get_object behavior and a stateful deserializer.
`ReplayBuffer._fetch()` reads the object through `get_object`:

```python
def _fetch(self) -> dict:
    ref = self.buffer_ref
    if self.force_full_get:
        ref = self._object_ref(endpoint=ref.endpoint, key=ref.key, version=0)
    return self._get_object(ref, deserializer=self._deserializer)
```
The `--force-full-get` flag changes the `ObjectRef` version to 0, forcing a full read. Normal mode allows the client cache to request only the patches that arrived after the service instance's last cached version when the server can safely return a patch-only response.
The replay-buffer deserializer then applies only the newly seen patch batches:
```python
for delta in deltas[self._materialized_patch_count:]:
    self._materialized_data["transitions"].extend(delta)
    self._materialized_data["total_added"] += len(delta)
self._materialized_patch_count = len(deltas)
return self._materialized_data
```
For a long-lived replay-buffer service instance, this changes repeated state() and sample() calls from rebuilding against the entire history to advancing a local materialized view. That is the read-side complement to patch_object.
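A sketch of how that read-side state hangs together on the handler follows. The single-argument deserializer signature and the sampling strategy are assumptions inferred from the snippets above, and `_fetch` is the method already shown:

```python
import random

class ReplayBuffer:
    """Read-side sketch: each service instance keeps its own materialized view."""

    def __init__(self, rr, force_full_get: bool = False):
        self.force_full_get = force_full_get
        self.buffer_ref = rr.put_object({"transitions": [], "total_added": 0})
        # Local view, advanced as new patch batches are observed.
        self._materialized_data = {"transitions": [], "total_added": 0}
        self._materialized_patch_count = 0

    def _deserializer(self, deltas):
        # Apply only patch batches that arrived since the last read.
        for delta in deltas[self._materialized_patch_count:]:
            self._materialized_data["transitions"].extend(delta)
            self._materialized_data["total_added"] += len(delta)
        self._materialized_patch_count = len(deltas)
        return self._materialized_data

    def sample(self, batch_size: int) -> list:
        data = self._fetch()  # _fetch as shown above; applies only new patches
        k = min(batch_size, len(data["transitions"]))
        return random.sample(data["transitions"], k)
```

Because the counter and the materialized view live on the instance, read cost scales with new patches rather than with the full history.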
## Finding 3: Sampling can run in parallel
The example exposes replay-buffer sample parallelism as a runtime parameter, not as an afterthought:
```python
DEFAULT_SAMPLE_PARALLELISM = 2
```
`main.py` splits one logical batch into multiple sample requests:

```python
sample_request_sizes = _sample_request_sizes(batch_size, sample_parallelism)
sample_futures = [
    buffer_svc.sample(request_size)
    for request_size in sample_request_sizes
]
batches = rr.get(sample_futures)
```
The service is created with `warmup=sample_parallelism`, so Flame starts a fixed set of replay-buffer service instances. For example, `--batch-size 64 --sample-parallelism 2` creates two sample requests whose returned batches sum to 64 transitions.
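A helper with that contract fits in a few lines. This is one plausible implementation of `_sample_request_sizes`, not necessarily the example's:

```python
def _sample_request_sizes(batch_size: int, parallelism: int) -> list[int]:
    """Split batch_size into `parallelism` near-equal request sizes."""
    base, remainder = divmod(batch_size, parallelism)
    # The first `remainder` requests get one extra transition.
    return [base + (1 if i < remainder else 0) for i in range(parallelism)]

assert _sample_request_sizes(64, 2) == [32, 32]
assert _sample_request_sizes(10, 3) == [4, 3, 3]
```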
This does not add threads inside a single replay-buffer instance. It creates multiple service instances and lets Flame schedule sample requests across them. Each instance maintains its own materialized view of the shared cached object; the shared ObjectRef is the source of truth.
The expected benefit depends on workload shape. Parallel sampling is most useful when the buffer is large enough, sampling is frequent enough, or the learner can overlap sampling with other distributed work.
The important distinction is the ownership model. The replay-buffer data lives in the object cache, while `ReplayBuffer` service instances are handlers around the same `ObjectRef`. Increasing `--sample-parallelism` increases the number of sampling handlers without changing the logical ownership of the data.
That makes parallel sampling a direct expression of Flame’s handler-plus-data model. The application asks for more sample handlers; Flame keeps the buffer data in shared cache and lets each handler materialize the patches it needs.
## Finding 4: Handler plus data is the general model
The example is best read as a handler-plus-data pattern.
The data is the object stored in Flame’s cache:
{"transitions": [], "total_added": 0}
The handler is the Python object that defines behavior:
```python
class ReplayBuffer:
    def push(self, transitions): ...
    def state(self): ...
    def sample(self, batch_size): ...
    def merge(self): ...
```
This model is more general than replay buffers. The handler can be passed to remote collectors, registered as a service, and materialized in multiple processes. The data remains addressable through ObjectRef and can evolve through patches.
The same structure applies to other append-heavy shared objects, including rollout queues, metrics accumulators, feature caches, evaluation logs, and intermediate datasets produced by many workers.
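To make that generality concrete, here is a hypothetical sketch of the same pattern applied to a metrics accumulator. `MetricsLog`, its fields, and the exact call shapes are assumptions; only the put_object/patch_object/deserializer structure mirrors the replay-buffer example:

```python
from flamepy.core import patch_object  # import path assumed from the module name above

class MetricsLog:
    """Hypothetical handler reusing the handler-plus-data pattern."""

    def __init__(self, rr):
        self._materialized_rows: list[dict] = []
        self._materialized_patch_count = 0
        # Data: a base object in Flame's cache, evolved through patches.
        self.ref = rr.put_object({"rows": []})

    def log(self, rows: list[dict]) -> None:
        # Write path: append only the new rows as a delta.
        patch_object(self.ref, rows)

    def _deserializer(self, deltas):
        # Read path: advance a local materialized view, as in the replay buffer.
        for delta in deltas[self._materialized_patch_count:]:
            self._materialized_rows.extend(delta)
        self._materialized_patch_count = len(deltas)
        return {"rows": self._materialized_rows}
```

The behavior changes per workload, but the split stays the same: a small Python handler, and append-only data addressed through an `ObjectRef`.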
## Reproduction procedure
Run the distributed example from a Flame console container:
```bash
docker compose exec -it flame-console /bin/bash
cd /opt/examples/rl/replay_buffer
uv run main.py
```
Run the local baseline without a Flame cluster:
```bash
uv run main.py --local
```
Relevant distributed flags:
| Flag | Description | Default |
|---|---|---|
| `--env` | Gymnasium environment | CartPole-v1 |
| `--iterations` | Collection iterations | 50 |
| `--collections` | Collector calls per iteration | 20 |
| `--steps-per-collection` | Environment steps per collector call | 500 |
| `--batch-size` | Total sampled batch size per iteration | 64 |
| `--sample-parallelism` | Replay-buffer service instances and distributed sample requests | 2 |
| `--metrics-json` | Write per-iteration metrics | off |
| `--force-full-get` | Force full replay-buffer reads for baseline measurement | off |
| `--merge-every` | Compact patches every N iterations | mode-dependent |
| `--no-merge` | Disable patch merging | off |
To compare forced full reads with incremental reads on the same workload:
```bash
mkdir -p /tmp/replay-buffer-metrics

uv run main.py \
  --force-full-get \
  --metrics-json /tmp/replay-buffer-metrics/full.json \
  --iterations 50 \
  --collections 20 \
  --steps-per-collection 500 \
  --batch-size 64

uv run main.py \
  --metrics-json /tmp/replay-buffer-metrics/incremental.json \
  --iterations 50 \
  --collections 20 \
  --steps-per-collection 500 \
  --batch-size 64
```
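With both files written, a few lines of Python can compare the runs. This sketch assumes, hypothetically, that each metrics file is a JSON array of per-iteration records with an `iteration_seconds` field; check the actual schema emitted by `--metrics-json` before relying on it:

```python
import json

def total_seconds(path: str) -> float:
    """Sum per-iteration durations (assumes a hypothetical `iteration_seconds` field)."""
    with open(path) as f:
        records = json.load(f)
    return sum(r["iteration_seconds"] for r in records)

full = total_seconds("/tmp/replay-buffer-metrics/full.json")
incremental = total_seconds("/tmp/replay-buffer-metrics/incremental.json")
print(f"forced full reads: {full:.2f}s")
print(f"incremental reads: {incremental:.2f}s")
print(f"speedup from incremental reads: {full / incremental:.1f}x")
```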
## Example measurement
The example README includes one 500,000-transition run:
| Mode | Total time | Throughput |
|---|---|---|
| Local baseline | 2.24s | 223,376.4 transitions/sec |
| Incremental reads | 6.58s | 76,016.1 transitions/sec |
| Forced full reads | 71.84s | 6,959.5 transitions/sec |
The main distributed comparison is incremental reads versus forced full reads. In this run, incremental reads completed in 6.58s and forced full reads completed in 71.84s, a difference consistent with reducing repeated full-history transfer and materialization.
The local baseline is still faster for this specific workload because CartPole-v1 and random action sampling are cheap, while distributed execution pays service, scheduler, network, and cache overhead. This result should not be read as a claim that distributed replay is always faster than local memory. It shows where patch-based reads and writes improve the distributed path.
## Limitations
The example is a data-path demonstration, not a complete RL training system.
- The collector uses random actions; no policy update or reward-quality conclusion should be drawn.
- The replay buffer does not implement prioritized sampling or capacity-based eviction.
- Parallel sampling duplicates materialized views across service instances, trading memory for concurrency.
- Results depend on cluster size, cache placement, environment cost, batch size, and collection parallelism.
- Local execution remains the right baseline for small environments and small buffers.
## Conclusion
The replay-buffer example demonstrates a practical Flame pattern for append-heavy RL workloads:
- Use `flamepy.core.patch_object` to append transition batches as deltas.
- Use versioned `get_object` plus a deserializer to materialize only newly observed patches on repeated reads.
- Use `flamepy.runner` services to issue sample requests in parallel over the same shared replay-buffer object when the buffer is large enough to justify it.
- Model shared state as handler plus data: Python behavior around an `ObjectRef`, with the data stored and evolved in Flame's cache.
The broader result is a programming model that keeps application code small while giving Flame room to optimize data movement and service placement.