Organización de un flujo de trabajo de aprendizaje automático con Step Functions y EMR

Victoria Seoane
.
June 20, 2023
Organización de un flujo de trabajo de aprendizaje automático con Step Functions y EMR

Como hemos visto en publicaciones anteriores (Lanzar un clúster de EMR mediante funciones de Lambda para ejecutar scripts de PySpark, parte 1 y 2), EMR es una plataforma de big data muy popular, diseñada para simplificar la ejecución de marcos de Big Data como Hadoop y Spark. Aprovechar esta plataforma nos permite aumentar y reducir nuestra capacidad informática, aprovechar las instancias puntuales baratas e integrarnos sin problemas con nuestro lago de datos. Cuando combinamos EMR con Lambdas, podemos lanzar nuestros clústeres de forma programática mediante FaaS sin servidor y reaccionar ante la disponibilidad de nuevos datos o si se activan según un cronograma de cron.

En esta publicación, proponemos llevar este enfoque de usar Lambdas para activar clústeres de EMR un paso más allá y usar AWS Step Functions para organizar nuestro proceso de aprendizaje automático. AWS Step Functions es un servicio totalmente administrado que facilita la creación y ejecución de flujos de trabajo de varios pasos. Nos permite definir una serie de pasos que componen nuestro flujo de trabajo y, a continuación, Step Functions los ejecuta en orden. Estos pasos se pueden ejecutar de forma secuencial, en paralelo, esperar a que finalicen las etapas anteriores o ejecutarse de forma condicional si otros flujos de trabajo fallan. La siguiente imagen muestra un ejemplo de una función escalonada. Se trata de una serie de pasos en los que podemos ver un inicio, unos pasos, unas decisiones, unas alternativas y un final. Esto es ideal para los flujos de trabajo de la vida real, en los que es necesario tomar diferentes decisiones a medida que avanza el proceso.

Otras ventajas de este enfoque son la perfecta integración de StepFunctions con otros servicios, como Lambda, que nos permiten crear un flujo de trabajo complejo sin tener que orquestarlo manualmente, con la supervisión y el manejo de errores fáciles de usar que proporciona StepFunctions, lo que simplifica nuestra lógica de reintentos y la supervisión general del proceso.

Así que, esperando haberte convencido de probar este nuevo enfoque, ¡pongámonos manos a la obra!

Agilizando nuestra función Step

La generación de Step Functions se puede realizar de dos maneras diferentes: mediante el editor visual de Step Functions o mediante CloudFormation. Cuando empecé esta publicación, quería usar CloudFormation, pero me topé con un límite de CloudFormation (CloudFormation no nos permite crear StepFunctions con una definición siempre que la necesitemos). Por eso utilizaré el editor visual Step Functions, combinado con Amazon States Language.

Desde el Sitio web de AWS:

El lenguaje de estados de Amazon es un lenguaje estructurado basado en JSON que se utiliza para definir su máquina de estados, una colección de estados, que pueden funcionar (estados de tareas), determinar a qué estados hacer la transición al siguiente (estados de elección), detener una ejecución con un error (estados de error), etc.

Nuestro flujo de trabajo de aprendizaje automático

Definiremos un flujo de trabajo compuesto por 3 pasos: preprocesamiento de datos, entrenamiento del modelo y evaluación del modelo.

¡Un conjunto de ejemplos, codificados en Python usando PySpark, y paso a paso para lanzar nuestra función First Step!

Para ello, necesitaremos crear cinco cosas:

preprocessing_script.py

