Scaling Elasticsearch: Sharding and Availability for Hundreds Of Millions of Documents

SignalFx is known for monitoring modern infrastructure, consuming metrics from things like AWS or Docker or Kafka, applying analytics in real time to that data, and enabling alerting that cuts down the noise. Core to how we do that is search. It’s not good enough to just process and store/retrieve data faster than anything out there, if it takes a long time for users to find the data they care about. So to match the speed of the SignalFlow analytics engine that sits at the core of SignalFx, we’ve been using Elasticsearch for our searching needs from day one.

In this post we’ll go over what we’ve learned about scaling Elasticsearch while maintaining availability for all the search capabilities inside SignalFx, both for internal services and out to our customers.

Elasticsearch Webinar  

Why Elasticsearch

While we use Cassandra as our timeseries database and it’s very fast for that purpose, it doesn’t allow us to run ad-hoc queries or have full-text search capabilities. When we’re dealing with the metadata attached to metrics, we are handling structured text. Being able to run queries on top of that data is Elasticsearch’s sweet spot!

We’ve found Elasticsearch to be highly scalable, providing a great API, and very easy to work with for all our engineers. Ease of setup also makes it very accessible to developers without operational experience. Furthermore it’s built on Lucene, which we’ve found to be solid.

Since the launch of SignalFx in March of 2015, our Elasticsearch deployment has grown from 6 shards to 24 (plus replicas) spread over 72 machines holding many hundreds of millions of documents. And it’ll soon double to keep up with our continued growth.

What Elasticsearch is Used for in SignalFx

An important aspect of how SignalFx works and why people use it is SignalFx’s ability to consume dimensions and other metadata. Users ship metadata in with metrics, which then allows them to aggregate, filter, or group both the raw metrics and any analytics they do against those metrics by the metadata. For example: get the 90th percentile of latency for an API call response grouped by service and client type. All of that metadata is stored in, indexed by, and searched with Elasticsearch.

Elasticsearch used for SignalFx Metric Metadata

Filters in SignalFx charts


That metadata and features like metric names, dashboard titles, or detector names are how SignalFx users find content or objects in the system. Anytime a user uses the Catalog, those searches are served by Elasticsearch.

Elasticsearch used in the SignalFx Catalog

Finding things in the SignalFx Catalog


In addition to metrics, SignalFx also consumes/produces and presents events like alerts, code pushes, config runs, etc., which have their own metadata. All of those events are stored, indexed, and searched using Elasticsearch.

Elasticsearch used for SignalFx Events

Events in SignalFx


Metadata changes frequently, making it highly mutable. This makes the way we use Elasticsearch very resource intensive, and makes availability a paramount concern since users rely on the service for their use of SignalFx. Since we have theoretically unbounded growth, we naturally end up with two major challenges: how many shards should we have and how do we reshard with zero downtime.

You can read more about how we monitor Elasticsearch to baseline resource utilization and figure out when we’ll be due for a resharding here: How We Monitor Elasticsearch at Scale.

Elasticsearch Scaling Challenge #1: How Many Shards Are the Right Number of Shards

The fundamental unit of scale in Elasticsearch is the shard. Sharding allows scale out by partitioning the data into smaller chunks that can be distributed across a cluster of nodes:

An index can potentially store a large amount of data that can exceed the hardware limits of a single node. For example, a single index of a billion documents taking up 1TB of disk space may not fit on the disk of a single node or may be too slow to serve search requests from a single node alone.

To solve this problem, Elasticsearch provides the ability to subdivide your index into multiple pieces called shards. When you create an index, you can simply define the number of shards that you want. Each shard is in itself a fully-functional and independent “index” that can be hosted on any node in the cluster.

The challenge is to figure out the right number of shards, because you only get to make the decision once per index. And it impacts both performance, storage and scale, since queries are sent to all shards. In order to move to a different number of shards, you have to create a new index with the new shard configuration and migrate everything. This is non-trivial.

Of course, you don’t know how many queries or users you’ll have ahead of time, or what your growth will be. You could say: Let’s set it to 10,000 shards for many years of growth. But that won’t work because then your data is split into 10,000 pieces and every query will get sent to all 10,000 shards requiring 10,000 responses creating a commensurate amount of I/O, threads, context switches, coordination on the master, etc. This is what’s called the gazillion shard problem.

Unfortunately, there’s no magic formula. To decide how many shards to start with, you need to consider how big the index might grow — by size, by query volume, and by write load. The Elastic team recommends starting with one shard, sending “realistic” traffic, and seeing where it breaks. Then add more shards and retest until you find the right number. The key is to pick some kind of a timescale. You will eventually have to reshard; the only question is when. So given some projected index size in some time frame based on some growth metric that ties to Elasticsearch usage (like number of users), compare the numbers you see with what kind of storage usage and performance you want per shard. Then extrapolate out from there.

One other thing: you have to have a minimum of one shard per node. So if you decide your index needs three shards and you distribute them to three nodes (one per node) and those nodes run out of resources—you’ve painted yourself into a corner. Putting one shard per node means that when any node runs out of resources you have no choice but to reshard. We had this experience early on and don’t recommend it: SignalFx was so successful at launch that the original six shards we had gone with on six nodes immediately had to be resharded. Which leads us to the next challenge…

