Shared Memory Corrupted by Overlapping Agent Writes

Two agents write to the same shared memory store simultaneously, producing garbled or inconsistent state. Here's how to detect and prevent overlapping writes.

Your AutoGen or CrewAI multi-agent system uses a shared Redis store as the “team memory”: agents read prior findings and write their own. Agent A reads the research summary, appends three new findings, and writes back the combined list. Meanwhile, Agent B reads the same research summary before Agent A writes, appends two different findings, and writes back its version. Agent A’s three findings are silently overwritten. The team memory now contains only the original summary plus Agent B’s two findings, Agent A’s work is gone, and downstream agents operate on incomplete context. This is a classic read-modify-write race condition applied to shared LLM memory.

Common causes

1. Read-modify-write without atomic compare-and-swap

The most common pattern. Agents read the full memory state, modify it in Python, then write back the full state. There is no check that the state hasn’t changed between the read and the write. Any overlapping read-modify-write pair loses one agent’s changes.

How to spot it: Find any code that does state = store.get(key); state.update(new_data); store.set(key, state). Without an if-not-modified-since check or a WATCH/CAS operation, this pattern always loses concurrent writes.
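
The lost update can be reproduced without threads by interleaving the steps by hand. The sketch below uses a plain dict as a hypothetical stand-in for the shared store; the key name and the findings are made up for illustration.

```python
import json

# Hypothetical in-memory stand-in for the shared store (not a real client).
store = {"research": json.dumps(["baseline finding"])}

def read(key):
    return json.loads(store[key])

def write(key, value):
    store[key] = json.dumps(value)

# Both agents read before either one writes.
snapshot_a = read("research")
snapshot_b = read("research")

# Agent A appends three findings and writes back the full list.
write("research", snapshot_a + ["A1", "A2", "A3"])

# Agent B appends two findings to its stale snapshot and writes back.
write("research", snapshot_b + ["B1", "B2"])

final_state = read("research")
print(final_state)  # Agent A's findings are missing
```

Neither write raises an error, which is why this failure usually goes unnoticed until a downstream agent works from the incomplete list.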

2. Append operations not atomic at the storage layer

Two agents both call store.append(key, new_item). The underlying implementation does store.set(key, store.get(key) + [new_item]), which is a non-atomic read-modify-write. Both agents read the same list, each appends its own item, and each writes back the original list plus one item. The second write overwrites the first, so one of the two items is lost.

How to spot it: Check whether your store’s append or add operations are implemented as atomic server-side operations (e.g., Redis RPUSH) or as client-side read-modify-write (e.g., get + modify + set). Client-side is not safe for concurrent agents.

3. No write lock — multiple agents write concurrently

Parallel agents all have write access to the same memory namespace. There is no mutex or lock preventing simultaneous writes. The pipeline assumed agents would write to separate keys, but they end up writing to the same key because the key is derived from a shared task property (task category, model name, date).

How to spot it: Log every write call with the key, agent ID, and timestamp. Overlapping writes to the same key (multiple writes within a sub-second window) indicate a concurrency issue.
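
Once the write log exists, overlapping writes can be found mechanically. This is a minimal sketch that assumes each log entry is a dict with "key", "agent", and "ts" (seconds) fields; the field names, the function name, and the one-second default window are assumptions for illustration.

```python
from collections import defaultdict

def find_overlapping_writes(write_log, window_seconds=1.0):
    """Return (key, first_agent, second_agent, gap_seconds) for consecutive
    writes to the same key by different agents within window_seconds."""
    by_key = defaultdict(list)
    for entry in write_log:
        by_key[entry["key"]].append(entry)

    overlaps = []
    for key, entries in by_key.items():
        entries.sort(key=lambda e: e["ts"])
        # Compare each write with the one immediately before it on the same key.
        for prev, curr in zip(entries, entries[1:]):
            gap = curr["ts"] - prev["ts"]
            if prev["agent"] != curr["agent"] and gap <= window_seconds:
                overlaps.append((key, prev["agent"], curr["agent"], gap))
    return overlaps

sample_log = [
    {"key": "summary:research", "agent": "researcher", "ts": 100.00},
    {"key": "summary:research", "agent": "critic", "ts": 100.25},
    {"key": "notes:researcher", "agent": "researcher", "ts": 100.30},
    {"key": "summary:research", "agent": "researcher", "ts": 130.00},
]
overlaps = find_overlapping_writes(sample_log)
print(overlaps)
```

If agents run on different hosts, clock skew can hide or invent overlaps, so timestamps taken at the store are more reliable than timestamps taken by each agent.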

4. Optimistic locking version check skipped under load

The memory store uses optimistic locking: each record has a version field. Agents read version v, compute an update, and write back with a WHERE version = v condition. Under load, the write fails (another agent updated first), but the error handler retries without re-reading the current state — it retries the stale update.

How to spot it: Check the retry logic for version-conflict errors. If it picks up the new version number but retries the write with the original (stale) data rather than re-reading and re-computing, the retry succeeds and overwrites the winning write with stale data.
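
The difference between the two retry strategies is easier to see side by side. The sketch below uses a hypothetical in-memory VersionedStore with a compare-and-set write; none of these class or function names come from a real library.

```python
class VersionConflict(Exception):
    pass

class VersionedStore:
    """Hypothetical in-memory store with a compare-and-set write."""
    def __init__(self, value):
        self.value = value
        self.version = 1

    def read(self):
        return list(self.value), self.version

    def write_if_version(self, new_value, expected_version):
        if self.version != expected_version:
            raise VersionConflict(f"expected v{expected_version}, found v{self.version}")
        self.value = new_value
        self.version += 1

def buggy_retry(store, stale_update, new_items):
    # BUG: picks up the new version number but keeps the stale payload.
    _, current_version = store.read()
    store.write_if_version(stale_update, current_version)

def correct_retry(store, stale_update, new_items):
    # Re-read, recompute on the fresh state, then write.
    current_value, current_version = store.read()
    store.write_if_version(current_value + new_items, current_version)

def run_scenario(retry):
    store = VersionedStore(["base"])
    value_a, version_a = store.read()  # Agent A reads v1
    value_b, version_b = store.read()  # Agent B reads v1
    store.write_if_version(value_a + ["A1"], version_a)  # Agent A wins: v1 -> v2
    stale_update = value_b + ["B1"]
    try:
        store.write_if_version(stale_update, version_b)  # Agent B conflicts
    except VersionConflict:
        retry(store, stale_update, ["B1"])
    return store.value

buggy_result = run_scenario(buggy_retry)
correct_result = run_scenario(correct_retry)
print(buggy_result, correct_result)
```

In the buggy path the version check passes on the retry, so the store reports success even though Agent A's item has been dropped.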

5. LangGraph state reducer merges incompatibly

In LangGraph, parallel node outputs are merged via state reducers. If two nodes both update a dict-valued key whose reducer is operator.add (dicts do not support +, so this raises a TypeError in Python), or if a key that needs append semantics has a reducer that keeps only the last value, the merged state is wrong.

How to spot it: Review each key in the LangGraph state TypedDict. For keys updated by multiple parallel nodes, check whether the reducer (Annotated[list, operator.add] or equivalent) correctly merges all updates rather than discarding one.
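
The reducer behaviors described above can be checked in plain Python, without building a graph. In this sketch, keep_last is a hypothetical reducer written only to show how a keep-the-last-value strategy drops an update.

```python
import operator

# Lists: operator.add concatenates, so both parallel updates survive.
merged_findings = operator.add(["finding from node A"], ["finding from node B"])

# Dicts: + is not defined for dicts, so operator.add raises TypeError.
try:
    operator.add({"a": 1}, {"b": 2})
    dict_add_error = None
except TypeError as exc:
    dict_add_error = exc

# A keep-the-last-value reducer (hypothetical) silently drops the first update.
def keep_last(left, right):
    return right

dropped = keep_last(["finding from node A"], ["finding from node B"])

print(merged_findings)
print(type(dict_add_error).__name__)
print(dropped)
```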

6. Memory store flushes periodically and loses in-flight writes

A write-behind cache flushes to disk every 5 seconds. Two agents write within the same 5-second window. The flush persists a snapshot of the in-memory state, and that snapshot may contain only one agent’s write if the second write arrives while the flush is already in progress. Until the next flush succeeds, the second write exists only in memory.

How to spot it: Check the write-behind cache’s flush interval. If the interval is longer than the typical time between concurrent agent writes, writes can be lost during flush.

Shortest path to fix

Step 1: Use atomic server-side operations for all shared-memory writes

For append operations:

import json
import redis

r = redis.Redis()

# WRONG — client-side read-modify-write
def append_finding_unsafe(key: str, finding: str):
    findings = json.loads(r.get(key) or "[]")
    findings.append(finding)
    r.set(key, json.dumps(findings))

# CORRECT — atomic server-side append via Redis list
def append_finding_safe(key: str, finding: str):
    r.rpush(key, finding)  # atomic RPUSH — no read-modify-write

def get_findings(key: str) -> list:
    return [item.decode() for item in r.lrange(key, 0, -1)]

For structured updates, use Redis hash fields:

# Atomic field-level update — only the field changes, not the whole record
r.hset("agent_memory", mapping={
    f"finding:{agent_id}:{timestamp}": json.dumps(finding_data)
})

Step 2: Implement optimistic locking with correct retry logic

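import json
import logging

logger = logging.getLogger(__name__)

class ConcurrencyError(Exception):
    """Raised when an update keeps losing the optimistic-lock race."""

def update_with_optimistic_lock(key: str, update_fn, max_retries: int = 5):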
    for attempt in range(max_retries):
        with r.pipeline() as pipe:
            try:
                pipe.watch(key)  # watch for changes
                current = json.loads(pipe.get(key) or "{}")
                new_state = update_fn(current)  # compute update on current state

                pipe.multi()  # start transaction
                pipe.set(key, json.dumps(new_state))
                pipe.execute()  # atomic — fails if key changed since WATCH
                return new_state
            except redis.WatchError:
                # Key changed — re-read and retry (not stale retry)
                logger.debug("Optimistic lock conflict on %s — retrying (attempt %d)", key, attempt + 1)
                continue
    raise ConcurrencyError(f"Could not update {key} after {max_retries} attempts")

The critical difference from the buggy version: on WatchError, the function re-reads the current state before retrying.

Step 3: Partition memory namespaces by agent ID for private writes

def write_agent_memory(agent_id: str, key: str, value: dict):
    # Private namespace — no other agent writes here
    namespaced_key = f"agent:{agent_id}:{key}"
    r.set(namespaced_key, json.dumps(value))

def read_shared_memory(key: str) -> dict:
    # Shared namespace — read-only for agents
    return json.loads(r.get(f"shared:{key}") or "{}")

def publish_to_shared(agent_id: str, contribution_key: str, value: dict):
    # Agents never write to the shared namespace directly. They append their
    # contribution to a Redis stream (XADD is atomic) for the coordinator to merge.
    r.xadd("shared_memory_stream", {
        "agent_id": agent_id,
        "key": contribution_key,
        "value": json.dumps(value),
    })

A separate coordinator process reads from the stream and merges contributions with proper conflict resolution.
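
The merge step of such a coordinator might look like the sketch below. It assumes the entries have already been read from the stream and decoded into dicts with the same fields that publish_to_shared writes (agent_id, key, value). The function name and the append-per-key conflict policy are assumptions for illustration, not a prescribed design.

```python
import json

def merge_contributions(shared_state, entries):
    """Fold stream entries into the shared state, in stream order.

    Conflict policy (an assumption for this sketch): contributions to the
    same key are appended to a list, so no agent overwrites another.
    """
    merged = {key: list(values) for key, values in shared_state.items()}
    for entry in entries:
        contribution = {
            "agent_id": entry["agent_id"],
            "value": json.loads(entry["value"]),
        }
        merged.setdefault(entry["key"], []).append(contribution)
    return merged

entries = [
    {"agent_id": "agent_a", "key": "findings", "value": json.dumps({"text": "A1"})},
    {"agent_id": "agent_b", "key": "findings", "value": json.dumps({"text": "B1"})},
]
shared = merge_contributions({}, entries)
print(shared)
```

Because only the coordinator writes the merged result, the shared namespace has a single writer and needs no further locking.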

Step 4: Fix LangGraph state reducer for parallel node outputs

import operator
from typing import Annotated, TypedDict

def deep_merge(a: dict, b: dict) -> dict:
    # Recursively merge b into a copy of a; for non-dict conflicts, b wins
    result = dict(a)
    for key, val in b.items():
        if key in result and isinstance(result[key], dict) and isinstance(val, dict):
            result[key] = deep_merge(result[key], val)
        else:
            result[key] = val
    return result

# deep_merge must be defined before AgentState, because the Annotated
# metadata is evaluated when the class body runs
class AgentState(TypedDict):
    # Use operator.add for lists: concatenates both lists
    findings: Annotated[list[str], operator.add]
    # Use a custom reducer for dicts: deep merge
    artifacts: Annotated[dict, deep_merge]

Test the reducer explicitly:

def test_parallel_findings_merge():
    update_a = ["finding 1", "finding 2"]
    update_b = ["finding 3"]
    # Apply the reducer declared for "findings" directly
    merged = operator.add(update_a, update_b)
    assert merged == ["finding 1", "finding 2", "finding 3"]  # all three preserved

Step 5: Log all writes with agent ID and detect conflicts

import time

def monitored_write(key: str, value, agent_id: str, window_s: float = 0.1):
    now = time.time()
    r.set(key, json.dumps(value))
    log_key = f"write_log:{key}"
    r.lpush(log_key, json.dumps({"agent": agent_id, "ts": now, "size": len(str(value))}))
    # Check the newest log entries for writes by other agents within window_s (100ms by default)
    recent = [json.loads(e) for e in r.lrange(log_key, 0, 5)]
    agents = {e["agent"] for e in recent if abs(now - e["ts"]) <= window_s}
    if len(agents) > 1:
        logger.warning("Concurrent writes detected on key %s by agents: %s", key, sorted(agents))

Prevention

  • Use atomic server-side operations (Redis RPUSH, HSET, ZADD, SQL INSERT ON CONFLICT) for all shared-memory writes — never client-side read-modify-write.
  • Partition memory into per-agent private namespaces for writes; use a coordinator to merge into shared namespaces with explicit conflict resolution.
  • Implement optimistic locking with WATCH/CAS for any write that requires reading before writing; re-read the current state on retry, never retry the stale update.
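  • In LangGraph, define explicit state reducers for every key that parallel nodes update; test that each reducer preserves every update whichever order the parallel outputs are merged in (list concatenation is associative but not commutative, so do not depend on item order).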
  • Log all writes with agent ID and timestamp; alert when two different agent IDs write the same key within a 100ms window.
  • Write a concurrent-write test: spawn 10 threads that all append to the same key simultaneously and verify that all 10 values are present in the result.
  • Avoid write-behind caches for shared agent memory; use write-through caching to ensure durability.
  • Document which keys are private (one writer) vs. shared (multiple writers); shared keys require explicit concurrency controls.
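
The concurrent-write test from the list above could be sketched as follows. To keep the example runnable without a server, it uses a hypothetical in-memory store whose append runs under a lock, standing in for an atomic server-side append such as RPUSH. Against a real store, the worker would call the client's append instead.

```python
import threading

class InMemoryListStore:
    """Hypothetical stand-in for a store with an atomic append."""
    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def append(self, key, item):
        with self._lock:  # the whole append happens under one lock
            self._data.setdefault(key, []).append(item)

    def get(self, key):
        with self._lock:
            return list(self._data.get(key, []))

def run_concurrent_appends(store, key, n_threads=10):
    barrier = threading.Barrier(n_threads)  # release all threads together

    def worker(i):
        barrier.wait()
        store.append(key, f"finding-{i}")

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return store.get(key)

result = run_concurrent_appends(InMemoryListStore(), "team:findings")
# Order is not guaranteed, so compare sorted contents.
assert sorted(result) == sorted(f"finding-{i}" for i in range(10))
```

A race may not show up on every run, so a test like this is more convincing when it is repeated many times or run with more threads.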

FAQ

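Q: Does Redis support fully serialized multi-key transactions? A: Redis supports MULTI/EXEC transactions and WATCH for optimistic locking on multiple keys. In Redis Cluster, these only work when every key involved maps to the same hash slot; they do not work across shards. Lua scripts (EVAL) execute atomically but have the same restriction. To update several keys atomically in a cluster, give them a shared hash tag (for example, {team1}:findings and {team1}:summary) so that they land in the same slot.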

Q: Is a message queue safer than shared memory for agent communication? A: Yes. Message queues (Redis Streams, Kafka, SQS) serialize writes by design — each message is appended atomically, and consumers read the ordered log. For agent communication that is inherently sequential (each agent contributes findings), a stream is safer and more auditable than a mutable shared dict.

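Q: How do I debug a corruption that has already happened? A: Use the write log to reconstruct the sequence of writes. Find the last write that produced a correct state, identify the overlapping writes after it, and manually re-apply the lost updates. For a Redis-backed store, enable keyspace notifications (the notify-keyspace-events setting) to get a real-time feed of future writes. Notifications are delivered over Pub/Sub and are not persisted, and they carry the key and event name rather than the written value, so a subscriber has to record them for post-mortem analysis.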

Q: Do embedding-based vector stores have the same concurrency issues? A: Yes. Pinecone, Qdrant, and Chroma have upsert semantics — a concurrent upsert with the same vector ID overwrites the previous one. Use unique IDs (agent ID + timestamp + content hash) for every write rather than a fixed ID per “topic.” This converts the concurrency problem from destructive overwrites to an append-and-deduplicate problem.
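
A unique ID of the kind described in this answer could be built as in the sketch below. The function name, the separator, and the 16-character hash prefix are assumptions for illustration. Some stores restrict ID formats (Qdrant, for example, accepts unsigned integers or UUIDs), so check what yours accepts before using a string like this directly.

```python
import hashlib
import time

def make_record_id(agent_id, content, timestamp_ns=None):
    """Build an ID from the agent ID, a timestamp, and a hash of the content."""
    if timestamp_ns is None:
        timestamp_ns = time.time_ns()
    content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]
    return f"{agent_id}:{timestamp_ns}:{content_hash}"

# Two agents storing the same text at the same instant still get distinct IDs.
id_a = make_record_id("agent_a", "Latency doubled after the cache change", timestamp_ns=1)
id_b = make_record_id("agent_b", "Latency doubled after the cache change", timestamp_ns=1)
print(id_a)
print(id_b)
```

The shared content hash in the last segment is what makes the later deduplication pass cheap: records with the same hash are candidates for merging.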

Tags: #AI coding #Agents #Troubleshooting