no-line-numbersimport argparse# Import necessary librariesfrom pyspark.sql import SparkSessionfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.ml.classification import RandomForestClassifierfrom pyspark.ml.evaluation import MulticlassClassificationEvaluatorif __name__ == "__main__": # Initialize Spark session spark = SparkSession.builder.appName("PreprocessingApplication").getOrCreate() parser = argparse.ArgumentParser( prog='LinearRegression', description='Randomly generates data and fits a linear regression model using Spark MLlib.' ) parser.add_argument('--s3bucket', required=True) args = parser.parse_args() s3_path = args.s3bucket # Part 1: Generate Fake Data and Save to S3 # Create a DataFrame with fake data fake_data = spark.createDataFrame([ (1, 0.5, 1.2), (0, 1.0, 3.5), (1, 2.0, 0.8), # Add more rows as needed ], ["label", "feature1", "feature2"]) # Perform necessary data transformations # (e.g., feature engineering, handling missing values, etc.) # ... # Prepare the data for model training # Assuming the features are in columns "feature1" and "feature2" assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features") data = assembler.transform(fake_data) # Save the processed data to S3 data.write.parquet("s3a://" + s3_path + "/path_to_processed_data.parquet") # Stop the Spark session spark.stop()

training_script.by

no-line-numbersimport argparse# Import necessary librariesfrom pyspark.sql import SparkSessionfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.ml.classification import RandomForestClassifierfrom pyspark.ml.evaluation import MulticlassClassificationEvaluatorif __name__ == "__main__": # Initialize Spark session spark = SparkSession.builder.appName("TrainingApplication").getOrCreate() parser = argparse.ArgumentParser( prog='LinearRegression', description='Randomly generates data and fits a linear regression model using Spark MLlib.' ) parser.add_argument('--s3bucket', required=True) args = parser.parse_args() s3_path = args.s3bucket # Part 2: Read Data from S3, Model Training, and Save Model to S3 # Read the processed data from S3 processed_data = spark.read.parquet("s3a://" + s3_path + "/path_to_processed_data.parquet") # Split the data into training and test sets train_data, test_data = processed_data.randomSplit([0.7, 0.3], seed=42) # Initialize the classification model (e.g., RandomForestClassifier) classifier = RandomForestClassifier(labelCol="label", featuresCol="features") # Train the model on the training data model = classifier.fit(train_data) # Save the trained model to S3 model.write().overwrite().save("s3a://" + s3_path + "/path_to_saved_model") # Stop the Spark session spark.stop()


evaluation_script.py

no-line-numbersimport argparse# Import necessary librariesfrom pyspark.sql import SparkSessionfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModelfrom pyspark.ml.evaluation import MulticlassClassificationEvaluatorif __name__ == "__main__": # Initialize Spark session spark = SparkSession.builder.appName("EvaluationApplication").getOrCreate() parser = argparse.ArgumentParser( prog='LinearRegression', description='Randomly generates data and fits a linear regression model using Spark MLlib.' ) parser.add_argument('--s3bucket', required=True) args = parser.parse_args() s3_path = args.s3bucket # Part 3: Load Model from S3 and Perform Evaluation # Load the saved model from S3 loaded_model = RandomForestClassificationModel.load("s3a://" + s3_path + "/path_to_saved_model") test_data = spark.read.parquet("s3a://" + s3_path + "/test_data.parquet") # Make predictions on the test data predictions = loaded_model.transform(test_data) # Evaluate the model's performance evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy") accuracy = evaluator.evaluate(predictions) # Print the evaluation result print("Accuracy:", accuracy) # Stop the Spark session spark.stop()

no-line-numbers{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "ec2.amazonaws.com" }, "Action": "sts:AssumeRole" } ]}


 — Y los siguientes permisos:

no-line-numbers{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Resource": "*", "Action": [ "cloudwatch:*", "ec2:Describe*", "s3:*" ] } ]}

no-line-numbers{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "states.amazonaws.com" }, "Action": "sts:AssumeRole" } ]}

no-line-numbers{ "Version": "2012-10-17", "Statement": [ { "Action": [ "states:Create*", "states:Describe*", "states:StartExecution", "elasticmapreduce:RunJobFlow", "elasticmapreduce:*", "states:List*", "iam:PassRole", “sns:*” ], "Resource": "*", "Effect": "Allow" } ]}

