How do I improve?

Hello!

I’ve been using ESDB since version 5.0. Today, I feel quite capable with the tool. I use it in production. I tinker with it regularly. It’s my second favorite piece of software in the world.

That being said, I’ve basically learned event-sourcing and DDD backwards by reverse engineering the concepts using Kurrent and talking to other engineers over the years. That is all well and good, but I feel a gap in my ability to push Kurrent to its limit.

For example, my current production instance of ESDB 24.10 benchmarks at about 10,000-14,000 transactions per second, but my one and only projection cannot keep up with more than 750 events per second. The projection fans events out from the $all stream into a {key}-checkpoint stream and a {key}-routes stream.

Because of the backpressure generated by the single-threaded JS interpreter (Jint? sorry if I butchered this), I have to carefully monitor my production server. I have a half dozen apps built around my ESDB instance whose sole job is basically to… prevent ESDB from stalling out and crashing when the world is inevitably stopped by GC or the system.

For numbers, I am doing about 600k events per day, or roughly 16 million events every month. It works out to roughly 7 events/sec. This is fine; my projection keeps up at real-time speeds. What doesn’t work is when I need to reset the projection and reprocess the entire history. Then it takes days and days. That’s my core pain point right now. I suspect I’m misunderstanding how scavenging and checkpointing are meant to work. Are you supposed to have 10s of millions of events in a projection?

Obviously, this is an issue of education. I see clearly that Kurrent/ESDB can push harder. But I’ve already thrown a lot of hardware at the issue.

So here’s my question: How the heck do I learn to improve? There’s a possibility that secondary indexes could help me, or maybe concurrent event projections? But I am struggling to visualize either of those, or to verify if they are rabbit holes worth going down or not.

I’m looking for tips, resources, anything remotely helpful. Books, docs, whatever it is I’ll take it!

Thanks so much for taking the time to listen.

p.s. I love Kurrent so much.


Hiya @ajohnston! Thanks for your kind words :slight_smile: and sorry about the pains that you’re going through.

10s of millions of events is not a small number. There are a few approaches we can potentially take to speed this up but it really depends on your scenario.

To get started, could you describe what the projection is used for? It’d also be great to see what your projection code looks like. You’ve mentioned scavenging (which is always a good idea), but is there anything stopping you from using it to reduce the number of streams required for the projection?

Hey @stephen.tung, thanks for the quick reply!

What my projection is doing in plain terms:

  1. All events from my client are written directly to the timeline stream. These events carry metadata describing which flow types they’re relevant to: a couple hundred bytes of data containing all of the ids of each relevant stream.
  2. timeline events - these are the core domain events. When one arrives, the projection:
  • Reads route metadata (routeTypes and routeIds) from the event.
  • Copies routeTypes into a working array: [Shipping, Delivery, Purchase, etc…]
  • Iterates through routeIds (typically 1-5). For each entry, it resolves the type name and fans out linkTo for each of the streams the event should be routed to. This is typically 1-5 link events.
  • Tracks each instance’s “latest” position so we know what still needs processing.
  3. {key}-checkpoint events - these are progress markers written by downstream flow processors. When one arrives, the projection:
    • Updates the checkpoint position for that flow instance
    • Marks instances as done (isDone) or errored (errorPosition)
    • Removes completed instances from tracking

The output state is a “resume manifest”; it computes which flow instances are resumable (latest > checkpoint, not stopped) and returns { checkpoint, routes }. My apps read this state to know which flows have pending work after a restart or interruption.

Example Output:

{
  "checkpoint": 1560873,
  "routes": [
    "ShiftScheduleTopic",
    "ClientClassifierTopic",
    "GenericClientPathParserTopic",
    [
      "BoxRollList",
      "Client_Name202416075:299:Law Docket 1986L1906"
    ]
  ],
  "schedule": []
}

So in summary: it’s a coordination projection. It builds a real-time picture of “what work is pending” by cross-referencing timeline events against checkpoint progress, and it fans events out into per-flow route streams so each processor only sees the events it cares about.

Source Code – Scheduling is unused currently:

// resume-projection.js
//
// Observes the area stream and flow checkpoint streams to:
//
// 1. Link area events to a stream for each flow instance
// 2. Maintain a list of instances with pending work, to use when resuming
// 3. Maintain a schedule of pending events
//
// More information on EventStore projections:
// https://eventstore.org/docs/projections/user-defined-projections/

options({
  resultStreamName: "resume"
});

fromAll()
  .when({ $init: defaultState, $any: onNext })
  .transformBy(getResumeState)
  .outputState();

function defaultState() {
  return { checkpoint: null, schedule: {}, instances: {} };
}

function onNext(state, event) {
  let { streamId, metadataRaw } = event;
  let metadata = metadataRaw ? JSON.parse(metadataRaw) : {};

  observe();

  function observe() {
    if(streamId.startsWith("$")) {
      return;
    }
    else if(streamId === "timeline") {
      updateArea();
    }
    else {
      if(streamId.endsWith("-checkpoint")) {
        updateProgress();
      }
    }
  }

  function updateArea() {
    state.checkpoint = parseInt(event.sequenceNumber);

    updateRoutes();
    updateSchedule();
  }

  //
  // Routes
  //

  function updateRoutes() {
    let types = metadata.routeTypes.slice(0);

    for(let { type, ids } of metadata.routeIds) {
      for(let id of ids) {
        updateMultiInstanceRoute(types[type], id);
      }

      types[type] = null;
    }

    for(let type of types) {
      if(type) {
        updateSingleInstanceRoute(type);
      }
    }
  }

  function updateSingleInstanceRoute(type) {
    linkTo(type + "-routes", event);

    updateInstanceLatest(state.instances, type);
  }

  function updateMultiInstanceRoute(type, id) {
    linkTo(`${type}|${id}-routes`, event);

    updateInstanceLatest(getInstanceIds(type), id);
  }

  function updateInstanceLatest(instances, key) {
    updateInstance(instances, key, instance => instance[0] = state.checkpoint);
  }

  function updateInstance(instances, key, update) {
    let instance = instances[key];

    if(!instance) {
      let latest = null;
      let checkpoint = null;
      let isStopped = false;

      instance = [latest, checkpoint, isStopped];

      instances[key] = instance;
    }

    update(instance);
  }

  function getInstanceIds(type) {
    let ids = state.instances[type];

    if(!ids) {
      ids = {};

      state.instances[type] = ids;
    }

    return ids;
  }

  //
  // Schedule
  //

  function updateSchedule() {
    if(metadata.whenOccurs) {
      appendToSchedule();
    }
    else {
      if(metadata.cause !== null && metadata.fromSchedule) {
        removeFromSchedule();
      }
    }
  }

  function appendToSchedule() {
    linkTo("schedule", event);

    state.schedule[state.checkpoint] = null;
  }

  function removeFromSchedule() {
    let cause = parseInt(metadata.cause);

    delete state.schedule[cause];
  }

  //
  // Progress
  //

  function updateProgress() {
    let flowKey = streamId.substring(0, streamId.length - "-checkpoint".length);

    let separatorIndex = flowKey.indexOf("|");

    if(separatorIndex === -1) {
      updateSingleInstanceProgress(flowKey);
    }
    else {
      let type = flowKey.substring(0, separatorIndex);
      let id = flowKey.substring(separatorIndex + 1);

      updateMultiInstanceProgress(type, id);
    }
  }

  function updateSingleInstanceProgress(type) {
    if(metadata.isDone) {
      delete state.instances[type];
    }
    else {
      updateInstanceProgress(state.instances, type);
    }
  }

  function updateMultiInstanceProgress(type, id) {
    let instanceIds = getInstanceIds(type);

    if(metadata.isDone) {
      delete instanceIds[id];

      if(Object.keys(instanceIds).length === 0) {
        delete state.instances[type];
      }
    }
    else {
      updateInstanceProgress(instanceIds, id);
    }
  }

  function updateInstanceProgress(instances, key) {
    updateInstance(instances, key, instance => {
      instance[1] = metadata.position;
      instance[2] = instance[2] || metadata.errorPosition !== null;
    });
  }
}

//
// Resume state
//

function getResumeState(state) {
  let routes = [];
  let schedule;

  buildRoutes();
  buildSchedule();

  return { checkpoint: state.checkpoint, routes, schedule };

  function buildRoutes() {
    for(let type in state.instances) {
      buildTypeRoutes(type, state.instances[type]);
    }
  }

  function buildSchedule() {
    schedule = Object.keys(state.schedule).map(position => parseInt(position));

    schedule.sort((a, b) => a - b);
  }

  function buildTypeRoutes(type, typeState) {
    if(Array.isArray(typeState)) {
      if(isResumable(typeState)) {
        routes.push(type);
      }
    }
    else {
      let resumableIds = Object.keys(typeState).filter(id => isResumable(typeState[id]));

      if(resumableIds.length > 0) {
        routes.push([type, ...resumableIds]);
      }
    }
  }

  function isResumable([latest, checkpoint, isStopped]) {
    return !isStopped &&
      latest !== null &&
      (checkpoint === null || checkpoint < latest);
  }
}

In regards to Scavenging, I think I’m just struggling to conceptualize it because I’m dealing with two different problems at the same time.

Problem 1: I feel bottlenecked by the performance of JS projections. Possibly the answer here is that I should be doing the resume projection on a .NET client instead of in ESDB. But I was hoping there might be an easier win.

Problem 2: Resetting the resume projection currently requires consuming the history of all timeline events ever produced. I’m already checkpointing… so I think what I should be doing here is scavenging timeline events once they are safely checkpointed. But like I said earlier, I’ve learned event-sourcing backwards, so I admittedly don’t understand scavenging other than being a space-saving mechanism.

From my point of view, scavenging my timeline events means I would lose those events, that context, which is the whole point of storing them in ESDB. I want to be able to trace those events later on. But with the sheer number of events, an exception in my logic can result in needing to re-run the entire projection.

Thanks for sharing @ajohnston.

  1. Conceptually, do you need to replay all the events from the beginning of time? I’m guessing this projection is used to coordinate some kind of fulfillment process. If so, do old requests matter?

  2. I’d like to understand your process more from a business perspective. What are the steps required to complete this workflow? Some sample events will be great.

  3. (Related to 2) What sort of processing is done in the output streams? i.e. the *-routes streams


Stephen, please excuse my slow response. These are awesome questions; let me see if I can pull this all together without infodumping. Hopefully this makes sense to you.

Terms/Mapping:

  • timeline = the domain’s canonical, append-only log of signals. Effectively a global stream of commands and resulting domain events.
  • Topic = decision-making observer; in orthodox ES terms, closest to a policy handler / process manager, and sometimes saga-like
  • Query = projection / read model
  • Flow = an instance of Topic/Query
  • Route = the routing rule that decides which Flow instances should observe an event.
  • {FlowKey}-checkpoint = the checkpoint stream for a Flow instance, storing the serialized flow state plus checkpoint metadata such as position, error position/message, and done status.
  • {FlowKey}-routes = a per-flow linkTo stream which acts like the observer’s inbox

The ESDB JS projection named resume is specifically a coordination projection. It watches timeline plus *-checkpoint, fans events out into *-routes, tracks scheduled work, and outputs something like { checkpoint, routes, schedule }.

When I boot up my framework, I kick off a “resume” process for each Flow. I ask the resume projection which flows still have pending work, and then each of those flows resumes from its own {FlowKey}-routes stream after its checkpoint. So the normal steady-state recovery story is actually fine.

The expensive part is only when I have to reset/rebuild the resume projection itself, because that one has to re-read the full global timeline to reconstruct all route streams and pending checkpoints.

To answer your second question directly: here is a representative slice of the business workflow. A recent chain looked like this:

Position  Type
#1914631  UpdateScanRegistry (<-- that is a command included in the event timeline; I believe that is unorthodox behavior)
#1914632  ScanRegistered
#1914647  NewImageDetected
#1914666  ClientIdentified
#1914667  ScanCompleted

Routing:
Here is the same chain with actual routing behavior.

#1914631 UpdateScanRegistry
This is the external input. In ES terms, this is the command-ish ingress point from outside the bounded context. It contains 18 potential new images.

#1914632 ScanRegistered
UpdateScanRegistry routed to the Topic: SmartScanner, which will emit one ScanRegistered per valid scan in the command.

#1914647 NewImageDetected
This one was for user ajohnston.

This event routed to three observers:

  • ScanActivityQuery|ajohnston (multi instance)
  • ShiftScheduleTopic (single instance)
  • ClientClassifierTopic (single instance)

For this one event, the downstream processing looks like this:

Stream Name: ScanActivityQuery|ajohnston-routes
The route stream contains links like:

-> #1914648 NewImageDetected
-> #1914647 NewImageDetected
-> #1914644 NewImageDetected

So that stream is basically the observer-specific inbox/history for that one projection instance.

Stream Name: ShiftScheduleTopic-routes
This is a process / policy branch, not a read model. It watches the sequence of scans across operators to infer when they take breaks and lunch time. In code, if the gap is large enough, it emits TimeOffTask events which then get routed downstream to TimeOnTaskQuery|ajohnston.

Checkpointing:
The checkpoint stream is the consumer offset + serialized state for a given Flow (e.g. ShiftScheduleTopic-checkpoint, ScanActivityQuery|{instance}-checkpoint).

The lifecycle is:

  1. An event is appended to timeline.
  2. The JS resume projection links that event to every relevant {FlowKey}-routes stream.
  3. A Topic or Query on the client consumes its routed events.
  4. After it successfully processes them, it writes a checkpoint event to {FlowKey}-checkpoint (maxCount = 1).
So the checkpoint is produced by the flow host after successful handling, not by the projection itself.
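The checkpoint write described above can be sketched as follows. This is a minimal illustration, not my actual framework: the `"checkpoint"` event type, the field names, and the commented-out `@eventstore/db-client` calls are assumptions for the sake of the example.

```javascript
// Sketch of a flow host building its checkpoint event after successful
// handling. Event type and field names are illustrative assumptions.
function buildCheckpointEvent(flowKey, position, isDone) {
  return {
    type: "checkpoint",
    data: { flowKey, position, isDone, errorPosition: null },
  };
}

// With a real client, the host would append this to `${flowKey}-checkpoint`,
// whose stream metadata is set to { maxCount: 1 } so only the latest
// checkpoint survives (roughly, using @eventstore/db-client):
//
//   const { EventStoreDBClient, jsonEvent } = require("@eventstore/db-client");
//   const client = EventStoreDBClient.connectionString("esdb://localhost:2113?tls=false");
//   await client.setStreamMetadata(`${flowKey}-checkpoint`, { maxCount: 1 });
//   await client.appendToStream(
//     `${flowKey}-checkpoint`,
//     jsonEvent(buildCheckpointEvent(flowKey, 1914667, false))
//   );
```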

As a simple example:

  • #1914647 NewImageDetected gets linked into:
    • ClientImageList|{ClientKey}-routes

Then that observer runs independently.

For example, ClientImageList:

  • reads the routed NewImageDetected
  • updates its in-memory/query state
  • then writes a checkpoint to ClientImageList|{ClientKey}-checkpoint

Let me know if more information would help here, as I don’t want to fill the page up with too much of my ramblings.

Where I’m at conceptually right now:
Your first question is the same one I’m asking myself: if this projection is only about restart/resume, then old completed work should not matter conceptually. Resetting the resume projection is expensive because it replays my entire global timeline log, which is a “one-time” cost to make all the checkpointing function as described.

The thing holding me back from simply relying on checkpoints and scavenging timeline is that I need to retain the events for at least 6 months for auditing, historical analysis, etc… I have unlimited disk drive space to store them on, so that’s not a problem; it may be that I need to migrate events out to a “cold storage” timeline periodically. I just don’t know what other people normally do in this situation.


Hey, good stuff here!

We know that the current projections engine is kind of weird. It’s just too complex for what it does today. That complexity is also a big factor in our thinking about refactoring it.

However, we took a stab at a new engine. The PR is there, all the tests pass, and we are planning to run some measurements of how it compares with the old one in terms of performance. We should have some results in the next couple of weeks.

Heya @ajohnston - sorry was busy and took me a while to get back to this.

  • Sounds like you have experience resetting the projection. But in that case, what do you do with the downstream topics and queries? Normally I’d consider those to be invalid and would reset them as well (your downstream consumers of the flows may need special handling), but I’m curious how you did it in the past.
  • How are your flow hosts programmed? Do they use catch-up or persistent subscriptions? More commonly we see checkpoints managed by what you describe as the flow hosts themselves. That can give you more flexibility, given that it sounds like the centralized resume projection may be doing too much right now.

I’ve been eagerly watching the progress on github and am rooting you on. I had no idea projections v2 was in the works until you mentioned it, and that’s quite exciting. Take your time with it.

No worries @stephen.tung we all have busy lives. :slightly_smiling_face:

  • You’re right, the downstream topic and queries are invalidated and have to be rebuilt when I reset the projection. The projection replays all events from my timeline and links them to {key}-routes for every topic/query that is interested (usually < 4). The flow host reads from {key}-routes and builds a {key}-checkpoint stream for those types; {key}-checkpoint is always limited to a single event representing the current state, so it is overwritten.
  • It’s a catch-up subscription. If I understand what you’re saying correctly, then I think you might have the answer here. When I think about my situation, I’ve got a mix of fast and slow flows. I’m writing checkpoints, but I’m using them only for state, not to decide where to restart the subscription.

The consequence of this is that all my flows are coupled to a single global checkpoint. If a fast flow is at position 55,000 and a slow flow is at 100, my catch-up model must start at position 100 and replay 54,900 events just to discard them. So I should decouple the subscription model so each flow manages its own catch-up position independently from its checkpoint stream, rather than having a single global checkpoint that’s effectively pinned to the slowest flow in the system. That way a fast flow doesn’t pay the replay cost of a slow flow, and I can stop relying on the resume projection to tell me which flows have pending work, since each flow would simply subscribe from its own last known position and process whatever comes in.
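A minimal sketch of that per-flow decoupling: each flow derives its own subscription start from the position stored in its checkpoint stream, falling back to the beginning when no checkpoint exists yet. The field names and the commented-out `@eventstore/db-client` usage are illustrative assumptions, not my actual framework.

```javascript
// Sketch: decide where a flow's catch-up subscription should resume from,
// using the flow's own checkpoint instead of a single global checkpoint.
// Field names are illustrative assumptions.
function resolveStartPosition(checkpointEvent) {
  // No checkpoint yet: replay the flow's route stream from the beginning.
  if (!checkpointEvent || checkpointEvent.data.position == null) {
    return { fromStart: true, position: null };
  }
  // Resume strictly after the last safely processed position.
  return { fromStart: false, position: checkpointEvent.data.position + 1 };
}

// Each flow would then subscribe to its own inbox stream, roughly:
//
//   const { START } = require("@eventstore/db-client");
//   const start = resolveStartPosition(lastCheckpoint);
//   client.subscribeToStream(`${flowKey}-routes`, {
//     fromRevision: start.fromStart ? START : BigInt(start.position),
//   });
```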

Yeh, it might take some time to implement but I think it can untangle some of your problems.

Alternatively, you can also scavenge the timeline stream but keep, say, events from the past 6 months (with the $maxAge metadata). I don’t have a clear picture, but it sounds like you have a process to reset the resume projection as well as its downstream hosts. The key is to make sure the downstream consumers of the queries stay consistent, and the consumers of the topics stay free from duplicated messages. I suspect you have that figured out, and even if you only keep 6 months’ worth of events, it should still work.
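For reference, a 6-month window would look something like the following stream metadata on timeline. $maxAge is expressed in seconds, so 180 days × 86,400 s = 15,552,000 (the exact figure depends on how you count a month); scavenging then physically removes events older than that:

```json
{
  "$maxAge": 15552000
}
```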

Btw, for what you call “downstream query”, you probably want to use catch-up subscription and store the checkpoint to whatever persistence store you have. We have an example of this here: Part 2 - Project KurrentDB Events to Postgres | Kurrent Docs

For “downstream topics” we usually use persistent subscriptions. They have the ability to track checkpoints for you so that you don’t have to manage them yourself.

The new projection engine may help with increased performance, but if it took you days to process this before, it is not going to miraculously make it hours instead.

Secondary Index probably won’t help here since you are not really using system projections like $ce or $et.


Thanks for all the help. You’ve gone above and beyond and I appreciate that. You’ve given me a couple of avenues that seem viable. I’ll go pursue them and report back if I find success, so that I can share what I learn.


I wanted to share a quick stopgap measure I’m using in case it helps anyone else.

If your projection engine is your bottleneck, you can grab the diagnostics at /projection/<NAME_OF_PROJECTION>/statistics

I don’t append new events unless the projection’s progress is 100 and its bufferedEvents is 0. It’s not the ideal solution, but it buys me enough time to figure one out. I haven’t had a crash in over 10 million events since deploying this temporary fix. I could probably run this way for 6 months no problem, but I plan to resolve it sooner.
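The gate itself can be sketched like this, assuming Node 18+ (global fetch) and that the statistics response wraps each projection in a projections array carrying the progress and bufferedEvents fields mentioned above; verify the exact shape against your server version.

```javascript
// Sketch: only allow appends when the projection reported by
// /projection/<name>/statistics is fully caught up with no backlog.
// The `projections`, `progress`, and `bufferedEvents` field names are
// assumptions based on the HTTP response shape described above.
function canAppend(stats) {
  const p = stats.projections && stats.projections[0];
  if (!p) return false;
  // Caught up (progress 100) and nothing waiting in the buffer.
  return p.progress === 100 && p.bufferedEvents === 0;
}

// Usage against a real server would be roughly:
//
//   const res = await fetch("http://localhost:2113/projection/resume/statistics");
//   if (canAppend(await res.json())) { /* append the next batch */ }
```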
