3 Вопрос: Как сохранить структуру столбцов в Spark Dataframe после операции отображения над строками

вопрос создан в Thu, May 2, 2019 12:00 AM

Я пытаюсь применить функцию к каждой строке фрейма данных Spark, как в примере.

val df = sc.parallelize(
  Seq((1, 2, 0), (0, 0, 1), (0, 0, 0))).toDF("x", "y", "z")
df.show()

который дает

+---+---+---+
|  x|  y|  z|
+---+---+---+
|  1|  2|  0|
|  0|  0|  1|
|  0|  0|  0|
+---+---+---+

Предположим, я хочу что-то сделать со значениями в каждой строке, например, изменить 0 на 5.

val b = df.map(row => row.toSeq.map(x => x match{
    case 0 => 5
    case x: Int => x
}))

b.show()
+---------+
|    value|
+---------+
|[1, 2, 5]|
|[5, 5, 1]|
|[5, 5, 5]|
+---------+

Функция работала, но теперь у меня есть один столбец, записи которого являются списками, а не 3 столбцами целых. Я хочу вернуть мои именованные столбцы.

    
2
  1. Не уверен, почему кто-то отклонил вопрос, получивший 3 интересных ответа. Это никогда не произойдет в перекрестной проверке.
    2019-05-02 16: 35: 29Z
  2. Upvoted - действительно, это часто случается
    2019-05-03 06: 20: 48Z
3 ответа                              3                         

Есть несколько способов сделать это, вот некоторые:

df.map(row => {
      val size = row.size
      var seq: Seq[Int] = Seq.empty[Int]
      for (a <- 0 to size - 1) {
        val value: Int = row(a).asInstanceOf[Int]
        val newVal: Int = value match {
          case 0 =>
            5
          case _ =>
            value
        }
        seq = seq :+ newVal
      }
      Row.fromSeq(seq)
    })(RowEncoder.apply(df.schema))
 val columns = df.columns
    df.select(
        columns.map(c => when(col(c) === 0, 5).otherwise(col(c)).as(c)): _*)
      .show()
def fun: (Int => Int) = { x =>
      if (x == 0) 5 else x
    }
    val function = udf(fun)
    df.select(function(col("x")).as("x"),
              function(col("y")).as("y"),
              function(col("z")).as("z"))
      .show()
def checkZero(a: Int): Int = if (a == 0) 5 else a

      df.map {
        case Row(a: Int, b: Int, c: Int) =>
          Row(checkZero(a), checkZero(b), checkZero(c))
      } { RowEncoder.apply(df.schema) }
      .show()
    
1
2019-05-02 16: 15: 15Z
  1. Все это отличные ответы, но я принял этот, потому что первое решение обобщает лучше для случая, когда строка преобразуется более сложными способами, чем в моем игрушечном примере .
    2019-05-02 16: 34: 22Z

Вы можете определить UDF для применения этой замены. Например:

def subsDef(k: Int): Int = if(k==0) 5  else k
val subs = udf[Int, Int](subsDef)

Затем вы можете применить UDF к определенному столбцу или, если хотите, к каждому столбцу DF:

// to a single column, for example "x"
df = df.withColumn("x", subs(col("x")))
df.show()
+---+---+---+
|  x|  y|  z|
+---+---+---+
|  1|  2|  0|
|  5|  0|  1|
|  5|  0|  0|
+---+---+---+



// to every columns of DF
df.columns.foreach(c => {
      df = df.withColumn(c, subs(col(c)))
    })
df.show()
+---+---+---+
|  x|  y|  z|
+---+---+---+
|  1|  2|  5|
|  5|  5|  1|
|  5|  5|  5|
+---+---+---+
    
3
2019-05-02 15: 17: 30Z

Вместо того, чтобы преобразовывать DataFrame по строкам, рассмотрите возможность использования встроенной функции Spark API when/otherwise следующим образом:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq((1, 2, 0), (0, 0, 1), (0, 0, 0)).toDF("x", "y", "z")

val vFrom = 0
val vTo = 5

val cols = df.columns  // Filter for specific columns if necessary

df.select( cols.map( c => 
    when(col(c) === vFrom, vTo).otherwise(col(c)).as(c)
  ): _*
).show
// +---+---+---+
// |  x|  y|  z|
// +---+---+---+
// |  1|  2|  5|
// |  5|  5|  1|
// |  5|  5|  5|
// +---+---+---+
    
2
2019-05-02 16: 12: 34Z
  1. Как всегда хороший ответ.
    2019-05-03 06: 24: 35Z
источник размещен Вот