Una vez que tengamos todos estos elementos listos, tendremos que reemplazarlos en nuestra definición de StepFunction, que comparto a continuación.

En un nivel superior, definimos tres pasos para cada etapa de nuestro flujo de trabajo: creación de clústeres, ejecución de código y terminación de clústeres. También definimos un paso «general» que avisará a través de SNS si se produce un error en alguno de los pasos de ejecución del código. Algo interesante de destacar es que StepFunctions tiene un tipo de paso predefinido (CreateCluster, AddStep y TerminateCluster) para cada uno de los pasos que necesitamos, lo que reduce significativamente la cantidad de código que necesitamos escribir para nuestra orquestación. :

no-line-numbers{ "Comment": "EMR Data Processing Workflow", "StartAt": "DataProcessing", "States": { "DataProcessing": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "DataProcessingCluster", "ReleaseLabel": "emr-6.4.0", "LogUri": "s3:///logs/", "Instances": { "InstanceFleets": [ { "Name": "Master", "InstanceFleetType": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] }, { "Name": "Core", "InstanceFleetType": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] } ], "KeepJobFlowAliveWhenNoSteps": true, "TerminationProtected": false }, "Applications": [ { "Name": "Spark" } ], "ServiceRole": "arn:aws:iam:::role/EMR_DefaultRole", "JobFlowRole": "arn:aws:iam:::instance-profile/", "VisibleToAllUsers": true }, "ResultPath": "$.DataProcessingClusterResult", "Next": "DataProcessingStep" }, "DataProcessingStep": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.DataProcessingClusterResult.ClusterId", "Step": { "Name": "DataProcessingStep", "ActionOnFailure": "TERMINATE_CLUSTER", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", "s3:///preprocessing_script.py", "--s3bucket", "" ] } } }, "ResultPath": "$.DataProcessingStepResult", "Next": "Terminate_Pre_Processing_Cluster", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "HandleErrors" } ] }, "Terminate_Pre_Processing_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.DataProcessingClusterResult.ClusterId" }, "Next": "Training" }, "Training": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "TrainingCluster", "ReleaseLabel": "emr-6.4.0", "LogUri": "s3:///logs/", "Instances": { "InstanceFleets": [ { "Name": "Master", "InstanceFleetType": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] }, { "Name": "Core", "InstanceFleetType": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] } ], "KeepJobFlowAliveWhenNoSteps": true, "TerminationProtected": false }, "Applications": [ { "Name": "Spark" } ], "ServiceRole": "arn:aws:iam:::role/EMR_DefaultRole", "JobFlowRole": "arn:aws:iam:::instance-profile/", "VisibleToAllUsers": true }, "ResultPath": "$.TrainingClusterResult", "Next": "TrainingStep" }, "TrainingStep": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.TrainingClusterResult.ClusterId", "Step": { "Name": "TrainingStep", "ActionOnFailure": "TERMINATE_CLUSTER", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", "s3:///training_script.py", "--s3bucket", "" ] } } }, "ResultPath": "$.TrainingStepResult", "Next": "Terminate_Training_Cluster", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "HandleErrors" } ] }, "Terminate_Training_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.TrainingClusterResult.ClusterId" }, "Next": "Evaluation" }, "Evaluation": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "EvaluationCluster", "ReleaseLabel": "emr-6.4.0", "LogUri": "s3:///logs/", "Instances": { "InstanceFleets": [ { "Name": "Master", "InstanceFleetType": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] }, { "Name": "Core", "InstanceFleetType": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] } ], "KeepJobFlowAliveWhenNoSteps": true, "TerminationProtected": false }, "Applications": [ { "Name": "Spark" } ], "ServiceRole": "arn:aws:iam:::role/EMR_DefaultRole", "JobFlowRole": "arn:aws:iam:::instance-profile/", "VisibleToAllUsers": true }, "ResultPath": "$.EvaluationClusterResult", "Next": "EvaluationStep" }, "EvaluationStep": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.EvaluationClusterResult.ClusterId", "Step": { "Name": "EvaluationStep", "ActionOnFailure": "TERMINATE_CLUSTER", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", "s3:///evaluation_script.py", "--s3bucket", "" ] } } }, "Next": "Terminate_Evaluation_Cluster", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "HandleErrors" } ] }, "Terminate_Evaluation_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.EvaluationClusterResult.ClusterId" }, "End": true }, "HandleErrors": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:us-east-1::YOUR_SNS_TOPIC", "Message": "An error occurred - The last EMR cluster was not terminated to allow further analysis of the error" }, "End": true } }}

