Submit Spark jobs to EMR cluster from Airflow
I was using a large EMR 6.x cluster (>10 m6g.16xlarge nodes, 3 masters for HA) to run all of the Spark jobs that move data from Debezium to S3. Airflow (running in Docker) submits the jobs on an hourly schedule: each hour I submit ~200 jobs.
There are two ways to submit a Spark job to EMR:
- spark-submit
- the AWS EMR Step API
With spark-submit I would have to add all of the Spark dependencies to Airflow, which makes the Docker image heavy to maintain. I prefer the AWS EMR Step API: I can keep the dependencies on S3, and it is much simpler than spark-submit.
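For context, this is roughly what a Step API submission looks like with plain boto3; the cluster id, S3 paths, and job arguments below are placeholders I made up for illustration:

import boto3

emr = boto3.client("emr")
response = emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # placeholder cluster id
    Steps=[{
        "Name": "delta_example",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                # dependencies can live on S3 instead of the Airflow image
                "--jars", "s3://my-bucket/jars/spark-deps.jar",
                "s3://my-bucket/jobs/delta_job.py",
            ],
        },
    }],
)
print(response["StepIds"])  # the step ids you later poll for status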
Implementation
Because the EMR cluster is created ahead of time, I only need the two operators below: EmrAddStepsOperator and EmrStepSensor.
- EmrAddStepsOperator: submits the spark-submit step to the cluster
- EmrStepSensor: polls the step status. This is tricky on AWS because of the rate limit on API calls; see the sketch below.
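Each poke of EmrStepSensor is essentially a DescribeStep call, so ~200 concurrent sensors means ~200 API calls per poke interval. A minimal sketch of the underlying call, again with placeholder ids:

import boto3

emr = boto3.client("emr")
response = emr.describe_step(
    ClusterId="j-XXXXXXXXXXXXX",  # placeholder cluster id
    StepId="s-XXXXXXXXXXXXX",     # placeholder step id
)
print(response["Step"]["Status"]["State"])  # e.g. PENDING / RUNNING / COMPLETED / FAILED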
Common issues
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the AddJobFlowSteps operation (reached max retries: 4): Rate exceeded
So the DAG has to be able to retry when a job fails or hits the API rate limit.
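As an aside, the "reached max retries: 4" in the traceback is botocore's own retry budget. If you drive the API directly with boto3, one option is to raise that budget; this is a sketch assuming a reasonably recent botocore, not a knob the stock Airflow 1.10 operators expose as far as I know:

import boto3
from botocore.config import Config

# Let botocore retry throttled calls more times, with client-side rate limiting
retry_config = Config(retries={"max_attempts": 10, "mode": "adaptive"})
emr = boto3.client("emr", config=retry_config)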
I tried SubDagOperator to cover this, but in Airflow 1.10.x the SubDagOperator would shut down for no apparent reason and the DAG would not recover afterwards. This is a known issue on Airflow 1.x; it is best to avoid SubDagOperator entirely.
To solve this, Airflow provides an on_retry_callback hook on every operator/sensor. On each retry of the EmrStepSensor, the callback clears the upstream task's state so that the step is submitted again. The sensor still fails after its configured number of retries, so it is not going to retry forever.
Sample code
# Airflow 1.10.x import paths
import random
from datetime import timedelta
from airflow.models import taskinstance
from airflow.utils.db import provide_session
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

# spark_conf holds the spark-submit flags; delta_args the application arguments
spark_steps = [
    {
        'Name': f'delta_{task_suffix}',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': spark_conf + delta_args,
        }
    }
]
add_step_task_id = 'add_steps_id'
step_adder = EmrAddStepsOperator(
    task_id=add_step_task_id,
    pool=dag_id,  # per-DAG pool, caps how many submissions run concurrently
    job_flow_id="CLUSTER_ID",
    aws_conn_id="AWS_CONNECTION",
    steps=spark_steps,
    on_failure_callback=None,
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag
)
# Callback helpers: on each retry of the sensor, clear the upstream
# add-steps task so the Spark step is submitted again.
@provide_session
def clear_task(tis, session=None, adr=False, dag=None):
    # tis is a list of TaskInstance objects to reset
    taskinstance.clear_task_instances(tis=tis,
                                      session=session,
                                      activate_dag_runs=adr,
                                      dag=dag)

def retry_upstream_task(context):
    tasks = context["dag_run"].get_task_instances()
    dag = context["dag"]
    to_retry = context["params"].get("to_retry", [])
    tasks_to_retry = [ti for ti in tasks if ti.task_id in to_retry]
    print("tasks_to_retry ... ", tasks_to_retry)
    clear_task(tasks_to_retry, dag=dag)

step_checker = EmrStepSensor(
    task_id='watch_step_id',
    dag=dag,
    job_flow_id="CLUSTER_ID",
    # pull the step id returned by EmrAddStepsOperator
    step_id="{{ task_instance.xcom_pull('" + add_step_task_id + "', key='return_value')[0] }}",
    aws_conn_id="AWS_CONNECTION",
    poke_interval=60 + random.randrange(5, 60, 5),  # jitter so sensors don't poll in lockstep
    on_retry_callback=retry_upstream_task,
    retries=3,  # fails after 3 retries, so it won't retry forever
    retry_delay=timedelta(minutes=5),
    params={'to_retry': [add_step_task_id]},  # tells the callback which tasks to clear
    mode="reschedule")  # free the worker slot between pokes

step_adder >> step_checker
I added a random poke_interval to help with the rate-limit issue, so the ~200 sensors don't all poll at the same moment:
poke_interval=60 + random.randrange(5, 60, 5)
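random.randrange(5, 60, 5) returns a multiple of 5 between 5 and 55, so each sensor ends up polling somewhere between every 65 and 115 seconds:

import random

# Five example poke_interval values; actual values vary per run
print([60 + random.randrange(5, 60, 5) for _ in range(5)])
# e.g. [75, 110, 65, 90, 100]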
Conclusion
The AWS EMR Step API provides a simple way to submit Spark jobs, but it also comes with rate limits that the DAG has to handle.
Airflow remains my favourite ETL scheduler because of its simple, easy-to-use design.