Starting a Quantum job from multiple k8s nodes
Quantum is an amazing Elixir package for starting scheduled jobs in a GenServer. I would recommend checking it out here; it has been a great help in many projects.
The issue
There is one problem, though: when you run multiple pods of a service that uses Quantum, every pod executes the jobs, so each job runs multiple times. This is something you would probably want to prevent. Quantum's global mode has been deprecated in version 3.0.0, with the reason being:
(…) the implementation wasn’t great and was causing us a lot of trouble with unexpected behavior.
The solution
So how would you fix this? The way we decided to do it is to combine Quantum with libcluster. Start by adding Quantum and libcluster to your mix.exs:
{:quantum, "~> 3.0"},
{:libcluster, "~> 3.0"}
Setting up libcluster
libcluster lets the Erlang nodes running in your Kubernetes pods discover each other and form a cluster. Essentially, we are going to register a headless service, which lets libcluster find your pods by using DNS lookups. A config might look like this:
# Clustering config
if is_binary(System.get_env("KUBERNETES_SERVICE_HOST")) do
  config :your_app,
    libcluster: [
      kubernetes: [
        strategy: Elixir.Cluster.Strategy.Kubernetes.DNS,
        config: [
          service: "your-app-headless",
          application_name: "your-app",
          polling_interval: 10_000
        ]
      ]
    ]
end
Here we use the headless service to discover all our pods through the DNS server. This will help us when we register our supervisor. Create a new supervisor module called SchedulerSupervisor with the following contents:
defmodule SchedulerSupervisor do
  @moduledoc """
  Responsible for starting the scheduled job on a single node
  """
  use Supervisor

  # Link to the globally registered supervisor if one already exists,
  # otherwise try to start it on this node.
  def start_link(quantum, opts) do
    case :global.whereis_name(__MODULE__) do
      :undefined ->
        # Another node may win the race; in that case link to its supervisor.
        with {:error, {:already_started, pid}} <- do_start_link(quantum, opts) do
          Process.link(pid)
          {:ok, pid}
        end

      pid ->
        Process.link(pid)
        {:ok, pid}
    end
  end

  defp do_start_link(quantum, opts) do
    Supervisor.start_link(__MODULE__, {quantum, opts}, name: {:global, __MODULE__})
  end

  @impl true
  def init(state) do
    :global.re_register_name(__MODULE__, self(), &resolve_global_conflict/3)
    Quantum.Supervisor.init(state)
  end

  defp resolve_global_conflict(_name, pid_to_keep, pid_to_kill) do
    Supervisor.stop(pid_to_kill)
    pid_to_keep
  end
end
This ensures our supervisor is started only once across the whole cluster. In init we call the Erlang function :global.re_register_name (Documentation) and pass in a function that resolves the conflict when the name is already registered. That function is our resolve_global_conflict function, which stops the duplicate supervisor, keeps the current one alive, and passes its process id back so we can use and link it.
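The start_link logic above relies on how global name registration behaves when a name is already taken. A minimal sketch of that behavior on a single node, using an Agent instead of a supervisor; the name :demo_scheduler is hypothetical and only used for this illustration:

```elixir
# Hypothetical name, used only for this demo.
name = {:global, :demo_scheduler}

# The first start registers the process under the global name.
{:ok, first} = Agent.start_link(fn -> :running end, name: name)

# A second start under the same name does not create a new process;
# it reports the pid that already owns the name.
{:error, {:already_started, existing}} =
  Agent.start_link(fn -> :running end, name: name)

# Both the error tuple and a global lookup point at the first process.
true = existing == first
true = :global.whereis_name(:demo_scheduler) == first
```

This is the same {:error, {:already_started, pid}} tuple that the with clause in SchedulerSupervisor.start_link matches on before linking to the existing process.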
Integrating Quantum
The last thing we need to do is add Quantum into the mix (get it?) by creating a new scheduler module. This could look something like this:
defmodule Scheduler do
  @moduledoc false

  # The supervisor module is the module we created above
  use Quantum, otp_app: :your_app, supervisor_module: SchedulerSupervisor
end
As you can see, we can pass a supervisor_module option to the macro, which the start_link function for the Quantum scheduler will use in place of the default Quantum supervisor. This can also be passed in the application.ex of your app, but this is a much neater solution in my opinion.
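The scheduler still needs jobs to run. As a rough sketch, jobs can be declared in your config under the scheduler module; the cron expression and the YourApp.Worker module with its run/0 function are hypothetical placeholders, not something from the setup above:

```elixir
# config/config.exs
import Config

config :your_app, Scheduler,
  jobs: [
    # Hypothetical job: call YourApp.Worker.run/0 every five minutes.
    {"*/5 * * * *", {YourApp.Worker, :run, []}}
  ]
```

With the clustered supervisor in place, this job list is loaded on every pod, but only the single globally registered scheduler should actually execute it.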
Adding it to the application on start
And let’s not forget to add it to the application.ex under your children:
@impl true
def start(_type, _args) do
  # Start the cluster supervisor (when configured) before the scheduler
  children = supervisor_libcluster() ++ [Scheduler]
  opts = [strategy: :one_for_one]
  Supervisor.start_link(children, opts)
end

defp supervisor_libcluster do
  config = Application.get_env(:your_app, :libcluster)

  if config do
    [{Cluster.Supervisor, [config, [name: ClusterSupervisor]]}]
  else
    []
  end
end
And there we have it! Now the job only runs once even though we have multiple pods running!
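If you want to check that the pods really found each other, one option is a small helper you can call from a remote shell on any pod. This is only a sketch: the ClusterCheck module and its report/1 function are hypothetical names, and it assumes the supervisor is registered globally under SchedulerSupervisor as shown earlier.

```elixir
defmodule ClusterCheck do
  @moduledoc false

  # Returns this node's name, the nodes it is connected to, and the
  # result of looking up the given globally registered name
  # (a pid, or :undefined when nothing is registered).
  def report(name \\ SchedulerSupervisor) do
    %{
      node: Node.self(),
      peers: Node.list(),
      scheduler: :global.whereis_name(name)
    }
  end
end
```

Calling ClusterCheck.report() on two different pods should list each pod in the other's peers and return the same scheduler pid on both; an empty peers list suggests the DNS discovery has not connected the nodes yet.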