Algunos aspectos interesantes que podemos ver, por ejemplo:

no-line-numbers"$.EvaluationClusterResult.ClusterId"

hace referencia al paso anterior:

no-line-numbers"ResultPath": "$.EvaluationClusterResult",

lo que nos permite crear el clúster en un paso, obtener su identificador y luego usarlo en otro paso.

Si quisiéramos hacer cosas más complejas, la idea es la misma: tenemos que hacer referencia a ella en la ruta de resultados o en la ruta de salida. Más detalles sobre esto aquí

Volviendo a la creación de nuestra pila, una vez que hayamos reemplazado los valores, podemos crear una nueva función escalonada. Puedes usar esto eslabón o navegue hasta la consola StepFunctions, seleccione «Crear máquina de estados» y seleccione la opción «Escriba su flujo de trabajo en código».

Ambas opciones abrirán la pantalla que vemos a continuación, que incluye una aplicación básica de «Hola mundo». Reemplazarás el código en la definición y actualizarás el gráfico.

Una vez que hayas actualizado el gráfico, deberías ver lo siguiente:

Los siguientes pasos son asignar un nombre a la función Step y asignarle un rol (lo ideal sería el que habíamos creado anteriormente) y crearla con el botón «Crear máquina de estados». Espera unos minutos para que se complete, ¡y listo!

Encontrarás un botón para «Iniciar la ejecución», que activará tus pasos. Cuando comience a ejecutarse, el diagrama de flujo se actualizará con los diferentes estados: azul para los pasos en curso, verde para los pasos realizados correctamente, naranja para los errores detectados y rojo para los errores inesperados.

Es importante destacar que los clústeres de EMR estarán disponibles a través de la consola de EMR habitual, lo que simplifica la supervisión del proceso para las personas que están familiarizadas con ella. El hecho de que se haya activado mediante StepFunctions es completamente transparente para los usuarios finales.

Volviendo a nuestra función Step y al manejo de errores, en la imagen de abajo podemos ver dos errores diferentes. El paso «TrainingStep» detectó un error (que forma parte del flujo de trabajo esperado), pero el paso «handleErrors» falló. Esto nos permite encontrar fácilmente qué falló y dónde.

En esta otra imagen, podemos ver que el flujo de trabajo falló, pero la gestión de errores se realizó correctamente.

Es importante destacar esto porque los errores detectados forman parte de nuestro flujo de trabajo y nos hemos preparado para ellos (por ejemplo, con alertas). Los errores inesperados pueden dejar nuestro trabajo incompleto y requerir una intervención manual.

Conclusiones y próximos pasos

Bueno, gracias por llegar hasta aquí y ¡espero que estés entusiasmado por probarlo!

Podemos ver que poner en marcha un conjunto de clústeres de EMR es bastante fácil con Step Functions, y nos permite aprovechar la potencia de EMR de una manera sencilla y directa. Como recomendación final, me gustaría destacar algunas de las mejores prácticas para usar estos dos servicios de forma conjunta:

Con todo esto en mente, ¡adelante con tus propios experimentos! Y no dudes en escribirnos si tienes cualquier problema o pregunta. ¡Estaremos encantados de ayudarte!

¡Feliz programación! 🙂

Manténgase a la vanguardia de las últimas tendencias y conocimientos sobre big data, aprendizaje automático e inteligencia artificial. ¡No se lo pierda y suscríbase a nuestro boletín de noticias!

