Aggregating analytics on documents

So for my application, I have a collection of surveys completed by users. The ultimate goal is to gather statistics on all the surveys that are submitted: e.g., how many surveys are submitted in total, how many people answered x on a given question, how many people answered x on one question and also answered y on another, etc.

I thought of two different approaches. Approach 1 would be to index all the surveys, retrieve them all, and generate the statistics by iterating through the list of surveys that comes back in the response. While this would keep all my stats accurate, this approach would take longer the more surveys are returned as the application scales. If I get more than 100k surveys, the stats would no longer be accurate, since the pagination limit would be hit. And at that point, it would also be very computationally expensive to calculate any statistics.

For the second approach, I could create a new collection called survey_statistics with a single document in it. Every time a survey is created, I could increment an integer counter on that document. For example, a key such as totalSurveys would start at 0 and be incremented every time a survey enters the database. This way, whenever I need to retrieve the statistics, it would be fairly quick, since I am only retrieving one document with the calculations already completed. Intuitively, this method seems like the way to go. However, I was concerned that there could come a time when the statistics pulled from this document are not accurate. If a survey were ever stored successfully but the function to update the statistics document failed, the document would not have correct information when it is retrieved. Is there any way to ensure that both operations complete? I thought about incorporating the Do() function so that everything occurs sequentially (surveys are stored first, then stats are stored second), but I don't know whether Do() guarantees that both steps of the process will complete.

Any thoughts? Is there a more efficient way anyone else has come up with if I am not on the right track?

duplicate data

The second approach has some nice pros, but some cons to watch out for.

Fauna has transactional guarantees, so you can create a new survey document and update the statistics document in the same transaction. If either operation fails, then the whole thing would fail, and your application would need to retry.
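For illustration, here's a minimal sketch of that single-transaction approach. It assumes a surveys collection and one pre-created statistics document whose ID ('1' here) is known up front – the names are placeholders, not your actual schema:

Do(
  // Both writes below commit atomically, or neither does.
  Create(Collection('surveys'), { data: { /* ... survey answers */ } }),
  Let(
    { stats: Get(Ref(Collection('survey_statistics'), '1')) },
    Update(
      Select('ref', Var('stats')),
      { data: { totalSurveys: Add(Select(['data', 'totalSurveys'], Var('stats')), 1) } }
    )
  )
)

Note that Do() by itself only sequences the expressions; it's the fact that the whole query executes as one transaction that gives you the all-or-nothing behavior.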

The concern with this approach is the potential for contention. If multiple requests attempt to write to the same document, then Fauna will retry whichever transactions failed up to 5 times before responding with a 409 contention error. You are considering 100k+ responses that all want to write to a single document, so you will want to be sure that those writes are spread out.

I’ve also seen it recommended that you split the aggregation across several documents, which spreads out the load on any individual one, at the cost of some extra effort to batch them together later.

full table scan computations

You’re not kidding. You can consider fetching all of the data and aggregating it on the client/server-side where the computation itself will be much cheaper. But I’m sure there’s a break-even point somewhere with the network traffic.

You can use the cursors sent back with each page to finish the aggregation, either computing client-side or with FQL. As long as the index is sorted such that newer documents come later in the index, paging will be perfectly accurate. Paging through a few hundred thousand documents, even in default batches of 64, should only take on the order of seconds or a few tens of seconds, which should be reasonable for a short-lived batch process. If such a process runs periodically, you can persist the last cursor used and start from there the next time it runs, so you don’t have to do a full scan every time – just the first time.
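As a sketch of that cursor handoff, assuming a hypothetical surveys_by_ts index sorted ascending by timestamp (the index name and page size are placeholders):

// First run: start from the beginning of the index.
Paginate(Match(Index('surveys_by_ts')), { size: 1000 })

// Later runs: resume from the 'after' cursor returned with the previous page.
Paginate(Match(Index('surveys_by_ts')), { size: 1000, after: /* saved cursor */ })

Each response includes an after cursor whenever more results remain; persisting the last one lets the next batch run pick up exactly where the previous one stopped.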

I’ve also seen it recommended that you split the aggregation across several documents, which spreads out the load on any individual one, at the cost of some extra effort to batch them together later.

So this gave me an idea. Instead of multiple writes to one document, what about creating a new document in the stats collection every time a survey is submitted, containing the most recent stats? Then I can create an index that sorts the collection by most recent date and just pull the most recent document. That way I’m still only pulling one document back in the request, and I don’t have to worry about contention. The only cost would be the capacity cost of storing all of the unnecessary documents with old statistics.

I might have oversimplified how contention errors can come up.

A read-write query on a serialized index, regardless of which documents it is reading and which ones it is writing, will be Strictly Serialized. In fact, in this case you need that serialization guarantee to always be sure that every request always gets the latest stat record. Since the requests also write to the index, they change which document is the latest. Every request looking for that latest document has to retry until it is finally its turn, but if it retries too many times, then a contention error will happen.

Here is an example pseudo query and what that might look like in practice

Let(
  {
    surveyRef: Ref(Collection('surveys'), '12345678'),
    
    // The query is writing, so the query will enter the transaction pipeline.
    // The new document created is "locked" by this transaction.
    surveyResult: Create(Collection('survey_results'), {
      data: {
        survey: Var('surveyRef'),
        /* ... survey result data */
      },
    }),

    // Index('latest_stats') is added to the transaction log.
    // Any writes to the index from earlier transactions will require this transaction to restart.
    lastSurveyStatSet: Match(Index('latest_stats_by_survey'), Var('surveyRef')),

    // Creating 'survey_stats' Documents means that we are both 
    // reading AND writing to Index('latest_stats_by_survey').
    // Changes to the index here mean changes to the resulting Set of the 
    // previous Match function.
    result: If(
      Exists(Var('lastSurveyStatSet')),
      Create(Collection('survey_stats'), { /* ... aggregate stats */ }),
      Create(Collection('survey_stats'), { /* ... create first stats doc */ })
    ),
  },
  Var('result')
)

Example going step by step:

  1. 10 requests to create survey_results come in at roughly the same time.
  2. They all eagerly start running their transaction
  3. Each proceeds with creating a survey_results Doc
  4. Each proceeds to read the latest_stats_by_survey Index
  5. The earliest transaction creates a survey_stats Doc, which also writes to the latest_stats_by_survey Index. The 9 later transactions no longer have the latest survey_stats document and need to start over.
  6. The first transaction completes. 9 requests to create survey_results are pending.
  7. They all eagerly start running their transaction
  8. … and the process repeats until the transactions succeed or retry 5 times.

What I mean here is more like creating a pool of stat documents. This could be very similar to the pseudo code above. But you could use a hash of the user ID or something so that only 1/100 (or some number you’d have to figure out) requests read/write the same stats Doc. The important thing is to do a more exact match on the stats Doc, so that it is less likely that the matching Set from the index will be controlled by any other transactions.

Here is an example of splitting writes into a pool of stat docs. Note that it still relies on multiple requests editing the same documents. But the contention will now only happen when users with the same hash value submit surveys.

For comparison, with the code in the previous post, I hit contention issues trying to run it ~20 times asynchronously, as near simultaneously as I could.

With the following technique, though, I was able to run 500 requests asynchronously against 100 “pooled” stat documents. The downside is that you still have to batch them together eventually. A pool size of about 1/5th of the number of simultaneous uploads you expect seems to be a reasonable starting point.

Let(
  {
    surveyRef: Ref(Collection('surveys'), '12345678'),

    // some hash function on the user
    userRef: CurrentIdentity(),
    userId: Select('id', Var('userRef')),
    hash: Modulo(ToInteger(Var('userId')), 100),

    surveyStatsSet: Match(
      Index('survey_stats_by_survey_and_hash'),
      Var('surveyRef'),
      Var('hash')
    ),
  },
  Do(
    // create the survey results
    Create(Collection('survey_results'), {
      data: {
        survey: Var('surveyRef'),
        /* ... survey result data */
      },
    }),
    // Update an existing stats doc, or create a new one
    If(
      Exists(Var('surveyStatsSet')),
      Let(
        {
          surveyStats: Get(Var('surveyStatsSet')),
          surveyStatsRef: Select('ref', Var('surveyStats')),
        },
        Update(Var('surveyStatsRef'), { /* ... aggregate stats for this survey and hash */ })
      ),
      Create(Collection('survey_stats'), { /* ... create first stats doc with this survey and hash */ })
    )
  )
)

@kdilla301 Did you get a chance to consider this more? It’s more of an academic exercise for me, but I know that such an aggregation pattern is a real need for many people. I am very interested in understanding how folks ultimately work through it. I know I’m not the only one interested.

Cheers!

Sorry I got caught up with other work. Now I’m back to this and of course I have to figure out how to implement this now. I will post with questions soon.

Okay, so I about halfway understand what is going on in the second query, but I am having trouble grasping some things.

  1. While I understand FQL fairly well, I find creating indexes to be pretty unintuitive. Can you walk me through an example of what the survey_stats_by_survey_and_hash index would look like? I’m having trouble understanding how the index would know about the incoming hash from a new user if the previously stored hash is unique to the old user.

  2. How would you handle this if there were no users and instead all the surveys were submitted anonymously? I’m assuming this would only change the hash calculation, but what would that math look like in Fauna, since we don’t have a unique number to start with? I would imagine this would also complicate matching on the hash value, because we want a different hash every time the query is run, correct?

I like this idea. But dealing with anonymity is throwing me a curveball.

  1. It took me a while to understand that last query you posted. (I like it a lot and will probably use it as the template.) However, I wanted to clarify something. From what was written, it looks like I am only updating one statistics document over and over again. If that is the case, how is it located by the most recent hash, if the most recent hash will always be a different and unique value? Shouldn’t the query fail to locate the current stats document, because the old hash doesn’t match the new hash that is created by the next user who triggers the query?

Welcome back @kdilla301!

I’ve learned a lot too in the last couple weeks. Let me try to answer your questions and then bring up another idea that has input from others. Really, it’s going back to your first suggestion with a few tweaks.

Keep this in mind: our goal is to reduce conflicts by (to the extent possible) letting read queries be read queries, write queries be write queries, and limit read+writes on the same Sets of Documents.

More about the stats pools

The hash idea was to route requests roughly evenly across different pools. Anything you can calculate that distributes evenly over the long run will work.

The hashes are not meant to be unique. We are aiming for a range of hash values, where the size of the range is the number of stats documents you are aiming for.

This idea relies heavily on probability to avoid contention errors, so there will still be edge cases where they happen. In particular, if your app grows faster than you can catch it and increase the pool size, then you could be in trouble.

The index I used is nothing fancy:

Get(Index("survey_stats_by_survey_and_hash"))

{
  ref: Index("survey_stats_by_survey_and_hash"),
  ts: 1630683739690000,
  active: true,
  serialized: true,
  name: "survey_stats_by_survey_and_hash",
  unique: false,
  source: Collection("survey_stats"),
  terms: [
    {
      field: ["data", "survey"]
    },
    {
      field: ["data", "hash"]
    }
  ],
  partitions: 1
}

In the testing that I did, I calculated the hash from the user id and stored that in the survey_stats Document.

Let(
  {
    userRef: CurrentIdentity(),
    userId: Select('id', Var('userRef')),
    hash: Modulo(ToInteger(Var('userId')), 2),
    /* ... */
  },
  Do(
    /* ... */
    Create(Collection('survey_stats'), {
      data: {
        survey: Var('surveyRef'),
        hash: Var('hash'),
        /* ... */
      },
    })
    /* ... */
  )
)

Unfortunately, new IDs and timestamps don’t resolve until the end of the transaction, so you cannot use Select(['ref', 'id'], ...) or Select('ts', ...) as a number from documents created in the same transaction. E.g.

"description": "Number expected, Unresolved Transaction Time provided."

Without a token identity, you’ll have to calculate on something else.
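For example, this kind of query reproduces that error, since the new document’s ts cannot be treated as a number inside the same transaction that creates it:

Let(
  { doc: Create(Collection('survey_results'), { data: { /* ... */ } }) },
  // fails with "Number expected, Unresolved Transaction Time provided."
  Modulo(Select('ts', Var('doc')), 100)
)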

trying hashes with timestamps

I just tried some things using ToMicros(Now()) as the seed for a hash value, but the driver can run quickly enough that Now() is actually the same for some transactions. I got the performance back by introducing a small random integer (like 1-12) meant to represent variation in user responses.

My conclusion is that using Now as a way to differentiate transactions is not a great idea on its own, but could maybe be combined with some other variables to increase the randomness.
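Here is a sketch of what I mean, with the small random integer generated client-side (FQL has no built-in random function) and interpolated into the query before it is sent:

Let(
  {
    // substituted by the client before sending the query, e.g. 1-12
    variation: 7,
    hash: Modulo(Add(ToMicros(Now()), Var('variation')), 100),
  },
  /* ... match survey_stats on Var('hash') as before ... */
  Var('hash')
)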

This still requires some batch aggregation later

At some point you need to read the total value of the aggregate. So far, we’ve not talked about that at all. Consider the following scenarios:

  1. The size of the stat pool is manageable – in such a case, the techniques we’ve talked about here may be a decent fit. You can let the survey_stats documents live on forever, and when you need to run the aggregation, you run a fresh calculation over the survey_stats docs.
  2. The overall aggregation is too intense to run every time it is needed somewhere – it may be better to save a snapshot of the aggregation and update the snapshot over time. In this case, you may need to use something different or combine these ideas with something else.
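For scenario 1, a fresh calculation might look like the following sketch. It assumes a hypothetical one-term survey_stats_by_survey index (terms on data.survey, returning refs) and a count field on each pooled stats doc – adjust for whatever shape your aggregates actually take:

Sum(
  Select(
    'data',
    Map(
      Paginate(
        Match(Index('survey_stats_by_survey'), Ref(Collection('surveys'), '12345678')),
        { size: 100 }
      ),
      // fetch each pooled stats doc and pull out its partial count
      Lambda('statRef', Select(['data', 'count'], Get(Var('statRef'))))
    )
  )
)

With a pool of around 100 docs this fits in a single page, and because it is a pure read it doesn’t contend with any of the writers.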

Aggregation snapshots

I am working on an example of the snapshot aggregation idea with others, and will share more details as soon as I can. The basic idea, though, is to designate one or a small number of jobs/processes to be the only writers of aggregate data. Any clients that read the aggregate data never write, which allows scalable reads and completely eliminates the chance of contention errors.