Get newest document per type (grouped by an id field)

Hi all, I’m trying to figure out an FQL query that is the most cost-efficient, in terms of reads and writes, for a collection of logs. I have a collection of distinct processes (up to 1,000 max, probably), each with a ‘process_id’, that all produce a high volume of logs (thousands). My plan was to write these logs as separate documents in another collection called process_logs.

What I can’t really figure out, though, is how to most efficiently build a query that returns all processes with their latest log, so I can list them ordered by the time they were last logged/updated. I plan to use Event Streaming to keep this list up to date while logs are being added.

processes data example

{
    name: "Example process",
    process_id: "proc_123456",
    last_activated: [[timestamp]]
}

process_logs data example

{
    process_id: "proc_123456",
    event_type: "location_change",
    value: 19,
    ...
}

I’m able to retrieve all logs per process with an index, obviously, but I’m looking for a way to get all processes with their latest log attached, without racking up a lot of reads. I’ve looked at Join for this, but wasn’t getting the results I was looking for.

Hopefully somebody can point me in the right direction. Thank you!

Hi fhijlkema! Welcome to the forums.

There are a couple of different issues to solve in your outlined problem. Since Fauna requires you to be quite deliberate about setting up documents in collections, and then indexes for those collections, it is best if you can solve each portion of the problem separately, and then figure out how to combine the solutions. That might mean a single query, or it might require multiple queries.

Fortunately, your outlined problem can be done in a single query. But let’s outline the individual issues first, and figure out how to address those.

First, you want the list of all process documents. That’s pretty straightforward:

Map(
  Paginate(Documents(Collection("processes"))),
  Lambda("pref", Get(Var("pref")))
)

That query uses the Map function to iterate over the paginated list of documents in the processes collection, and for each entry the entire document is read.

To reduce the number of read operations, if you can be selective about which fields need to be returned, you can create an index that returns those fields without having to read the entire document. Even for really simple documents, using an index can be beneficial, because of how read operations are metered: each document fetched with Get costs its own read op, whereas reading from an index costs 1 read op per 4k read from the index, and lots of process entries with your example definition could fit into 4k.
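To put rough numbers on that (purely illustrative, not measured figures): if each index entry were around 100 bytes, then 1,000 process entries would amount to roughly 100 KB of index data, costing about 25 read ops, versus 1,000 read ops to Get the same 1,000 documents one by one.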

Here’s an index that returns all of a processes document’s fields:

CreateIndex({
  name: "all_processes",
  source: Collection("processes"),
  values: [
    { field: ["data", "name"] },
    { field: ["data", "process_id"] },
    { field: ["data", "last_activated"] }
  ]
})

And only minor adjustments need to be made to the first query to use this index:

Paginate(Match(Index("all_processes")), { size: 5000 }),

We no longer need the Map: the index returns the fields we need, so we no longer have to fetch the documents. I added the size parameter to guarantee that a few thousand processes entries would all be returned at once. Without it, the default page size is 64, so we’d otherwise have to run multiple queries to get all of the processes documents.

Second, we need to return the latest process_logs document for a specific process. To do that, we need an index that sorts the process_logs documents by time. Your example didn’t include a timestamp, so let’s assume that they have one in the when field:

> Create(
  Collection("process_logs"),
  {
    data: {
      process_id: "proc_123456",
      event_type: "location_change",
      value: 19,
      when: Now()
    }
  }
)

With that document structure, the index we need would look like:

CreateIndex({
  name: "logs_by_pid",
  source: Collection("process_logs"),
  terms: [
    { field: ["data", "process_id"] }
  ],
  values: [
    { field: ["data", "when"], reverse: true },
    { field: ["data", "event_type"] },
    { field: ["data", "value"] },
  ]
})

I left the process_id value out of the values definition, since we use it to search for appropriate logs; we know which process the log belongs to already.

To fetch logs by process_id, we can use this query:

Paginate(Match(Index("logs_by_pid"), "proc_123456"))

With some made-up data, the result is:

{
  data: [
    [ Time("2022-05-16T14:15:45.800868Z"), 'color_change', 3 ],
    [ Time("2022-05-16T14:15:04.920746Z"), 'location_change', 19 ]
  ]
}

Notice how the results are sorted so that the most recent document comes first. Since we only want the first document, we can reduce the page size to 1 item:

> Paginate(Match(Index("logs_by_pid"), "proc_123456"), { size: 1 })
{
  after: [
    Time("2022-05-16T14:15:04.920746Z"),
    'location_change',
    19,
    Ref(Collection("process_logs"), "331823610545046050")
  ],
  data: [ [ Time("2022-05-16T14:15:45.800868Z"), 'color_change', 3 ] ]
}
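
Notice that the result includes an after cursor, which would let us fetch subsequent pages of results. For completeness, here’s a sketch of feeding that cursor back to Paginate (using the cursor from the output above), which should return the next page, containing the older location_change entry:

> Paginate(
  Match(Index("logs_by_pid"), "proc_123456"),
  {
    size: 1,
    after: [
      Time("2022-05-16T14:15:04.920746Z"),
      'location_change',
      19,
      Ref(Collection("process_logs"), "331823610545046050")
    ]
  }
)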

In this case, though, we really just want the log’s fields themselves, so we can use Select to extract those:

> Select(
  ["data", 0],
  Paginate(
    Match(Index("logs_by_pid"), "proc_123456"),
    { size: 1 }
  )
)
[ Time("2022-05-16T14:15:45.800868Z"), 'color_change', 3 ]

That selects the 0th (first) item in the result page’s data field.
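
One caveat worth noting: if a process has no logs yet, that Select fails with a “value not found” error, because there is no first item to select. Select accepts a default value as an optional third argument to guard against this:

Select(
  ["data", 0],
  Paginate(
    Match(Index("logs_by_pid"), "proc_123456"),
    { size: 1 }
  ),
  null
)

With the default in place, a process with no logs gets null back instead of aborting the whole query.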

Now that both issues have been addressed, how do we combine them? Since the latest log entry needs to be returned for each process, it is easiest to compose an object that includes the fields from the process along with its latest log entry. Since we need to do this for every process, we’ll use Map once again, plus Let to save the result of the process_logs sub-query so that we can compose the final object. Note that the Lambda binds each process’s process_id to the pid variable, which we pass to the logs_by_pid index:

> Map(
  Paginate(Match(Index("all_processes")), { size: 5000 }),
  Lambda(
    ["name", "pid", "last_activated"],
    Let(
      {
        most_recent_log: Select(
          ["data", 0],
          Paginate(
            Match(Index("logs_by_pid"), "proc_123456"),
            { size: 1 }
          )
        )
      },
      {
        name: Var("name"),
        process_id: Var("pid"),
        last_activated: Var("last_activated"),
        most_recent_log: Var("most_recent_log")
      }
    )
  )
)
{
  data: [
    {
      name: 'Example process 1',
      process_id: 'proc_123456',
      last_activated: Time("2022-05-16T14:13:51.950553Z"),
      most_recent_log: [ Time("2022-05-16T14:15:45.800868Z"), 'color_change', 3 ]
    },
    {
      name: 'Example process 2',
      process_id: 'proc_123457',
      last_activated: Time("2022-05-16T14:14:14.857535Z"),
      most_recent_log: [ Time("2022-05-16T14:15:45.800868Z"), 'color_change', 3 ]
    }
  ]
}

Success, in a single query!


Hi Ewan,

Thanks for your elaborate answer to my question; this is indeed similar to how I would usually approach this in Fauna. In this case I was hoping not to have to map over potentially hundreds of index queries, in order to minimise read operations.

I have, for example, gotten similar results following your Reduce solution from Distinct with custom field - #3 by aprilmintacpineda, using just two index queries, but it is definitely not as elegant. And it seems to trade an increase in compute operations for the savings in read operations.
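
For reference, the gist of that approach looks something like this (a rough sketch, not the exact query from that thread, and the index name here is hypothetical): it assumes an index all_logs_sorted with no terms, whose values are [process_id, when (reverse), event_type, value], so that entries arrive grouped by process with the newest log first. The Reduce then keeps only the first entry it sees for each process_id:

Reduce(
  Lambda(
    ["acc", "log"],
    Let(
      { pid: Select(0, Var("log")) },
      If(
        // already kept the newest log for this process? skip this entry
        Equals(Var("pid"), Select("last_pid", Var("acc"), null)),
        Var("acc"),
        {
          last_pid: Var("pid"),
          latest: Append([Var("log")], Select("latest", Var("acc")))
        }
      )
    )
  ),
  { last_pid: null, latest: [] },
  Paginate(Match(Index("all_logs_sorted")), { size: 100000 })
)

The latest array then still has to be combined with the processes list, in a second query or client-side, and the per-entry comparisons are where the extra compute operations come in.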

It would have been great if functions like Distinct or Join were a little more powerful for cases like this, at the risk of oversimplifying things here :-).

Your answer is probably the best answer, so thanks again for your detailed help.

Kind regards,
Fabian