Como hemos visto en publicaciones anteriores (Lanzar un clúster de EMR mediante funciones de Lambda para ejecutar scripts de PySpark, parte 1 y 2), EMR es una plataforma de big data muy popular, diseñada para simplificar la ejecución de marcos de Big Data como Hadoop y Spark. Aprovechar esta plataforma nos permite aumentar y reducir nuestra capacidad informática, aprovechar las instancias puntuales baratas e integrarnos sin problemas con nuestro lago de datos. Cuando combinamos EMR con Lambdas, podemos lanzar nuestros clústeres de forma programática mediante FaaS sin servidor y reaccionar ante la disponibilidad de nuevos datos o si se activan según un cronograma de cron.

En esta publicación, proponemos llevar este enfoque de usar Lambdas para activar clústeres de EMR un paso más allá y usar AWS Step Functions para organizar nuestro proceso de aprendizaje automático. AWS Step Functions es un servicio totalmente administrado que facilita la creación y ejecución de flujos de trabajo de varios pasos. Nos permite definir una serie de pasos que componen nuestro flujo de trabajo y, a continuación, Step Functions los ejecuta en orden. Estos pasos se pueden ejecutar de forma secuencial, en paralelo, esperar a que finalicen las etapas anteriores o ejecutarse de forma condicional si otros flujos de trabajo fallan. La siguiente imagen muestra un ejemplo de una función escalonada. Se trata de una serie de pasos en los que podemos ver un inicio, unos pasos, unas decisiones, unas alternativas y un final. Esto es ideal para los flujos de trabajo de la vida real, en los que es necesario tomar diferentes decisiones a medida que avanza el proceso.

Otras ventajas de este enfoque son la perfecta integración de StepFunctions con otros servicios, como Lambda, que nos permiten crear un flujo de trabajo complejo sin tener que orquestarlo manualmente, con la supervisión y el manejo de errores fáciles de usar que proporciona StepFunctions, lo que simplifica nuestra lógica de reintentos y la supervisión general del proceso.

Así que, esperando haberte convencido de probar este nuevo enfoque, ¡pongámonos manos a la obra!

Agilizando nuestra función Step

La generación de Step Functions se puede realizar de dos maneras diferentes: mediante el editor visual de Step Functions o mediante CloudFormation. Cuando empecé esta publicación, quería usar CloudFormation, pero me topé con un límite de CloudFormation (CloudFormation no nos permite crear StepFunctions con una definición siempre que la necesitemos). Por eso utilizaré el editor visual Step Functions, combinado con Amazon States Language.

Desde el Sitio web de AWS:

El lenguaje de estados de Amazon es un lenguaje estructurado basado en JSON que se utiliza para definir su máquina de estados, una colección de estados, que pueden funcionar (estados de tareas), determinar a qué estados hacer la transición al siguiente (estados de elección), detener una ejecución con un error (estados de error), etc.

Nuestro flujo de trabajo de aprendizaje automático

Definiremos un flujo de trabajo compuesto por 3 pasos: preprocesamiento de datos, entrenamiento del modelo y evaluación del modelo.

¡Un conjunto de ejemplos, codificados en Python usando PySpark, y paso a paso para lanzar nuestra función First Step!

Para ello, necesitaremos crear cinco cosas:

preprocessing_script.py

