Apache Spark

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8864
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 03 Sep 2017 05:29

Spark Optimization Series de Microsoft. (4 capitulos, abarcan hasta Spark 2.2, muy recomendable)

He encontrado muy interesante esta serie de cuatro capitulos de Microsoft:

Fuente: https://channel9.msdn.com/Shows/Data-Exposed/Spark-Performance-Series-1-with-Maxim-Lukiyanov

Resumiendo, los principales puntos a tener en cuenta son:

1. Configuracion de parametros/propiedades para nuestro Job, aqui no habla de maximo 5 cores por executor, incluso nos dice que podemos probar con todos los cores de un nodo en un solo executor.
2. Caching, no recomienda usar el sistema de caching de Spark ya que pierde el particionado, nos recomienda otros sistemas como Alluxio o usar hdfs.
Ver: https://www.alluxio.com/blog/effective-spark-dataframes-with-alluxio
3. Particionado, gestionar bien que columna usamos para particionar los datos, habla de tablas Hive particionadas. (Es decir definir el particionado en el metastore the Hive)
4. GC Overhead, tener muy encuenta cuanto tiempo empleamos en GC Collection por tarea, en el UI de Spark podemos ver cuanto tiempo se ha dedicado a GC Collection y si es superior a 10% aparecera en rojo, para reducir este problema, dividiremos el nodo en mas executors, cada uno con menos cores y sobre todo con menos memoria RAM asignada.
5. Archivo fuente, basicamente usar siempre que se pueda Parquet, ya que Spark esta muy optimizado para Parquet.
6. Joins y CBO, lo que ya mencionamos anteriormente.
7. Outliers y Skew se pueden identificar en el Timeline del Spark UI, ya que se ve lo que esta ocurriendo en cada executor.
8. Bucketing, grupos de keys para particionar, permite optimizar grandes queries (pre-ordenando los buckets al crear la tabla).

Nota: Menciona que hay que reducir la comunicacion entre las maquinas, en otro articulo he visto mencionado que para cantidades de 20 nodos con menos de 1T o 60 nodos con 2.5T reducir a cero la latencia de la red, mejora el proceso en un 2% unicamente, que el cuello de botella esta en la CPU y el IO, pero esto hay que tomarlo con un poco de excepticismo ya que no considera la optimizacion via Tungsten que reduce en gran medida la carga de la CPU.
Adjuntos
SparkMSN_Bucketing.JPG
SparkMSN_Bucketing2.JPG
SparkMSN_CBO2.JPG
SparkMSN_CBO.JPG
SparkMSN_CBO3.JPG
SparkMSN_Skew.JPG
SparkMSN1.JPG
SparkMSN2.JPG
SparkMSN3.JPG
SparkMSN4.JPG
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8864
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 07 Sep 2017 06:28

Operaciones en DataSets:

    Cuando necesitamos una columna que es "typed", podemos convertirla de la siguiente forma:

    Código: Seleccionar todo

    someDS.agg(avg($"column").as[Double])


    mapGroups y flatMapGroups son similares al API funcional map y flatMap pero para agrupaciones, es decir para KeyValueGroupedDataset

    reduceByKey no existe en Datasets, pero se puede simular de varias formas, una por ejemplo es usar groupByKey y despues mapGroups

El problema con mapGroups es que require shuffling y es muy caro, por lo que es mejor usar reduce reduceGroups o un Aggregator que es similar al de RDDs.

Pivot parte de RelationalGroupedDataset ([ULR]https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html[/URL])

Similar al pivot de Python (pandas) y de R (reshape2)
Si queremos agrupar por A y B, pivotar por C y sumar por D:
    1. Pandas: pivot_table(df, values='D', index=['A', 'B'], columns=['C'], aggfunc=np.sum)
    2. reshape2: dcast(df, A + B ~ C, sum)
    3. Spark: df.groupBy("A", "B").pivot("C").sum("D")
    4. Spark optimizado: df.groupBy("A", "B").pivot("C", Seq("small", "large")).sum("D")

Tener en cuenta que el API de RDDs y de datasets no es exactamente lo mismo, no todas las funciones de RDDs existen en Datasets ni las de que coinciden son exactamente iguales.
Adjuntos
DatasetOps1.JPG
DatasetOps2.JPG
DatasetOps3.JPG
DatasetOps4.JPG
DatasetOps5.JPG
reduceByKey.jpg
WhenToUse.JPG
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8864
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 09 Sep 2017 14:10

Operaciones con RDDs:

Diferenciamos RDDs y PairRDDs, el segundo es tratado por Spark de forma especial y tiene diferentes metodos que no tenemos en los RDDs normales.

