Apache Spark

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8868
Registrado: 09 May 2012 01:38

Apache Spark

Mensajepor Dalamar » 21 Ene 2017 08:24

Merece la pena usar apache Spark para manipulacion de datos?

Apache Spark es un entorno distribuido, estilo Map/Reduce pero mas avanzado.

Yo uso Apache Spark en el trabajo, de hecho lo he introducido en este banco, y me he peleado bastante con el...

Es interesante usar Spark sin Hadoop?

Si, ya que es un sistema de ejecucion, todo depende del tamaño de tus datos, se puede usar con MySQL, Cassandra y no he mirado con redis pero seria interesante...

Usar Spark en windows? Es posible?

Si, yo lo hago en el trabajo para desarrollo, hay que cambiar algunas configuraciones, pero funciona perfectamente.

Usar Spark en una sola maquina?

Las ventajas son pocas para el incremento de complejidad que supone, estas son que te gestiona el consumo de memoria muy bien y no tienes que gestionar multithreading en tu codigo, el mismo se encarga, aunque todo esto bien se podria hacer directamente en Scala con programacion funcional.

Cores Per Node and Memory Per Node could also be used to optimize Spark for local mode. If your local machine has 8 cores and 16 GB of RAM and you want to allocate 75% of your resources to running a Spark job, setting Cores Per Node and Memory Per Node to 6 and 12 respectively will give you optimal settings. You would also want to zero out the OS Reserved settings. If Spark is limited to using only a portion of your system, there is no need to set aside resources specifically for the OS.


Spark con que lenguaje? Java/Scala/Python/R ?

Python y R son ideales para machine learning, pero toda la funcionalidad de Spark no esta disponible en estos, Spark esta pensado para Scala y es lo mas recomendable excepto para procesos de Machine Learning de todas formas se pueden combinar tareas de Spark en uno y otro lenguaje.

Estoy por probar un cluster de Raspberry PI con Spark a ver que tal va...

De momento no he necesitado Spark en casa para mis experimentos con datos de mercado y trading, es mas interesante tener una buena GPU con CUDA para hacer experimentos con Machine Learning.

Como configuro el cluster? Parametros del job?

Ver: http://c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark-config-cheatsheet/

A good rule of thumb for selecting the optimal number of Executors Per Node would be to select the setting that minimizes Unused Memory Per Node and Unused Cores Per Node while keeping Total Memory Per Executor below the Executor Memory Upper Bound and Core Per Executor below the Executor Core Upper Bound.


Number of Nodes

The number of worker machines in your cluster. This can be as low as one machine.

Memory Per Node (GB)

The amount of RAM per node that is available for Spark’s use. If using Yarn, this will be the amount of RAM per machine managed by Yarn Resource Manager.

Cores Per Node

The number of cores per node that are available for Spark’s use. If using Yarn, this will be the number of cores per machine managed by Yarn Resource Manager.

Memory Overhead Coefficient

Recommended value: .1

The percentage of memory in each executor that will be reserved for spark.yarn.executor.memoryOverhead.

Executor Memory Upper Bound (GB)

Recommended value: 64

The upper bound for executor memory. Each executor runs on its own JVM. Upwards of 64GB of memory and garbage collection issues can cause slowness.

Executor Core Upper Bound

Recommended value: 5

The upper bound for cores per executor. More than 5 cores per executor can degrade HDFS I/O throughput. I believe this value can safely be increased if writing to a web-based “file system” such as S3, but significant increases to this limit are not recommended.

OS Reserved Cores

Recommended value: 1

Cores per machine to reserve for OS processes. Should be zero if only a percentage of the machine’s cores were made available to Spark (i.e. entered in the Cores Per Node field above).

OS Reserved Memory (GB)

Recommended value: 1

The amount of RAM per machine to reserve for OS processes. Should be zero if only a percentage of the machine’s RAM was made available to Spark (i.e. entered in the Memory Per Node field above).

Parallelism Per Core

Recommended value: 2

