Scaling at Instacart: Distributing Data Across Multiple Postgres Databases with Rails

The Challenge
I love a good challenge and at Instacart we have lots of exciting challenges. I work on the Shopper Success team, tasked with making shopping and delivering groceries to our customers a good and reliable gig for our shoppers. Part of that responsibility is ensuring the stability of all of our fulfillment systems, so that shoppers can work. We have a lot of data, which is a wonderful problem to have, but it is also rapidly multiplying as we grow. Our code runs in different apps and services, but we still relied on only two databases, our primary and our item catalog, and we were quickly running out of space.
Objectives
The first step of architecting is always to figure out what we’re optimizing for. We’ve enjoyed great success with Postgres and saw no reason to depart from it. It has given us high performance and has an incredible array of datatypes and query functions that have been extremely useful.
Space
Our most urgent priority was finding more space. We were up against the clock and at high risk of hitting the 6TB limit (raised to 12TB before we finished) of Amazon RDS. We had a number of parallel initiatives to potentially address the issue, with the expectation that at least one would resolve the issues for the medium term and a combination would provide longer-term support.
Isolation
One way to improve the robustness of systems is to isolate them, limiting the frequency of incidents and the breadth of impact. At Instacart, we serve a four-sided marketplace. We serve the customers and the shoppers who pick and deliver their groceries, but we also work closely with retailers and consumer packaged goods companies. We have different SLAs in each area of the business to cater to their needs. With our shoppers, there is a moral imperative to provide high stability. If our fulfillment (shopper) systems go down, the company can’t deliver orders, but more importantly, we would be letting down those who depend on Instacart for income.
Performance
Obviously we wanted to maintain high performance, but we were not looking to drastically alter an already healthy setup. Replicas already handle most of the read load, and our write load was manageable.
Application-Level Asynchronous Data Pump with Multiple Postgres Databases
After examining the hundreds of tables we have, we found only a few dozen were needed by multiple domains. These tables often have a number of joins and complex queries that make an API inappropriate. For example, when customers place orders, the order is written to the Customers database in the Customers domain, where it is needed so customers can access and modify their orders. However, some of the data about this order is critical to the Fulfillment domain. In order to provide isolation, we needed to copy the data somewhere. Postgres is tremendously powerful and we saw no reason to move away from it, especially with horizontally-scaling options like Aurora available. We decided to create a dedicated Fulfillment database that would house all of the data needed to complete an order and share the necessary data between databases. This allows complete isolation and greater ownership of relevant data.

Multiple Databases
It’s pretty easy to connect to multiple databases in Rails.
class FulfillmentRecord < ActiveRecord::Base
  self.abstract_class = true
  establish_connection "fulfillment_#{Rails.env}".to_sym
end

module FulfillmentRecordConnection
  extend ActiveSupport::Concern

  included do
    def self.retrieve_connection
      FulfillmentRecord.connection
    end
  end
end

class SomeFulfillmentModel < FulfillmentRecord
  # Connects to Fulfillment database
end

class SomeCustomersModel < CustomersRecord
  # Connects to Customers database
end

class Fulfillment::SomeCustomersModel < SomeCustomersModel
  include FulfillmentRecordConnection
  # Inherits from the customers model, but connects to the
  # fulfillment database
end
The example above shows how we can have models within a single app that use different connections in ActiveRecord. The establish_connection call looks up the matching "fulfillment_<environment>" entry in database.yml, and the line include FulfillmentRecordConnection changes the connection of a subclass while it inherits all of the functionality from the parent class or reopened class.
Master & Copied Tables
When managing data in multiple places, we need to make sure the copies stay in sync and do not diverge. A change applied in one place needs to be applied elsewhere, eventually (more on this later). We looked at all of the tables needed by multiple domains and found that most had all of their writes originating from a single domain. Having writes from multiple domains turned out to be a good indication for us that the schema was wrong and it was time to split the table into separate entities. This was particularly convenient because master-slave synchronization is far simpler than master-master, with its whole host of race conditions.
For each shared table, we determined the owning domain where the table and writes would live. Other domains needing this table would have what we call a “copied table.” We were careful not to call them replicas or slaves. Our copied tables are read-only for the production Postgres user to avoid accidental modifications, and they also have a basic application-level write protection. Here is a simplified implementation of our CopiedTable concern.
module CopiedTable
  extend ActiveSupport::Concern

  def readonly?
    !@force_write && !new_record?
  end

  INSTANCE_FORCE_ACTIONS = [:update, :update!, :destroy, :destroy!, :save, :save!, :update_columns]

  INSTANCE_FORCE_ACTIONS.each do |action|
    define_method("force_#{action}") do |*args, &block|
      @force_write = true
      result = send(action, *args, &block)
      @force_write = false
      result
    end
  end

  included do
    def self.apply_published_changes(params)
      id = params[:id]
      attrs = filtered_attributes(params[:attrs])
      action = params[:action] # create, update, or delete
      # Do (force_)action with attrs
    end

    def self.filtered_attributes(attributes)
      # Removes attributes that do not exist on the copied table
      present = column_names.each_with_object({}) { |key, hash| hash[key.to_sym] = true }
      attributes.select { |key, v| present[key.to_sym] }
    end
  end
end
Simply put, it prevents accidentally modifying records and makes it easy to take a generic payload of changes and apply it to a model. To use it, we include CopiedTable on models that are copied.
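For example, a copied model in the Fulfillment domain and the write behavior it gets from the concern might look like this (a hypothetical sketch; the name attribute is only for illustration):

class Fulfillment::SomeCustomersModel < SomeCustomersModel
  include FulfillmentRecordConnection
  include CopiedTable
end

record = Fulfillment::SomeCustomersModel.find(1)
record.update!(name: "new value")       # raises ActiveRecord::ReadOnlyRecord
record.force_update!(name: "new value") # allowed; used when applying published changes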
Application-Level Data Pump
Postgres has a number of database-level mechanisms for sharing and replicating data. Replicas are great for isolating read load, but do nothing to help with space or write isolation. We experimented with foreign data wrappers (FDW) coupled with materialized views to cache foreign data. The experiment wasn’t a failure, but we found materialized view refreshes to be potentially problematic at scale.
Replicating at the application level gives us greater logical control. Two different apps might care about different data. Perhaps the Customers domain needs a full record of every order, but the Fulfillment domain only needs the information to deliver current orders. The Fulfillment domain can ignore updates to past orders or information about which coupons were used. The performance overhead was a trade-off that made sense for us.
To facilitate this, we publish changes over Hub, our pub/sub implementation. We can make a model publish its changes with include PublishChanges.
module PublishChanges
  extend ActiveSupport::Concern

  included do
    after_commit :hub_publish_changes
  end

  def hub_publish_changes
    # Publish data for subscribers:
    #   "#{model_name.underscore}_changed", e.g. "shopper_changed"
    #   type: model_name, e.g. Shopper
    #   id: self.id,
    #   action: action, e.g. :create
    #   changes: transformed_changes, e.g. { foo: :new_value }
    # Implementation not shown
  end
end
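On the owning side, a model whose changes other domains need then just includes the concern (using the hypothetical customers model from earlier):

class SomeCustomersModel < CustomersRecord
  include PublishChanges
  # Every committed create, update, and destroy is now published over Hub
end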
Then we have the CopiedTable module to help receive updates. This snippet of CopiedTable shows the apply_published_changes class method, which allows our publish consumer to trivially call Model.apply_published_changes(params) to create, update, or delete from the copied table. The consumer can ignore rows the domain doesn't care about (e.g. past order totals).
module CopiedTable
  extend ActiveSupport::Concern

  included do
    def self.apply_published_changes(params)
      id = params[:id]
      attrs = filtered_attributes(params[:attrs])
      action = params[:action]
      # Do action with attrs
    end

    def self.filtered_attributes(attributes)
      # Removes attributes that do not exist on the copied table
      present = column_names.each_with_object({}) { |key, hash| hash[key.to_sym] = true }
      attributes.select { |key, v| present[key.to_sym] }
    end
  end
end
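Hub's subscription API is internal, so the consumer below is only a hypothetical sketch; it assumes Hub hands us the published payload as a hash with :type, :id, :attrs, and :action keys:

class FulfillmentChangeConsumer
  # Hypothetical mapping from the published type to this domain's copied model
  COPIED_MODELS = {
    "SomeCustomersModel" => Fulfillment::SomeCustomersModel
  }.freeze

  def handle(payload)
    model = COPIED_MODELS[payload[:type]]
    return unless model # ignore models this domain does not copy
    model.apply_published_changes(payload)
  end
end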
Eventual Consistency
Eventual consistency is a well-defined concept: a replicated data source will eventually have all of the data that the original data source has, but usually with some lag. This is a common trade-off that allows high performance of both reads and writes. If two databases are atomically coupled, then slow performance on either will negatively affect the other. This pattern ensures that any performance issues in the Fulfillment domain don’t prevent customers from placing orders, for example.
Verifying Integrity
There’s always a risk data will get out of sync or fall significantly behind. If a bug prevents publishing changes or someone trips over a power cord, we need to be sure that we identify and resolve differences with a sweeper. We have a regularly scheduled job that runs for each copied model and ensures that all updates exist. We can query the first database by updated_at, which we index out of habit, and the second database by ids from the first database. When performed in batches, this approach has worked well.
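A minimal sketch of such a sweeper, assuming the hypothetical models from earlier and a recheck window that comfortably covers normal replication lag:

class CopiedTableSweeper
  BATCH_SIZE = 1_000

  # Re-applies any recently updated source rows whose copies are missing or stale.
  def sweep(source: SomeCustomersModel, copy: Fulfillment::SomeCustomersModel, since: 1.day.ago)
    source.where("updated_at >= ?", since).find_in_batches(batch_size: BATCH_SIZE) do |batch|
      copies = copy.where(id: batch.map(&:id)).index_by(&:id)
      batch.each do |row|
        copied = copies[row.id]
        next if copied && copied.updated_at >= row.updated_at
        copy.apply_published_changes(id: row.id, attrs: row.attributes, action: copied ? :update : :create)
      end
    end
  end
end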
Refactoring Data
Great, so we have a new architecture in mind, but we can’t simply merge a few PRs and have terabytes of data moved without downtime, errors, or loss of data. Every data migration is different, depending on volume, access patterns and sensitivity. After moving hundreds of tables, we developed an effective process for moving data without negatively affecting live systems.
Identify dependencies
Before moving a table, it’s important to find all joins, triggers, and other related entities that will need to move. I found a few tricks that worked well. First, I simply searched both model and table names in the codebase and moved what I could easily find. Then, on a branch I removed the model and table I wanted to move and let our tests find queries I had missed. Last, I used our profiling tools and logging to examine queries that were run.
Consolidate writes
Our initial state for most tables was multiple domains reading from and writing to a particular table in a single domain’s database. In order to have a clearly defined access pattern with reliable replication and validation, we wanted a single domain to own each table. Moving this code helped us find lingering or dead code from the old monolith. In every case we saw, multiple domains writing to the same table was a code smell, and we decided to split up or refactor those models.
Establish Models
The next step is to create the schema and model in the new database. Copying part of the structure.sql or schema.rb is usually a good starting point. Be sure the code indicates that the model is not ready for primetime yet, or you may have someone querying old/bad data. I found it worked best to create a model with no validations that was used to copy whatever was in the original table.
class Foobar < ActiveRecord::Base
  # Existing full model definition
  # ...
end

class Fulfillment::Foobar < FulfillmentRecord
  include CopiedTable
  # Empty model, except for serialized fields
  serialize :data
end
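The copied table also needs a schema in the new database. A hypothetical migration, adapted from the owning database's schema.rb and trimmed to the columns the Fulfillment domain needs (run against the fulfillment database however your app manages its migrations), might look like:

class CreateFoobars < ActiveRecord::Migration[5.2]
  def change
    create_table :foobars do |t|
      t.text :data # serialized field from the original model
      t.timestamps
    end
    # Index updated_at so the sweeper can query recent changes cheaply
    add_index :foobars, :updated_at
  end
end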
Backfill data
One of our engineers wrote a great tool, called pgsync, for transferring data between Postgres databases. We use it most often for pulling obfuscated staging data to local dev environments, but it can also work between production databases. Depending on how much data you’re moving, it could take a while. Perhaps make a note of when you ran it so you can sync future updates from that point forward.
pgsync the_table --in-batches --from $DATABASE_1_URL --to $DATABASE_2_URL --sleep 0.1 --to-safe
Synchronous Dual-write
As we prepare for “cutover,” it’s important that the tables are closely in sync. If we cut over and there’s a one-minute lag, we’re at risk of losing that data. Granted, there are ways to prevent this, but an easy way to reduce issues stemming from race conditions is to have really short races. This setup writes to the second database in a callback after writing to the first.
module DualWrite
  extend ActiveSupport::Concern

  class_methods do
    def dual_write(to:)
      self.to_model = to
    end
  end

  included do
    cattr_accessor :to_model
    after_save :dual_write_to_db
    after_destroy :dual_destroy_from_db
  end

  protected

  def prepare_attributes
    present = Set.new(to_model.constantize.column_names)
    attributes.select { |key, v| present.include?(key) }
  end

  def dual_write_to_db
    return unless enabled?
    r = to_model.constantize.find_or_initialize_by(id: id)
    r.assign_attributes(prepare_attributes)
    r.respond_to?(:force_save!) ? r.force_save!(validate: false) : r.save!(validate: false)
  end

  def dual_destroy_from_db
    return unless enabled?
    r = to_model.constantize.find_by(id: id)
    return unless r
    r.respond_to?(:force_destroy) ? r.force_destroy : r.destroy
  end

  def enabled?
    to_model && self.class.to_s != to_model
  end
end
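Wiring it up on the hypothetical Foobar models from earlier is then a one-liner on the current source of truth:

class Foobar < ActiveRecord::Base
  include DualWrite
  dual_write to: "Fulfillment::Foobar"
end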
Note: self.attributes includes the primary key, so the new row will be created without incrementing the auto-increment sequence. This is something you could consider doing manually after each create, perhaps with:
Fulfillment::Foobar.connection.execute("SELECT SETVAL('foobar_id_seq', COALESCE(MAX(id), 1) ) FROM foobar;")
Cutover
Now we have all of our ducks in a row and it’s time to make the jump. Make a single PR that changes the database of the model, creates a new model for the old database, and dual-writes in the other direction. You should also disable the sweeper just before the cutover goes live and re-enable it in the other direction just after. I recommend sticking with synchronous dual-writes during the cutover to facilitate a cleaner rollback. Query both databases and verify the data looks good. I found a simple query like
SELECT COUNT(id), AVG(id) FROM foo_bar WHERE updated_at < now() - INTERVAL '10 seconds'
was remarkably effective in ensuring the source and copied tables contained the same rows.
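A quick way to run that check against both databases from the Rails console, assuming the Foobar models above and a table named the same on both sides (the interval just needs to exceed your replication lag):

sql = <<~SQL
  SELECT COUNT(id) AS count, AVG(id) AS avg
  FROM #{Foobar.table_name}
  WHERE updated_at < now() - INTERVAL '10 seconds'
SQL
puts Foobar.connection.select_one(sql).inspect              # source of truth
puts Fulfillment::Foobar.connection.select_one(sql).inspect # copied table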
Asynchronous Replication
Once we’ve gained confidence that the cutover has gone smoothly and we will not need to roll back, it’s time to make our synchronization asynchronous so we can gain the real benefits of isolation. We start publishing and consuming changes before removing the synchronous dual-write. Once we’re writing asynchronously, we have decoupled the two databases! Writing to the first database does not depend on the second database also being healthy, and reads stay on the local database.
Dropping Old Tables
In some cases we no longer have a need for the table in the old database. Dropping tables can feel like Russian roulette, but there are some tricks to tame the risk. Archiving is basically always a good idea. If you can tolerate failed queries when you’re wrong, revoking permissions is a pragmatic way to verify a table isn’t used. I usually write out the grant statement to restore permissions and keep it in a text editor so I can undo as quickly as possible if needed. If your system cannot tolerate a minute of failing queries, a more challenging but safer approach is to reset the table’s statistics. Make sure that tracking is enabled, reset that table’s statistics, and wait. When you check back, you should see that the access counts are still zero. More on this in the Postgres docs.
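For the statistics approach, here is a hedged sketch of the checks from a Rails console (the table name is a placeholder; pg_stat_reset_single_table_counters and pg_stat_user_tables are standard Postgres, though resetting counters requires elevated privileges):

conn = ActiveRecord::Base.connection

# Reset this table's access statistics
conn.execute("SELECT pg_stat_reset_single_table_counters('foobars'::regclass)")

# ...wait a while, then confirm nothing has touched the table since the reset
stats = conn.select_one(<<~SQL)
  SELECT seq_scan, idx_scan, n_tup_ins, n_tup_upd, n_tup_del
  FROM pg_stat_user_tables
  WHERE relname = 'foobars'
SQL
puts stats.inspect # all zeros suggests the table is safe to archive and drop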
Conclusion
Instacart has a lot of data and has adopted an application-level data pump pattern for replicating data between databases. This approach is generic enough to work for all tables, but customizable enough to allow for denormalization, filtering, and other performance optimizations. We start by moving all of the writes to the eventual owning domain, synchronously dual-writing to both databases, cutting over the source of truth, and then replicating asynchronously.
Hopefully my experience re-architecting our production data is helpful to you. Have ideas on how we could do better? Great, we’re hiring! Come join as we transform the grocery industry!