GitHub

Machine Learning(ML) Pipeline

using Airflow & Docker

Detail Image

Python & Pandas to develop model .
Apache Airflow to streamline ml workflow.
Implemented Docker to containerize the application.

Airlfow code 🚀

                    
                        from datetime import datetime, timedelta
                        from airflow import DAG

                        from airflow.operators.python_operator import PythonOperator
                        from airflow.operators.bash import BashOperator
                        from airflow.providers.docker.operators.docker import DockerOperator

                        import pandas as pd
                        import pickle


                        # default_args for DAG
                        default_args = {
                            'owner': 'dhiraj47',
                            'depends_on_past': False,                  
                            'email_on_failure': False,
                            'email_on_retry': False,
                            # 'retries': 1,                            
                            # 'retry_delay': timedelta(minutes=1),              
                        }


                        # function to train model
                        def train_recommendation_model(**kwargs):
                            
                            # Loading .csv datasets from github link
                            movies = pd.read_csv('https://raw.githubusercontent.com/47dhiraj/Movie-Recommendation-System-Django-and-ML/main/dataset/movies.csv')
                            ratings = pd.read_csv('https://raw.githubusercontent.com/47dhiraj/Movie-Recommendation-System-Django-and-ML/main/dataset/ratings.csv')

                            # Data cleaning and manipulation
                            ratings = pd.merge(movies, ratings).drop(['genres','timestamp'], axis=1) 
                            user_ratings = ratings.pivot_table(index=['userId'], columns=['title'], values='rating')
                            user_ratings = user_ratings.dropna(thresh=10, axis=1).fillna(0)

                            # using pearson coorelation algorithm
                            item_similarity_df = user_ratings.corr(method='pearson')

                            print('Recommendation model trained successfully !!')

                            ## Pushing data between task
                            item_similarity_df_pkl_file = '/tmp/item_similarity_df.pkl'
                            item_similarity_df.to_pickle(item_similarity_df_pkl_file)
                            kwargs['task_instance'].xcom_push(key='item_similarity_df_pkl_file', value=item_similarity_df_pkl_file)



                        def test_recommendation_model(**kwargs):

                            item_similarity_df_pkl_file = kwargs['task_instance'].xcom_pull(task_ids='recommendation_model_training', key='item_similarity_df_pkl_file')

                            if item_similarity_df_pkl_file is None:
                                print("Error: item_similarity_df_pkl_file is None")
                                return
                            else:
                                print("Received item_similarity_df_pkl_file:", item_similarity_df_pkl_file)

                                ## Loading the DataFrame 
                                try:
                                    item_similarity_df = pd.read_pickle(item_similarity_df_pkl_file)
                                except Exception as e:
                                    print("Error loading DataFrame from .pkl file:", str(e))
                                    return

                                def get_similar_movies(movie_name,user_rating):
                                    similar_score = item_similarity_df[movie_name]*(user_rating-2.5)
                                    similar_score = similar_score.sort_values(ascending=False)
                                    return similar_score


                                scifi_lover = [("The Martian", 5)]

                                similar_movies = pd.DataFrame()

                                for movie,rating in scifi_lover:
                                    similar_movies = similar_movies._append(get_similar_movies(movie,rating), ignore_index=True)

                                all_recommend = similar_movies.sum().sort_values(ascending=False)

                                print('Recommendation model tested successfully !!')

                                ## Pushing data between task
                                all_recommend_dict = all_recommend.to_dict()
                                kwargs['task_instance'].xcom_push(key='all_recommend', value=all_recommend_dict)



                        def tune_recommendation_model(**kwargs):

                            all_recommend_dict = kwargs['task_instance'].xcom_pull(task_ids='recommendation_model_testing', key='all_recommend')

                            if all_recommend_dict is None:
                                print("Error: all_recommend_dict is None")
                                return
                            else:
                                print("Received all_recommend_dict:", all_recommend_dict)

                                all_recommend = pd.Series(all_recommend_dict)

                                check_name = ['The Martian']

                                # Checking & eliminating, if the recommended movies is already seen ??
                                def check_seen(movie, seen_movies):
                                    for item in seen_movies:
                                        if item == movie:
                                            return True
                                    return False
                                

                                print('\nList of similar movies like',check_name, '\n')

                                i = 0
                                for movie, score in all_recommend.items():
                                    if not check_seen(movie, check_name):
                                        print(movie,'\t\t\t\t\t',  score)
                                        # print(movie)

                                    i = i + 1
                                    if i >= 36 + len(check_name):
                                        break




                        # Creating/Declaring a DAG object
                        ml_dag = DAG(
                            dag_id='ml_pipeline_dag',                         
                            default_args=default_args,        
                            description='ML pipeline for Recommendation System',
                            start_date=datetime(2024, 2, 26),
                            catchup=False,
                        )


                        # Task to train the recommendation model)
                        model_train_task = PythonOperator(
                            task_id='recommendation_model_training',     
                            python_callable=train_recommendation_model,         
                            dag=ml_dag,
                        )


                        # Task to test the recommendation model)
                        model_test_task = PythonOperator(
                            task_id='recommendation_model_testing',     
                            python_callable=test_recommendation_model,
                            provide_context=True,     
                            dag=ml_dag,
                        )


                        # Task too tune the recommendation model for better performance
                        model_tune_task = PythonOperator(
                            task_id='recommendation_model_tuning',     
                            python_callable=tune_recommendation_model,
                            provide_context=True,     
                            dag=ml_dag,
                        )


                        # Setting Task dependencies
                        model_train_task >> model_test_task >> model_tune_task