The level of parallelism per allocated core. This field is used to determine the spark.default.parallelism setting. Generally recommended setting for this value is double the number of cores. cores.

spark.executor.instances

(Number of Nodes * Selected Executors Per Node) - 1

This is the number of total executors in your cluster. We subtract one to account for the driver. The driver will consume as many resources as we are allocating to an individual executor on one, and only one, of our nodes.

spark.yarn.executor.memoryOverhead

Equal to Overhead Memory Per Executor

The memory to be allocated for the memoryOverhead of each executor, in MB. Calculated from the values from the row in the reference table that corresponds to our Selected Executors Per Node.

spark.executor.memory

Equal to Memory Per Executor

The memory to be allocated for each executor. Calculated from the values from the row in the reference table that corresponds to our Selected Executors Per Node.

spark.yarn.driver.memoryOverhead

Equal to spark.yarn.executor.memoryOverhead

The memory to be allocated for the memoryOverhead of the driver, in MB.

spark.driver.memory

Equal to spark.executor.memory

The memory to be allocated for the driver.

spark.executor.cores

Equal to Cores Per Executor

The number of cores allocated for each executor. Calculated from the values from the row in the reference table that corresponds to our Selected Executors Per Node.

spark.driver.cores

Equal to spark.executor.cores

The number of cores allocated for the driver.

spark.default.parallelism

spark.executor.instances * spark.executor.cores * Parallelism Per Core

Default parallelism for Spark RDDs, Dataframes, etc.


A tener en cuenta:

    En Scala la diferencia entre el operador "==" y "===", es que "==" usa equals, "===" depende del objeto y contexto, por ejemplo en ScalaTest los asserts con "===" dan una mayor explicacion del resultado, en scalaz es un equal con type-safe checking, pero en Spark es diferente, "==" devuelve un boolean y "===" devuelve un Column.

Nota: cuando menciono Hadoop me refiero al ecosistema en general, no a map/reduce unicamente, este ultimo ya no tiene sentido en casi ningun caso.

Ver: Scala muuy resumido https://github.com/mbonaci/scala
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8868
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 11 Feb 2017 16:16

Como empezar con Spark en casa sin complicarse mucho?

Una imagen de docker con zeppelin y a juuuugaaar....

Nota: En realidad puedes usar Spark directamente desde Windows sin instalar nada, solo descomprimiendolo en una carpeta y usando spark-shell
Adjuntos
DockerSpark1.PNG
SparkZeppelin.PNG
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8868
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 25 Jun 2017 07:53

Es Spark una moda?
Adjuntos
1-aIL4TJ1V7_V0KSnu9FCT7g.png
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8868
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 26 Jun 2017 07:50

Lo cierto es que:

“Big data is like teenage sex: everyone talks about it, nobody really knows how to do it, everyone thinks everyone else is doing it, so everyone claims they are doing it…”


Realmente no estas haciendo Big Data hasta que no tratas con mil millones de lineas y con terabytes de data...

Yo inicie el proyecto de Big Data en el banco y tratabamos unos millones de lineas al dia, nada... Eso no es Big Data, poco a poco cada vez se ha ido incrementando el volumen, ahora con la fusion, he heredado el cluster de Cloudera del otro banco y unos cuantos Terabytes de datos, las tablas mas grandes que utilizamos pasan de los 600 millones de lineas con mas de 100 columnas es lo que ya se aproxima a Big Data real... Y cuando uno empieza a dedicar el 80% del tiempo en optimizar los procesos.

Siguiente paso aplicar ML a esos datos, cosa que ya he empezado a hacer, outlier detection, clustering etc...
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8868
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 27 Ago 2017 07:37

Extender Catalyst Optimizer Rules en Spark

Spark SQL utiliza un optimizador llamado catalyst para optimizar todas las queries en Spark SQL y en el DSL de Dataframe.

https://github.com/bartekkalinka/spark-custom-rule-executor/blob/master/src/main/scala/Main.scala

Rules:
    - ConstantFolding: Elimina constantes de la query

Imprimir el plan Optimizado:

