Airflow Run Tasks At Different Times In The Same Dag?
Solution 1:
Turns out there are two variables that can be set that will solve my problem very easily: concurrency and max_active_runs.
In the example below, up to 4 runs of the DAG can be active at the same time, and up to 16 task instances of the DAG can run concurrently across those runs. Other combinations are also possible. (Note: in Airflow 2.2+, the concurrency argument was renamed max_active_tasks.)
from datetime import datetime
from airflow import DAG

# minimal default_args for illustration; yours may differ
default_args = {'start_date': datetime(2021, 1, 1)}

dag = DAG(
    dag_id='sample_dag',
    default_args=default_args,
    schedule_interval='@daily',
    # this will allow up to 16 task instances of this DAG to run at the same time
    concurrency=16,
    # this will allow up to 4 runs of this DAG to be active at the same time
    max_active_runs=4,
)
Solution 2:
I can think of 3 possible solutions to your woes (will add more alternatives if they come to mind):

1. Set start_date on individual tasks within the DAG (apart from the start_date of the DAG itself), as told here. However, I would never favour this approach because it would be like a step back onto the same time-based crons that Airflow tries to replace.

2. Use pools to segregate tasks by runtime / priority. Here's an idea (you might need to rework it as per your requirements): put all tiny tasks in tiny_task_pool and all big ones in big_task_pool. Let tiny_task_pool have a significantly higher number of slots than big_task_pool. That would make starvation of your tiny tasks much less likely. You can get creative with even more levels of pools. (A sketch follows this list.)

3. Even if your tasks have no real dependencies between them, it shouldn't hurt much to deliberately introduce some dependencies so that all (or most) big tasks are made downstream of tiny ones (and hence change the structure of your DAG). That amounts to a shortest-job-first kind of approach. You can also explore priority_weight / weight_rule to gain even more fine-grained control.
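Here's a minimal sketch of the pool idea from point 2, assuming the two pools have already been created (e.g. under Admin > Pools in the UI, or with the airflow pools set CLI command in Airflow 2.x); tiny_job / big_job are hypothetical placeholders for your real callables:

from airflow.operators.python import PythonOperator

def tiny_job():
    print("quick task")  # placeholder for a short-running workload

def big_job():
    print("slow task")   # placeholder for a long-running workload

# Tiny tasks compete only for the (large) tiny_task_pool...
tiny = PythonOperator(
    task_id='tiny_task',
    python_callable=tiny_job,
    pool='tiny_task_pool',
    dag=dag,
)

# ...while big tasks are capped by the (small) big_task_pool,
# so they can never crowd out the tiny ones.
big = PythonOperator(
    task_id='big_task',
    python_callable=big_job,
    pool='big_task_pool',
    priority_weight=1,  # optionally also give big tasks lower priority
    dag=dag,
)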
All the above alternatives assume that tasks' lengths (duration of execution) are known ahead of time. In the real world, that might not be true; or even if it is, it may gradually change over time. For that, I'd suggest you tweak your dag-definition script to factor in the average (or median) runtime of your tasks over the last 'n' runs to decide their priority.
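As a rough sketch of that idea (assuming Airflow 2.x; pick_pool, n, and the threshold are made-up names and values, not anything Airflow provides):

from airflow.models import TaskInstance
from airflow.utils.session import create_session

def pick_pool(dag_id, task_id, n=10, threshold_secs=300):
    """Classify a task by the median duration of its last n runs,
    as recorded in Airflow's metadata database."""
    with create_session() as session:
        durations = sorted(
            ti.duration
            for ti in session.query(TaskInstance)
                             .filter(TaskInstance.dag_id == dag_id,
                                     TaskInstance.task_id == task_id,
                                     TaskInstance.duration.isnot(None))
                             .order_by(TaskInstance.end_date.desc())
                             .limit(n)
        )
    if not durations:
        return 'tiny_task_pool'  # no history yet; assume tiny
    median = durations[len(durations) // 2]
    return 'big_task_pool' if median > threshold_secs else 'tiny_task_pool'

# Then, in the dag-definition script:
# task = PythonOperator(..., pool=pick_pool(dag.dag_id, 'my_task'), dag=dag)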
- For the start_date method, just supply a later start_date (actually the same date, a later time) to tasks that ran longer in previous runs.
- For the pools method, move tasks around different pools based on their previous running durations.
- For the task-dependency method, make longer-running tasks downstream. This might sound difficult, but you can visualize it like this: create 3 DummyOperators and link them up (one after another). Now fill in all the small tasks between the first 2 DummyOperators and the big ones between the next two (a sketch follows).
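A minimal sketch of that barrier pattern, assuming small_tasks and big_tasks are lists of already-defined operators (in Airflow 2.x DummyOperator lives in airflow.operators.dummy; from 2.4 onwards it is called EmptyOperator):

from airflow.operators.dummy import DummyOperator

# Three no-op "barrier" tasks, linked one after another.
start = DummyOperator(task_id='start', dag=dag)
barrier = DummyOperator(task_id='barrier', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> barrier >> end

# Fill in the gaps: small tasks between start and barrier,
# big tasks between barrier and end. The barrier waits for ALL
# of its upstreams, so big tasks only start once every small task is done.
for small in small_tasks:
    start >> small >> barrier
for big in big_tasks:
    barrier >> big >> end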
Solution 3:
This is likely because you have fewer execution slots than you have slow jobs. The scheduler doesn't particularly care what order it runs the tasks in, because you've said you don't care either.
If it really matters to you, the tasks should probably be broken up into different DAGs, or you should declare dependencies so that the cheaper tasks finish first. There are any number of ways to express what you want; you just have to figure out what that is.
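For instance, one blunt way to declare "cheap tasks first" (a sketch; cheap_tasks and slow_tasks are assumed to be lists of your operators):

from airflow.models.baseoperator import cross_downstream

# Make every slow task wait for every cheap task to finish.
cross_downstream(cheap_tasks, slow_tasks)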