Updated the spark and almaren version #48

Merged
135 changes: 10 additions & 125 deletions README.md
@@ -5,13 +5,13 @@
To add http.almaren dependency to your sbt build:

```
libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.8-3.4"
libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.9-3.4"
```

To run in spark-shell:

```
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.9-3.4,com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.4"
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.4,com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.4"
```

## Table of Contents
@@ -43,14 +43,14 @@ repository.

| versions | Connector Artifact |
|----------------------------|-------------------------------------------------------------|
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.8-3.4` |
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.4` |
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.8-3.3` |
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.3` |
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.2` |
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.1` |
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-2.4` |
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.8-2.4` |
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.9-3.4` |
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.4` |
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.9-3.3` |
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.3` |
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.2` |
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.1` |
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-2.4` |
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.9-2.4` |

## Methods

@@ -501,118 +501,3 @@ How to concatenate by new line:
(rows: Seq[Row]) => rows.map(row => row.getAs[String](Alias.DataCol)).mkString("\n")
```

### HTTP Row

It will initiate an HTTP request for each Row, extracting headers, parameters, and hidden parameters from each Row.

```
$ curl -X PUT -H "Authorization: {SESSION_ID}" \
-H "Content-Type: text/csv" \
-H "Accept: text/csv" \
--data-binary @"filename" \
https://localhost/objects/documents/batch
```

#### Example

```scala
import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.http.HTTPConn.HTTPImplicit
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
import scala.collection.JavaConverters.asScalaIteratorConverter
import spark.implicits._

val almaren = Almaren("http-almaren")

val df = Seq(
("John", "Smith", "London"),
("David", "Jones", "India"),
("Michael", "Johnson", "Indonesia"),
("Chris", "Lee", "Brazil"),
("Mike", "Brown", "Russia")
).toDF("first_name", "last_name", "country").coalesce(1)

df.createOrReplaceTempView("person_info")
val requestSchema = StructType(Seq(
StructField("__URL__", StringType),
StructField("__DATA__", StringType),
StructField("__REQUEST_HEADERS__", MapType(StringType, StringType)),
StructField("__REQUEST_PARAMS__", MapType(StringType, StringType)),
StructField("__REQUEST_HIDDEN_PARAMS__", MapType(StringType, StringType))
))

//Constructing the request dataframe by generating necessary input columns from each row in the input dataframe.
val requestRows: Seq[Row] = df.toLocalIterator.asScala.toList.map(row => {
val firstName = row.getAs[String]("first_name")
val lastName = row.getAs[String]("last_name")
val country = row.getAs[String]("country")
val url = s"http://localhost:3000/fireshots/getInfo"
val headers = scala.collection.mutable.Map[String, String]()
headers.put("data", firstName)
val params = scala.collection.mutable.Map[String, String]()
params.put("params", lastName)
val hiddenParams = scala.collection.mutable.Map[String, String]()
hiddenParams.put("hidden_params", country)

Row(url,
s"""{"first_name" : "$firstName","last_name":"$lastName","country":"$country"} """,
headers,
params,
hiddenParams
)
})

val requestDataframe = spark.createDataFrame(spark.sparkContext.parallelize(requestRows), requestSchema)


val responseDf = almaren.builder
.sourceDataFrame(requestDataframe)
.sqlExpr("monotonically_increasing_id() as __ID__", "__DATA__", "__URL__", "__REQUEST_HEADERS__", "__REQUEST_PARAMS__", "__REQUEST_HIDDEN_PARAMS__")
.httpRow(method = "POST")
.batch

responseDf.show(false)
```

#### Parameters

| Parameter | Description | Type |
|----------------|-------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------|
| headers | HTTP headers | Map[String,String] |
| params | HTTP params | Map[String,String] |
| hiddenParams | HTTP params which are hidden (not exposed in logs) | Map[String,String] |
| method | HTTP Method | String |
| requestHandler | Closure to handle HTTP request | (Row,Session,String,Map[String,String],String) => requests.Response |
| session | Closure to handle HTTP sessions | () => requests.Session |
| connectTimeout | Timeout in ms for keeping the connection alive; a high value is recommended | Int |
| readTimeout | Maximum time in ms allowed for a single HTTP request | Int |
| threadPoolSize | Number of parallel connections per executor. Parallelism = number of executors * number of cores * threadPoolSize | Int |
| batchSize | How many records a single thread will process | Int |
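
The `threadPoolSize` row gives the overall parallelism as a product of three factors. A rough worked example, assuming "number of cores" means cores per executor (the table does not say); `HttpParallelism` and `maxConcurrentRequests` are hypothetical names for illustration only:

```scala
object HttpParallelism {
  // parallelism = number of executors * number of cores * threadPoolSize
  // Assumption: "number of cores" is interpreted as cores per executor.
  def maxConcurrentRequests(executors: Int, coresPerExecutor: Int, threadPoolSize: Int): Int =
    executors * coresPerExecutor * threadPoolSize
}
```

Under that reading, 4 executors with 2 cores each and a `threadPoolSize` of 5 would allow up to 40 requests in flight at once, which is worth comparing against what the target service can absorb.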

| Input Column | Mandatory | Description | Column Type |
|-------------------------------|-----------|------------------------------------------------------------------------------------|--------------------|
| \_\_ID\_\_ | Yes | Returned in the response of the http.almaren component; useful for joining data | String |
| \_\_URL\_\_ | Yes | Used to perform the HTTP request | String |
| \_\_DATA\_\_ | No | Data Content, used in POST/PUT Method HTTP requests | String |
| \_\_REQUEST_HEADERS\_\_ | Yes | HTTP headers | Map[String,String] |
| \_\_REQUEST_PARAMS\_\_ | Yes | HTTP params | Map[String,String] |
| \_\_REQUEST_HIDDEN_PARAMS\_\_ | Yes | HTTP params which are hidden (not exposed in logs) | Map[String,String] |

| Output Column | Description |
|-------------------------------|------------------------------------------------------------|
| \_\_ID\_\_ | Custom ID; useful for joining data |
| \_\_BODY\_\_ | HTTP response |
| \_\_HEADER\_\_ | HTTP response header |
| \_\_STATUS_CODE\_\_ | HTTP response code |
| \_\_STATUS_MSG\_\_ | HTTP response message |
| \_\_ERROR\_\_ | Java Exception |
| \_\_ELAPSED_TIME\_\_ | Request time in ms |
| \_\_URL\_\_ | HTTP request URL |
| \_\_DATA\_\_ | Data Content, used in POST/PUT Method HTTP requests |
| \_\_REQUEST_HEADERS\_\_ | HTTP Request headers |
| \_\_REQUEST_PARAMS\_\_ | HTTP Request params |
| \_\_REQUEST_HIDDEN_PARAMS\_\_ | HTTP Request params which are hidden (not exposed in logs) |


4 changes: 2 additions & 2 deletions build.sbt
@@ -7,7 +7,7 @@ lazy val scala213 = "2.13.9"
crossScalaVersions := Seq(scala212,scala213)
ThisBuild / scalaVersion := scala213

val sparkVersion = "3.4.0"
val sparkVersion = "3.4.1"
val majorVersionReg = "([0-9]+\\.[0-9]+).{0,}".r

val majorVersionReg(majorVersion) = sparkVersion
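
The `majorVersionReg` pattern above is what turns the Spark version `3.4.1` into the `3.4` suffix used by the `almaren-framework` dependency elsewhere in build.sbt. A small sketch of the same extraction outside sbt, using the identical regex; `SparkMajorVersion`, `majorOf`, and `almarenVersion` are hypothetical names, and returning an `Option` instead of the build's pattern-binding `val` is a choice made here to avoid a `MatchError` on unexpected input:

```scala
object SparkMajorVersion {
  // Same pattern as in build.sbt: capture "<digits>.<digits>" and ignore the rest.
  private val majorVersionReg = "([0-9]+\\.[0-9]+).{0,}".r

  // Extracts the major.minor prefix; None if the string does not start with "<digits>.<digits>".
  def majorOf(sparkVersion: String): Option[String] = sparkVersion match {
    case majorVersionReg(major) => Some(major)
    case _                      => None
  }

  // Hypothetical helper mirroring s"0.9.10-${majorVersion}" from the dependency list.
  def almarenVersion(base: String, sparkVersion: String): Option[String] =
    majorOf(sparkVersion).map(major => s"$base-$major")
}
```

Because the patch component is discarded, bumping `sparkVersion` from `3.4.0` to `3.4.1` leaves the derived suffix at `3.4`, so only the framework's own version number changes in this PR.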
@@ -17,7 +17,7 @@ scalacOptions ++= Seq("-deprecation", "-feature")
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.9-${majorVersion}" % "provided",
"com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.10-${majorVersion}" % "provided",
"com.lihaoyi" %% "requests" % "0.7.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"org.scalatest" %% "scalatest" % "3.2.14" % "test"