
The RAMifications of long-living processes

Omer Banay

At Epsio Labs, we develop an incremental SQL engine — in brief, an SQL engine that operates on changes instead of reprocessing the whole dataset each time a query runs. I’ll leave explaining why that’s useful to another blog post of ours — Why We Built a Streaming SQL Engine.

In the early days of Epsio, one of our first ways to stress test our engine was to run the TPC-DS benchmark queries (an industry standard benchmark) with a large data scale (100 TB of records).

We ran one of our newly born tests, and after several minutes, our test machine completely froze for 10 minutes!
Afterwards, we noticed our engine process got killed (some would say brutally murdered). Looking at our system metrics and logs, we saw the system memory was at 99%, which was much higher than expected.

Memory utilization on a 16 GB machine. Between 19:06 and 19:17, logs were not sent (thus the awfully straight line).

In this blog post, we will navigate the uncertainty of memory utilization, unveil the mystery of our process’s memory bloat, and discuss how we tackled it.

What happens when the machine is out of memory?

Data and memory-intensive programs such as ours often reach high memory usage. If the workload is large enough and no memory regulation is in place, the system can end up close to 100% memory utilization.

In order to keep our machines running even in such scenarios, Linux gives us two configurable tools:

  1. Swap memory: A file or partition on disk that’s used as a supplement to physical memory (RAM). When more RAM is needed than what’s available, inactive pages in memory are moved to the swap space in order to free memory. We chose to disable swap on our test machine because we consider extreme memory utilization a bug; we wouldn’t want to rely on swap to save us, given its performance impact. Also, Epsio might be deployed on a machine that has swap disabled.
  2. OOM Killer: The Out of Memory Killer is a mechanism run by the Linux kernel when system memory is extremely low. It reviews all the running processes and assigns a score to each, based on memory usage and other parameters. It then chooses one or more of them to be killed. [1] [2]

The culprit of our process’s assassination is of course the OOM Killer, who chose to kill the engine process because of its high memory usage.
To ensure that our accusation is true, we can look at the system logs with journalctl (a utility to query various services’ logs):

(You can ignore the nasty command)

journalctl --list-boots | awk '{ print $1 }' | \
xargs -I{} journalctl --utc --no-pager -b {} -kqg 'killed process' -o verbose --output-fields=MESSAGE

Tue 2023-02-02 16:21:52.348846 UTC [s=694483b10fab40da9c36d06a1f288df8;i=acc;b=8ee2faf0530244958351de3c743bd18a;m=d991edcc;t=605b8a8153aae;x=affb06d11f0113d9]
    MESSAGE=Out of memory: Killed process 9049 (epsio_engine) total-vm:16253052kB, anon-rss:14777084kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:30128kB oom_score_adj:0

Let’s focus on the output:

Out of memory: Killed process 9049 (epsio_engine) ... anon-rss:14777084kB

Our process got killed by the OOM Killer while having 14.7 GB allocated to it in physical memory (see “anon-rss” value in the output).
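To pull that number out of a log line programmatically, a minimal sketch could look like the following. The function names are made up for illustration, and the conversion assumes the kernel’s "kB" means 1024 bytes.

```rust
/// Extracts the `anon-rss` value (in kB, as printed by the kernel) from an
/// OOM Killer log message. Returns `None` if the field is missing or malformed.
fn parse_anon_rss_kb(message: &str) -> Option<u64> {
    message
        .split(|c: char| c == ',' || c.is_whitespace())
        .find_map(|field| field.strip_prefix("anon-rss:"))
        .and_then(|value| value.strip_suffix("kB"))
        .and_then(|digits| digits.parse().ok())
}

/// Converts a kB figure to GiB, assuming 1 kB = 1024 bytes.
fn kb_to_gib(kb: u64) -> f64 {
    kb as f64 / (1024.0 * 1024.0)
}
```

Fed the message above, parse_anon_rss_kb returns 14777084, and kb_to_gib turns that into roughly 14.1 GiB, the same ballpark as the figure quoted above and nearly all of a 16 GB machine.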

So OOM Killer was spawned, which means our system was out of memory. When you get that close to the sun, bad things start to happen — my machine was unresponsive via ssh, application logs were not sent, and everything seemed to be frozen. It took 10 minutes until my machine was responsive again, which is weird. You’d think the OOM Killer would kill a process in a millisecond.

Apparently, this is a known phenomenon in Linux that is sometimes called “OOM hangs/livelocks”. The reason for this phenomenon is a bit complicated, but to be concise, when memory is requested and there’s not enough available memory, the system first tries to reclaim memory pages that are backed by a file on the hard disk (memory-mapped files).
If enough memory is reclaimed, the OOM killer is never spawned. This can go on and on, causing the system to be slow and unresponsive.
Interestingly enough, SSDs’ fast performance makes this phenomenon even worse. [3]

We definitely never want to reach an unresponsive state because of OOM hangs. We can avoid this by killing our process before the system memory is at full capacity (see earlyoom). For the simple task of making our stress test machine want to talk to us, we chose to add a cgroup that limits the process’s memory (the process is killed when it exceeds the limit).

This mitigation makes our machine happy; however, the engine process will still be abruptly killed because of memory bloat, which takes us to the root cause of the problem: Why did Epsio need such an extreme amount of memory in that test?

Processes fighting for memory on my Ubuntu machine, circa not so long ago

Epsio View And Its Big Pipeline

The failed test creates an Epsio view for a TPC-DS query. An Epsio view is an incremental view; it shows up-to-date results of the query. When an Epsio view is created, a new Epsio engine process is spawned. The engine first calculates the current results of the query. We call this stage population, and its purpose is similar to ordinary query execution in other SQL engines.
After population, the engine begins to consume row changes (e.g. inserts, deletes, updates) and updates the results accordingly. We call this stage CDC Streaming (Change Data Capture).

The specific test that failed created an Epsio view with a query similar to the one below (this is a simplified version):

select  i_item_id,
        avg(cs_quantity) agg1
 from catalog_sales
 join item on cs_item_sk = i_item_sk
 join promotion on cs_promo_sk = p_promo_sk
 where (promotion.p_channel_email = 'N' or promotion.p_channel_event  = 'N')
 group by i_item_id

Our SQL engine constructs a data pipeline for the query. Each node in the pipeline corresponds to an SQL operator. Here is an illustration:

Each root node (catalog_sales table, item table, promotion table) is an input node that yields batches of rows from either the population or the CDC stage. If we processed one batch of rows at a time and waited for it to reach completion, we would leave CPU cores idle and increase the latency of the next batch. That’s why we want to stream multiple batches of rows in parallel.

Now, try to think of all the ways memory might bloat when we stream these batches of thousands of records:

  • We might stream too many batches too fast, which is bad because memory will become scarce.
  • Each join node might output more rows than the sum of its inputs. For example, if the first join key in our query is cs_item_sk = i_item_sk, then for a batch of 10,000 rows from the left side of the join that have cs_item_sk = 1 and another 10,000 rows from the right side that have i_item_sk = 1, the result is the cross product of the rows (100,000,000 rows!).
  • A bottleneck node might cause a “traffic jam” in the pipeline, causing memory to be occupied for more time. This might not seem like a problem, but remember that multiple Epsio views can run simultaneously, and if all of them have lots of batches in memory waiting for the bottleneck to finish, system memory will become scarce.
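
To make the join blow-up from the list above concrete, here is a small sketch that counts how many rows an inner equi-join emits for two batches of join keys. The function name is hypothetical and the keys are plain integers for simplicity; it only counts rows and does not build them.

```rust
use std::collections::HashMap;

/// Counts how many rows an inner equi-join emits for two batches of join keys.
/// For every key, the output holds (left matches) * (right matches) rows.
fn join_output_rows(left_keys: &[i64], right_keys: &[i64]) -> u64 {
    // Count how often each key appears on the right side.
    let mut right_counts: HashMap<i64, u64> = HashMap::new();
    for key in right_keys {
        *right_counts.entry(*key).or_insert(0) += 1;
    }
    // Each left row is emitted once per matching right row.
    left_keys
        .iter()
        .map(|key| right_counts.get(key).copied().unwrap_or(0))
        .sum()
}
```

Two batches of 10,000 rows that all share the key 1 yield 100,000,000 output rows, while two batches of 10,000 rows with pairwise distinct, matching keys yield only 10,000.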

Those disasters were already foretold. To tackle these situations, we:

  • Limited the number of parallel batches.
  • Because a node like Join might output larger batches than its input, we introduced a stalling mechanism that will wait for more memory to be available before streaming batches between nodes.
  • Added a spill-to-disk mechanism that can choose to move stalled batches from memory to disk.
  • Introduced more optimizations to our SQL query planner in order to have a more efficient pipeline. For example, because our query’s filter node has predicates (filters) relevant to the promotion table’s columns, we can move it to be before the join node. That way, we avoid unnecessary joins between rows that would be filtered out later anyway, resulting in less memory usage and better performance. This optimization is known as “Predicate Pushdown” because it pushes the predicates (filters) closer to the data source (table input).
Predicate pushdown example
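
The stalling mechanism from the list above can be pictured as a shared byte budget that senders must reserve from before passing a batch downstream. The sketch below is only an illustration built on the standard library; MemoryBudget and its methods are hypothetical names, not Epsio’s actual regulator.

```rust
use std::sync::{Condvar, Mutex};

/// A shared byte budget. `acquire` stalls the caller until enough bytes are free.
pub struct MemoryBudget {
    available: Mutex<usize>,
    freed: Condvar,
}

impl MemoryBudget {
    pub fn new(limit_bytes: usize) -> Self {
        Self { available: Mutex::new(limit_bytes), freed: Condvar::new() }
    }

    /// Blocks until `bytes` can be reserved, then reserves them.
    /// A request larger than the whole limit would block forever, so callers
    /// are assumed to split or spill such batches first.
    pub fn acquire(&self, bytes: usize) {
        let mut available = self.available.lock().unwrap();
        while *available < bytes {
            available = self.freed.wait(available).unwrap();
        }
        *available -= bytes;
    }

    /// Non-blocking variant: reserves `bytes` only if they are free right now.
    pub fn try_acquire(&self, bytes: usize) -> bool {
        let mut available = self.available.lock().unwrap();
        if *available >= bytes {
            *available -= bytes;
            true
        } else {
            false
        }
    }

    /// Returns `bytes` to the budget and wakes any stalled senders.
    pub fn release(&self, bytes: usize) {
        *self.available.lock().unwrap() += bytes;
        self.freed.notify_all();
    }
}
```

A node would call acquire with the size of a batch before streaming it and release once the batch has been consumed. When try_acquire fails, that is a natural point to decide whether to spill the batch to disk instead of waiting.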

Some of these mitigations are good examples of memory regulation. However, even with the mitigations in place, the test caused our process memory to bloat. The process memory usage was much higher than expected and exceeded our memory regulation limits by far. At this point, we decided to go more in depth.

Malloc And Its Shenanigans

To understand our process memory usage, we have to understand what happens when our process allocates dynamic memory.

In Rust, dynamic memory allocations use the global memory allocator, which in our case is the malloc allocator implemented in glibc 2.35.
So under the hood, our process basically uses malloc and free as any normal C program does.

Malloc Being Greedy

The allocator asks the OS for memory by using a syscall, but it strives to reduce the number of calls for performance reasons. Hence, when free is called, it doesn’t necessarily return the freed memory chunk to the OS, but instead keeps several data structures for the bookkeeping of free chunks. On the next malloc call, the allocator looks for a suitable free chunk to return. If one is found, it returns a pointer to the chunk and marks it as used. Otherwise, it issues another syscall for memory.

From the OS’ perspective, the user process still occupies the memory in the free chunks.
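
A toy model may help here. The sketch below tracks sizes only (no real memory is managed) and uses a simple first-fit search without splitting chunks; it is not glibc’s actual algorithm, and all names are made up for illustration.

```rust
/// Toy model of an allocator that keeps freed chunks instead of returning them.
#[derive(Default)]
struct ToyAllocator {
    free_chunks: Vec<usize>, // sizes of chunks the program has freed
    in_use_bytes: usize,     // what the program is actually using
    os_bytes: usize,         // what the OS thinks the process holds
}

impl ToyAllocator {
    /// Reuses the first free chunk that is large enough; otherwise "asks the OS"
    /// for more. Returns the size of the chunk handed out.
    fn malloc(&mut self, size: usize) -> usize {
        let chunk = match self.free_chunks.iter().position(|&c| c >= size) {
            Some(index) => self.free_chunks.swap_remove(index),
            None => {
                self.os_bytes += size; // stands in for a syscall
                size
            }
        };
        self.in_use_bytes += chunk;
        chunk
    }

    /// Marks the chunk as free but keeps it; `os_bytes` does not shrink.
    fn free(&mut self, chunk: usize) {
        self.in_use_bytes -= chunk;
        self.free_chunks.push(chunk);
    }
}
```

After a malloc of 1024 bytes followed by a free, in_use_bytes drops back to 0 while os_bytes stays at 1024. That gap between the two counters is the kind of gap we were about to find in our own process.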

Memory Profiling

Interesting stuff! Now that we understand a bit more about malloc, let’s profile our process’s memory usage to see how much memory sits in free chunks. We can use the libc mallinfo2 function (__libc_mallinfo2), which returns a struct containing information about memory allocations.
From the mallinfo man page:

The structure fields contain the following information:
..
uordblks
The total number of bytes used by in-use allocations.
fordblks
The total number of bytes in free blocks.

Let’s create a task that will log mallinfo results every 10 seconds:

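use std::time::Duration;

use epsio_tracing::info;
use libc::mallinfo2;
use tokio::task::JoinHandle;
use tokio::time;

/// Spawns a background task that logs glibc's malloc statistics every 10 seconds.
pub fn start_mallinfo_monitor() -> JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = time::interval(Duration::from_secs(10));
        // If a tick is missed, delay the next one instead of firing a burst.
        interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
        loop {
            interval.tick().await;
            // mallinfo2 is an FFI call into glibc, hence the unsafe block.
            let mallinfo2_struct = unsafe { mallinfo2() };
            info!(message = "mallinfo2", mallinfo2 = mallinfo2_struct);
        }
    })
}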

When we put uordblks and fordblks on a graph, we get:

So we are actually not even using most of the occupied memory; ~11GB out of 15GB are actually free chunks.

To shed more light on the matter, I used the malloc_info libc function, which prints a huge nasty XML with many more details:

more arenas
...
</heap>
<!--Arena number 60-->
<heap nr="60">
<sizes>
  <size from="33" to="48" total="96" count="2"/>
  <size from="113" to="128" total="128" count="1"/>
  <size from="49" to="49" total="79625" count="1625"/>
  ...
  <size from="224705" to="224705" total="224705" count="1"/>
  <size from="300689" to="524241" total="37369240" count="88"/>
  <!--There are 235 free chunks whose size ranges between 524353 to 67108785
   bytes, and occupy 804350523 bytes in total. -->
  <size from="524353" to="67108785" total="804350523" count="235"/>
  <unsorted from="65" to="65" total="65" count="1"/>
  <!-- The "top" chunk is not included here -->
</sizes>
<total type="fast" count="3" size="224"/>
<!--Total count of free chunks and total size 852MB-->
<total type="rest" count="8250" size="852802425"/>
<system type="current" size="1173217280"/>
<system type="max" size="1220870144"/>
<aspace type="total" size="1173217280"/>
<aspace type="mprotect" size="1173217280"/>
<aspace type="subheaps" size="18"/>
</heap>
<heap nr="61">
...
More arenas

It turns out our process has more than 60 arenas (corresponding to the XML’s heap element). But what the hell is an arena? To explain that, we need to understand how malloc deals with multiple threads.

Malloc And Multi-Threading

In order to efficiently and safely handle multi-threaded applications, glibc’s malloc needs to tackle the situation where two threads might access the same memory region at the same time (which would cause parallelization bugs). Therefore, the allocator segments the memory into different regions. These regions of memory are called “arenas”.

Each arena structure has a mutex in it, which is used to control access to that arena. This mutex assures that two different threads won’t access the same memory at the same time. Contention for this mutex is the reason why multiple arenas are created.

The number of arenas is capped at eight times the number of CPU cores by default (on 64-bit systems).
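
As a quick sanity check of that cap, the arithmetic can be written down as follows. The function names are hypothetical, a 64-bit system is assumed, and std::thread::available_parallelism is only an approximation of the core count that glibc itself sees.

```rust
/// Default arena cap as described above: 8 arenas per CPU core
/// (assuming a 64-bit system).
fn default_arena_cap(cpu_cores: usize) -> usize {
    8 * cpu_cores
}

/// Same computation for the machine this runs on; falls back to 1 core if the
/// core count cannot be determined.
fn arena_cap_for_this_machine() -> usize {
    let cores = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    default_arena_cap(cores)
}
```

For an 8-core machine, default_arena_cap(8) gives 64. The cap can be lowered with the MALLOC_ARENA_MAX environment variable, which trades some allocation throughput under contention for fewer arenas.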

Memory Fragmentation

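It’s important to note that each arena has a heap (sometimes several): a contiguous memory region holding the memory chunks (used or free). A heap can grow or shrink. The main arena’s heap does so through the brk syscall (usually via the sbrk wrapper), while the heaps of the other arenas are mmap-ed regions.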

As our process continues to stream row batches through the data pipeline, many small mallocs and frees are called, and heaps might get fragmented: gaps of free chunks start to form between used chunks. Adjacent free chunks are combined into bigger free chunks. But a heap can shrink only by its “top” chunk, the free chunk at the current end of the heap. Free chunks trapped between allocated chunks might be used for the next allocation but will not be returned to the OS. We lovingly refer to this situation as memory fragmentation.

With multiple arenas and heaps, memory fragmentation is even more prevalent because each heap gets fragmented on its own. There are fewer chances to coalesce free chunks and to find a suitable free chunk for an allocation.

To illustrate how costly memory fragmentation can be, I wrote a small C program. The program allocates 1,000,000 buffers and then frees all of them except the last one:

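#include <stdio.h>   // printf, fprintf, setbuf
#include <stdlib.h>  // malloc, free, exit
#include <string.h>  // memset
#include <unistd.h>  // sleep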

#define NUM_BUFFERS 1000000
#define BUFFER_SIZE 1024  // 1KB
// 1000000 buffers * 1KB = ~1GB

void allocate_buffers_then_free(void) {
    // Array of NUM_BUFFERS buffer pointers; static so the ~8MB array stays off the stack
    static char* buffers[NUM_BUFFERS];
    int i;

    for (i = 0; i < NUM_BUFFERS; i++) {
        buffers[i] = (char*)malloc(BUFFER_SIZE);

        // Check if malloc was successful
        if (buffers[i] == NULL) {
            fprintf(stderr, "Failed to allocate memory for buffer %d\n", i);
            exit(1);
        }
        // Write to buffer in order to assure it's mapped to physical memory
        // This is done because overcommit is enabled see https://www.linuxembedded.fr/2020/01/overcommit-memory-in-linux
        memset(buffers[i], 0, BUFFER_SIZE);
    }
    printf("Allocated buffers\n");

    sleep(10);

    // Freeing all the buffers except the last one
    for (i = 0; i < NUM_BUFFERS - 1; i++) {
         free(buffers[i]);
    }
    printf("Finished freeing buffers\n");
}

int main() {
    setbuf(stdout, NULL);
    allocate_buffers_then_free();

    // Sleep until being killed
    while (1) {
        sleep(10);
    }
    return 0;
}

I compiled the program (gcc -o memtest memtest.c), ran it, and watched my machine’s memory with watch free. Here is the output I got:

// free showed our system had 13GB of free memory
Allocated buffers
// system had 12GB of free memory (1GB was allocated for our process)
// 10 seconds of sleep...
Finished freeing buffers
// system memory still had only 12GB of free memory
// Meaning the freed buffer chunks were not freed back to the OS
^C⏎

What’s even more interesting is that if we change the program to free all the buffers except the first one, the memory is returned to the OS. Another adjustment that achieves the same outcome is increasing the buffer size to 1MB and reducing the number of buffers to 1,000. That works because above a certain allocation size (M_MMAP_THRESHOLD, 128KB by default), malloc uses mmap (a syscall that can allocate memory outside of the heap), and mmapped buffers are easily returned to the OS with the munmap syscall.
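
The difference between the two variants can be captured in a very small model. It treats one heap as a row of equally sized chunks and assumes, as described above, that only the run of free chunks at the end of the heap can be given back. Chunk headers, trim thresholds and mmap are ignored, and the function names are hypothetical.

```rust
/// Models one heap as a row of equally sized chunks; `true` means in use.
/// Returns how many bytes could be handed back to the OS, assuming only the
/// run of free chunks at the end of the heap can be trimmed.
fn trimmable_bytes(chunks_in_use: &[bool], chunk_size: usize) -> usize {
    let trailing_free = chunks_in_use
        .iter()
        .rev()
        .take_while(|&&in_use| !in_use)
        .count();
    trailing_free * chunk_size
}

/// Total bytes sitting in free chunks, trimmable or not.
fn free_bytes(chunks_in_use: &[bool], chunk_size: usize) -> usize {
    chunks_in_use.iter().filter(|&&in_use| !in_use).count() * chunk_size
}
```

With 1,000,000 chunks of 1KB where only the last one is still in use, free_bytes reports almost 1GB but trimmable_bytes reports 0. Keep only the first chunk in use instead, and nearly all of the free bytes become trimmable.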

I invite you to try it on your machine (be careful not to trigger an OOM).

Back to our process’s malloc_info XML: our process had 64 arenas (the maximum for an 8-core machine), and each one had between 100 and 1,000 MB of free chunks. This strongly implies that malloc’s greediness, together with memory fragmentation amplified by our use of multi-threading, is the reason so much memory was wasted on free chunks.

Switching Memory Allocator

Memory fragmentation is a hard problem that most memory-intensive programs face. To solve it, we could have tuned malloc with configurations or changed our application’s allocation patterns (which is a great topic for another blog post).
Even so, like many other memory-desperate developers, we started seeking a more adequate allocator.

In an academic research article called “Experimental Study of Memory Allocation for High-Performance Query Processing”, we read about other memory allocators and their pros and cons. The Jemalloc allocator was the star of the show. It was noted that one of its strong suits is memory fairness, which was defined as “returning freed memory back to the OS”.

One of the ways Jemalloc achieves memory fairness is a time-based (decay) mechanism that releases unused pages back to the OS. It relies heavily on mmap for its allocations and, depending on configuration, gives pages back with madvise or munmap.

We switched our global allocator to Jemalloc, which is very easy in Rust:

use tikv_jemallocator::Jemalloc;

#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

We ran the stress test, and surprisingly, it passed. There was no notable performance difference in any of our stress tests.

We used Jemalloc stats (from the great tikv_jemalloc_ctl crate) in order to monitor memory fragmentation:

resident — Total number of bytes in physically resident data pages mapped by the allocator.
allocated — Total number of bytes allocated by the application. (in-use allocations)

resident minus allocated should approximately equal the memory our process holds in free chunks. So free-chunk memory consumption is low, which is great! There’s a small problem with the graph: it keeps growing steadily. This can be explained by the fact that we haven’t reached the limit where we start stalling our data stream (12 GB).

Ultimately, the test passed, and system memory usage was less than half of what the same test used with glibc.

So we just made our stress test pass and the system will succeed in processing one incremental view with billions of records! But will it pass if a thousand incremental views are created?

Multi-Process Architecture Memory Implications

As you probably guessed, we had a stress test that spawned a thousand incremental views, and it crashed miserably because of high memory utilization.

At the time, we were using a multi-process architecture; each incremental view had a dedicated engine process that maintained it — received row changes (CDC), and ran them through the data pipeline.

Unfortunately, the existence of 1,000 engine processes presented a new hurdle for us. While each process had its own memory regulator limiting its usage to 4GB (for example), there was no certainty that other processes wouldn’t also utilize 4GB, potentially resulting in an OOM.

Moving To Single-Process Architecture

It became more and more evident that an architecture change was needed.

A process per incremental view provides isolation between them; however, a single-process architecture holds many benefits for us in terms of performance and capabilities:

  • Memory fragmentation is lower.
  • Memory utilization does not depend on the number of incremental views.
  • Memory regulation in our data pipeline is easier. In a multi-process architecture, processes are not aware of each other’s resource consumption and needs.
  • Only one RocksDB embedding.
  • Overall decrease in memory overhead.
  • Communication between incremental views and with their coordinator is easy and fast.

For these reasons, we decided to move to a single-process architecture where all the incremental views are just async tasks maintained by one process. Now, instead of depending on the number of incremental views, our memory utilization depends on the total number of SQL operators.

Is Single-Process Architecture Better For SQL Engines?

It’s important to note that single-process architecture isn’t necessarily better than multi-process. As with everything in life, there are tradeoffs.
Just to name a few:

  • It may be harder to implement. For example, aborting a query (or an incremental view in our case) is harder, whereas in a multi-process architecture it is as easy as killing a process. This is less critical for us since incremental views are seldom aborted.
  • Different query executions share the same address space, so a memory bug in one query might affect other queries. Fortunately, this is of lesser concern for us, thanks to Rust.

PostgreSQL, for example, uses a multi-process architecture for its worker processes (the processes in charge of executing queries).
In the Let’s make PostgreSQL multi-threaded thread, there’s an excellent comment that portrays the negative impact of single-process architecture:

OOM and local backend memory consumption seems to be one of the main
challenges for multi-threadig model:
right now some queries can cause high consumption of memory. work_mem is
just a hint and real memory consumption can be much higher.
Even if it doesn’t cause OOM, still not all of the allocated memory is
returned to OS after query completion and increase memory fragmentation.
Right now restart of single backend suffering from memory fragmentation
eliminates this problem. But it will be impossible for multi-threaded Postgres.


Basically, when a query’s execution is complete, Postgres can choose to kill the process if it detects that it has irregular physical memory usage or memory fragmentation. That way, Postgres eliminates both of these problems.
Having said that, while a query’s execution time is usually between milliseconds and hours, an incremental view’s execution rarely completes: it stays up until the view is dropped by a user or Epsio is shut down. Therefore, this mitigation is also less relevant for us.

Conclusion

Memory-intensive applications can pose challenges. It’s crucial to consider the internal behavior of your allocator, regulate memory usage, and not rely solely on the OS to save the situation. Different inputs and scenarios produce different allocation patterns, which may or may not cause fragmentation or bloat, so it pays to experiment with them.

If you want to try Epsio’s blazingly fast (and super memory regulated) SQL engine, check us out here.

References:

