Seeds grow in the underground

Ecto & Multi-tenancy - Dynamic Repos - Part 2

2019-11-01

In the first part I covered the basics of getting started with Dynamic repositories with Ecto. Using that post we can create one or more repos at runtime, create the necessary database, run migrations to get it ready and then direct queries to it. That's a good start. Building blocks for something better. I'll try to get into the better bits here.

That said, I still ended up using prefixes because it made my code simpler. It didn't end up as cool or as interesting. But it turned out a lot less suprising. I'm planning on covering prefixes in a separate post. You can read the basic guide on prefixes from the docs until then.

A brief outline:

Cleaning up runtime migrations

This is largely based on advice I received from José during my dive into this stuff. I hadn't otherwise considered this solution.

By convention migrations live in .exs files, outside of your application. They live in the priv directory and are mostly a concern to whatever mix ecto commands you happen to be running. Since they aren't really a runtime concern normally this is reasonable. Keeps your application clean.

For us, they are a runtime concern. So what do we do? Well, Ecto.Migrator.run has support for getting a list of migration modules instead of a list of migration paths. Let's try that.

Given our trusted application MyApp and our repo module MyApp.Repo I'd simply add them to a migrations directory and make sure they live under MyApp.Repo.Migrations.CreateMySpecificTable and then create a migrations.ex :

defmodule MyApp.Repo.Migrations do
  require Logger

  def get_migrations() do
    migration_modules = [
      {20190819_062207, MyApp.Repo.Migrations.CreateMySpecificTable},
      {20190819_063752, MyApp.Repo.Migrations.FixMySpecificTable}
  ]

    for {_version, module} <- migration_modules do
      if !migration?(module) do
        Logger.error(
          "#{inspect(module)] does not seem to be a migration module. Please make sure that your migration files are .ex and not .exs for runtime migrations."
        )
      end
    end

    migration_modules
  end

  defp migration?(mod) do
    function_exported?(mod, :__migration__, 0)
  end
end

There is nothing special about this modules location.

This has more code than strictly necessary. But I shot myself in the foot last time I brought in some migrations because I didn't catch the .ex to .exs so I ripped out the private migration? check from the migrator to prevent myself from doing that again and I want to spare everyone else that nuisance.

So you can pass this get_migrations() result right into your Ecto.Migrator.run. So what have done here? We've made the migrations part of our application. Which makes sense since we want them to available at runtime.

Managing multiple connection pools

This was my next need, managing all the connection pools. So a consequence of this approach is that Ecto will start a connection pool for each tenant. I would have preferred being able to run a single pool for many databases but this is what we have for now.

So I set up a GenServer that you could ask for the repo PID of a customer. So it registers processes. But there are some extras that separate it from a straight up registry.

So the GenServer manages the registry state and some public functionality in the module would provide conveniences for creating a database, migrating a database and activating the one you need.

I ended up with code that looked like this:

defmodule MyApp.Customers.RepoManager do
  use GenServer
  alias MyApp.Customers

  # Public API (client)

  def start_link(settings) when is_list(settings) do
    settings = Enum.reduce(
      settings,
      %{},
      fn {key, value}, new ->
        Map.put(new, key, value)
      end
    )
    start_link(settings)
  end

  def start_link(settings) when is_map(settings) do
    GenServer.start_link(__MODULE__, settings, name: __MODULE__)
  end

  def get_database_name(customer) do
    "database_#{customer.slug}"
  end

  def get_customer_options(customer) do
    customer_database = get_database_name(customer)
    config = Application.get_env(:my_app, MyApp.Repo)

    config
    |> Keyword.put(:name, nil)
    |> Keyword.put(:database, customer_database)
  end

  def set_customer(customer, ensure_exists \\ false) do
    # Yes, this is entirely side-effects
    if ensure_exists do
      GenServer.cast(__MODULE__, {:ensure_repo_exists, customer})
    end

    repo_pid = GenServer.call(__MODULE__, {:get_customer_pool, customer})
    MyApp.Repo.put_dynamic_repo(repo_pid)
    {:ok, repo_pid}
  end

  def unset_customer(customer) do
    GenServer.call(__MODULE__, {:get_customer_pool, customer})
    MyApp.Repo.put_dynamic_repo(MyApp.Repo)
    :ok
  end

  def destroy_repo(customer, wait \\ false) do
    unset_customer(customer)
    options = get_customer_options(customer)

    if wait do
      # Takes a bit
      MyApp.Repo.__adapter__().storage_down(options)
    else
      Task.async(fn ->
        MyApp.Repo.__adapter__().storage_down(options)
      end)
    end
  end

  # Callbacks (internal)

  @impl true
  def init(
        %{
          soft_limit: soft_limit,
          hard_limit: _hard_limit
        } = settings
      ) do
    # Bonus point if we could sort this for most recently active
    customers = Customers.list_customers()

    # Ensure all customers have databases
    for customer <- customers do
      ensure_repo_exists(customer)
    end

    state = %{
      pools: %{},
      settings: settings
    }

    state =
      customers
      # Warm up by starting arbitrary customers up to the limit
      |> Enum.take(soft_limit)
      |> Enum.reduce(state, fn customer, state ->
        start_connection_pool(customer, state)
      end)

    {:ok, state}
  end

  @impl true
  def handle_call(
        {:get_customer_pool, customer},
        _from,
        state
      ) do
    {repo_pid, state} = get_connection_pool(customer, state)
    {:reply, repo_pid, state}
  end

  @impl true
  def handle_cast(
        {:ensure_repo_exists, customer},
        state
      ) do
    ensure_repo_exists(customer)
    {:noreply, state}
  end

  @impl true
  def handle_cast(
        :clean_pool,
        %{
          pools: pools,
          settings: %{
            soft_limit: soft_limit
          }
        } = state
      ) do
    diff = map_size(pools) - soft_limit

    pools =
      if diff > 0 do
        close_oldest(pools, diff)
      else
        pools
      end

    state = %{state | pools: pools}

    {:noreply, state}
  end

  # Internal functions

  defp ensure_repo_exists(customer) do
    options = get_customer_options(customer)
    MyApp.Repo.__adapter__().storage_up(options)
    options = Keyword.put(options, :pool_size, 2)
    {:ok, repo_pid} = MyApp.Repo.start_link(options)
    MyApp.Repo.put_dynamic_repo(repo_pid)

    Ecto.Migrator.run(
      MyApp.Repo,
      MyApp.Repo.Migrations.get_migrations(),
      :up,
      all: true,
      dynamic_repo: repo_pid
    )

    MyApp.Repo.stop(1000)
    MyApp.Repo.put_dynamic_repo(MyApp.Repo)
  end

  defp start_connection_pool(
         customer,
         %{pools: pools, settings: settings} = state
       ) do
    diff = map_size(pools) - settings.hard_limit

    pools =
      if diff > 0 do
        close_oldest(pools, diff)
      else
        pools
      end

    options = get_customer_options(customer)
    {:ok, repo_pid} = MyApp.Repo.start_link(options)
    last_used = timestamp()
    pools = Map.put(pools, customer.id, {repo_pid, last_used})

    GenServer.cast(:clean_pool, state)

    %{state | pools: pools}
  end

  defp get_connection_pool(customer, %{pools: pools, settings: _settings} = state) do
    pool = Map.get(pools, customer.id, nil)

    state =
      if pool == nil do
        start_connection_pool(customer, state)
      else
        state
      end

    %{pools: pools} = state

    {repo_pid, _last_used} = Map.get(pools, customer.id, nil)

    # Update usage timestamp
    pools = Map.put(pools, customer.id, {repo_pid, timestamp()})
    state = Map.put(state, :pools, pools)

    GenServer.cast(:clean_pool, state)

    {repo_pid, state}
  end

  defp close_oldest(pools, number) do
    {trim, keep} =
      pools
      |> Enum.sort_by(fn {_customer_id, {_repo_pid, last_used}} ->
        last_used
      end)
      |> Enum.split(number)

    for {_customer_id, {repo_pid, _last_used}} <- trim do
      close_pool(repo_pid)
    end

    # Recreate map for the rest
    Enum.reduce(keep, %{}, fn {customer_id, pool}, pools ->
      Map.put(pools, customer_id, pool)
    end)
  end

  defp close_pool(repo_pid) do
    MyApp.Repo.put_dynamic_repo(repo_pid)
    MyApp.Repo.stop(1000)
  end

  defp timestamp do
    DateTime.utc_now()
  end
end

It runs a GenServer that keeps track of your connection pools and can give you access to them on-demand. It will also do some simplistic enforcing of limits on the number of pools. This allows some control of Postgres connection limits, memory usage and such.

This was an effort that never quite made it into use since I switched to prefixes. So consider your use case, check your assumptions and if anything, just use this code as an indicator of one way to go about it. It hasn't earned any battle-scars yet.

Phoenix integration, plug and play

To avoid having every developer that ever makes an Ecto query needing to remember to call put_dynamic_repoI figured it might be worthwhile to do that automatically. So I built a plug that did this. It assumes a few things about your API path structure. It uses the RepoManager we created above to set your dynamic repo based on information in the request.

defmodule MyApp.Customers.CustomerPlug do
  import Plug.Conn
  alias MyApp.Customers
  alias MyApp.Customers.RepoManager

  def init(options) do
    options
  end

  def call(conn, _opts) do
    customer_id = extract_customer_id(conn)
    customer = Customers.get_customer!(customer_id)
    {:ok, repo_pid} = RepoManager.set_customer(customer)

    conn
    |> assign(:customer_id, customer_id)
    |> assign(:customer, customer)
    |> assign(:customer_repo, repo_pid)
  end

  defp extract_customer_id(
         %Plug.Conn{
           path_info: path_info
         } = conn
       ) do
    if List.starts_with?(path_info, ["api", "customers"]) do
      case Enum.fetch(path_info, 2) do
        {:ok, customer_id_string} ->
          case Integer.parse(customer_id_string) do
            {integer, _} -> integer
            :error -> auth_failed(conn)
          end

        :error ->
          auth_failed(conn)
      end
    else
      auth_failed(conn)
    end
  end

  defp auth_failed(conn) do
    send_resp(conn, 400, "Invalid request, customer authentication failed.")
    halt(conn)
  end