no-line-numbersimport argparse# Import necessary librariesfrom pyspark.sql import SparkSessionfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.ml.classification import RandomForestClassifierfrom pyspark.ml.evaluation import MulticlassClassificationEvaluatorif __name__ == "__main__": # Initialize Spark session spark = SparkSession.builder.appName("PreprocessingApplication").getOrCreate() parser = argparse.ArgumentParser( prog='LinearRegression', description='Randomly generates data and fits a linear regression model using Spark MLlib.' ) parser.add_argument('--s3bucket', required=True) args = parser.parse_args() s3_path = args.s3bucket # Part 1: Generate Fake Data and Save to S3 # Create a DataFrame with fake data fake_data = spark.createDataFrame([ (1, 0.5, 1.2), (0, 1.0, 3.5), (1, 2.0, 0.8), # Add more rows as needed ], ["label", "feature1", "feature2"]) # Perform necessary data transformations # (e.g., feature engineering, handling missing values, etc.) # ... # Prepare the data for model training # Assuming the features are in columns "feature1" and "feature2" assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features") data = assembler.transform(fake_data) # Save the processed data to S3 data.write.parquet("s3a://" + s3_path + "/path_to_processed_data.parquet") # Stop the Spark session spark.stop()

training_script.by

no-line-numbersimport argparse# Import necessary librariesfrom pyspark.sql import SparkSessionfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.ml.classification import RandomForestClassifierfrom pyspark.ml.evaluation import MulticlassClassificationEvaluatorif __name__ == "__main__": # Initialize Spark session spark = SparkSession.builder.appName("TrainingApplication").getOrCreate() parser = argparse.ArgumentParser( prog='LinearRegression', description='Randomly generates data and fits a linear regression model using Spark MLlib.' ) parser.add_argument('--s3bucket', required=True) args = parser.parse_args() s3_path = args.s3bucket # Part 2: Read Data from S3, Model Training, and Save Model to S3 # Read the processed data from S3 processed_data = spark.read.parquet("s3a://" + s3_path + "/path_to_processed_data.parquet") # Split the data into training and test sets train_data, test_data = processed_data.randomSplit([0.7, 0.3], seed=42) # Initialize the classification model (e.g., RandomForestClassifier) classifier = RandomForestClassifier(labelCol="label", featuresCol="features") # Train the model on the training data model = classifier.fit(train_data) # Save the trained model to S3 model.write().overwrite().save("s3a://" + s3_path + "/path_to_saved_model") # Stop the Spark session spark.stop()


evaluation_script.py

no-line-numbersimport argparse# Import necessary librariesfrom pyspark.sql import SparkSessionfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModelfrom pyspark.ml.evaluation import MulticlassClassificationEvaluatorif __name__ == "__main__": # Initialize Spark session spark = SparkSession.builder.appName("EvaluationApplication").getOrCreate() parser = argparse.ArgumentParser( prog='LinearRegression', description='Randomly generates data and fits a linear regression model using Spark MLlib.' ) parser.add_argument('--s3bucket', required=True) args = parser.parse_args() s3_path = args.s3bucket # Part 3: Load Model from S3 and Perform Evaluation # Load the saved model from S3 loaded_model = RandomForestClassificationModel.load("s3a://" + s3_path + "/path_to_saved_model") test_data = spark.read.parquet("s3a://" + s3_path + "/test_data.parquet") # Make predictions on the test data predictions = loaded_model.transform(test_data) # Evaluate the model's performance evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy") accuracy = evaluator.evaluate(predictions) # Print the evaluation result print("Accuracy:", accuracy) # Stop the Spark session spark.stop()

no-line-numbers{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "ec2.amazonaws.com" }, "Action": "sts:AssumeRole" } ]}


 — Y los siguientes permisos:

no-line-numbers{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Resource": "*", "Action": [ "cloudwatch:*", "ec2:Describe*", "s3:*" ] } ]}

no-line-numbers{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "states.amazonaws.com" }, "Action": "sts:AssumeRole" } ]}

no-line-numbers{ "Version": "2012-10-17", "Statement": [ { "Action": [ "states:Create*", "states:Describe*", "states:StartExecution", "elasticmapreduce:RunJobFlow", "elasticmapreduce:*", "states:List*", "iam:PassRole", “sns:*” ], "Resource": "*", "Effect": "Allow" } ]}

Una vez que tengamos todos estos elementos listos, tendremos que reemplazarlos en nuestra definición de StepFunction, que comparto a continuación.

