Storh

Queries

RecordQuery is the streaming filter passed to stream(). It covers the append-oriented access patterns: resuming after a cursor, reading a time window, filtering on field equality, and stopping at a limit. For richer predicates, ordering, or index-backed lookups, use the query builder. For anything a plain foreach over a stream could filter by hand, RecordQuery is the cheaper tool.

RecordQuery is immutable. Each modifier returns a cloned query:

$query = Storh\RecordQuery::all()
    ->after($lastSeenId)
    ->time_range_ms($fromMs, $untilMs)
    ->where_equal('kind', 'page')
    ->limit(100);

foreach ($store->stream($query) as $record) {
    // Records are yielded lazily.
}
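The clone-per-modifier pattern means a base query can be shared and specialized without one caller's changes leaking into another's. The sketch below shows the pattern in plain PHP. QuerySketch and its accessors are hypothetical illustrations, not Storh's RecordQuery, whose internals are not documented here.

```php
<?php
declare(strict_types=1);

// Hypothetical class showing the clone-per-modifier pattern.
// It is NOT Storh's RecordQuery; only the immutability idea is taken from the docs.
final class QuerySketch
{
    private ?string $after = null;
    private ?int $limit = null;

    public static function all(): self
    {
        return new self();
    }

    public function after(string $id): self
    {
        $copy = clone $this; // never mutate $this
        $copy->after = $id;
        return $copy;
    }

    public function limit(int $n): self
    {
        $copy = clone $this;
        $copy->limit = $n;
        return $copy;
    }

    public function afterId(): ?string
    {
        return $this->after;
    }

    public function limitValue(): ?int
    {
        return $this->limit;
    }
}

// A shared base query is specialized; $base itself is left unchanged.
$base = QuerySketch::all()->limit(100);
$resumed = $base->after('id-42');
```

Because every modifier returns a fresh object, $base can be stored once and reused as the starting point for any number of cursors.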

after() and time_range_ms() rely on the ordering of record ids (time-ordered UUIDs), so they need no timestamp field in the record data. On SegmentedLog they also skip work: segments whose id range falls outside the window are never opened, and sparse per-segment indexes seek close to the first matching record instead of scanning the segment from the start.
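The sketch below illustrates why id ordering is enough for time windows and how segment skipping and a sparse index seek can work. It assumes the ids are lowercase UUIDv7 strings, whose first 48 bits are a big-endian Unix timestamp in milliseconds; the docs only say ids sort by creation time, so treat that layout, the half-open [from, until) window, and every function name here (uuidv7_ms, uuidv7_bound, segment_overlaps, sparse_seek) as hypothetical rather than Storh's implementation.

```php
<?php
declare(strict_types=1);

// Assumption: ids are lowercase UUIDv7. The leading 12 hex digits are the
// creation time in ms, so strcmp() order on ids matches creation order.
function uuidv7_ms(string $id): int
{
    return hexdec(substr(str_replace('-', '', $id), 0, 12));
}

// Smallest/largest id string with the given ms prefix. The all-0 / all-f tail
// brackets every id with that timestamp, whatever its version/variant bits.
function uuidv7_bound(int $ms, bool $upper): string
{
    $hex = sprintf('%012x', $ms);
    $tail = $upper ? 'ffff-ffff-ffffffffffff' : '0000-0000-000000000000';
    return substr($hex, 0, 8) . '-' . substr($hex, 8, 4) . '-' . $tail;
}

// A segment holding ids [minId, maxId] only needs opening if it overlaps
// the (assumed half-open) window [fromMs, untilMs).
function segment_overlaps(string $minId, string $maxId, int $fromMs, int $untilMs): bool
{
    return uuidv7_ms($maxId) >= $fromMs && uuidv7_ms($minId) < $untilMs;
}

// Hypothetical sparse index: [id, byteOffset] pairs for every Nth record,
// sorted by id. Binary-search for the greatest indexed id <= $target and
// return its offset (0 if $target precedes every entry); a scan starting
// there then skips forward to the first real match.
function sparse_seek(array $index, string $target): int
{
    $lo = 0;
    $hi = count($index) - 1;
    $offset = 0;
    while ($lo <= $hi) {
        $mid = intdiv($lo + $hi, 2);
        [$id, $pos] = $index[$mid];
        if (strcmp($id, $target) <= 0) {
            $offset = $pos; // candidate; look for a later one
            $lo = $mid + 1;
        } else {
            $hi = $mid - 1;
        }
    }
    return $offset;
}
```

A sparse index keeps memory small (one entry per N records) at the cost of a short forward scan after the seek, which suits sequentially written segments.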

where_equal() compares a top-level data field with === and accepts scalar values or null. A null filter matches records where the field is present and null; records without the field do not match.
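The documented rule can be restated as a small predicate. This is a hypothetical helper, not part of Storh; it assumes record data is a plain PHP array. Note the use of array_key_exists() rather than isset(): isset() returns false for a key whose value is null, which would make a null filter match nothing.

```php
<?php
declare(strict_types=1);

// Hypothetical restatement of where_equal(): the field must exist at the
// top level of $data AND be identical (===) to the filter value.
function where_equal_matches(array $data, string $field, string|int|float|bool|null $value): bool
{
    // array_key_exists() distinguishes "present and null" from "missing".
    return array_key_exists($field, $data) && $data[$field] === $value;
}
```

Because the comparison is ===, a filter value of '1' does not match a stored 1, and 1.0 does not match 1 either; nested fields are not searched.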

Cursor pagination

Ids sort by creation time, so the last id of one page is the cursor for the next:

$page = iterator_to_array($store->stream(
    Storh\RecordQuery::all()->after($lastSeenId)->limit(100)
), false);

$lastSeenId = [] === $page ? $lastSeenId : end($page)->id();
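Draining a whole store page by page is the same step in a loop. The sketch below simulates it in memory so it runs on its own: fetch_page() is a hypothetical stand-in for stream(RecordQuery::all()->after($cursor)->limit($n)), and ids are plain strings that sort by creation time rather than record objects with an id() method.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for a limited stream after a cursor.
// $sortedIds must already be in creation (and therefore string) order.
function fetch_page(array $sortedIds, ?string $after, int $limit): array
{
    $page = [];
    foreach ($sortedIds as $id) {
        if ($after !== null && strcmp($id, $after) <= 0) {
            continue; // at or before the cursor: already seen
        }
        $page[] = $id;
        if (count($page) === $limit) {
            break;
        }
    }
    return $page;
}

// Read everything, carrying the last id of each page forward as the cursor.
function read_all(array $sortedIds, int $limit): array
{
    $cursor = null;
    $seen = [];
    do {
        $page = fetch_page($sortedIds, $cursor, $limit);
        if ($page === []) {
            break; // nothing after the cursor
        }
        array_push($seen, ...$page);
        $cursor = end($page);
    } while (count($page) === $limit); // a short page means we reached the end
    return $seen;
}
```

Stopping on a short page saves one extra request; the empty-page check still covers the case where the last page is exactly full.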

Surviving corrupt records

By default, a scan throws on the first unreadable record. To skip unreadable records and report each failure instead, use continue_on_error():

$query = Storh\RecordQuery::all()->continue_on_error(
    static function (string $location, Throwable $error): void {
        error_log($location . ': ' . $error->getMessage());
    },
);

The location is the record id for DocStore and segment-file:byte-offset for SegmentedLog. continue_on_error() only skips and reports corrupt records; to clean them up, use repair() (see CLI).
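The difference between the default and continue_on_error() can be modeled with a small generator. This is a hypothetical sketch, not Storh's scanner: $raw maps a location string to raw bytes, $decode stands in for record parsing, and the segment file names in the usage are invented for illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical scan: without a handler the first decode failure aborts the
// scan; with one, the failure is reported with its location and skipped.
function scan(iterable $raw, callable $decode, ?callable $onError = null): Generator
{
    foreach ($raw as $location => $bytes) {
        try {
            $record = $decode($bytes); // only decoding is guarded
        } catch (Throwable $error) {
            if ($onError === null) {
                throw $error; // default: fail fast
            }
            $onError((string) $location, $error);
            continue;
        }
        yield $location => $record; // yield stays outside the try block
    }
}

// Usage with made-up segment-file:byte-offset locations.
$raw = [
    'seg-0001.log:0' => '{"kind":"page"}',
    'seg-0001.log:57' => '{broken',
    'seg-0001.log:71' => '{"kind":"post"}',
];
$decode = static fn (string $b): array => json_decode($b, true, 512, JSON_THROW_ON_ERROR);
$errors = [];
$records = iterator_to_array(scan(
    $raw,
    $decode,
    static function (string $location, Throwable $e) use (&$errors): void {
        $errors[] = $location;
    },
));
```

Keeping the yield outside the try block means only decoding failures are routed to the handler, not errors raised elsewhere in the loop.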
