1 Вопрос: Есть ли способ добавить несколько столбцов в фрейм данных, рассчитанный на основе скользящих средних из разных столбцов и / или за разную продолжительность

вопрос создан в Wed, May 8, 2019 12:00 AM

У меня есть фрейм данных с данными временных рядов, и я пытаюсь добавить в него множество столбцов скользящих средних с разными окнами разных диапазонов. Когда я делаю этот столбец за столбцом, результаты довольно медленные.

Я пытался просто накапливать звонки withColumn до тех пор, пока не получу все из них.

Псевдокод:

import pyspark.sql.functions as pysparkSqlFunctions

## working from a data frame with 12 colums:
## - key as a String
## - time as a DateTime
## - col_{1:10} as numeric values

window_1h =  Window.partitionBy("key")                \
                   .orderBy(col("time").cast("long")) \
                   .rangeBetween(-3600, 0)
window_2h =  Window.partitionBy("key")                \
                   .orderBy(col("time").cast("long")) \
                   .rangeBetween(-7200, 0)
df = df.withColumn("col1_1h", pysparkSqlFunctions.avg("col_1").over(window_1h))
df = df.withColumn("col1_2h", pysparkSqlFunctions.avg("col_1").over(window_2h))
df = df.withColumn("col2_1h", pysparkSqlFunctions.avg("col_2").over(window_1h))
df = df.withColumn("col2_2h", pysparkSqlFunctions.avg("col_2").over(window_2h))

Мне хотелось бы добавить возможность добавления всех четырех столбцов (или многих других) за один вызов, надеясь, что данные будут проходить только один раз для повышения производительности.

    
1
  1. Вы можете написать свою собственную функцию UDAF, которая делает это. Это будет единственная функция для самого длинного окна, которое у вас есть, и внутренне она будет собирать средние значения для всех более коротких окон за один проход.
    2019-05-09 12: 04: 44Z
1 ответ                              1                         

Я предпочитаю импортировать библиотеку функций как F, так как она выглядит аккуратнее и это стандартный псевдоним, используемый в официальной документации Spark.

Звездная строка, '*', должна охватывать все текущие столбцы в кадре данных. В качестве альтернативы можно заменить звездочку на *df.columns. Здесь звезда разбивает список на отдельные параметры для метода выбора.

from pyspark.sql import functions as F

df = df.select(
    "*",
    F.avg("col_1").over(window_1h).alias("col1_1h"),
    F.avg("col_1").over(window_2h).alias("col1_2h"),
    F.avg("col_2").over(window_1h).alias("col2_1h"),
    F.avg("col_2").over(window_1h).alias("col2_1h"),
)
    
- 1
2019-05-10 14: 47: 07Z
  1. Спасибо, это значительно быстрее и чище
    2019-05-08 16: 39: 11Z
  2. Извините, я удаляю принятый тег ответа, когда после этого проверяю строку отладки rdd, я все равно вижу столько операций mapPartition, сколько есть добавленных столбцов, поэтому Я подозреваю, что мы можем сделать лучше? Имеет ли это смысл? (Я не тот, кто проголосовал против;))
    2019-05-08 16: 46: 33Z
  3. Я не совсем уверен, что вы ищете. Дайте мне знать, если найдете решение. Мне было бы интересно увидеть это
    2019-05-10 14: 09: 17Z
источник размещен Вот