En un nivel superior, definimos tres pasos para cada etapa de nuestro flujo de trabajo: creación de clústeres, ejecución de código y terminación de clústeres. También definimos un paso «general» que avisará a través de SNS si se produce un error en alguno de los pasos de ejecución del código. Algo interesante de destacar es que StepFunctions tiene un tipo de paso predefinido (CreateCluster, AddStep y TerminateCluster) para cada uno de los pasos que necesitamos, lo que reduce significativamente la cantidad de código que necesitamos escribir para nuestra orquestación. :

no-line-numbers{ "Comment": "EMR Data Processing Workflow", "StartAt": "DataProcessing", "States": { "DataProcessing": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "DataProcessingCluster", "ReleaseLabel": "emr-6.4.0", "LogUri": "s3:///logs/", "Instances": { "InstanceFleets": [ { "Name": "Master", "InstanceFleetType": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] }, { "Name": "Core", "InstanceFleetType": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] } ], "KeepJobFlowAliveWhenNoSteps": true, "TerminationProtected": false }, "Applications": [ { "Name": "Spark" } ], "ServiceRole": "arn:aws:iam:::role/EMR_DefaultRole", "JobFlowRole": "arn:aws:iam:::instance-profile/", "VisibleToAllUsers": true }, "ResultPath": "$.DataProcessingClusterResult", "Next": "DataProcessingStep" }, "DataProcessingStep": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.DataProcessingClusterResult.ClusterId", "Step": { "Name": "DataProcessingStep", "ActionOnFailure": "TERMINATE_CLUSTER", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", "s3:///preprocessing_script.py", "--s3bucket", "" ] } } }, "ResultPath": "$.DataProcessingStepResult", "Next": "Terminate_Pre_Processing_Cluster", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "HandleErrors" } ] }, "Terminate_Pre_Processing_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.DataProcessingClusterResult.ClusterId" }, "Next": "Training" }, "Training": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "TrainingCluster", "ReleaseLabel": "emr-6.4.0", "LogUri": "s3:///logs/", "Instances": { "InstanceFleets": [ { "Name": "Master", "InstanceFleetType": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] }, { "Name": "Core", "InstanceFleetType": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] } ], "KeepJobFlowAliveWhenNoSteps": true, "TerminationProtected": false }, "Applications": [ { "Name": "Spark" } ], "ServiceRole": "arn:aws:iam:::role/EMR_DefaultRole", "JobFlowRole": "arn:aws:iam:::instance-profile/", "VisibleToAllUsers": true }, "ResultPath": "$.TrainingClusterResult", "Next": "TrainingStep" }, "TrainingStep": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.TrainingClusterResult.ClusterId", "Step": { "Name": "TrainingStep", "ActionOnFailure": "TERMINATE_CLUSTER", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", "s3:///training_script.py", "--s3bucket", "" ] } } }, "ResultPath": "$.TrainingStepResult", "Next": "Terminate_Training_Cluster", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "HandleErrors" } ] }, "Terminate_Training_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.TrainingClusterResult.ClusterId" }, "Next": "Evaluation" }, "Evaluation": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "EvaluationCluster", "ReleaseLabel": "emr-6.4.0", "LogUri": "s3:///logs/", "Instances": { "InstanceFleets": [ { "Name": "Master", "InstanceFleetType": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] }, { "Name": "Core", "InstanceFleetType": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m5.xlarge" } ] } ], "KeepJobFlowAliveWhenNoSteps": true, "TerminationProtected": false }, "Applications": [ { "Name": "Spark" } ], "ServiceRole": "arn:aws:iam:::role/EMR_DefaultRole", "JobFlowRole": "arn:aws:iam:::instance-profile/", "VisibleToAllUsers": true }, "ResultPath": "$.EvaluationClusterResult", "Next": "EvaluationStep" }, "EvaluationStep": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.EvaluationClusterResult.ClusterId", "Step": { "Name": "EvaluationStep", "ActionOnFailure": "TERMINATE_CLUSTER", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", "s3:///evaluation_script.py", "--s3bucket", "" ] } } }, "Next": "Terminate_Evaluation_Cluster", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "HandleErrors" } ] }, "Terminate_Evaluation_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.EvaluationClusterResult.ClusterId" }, "End": true }, "HandleErrors": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:us-east-1::YOUR_SNS_TOPIC", "Message": "An error occurred - The last EMR cluster was not terminated to allow further analysis of the error" }, "End": true } }}

Algunos aspectos interesantes que podemos ver, por ejemplo:

no-line-numbers"$.EvaluationClusterResult.ClusterId"

hace referencia al paso anterior:

no-line-numbers"ResultPath": "$.EvaluationClusterResult",

lo que nos permite crear el clúster en un paso, obtener su identificador y luego usarlo en otro paso.

Si quisiéramos hacer cosas más complejas, la idea es la misma: tenemos que hacer referencia a ella en la ruta de resultados o en la ruta de salida. Más detalles sobre esto aquí

Volviendo a la creación de nuestra pila, una vez que hayamos reemplazado los valores, podemos crear una nueva función escalonada. Puedes usar esto eslabón o navegue hasta la consola StepFunctions, seleccione «Crear máquina de estados» y seleccione la opción «Escriba su flujo de trabajo en código».

Ambas opciones abrirán la pantalla que vemos a continuación, que incluye una aplicación básica de «Hola mundo». Reemplazarás el código en la definición y actualizarás el gráfico.

Una vez que hayas actualizado el gráfico, deberías ver lo siguiente:

Los siguientes pasos son asignar un nombre a la función Step y asignarle un rol (lo ideal sería el que habíamos creado anteriormente) y crearla con el botón «Crear máquina de estados». Espera unos minutos para que se complete, ¡y listo!

Encontrarás un botón para «Iniciar la ejecución», que activará tus pasos. Cuando comience a ejecutarse, el diagrama de flujo se actualizará con los diferentes estados: azul para los pasos en curso, verde para los pasos realizados correctamente, naranja para los errores detectados y rojo para los errores inesperados.

Es importante destacar que los clústeres de EMR estarán disponibles a través de la consola de EMR habitual, lo que simplifica la supervisión del proceso para las personas que están familiarizadas con ella. El hecho de que se haya activado mediante StepFunctions es completamente transparente para los usuarios finales.

Volviendo a nuestra función Step y al manejo de errores, en la imagen de abajo podemos ver dos errores diferentes. El paso «TrainingStep» detectó un error (que forma parte del flujo de trabajo esperado), pero el paso «handleErrors» falló. Esto nos permite encontrar fácilmente qué falló y dónde.

En esta otra imagen, podemos ver que el flujo de trabajo falló, pero la gestión de errores se realizó correctamente.

Es importante destacar esto porque los errores detectados forman parte de nuestro flujo de trabajo y nos hemos preparado para ellos (por ejemplo, con alertas). Los errores inesperados pueden dejar nuestro trabajo incompleto y requerir una intervención manual.

Conclusiones y próximos pasos

Bueno, gracias por llegar hasta aquí y ¡espero que estés entusiasmado por probarlo!

Podemos ver que poner en marcha un conjunto de clústeres de EMR es bastante fácil con Step Functions, y nos permite aprovechar la potencia de EMR de una manera sencilla y directa. Como recomendación final, me gustaría destacar algunas de las mejores prácticas para usar estos dos servicios de forma conjunta:

Con todo esto en mente, ¡adelante con tus propios experimentos! Y no dudes en escribirnos si tienes cualquier problema o pregunta. ¡Estaremos encantados de ayudarte!

¡Feliz programación! 🙂

Manténgase a la vanguardia de las últimas tendencias y conocimientos sobre big data, aprendizaje automático e inteligencia artificial. ¡No se lo pierda y suscríbase a nuestro boletín de noticias!