4 Domanda: Aggiornamento di una colonna di dataframe in spark

domanda creata a Thu, Jun 22, 2017 12:00 AM

Guardando la nuova scintilla dataframe api, non è chiaro se sia possibile modificare le colonne del dataframe.

Come faccio a cambiare un valore nella riga x della colonna y di un dataframe?

Nel pandas questo sarebbe df.ix[x,y] = new_value

Modifica: consolidando ciò che è stato detto di seguito, non è possibile modificare il dataframe esistente poiché è immutabile, ma è possibile restituire un nuovo dataframe con le modifiche desiderate.

Se desideri semplicemente sostituire un valore in una colonna in base a una condizione, ad esempio np.where:

 
from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

Se vuoi eseguire qualche operazione su una colonna e creare una nuova colonna che viene aggiunta al dataframe:

 
import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

Se vuoi che la nuova colonna abbia lo stesso nome della vecchia colonna, puoi aggiungere il passaggio aggiuntivo:

 
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
    
61
  1. se vuoi accedere a DataFrame per indice, devi prima creare un indice. Vedi, ad es. stackoverflow.com/questions/26828815/... . Oppure aggiungi una colonna di indice con il tuo indice.
    2015-03-31 09: 38: 50Z
4 risposte                              4                         

Sebbene non sia possibile modificare una colonna in quanto tale, è possibile operare su una colonna e restituire un nuovo DataFrame che rifletta tale modifica. Per fare ciò devi prima creare un UserDefinedFunction che implementa l'operazione da applicare e quindi applicare selettivamente quella funzione solo alla colonna di destinazione. In Python:

 
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df ora ha lo stesso schema di old_df (supponendo che old_df.target_column fosse di tipo StringType) ma tutti i valori nella colonna target_column saranno new_value.

    
62
2017-02-21 22: 02: 49Z
  1. questa è una risposta reale al problema, grazie! tuttavia, i lavori scintillanti non finiscono per me, tutti gli esecutori ottengono i los. riesci a pensare ad un modo alternativo? Lo uso con un UDF un po 'più complesso dove faccio la trasformazione in stringhe. Non esiste sintassi simile ai panda come new_df = old_df.col1.apply (lambda x: func (x))?
    2015-03-31 07: 58: 09Z
  2. c'è anche: new_df = old_df.withColumn('target_column', udf(df.name))
    2015-03-31 13: 13: 30Z
  3. Sì, dovrebbe funzionare bene. Tieni presente che le UDF possono assumere colonne solo come parametri. Se vuoi trasferire altri dati nella funzione devi prima applicarlo parzialmente.
    2015-05-18 06: 16: 26Z
  4. @ KatyaHandler Se vuoi solo duplicare una colonna, un modo per farlo sarebbe semplicemente selezionarlo due volte: df.select([df[col], df[col].alias('same_column')]), dove col è il nome della colonna voglio duplicare. Con l'ultima versione di Spark, molte delle cose che ho usato per le UDF possono essere fatte con le funzioni definite in pyspark.sql.functions. Le prestazioni di UDF in Pyspark sono davvero scarse, quindi potrebbe valere la pena esaminare: spark.apache.org/docs/latest/api/python/...
    2015-09-23 19: 48: 58Z
  5. è StringType non Stringtype in udf = UserDefinedFunction(lambda x: 'new_value', Stringtype())
    2017-02-16 05: 31: 15Z

Di solito quando si aggiorna una colonna, vogliamo mappare un vecchio valore ad un nuovo valore. Ecco un modo per farlo in pyspark senza UDF:

 
# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).
    
35
2015-12-21 22: 23: 26Z
  1. Come usare questo, quando il mio update_col è una lista Ex- =: update_cols=['col1','col2','col3']?
    2017-08-18 13: 23: 40Z
  2. Usa un ciclo for.
    2017-08-18 13: 50: 44Z

DataFrames sono basati su RDD. Gli RDD sono strutture immutabili e non consentono di aggiornare gli elementi sul posto. Per modificare i valori, dovrai creare un nuovo DataFrame trasformando quello originale usando le operazioni DSL o RDD simili a SQL come map.

Uno slide deck altamente raccomandato: Presentazione di DataFrames in Spark per la scienza dei dati su larga scala .

    
13
2016-02-24 21: 56: 18Z
  1. Che cosa è esattamente l'astrazione del dataframe che aggiunge che non potrebbe essere già fatto nella stessa quantità di righe con una tabella?
    2015-03-17 22: 25: 36Z
  2. "DataFrames introduce nuovi operatori semplificati per il filtraggio, l'aggregazione e la proiezione su dataset di grandi dimensioni.Internamente, DataFrames si avvalgono dell'ottimizzatore logico Spark SQL per pianificare in modo intelligente l'esecuzione fisica delle operazioni funziona bene su dataset di grandi dimensioni "- databricks.com/blog /2015/03/13/announcing-spark-1-3.html
    2015-03-17 22: 40: 17Z

Proprio come maasg dice che puoi creare un nuovo DataFrame dal risultato di una mappa applicata al vecchio DataFrame. Un esempio per un dato DataFrame df con due righe:

 
val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

Notare che se i tipi delle colonne cambiano, è necessario dargli uno schema corretto invece di df.schema. Controlla l'API di org.apache.spark.sql.Row per i metodi disponibili: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Aggiorna] O usando UDF in Scala:

 
import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

e se il nome della colonna deve rimanere lo stesso puoi rinominarlo di nuovo:

 
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
    
11
2017-05-23 11: 33: 15Z
fonte posta Qui