Lanzamiento de un clúster de EMR mediante funciones de Lambda para ejecutar scripts de PySpark, parte 1: Los scripts de Spark

Maximiliano Palay and Tiziana Romani
.
26 de abril de 2023
Lanzamiento de un clúster de EMR mediante funciones de Lambda para ejecutar scripts de PySpark, parte 1: Los scripts de Spark

En un mundo de dispositivos interconectados, los datos se generan a un ritmo sin precedentes. Para aprovechar estos datos, los ingenieros suelen emplear técnicas de aprendizaje automático (ML) que les permiten recopilar información valiosa e información útil. A medida que aumenta el volumen de datos, surge la necesidad de ejecutar los procesos de procesamiento de macrodatos a gran escala; su portátil no es suficiente.

Para ayudar a solucionar este problema, podemos usar marcos distribuidos como Spark, que nos permiten usar la computación distribuida para analizar nuestros datos. En esta serie de dos partes, darás un pequeño paso para ejecutar tus cargas de trabajo de datos a escala. Presentamos un tutorial práctico en el que aprenderá a usar los servicios de AWS para ejecutar su procesamiento en la nube. Para ello, lanzaremos un clúster de EMR con funciones de Lambda y ejecutaremos un script de PySpark de muestra. Necesitará una cuenta de AWS para seguir esta serie, ya que utilizaremos sus servicios.

En esta primera parte, analizaremos las tecnologías y la arquitectura de alto nivel y analizaremos un problema de muestra. En la parte 2, profundizaremos en los detalles, incluida la arquitectura y la configuración de la infraestructura necesaria.

Arquitectura de alto nivel

Antes de profundizar en nuestro problema de muestra, revisemos brevemente la arquitectura y los servicios de AWS involucrados. Usaremos Spark, un marco de computación distribuida de código abierto para procesar grandes volúmenes de datos. Spark se puede ejecutar en EMR y utilizaremos su API de Python, PySpark. AWS Elastic MapReduce (EMR) es un servicio administrado que simplifica la implementación y la administración de los clústeres de Hadoop y Spark. Simple Storage Service (S3) es el servicio de almacenamiento de objetos de Amazon, que utilizaremos para almacenar nuestro script de PySpark.

Para crear el clúster de EMR, utilizaremos AWS Lambda, un servicio de procesamiento sin servidor que nos permite ejecutar código sin necesidad de aprovisionar o administrar servidores. El uso de Lambda para crear el clúster nos permite definir la infraestructura como código, de modo que podemos definir las especificaciones y configuraciones del clúster una vez y ejecutarlas varias veces. De este modo, podemos lanzar varios clústeres fácilmente con la misma función. Lambda también nos permite definir activadores automáticos, que podemos usar para aprovisionar clústeres automáticamente en función de un conjunto de acciones.

En nuestro ejemplo de caso práctico, crearemos un script de Python con la biblioteca de Python boto3 de AWS. Este script se cargará en AWS Lambda y hará que el clúster de EMR ejecute nuestro script PySpark de muestra. Este último se almacenará en S3, desde donde el clúster de EMR lo recuperará en tiempo de ejecución.

El problema

Hemos creado un script de Python muy simple para entrenar un modelo de aprendizaje automático con Spark. Generaremos datos aleatorios, calcularemos una combinación lineal de los mismos, añadiremos ruido a los resultados y entrenaremos un modelo para que pueda aprender cómo combinamos los datos. El modelo debería poder filtrar el ruido y calcular con bastante precisión los coeficientes que utilizamos para calcular la combinación lineal de los datos de entrada.


Empecemos a programar

En esta sección, lo guiaremos a través del código, explicando los bloques de código y comentando las funciones utilizadas. Al final, tendrás el script completo disponible.

1import argparsefrom pyspark.sql import SparkSessionimport pyspark.sql.types as Timport pyspark.sql.functions as Ffrom pyspark.ml.regression import LinearRegressionfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.mllib.evaluation import RegressionMetrics


