Analytics Space : Python scripts conversion

All Python scripts that will be used in Airflow must be stored in the Git repository https://github.com/infopandora/Analitycs
under the path: Analytics/Python/Airflow/dags.

Example of a DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from airflow.utils.dates import days_ago
from datetime import timedelta, datetime, timezone
from google.cloud import secretmanager
import requests
import pandas as pd
from google.cloud import bigquery
from pyquery import PyQuery
import onetimepass as otp
import time
import zipfile
import io
import logging
from string import Template
from schema_list import schemas

# To reload data, trigger the DAG manually with these parameters and the desired dates:
#
#{
#  "date_from": "2025-06-01T00:00:00Z",
#  "date_to": "2025-06-02T00:00:00Z"
#}

# Defaults applied to every task of the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Recipients belong in 'email'; the 'email_on_*' keys are boolean switches.
    # Passing an address to 'email_on_failure' only made it truthy and left the
    # recipient list empty, so no notification could be delivered.
    'email': ['kallyone@sg.ink'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# Labels attached to the DAG (passed to DAG(tags=...)).
tags = ['processing', 'bets']

def get_secret(secret_id, version_id="latest"):
    """Read one secret version from Google Secret Manager and return it as text.

    Args:
        secret_id: Name of the secret inside the ``g1-site`` project.
        version_id: Version to read; defaults to ``"latest"``.

    Returns:
        The secret payload decoded as UTF-8.
    """
    secret_path = f"projects/g1-site/secrets/{secret_id}/versions/{version_id}"
    sm_client = secretmanager.SecretManagerServiceClient()
    secret_version = sm_client.access_secret_version(request={"name": secret_path})
    raw_bytes = secret_version.payload.data
    return raw_bytes.decode("UTF-8")

def report_retry(session, report_url, headers, max_retries=10, sleep_seconds=30):
    """GET ``report_url`` through ``session``, retrying while the request raises.

    Args:
        session: Object exposing ``get(url, headers=..., timeout=...)``
            (a ``requests.Session`` in this file).
        report_url: URL to request.
        headers: HTTP headers sent with every attempt.
        max_retries: Maximum number of attempts.
        sleep_seconds: Pause between two consecutive attempts.

    Returns:
        The response of the first attempt that did not raise. The HTTP status
        code is not inspected here.

    Raises:
        Exception: If every attempt raised; the last error is chained as the
            cause so the original traceback is preserved.
    """
    last_exception = None
    for attempt in range(1, max_retries + 1):
        try:
            logging.info(f"Attempt {attempt}: {report_url}")
            return session.get(report_url, headers=headers, timeout=60)
        except Exception as e:
            logging.warning(f"Attempt {attempt} failed: {str(e)}")
            last_exception = e
            # Sleeping after the final attempt would only delay the failure.
            if attempt < max_retries:
                time.sleep(sleep_seconds)
    raise Exception(f"All retries failed: {str(last_exception)}") from last_exception

def bquery_upload(client, table_id, write_disposition, final_table, schema_name):
    """Load a DataFrame into a BigQuery table and wait for the load job.

    Args:
        client: BigQuery client used to start the load job.
        table_id: Destination table identifier.
        write_disposition: Write disposition forwarded to the load job config.
        final_table: DataFrame to upload.
        schema_name: Key into ``schemas`` selecting the table schema.

    Returns:
        Whatever the finished load job's ``result()`` returns.
    """
    load_options = bigquery.LoadJobConfig(
        schema=schemas[schema_name],
        write_disposition=write_disposition,
    )
    load_job = client.load_table_from_dataframe(
        final_table,
        table_id,
        job_config=load_options,
    )
    finished = load_job.result()
    return finished

def admin_login(session, report_config, BrandName, email, password, auth, max_attempts=10):
    """Log ``session`` into the admin backend with e-mail, password and a TOTP code.

    Args:
        session: HTTP session that performs the requests and keeps the cookies.
        report_config: DataFrame whose first row supplies ``AdminURL`` and
            ``AdminLoginURL``.
        BrandName: Brand label, used only in log messages.
        email: Admin user e-mail.
        password: Admin user password.
        auth: TOTP secret used to generate the one-time code.
        max_attempts: Maximum number of login attempts. Previously the loop
            retried forever, which would hang the task on bad credentials.

    Returns:
        The same ``session`` object, now authenticated.

    Raises:
        RuntimeError: If no attempt within ``max_attempts`` succeeded.
    """
    entry_url = report_config['AdminURL'].iloc[0]
    login_url = report_config['AdminLoginURL'].iloc[0]

    # Identical for every attempt, so built once outside the loop.
    headers = {
        'scheme': 'https',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
        'upgrade-insecure-requests': '1'
    }

    for attempt in range(1, max_attempts + 1):
        # The CSRF token is read from a fresh copy of the entry page each time.
        entry_page = session.get(entry_url)
        csrf_token = PyQuery(entry_page.content).find('meta[name=csrf-token]').attr('content')

        payload = {
            u'utf8': u'%E2%9C%93',
            'authenticity_token': csrf_token,
            'admin_user[email]': email,
            'admin_user[password]': password,
            # A new one-time code is generated for every attempt.
            'admin_user[otp_attempt]': otp.get_totp(auth),
            'commit': 'Login'
        }

        login_response = session.post(login_url, headers=headers, data=payload)
        landing_page = PyQuery(login_response.content)

        # Success is detected by a link whose text contains "stat"
        # (presumably the logged-in admin user's name - confirm with the backend).
        if landing_page('a:Contains("stat")'):
            logging.info(f"[{BrandName}] Successfully logged in.")
            return session

        logging.warning(f"[{BrandName}] Login attempt {attempt} failed. Retrying...")
        if attempt < max_attempts:
            time.sleep(10)

    raise RuntimeError(f"[{BrandName}] Login failed after {max_attempts} attempts.")

def report_extract(report_name, client, report_config, BrandName, email, password, auth, date_from=None, date_to=None):
    """Export a report from the admin backend window by window and load it into BigQuery.

    For every time window between the start date and the end date the function
    requests the scoped report URL, polls the backend exports page until the
    matching export is ``Finished``, downloads the zip archive, normalises the
    CSV content and uploads it with ``bquery_upload``. Finally it stores the end
    date as ``LastUpdateDate`` in ``report_upload_config``.

    Args:
        report_name: Value of ``ReportName`` identifying the config row to update.
        client: BigQuery client used for the uploads and the final UPDATE.
        report_config: DataFrame whose first row supplies ``AdminURL``,
            ``AdminLoginURL``, ``AdminReportURL``, ``LastUpdateDate``,
            ``ExportName``, ``DestinationTable``, ``WriteDisposition`` and ``domain``.
        BrandName: Brand label written to the ``BrandName`` column and used in logs.
        email: Admin user e-mail.
        password: Admin user password.
        auth: TOTP secret for the admin login.
        date_from: Optional start of the reload range; defaults to the stored
            ``LastUpdateDate``.
        date_to: Optional end of the reload range; defaults to the current UTC time.
    """
    current_update_date = pd.to_datetime(date_to) if date_to else datetime.now(timezone.utc)
    session = requests.Session()
    session = admin_login(session, report_config, BrandName, email, password, auth)

    # Kept separate from the per-export download URL: reusing one variable for
    # both corrupted the scoped URL of every window after the first one.
    base_report_url = report_config['AdminReportURL'].iloc[0]
    last_update_date = pd.to_datetime(date_from) if date_from else report_config['LastUpdateDate'].iloc[0]
    export_report_name = report_config['ExportName'].iloc[0]
    table_id = "g1-site.development." + report_config['DestinationTable'].iloc[0]
    write_disposition = report_config['WriteDisposition'].iloc[0]
    # 'domain' is a DataFrame column; take the scalar, otherwise the whole
    # Series representation ends up inside the URL.
    exports_url = f'https://{report_config["domain"].iloc[0]}/backend/exports'
    # Raw string so that \s and \d are regex escapes, not invalid string escapes.
    dateregex = r'^(?P<mon>(January|February|March|April|May|June|July|August|September|October|November|December))\s+(?P<dd>\d{2}),\s+(?P<y>\d{4})\s+(?P<hh>\d{2}):(?P<mm>\d{2})*$'

    headers = {
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'sec-ch-ua': '" Not;A Brand";v="99", "Google Chrome";v="91", "Chromium";v="91"',
        'scheme': 'https',
        'sec-ch-ua-mobile': '?0',
        'sec-fetch-dest': 'document',
        'sec-fetch-mode': 'navigate',
        'sec-fetch-site': 'none',
        'sec-fetch-user': '?1',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
        'upgrade-insecure-requests': '1'
    }

    # Start two hours before the stored date so consecutive runs overlap.
    last_update_date -= timedelta(hours=2)
    window_hours = 4 if BrandName == "Hiddenjack" else 2
    url_time_format = "%Y-%m-%d+%H%%3A%M"
    exclude_cols = ['Id', 'User', 'Payment', 'UserIdInCasino', 'PartnerId']
    log_message = Template('Table $table successfully uploaded $time with next result: "$result" ')

    while last_update_date < current_update_date:
        period_end = last_update_date + timedelta(hours=window_hours)
        scoped_url = (
            f"{base_report_url}"
            f"&q%5Bcreated_at_gteq%5D={last_update_date.strftime(url_time_format)}"
            f"&q%5Bcreated_at_lteq%5D={period_end.strftime(url_time_format)}"
            "&utf8=%E2%9C%93&scope=all"
        )

        logging.info(scoped_url)
        # Only the request matters here (presumably it launches the export);
        # the response body is not used.
        report_retry(session, scoped_url, headers)

        download_url = None
        report_status = 'Launched'
        # NOTE(review): this polling has no upper bound; consider a task-level
        # execution_timeout if exports can get stuck.
        while report_status != 'Finished':
            time.sleep(30)
            exports = session.get(exports_url, headers=headers)
            ps = PyQuery(exports.content).find('div.index_as_table')

            if not ps:
                logging.info(ps)
                continue

            # Wrapped in BytesIO because newer pandas versions deprecate
            # passing literal HTML to read_html.
            exports_table = pd.read_html(io.BytesIO(exports.content), encoding='utf8')[0]
            exports_table["Created At"] = pd.to_datetime(exports_table["Created At"])
            is_candidate = (
                (exports_table['Type Name'] == export_report_name)
                & exports_table['Admin User'].str.contains('stat')
                & (exports_table["Created At"] > pd.Timestamp(current_update_date).to_datetime64())
            )
            export_id = exports_table[is_candidate].Id.max()

            if pd.isna(export_id):
                # The export is not listed yet; poll again instead of failing
                # on an empty selection.
                logging.info(f"[{BrandName}] No matching export listed yet.")
                continue

            download_url = f'{exports_url}/{str(export_id)}/download'
            report_status = exports_table[(exports_table['Id'] == export_id)].Status.iloc[0]

            logging.info(report_status)

        main_report = report_retry(session, download_url, headers)
        with zipfile.ZipFile(io.BytesIO(main_report.content), "r") as zf:
            frames = [
                pd.read_csv(io.StringIO(zf.read(fileinfo).decode('utf-8')), decimal=',')
                for fileinfo in zf.infolist()
            ]
        # ignore_index keeps the row index unique across several CSV files,
        # which the stack()/unstack() date conversion below requires.
        rawData = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

        if rawData.empty:
            # Nothing was exported for this window; move on instead of
            # failing on rawData.iloc[0].
            logging.info(f"Bets {BrandName} size: 0")
            last_update_date = period_end
            continue

        rawData.columns = rawData.columns.str.replace('-', ' ').str.title().str.replace(' ', '')

        # Date columns are detected from the first row's values.
        date_cols = rawData.iloc[0].str.contains(dateregex, na=False)
        if date_cols.any():
            idxs = date_cols[date_cols].index
            rawData[idxs] = pd.to_datetime(rawData[idxs].stack(), format='%B %d, %Y %H:%M').unstack().astype(str)

        text_cols = rawData.columns.difference(exclude_cols)
        rawData[text_cols] = rawData[text_cols].astype(str)

        rawData['BrandName'] = BrandName
        rawData['UploadedAt'] = pd.Timestamp.utcnow()
        rawData = rawData.rename(columns={"RollbackBetAmount": "RollbackType"})

        result = bquery_upload(client, table_id, write_disposition, rawData, "Bets")

        logging.info(f"Bets {BrandName} size: {len(rawData)}")
        logging.info(log_message.safe_substitute(table=table_id, result=result, time=current_update_date))

        last_update_date = period_end

    # Backticks match the quoting used when the config table is read;
    # result() waits for the job so a failed UPDATE fails the task.
    client.query(
        f"""
        UPDATE `g1-site.development.report_upload_config`
        SET LastUpdateDate = TIMESTAMP("{current_update_date.strftime('%Y-%m-%d %H:%M:%S')} UTC")
        WHERE ReportName='{report_name}'
        """).result()

def run_etl_process(**kwargs):
    """Airflow callable: run the report extraction for one brand.

    Expects ``domain`` (the brand name, supplied through ``op_kwargs``) and
    ``params`` (the DAG run parameters) in ``kwargs``. Optional ``date_from`` /
    ``date_to`` entries in ``params`` select a reload range.

    Raises:
        ValueError: If ``report_upload_config`` has no row for the brand.
    """
    domain = kwargs['domain']
    params = kwargs['params']

    date_from = params.get('date_from')
    date_to = params.get('date_to')

    # Credentials are stored per brand in Secret Manager.
    email = get_secret(f"{domain}_EMAIL")
    password = get_secret(f"{domain}_PASS")
    auth = get_secret(f"{domain}_AUTH")

    client = bigquery.Client()

    report_config = pd.read_gbq(
        f"SELECT * FROM `g1-site.development.report_upload_config` WHERE BrandName='{domain}'",
        dialect='standard'
    )
    if report_config.empty:
        # Fail with a clear message instead of an IndexError on .iloc[0] below.
        raise ValueError(f"No row in report_upload_config for BrandName='{domain}'")

    report_config["auth"] = auth
    report_config["domain"] = f"{domain.lower()}.casino-backend.com"

    report_name = report_config['ReportName'].iloc[0]
    report_extract(report_name, client, report_config, domain, email, password, auth, date_from, date_to)

def create_domain_tasks(dag):
    """Build one ``run_etl_process`` task per brand and attach it to ``dag``.

    Args:
        dag: DAG the generated PythonOperator tasks belong to.

    Returns:
        List of the created tasks, in the order of the brand list.
    """
    brands = ('Staycasino', 'Slotozen.com', 'Wantedwin', 'RichardCasino', 'Crusino', 'Hiddenjack')
    return [
        PythonOperator(
            # Dots are replaced with underscores in the task id.
            task_id=f"run_etl_for_{brand.lower().replace('.', '_')}",
            python_callable=run_etl_process,
            op_kwargs={'domain': brand},
            dag=dag,
        )
        for brand in brands
    ]

# Daily DAG: one extraction task per brand, followed by the cleaning procedure.
with DAG(
    dag_id='bets_process',
    default_args=default_args,
    schedule='@daily',
    # Fixed start date instead of the deprecated, moving days_ago(1).
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=tags,
    params={
        "date_from": None,
        "date_to": None
    }
) as dag:
    domain_tasks = create_domain_tasks(dag)

    run_cleaning_procedure = BigQueryExecuteQueryOperator(
        # Every operator needs a task_id; it was missing here.
        task_id='run_cleaning_procedure',
        sql="""
            CALL `g1-site.development.bills_process`(
                @brand,
                @date_from,
                @date_to
            )
        """,
        use_legacy_sql=False,
        # The operator's argument is 'query_params', in the BigQuery Jobs API
        # 'queryParameters' layout; 'parameters' is not an accepted argument.
        # NOTE(review): the parameter types are assumptions - confirm them
        # against the bills_process signature. When the DAG params are left as
        # None the templates render the text "None"; verify how the procedure
        # should receive NULLs.
        query_params=[
            {
                "name": "brand",
                "parameterType": {"type": "STRING"},
                "parameterValue": {"value": None},
            },
            {
                "name": "date_from",
                "parameterType": {"type": "TIMESTAMP"},
                "parameterValue": {"value": "{{ params.date_from }}"},
            },
            {
                "name": "date_to",
                "parameterType": {"type": "TIMESTAMP"},
                "parameterValue": {"value": "{{ params.date_to }}"},
            },
        ],
    )

    # The cleaning procedure runs only after every brand task has succeeded.
    domain_tasks >> run_cleaning_procedure

What needs to be changed

The main changes are mostly new code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from google.cloud import secretmanager

default_args = {
    'owner': 'airflow',
    # Recipients go in 'email'; several addresses or an e-mail group can be used.
    'email': ['kallyone@sg.ink'],
    # The 'email_on_*' keys are boolean switches, not recipient lists.
    'email_on_retry': True,
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

tags = ['ingestion']  # custom tags for organizing DAGs in the Airflow UI

# Secrets are stored in Google Secret Manager; use this function to read them
# instead of hardcoding values.
def get_secret(secret_id, version_id="latest"):
    """Return the UTF-8 text of a secret version from the ``g1-site`` project."""
    resource_name = f"projects/g1-site/secrets/{secret_id}/versions/{version_id}"
    manager = secretmanager.SecretManagerServiceClient()
    secret_version = manager.access_secret_version(request={"name": resource_name})
    return secret_version.payload.data.decode("UTF-8")

with DAG(
    dag_id='daily_report_ingestion',
    default_args=default_args,
    schedule='@daily',  # best practice is to use a CRON expression
    # 'datetime' is the class imported from the datetime module, so it is
    # called directly (datetime.datetime(...) would raise AttributeError).
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=tags,
    params={
        'date_from': None,  # default: NULL
        'date_to': None,    # default: NULL
    },  # these parameters are used for reloading data
) as dag:

    # The run context is passed to the callable automatically;
    # the deprecated provide_context flag is not needed.
    ingestion_task = PythonOperator(
        task_id='run_ingestion',
        python_callable=main_ingestion,
    )

    bq_sp_call = BigQueryExecuteQueryOperator(
        # Every operator needs a task_id; it was missing here.
        task_id='bq_sp_call',
        sql="""
            CALL `g1-site.development.bills_process`(
                @brand,
                @date_from,
                @date_to
            )
        """,
        use_legacy_sql=False,
        # The operator's argument is 'query_params', in the BigQuery Jobs API
        # 'queryParameters' layout; 'parameters' is not an accepted argument.
        # NOTE(review): the parameter types are assumptions - confirm them
        # against the bills_process signature. When the DAG params are left as
        # None the templates render the text "None"; verify how the procedure
        # should receive NULLs.
        query_params=[
            {
                "name": "brand",
                "parameterType": {"type": "STRING"},
                "parameterValue": {"value": None},
            },
            {
                "name": "date_from",
                "parameterType": {"type": "TIMESTAMP"},
                "parameterValue": {"value": "{{ params.date_from }}"},
            },
            {
                "name": "date_to",
                "parameterType": {"type": "TIMESTAMP"},
                "parameterValue": {"value": "{{ params.date_to }}"},
            },
        ],
    )

    ingestion_task >> bq_sp_call

start_date tells Airflow from which date to schedule runs, and schedule sets the interval between them.

Catchup is True by default and enables historical runs: e.g. when start_date is set to 2025-01-01 and today is 2025-07-01, Airflow will schedule and execute a run for each day until 2025-07-01, assuming the schedule is something like '0 1 * * *' (every day at 01:00 AM) or @daily. Therefore catchup should be set to False (except when we need historical data loading).

Logical dependencies between tasks are set using ‘>>’: in the example, ingestion_task must succeed, and only then will bq_sp_call be executed. Tasks can also be run in parallel by using square brackets:

[ingestion_task_1, ingestion_task_2] >> bq_sp_call

Changes regarding old code:

Rename main to something like main_ingestion; this function will be called in the DAG;

Secrets will be pulled as

# One secret per credential, named "<domain>_EMAIL", "<domain>_PASS" and "<domain>_AUTH".
email, password, auth = (
    get_secret(f"{domain}_{suffix}") for suffix in ("EMAIL", "PASS", "AUTH")
)

BQ connection defined as

# The hook reads the Airflow connection with id 'gcp_conn'.
hook = BigQueryHook(gcp_conn_id='gcp_conn')
# BigQuery client obtained from the hook.
client = hook.get_client()

The service account JSON will be stored in Airflow as a connection named gcp_conn (in this example);

A load date must be added as rawData['UploadedAt'] = pd.Timestamp.utcnow(); staging tables will be partitioned by that field. The best place for it is right after adding the BrandName column.