Para convertir un RDD a PairRDD usamos un map o una list comprehension (yield) donde devolvemos un Tuple que contiene primero el key y despues el resto.

byKey nos permite crear la key usando una funcion, por ejemplo:

Código: Seleccionar todo

words.keyBy(word => word.toLowerCase.toSeq(0))


fold es una operacion que se puede paralelizar (Y por lo tanto esta en el API de RDDs), pero foldRight y foldLeft no se pueden, ya que no suman siempre datos del mismo tipo.

Para PairRDDs:

mapValues y flatmapValues son las versiones que no pasan la key a la funcion lambda, tienen ciertas ventajas.

combineByKey es muy importante, es utilizada internamente para implmentar aggregateByKey, reduceByKey y groupByKey.

Para usar combineByKey, hay que especificar tres funciones:
    1. createCombiner
    2. mergeValue
    3. mergeCombiners

aggregateByKey, tambien necesita tres parametros:
    1. Valor inicial, como por ejemplo cero para numeros, un Set vacio, una cadena vacia etc...
    2. Funcion para combinar dos parametros dentro de la misma particion.
    3. Funcion para combinar parametros entre particiones.

Tanto en Join como en Cogroup lo que se retorna son la key, despues los valores de la derecha y despues los de la izquierda, en groupByKey es la key y los valores todos juntos.

Nota: La documentacion de PairRDDs menciona que usar colecciones mutables es adecuado para evitar consumo extra de memoria asociado con crear nuevas colecciones o agregar nuevos valores.

groupByKey crea agrupaciones en el lado del mapper, por lo que el shuffling es terrible, se puede mejorar en muchos casos usando las otras funciones, dependiendo del caso.

La diferencia entre join y cogroup, que join es equialente a inner join y cogroup es equivalente a full outer join, ademas cogroup devuelve iterators.

Dentro de OrderedRDDFunctions tenemos:

sortByKey, para ordenar por Key, por ejemplo para buscar el max o min.
filterByRange, para filtrar dentro de un rango.
repartitionAndSortWithinPartitions, para reparticionar y ordenar dentro de cada particion.

Nota: Recordemos que en Spark 2, no tiene sentido usar RDDs con datos estructurados o semi, ya que Dataframes y Datasets tienen mejor performance y Datasets contienen las mismas operaciones funconales que los RDDs.
Adjuntos
TypesOfRDDs.JPG
RDDInterface.JPG
visualapi.pdf
(3.61 MiB) Descargado 5 veces
reduce_by.png
group_by.png
TypesOfRDDs.JPG
SparkOperations2.JPG
SparkOperations1.JPG
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8864
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 09 Sep 2017 14:11

Hablemos de Catalyst.

La optimizacion mas basica es Predicate Pushdown, si tenemos que operar haciendo un shuffling y despues filtrar los resultados, lo logico es optimizar en los mappers aplicando el filtrado antes y haciendo un shuffling con menos datos.

Si queremos ver en profundidad cada una de las reglas que Catalyst utiliza, tenemos que ir al cofigo fuente de Spark y en especial a esta clase:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
Adjuntos
Catalyst1.JPG
Catalyst2.JPG
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8864
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 15 Sep 2017 21:22

Otra pregunta interesante es, si tienes lineas duplicadas y no tienes una key unica como identificas duplicados? Como los extraes? etc...

Para esto hay que mirar: (Y ya de paso vemos todo lo relacionado con zip en RDDs)

RDD.zipWithIndex is just like Seq.zipWithIndex, it adds contiguous (Long) numbers. This needs to count the elements in each partition first, so your input will be evaluated twice. Cache your input RDD if you want to use this.

RDD.zipWithUniqueId also gives you unique Long IDs, but they are not guaranteed to be contiguous. (They will only be contiguous if each partition has the same number of elements.) The upside is that this does not need to know anything about the input, so it will not cause double-evaluation.

RDD.zip allows you to “zip” together two RDDs assuming they have the same length. This creates a PairRDD. The two RDDs must have the same number of partitions as well as the same number of elements.

RDD.zipPartitions Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by applying a function to the zipped partitions. Assumes that all the RDDs have the *same number of partitions*, but does *not* require them to have the same number of elements in each partition.


Ver: monotonically_increasing_id() en Spark 2
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8864
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 16 Sep 2017 14:23

MLLib Y ML

DStream y Streams Structurados

DStream usa StreamingContext

DStream tiene varios RDD, cada intervalo batch es un RDD diferente, usando transform se puede aplicar una operacion al RDD completo, con map se aplicaria a cada elemento.

