Submit Spark jobs to EMR cluster from Airflow

Introduction

Chanh Le
2 min read · Jun 8, 2021

I was using a large EMR 6.x cluster (>10 m6g.16xlarge nodes, 3 masters for HA) to run all of the Spark jobs that move data from Debezium to S3, with Airflow (running in Docker) submitting the jobs. The jobs run hourly, and each hour I submit ~200 of them.

There are two ways to submit a Spark job to EMR.

If I used spark-submit directly, I would need to bundle all of the Spark dependencies into Airflow, which makes the Docker image heavy to maintain. So I prefer to submit through the AWS EMR Step API: the dependencies can live on S3, which is much simpler than spark-submit.
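The post does not show what spark_conf or delta_args contain, so here is a hypothetical sketch of step arguments for a job whose script and dependencies live on S3. The bucket names, jar versions, and arguments are placeholders, not my actual configuration:

spark_conf = [
    'spark-submit',
    '--deploy-mode', 'cluster',
    # dependencies live on S3 instead of inside the Airflow image (placeholder paths)
    '--jars', 's3://my-bucket/jars/delta-core_2.12-1.0.0.jar',
    '--py-files', 's3://my-bucket/jobs/libs.zip',
    's3://my-bucket/jobs/debezium_to_s3.py',
]
delta_args = ['--table', 'orders', '--hour', '2021-06-08T00']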

Implementation

Because the EMR cluster was created ahead of time, I only need the two operators below (imports are sketched right after the list):

EmrAddStepsOperator, EmrStepSensor

  • EmrAddStepsOperator: submits the spark-submit step to the cluster.
  • EmrStepSensor: polls the step status. This is the tricky part on AWS because of the rate limit on API calls.
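The sample code further down assumes the following imports. The operator and sensor paths below match the Airflow 1.10.x contrib layout mentioned in this post; in Airflow 2 they moved to the amazon provider package:

import random
from datetime import timedelta

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import taskinstance
from airflow.utils.db import provide_session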

Common issues

botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the AddJobFlowSteps operation (reached max retries: 4): Rate exceeded

So the DAG has to be able to retry when a job fails or hits the API rate limit.
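Independently of the DAG-level retries described below, botocore itself can be told to back off more patiently. The snippet below only illustrates that general boto3 mechanism; the EMR operators build their own client through the Airflow AWS connection, and whether the connection lets you pass this config depends on the hook version:

import boto3
from botocore.config import Config

# Adaptive retry mode makes botocore slow down and retry on ThrottlingException
# instead of giving up after the default number of attempts.
emr_client = boto3.client(
    'emr',
    region_name='us-east-1',  # placeholder region
    config=Config(retries={'max_attempts': 10, 'mode': 'adaptive'}),
)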

I tried to use SubDagOperator to cover this, but on Airflow 1.10.x the SubDagOperator would shut down for no apparent reason and the DAG would not recover afterwards. This is a known issue on Airflow 1.x, so it is best to avoid SubDagOperator altogether.

To solve this, Airflow provides an on_retry_callback hook on every operator/sensor; in that callback I clear the status of the upstream task so it is retried as well. The EmrStepSensor still fails after its configured number of retries, so it will not retry forever.

Sample code

spark_steps = [
    {
        'Name': f'delta_{task_suffix}',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            # spark_conf holds the spark-submit invocation, delta_args the job arguments
            'Args': spark_conf + delta_args,
        },
    }
]
add_step_task_id = 'add_steps_id'
step_adder = EmrAddStepsOperator(
    task_id=add_step_task_id,
    pool=dag_id,  # a pool limits how many tasks hit the EMR API at once
    job_flow_id="CLUSTER_ID",
    aws_conn_id="AWS_CONNECTION",
    steps=spark_steps,
    on_failure_callback=None,
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag,  # schedule_interval is set on the DAG itself, not on the operator
)

step_checker = EmrStepSensor(
    task_id='watch_step_id',
    dag=dag,
    job_flow_id="CLUSTER_ID",
    # EmrAddStepsOperator pushes the list of step ids to XCom; watch the first one
    step_id="{{ task_instance.xcom_pull('" + add_step_task_id + "', key='return_value')[0] }}",
    aws_conn_id="AWS_CONNECTION",
    # random jitter spreads the DescribeStep calls to ease the API rate limit
    poke_interval=60 + random.randrange(5, 60, 5),
    on_retry_callback=retry_upstream_task,
    retries=3,
    retry_delay=timedelta(minutes=5),
    # tell the retry callback which upstream task to clear
    params={'to_retry': [add_step_task_id]},
    mode="reschedule",
)

step_adder >> step_checker


@provide_session
def clear_task(task_instances, session=None, adr=False, dag=None):
    # Clear the given task instances so the scheduler runs them again
    taskinstance.clear_task_instances(
        tis=task_instances,
        session=session,
        activate_dag_runs=adr,
        dag=dag,
    )


def retry_upstream_task(context):
    # Called when the sensor retries: clear the upstream add-steps task
    # so a brand-new EMR step is submitted.
    tasks = context["dag_run"].get_task_instances()
    dag = context["dag"]
    to_retry = context["params"].get("to_retry", [])
    tasks_to_retry = [ti for ti in tasks if ti.task_id in to_retry]
    print("tasks_to_retry ... ", tasks_to_retry)
    clear_task(tasks_to_retry, dag=dag)

I added a random poke_interval to help with the rate limit issue.

poke_interval=60 + random.randrange(5, 60, 5)

Conclusion

The AWS API provides a simple way to submit Spark jobs, but it also comes with a rate limit.

Airflow remains my favourite ETL scheduler because of its simple, easy-to-use design.

Have fun!
