4 Domanda: Come fondere Spark DataFrame?

domanda creata a Mon, May 27, 2019 12:00 AM

Esiste un equivalente di Pandas Melt Function in Apache Spark in PySpark o almeno in Scala?

Stavo eseguendo un dataset di esempio fino ad ora in python e ora voglio usare Spark per l'intero set di dati.

Grazie in anticipo.

    
32
  1. Vedi anche unpivot in spark-sql /pyspark e Trasponi la colonna in fila con Spark
    2018-05-11 18: 56: 56Z
4 risposte                              4                         

Non esiste una funzione incorporata (se lavori con il supporto di SQL e Hive abilitato puoi usare funzione stack , ma non è esposto in Spark e non ha un'implementazione nativa) ma è banale eseguire il rollover. Importazioni richieste:

 
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable 

Implementazione di esempio:

 
def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

E alcuni test (basati su documenti di Pandas ):

 
import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
 
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6
 
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
 
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

Nota: per l'uso con versioni precedenti di Python rimuovi le annotazioni di tipo.

Related:

53
2018-10-19 17: 06: 57Z
  1. Il tuo codice aggiunge i tick ai nomi delle colonne e poi fallisce nella chiamata withColumn. Altri riferimenti disponibili qui ( stackoverflow.com/questions/55781796/... )
    2019-04-21 11: 49: 10Z

È arrivata questa domanda nella mia ricerca di un'implementazione di melt in Spark per Scala.

Pubblicare la mia porta Scala nel caso in cui qualcuno incappa anche su questo.

 
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame}
/** Extends the [[org.apache.spark.sql.DataFrame]] class
 *
 *  @param df the data frame to melt
 */
implicit class DataFrameFunctions(df: DataFrame) {

    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
     * 
     *  melt is (kind of) the inverse of pivot
     *  melt is currently (02/2017) not implemented in spark
     *
     *  @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html)
     *  @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark
     *  
     *  @todo method overloading for simple calling
     *
     *  @param id_vars the columns to preserve
     *  @param value_vars the columns to melt
     *  @param var_name the name for the column holding the melted columns names
     *  @param value_name the name for the column holding the values of the melted columns
     *
     */

    def melt(
            id_vars: Seq[String], value_vars: Seq[String], 
            var_name: String = "variable", value_name: String = "value") : DataFrame = {

        // Create array<struct<variable: str, value: ...>>
        val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)

        // Add to the DataFrame and explode
        val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

        val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}

        return _tmp.select(cols: _*)

    }
}

Dato che non sono così avanzato considerando Scala, sono sicuro che ci sono margini di miglioramento.

Qualsiasi commento è benvenuto.

    
19
2018-04-28 05: 59: 45Z

UPD

Finalmente ho trovato l'implementazione più efficace per me. Usa tutte le risorse per il cluster nella configurazione del mio filo.

 
from pyspark.sql.functions import explode
def melt(df):
    sp = df.columns[1:]
    return (df
            .rdd
            .map(lambda x: [str(x[0]), [(str(i[0]), 
                                         float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]], 
                 preservesPartitioning = True)
            .toDF()
            .withColumn('_2', explode('_2'))
            .rdd.map(lambda x: [str(x[0]), 
                                str(x[1][0]), 
                                float(x[1][1] if x[1][1] else 0)], 
                     preservesPartitioning = True)
            .toDF()
            )

Per un dataframe molto ampio ho diminuito le prestazioni alla generazione di _vars_and_vals dalla risposta dell'utente6910411.

È stato utile implementare la fusione tramite selectExpr

 
columns=['a', 'b', 'c', 'd', 'e', 'f']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|
|  4|  5|  6|  7|  9|  8|
|  7|  8|  9|  1|  2|  4|
|  8|  3|  9|  8|  7|  4|
+---+---+---+---+---+---+

cols = df.columns[1:]
df.selectExpr('a', "stack({}, {})".format(len(cols), ', '.join(("'{}', {}".format(i, i) for i in cols))))
+---+----+----+
|  a|col0|col1|
+---+----+----+
|  1|   b|   2|
|  1|   c|   3|
|  1|   d|   4|
|  1|   e|   5|
|  1|   f|   6|
|  4|   b|   5|
|  4|   c|   6|
|  4|   d|   7|
|  4|   e|   9|
|  4|   f|   8|
|  7|   b|   8|
|  7|   c|   9|
...
    
0
2019-04-24 08: 47: 51Z
  1. Sto riscontrando una mancata corrispondenza di tipo non risolvibile .. a causa della mancata corrispondenza del tipo di dati: Argument 2 (DoubleType)! = Argument 6 (LongType); linea 1 pos 0; . Il test mostra che sembra impli- cabile il tipo del tuo col1 in base ai primi elementi di col0. Quando diciamo i valori per dof di col0, digita la mancata corrispondenza. Come lo risolveresti? Sto provando stack ({}, {}) ". Format (len (cols), ',' .join ((" '{}', cast ({} come bigint) "... che sembra funzionare, ma non è sicuro se è il modo corretto ed efficiente. Sto avendo problemi di prestazioni quando impilando centinaia di colonne, quindi l'efficienza è imporTant.
    2019-04-23 19: 44: 47Z
  2. @ Kenny Non ho mai incontrato un tale problema in questo caso. Ma la soluzione sembra logica. Inoltre puoi provare la mia soluzione dall'aggiornamento.
    2019-04-24 08: 37: 19Z

Votato per la risposta di user6910411. Funziona come previsto, tuttavia, non può gestire bene i valori Nessuno. così ho refactorato la sua funzione di fusione al seguente:

 
from pyspark.sql.functions import array, col, explode, lit
from pyspark.sql.functions import create_map
from pyspark.sql import DataFrame
from typing import Iterable 
from itertools import chain

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create map<key: value>
    _vars_and_vals = create_map(
        list(chain.from_iterable([
            [lit(c), col(c)] for c in value_vars]
        ))
    )

    _tmp = df.select(*id_vars, explode(_vars_and_vals)) \
        .withColumnRenamed('key', var_name) \
        .withColumnRenamed('value', value_name)

    return _tmp

Il test è con il seguente dataframe:

 
import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6},
                   'D': {1: 7, 2: 9}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C', 'D'])

A   variable    value
0   a   B   1.0
1   b   B   3.0
2   c   B   5.0
3   a   C   2.0
4   b   C   4.0
5   c   C   6.0
6   a   D   NaN
7   b   D   7.0
8   c   D   9.0

 
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C', 'D']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|  1.0|
|  a|       C|  2.0|
|  a|       D|  NaN|
|  b|       B|  3.0|
|  b|       C|  4.0|
|  b|       D|  7.0|
|  c|       B|  5.0|
|  c|       C|  6.0|
|  c|       D|  9.0|
+---+--------+-----+
    
0
2019-06-25 11: 18: 40Z
fonte posta Qui