Ecto & Multi-tenancy - Dynamic Repos - Part 2
2019-11-01
In the first part I covered the basics of getting started with Dynamic repositories with Ecto. Using that post we can create one or more repos at runtime, create the necessary database, run migrations to get it ready and then direct queries to it. That's a good start. Building blocks for something better. I'll try to get into the better bits here.
That said, I still ended up using prefixes because it made my code simpler. It didn't end up as cool or as interesting, but it turned out a lot less surprising. I'm planning on covering prefixes in a separate post. You can read the basic guide on prefixes from the docs until then.
A brief outline:
- Cleaning up runtime migrations
- Managing multiple connection pools
- Phoenix integration, plug and play
- Testing
Cleaning up runtime migrations
This is largely based on advice I received from José during my dive into this stuff. I hadn't otherwise considered this solution.
By convention migrations live in .exs files, outside of your application. They live in the priv directory and are mostly a concern to whatever mix ecto commands you happen to be running. Since they normally aren't a runtime concern, this is reasonable. It keeps your application clean.
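For illustration, roughly the difference in layout (the paths here are just examples):
# Conventional: outside compilation, only mix tasks care about it
priv/repo/migrations/20190819062207_create_my_specific_table.exs

# What we want: compiled into the application, available at runtime
lib/my_app/repo/migrations/create_my_specific_table.ex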
For us, they are a runtime concern. So what do we do? Well, Ecto.Migrator.run has support for getting a list of migration modules instead of a list of migration paths. Let's try that.
Given our trusted application MyApp and our repo module MyApp.Repo, I'd simply add the migration files to a migrations directory under lib, make sure the modules are named under MyApp.Repo.Migrations (for example MyApp.Repo.Migrations.CreateMySpecificTable) and then create a migrations.ex:
defmodule MyApp.Repo.Migrations do
  require Logger

  def get_migrations() do
    migration_modules = [
      {20190819_062207, MyApp.Repo.Migrations.CreateMySpecificTable},
      {20190819_063752, MyApp.Repo.Migrations.FixMySpecificTable}
    ]

    for {_version, module} <- migration_modules do
      if !migration?(module) do
        Logger.error(
          "#{inspect(module)} does not seem to be a migration module. Please make sure that your migration files are .ex and not .exs for runtime migrations."
        )
      end
    end

    migration_modules
  end

  defp migration?(mod) do
    function_exported?(mod, :__migration__, 0)
  end
end
There is nothing special about this module's location.
This has more code than strictly necessary. But I shot myself in the foot last time I brought in some migrations because I didn't catch that a file was .exs instead of .ex, so I ripped out the private migration? check from the migrator to prevent myself from doing that again. I want to spare everyone else that nuisance.
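For reference, one of those migration modules could look something like the following. This is a minimal sketch; the table and columns are made up for illustration:
defmodule MyApp.Repo.Migrations.CreateMySpecificTable do
  # Note: this lives in an .ex file under lib, not priv, so it
  # compiles into the application and is available at runtime
  use Ecto.Migration

  def change do
    create table(:my_specific_table) do
      add :title, :string
      add :text, :text

      timestamps()
    end
  end
end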
So you can pass this get_migrations() result right into your Ecto.Migrator.run. So what have we done here? We've made the migrations part of our application, which makes sense since we want them available at runtime.
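As a sketch, the call could look like this, where repo_pid is assumed to come from starting a dynamic repo as covered in part 1:
# Point the current process at the dynamic repo
MyApp.Repo.put_dynamic_repo(repo_pid)

# Run our module-based migrations instead of path-based ones
Ecto.Migrator.run(
  MyApp.Repo,
  MyApp.Repo.Migrations.get_migrations(),
  :up,
  all: true,
  dynamic_repo: repo_pid
)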
Managing multiple connection pools
This was my next need: managing all the connection pools. A consequence of this approach is that Ecto will start a connection pool for each tenant. I would have preferred being able to run a single pool for many databases, but this is what we have for now.
So I set up a GenServer that you can ask for the repo PID of a customer. Essentially it registers processes, but there are some extras that separate it from a straight-up registry:
- When asking for a customer repo it will start one if there isn't one.
- It maintains limits for how many repos we want to keep open at a time. Postgres has connection limits, we want to be able to stay within whatever limits we have on that end. On starting a new repo it would close old ones when close to the limit, LRU style.
So the GenServer manages the registry state, and some public functions in the module provide conveniences for creating a database, migrating a database and activating the one you need.
I ended up with code that looked like this:
defmodule MyApp.Customers.RepoManager do
  use GenServer

  alias MyApp.Customers

  # Public API (client)

  def start_link(settings) when is_list(settings) do
    settings
    |> Map.new()
    |> start_link()
  end

  def start_link(settings) when is_map(settings) do
    GenServer.start_link(__MODULE__, settings, name: __MODULE__)
  end

  def get_database_name(customer) do
    "database_#{customer.slug}"
  end

  def get_customer_options(customer) do
    customer_database = get_database_name(customer)
    config = Application.get_env(:my_app, MyApp.Repo)

    config
    |> Keyword.put(:name, nil)
    |> Keyword.put(:database, customer_database)
  end

  def set_customer(customer, ensure_exists \\ false) do
    # Yes, this is entirely side-effects
    if ensure_exists do
      GenServer.cast(__MODULE__, {:ensure_repo_exists, customer})
    end

    repo_pid = GenServer.call(__MODULE__, {:get_customer_pool, customer})
    MyApp.Repo.put_dynamic_repo(repo_pid)
    {:ok, repo_pid}
  end

  def unset_customer(customer) do
    GenServer.call(__MODULE__, {:get_customer_pool, customer})
    MyApp.Repo.put_dynamic_repo(MyApp.Repo)
    :ok
  end

  def destroy_repo(customer, wait \\ false) do
    unset_customer(customer)
    options = get_customer_options(customer)

    if wait do
      # Takes a bit
      MyApp.Repo.__adapter__().storage_down(options)
    else
      # Fire-and-forget, we never await the result
      Task.start(fn ->
        MyApp.Repo.__adapter__().storage_down(options)
      end)
    end
  end

  # Callbacks (internal)

  @impl true
  def init(%{soft_limit: soft_limit, hard_limit: _hard_limit} = settings) do
    # Bonus points if we could sort this by most recently active
    customers = Customers.list_customers()

    # Ensure all customers have databases
    for customer <- customers do
      ensure_repo_exists(customer)
    end

    state = %{
      pools: %{},
      settings: settings
    }

    state =
      customers
      # Warm up by starting arbitrary customers up to the limit
      |> Enum.take(soft_limit)
      |> Enum.reduce(state, fn customer, state ->
        start_connection_pool(customer, state)
      end)

    {:ok, state}
  end

  @impl true
  def handle_call({:get_customer_pool, customer}, _from, state) do
    {repo_pid, state} = get_connection_pool(customer, state)
    {:reply, repo_pid, state}
  end

  @impl true
  def handle_cast({:ensure_repo_exists, customer}, state) do
    ensure_repo_exists(customer)
    {:noreply, state}
  end

  @impl true
  def handle_cast(:clean_pool, %{pools: pools, settings: %{soft_limit: soft_limit}} = state) do
    diff = map_size(pools) - soft_limit

    pools =
      if diff > 0 do
        close_oldest(pools, diff)
      else
        pools
      end

    {:noreply, %{state | pools: pools}}
  end

  # Internal functions

  defp ensure_repo_exists(customer) do
    options = get_customer_options(customer)
    MyApp.Repo.__adapter__().storage_up(options)

    options = Keyword.put(options, :pool_size, 2)
    {:ok, repo_pid} = MyApp.Repo.start_link(options)
    MyApp.Repo.put_dynamic_repo(repo_pid)

    Ecto.Migrator.run(
      MyApp.Repo,
      MyApp.Repo.Migrations.get_migrations(),
      :up,
      all: true,
      dynamic_repo: repo_pid
    )

    MyApp.Repo.stop(1000)
    MyApp.Repo.put_dynamic_repo(MyApp.Repo)
  end

  defp start_connection_pool(customer, %{pools: pools, settings: settings} = state) do
    # Never exceed the hard limit, trim synchronously if we would
    diff = map_size(pools) - settings.hard_limit

    pools =
      if diff > 0 do
        close_oldest(pools, diff)
      else
        pools
      end

    options = get_customer_options(customer)
    {:ok, repo_pid} = MyApp.Repo.start_link(options)
    pools = Map.put(pools, customer.id, {repo_pid, timestamp()})
    # Ask ourselves to trim back down to the soft limit, asynchronously
    GenServer.cast(__MODULE__, :clean_pool)
    %{state | pools: pools}
  end

  defp get_connection_pool(customer, %{pools: pools} = state) do
    state =
      if Map.has_key?(pools, customer.id) do
        state
      else
        start_connection_pool(customer, state)
      end

    %{pools: pools} = state
    {repo_pid, _last_used} = Map.fetch!(pools, customer.id)
    # Update usage timestamp
    pools = Map.put(pools, customer.id, {repo_pid, timestamp()})
    state = %{state | pools: pools}
    GenServer.cast(__MODULE__, :clean_pool)
    {repo_pid, state}
  end

  defp close_oldest(pools, number) do
    {trim, keep} =
      pools
      |> Enum.sort_by(fn {_customer_id, {_repo_pid, last_used}} ->
        last_used
      end)
      |> Enum.split(number)

    for {_customer_id, {repo_pid, _last_used}} <- trim do
      close_pool(repo_pid)
    end

    # Recreate the map from the rest
    Map.new(keep)
  end

  defp close_pool(repo_pid) do
    MyApp.Repo.put_dynamic_repo(repo_pid)
    MyApp.Repo.stop(1000)
  end

  defp timestamp do
    # Monotonic integers sort correctly for LRU purposes,
    # raw DateTime structs do not compare chronologically
    System.monotonic_time()
  end
end
It runs a GenServer that keeps track of your connection pools and can give you access to them on-demand. It will also do some simplistic enforcement of limits on the number of pools. This allows some control over Postgres connection limits, memory usage and such.
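If you want to try it out, starting it under your application supervisor could look like the following sketch. The limit numbers are arbitrary, and MyApp.Customers.CentralRepo is the central repo that shows up again in the testing section below:
# In MyApp.Application, for example
children = [
  MyApp.Customers.CentralRepo,
  {MyApp.Customers.RepoManager, [soft_limit: 20, hard_limit: 30]}
]

Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)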
This was an effort that never quite made it into use since I switched to prefixes. So consider your use case, check your assumptions and if anything, just use this code as an indicator of one way to go about it. It hasn't earned any battle-scars yet.
Phoenix integration, plug and play
To avoid having every developer that ever makes an Ecto query need to remember to call put_dynamic_repo, I figured it might be worthwhile to do that automatically. So I built a plug that does this. It assumes a few things about your API path structure and uses the RepoManager we created above to set your dynamic repo based on information in the request.
defmodule MyApp.Customers.CustomerPlug do
  import Plug.Conn

  alias MyApp.Customers
  alias MyApp.Customers.RepoManager

  def init(options) do
    options
  end

  def call(conn, _opts) do
    case extract_customer_id(conn) do
      {:ok, customer_id} ->
        customer = Customers.get_customer!(customer_id)
        {:ok, repo_pid} = RepoManager.set_customer(customer)

        conn
        |> assign(:customer_id, customer_id)
        |> assign(:customer, customer)
        |> assign(:customer_repo, repo_pid)

      :error ->
        auth_failed(conn)
    end
  end

  # Expects paths like /api/customers/:customer_id/...
  defp extract_customer_id(%Plug.Conn{path_info: path_info}) do
    if List.starts_with?(path_info, ["api", "customers"]) do
      with {:ok, customer_id_string} <- Enum.fetch(path_info, 2),
           {integer, _} <- Integer.parse(customer_id_string) do
        {:ok, integer}
      else
        _ -> :error
      end
    else
      :error
    end
  end

  defp auth_failed(conn) do
    conn
    |> send_resp(400, "Invalid request, customer authentication failed.")
    |> halt()
  end
end
This means that for the current process you will have the right dynamic repo set already, based on what the query is about. In my case I would likely add authorization to this plug as well to make sure only the appropriate users get access to a given customer.
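Wiring it into a Phoenix router could look something like this sketch; the pipeline and scope names here are just examples:
# In MyAppWeb.Router
pipeline :customer_api do
  plug MyApp.Customers.CustomerPlug
end

scope "/api/customers", MyAppWeb do
  pipe_through [:api, :customer_api]

  # Customer-scoped routes go here, for example:
  # get "/:customer_id/documentation", DocumentationController, :index
end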
Any developer can just keep using Ecto the way one usually would within the realm of a Phoenix application, for example. But if you spawn a process you are bound to lose your carefully selected dynamic repo, because that is set using the process dictionary of the current process. So in the end there may be patterns that are quite a bit safer. But as long as your default repo isn't usable, at least you'll get an error when trying to run queries, letting you know that you failed to do the right thing.
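To illustrate, here is a minimal sketch of carrying the dynamic repo into a spawned process. get_dynamic_repo/0 and put_dynamic_repo/1 are provided by Ecto.Repo; the MyApp.Quality.Documentation schema is an assumption based on the test code further down:
# Grab the dynamic repo from the current process ...
repo_pid = MyApp.Repo.get_dynamic_repo()

Task.async(fn ->
  # ... and set it again inside the new process, since the
  # process dictionary is not inherited by spawned processes
  MyApp.Repo.put_dynamic_repo(repo_pid)
  MyApp.Repo.all(MyApp.Quality.Documentation)
end)
|> Task.await()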
Testing
I found testing to be a bit of a challenge. During testing we generally have the Ecto Sandbox to keep things nice, fast and contained. But if you are testing this sort of thing you might want to either break out of the sandbox or make darn sure you are using it in the right way.
And do you want all your tests to actually exercise the database approach you have, or do you want to limit that to the most important ones? I spent a bunch of time fighting with the sandbox functionality and performance, trying to find a good balance.
So testing-wise it definitely feels like you'll be a bit off the beaten path. I think you can successfully bring most of your tests back into the fold and have a fairly normal experience and fast tests. The tests that I started with created a dynamic repo for each test; setup and teardown for those took forever.
This was my setup most recently. I do not start the dynamic repo in my application; I start my central customer repo there. And my test helper reflects that:
ExUnit.start()
Ecto.Adapters.SQL.Sandbox.mode(MyApp.Customers.CentralRepo, :manual)
I couldn't find a good way to work with the sandbox and dynamic repos. So for the tests I used:
..
config :my_app, MyApp.Repo,
  username: "postgres",
  password: "postgres",
  database: "my_app_test",
  hostname: "localhost",
  pool_size: 2
..
To test the repos and repo management I used this:
defmodule MyApp.RepoTest do
  use ExUnit.Case

  alias MyApp.Quality
  alias MyApp.Customers
  alias MyApp.Customers.RepoManager

  def new_customer_attrs() do
    %{
      name: "Test customer",
      slug: Ecto.UUID.generate()
    }
  end

  @create_published_attrs %{
    title: "some title",
    text: "some text"
  }

  setup do
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(MyApp.Customers.CentralRepo)
    Ecto.Adapters.SQL.Sandbox.mode(MyApp.Customers.CentralRepo, {:shared, self()})
    :ok
  end

  describe "dynamic repos" do
    test "create documentation with repo" do
      {:ok, customer} = Customers.create_customer(new_customer_attrs())
      RepoManager.set_customer(customer, true)
      {:ok, documentation} = Quality.create_documentation(@create_published_attrs)
      assert documentation.id == 1
      RepoManager.destroy_repo(customer)
    end

    test "create documentation with two different repos" do
      {:ok, customer} = Customers.create_customer(new_customer_attrs())
      RepoManager.set_customer(customer, true)
      {:ok, documentation} = Quality.create_documentation(@create_published_attrs)
      assert documentation.id == 1

      {:ok, other_customer} = Customers.create_customer(new_customer_attrs())
      RepoManager.set_customer(other_customer, true)
      {:ok, documentation} = Quality.create_documentation(@create_published_attrs)
      assert documentation.id == 1

      RepoManager.destroy_repo(customer)
      RepoManager.destroy_repo(other_customer)
    end
  end
end
These tests create repos on-demand and verify that this functionality works as expected, creating the same ID in two parallel databases.
I wouldn't swear to this conn_case.ex below being perfect. I know I fought quite a bit getting to a place where the RepoManager and the sandbox wouldn't be a complete shitshow. This was part of both ConnCase and DataCase to make sure other tests did not have to be written with customer management in mind.
..
setup_all tags do
  :ok = Ecto.Adapters.SQL.Sandbox.checkout(MyApp.Customers.CentralRepo)

  unless tags[:async] do
    Ecto.Adapters.SQL.Sandbox.mode(
      MyApp.Customers.CentralRepo,
      {:shared, self()}
    )
  end

  {:ok, customer} = MyApp.Customers.create_customer(@customer_attrs)
  RepoManager.set_customer(customer, true)
  :ok = Ecto.Adapters.SQL.Sandbox.checkin(MyApp.Customers.CentralRepo)

  on_exit(fn ->
    RepoManager.set_customer(customer)
    RepoManager.destroy_repo(customer)
  end)

  :ok
end
..
So we set up cases to use a specific dynamic repo and unless they screw around with processes they don't have to care about the current customer or current repo. I think that was more elegant than passing a customer everywhere and using some boilerplate to get every test in line. With prefixes I took another approach. Which I'll cover when I cover that. If there is interest.
Update: Part 3 - Prefixes is available.
Would you be interested in more Ecto-related posts? Or does your business need someone to dive deep into these odd corners of our beloved ecosystem and draw a map in this manner? I rather enjoy it and would be happy to figure something out. Let me know at lars@underjord.io.
Underjord is a four-person team doing Elixir consulting and contract work. If you like the writing you should really try the code. See our services for more information.
Note: Or try the videos on the YouTube channel.