2 Вопрос: Spark Структурированная потоковая передача: первичный ключ в приемнике JDBC

вопрос создан в Thu, May 2, 2019 12:00 AM

Я читаю поток данных из темы kafka, используя структурированную потоковую передачу в режиме обновления., а затем выполняю некоторые преобразования.

Затем я создал приемник jdbc для отправки данных в приемник mysql в режиме добавления. Проблема заключается в том, как сказать моему приемнику, чтобы он знал, что это мой первичный ключ, и выполнить обновление на его основе, чтобы в моей таблице не было повторяющихся строк.

   val df: DataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<List-here>")
  .option("subscribe", "emp-topic")
  .load()


  import spark.implicits._
  // value in kafka is bytes so cast it to String
  val empList: Dataset[Employee] = df.
  selectExpr("CAST(value AS STRING)")
  .map(row => Employee(row.getString(0)))

  // window aggregations on 1 min windows
  val aggregatedDf= ......

  // How to tell here that id is my primary key and do the update
  // based on id column
  aggregatedDf
  .writeStream
  .trigger(Trigger.ProcessingTime(60.seconds))
  .outputMode(OutputMode.Update)
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      batchDF
      .select("id", "name","salary","dept")
      .write.format("jdbc")
      .option("url", "jdbc:mysql://localhost/empDb")
      .option("driver","com.mysql.cj.jdbc.Driver")
      .option("dbtable", "empDf")
      .option("user", "root")
      .option("password", "root")
      .mode(SaveMode.Append)
      .save()
     }
    
0
2 ответа                              2                         

Одним из способов является то, что вы можете использовать ON DUPLICATE KEY UPDATE, а foreachPartition - для этой цели.

Ниже приведен фрагмент кода псевдо

/**
    * Insert in to database using foreach partition.
    * @param dataframe : DataFrame
    * @param sqlDatabaseConnectionString
    * @param sqlTableName
    */
  def insertToTable(dataframe: DataFrame, sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {

//numPartitions = number of simultaneous DB connections you can planning to give
datframe.repartition(numofpartitionsyouwant)

    val tableHeader: String = dataFrame.columns.mkString(",")
    dataFrame.foreachPartition { partition =>
      // Note : Each partition one connection (more better way is to use connection pools)
      val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
      //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
      partition.grouped(1000).foreach {
        group =>
          val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
          group.foreach {
            record => insertString.append("('" + record.mkString(",") + "'),")
          }

val sql =   s"""
               | INSERT INTO $sqlTableName  VALUES  
               | $tableHeader
               | ${insertString}
               | ON DUPLICATE KEY UPDATE 
               | yourprimarykeycolumn='${record.getAs[String]("key")}'
    sqlExecutorConnection.createStatement()
                .executeUpdate(sql)
          }
    sqlExecutorConnection.close() // close the connection
        }
      }

вы можете использовать готовое заявление вместо выражения jdbc.

Дополнительная информация: SPARK SQL - обновите таблицу MySql, используя DataFrames и JDBC

    
1
2019-05-03 03: 06: 36Z
  1. спасибо за ответ. Но тот вопрос, который вы задали, кажется, 3 года назад, поэтому мне интересно, должна ли быть какая-либо другая функция присутствовать в текущей версии spark 2.4.0.
    2019-05-03 05: 39: 53Z
  2. вышеупомянутый подход будет работать и с текущей версией spark, так как его уровень RDD
    2019-05-03 14: 28: 24Z

знаете ли вы, почему я получаю эту ошибку, используя writestream с jdbc так же, как это сделал thedevd?

java.lang.UnsupportedOperationException: источник данных jdbc не поддерживает потоковую запись

Кроме того, я слышал, что обходным путем было введение foreachBatch, и я попытался использовать .foreachBatch {(batchDF: DataFrame, batchId: Long) = > batchDF .writeStream .... но получите эту ошибку: Значение foreachBatch не является членом org.apache.spark.sql.streaming.DataStreamWriter [org.apache.spark.sql.Row]

    
0
2019-05-09 17: 18: 18Z
  1. , так как операция writestream не поддерживается в приемнике JDBC, API-интерфейс spark не допускает этого. Аналогично, writeStream теперь разрешен с Redis в качестве источника. Вы должны использовать foreachBatch с write для помещения в приемник JDBC
    2019-05-10 15: 43: 06Z
источник размещен Вот