1 Có cách nào để thêm nhiều cột vào khung dữ liệu được tính từ di chuyển trung bình từ các cột khác nhau và / hoặc trong khoảng thời gian khác nhau

câu hỏi được tạo ra tại Wed, May 8, 2019 12:00 AM

Tôi có một khung dữ liệu với dữ liệu chuỗi thời gian và tôi đang cố gắng thêm nhiều cột trung bình di chuyển vào đó với các cửa sổ khác nhau thuộc các phạm vi khác nhau. Khi tôi thực hiện cột này theo cột, kết quả khá chậm.

Tôi đã cố gắng thực hiện các cuộc gọi withColumn cho đến khi tôi có tất cả các cuộc gọi đó.

Mã giả:

import pyspark.sql.functions as pysparkSqlFunctions

## working from a data frame with 12 colums:
## - key as a String
## - time as a DateTime
## - col_{1:10} as numeric values

window_1h =  Window.partitionBy("key")                \
                   .orderBy(col("time").cast("long")) \
                   .rangeBetween(-3600, 0)
window_2h =  Window.partitionBy("key")                \
                   .orderBy(col("time").cast("long")) \
                   .rangeBetween(-7200, 0)
df = df.withColumn("col1_1h", pysparkSqlFunctions.avg("col_1").over(window_1h))
df = df.withColumn("col1_2h", pysparkSqlFunctions.avg("col_1").over(window_2h))
df = df.withColumn("col2_1h", pysparkSqlFunctions.avg("col_2").over(window_1h))
df = df.withColumn("col2_2h", pysparkSqlFunctions.avg("col_2").over(window_2h))

Điều tôi muốn là khả năng thêm cả 4 cột (hoặc nhiều cột khác) trong một cuộc gọi, hy vọng chỉ truyền dữ liệu một lần để có hiệu suất tốt hơn.

    
1
  1. Bạn có thể viết hàm UDAF của riêng bạn thực hiện điều đó. Nó sẽ là một chức năng duy nhất trên cửa sổ dài nhất bạn có và bên trong nó sẽ thu thập trung bình cho tất cả các cửa sổ ngắn hơn trong một đoạn.
    2019-05-09 12: 04: 44Z
1 Câu trả lời                              1                         

Tôi thích nhập thư viện hàm là F vì nó trông gọn gàng hơn và đó là bí danh tiêu chuẩn được sử dụng trong tài liệu Spark chính thức.

Chuỗi sao, '*', sẽ nắm bắt tất cả các cột hiện tại trong khung dữ liệu. Ngoài ra, bạn có thể thay thế chuỗi sao bằng *df.columns. Tại đây, ngôi sao sẽ khám phá danh sách thành các tham số riêng cho phương thức chọn.

from pyspark.sql import functions as F

df = df.select(
    "*",
    F.avg("col_1").over(window_1h).alias("col1_1h"),
    F.avg("col_1").over(window_2h).alias("col1_2h"),
    F.avg("col_2").over(window_1h).alias("col2_1h"),
    F.avg("col_2").over(window_1h).alias("col2_1h"),
)
    
- 1
2019-05-10 14: 47: 07Z
  1. Cảm ơn, điều này nhanh hơn và sạch hơn đáng kể
    2019-05-08 16: 39: 11Z
  2. Xin lỗi tôi đang xóa thẻ câu trả lời được chấp nhận, khi tôi kiểm tra chuỗi gỡ lỗi của rdd sau này, tôi vẫn thấy nhiều hoạt động mapPartition như có các cột được thêm vào Tôi đang nghi ngờ rằng chúng ta có thể làm tốt hơn? Điều này có nghĩa không? (Tôi không phải là người bỏ phiếu mặc dù;))
    2019-05-08 16: 46: 33Z
  3. Tôi không thực sự chắc chắn về những gì bạn đang tìm kiếm. Hãy cho tôi biết nếu bạn tìm thấy một giải pháp. Tôi sẽ thấy thú vị khi nhìn thấy nó
    2019-05-10 14: 09: 17Z
nguồn đặt đây