Con Spark 2.2 tenemos Streams Structurados que no se basan en RDDs, se basan en Datasets.
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8864
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 17 Sep 2017 06:12

Cuando inicias Spark standalone le dices cuantos executors quieres:

    1. local (Crea un solo executor, no es buena idea)
    2. local[N] (Le dices cuantos executors quieres, dejas 1 o 2 para el OS y pones 2 o 3 executors para cada core/procesador)
    3. local[ * ] (Crea un executor por core/procesador fisico, no es ideal, quieres 2 o 3 por procesador)

Propiedades a nivel de nodo, en spark_env.sh:

SPARK_LOCAL_DISKS, nos permite definir los mount points de nuestros discos en cada maquina, lo cual es muy importante para distribuir la carga entre los discos y evitar un cuello de botella debido a varias tareas leyendo del mismo disco.

SPARK_WORKER_CORES, en cada nodo podemos cambiar el numero de cores, en caso de tener unas maquinas mas potentes que otras o con mas o menos dedicacion a spark. (Esto no es el numero de cores que puede dar a un worker, es el numero de cores que puede dar a todos sus workers en una maquina)

SPARK_WORKER_INSTANCES, es el numero de workers que se pueden tener en una maquina, por ejemplo si tenemos varios drivers en el cluster.

SPARK_WORKER_MEMORY, es cuanta memoria puede dar al cojunto de los workers, no a cada uno, como en los casos anteriores. (Por ejemplo si lo ponemos a 20GB, podria dar a un executor 20GB o empezar dos executors con 10GB cada uno.)

SPARK_DAEMON_MEMORY, es la cantidad de memoria que va a utilizar el master, o los masters si tenemos high availability, esto normalmente lo conseguimos usando Zookeeper.

Propiedades a nivel de aplicacion:

spark.cores.max, indica el numero maximo de cores que se pueden usar en todo el cluster.
spark.executor.memory, nos indica la memoria por executor.

Los parametros se pueden especificar en 4 sitios, la mayor precedencia se encuentra en el codigo, cuando los defines en el contexto o sesion, despues al llamar a spark-submit, despues el spark environment y spark default files, por ultimo en el scala settings, para verificar que settings estan teniendo efecto, se puede ir a UI de Spark y vereificar en la pestaña Environment.

Cada vez que lanzas una accion, en spark tienes un JOB, cada job puede tener varias STAGES, una Stage tiene una o varias TASKS

El Skew, o si hay muchos datos en una particion y pocos en otra se puede ver desde la pestaña Storage del UI de Spark, nos muestra las particiones de cada RDD y su tamaño en memoria y en disco, asi como en que executors residen.

YARN

YARN, hay dos formas de ejecutar Spark en YARN, usando client mode y cluster mode, client es cuando el driver corre en el cliente, en cluster node, el driver corre en el app master de Spark.

YARN se puede caer, pero Spark continuara funcionando, y que una vez se han negociado los recursos la comunicacion con el cliente es directa, pero no se podran negociar mas recursos, YARN permite negociar recursos dinamicamente.

Ejemplo de calculo de uso de cores y executors: (Sacado de la documentacion de MapR)

    - Consideremos un cluster de 5 nodos con 80 cores y 320 GB de memoria en total.
    - Un nodo tiene 16 cores y 64 GB
    - Guardamos 2 cores y 8 GB para el SO por ejemplo, dependiendo de lo que necesitemos.
    - Como regla, ponemos 3-5 threads/tasks por executor. (--executor-cores = 3)
    - Por nodo tenemeos 14 cores, quitamos un core para YARN aplication master (Ver: https://mapr.com/blog/resource-allocati ... park-yarn/) y dividimos cores por executor-cores, nos quedan 4 executors por nodo, en total 20 executors (--num-executors = 20)
    - En cada nodo tenemos 64GB - 8GB = 56GB, dividimos entre 4 executors y nos quedan 14 GB por executor, quitamos 10% para Yarn y nos quedan 12GB (--executor-memory = 12)

    One core per executor means only one task can be running at any time for one executor. In the case of a broadcast join, the memory can be shared by multiple running tasks in the same executor if we increase the number of cores per executor.

    Note that if dynamic resource allocation is enabled by setting spark.dynamicAllocation.enabled to true, Spark can scale the number of executors registered with this application up and down based on the workload. In this case, you do not need to specify spark.executor.instances manually.


    Tambien es posible tener 4 cores por executor y 19GB executor memory.

Nota: Spark tiene unos 20 threads for shuffling y otras tareas que son internal threads.
Adjuntos
YARNProperties.JPG
SPARKLocalDisks.JPG
StandaloneCores.JPG
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!


Volver a “Big Data”

Ingresar