incubator-predictionio
ada0591f2afe
Improve insert performance for ES storage
Shinsuke Sugaya
2 days ago
Closes #373

72 73 74 75 76 77 78 79 80 81 82 72 73 74 75 76 77 78 79 80 81 82
("event" -> ("type" -> "keyword")) ~ ("entityType" -> ("type" -> "keyword")) ~ ("entityId" -> ("type" -> "keyword")) ~ ("targetEntityType" -> ("type" -> "keyword")) ~ ("targetEntityId" -> ("type" -> "keyword")) ~
("properties" -> ("type" -> "keyword")) ~
("properties" -> ("enabled" -> 0)) ~
("eventTime" -> ("type" -> "date")) ~ ("tags" -> ("type" -> "keyword")) ~ ("prId" -> ("type" -> "keyword")) ~ ("creationTime" -> ("type" -> "date")))) ESUtils.createMapping(restClient, index, estype, compact(render(json)))

44 45 46 47 48 49 50 51 52 53 54 44 45 46 47 48 49 50 51 52 53 54 55 56
ESUtils.createIndex(restClient, index, ESUtils.getNumberOfShards(config, index.toUpperCase), ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype ->
("_all" -> ("enabled" -> 0)))
("_all" -> ("enabled" -> 0)) ~ ("properties" -> ("n" -> ("enabled" -> 0))))
ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) } finally { restClient.close() }
...
57 58 59 60 61 62 63 64 65 66 67 59 60 61 62 63 64 65 66 67 68 69
try { val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON) val response = restClient.performRequest( "POST", s"/$index/$estype/$name",
Map("refresh" -> "true").asJava,
Map("refresh" -> "false").asJava,
entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) val result = (jsonResponse \ "result").extract[String] result match { case "created" =>

24 25 26 27 28 29 30 31 24 25 26 27 28 29 30 31
name := "template-scala-parallel-recommendation" organization := "org.apache.predictionio" libraryDependencies ++= Seq(
"org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided",
"org.apache.predictionio" %% "apache-predictionio-core" % "0.11.1-SNAPSHOT" % "provided",
"org.apache.spark" %% "spark-core" % sys.env.getOrElse("PIO_SPARK_VERSION", "1.6.3") % "provided", "org.apache.spark" %% "spark-mllib" % sys.env.getOrElse("PIO_SPARK_VERSION", "1.6.3") % "provided")
About FluentSend Feedback