1 प्रतिक्रिया स्ट्रीम करने के लिए अक्का-एचटीपी के साथ अक्का स्ट्रीम का उपयोग कैसे करें

पर बनाया गया सवाल Sun, Mar 17, 2019 12:00 AM

मैं अक्का स्ट्रीम में नया हूं। मैंने CSV पार्सिंग के लिए निम्नलिखित कोड का उपयोग किया।

class CsvParser(config: Config)(implicit system: ActorSystem) extends LazyLogging with NumberValidation {

  import system.dispatcher

  private val importDirectory = Paths.get(config.getString("importer.import-directory")).toFile
  private val linesToSkip = config.getInt("importer.lines-to-skip")
  private val concurrentFiles = config.getInt("importer.concurrent-files")
  private val concurrentWrites = config.getInt("importer.concurrent-writes")
  private val nonIOParallelism = config.getInt("importer.non-io-parallelism")

  def save(r: ValidReading): Future[Unit] = {
      Future()
  }

  def parseLine(filePath: String)(line: String): Future[Reading] = Future {
    val fields = line.split(";")
    val id = fields(0).toInt
    try {
      val value = fields(1).toDouble
      ValidReading(id, value)
    } catch {
      case t: Throwable =>
        logger.error(s"Unable to parse line in $filePath:\n$line: ${t.getMessage}")
        InvalidReading(id)
    }
  }

  val lineDelimiter: Flow[ByteString, ByteString, NotUsed] =
    Framing.delimiter(ByteString("\n"), 128, allowTruncation = true)

  val parseFile: Flow[File, Reading, NotUsed] =
    Flow[File].flatMapConcat { file =>
      val src = FileSource.fromFile(file).getLines()
      val source : Source[String, NotUsed] = Source.fromIterator(() => src)
      // val gzipInputStream = new GZIPInputStream(new FileInputStream(file))

      source
        .mapAsync(parallelism = nonIOParallelism)(parseLine(file.getPath))
    }

  val computeAverage: Flow[Reading, ValidReading, NotUsed] =
    Flow[Reading].grouped(2).mapAsyncUnordered(parallelism = nonIOParallelism) { readings =>
      Future {
        val validReadings = readings.collect { case r: ValidReading => r }
        val average = if (validReadings.nonEmpty) validReadings.map(_.value).sum / validReadings.size else -1
        ValidReading(readings.head.id, average)
      }
    }

  val storeReadings: Sink[ValidReading, Future[Done]] =
    Flow[ValidReading]
      .mapAsyncUnordered(concurrentWrites)(save)
      .toMat(Sink.ignore)(Keep.right)

  val processSingleFile: Flow[File, ValidReading, NotUsed] =
    Flow[File]
      .via(parseFile)
      .via(computeAverage)

  def importFromFiles = {
    implicit val materializer = ActorMaterializer()

    val files = importDirectory.listFiles.toList
    logger.info(s"Starting import of ${files.size} files from ${importDirectory.getPath}")

    val startTime = System.currentTimeMillis()

    val balancer = GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val balance = builder.add(Balance[File](concurrentFiles))
      val merge = builder.add(Merge[ValidReading](concurrentFiles))

      (1 to concurrentFiles).foreach { _ =>
        balance ~> processSingleFile ~> merge
      }

      FlowShape(balance.in, merge.out)
    }

    Source(files)
      .via(balancer)
      .withAttributes(ActorAttributes.supervisionStrategy { e =>
        logger.error("Exception thrown during stream processing", e)
        Supervision.Resume
      })
      .runWith(storeReadings)
      .andThen {
        case Success(_) =>
          val elapsedTime = (System.currentTimeMillis() - startTime) / 1000.0
          logger.info(s"Import finished in ${elapsedTime}s")
        case Failure(e) => logger.error("Import failed", e)
      }
  }
}

मैं अक्का एचटीटीपी का उपयोग करना चाहता था जो सभी 0600350991100100101035062 संस्थाओं को सीएसवी से पार्स कर देता था लेकिन मैं यह नहीं समझ सकता था कि मैं ऐसा कैसे करूंगा।

उपरोक्त कोड सर्वर से फाइल प्राप्त करता है और प्रत्येक लाइनों को ValidReading //जनरेट करने के लिए पार्स करता है।

मैं CSV को akka-http के माध्यम से कैसे पास /अपलोड कर सकता हूं, फ़ाइल को पार्स कर सकता हूं और परिणामी प्रतिक्रिया को अंतिम बिंदु पर स्ट्रीम कर सकता हूं?

    
0
  1. 0600350991100100101035062 के बजाय, आप परिणाम को HTTP-समापन बिंदु पर Akka-HTTP का उपयोग करके स्ट्रीम करना चाहते हैं?
    2019-03-17 12: 29: 12Z
  2. हाँ। मैं अकाका-HTTP
    का उपयोग करके HTTP एंडपॉइंट के परिणाम को स्ट्रीम करना चाहूंगा
    2019-03-17 12: 32: 32Z
    1 उत्तर                              1                         

    समाधान का "सार" कुछ इस प्रकार है:

    ValidReading

    आप पता लगाते हैं कि अपलोड की गई चीज़ एक मल्टी-फॉर्म-डेटा है जिसमें "सीएसवी" नाम का एक हिस्सा है। आप उस से बाइट्स सोर्स प्राप्त करें। गणना करें (अपने तर्क को save भाग पर डालें)। अपना डेटा वापस

    import akka.http.scaladsl.server.Directives._
    val route = fileUpload("csv") {
      case (metadata, byteSource) =>
        val source = byteSource.map(x => x)
        complete(HttpResponse(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, source)))
    }
    
    में परिवर्तित करें। नए स्रोत के साथ अनुरोध पूरा करें। यह आपके एंडॉइंट को प्रॉक्सी की तरह बना देगा।     
    0
    2019-03-17 16: 48: 22Z
    .map(x=>x)
स्रोत रखा गया यहाँ