println(dataframe.queryExecution.optimizedPlan.numberedTreeString)


Como escribir una regla para el optimizador: http://blog.madhukaraphatak.com/introduction-to-spark-two-part-6/
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8868
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 29 Ago 2017 14:23

Spark se basa en Lazy evaluation, lo que quiere decir que no ejecuta nada hasta que realizamos una accion, tenemos acciones y transformaciones.

Tenemos que tener en cuenta que Spark no recuerda lo que hacemos entre Acciones por lo que si queremos evitar recalcular partes debemos de usar cache o persist.

Para cada action se crea un execution plan.
    - Tenemos stages que consisten en acumular el mayor numero posible de transformaciones con narrow dependencies.
      - Un narrow dependency es una dependencia que esta en una particion, es decir no necesita Shuffle.
    - Un stage se termina cuando tenemos wide dependencies que requieren shuffle.

Particiones:

Es importante comprender como se particionan los datos cuando los cargamos, ya que de ello dependera que tengamos wide o narrow dependencies, lo cual nos llevara a tener mas o menos stages y shuffle, shuffles son lo que debemos evitar a toda costa.

Cuando leemos un archivo tendremos tantas particiones como blockes tenga el archivo.
    - Esto se puede controlar con: sc.textFile(<inputPath>, <minPartitions>)

Cuando creamos un RDD paralelizando una coleccion, tendremos un numero de particiones definido por la propiedad: spark.default.parallelism (En local sera el numero de cores, en meses sera 8 etc.. El maximo sera el numero de executors)
    - Esto se puede controlar con: sc.parallelize(<sequence>, <numSlices>)

Lo normal es que una transformacion no altere el numero de particiones, pero en caso de Union el numero de particiones sera igual a la suma del conjunto.

Los datos de una particion no deben de ser demasiado grandes para que puedan caber en memoria, si no, iran a disco con su correspondiente impacto en el performance.

Dado el caso anterior, es posible tener mas particiones que cores en la maquina, es preferible tener particiones que entren en memoria que tener particiones muy grandes.

Demasiadas particiones con pocos datos tambien es algo ineficiente, pero el impacto de tener muchas tareas es menor comparado al impacto de enviar datos a disco.

Se recomienda (En Spark_ Big Data Cluster Computing in Production) tener 2 a 4 veces mas particiones por RDD que numero de CPUs.

Hay dos particionadores/partitioners, HashPartitioner y RangePartitioner, por defecto se usa HashPartitioner a no ser que alguno de los RDDs de entrada use otro.

HashPartitioner usa un Hash code para distribuir los valores por las particiones, RangePartitioner hace uso de entradas del RDD que se puedan ordenar para calcular particiones similares, el numero final de particiones puede ser menor al configurado inicialmente.

Es posible crear un particionador a medida, extendiendo de org.apache.spark.Partitioner, lo cual es interesante cuando se conoce el caso de uso especifico.

Cuando hacemos un map sobre un RDD podemos cambiar las keys del par RDD, y por tanto el particionamiento, para evitar esto podemos usar mapValues y flatMapValues.

Shuffling:

Durante el shuffle se escribe a disco y se crean tantos archivos como numero de mappers multiplicado por numero de reducers.

Poniendo la propiedad spark.shuffle.consolidateFiles a true, su puede usar para agregar una fase llamada "shuffle file consolidation", que permite tener un solo archivo de shuffle por core de reducers en vez de uno por reducer si hay multiples reducers en un mismo core.

Una operacion Join puede requerir Shuffle dependiendo de si los RDDs estan particionados o no, si estan particionados con el mismo particionador (partitionBy(new HashPArtitioner(4))) se puede evitar el shuffle.

Temas avanzados de Spark:
    - Crear un encoder propio
    - Crear un particionador a medida
    - Crear reglas para Catalyst a medida
    - Registrar clases al serializarlas usando KryoSerializer, poniendo spark.kryo.registrationRequired a true y definiendo la prioedad spark.serializer.KryoSerializer
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8868
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 01 Sep 2017 09:37

