You said:
ChatGPT said:
I’ll inspect the repository’s TypeScript client/server stream paths, focusing on delivery guarantees around ACKs, cursors/offsets, retries, buffering, reconnection, persistence, backpressure, and error/abort handling.
I’ve narrowed the audit to concrete suspects: write batching/producer sequencing, idempotent-producer retry/close behavior, SSE/long-poll state, binary SSE CORS headers, and store offset handling.
The newest findings strengthen two suspects: producer retry/auto-claim behavior and file-backed recovery now look like plausible real delivery-corruption risks, not just edge-case inefficiencies.
Yes — I found several that look like real reliability bugs, not just style issues. I only did a static audit of the uploaded TypeScript client/server zip, so the line numbers below are from that snapshot.
1) IdempotentProducer.flush() can report success after failed writes
This is the scariest general client-side issue.
In packages/client/src/idempotent-producer.ts, flush() only does:
- enqueue the pending batch (:315-325)
- await this.#queue.drained() (:327-328)
But batches are pushed with a rejection handler that deliberately swallows the worker error:
- delayed push (:546-550)
- normal push (:553-555)
The worker does call onError and rethrow:
- #batchWorker (:562-586)
The problem is that the rethrow rejects that specific queue.push(...) promise, and that promise is caught and ignored. Meanwhile, queue.drained() only waits for processing to finish, which is not the same as awaiting every push promise. The fastq documentation on GitHub distinguishes the two: with the promise API, queue.push(task) returns a promise that fulfills or rejects for that task, while queue.drained() resolves once the queue has been processed.
So this can happen:
- producer.append("important")
- the server, network, or JSON validation fails
- #batchWorker throws, but the push rejection is swallowed
- await producer.flush() resolves
- the caller believes the message was delivered
That directly violates the comment in the example: “Ensure all messages are delivered before shutdown” at idempotent-producer.ts:122-124.
Fix: keep the push promises in a tracked set and have flush() await them, or record #firstError in #batchWorker and make flush() throw after drained(). Also, close() should refuse to close after a prior write error unless the caller explicitly discards pending writes.
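A minimal sketch of that fix, assuming each flush() sends whatever is pending as one batch. It avoids fastq so it stays self-contained; FlushingProducer and sendBatch are hypothetical names, not the library's API. The point is that flush() awaits every tracked send and then rethrows the first recorded error, instead of trusting "queue is idle".

```typescript
// Hypothetical stand-in for the network call that sends one batch.
type SendBatch = (batch: string[]) => Promise<void>;

class FlushingProducer {
  private pending: string[] = [];
  private readonly inFlight = new Set<Promise<void>>();
  private failed = false;
  private firstError: unknown = undefined;

  constructor(private readonly sendBatch: SendBatch) {}

  append(message: string): void {
    this.pending.push(message);
  }

  // Send the pending batch and track its promise so flush() can await it.
  private dispatch(): void {
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    const tracked: Promise<void> = this.sendBatch(batch)
      .catch((err: unknown) => {
        // Keep the first failure; later errors are often consequences of it.
        if (!this.failed) {
          this.failed = true;
          this.firstError = err;
        }
      })
      .then(() => {
        this.inFlight.delete(tracked);
      });
    this.inFlight.add(tracked);
  }

  // Waits for every tracked send, then surfaces any recorded (sticky) failure.
  async flush(): Promise<void> {
    this.dispatch();
    while (this.inFlight.size > 0) {
      await Promise.all(Array.from(this.inFlight));
    }
    if (this.failed) throw this.firstError;
  }
}
```

Because the error is sticky, a close() built on the same flush() would also refuse to report a clean shutdown after any failed write.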
2) autoClaim + maxInFlight can silently drop batches or hang on sequence waits
The code says it must block pipelining until the first auto-claim completes:
- #epochClaimed setup: idempotent-producer.ts:216-221
- intended guard (:541-544)
But the guard uses this.#queue.length() > 0. In fastq, queue.length() counts only tasks still waiting in the queue; tasks that are currently running are not included (per the fastq documentation on GitHub).
That means a first batch can be in flight, claiming a new epoch, while length() === 0, which allows more batches to start with the stale epoch.
The bad path is:
- the worker captures the current epoch before sending: idempotent-producer.ts:562-565
- on 403, auto-claim sets this.#epoch = currentEpoch + 1
- it resets #nextSeq
- it retries that same batch as seq = 0 (:729-743)
If two stale batches race, both can retry under the newly claimed epoch with seq = 0. The server treats seq <= lastSeq as a duplicate:
- in-memory store: packages/server/src/store.ts:526-528
- file store: packages/server/src/file-store.ts:494-496
- the server returns 204 No Content for duplicates: packages/server/src/server.ts:1521-1535
- the client treats 204 as success: idempotent-producer.ts:717-721
So batch A can be appended, batch B can be different data but receive “duplicate success,” and the client never knows B was not appended.
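To make the duplicate path concrete, here is a toy model of the idempotency rule described above: for a given producer and epoch, a write with seq <= lastSeq is acknowledged as a duplicate without being appended. ToyStream is hypothetical and omits gap detection and epoch fencing; it only shows why two different batches retried as (epoch 1, seq 0) end with one of them silently lost.

```typescript
type Outcome = "appended" | "duplicate";

interface ProducerState {
  epoch: number;
  lastSeq: number;
}

class ToyStream {
  readonly log: string[] = [];
  private readonly producers = new Map<string, ProducerState>();

  append(producerId: string, epoch: number, seq: number, data: string): Outcome {
    const state = this.producers.get(producerId);
    if (state !== undefined && state.epoch === epoch && seq <= state.lastSeq) {
      // Acknowledged like a success (the real server sends 204), but not stored.
      return "duplicate";
    }
    this.producers.set(producerId, { epoch, lastSeq: seq });
    this.log.push(data);
    return "appended";
  }
}

const stream = new ToyStream();
// Two stale batches both retried under the newly claimed epoch with seq = 0.
const a = stream.append("producer-1", 1, 0, "batch A");
const b = stream.append("producer-1", 1, 0, "batch B");
console.log(a, b, stream.log); // appended duplicate [ 'batch A' ]
```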
There is also a hang variant: after a successful auto-claim retry, #batchWorker signals sequence completion using the old captured epoch/seq:
- captures the old epoch: idempotent-producer.ts:562-565
- signals completion with that old tuple (:575-576)
If a later request under the new epoch gets a 409 sequence_gap and waits for (new epoch, seq 0), it can wait forever, because the successful seq 0 was signaled under the stale epoch:
- wait logic: idempotent-producer.ts:749-765
Fix: make auto-claim a single-flight gate. Until the claim has succeeded, dispatch exactly one batch. After claiming, reassign sequence numbers for all pending batches under the new epoch, or enqueue them only after the claim promise resolves. Also signal completion for the actual committed epoch/seq, not the originally attempted tuple.
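A sketch of the single-flight gate, assuming sequence numbers are assigned when a batch is actually sent rather than when it is enqueued. EpochGate, reclaim(), assign(), and the claimEpoch callback are hypothetical; the idea is that concurrent stale batches share one claim and are renumbered afterwards.

```typescript
// Hypothetical callback that performs the auto-claim request for `nextEpoch`.
type ClaimEpoch = (nextEpoch: number) => Promise<void>;

class EpochGate {
  private epoch: number;
  private nextSeq = 0;
  private claim: Promise<void> | null = null;

  constructor(initialEpoch: number, private readonly claimEpoch: ClaimEpoch) {
    this.epoch = initialEpoch;
  }

  // Assign (epoch, seq) at send time, so renumbering after a claim is automatic.
  assign(): { epoch: number; seq: number } {
    return { epoch: this.epoch, seq: this.nextSeq++ };
  }

  // Called by a batch that was rejected for a stale epoch.
  async reclaim(staleEpoch: number): Promise<void> {
    if (staleEpoch < this.epoch) return; // another caller already claimed
    if (this.claim === null) {
      const next = this.epoch + 1;
      this.claim = this.claimEpoch(next)
        .then(() => {
          this.epoch = next;
          this.nextSeq = 0;
        })
        .then(
          () => {
            this.claim = null;
          },
          (err: unknown) => {
            this.claim = null;
            throw err;
          },
        );
    }
    await this.claim; // every concurrent caller waits on the same claim
  }
}
```

After reclaim() resolves, a retried batch calls assign() again instead of reusing seq = 0, so two retries get seq 0 and seq 1 under the new epoch.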
3) File-backed long-poll can miss the final append-and-close message
This one is server-side and file-store-specific.
In file-backed append, append-and-close commits closed: true and then notifies long-polls:
- the metadata update sets closed: packages/server/src/file-store.ts:1207-1217
- it then notifies data waiters and closed waiters (:1220-1224)
The pending long-poll resolver marks streamClosed based only on current metadata, even if it is returning messages:
- file-store.ts:1671-1680
Then server.ts checks result.streamClosed before it uses result.messages:
- packages/server/src/server.ts:919-932
- only later does it assign messages = result.messages (:957-958)
So a waiting long-poll client can receive 204 Stream-Closed at the old offset, with no final data, even though the append-and-close wrote data. The client then marks the stream closed and stops.
The in-memory store has the safer behavior: it only sets streamClosed in the pending resolver when msgs.length === 0:
- packages/server/src/store.ts:1129-1133
Fix: either make the file store match the in-memory store:
streamClosed = currentMeta?.closed && msgs.length === 0 ? true : undefined
or change handleRead() to only early-return closed when result.streamClosed && result.messages.length === 0. If messages are present, return 200 with the messages and include Stream-Closed: true on that final response.
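The second option could look roughly like this. ReadResult and LongPollReply are hypothetical shapes (only messages and streamClosed come from the code discussed above), and the offset/cursor headers a real reply would carry are left out.

```typescript
interface ReadResult {
  messages: Uint8Array[];
  streamClosed?: boolean;
}

interface LongPollReply {
  status: 200 | 204;
  headers: Record<string, string>;
  body: Uint8Array[];
}

function toLongPollReply(result: ReadResult): LongPollReply {
  const headers: Record<string, string> = {};
  if (result.streamClosed) headers["Stream-Closed"] = "true";

  if (result.messages.length > 0) {
    // Final data wins: deliver it and flag closure on the same response.
    return { status: 200, headers, body: result.messages };
  }
  // Only an empty result (timeout, or closed with nothing new) is a bare 204.
  return { status: 204, headers, body: [] };
}
```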
4) File-backed close can race with append and be overwritten
File-backed append has a per-stream append lock:
- lock field: packages/server/src/file-store.ts:236-241
- append acquires it (:1027-1037)
But close does not use that lock:
- closeStream(): file-store.ts:1278-1297
- closeStreamWithProducer() (:1310-1405)
appendInner() reads metadata before doing async file I/O:
- reads metadata: file-store.ts:1045
- writes/fsyncs later (:1172-1187)
- then writes updatedMeta.closed: options.close ? true : streamMeta.closed (:1207-1217)
Race:
- append starts and reads closed = false
- append awaits the file write/fsync
- close writes closed = true
- append resumes and writes the stale closed = false
That can effectively reopen a stream after a successful close. It can also erase closedBy.
Fix: close paths need to participate in the same per-stream serialization as appends, or append must re-read and merge metadata immediately before putSync(), preserving closure monotonically:
closed = options.close || freshMeta.closed || streamMeta.closed
Be careful with lock ordering: appendWithProducer() currently takes producer lock then stream lock, so producer close should use the same order or be refactored to avoid deadlocks.
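One way to give close the same serialization as append is a per-stream promise-chain mutex that both paths run through. KeyedMutex is a hypothetical helper, not the lock the file store already has; if producer locks are also involved, the lock-ordering caveat above still applies.

```typescript
// Hypothetical per-key mutex: callers for the same key run one at a time, in order.
class KeyedMutex {
  private readonly tails = new Map<string, Promise<void>>();

  async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    let release: () => void = () => {};
    const done = new Promise<void>((resolve) => {
      release = () => resolve();
    });
    const tail = prev.then(() => done);
    this.tails.set(key, tail);

    await prev; // wait for every earlier holder of this key
    try {
      return await fn();
    } finally {
      release();
      // Drop the entry if nobody queued behind us, so keys do not leak.
      if (this.tails.get(key) === tail) this.tails.delete(key);
    }
  }
}
```

In use, append and close would both call mutex.run(streamId, ...), so a close issued while an append is awaiting fsync runs only after that append has written its metadata, and cannot be overwritten by the append's stale snapshot.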
5) SSE fallback appears to close the stream instead of falling back
The SSE state machine can request fallback to long-poll:
- packages/client/src/stream-response-state.ts:190-215
#trySSEReconnect() handles that and returns null:
- packages/client/src/response.ts:447-465
But the caller treats null as “closed”:
- #processSSEEvents(): response.ts:522-539
- the stream closes on { type: "closed" } (:773-776)
The long-poll branch exists below:
- response.ts:792-832
…but the code never reaches it after fallback. So in the exact scenario fallback is meant to survive — short/broken SSE connections — the response stream just ends.
Fix: return a distinct result such as { type: "fallback" }, set sseEventIterator = null, and continue the pull loop so the long-poll branch runs using the current offset/cursor.
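A sketch of that control flow with a fake transport. SSEStep, Transport, and pullAll are hypothetical, and offset here is just a message count standing in for the real offset/cursor; what matters is that "fallback" switches branches and keeps pulling instead of ending the stream.

```typescript
// Hypothetical result of one SSE read step; "fallback" is distinct from "closed".
type SSEStep =
  | { type: "data"; chunk: string }
  | { type: "closed" }
  | { type: "fallback" };

interface Transport {
  nextSSE(): Promise<SSEStep>;
  longPoll(offset: number): Promise<{ chunk: string | null; closed: boolean }>;
}

async function pullAll(transport: Transport): Promise<string[]> {
  const received: string[] = [];
  let useSSE = true;
  let offset = 0; // stands in for the real offset/cursor

  for (;;) {
    if (useSSE) {
      const step = await transport.nextSSE();
      if (step.type === "data") {
        received.push(step.chunk);
        offset += 1;
        continue;
      }
      if (step.type === "closed") return received;
      // Fallback: stop using SSE and resume from the same offset via long-poll.
      useSSE = false;
      continue;
    }
    const poll = await transport.longPoll(offset);
    if (poll.chunk !== null) {
      received.push(poll.chunk);
      offset += 1;
    }
    if (poll.closed) return received;
  }
}
```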
Other sharp edges worth fixing
IdempotentProducer.close() does not handle 409 sequence_gap, while #doSendBatch() does. If a close overtakes a delayed or earlier batch, close fails instead of waiting for the missing seq. Compare #doClose() at idempotent-producer.ts:401-480 with send-batch gap handling at :749-765.
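A sketch of gap handling for close(), assuming completion is signaled for the (epoch, seq) that was actually committed, which is also what the auto-claim fix needs. SeqTracker, closeWithGapRetry, and the numeric-status sendClose callback are hypothetical; a real version would also need a timeout and a way to tell a sequence_gap 409 apart from other conflicts.

```typescript
// Records committed (epoch, seq) pairs and wakes anyone waiting on them.
class SeqTracker {
  private readonly done = new Set<string>();
  private readonly waiters = new Map<string, Array<() => void>>();

  private key(epoch: number, seq: number): string {
    return `${epoch}:${seq}`;
  }

  // Call with the tuple that was actually committed, not the one first attempted.
  signal(epoch: number, seq: number): void {
    const k = this.key(epoch, seq);
    this.done.add(k);
    for (const wake of this.waiters.get(k) ?? []) wake();
    this.waiters.delete(k);
  }

  waitFor(epoch: number, seq: number): Promise<void> {
    const k = this.key(epoch, seq);
    if (this.done.has(k)) return Promise.resolve();
    return new Promise<void>((resolve) => {
      const list = this.waiters.get(k) ?? [];
      list.push(() => resolve());
      this.waiters.set(k, list);
    });
  }
}

// On a 409 (assumed here to mean sequence_gap), wait for the predecessor, then retry once.
async function closeWithGapRetry(
  tracker: SeqTracker,
  epoch: number,
  seq: number,
  sendClose: () => Promise<number>,
): Promise<number> {
  const status = await sendClose();
  if (status !== 409 || seq === 0) return status;
  await tracker.waitFor(epoch, seq - 1);
  return sendClose();
}
```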
DurableStream.writable().write() returns immediately and ignores the producer’s eventual success or failure:
- packages/client/src/stream.ts:835-838
That means Web Streams backpressure is effectively bypassed. Errors only surface later through onError/close(), and close() currently calls producer.close() before checking writeError:
- stream.ts:839-843
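A sketch of a sink whose write() settles only once the producer has accepted the chunk, so a WritableStream built on it would apply backpressure and error on the first failed write. ChunkProducer and makeSink are hypothetical, and the sketch assumes append() returns a promise per chunk, which the real producer may not expose.

```typescript
// Hypothetical producer surface: append() settles when the chunk is acknowledged.
interface ChunkProducer {
  append(chunk: string): Promise<void>;
  close(): Promise<void>;
}

// Shape-compatible with the write/close members of an UnderlyingSink.
function makeSink(producer: ChunkProducer) {
  let failed = false;
  let writeError: unknown = undefined;

  return {
    async write(chunk: string): Promise<void> {
      try {
        await producer.append(chunk); // backpressure: resolve only after ack
      } catch (err) {
        failed = true;
        writeError = err;
        throw err; // a rejected write() errors the WritableStream
      }
    },
    async close(): Promise<void> {
      if (failed) throw writeError; // check for earlier failures first
      await producer.close();
    },
  };
}
```

Awaiting each acknowledgement trades batching throughput for simplicity; a batching-friendly variant could return from write() earlier and await a flush() in close(), as long as close() still checks for a recorded error before closing the producer.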
appendStream() uses a streaming request body, but the shared fetch wrapper retries by reusing the same args, indefinitely by default:
- maxRetries defaults to Infinity: packages/client/src/fetch.ts:53-58
- the retry loop reuses args (:105-176)
- appendStream() passes a ReadableStream body: packages/client/src/stream.ts:761-771
A streamed request body is generally not safely replayable. On a retryable response or network error, this can hang or repeatedly fail instead of producing a clean non-retryable error.
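A sketch of a retry wrapper that only retries when the body can be re-sent, with a finite budget. withRetry, bodyIsReplayable, and isRetryable are hypothetical names, not options of the client's fetch wrapper, and backoff is omitted.

```typescript
interface RetryOptions {
  bodyIsReplayable: boolean; // false for a ReadableStream request body
  maxRetries: number; // finite budget instead of Infinity
}

async function withRetry<T>(
  attempt: () => Promise<T>,
  isRetryable: (err: unknown) => boolean,
  opts: RetryOptions,
): Promise<T> {
  // A streamed body is consumed by the first attempt, so it gets no retries.
  const budget = opts.bodyIsReplayable ? opts.maxRetries : 0;
  for (let tries = 0; ; tries++) {
    try {
      return await attempt();
    } catch (err) {
      if (tries >= budget || !isRetryable(err)) throw err;
    }
  }
}
```

With this shape, appendStream() would pass bodyIsReplayable: false and get one attempt followed by a clean error, while buffered appends keep their retries.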
Stream-Seq with batching is per HTTP batch, not per append() call. The batching path picks only the last non-undefined seq:
- packages/client/src/stream.ts:620-631
So append(..., { seq: "1" }), append(..., { seq: "2" }), append(..., { seq: "3" }) can become one POST with only Stream-Seq: 3. That does not drop the batched payload, but it means server-side writer coordination is at batch granularity, not append-call granularity.
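A small model of the behavior described above, to show what reaches the server. PendingAppend and buildBatch are illustrative names; the real batching code at stream.ts:620-631 may differ in detail.

```typescript
interface PendingAppend {
  data: string;
  seq?: string;
}

// Merge queued appends into one request; only the last defined seq survives.
function buildBatch(items: PendingAppend[]): { body: string[]; streamSeq?: string } {
  let streamSeq: string | undefined;
  for (const item of items) {
    if (item.seq !== undefined) streamSeq = item.seq;
  }
  return { body: items.map((item) => item.data), streamSeq };
}

const merged = buildBatch([
  { data: "a", seq: "1" },
  { data: "b", seq: "2" },
  { data: "c", seq: "3" },
]);
console.log(merged.body.length, merged.streamSeq); // 3 3
```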
Patch order I’d choose
Fix these first:
- IdempotentProducer.flush() error propagation.
- autoClaim serialization/renumbering.
- file-store long-poll final append-and-close handling.
- file-store close/append locking.
- SSE fallback control flow.
Those five are the ones most likely to produce “I wrote data, got success/EOF, and a reader never sees it” or “the live stream just stops.”