Fetch results from BigQueryOperator in airflow

I am trying to fetch results from BigQueryOperator using airflow but I could not find a way to do it. I tried calling the next() method in the bq_cursor member (available in 1.10) however it returns None. This is how I tried to do it

import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)

def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )

    big_query_count.execute(context=kwargs)

    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}

with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator

Taking a look to bigquery_hook.py and bigquery_operator.py it seems to be the only available way to fetch the results.

Answer

Thanks to @kaxil and @Mike for their answers. I found the problem. There is a kind of bug (in my mind) in the BigQueryCursor. As part of the run_with_configuration, the running_job_id is being returned but never assigned to job_id which is used to pull the results in the next method. A workaround (not really elegant but good if you do not want to re-implement everything), is assign the job_id based on the running_job_id in the hook like this

big_query_count.execute(context=kwargs)
#workaround
big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
logging.info(big_query_count.bq_cursor.next())

Leave a Reply

Your email address will not be published. Required fields are marked *