How To Repair Spark Xcom 2
Wondering how to share data between tasks? What are XCOMs in Apache Airflow? Well you are at the right place. In this tutorial, you are going to learn everything yous need about XComs in Airflow. What are they, how they work, how can yous define them, how to get them and more than. If you lot followed my form "Apache Airflow: The Easily-On Guide", Aiflow XCom should not audio unfamiliar to you. At the end of this tutorial, you lot volition have a solid knowledge of XComs and you will be ready to employ them in your DAGs. Permit's get started!
As usual, to better explain why you demand a functionality, information technology'due south ever proficient to start with a apply instance. The Airflow XCom is non an piece of cake concept, so let me illustrate why information technology might exist useful for you lot. Let'south imagine you lot have the following data pipeline:
In a nutshell, this information pipeline trains dissimilar motorcar learning models based on a dataset and the concluding task selects the model having the highest accuracy. The question is,
How tin can we become the accuracy of each model in the task Choosing Model to choose the best one?
One solution could be to shop the accuracies in a database and fetch them back in the task Choosing Model with a SQL request. That's perfectly feasible. But, information technology'due south there any native easier machinery in Airflow assuasive you to do that?
Aye! XComs!
What is an Airflow XCom ?
XCom stands for "cross-communication" and allows to exchange messages or small corporeality of data betwixt tasks. Y'all can recollect of an XCom every bit a lilliputian object with the following fields:
that is stored IN the metadata database of Airflow. From left to right,
- The key is the identifier of your XCom. No need to be unique and is used to get dorsum the xcom from a given task.
- The value is … the value of your XCom. What you want to share. Keep in mind that your value must be serializable in JSON or pickable. Notice that serializing with pickle is disabled by default to avoid RCE exploits/security issues. If you lot want to learn more about the differences between JSON/Pickle click here.
- The timestamp is the data at which the XCom was created.
- The execution date! This is important! That execution date corresponds to the execution engagement of the DagRun having generated the XCom. That'southward how Airflow avert fetching an XCom coming from another DAGRun. You lot don't know what I'm talking about? Cheque my video about how scheduling works in Airflow.
- The task id of the task where the XCom was created.
- The dag id of the dag where the XCom was created.
To access your XComs in Airflow, go to Admin -> XComs.
Smashing! Now you lot know what a XCom is, let's create your first Airflow XCom
How to utilize XCom in Airflow
Time to practise! To let you lot follow the tutorial, here is the data pipeline we use:
from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from random import uniform from datetime import datetime default_args = { 'start_date': datetime(2020, 1, 1) } def _training_model(): accuracy = uniform(0.1, 10.0) print(f'model\'s accuracy: {accuracy}') def _choose_best_model(): print('choose best model') with DAG('xcom_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag: downloading_data = BashOperator( task_id='downloading_data', bash_command='sleep 3' ) training_model_task = [ PythonOperator( task_id=f'training_model_{task}', python_callable=_training_model ) for task in ['A', 'B', 'C']] choose_model = PythonOperator( task_id='choose_model', python_callable=_choose_best_model ) downloading_data >> training_model_task >> choose_model
Add this code into a file xcom_dag.py in dags/ and you should obtain the following DAG:
The information pipeline is pretty simple. We have 5 tasks. downloading_data is a BashOperator executing the bash control which waits for 3 seconds. Then, we have iii tasks, training_model_[A,B,C] dynamically generated in a list comprehension. Each task implements the PythonOperator to execute the function _training_model. That functions generates randomly an accuracy for each models A, B, C. Finally, nosotros desire to cull the all-time model based on the generated accuracies in the task choose_model.
Our goal is to create 1 XCom for each model and fetch back the XComs from the task choose_model to cull the best.
How? 2 steps:
- Create an XCom for each training_model job
- Pull the XComs from choose_model
Permit's do it!
How to push an Airflow XCom
In this Airflow XCom example, nosotros are going to discover how to button an XCom containing the accurateness of each model A, B and C.
In that location are multiple ways of creating a XCom but let's begin the most basic one. Whenever y'all want to create a XCom from a task, the easiest fashion to do it is past returning a value. In the case of the PythonOperator, use the return keyword forth with the value in the python callable office in club to create automatically a XCom.
def _training_model(ti): accuracy = uniform(0.1, ten.0) print(f'model\'s accurateness: {accurateness}') render accuracy
By adding return accuracy, if you execute the DAG, yous will obtain the following XComs:
Well washed! With just one line of code, you've already pushed your first XCom!
What's of import hither is the key,return_value. Past default, when a XCom is automatically created by returning a value, Airflow assigns the keyreturn_value. In add-on, you lot tin see that each XCom was well created from different tasks ( based on the task ids ) but got something weird hither. Nosotros don't return whatsoever value from the task downloading_data but we an associated XCom.
Where does information technology come from?
The do_xcom_push argument
By default, all operators returning a value, create a XCom . There is one argument that ALL OPERATORS SHARE ( BashOperator, PythonOperator etc. ) which is do_xcom_push set to True. Allow's change that argument for the BashOperator to False.
downloading_data = BashOperator( task_id='downloading_data', bash_command='sleep three', do_xcom_push=False )
Plough off the toggle of the DAG. Articulate the task instances (In Browse -> Chore Instances). Delete all DAGRuns (Browse -> DagRuns) as well as the XComs (Scan -> XComs). Now, if y'all plough on the toggle of your information pipeline again, yous obtain the following XComs:
Every bit you tin can meet, this time, we don't go the extra XCom that was generated by downloading_data. Every bit an exercise, endeavour to avoid generating XComs from the PythonOperator with the same statement. At the stop, yous should accept no XComs at all.
Pushing a XCom with xcom_push
The simplest mode to create a XCom is past returning a value from an operator. Nosotros know that, and we know that we can alter that behaviour with do_xcom_push. By the fashion, keep in mind that all operators do not return XComs. Therefore. it depends of the implementation of the operator yous use.
Ok, is there another way to create a XCom? A way that allows more flexibility? Aye there is! With the method xcom_push. Let's employ it!
Start affair first, the method xcom_push is only accessible from a job case object. With the PythonOperator we tin admission it by passing the parameter ti to the python callable role. In Airflow 1.x.x, we had to set up the argument provide_context only in Airflow 2.0, that's not the instance anymore. Now, y'all just take to specify the keyword argument as a parameter for the python callable function.
def _training_model(ti): accuracy = uniform(0.1, ten.0) print(f'model\'s accuracy: {accuracy}') eturn accuracy
Notice the argument ti. Once we tin can admission the task instance object, we can call xcom_push.
xcom_push expects two parameters:
- A key to place the XCom
- A value to the XCom that is serializable in JSON or picklable, stored in the metadata database of Airflow.
At the finish, to push the accuracy with xcom_push you do,
def _training_model(ti): accurateness = uniform(0.one, 10.0) print(f'model\'due south accurateness: {accuracy}') ti.xcom_push(key='model_accuracy', value=accuracy)
If you trigger the DAG over again, you obtain 3 XComs. However, they all have the aforementioned primal,model_accuracy as specified in xcom_push and non return_value equally earlier. By the way, when y'all execute twice your DAG on the aforementioned execution date, the XComs created during the showtime DAGRun are overwritten by the ones created in the second DAGRun.
That's it! That'due south all you need to know about xcom_push.
The useless argument
Actually, there is i additional parameter I didn't talk well-nigh which is execution_date . By specifying a date in the hereafter, that XCom won't be visible until the respective DAGRun is triggered. To be honnest, I never found any solid use example for this.
Pulling a XCom with xcom_pull
Alright, now we know how to push an XCom from a task, what nearly pulling it from another task? We are trying to exchange data between tasks, are we? Let'southward become!
In order to pull a XCom from a task, you lot accept to use the xcom_pull method. Like xcom_push, this method is available through a chore instance object. xcom_pull expects 2 arguments:
- task_ids, only XComs from tasks matching ids will exist pulled
- fundamental, only XComs with matching central will be returned
Two things to go on in heed here. Get-go, information technology looks similar we tin specify multiple task ids, therefore we tin pull XComs from multiple tasks at in one case. 2nd, we have to give a central to pull the right XComs. Let'southward pull our first XCom.
def _choose_best_model(ti): fetched_accuracy = ti.xcom_pull(key='model_accuracy', task_ids=['training_model_A']) print(f'choose best model: {fetched_accuracy}')
In the lawmaking above, nosotros pull the XCom with the cardinal model_accuracy that was created from the task training_model_A. Trigger your DAG, click on the task choose_model and log. You lot obtain the output:
Nosotros have successfully pulled the accuracy stored in a XCom that was created by the task training_model_A from the task choosing_model! ( Detect that the value will be different for you).
Pulling XComs from multiple tasks
We know how to push and pull a XCom between 2 tasks. At this point, nosotros are able to share data between tasks in Airflow! Great! Merely that's not all. Indeed, we are able to pull only one XCom from choose_model, whereas we want to pull all XComs from training_model_A, B and C to choose which one is the best.
How can we do this?
Simple! You just demand to specify the task ids in xcom_pull.
def _choose_best_model(ti): fetched_accuracies = ti.xcom_pull(key='model_accuracy', task_ids=['training_model_A', 'training_model_B', 'training_model_C']) print(f'choose all-time model: {fetched_accuracies}')
If you trigger you DAG, you obtain the 3 different accuracies and now you lot are able to choose which model is performing the all-time.
Congratulations! Now y'all are able to commutation data between tasks in your information pipelines!
Await…
You lot want to acquire more than? 😱
Ok then!
The bashoperator with XComs
I know, I know. Then far, in the Airflow XCom example, we've seen how to share data between tasks using the PythonOperator, which is the almost popular operator in Airflow. Keen, but. There is another very popular operator which is, the BashOperator. I can't count the number of times I received the questions, "Hey Marc, how the bashoperator xcom_pull and xcom_push method piece of work? how tin we share data with the BashOperator, I don't accept access to the task case object!" Well, permit'southward answer those questions!
The bashoperator xcom_push
You already know that by default, an XCom is pushed when yous use the BashOperator. We've seen that with the task downloading_data. This controlled by the parameter do_xcom_push which is common to all operators. Still, there was one issue. The XCom was empty. So, how can we create an XCom having a value with the BashOperator?
By using templating! Expect, what? You lot don't know what templating is? Well, cheque my other tutorial right there before moving on. THIS IS SUPER IMPORTANT!
At present yous know, what templating is, let's move on! Here is what y'all should practice to button a XCom from the BashOperator:
downloading_data = BashOperator( task_id='downloading_data', bash_command='repeat "How-do-you-do, I am a value!"', do_xcom_push=True )
Go along in listen that, just the final line written to stdout by your command, will exist pushed as a XCom. By the way, you lot don't have to specify do_xcom_push hither, as information technology is fix to Truthful by default.
Pushing a XCom with the BashOperator washed, what about pulling a XCOM?
The bashoperator xcom_pull
Pulling a XCom from the BashOperator is a lilliputian bit more than circuitous. This fourth dimension, every bit you tin't execute a python role to access the task instance object, yous are going to use the Jinja Template Engine. Indeed, since the statement bash_command is templated, you lot can render values at runtime in information technology. Let'southward leverage this to pull a XCom.
fetching_data = BashOperator( task_id='fetching_data', bash_command="echo 'XCom fetched: {{ ti.xcom_pull(task_ids=[\'downloading_data\']) }}'", do_xcom_push=False )
Here, the magic happens with the ii pairs of curly brackets {{}}. That's how we point to the Jinja Template Engine that a value here should be evaluated at runtime and in that case, xcom_pull volition exist replaced past the XCom pushed by the task downloading_data. Notice that I didn't specify a central here. Why? Because the cardinal of the XCom retuned by downloading_data is return_value. This is the default behaviour. Aforementioned for xcom_pull. By default, the central of the XCom pulled is return_value. That's why, I didn't specify it here.
Add this job just after downloading_data and set the dependency accordingly (downloading_data >> fetching_data) and you should obtain:
Go on in listen that y'all might non exist able to do that with all operators. At the finish, y'all take to sympathise how your operator works, to know if you lot tin apply XComs with information technology and if so, how. For that, the code/documentation is your friend 😍
XCom limitations
BIG WARNING HERE!
DO Non SHARE PANDA DATAFRAMES THROUGH XCOMS OR Whatever Data THAT Tin can BE Large!
I insist, do NOT do that! Why?
Airflow is Non a processing framework. Information technology is not Spark, neither Flink. Airflow is an orchestrator, and it the best orchestrator. There is no optimisations to process big data in Airflow neither a way to distribute information technology (perhaps with ane executor, only this is another topic). If yous try to exchange big data between your tasks, you lot will terminate up with a retentiveness overflow mistake! Oh, and do you know the xcom limit size in Airflow?
Estimate what, it depends on the database you employ!
- SQLite: 2 Go
- Postgres: 1 Become
- MySQL: 64 KB
Aye, 64 Kilobytes for MySQL! Again, use XComs just for sharing small amount of data.
Ane last point, don't forget that XComs create implicit dependencies between your tasks that are not visible from the UI.
In Practice
Decision
That's it about Airflow XCom. I hope y'all really enjoyed what you lot've learned. In that location are other topics almost XComs that are coming shortly ( I know, I didn't talk about XCom backends and XComArgs 😉) . If you lot desire to learn more about Airflow, go check my course The Consummate Hands-On Introduction to Apache Airflow right here. Or if you already know Airflow and want to go mode much further, enrol in my 12 hours course here.
Take a great day! 🙂
Interested by learning more than? Stay tuned and get special promotions!
How To Repair Spark Xcom 2,
Source: https://marclamberti.com/blog/airflow-xcom/
Posted by: johnsonhoullich.blogspot.com
0 Response to "How To Repair Spark Xcom 2"
Post a Comment