Costed Based Optimizer:

Lo basico:

Con catalyst tenemos optimizacion basada en reglas, pero esta optimizacion no tiene nada que ver con los datos en si, en Spark 2.2 tenemos CBO que analiza los datos y aplica optimizaciones en funcion de lo que hay en los datos y principalmente del impacto de los filtros y joins en los datos intermedios.

Avanzado:

Tenemos los siguientes comandos de SQL:

Código: Seleccionar todo

ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS column-name1, column-name2, ….


Que nos van a proporcionar informacion como maximo, minimo, numero de distintos, Nulls, medias etc...

Con esta informacion podemos anticipar en FILTER ( condiciones en la WHERE) y JOIN, como van a quedar los datos intermedios, en cuestion de volumen mayormente.

Una vez tengamos esta informacion sabremos si compensa mas hacer HashJoin (rapida pero usa mucha memoria) o Sort-merge Join (lenta pero usa menos memoria).

Aqui podemos investigar los tipos de Joins que puede hacer, como por ejemplo broadcast hash-join vs. shuffled hash-join, o multi-way join order etc...

Para activar esta funcionalidad necesitamos cambiar la proiedad "spark.sql.cbo.enabled" a true, ya que esta a false por defecto, para in impactar sistemas de produccion directamente.

Ver: https://databricks.com/blog/2017/08/31/cost-based-optimizer-in-apache-spark-2-2.html
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8868
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 02 Sep 2017 11:17

Cual es la diferencia entre Spark 1 y 2 ?

En Spark 1, nos centrabamos en evitar el shuffling, tecnicas para optimizar esa parte, usabamos RDDs.

En spark 2, la optimizacion se centra en las joins, ahora todo el mundo usa Dataframes y Datasets.
    Nos enfocamos en Catalyst y en Cost Based Optimizer

Tipos de Joins:
    Shuffle Hash Join
      - Crea una key y un hash en ella, que son los campos que usamos para hacer la join.
      - Hace un shuffle y map/reduce.
      - Funciona mejor cuando los datos estan distribuidos adecuadamente y cuando el numero de keys es adecuado.
    Broadcast Hash Join
      - Si una tabla es pequeña la envia a todos los nodos para evitar el shuffle
      - Habitualmente es mas eficiente que Shuffle Hash Join
      - Spark por si mismo decidira usarla, hay que verlo con un SQL Explain.
      - Algunas veces Catalyst no sabe el tamaño de la tabla y no hace la optimizacion. (Dependiendo de la fuente. Hive...)
    Cartesian Joins
      - Si se necesita un calculo, o hacer manualmente con el API de RDD, forzar a Broadcast y usar un UDF y hacer el calculo necesario.
    Theta Join
      - Spark hace un Cartesian
      - Crear buckets for kayA y keyB
    One to Many Join
      - No es un problema si usas parquet. (Debido al encoding de parquet)
Adjuntos
Cartesian1.JPG
Theta.JPG
ShuffleHashJoin2.JPG
LeftJoinShuffle.JPG
LeftJoinShuffle2.JPG
BroadcastHashJoin1.JPG
ShuffleHashJoin.JPG
Spark1vs2.JPG
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8868
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 02 Sep 2017 14:06

Tungsten, el generador de byte code java para acelerar la ejecucion.

Ver: https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
Adjuntos
Tungsten2.JPG
Tungsten.JPG
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!

Avatar de Usuario
Dalamar
Site Admin
Mensajes: 8868
Registrado: 09 May 2012 01:38

Re: Apache Spark

Mensajepor Dalamar » 02 Sep 2017 14:10

Ongoing work, la funcionalidad que esta por venir...

Atencion que cada vez se usa mas Python con Spark!
Adjuntos
NextSteps2.JPG
NextSteps3.JPG
SparkLenguajes.JPG
NextSteps.JPG
¿Te ha gustado este hilo? Compartelo en las redes sociales para que se sume mas gente a la conversacion!


Volver a “Big Data”

Ingresar