Usaremos argumentos para nuestro script, por lo que necesitamos importar argparse. Vamos a importar SparkSession para controlar la sesión de Spark en ejecución. Usaremos funciones definidas por el usuario, por lo que necesitamos pyspark.sql.types y pyspark.sql.functions. En este ejemplo, ajustaremos un modelo de regresión lineal del MLLib de Spark. Las funciones que necesita este modelo se formatean con VectorAssembler. Por último, el modelo se evaluará con el sistema RegressionMetrics incorporado.

9NUM_SAMPLES = 1000  # number of samples we'll generateif __name__ == "__main__":    # arguments parsing    parser = argparse.ArgumentParser(        prog='LinearRegression',        description='Randomly generates data and fits a linear regression model using Spark MLlib.'    )    parser.add_argument('--x1', required=True, type=float)    parser.add_argument('--x2', required=True, type=float)    parser.add_argument('--max_iter', required=True, type=int)    args = parser.parse_args()


A continuación, defina el número de filas de datos sintéticos que generaremos. En las líneas 14 a 22 configuramos el analizador de argumentos para poder pasar los parámetros al script. Esta es una gran característica que podemos usar a nuestro favor al realizar pruebas y aumenta la modularidad de nuestro código. Le indicamos al analizador los nombres de nuestros argumentos, si son necesarios para ejecutar el programa y su tipo de datos. Tenga en cuenta que los argumentos se pasan como cadenas en la función Lambda y que el analizador los convierte en el tipo especificado.

24# We're obtaining the spark session.spark = SparkSession\    .builder\    .appName("SparkMLExample")\    .getOrCreate()

En la línea 25-28 obtenemos la Spark Session.

30# create a dataframe num_sample = [[i] for i in range(NUM_SAMPLES)] df = spark.createDataFrame(num_sample) df = df.repartition(4)

Entre las líneas 31 y 32, estamos inicializando un nuevo marco de datos con números que van desde cero hasta NUM_SAMPLES: 1. Aunque esto no es obligatorio directamente, es una forma sencilla de inicializar el marco de datos para nuestro propósito. Debido a la forma en que generamos el marco de datos, tiene una sola partición. Si ese es el caso, todo el procesamiento lo realizará una máquina que pertenezca al clúster. Vamos a reparticionar el marco de datos en cuatro particiones, de esta forma, el procesamiento se distribuirá en todo el clúster. Tenga en cuenta que esto es correcto para nuestro ejemplo actual, pero realizar esta operación en grandes cantidades de datos puede resultar en una reorganización costosa.

34# generate two columns of random valuesdf = df.withColumn("rand", F.rand())df = df.withColumn("rand2", F.rand())print("displaying generated dataframe...")df.show(10)

En las líneas 35 a 36 generamos dos columnas con valores aleatorios mediante la función rand de Spark. Esto genera números aleatorios distribuidos uniformemente en el rango [0,1). Los usaremos como características de nuestro modelo

41# calculate the linear combination of the two generated columns, using the # args x1, x2    df = df.withColumn( "raw_result", F.udf(lambda x: args.x1 * x[0] + args.x2 * x[1], T.FloatType()) (F.array(F.col('rand'), F.col('rand2')))) # generate some random noise to add to the linear combination df = df.withColumn("noise", F.rand()) # shift the noise from [0,1) to [-0.5,0.5) df = df.withColumn("noise", F.udf( lambda x: x - 0.5, T.FloatType())(F.col('noise')))# add the noise to the resultdf = df.withColumn("noisy_result", F.udf(lambda x, y: x + y, T.FloatType())(F.col('raw_result'), F.col('noise')))print("displaying generated dataframe with labels...")df.show(10)

En las líneas 43 a 46 calculamos una combinación lineal de las dos columnas generadas, utilizando los multiplicadores pasados por los argumentos. Luego generamos otra columna con muestras aleatorias llamada ruido en la línea 49. Como el ruido se genera en el rango [0,1), restamos 0.5 para cambiar el rango a [-0.5, 0.5). Luego, este ruido se suma a la combinación lineal que creamos en las líneas 56-58. El propósito es agregar ruido a los datos, porque si los datos fueran perfectos, solo se necesitarían un par de muestras para calcular los coeficientes.