Elasticsearch Scaling Challenge #2: Resharding With Zero Downtime

Typically, this is how you reshard:

  • Create new index with new number of shards
  • Migrate docs via bulk operations: read all the docs in the old index and index (copy) them into the new index

This is a straightforward operation when nothing is changing. But if you’ve got a live system creating new data and serving queries for customers, docs are being changed in the original index. There’s no way to have an exact snapshot of the data and it’s possible that you will never catch up with the changes being made in the original index, depending on load. The switch from the old index to the new index at the end has to be atomic — you have to make sure that when you redirect queries and writes to the new index, everything that was available in the old index is available in the new index so that queries continue to work without interruption.

What happens if you make a mistake? How do you go back? Or what if the new index is just much slower? You need a fallback. This is where a lot of our work with Elasticsearch has been, resharding without downtime.

The fundamental mechanism that allows us to maintain availability through resharding is something we call the index generation. When we index a doc into Elasticsearch, we write a special field called “index generation”, which is the same for all docs in the index in normal conditions. We’re basically versioning the writes. This is a bit of state that has to be stored somewhere, and we use Zookeeper.


Let’s say we’re on an original (source) index with the generation set to 1 (gen-1) and walk through resharding to a new (target) index:

Initialization phase

  • To begin resharding, we create the new index with the right number of shards (and mapping changes if any). The new index has zero replicas and has its refresh interval set to -1, since we don’t need to query that index yet.
    • We usually spin up a new set of nodes. This incurs additional cost, but we’ve found it to be the safest choice.
    • If load on the source index is very low, and you know it’s going to stay that way, it’s possible to do the whole thing in place—but we don’t recommend it.

Bulk import phase

  • Increment generation to gen-2 and confirm that generation has been updated across all the components writing to the index (we use zookeeper to coordinate this). From this point forward, all new docs or changed docs are gen-2. This allows us to bound the set of documents that are at gen-1 or less.
  • Do a scan on the source index and bulk import gen-1 (or less) docs to the target index. We use scrolling for this, because it’s the most efficient way of retrieving a large number of documents from an index.
    • Keep in mind that scrolling can disallow merged segments from being claimed back by the filesystem, since the scroll is still using them.
    • To avoid that, we rely on a bucket number that is available on every document we index. The bucket number is assigned when the document is created and is a hash of the ID modulo the total number of buckets (we use 64k). This allows us to partition the document set into a random and uniformly distributed set of buckets, which we add as a parameter to scrolling—in effect, preventing scrolling over the entire index at once.
    • This also gave us a way to makes the operation recoverable in case of faults. Since bulk import at our size could take days, there can be occasional problems that called for pause, recovery, or rollback.
    • Now we can stop and start scrolling without having to go back to the beginning of the index because we’re importing bucket by bucket and tracking which bucket we’re on (in Zookeeper). This lets us automate dealing with crashes during the migration and pause migrations for any reason.
    • We also turn off index refreshing on the target index to make migrations faster, since it’s not serving any requests.


Double publishing and cleanup phase

  • Once all gen-1 docs are confirmed to be in the target index, we flip the switch (in Zookeeper), directing all writers to start writing to the target index while still writing to the source index.
  • Increment generation to gen-3.
    • Now we know that anything that changes from this point forward is gen 3 AND
    • All gen-3 changes are written to both the source and the target index AND
    • Anything that’s gen-1 (or lower) has been migrated to the target index AND
    • Only gen-2 docs need to be reconciled between the source and target index AND
    • gen-2 docs are a bounded set (i.e., their count will either stay the same or decrease, but never increase)
  • Query the source index for gen-2 docs and write them to the target index, at which point they’ll become gen-3.
    • We still have double writing on, so they’ll get written to both indexes.Query the source index for gen-2 docs and write them to the target index, at which point they’ll become gen-3.
    • This process continues until there are no more gen-2 docs in the source index.
  • At completion of the last step, we know that the target index is completely up to date with the source index.


Closing phase

  • Change the index alias, which is an atomic operation. Now all queries are served by the target index.
  • Turn off double writing.
  • Close the source index, freeing up resources.
  • Maintain the source index on disk for a full day, just in case. 🙂


What We Learned

There are two distinct challenges with scaling Elasticsearch centered around sharding: how many shards do you need and how do you reshard without downtime. But with a little work, we’ve found that both are solvable.

  1. Understand the performance vs storage tradeoff for your use case through incremental baselining both starting with a single shard and moving up from there.
  2. Start out with more than one shard per node so you can horizontally scale by adding nodes and moving shards until you hit a one-to-one ratio.
  3. Break up long processes into smaller components:
    • For scrolling, we do this using the bucketing mechanism described above.
    • For the entire resharding process, we do this using the index generation mechanism described above.

We’ve been able to go from being forced to reshard without warning, to being able to predict when we’ll have to reshard and preparing for the process ahead of time.

And we can now scale, reshard, move indexes with zero downtime—maintaining availability and performance of Elasticsearch to both internal consumers of the service as well as SignalFx customers searching and navigating in our UI.

Mahdi Ben Hamida

Posted by