end

This means that for the current process you will have the right dynamic repo set already, based on what the query is about. In my case I would likely add authorization to this plug as well to make sure only the appropriate users get access to a given customer.

Any developer can just keep using Ecto the way one usually would within the realm of a Phoenix application for example. But if you spawn a process you are bound to lose your carefully selected dynamic process, because that is set using the process dictionary for the current process. So in the end there may be patterns that are quite a bit safer. But as long as your default repo doesn't work at least you'll get an error trying to run queries to let you know that you failed to do the right thing.

Testing

I found testing to be a bit of a challenge. During testing we generally have the Ecto Sandbox to keep things nice, fast and contained. But if you are testing this sort of thing you might want to either break out of the sandbox or make darn sure you are using it in the right way.

And do you want all your tests to actually exercise the database approach you have or do you want to limit that to the most important. I spent a bunch of time fighting with the sandbox functionality, performance and trying to find a good balance.

So testing-wise it definitely feels like you'll be a bit off of the beaten path. I think you can successfully bring most of your tests back into the fold and have a fairly normal experience and fast tests. The tests that I started with created a dynamic repo for each test, setup and teardown for those took forever.

This was my setup most recently.

I do not start the dynamic repo in the my application. I start my customer repo there. And my test helper reflects that.

ExUnit.start()

Ecto.Adapters.SQL.Sandbox.mode(MyApp.Customers.CentralRepo, :manual)

I couldn't find a good way to work with the sandbox and dynamic repos. So for the tests I used:

  ..
  
config :my_app, MyApp.Repo,
  username: "postgres",
  password: "postgres",
  database: "my_app_test",
  hostname: "localhost",
  pool_size: 2

  ..

To test the repos and repo management I used this:

defmodule MyApp.RepoTest do
  use ExUnit.Case

  alias MyApp.Quality
  alias MyApp.Customers
  alias MyApp.Customers.RepoManager

  def new_customer_attrs() do
    %{
      name: "Test customer",
      slug: Ecto.UUID.generate(),
    }
  end

  @create_published_attrs %{
    title: "some title",
    text: "some text"
  }

  setup do
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(MyApp.Customers.CentralRepo)
    Ecto.Adapters.SQL.Sandbox.mode(MyApp.Customers.CentralRepo, {:shared, self()})
    :ok
  end

  describe "dynamic repos" do
    test "create documentation with repo" do
      {:ok, customer} = Customers.create_customer(new_customer_attrs())
      RepoManager.set_customer(customer, true)
      {:ok, documentation} = Quality.create_documentation(@create_published_attrs)

      assert documentation.id == 1

      RepoManager.destroy_repo(customer)
    end

    test "create documentation with two different repos" do
      {:ok, customer} = Customers.create_customer(new_customer_attrs())
      RepoManager.set_customer(customer, true)

      {:ok, documentation} = Quality.create_documentation(@create_published_attrs)
      assert documentation.id == 1

      {:ok, other_customer} = Customers.create_customer(new_customer_attrs())
      RepoManager.set_customer(other_customer, true)

      {:ok, documentation} = Quality.create_documentation(@create_published_attrs)
      assert documentation.id == 1

      RepoManager.destroy_repo(customer)
      RepoManager.destroy_repo(other_customer)
    end
  end
end

These tests create repos on-demand and verify that this functionality works as expected. Creating the same ID in parallell.

I wouldn't swear to this conn_case.ex below being perfect. I know I fought quite a bit getting to a place where the RepoManager and the sandbox wouldn't be a complete shitshow. This was part of both ConnCase and DataCase to make sure other tests did not have to be written with customer management in mind.

  ..
  setup_all tags do
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(MyApp.Customers.CentralRepo)

    unless tags[:async] do
      Ecto.Adapters.SQL.Sandbox.mode(
        MyApp.Customers.CentralRepo,
        {:shared, self()}
      )
    end

    {:ok, customer} = MyApp.Customers.create_customer(@customer_attrs)

    RepoManager.set_customer(customer, true)

    :ok = Ecto.Adapters.SQL.Sandbox.checkin(MyApp.Customers.CentralRepo)

    on_exit(fn ->
      RepoManager.set_customer(customer)
      RepoManager.destroy_repo(customer)
    end)

    :ok
  end
  ..

So we set up cases to use a specific dynamic repo and unless they screw around with processes they don't have to care about the current customer or current repo. I think that was more elegant than passing a customer everywhere and using some boilerplate to get every test in line. With prefixes I took another approach. Which I'll cover when I cover that. If there is interest.

Would you be interested in more Ecto-related posts? Or does your business need someone to dive deep into these odd corners of our beloved ecosystem and draw a map in this manner. I rather enjoy it and would be happy to figure something out. Let me know at lars@underjord.io.