63# create the LinearRegression model to be fitlr = LinearRegression(featuresCol="features", labelCol="label",                      solver="l-bfgs", maxIter=args.max_iter)

En la línea 64, creamos una instancia de un modelo de regresión lineal del MLlib de Spark para que se ajuste. Como el modelo está equipado con un marco de datos de entrada, le indicamos qué columnas buscar cuando se le da un marco de datos, en nuestro caso features (inputs) y label (output objetivo).

67# we need to format the data for the LinearRegression Modelvector_assembler = VectorAssembler().setInputCols(  ['rand', 'rand2']).setOutputCol('features') df = vector_assembler.transform(df).select(     "features", F.col("noisy_result").alias("label"))

Los modelos de MLlib requieren que la entrada de datos esté formateada de una manera específica, y el VectorAssembler nos ayuda a lograrlo. Vamos a pasar de dos columnas separadas que contienen valores flotantes a una DenseVector que contiene ambas características flotantes. En la línea 68 estamos creando una instancia del VectorAssembler y en la línea 70 lo estamos usando para transformar nuestros datos.

73# split the data into training and test setvdf_train, df_test = df.randomSplit([0.8, 0.2])


Como práctica habitual en los problemas de aprendizaje automático, dividimos los datos en entrenamiento y pruebas. Los datos de entrenamiento se utilizan para ajustar el modelo. Los datos de la prueba se utilizan para evaluar el modelo a partir de datos invisibles una vez que se ha ajustado. Es una especie de simulación de lo que sucedería en la producción. Para ello, utilizamos RandomSplit de Spark y le decimos que queremos el 80% de los datos para el entrenamiento y el 20% para las pruebas.

76# fit the modellr_model = lr.fit(df_train)# print some info & metrics of the fitting processprint("printing training summary info...")trainingSummary = lr_model.summaryprint(f"number of iterations: {trainingSummary.totalIterations}")print(f"RMSE: {trainingSummary.rootMeanSquaredError}")print(f"r2: {trainingSummary.r2}")print("model coefficients: ", lr_model.coefficients)

¡Aquí es donde ocurre la magia! En la línea 77 estamos montando el modelo. En las siguientes líneas, parte de la información y las métricas del proceso de entrenamiento se imprimen en formato estándar. Tenga en cuenta que también imprimimos los coeficientes aprendidos por el modelo. Si las cosas van bien, deberían estar muy cerca de los coeficientes que se aprobaron con los argumentos. Deberían estar cerca y no exactamente iguales debido al ruido que hemos introducido en los datos.

87# transform the test datadf_eval = lr_model.transform(df_test)print("displaying predictions on evaluation data...")df_eval.show(10)# format the data so we can evaluate the performance of the model using# RegressionMetricsdf_eval = df_eval.select('label', 'prediction')metrics = RegressionMetrics(df_eval.rdd)print("displaying metrics on test")print(f"test data RMSE: {metrics.rootMeanSquaredError}")print(f"test data r2: {metrics.r2}")

¿Recuerdas los datos de la prueba? Aquí es donde los usamos para evaluar el rendimiento del modelo en datos desconocidos. Estamos transformando el marco de datos de prueba con el modelo ya ajustado, lo que generará una nueva columna llamada predicción con el resultado. Al usar RegressionMetrics integrado en Spark, obtenemos dos métricas para los resultados de los datos de prueba. Estas son las mismas métricas que las que obtuvimos del resumen del entrenamiento, por lo que puedes comparar el rendimiento del modelo según los datos del entrenamiento con los datos de las pruebas.

no-line-numbers|plain-textdisplaying generated dataframe...+---+-------------------+--------------------+| _1|               rand|               rand2|+---+-------------------+--------------------+|303| 0.4694038640011158|  0.6481209161559824|| 95| 0.6726286905382153|  0.8981606963845883||161| 0.8679103354458326| 0.18119671605635224||448|0.30126976391012883|  0.7447454830462397||170|  0.864527171418423| 0.07967898764743175||131| 0.8658936796366256| 0.19843634271437316|| 90| 0.9212755414082087|  0.1328917388102402||321| 0.4444531531651619| 0.45985908905674244|| 26| 0.8636854530294522| 0.26834470199928806||447| 0.9480941000568025|0.036768371545385814|+---+-------------------+--------------------+only showing top 10 rowsdisplaying generated dataframe with labels...+---+-------------------+--------------------+----------+-----------+------------+| _1|               rand|               rand2|raw_result|      noise|noisy_result|+---+-------------------+--------------------+----------+-----------+------------+|303| 0.4694038640011158|  0.6481209161559824| 11.175248| 0.06108435|   11.236333|| 95| 0.6726286905382153|  0.8981606963845883| 15.707894|-0.12919985|   15.578694||161| 0.8679103354458326| 0.18119671605635224| 10.491071|  0.3945236|   10.885594||448|0.30126976391012883|  0.7447454830462397| 10.460153| -0.0810913|   10.379062||170|  0.864527171418423| 0.07967898764743175|  9.442061| 0.43645015|    9.878511||131| 0.8658936796366256| 0.19843634271437316|   10.6433|-0.07997194|   10.563328|| 90| 0.9212755414082087|  0.1328917388102402| 10.541673|0.030807901|    10.57248||321| 0.4444531531651619| 0.45985908905674244|  9.043122|-0.29848525|   8.7446375|| 26| 0.8636854530294522| 0.26834470199928806| 11.320302|-0.43192774|   10.888374||447| 0.9480941000568025|0.036768371545385814|  9.848625| 0.11807168|    9.966697|+---+-------------------+--------------------+----------+-----------+------------+only showing top 10 rowsprinting training summary info...number of iterations: 4RMSE: 0.28385880311787814r2: 0.9954688447304679model coefficients:  [10.000646699625818,10.005967687529866]displaying predictions on evaluation data...+--------------------+---------+------------------+|            features|    label|        prediction|+--------------------+---------+------------------+|[0.00834606519633...|6.4147186|6.2857339058323545||[0.06440724048404...| 8.307216| 8.800616841502176||[0.11552795484457...| 7.149164| 6.903089763368193||[0.12749768801334...|2.6500432|2.8610826921022343||[0.16084911790794...| 2.955299| 2.606324673004222||[0.18106191852777...| 8.683829| 8.508712313017005||[0.19052191329248...|10.528968| 10.22865986628032||[0.19501743199211...|11.151053| 11.18943875216736||[0.20326177358481...| 9.618211| 9.652048794087703||[0.21711604751733...|10.206596|10.217273926029563|+--------------------+---------+------------------+only showing top 10 rowsdisplaying metrics on testtest data RMSE: 0.2923418543882163test data r2: 0.9945363063793037


Echemos un vistazo a la salida del script. El primer marco de datos que se muestra es el resultado de la línea 38 y contiene las entradas generadas aleatoriamente. El segundo es el resultado de la ejecución de la línea 61, donde mostramos los valores generados aleatoriamente, su combinación lineal, el ruido generado aleatoriamente y la suma de la combinación lineal y el ruido. A continuación, se imprime la información de entrenamiento, que incluye el número de iteraciones del algoritmo de regresión lineal, métricas como RMSE y r2, y los coeficientes calculados del modelo. Deberían estar muy cerca de los que estamos usando como entradas x1 y x2. Por último, se muestra una parte del marco de datos de prueba, incluidas las entradas (características), la etiqueta y la predicción del modelo. Las métricas de los datos de la prueba deben parecerse mucho a las del entrenamiento.

Conclusión

En esta primera parte de la serie, analizamos la arquitectura de alto nivel que utilizaremos para ejecutar el procesamiento de datos distribuidos en AWS. Con fines ilustrativos, configuramos un script de ejemplo extremadamente simple con PySpark, la API de Spark para Python. Revisamos el script y lo que hace.

En la parte 2, profundizaremos en los detalles de la ejecución de este script en EMR. Explicaremos las tecnologías que se utilizarán y configuraremos la infraestructura necesaria para ejecutar el ejemplo.

Manténgase a la vanguardia de las últimas tendencias y conocimientos sobre big data, aprendizaje automático e inteligencia artificial. ¡No se lo pierda y suscríbase a nuestro boletín de noticias!

En un mundo de dispositivos interconectados, los datos se generan a un ritmo sin precedentes. Para aprovechar estos datos, los ingenieros suelen emplear técnicas de aprendizaje automático (ML) que les permiten recopilar información valiosa e información útil. A medida que aumenta el volumen de datos, surge la necesidad de ejecutar los procesos de procesamiento de macrodatos a gran escala; su portátil no es suficiente.

Para ayudar a solucionar este problema, podemos usar marcos distribuidos como Spark, que nos permiten usar la computación distribuida para analizar nuestros datos. En esta serie de dos partes, darás un pequeño paso para ejecutar tus cargas de trabajo de datos a escala. Presentamos un tutorial práctico en el que aprenderá a usar los servicios de AWS para ejecutar su procesamiento en la nube. Para ello, lanzaremos un clúster de EMR con funciones de Lambda y ejecutaremos un script de PySpark de muestra. Necesitará una cuenta de AWS para seguir esta serie, ya que utilizaremos sus servicios.

En esta primera parte, analizaremos las tecnologías y la arquitectura de alto nivel y analizaremos un problema de muestra. En la parte 2, profundizaremos en los detalles, incluida la arquitectura y la configuración de la infraestructura necesaria.

Arquitectura de alto nivel

Antes de profundizar en nuestro problema de muestra, revisemos brevemente la arquitectura y los servicios de AWS involucrados. Usaremos Spark, un marco de computación distribuida de código abierto para procesar grandes volúmenes de datos. Spark se puede ejecutar en EMR y utilizaremos su API de Python, PySpark. AWS Elastic MapReduce (EMR) es un servicio administrado que simplifica la implementación y la administración de los clústeres de Hadoop y Spark. Simple Storage Service (S3) es el servicio de almacenamiento de objetos de Amazon, que utilizaremos para almacenar nuestro script de PySpark.

Para crear el clúster de EMR, utilizaremos AWS Lambda, un servicio de procesamiento sin servidor que nos permite ejecutar código sin necesidad de aprovisionar o administrar servidores. El uso de Lambda para crear el clúster nos permite definir la infraestructura como código, de modo que podemos definir las especificaciones y configuraciones del clúster una vez y ejecutarlas varias veces. De este modo, podemos lanzar varios clústeres fácilmente con la misma función. Lambda también nos permite definir activadores automáticos, que podemos usar para aprovisionar clústeres automáticamente en función de un conjunto de acciones.

En nuestro ejemplo de caso práctico, crearemos un script de Python con la biblioteca de Python boto3 de AWS. Este script se cargará en AWS Lambda y hará que el clúster de EMR ejecute nuestro script PySpark de muestra. Este último se almacenará en S3, desde donde el clúster de EMR lo recuperará en tiempo de ejecución.

El problema

Hemos creado un script de Python muy simple para entrenar un modelo de aprendizaje automático con Spark. Generaremos datos aleatorios, calcularemos una combinación lineal de los mismos, añadiremos ruido a los resultados y entrenaremos un modelo para que pueda aprender cómo combinamos los datos. El modelo debería poder filtrar el ruido y calcular con bastante precisión los coeficientes que utilizamos para calcular la combinación lineal de los datos de entrada.


Empecemos a programar

En esta sección, lo guiaremos a través del código, explicando los bloques de código y comentando las funciones utilizadas. Al final, tendrás el script completo disponible.

1import argparsefrom pyspark.sql import SparkSessionimport pyspark.sql.types as Timport pyspark.sql.functions as Ffrom pyspark.ml.regression import LinearRegressionfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.mllib.evaluation import RegressionMetrics


Usaremos argumentos para nuestro script, por lo que necesitamos importar argparse. Vamos a importar SparkSession para controlar la sesión de Spark en ejecución. Usaremos funciones definidas por el usuario, por lo que necesitamos pyspark.sql.types y pyspark.sql.functions. En este ejemplo, ajustaremos un modelo de regresión lineal del MLLib de Spark. Las funciones que necesita este modelo se formatean con VectorAssembler. Por último, el modelo se evaluará con el sistema RegressionMetrics incorporado.

9NUM_SAMPLES = 1000  # number of samples we'll generateif __name__ == "__main__":    # arguments parsing    parser = argparse.ArgumentParser(        prog='LinearRegression',        description='Randomly generates data and fits a linear regression model using Spark MLlib.'    )    parser.add_argument('--x1', required=True, type=float)    parser.add_argument('--x2', required=True, type=float)    parser.add_argument('--max_iter', required=True, type=int)    args = parser.parse_args()


A continuación, defina el número de filas de datos sintéticos que generaremos. En las líneas 14 a 22 configuramos el analizador de argumentos para poder pasar los parámetros al script. Esta es una gran característica que podemos usar a nuestro favor al realizar pruebas y aumenta la modularidad de nuestro código. Le indicamos al analizador los nombres de nuestros argumentos, si son necesarios para ejecutar el programa y su tipo de datos. Tenga en cuenta que los argumentos se pasan como cadenas en la función Lambda y que el analizador los convierte en el tipo especificado.

24# We're obtaining the spark session.spark = SparkSession\    .builder\    .appName("SparkMLExample")\    .getOrCreate()

En la línea 25-28 obtenemos la Spark Session.

30# create a dataframe num_sample = [[i] for i in range(NUM_SAMPLES)] df = spark.createDataFrame(num_sample) df = df.repartition(4)

Entre las líneas 31 y 32, estamos inicializando un nuevo marco de datos con números que van desde cero hasta NUM_SAMPLES: 1. Aunque esto no es obligatorio directamente, es una forma sencilla de inicializar el marco de datos para nuestro propósito. Debido a la forma en que generamos el marco de datos, tiene una sola partición. Si ese es el caso, todo el procesamiento lo realizará una máquina que pertenezca al clúster. Vamos a reparticionar el marco de datos en cuatro particiones, de esta forma, el procesamiento se distribuirá en todo el clúster. Tenga en cuenta que esto es correcto para nuestro ejemplo actual, pero realizar esta operación en grandes cantidades de datos puede resultar en una reorganización costosa.

34# generate two columns of random valuesdf = df.withColumn("rand", F.rand())df = df.withColumn("rand2", F.rand())print("displaying generated dataframe...")df.show(10)

En las líneas 35 a 36 generamos dos columnas con valores aleatorios mediante la función rand de Spark. Esto genera números aleatorios distribuidos uniformemente en el rango [0,1). Los usaremos como características de nuestro modelo

41# calculate the linear combination of the two generated columns, using the # args x1, x2    df = df.withColumn( "raw_result", F.udf(lambda x: args.x1 * x[0] + args.x2 * x[1], T.FloatType()) (F.array(F.col('rand'), F.col('rand2')))) # generate some random noise to add to the linear combination df = df.withColumn("noise", F.rand()) # shift the noise from [0,1) to [-0.5,0.5) df = df.withColumn("noise", F.udf( lambda x: x - 0.5, T.FloatType())(F.col('noise')))# add the noise to the resultdf = df.withColumn("noisy_result", F.udf(lambda x, y: x + y, T.FloatType())(F.col('raw_result'), F.col('noise')))print("displaying generated dataframe with labels...")df.show(10)

En las líneas 43 a 46 calculamos una combinación lineal de las dos columnas generadas, utilizando los multiplicadores pasados por los argumentos. Luego generamos otra columna con muestras aleatorias llamada ruido en la línea 49. Como el ruido se genera en el rango [0,1), restamos 0.5 para cambiar el rango a [-0.5, 0.5). Luego, este ruido se suma a la combinación lineal que creamos en las líneas 56-58. El propósito es agregar ruido a los datos, porque si los datos fueran perfectos, solo se necesitarían un par de muestras para calcular los coeficientes.

63# create the LinearRegression model to be fitlr = LinearRegression(featuresCol="features", labelCol="label",                      solver="l-bfgs", maxIter=args.max_iter)

En la línea 64, creamos una instancia de un modelo de regresión lineal del MLlib de Spark para que se ajuste. Como el modelo está equipado con un marco de datos de entrada, le indicamos qué columnas buscar cuando se le da un marco de datos, en nuestro caso features (inputs) y label (output objetivo).

67# we need to format the data for the LinearRegression Modelvector_assembler = VectorAssembler().setInputCols(  ['rand', 'rand2']).setOutputCol('features') df = vector_assembler.transform(df).select(     "features", F.col("noisy_result").alias("label"))

Los modelos de MLlib requieren que la entrada de datos esté formateada de una manera específica, y el VectorAssembler nos ayuda a lograrlo. Vamos a pasar de dos columnas separadas que contienen valores flotantes a una DenseVector que contiene ambas características flotantes. En la línea 68 estamos creando una instancia del VectorAssembler y en la línea 70 lo estamos usando para transformar nuestros datos.

73# split the data into training and test setvdf_train, df_test = df.randomSplit([0.8, 0.2])


Como práctica habitual en los problemas de aprendizaje automático, dividimos los datos en entrenamiento y pruebas. Los datos de entrenamiento se utilizan para ajustar el modelo. Los datos de la prueba se utilizan para evaluar el modelo a partir de datos invisibles una vez que se ha ajustado. Es una especie de simulación de lo que sucedería en la producción. Para ello, utilizamos RandomSplit de Spark y le decimos que queremos el 80% de los datos para el entrenamiento y el 20% para las pruebas.

76# fit the modellr_model = lr.fit(df_train)# print some info & metrics of the fitting processprint("printing training summary info...")trainingSummary = lr_model.summaryprint(f"number of iterations: {trainingSummary.totalIterations}")print(f"RMSE: {trainingSummary.rootMeanSquaredError}")print(f"r2: {trainingSummary.r2}")print("model coefficients: ", lr_model.coefficients)

¡Aquí es donde ocurre la magia! En la línea 77 estamos montando el modelo. En las siguientes líneas, parte de la información y las métricas del proceso de entrenamiento se imprimen en formato estándar. Tenga en cuenta que también imprimimos los coeficientes aprendidos por el modelo. Si las cosas van bien, deberían estar muy cerca de los coeficientes que se aprobaron con los argumentos. Deberían estar cerca y no exactamente iguales debido al ruido que hemos introducido en los datos.

87# transform the test datadf_eval = lr_model.transform(df_test)print("displaying predictions on evaluation data...")df_eval.show(10)# format the data so we can evaluate the performance of the model using# RegressionMetricsdf_eval = df_eval.select('label', 'prediction')metrics = RegressionMetrics(df_eval.rdd)print("displaying metrics on test")print(f"test data RMSE: {metrics.rootMeanSquaredError}")print(f"test data r2: {metrics.r2}")

¿Recuerdas los datos de la prueba? Aquí es donde los usamos para evaluar el rendimiento del modelo en datos desconocidos. Estamos transformando el marco de datos de prueba con el modelo ya ajustado, lo que generará una nueva columna llamada predicción con el resultado. Al usar RegressionMetrics integrado en Spark, obtenemos dos métricas para los resultados de los datos de prueba. Estas son las mismas métricas que las que obtuvimos del resumen del entrenamiento, por lo que puedes comparar el rendimiento del modelo según los datos del entrenamiento con los datos de las pruebas.

no-line-numbers|plain-textdisplaying generated dataframe...+---+-------------------+--------------------+| _1|               rand|               rand2|+---+-------------------+--------------------+|303| 0.4694038640011158|  0.6481209161559824|| 95| 0.6726286905382153|  0.8981606963845883||161| 0.8679103354458326| 0.18119671605635224||448|0.30126976391012883|  0.7447454830462397||170|  0.864527171418423| 0.07967898764743175||131| 0.8658936796366256| 0.19843634271437316|| 90| 0.9212755414082087|  0.1328917388102402||321| 0.4444531531651619| 0.45985908905674244|| 26| 0.8636854530294522| 0.26834470199928806||447| 0.9480941000568025|0.036768371545385814|+---+-------------------+--------------------+only showing top 10 rowsdisplaying generated dataframe with labels...+---+-------------------+--------------------+----------+-----------+------------+| _1|               rand|               rand2|raw_result|      noise|noisy_result|+---+-------------------+--------------------+----------+-----------+------------+|303| 0.4694038640011158|  0.6481209161559824| 11.175248| 0.06108435|   11.236333|| 95| 0.6726286905382153|  0.8981606963845883| 15.707894|-0.12919985|   15.578694||161| 0.8679103354458326| 0.18119671605635224| 10.491071|  0.3945236|   10.885594||448|0.30126976391012883|  0.7447454830462397| 10.460153| -0.0810913|   10.379062||170|  0.864527171418423| 0.07967898764743175|  9.442061| 0.43645015|    9.878511||131| 0.8658936796366256| 0.19843634271437316|   10.6433|-0.07997194|   10.563328|| 90| 0.9212755414082087|  0.1328917388102402| 10.541673|0.030807901|    10.57248||321| 0.4444531531651619| 0.45985908905674244|  9.043122|-0.29848525|   8.7446375|| 26| 0.8636854530294522| 0.26834470199928806| 11.320302|-0.43192774|   10.888374||447| 0.9480941000568025|0.036768371545385814|  9.848625| 0.11807168|    9.966697|+---+-------------------+--------------------+----------+-----------+------------+only showing top 10 rowsprinting training summary info...number of iterations: 4RMSE: 0.28385880311787814r2: 0.9954688447304679model coefficients:  [10.000646699625818,10.005967687529866]displaying predictions on evaluation data...+--------------------+---------+------------------+|            features|    label|        prediction|+--------------------+---------+------------------+|[0.00834606519633...|6.4147186|6.2857339058323545||[0.06440724048404...| 8.307216| 8.800616841502176||[0.11552795484457...| 7.149164| 6.903089763368193||[0.12749768801334...|2.6500432|2.8610826921022343||[0.16084911790794...| 2.955299| 2.606324673004222||[0.18106191852777...| 8.683829| 8.508712313017005||[0.19052191329248...|10.528968| 10.22865986628032||[0.19501743199211...|11.151053| 11.18943875216736||[0.20326177358481...| 9.618211| 9.652048794087703||[0.21711604751733...|10.206596|10.217273926029563|+--------------------+---------+------------------+only showing top 10 rowsdisplaying metrics on testtest data RMSE: 0.2923418543882163test data r2: 0.9945363063793037


Echemos un vistazo a la salida del script. El primer marco de datos que se muestra es el resultado de la línea 38 y contiene las entradas generadas aleatoriamente. El segundo es el resultado de la ejecución de la línea 61, donde mostramos los valores generados aleatoriamente, su combinación lineal, el ruido generado aleatoriamente y la suma de la combinación lineal y el ruido. A continuación, se imprime la información de entrenamiento, que incluye el número de iteraciones del algoritmo de regresión lineal, métricas como RMSE y r2, y los coeficientes calculados del modelo. Deberían estar muy cerca de los que estamos usando como entradas x1 y x2. Por último, se muestra una parte del marco de datos de prueba, incluidas las entradas (características), la etiqueta y la predicción del modelo. Las métricas de los datos de la prueba deben parecerse mucho a las del entrenamiento.

Conclusión

En esta primera parte de la serie, analizamos la arquitectura de alto nivel que utilizaremos para ejecutar el procesamiento de datos distribuidos en AWS. Con fines ilustrativos, configuramos un script de ejemplo extremadamente simple con PySpark, la API de Spark para Python. Revisamos el script y lo que hace.

En la parte 2, profundizaremos en los detalles de la ejecución de este script en EMR. Explicaremos las tecnologías que se utilizarán y configuraremos la infraestructura necesaria para ejecutar el ejemplo.

Manténgase a la vanguardia de las últimas tendencias y conocimientos sobre big data, aprendizaje automático e inteligencia artificial. ¡No se lo pierda y suscríbase a nuestro boletín de noticias!