Kotlin for Apache® Spark™

Your next API to work with Apache Spark.

This project adds a missing layer of compatibility between Kotlin and Apache Spark. It allows Kotlin developers to use familiar language features such as data classes and lambda expressions as simple expressions in curly braces or method references.

We have opened a Spark Project Improvement Proposal: Kotlin support for Apache Spark to work with the community towards getting Kotlin support as a first-class citizen in Apache Spark. We encourage you to voice your opinions and participate in the discussion.

Table of Contents

  • Supported versions of Apache Spark
  • Releases
  • How to configure Kotlin for Apache Spark in your project
  • Kotlin for Apache Spark features
  • Examples
  • Reporting issues/Support
  • Code of Conduct
  • License

Supported versions of Apache Spark

| Apache Spark | Scala | Kotlin for Apache Spark         |
|:------------:|:-----:|:-------------------------------:|
| 3.0.0+       | 2.12  | kotlin-spark-api-3.0:1.0.2      |
| 2.4.1+       | 2.12  | kotlin-spark-api-2.4_2.12:1.0.2 |
| 2.4.1+       | 2.11  | kotlin-spark-api-2.4_2.11:1.0.2 |

Releases

The list of Kotlin for Apache Spark releases is available here. The Kotlin for Spark artifacts adhere to the following convention:

[Apache Spark version]_[Scala core version]:[Kotlin for Apache Spark API version]
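
For example, `kotlin-spark-api-2.4_2.12:1.0.2` is version 1.0.2 of the API, built for Apache Spark 2.4 on Scala 2.12.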


How to configure Kotlin for Apache Spark in your project

You can add Kotlin for Apache Spark as a dependency to your project: Maven, Gradle, SBT, and Leiningen are supported.

Here's an example `pom.xml`:
```xml
<dependency>
  <groupId>org.jetbrains.kotlinx.spark</groupId>
  <artifactId>kotlin-spark-api-3.0</artifactId>
  <version>${kotlin-spark-api.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>
```
Note that `core` is compiled against Scala version `2.12`.

You can find a complete example with `pom.xml` and `build.gradle` in the Quick Start Guide.
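
If you use Gradle instead, the equivalent dependencies might look like the following — a minimal sketch in the Gradle Kotlin DSL, assuming Spark 3.0 on Scala 2.12 and the coordinates from the table above:

```kotlin
// build.gradle.kts — illustrative snippet; pin versions to your own setup
dependencies {
    implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api-3.0:1.0.2")
    implementation("org.apache.spark:spark-sql_2.12:3.0.0")
}
```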

Once you have configured the dependency, you only need to add the following import to your Kotlin file:

```kotlin
import org.jetbrains.kotlinx.spark.api.*
```

Kotlin for Apache Spark features

Creating a SparkSession in Kotlin

```kotlin
val spark = SparkSession
        .builder()
        .master("local[2]")
        .appName("Simple Application").orCreate
```

Creating a Dataset in Kotlin

```kotlin
spark.toDS("a" to 1, "b" to 2)
```

The example above produces `Dataset<Pair<String, Int>>`.
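
Kotlin data classes work as element types too. A minimal sketch, assuming the vararg `toDS` shown above accepts any encodable type (the `Person` class is invented for illustration):

```kotlin
// Hypothetical data class used as the Dataset's element type
data class Person(val name: String, val age: Int)

val people = spark.toDS(Person("Alice", 29), Person("Bob", 31))  // Dataset<Person>
```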

Null safety

There are several aliases in the API, like `leftJoin`, `rightJoin` etc. These are null-safe by design. For example, `leftJoin` is aware of nullability and returns `Dataset<Pair<LEFT, RIGHT?>>`. Note that we are forcing `RIGHT` to be nullable for you as a developer to be able to handle this situation. `NullPointerException`s are hard to debug in Spark, and we are doing our best to make them as rare as possible.
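
A short sketch of what that nullability looks like in practice — the data classes and join condition are invented for illustration, and we assume `leftJoin` takes the right-hand `Dataset` plus a join `Column`:

```kotlin
import org.apache.spark.sql.Dataset

// Hypothetical element types; the nullable right side is the point here
data class Customer(val id: Int, val name: String)
data class Order(val customerId: Int, val amount: Double)

withSpark {
    val customers = dsOf(Customer(1, "Alice"), Customer(2, "Bob"))
    val orders = dsOf(Order(1, 9.99))

    // Every Customer is kept; Bob has no Order, so the right side is null
    val joined: Dataset<Pair<Customer, Order?>> =
        customers.leftJoin(orders, customers.col("id") eq orders.col("customerId"))

    joined.show()
}
```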

withSpark function

We provide you with the useful function `withSpark`, which accepts everything that may be needed to run Spark — properties, name, master location and so on. It also accepts a block of code to execute inside the Spark context.

After the work block ends, `spark.stop()` is called automatically.

```kotlin
withSpark {
    dsOf(1, 2)
            .map { it to it }
            .show()
}
```

`dsOf` is just one more way to create a `Dataset` (here a `Dataset<Int>`) from varargs.
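
As a sketch of passing configuration, assuming `withSpark` exposes named parameters along the lines of `props`, `master`, and `appName` (check the API for the exact signature):

```kotlin
withSpark(
    props = mapOf("spark.sql.shuffle.partitions" to "4"),  // arbitrary example property
    master = "local[2]",
    appName = "Demo"
) {
    dsOf(1, 2, 3).show()
}
```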

withCached function

It can easily happen that we need to fork our computation into several paths. To compute things only once, we should call the `cache` method. However, it becomes difficult to control when we're using a cached `Dataset` and when not. It is also easy to forget to unpersist cached data, which can break things unexpectedly or take up more memory than intended.

To solve these problems we've added the `withCached` function:
```kotlin
withSpark {
    dsOf(1, 2, 3, 4, 5)
            .map { it to (it + 2) }
            .withCached {
                showDS()

                filter { it.first % 2 == 0 }.showDS()
            }
            .map { c(it.first, it.second, (it.first + it.second) * 2) }
            .show()
}
```

Here we're showing the cached `Dataset` for debugging purposes and then filtering it. The `filter` method returns a filtered `Dataset`, after which the cached `Dataset` is unpersisted, so we have more memory to call the `map` method and collect the resulting `Dataset`.

toList and toArray methods

For more idiomatic Kotlin code we've added `toList` and `toArray` methods to this API. You can still use the `collect` method as in the Scala API; however, the result has to be cast to `Array`. This is because `collect` returns a Scala array, which is not the same as the Java/Kotlin one.
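
A quick sketch of the idiomatic variants, assuming `toList` and `toArray` return ordinary Kotlin `List<T>` and `Array<T>`:

```kotlin
withSpark {
    val ds = dsOf(1, 2, 3)

    // No casting needed, unlike collect()
    val list: List<Int> = ds.toList()
    val array: Array<Int> = ds.toArray()
}
```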

Column infix/operator functions

Similar to the Scala API for `Column`s, many of the operator functions could be ported over. For example:

```kotlin
dataset.select( col("colA") + 5 )
dataset.select( col("colA") / col("colB") )

dataset.where( col("colA") `===` 6 )
// or alternatively
dataset.where( col("colA") eq 6 )
```

In short, all supported operators are:

  • ==
  • !=
  • eq / `===`
  • neq / `=!=`
  • -col(...)
  • !col(...)
  • gt
  • lt
  • geq
  • leq
  • or
  • and / `&&`
  • +
  • -
  • *
  • /
  • %
Secondly, there are some quality-of-life additions as well:

In Kotlin, ranges are often used to express inclusive/exclusive bounds. So, you can now do:

```kotlin
dataset.where( col("colA") inRangeOf 0..2 )
```

Also, for columns containing map- or array-like types:

```kotlin
dataset.where( col("colB")[0] geq 5 )
```

Finally, thanks to Kotlin reflection, we can provide a type- and refactor-safe way to create `TypedColumn`s and, with those, a new Dataset from pieces of another using the `selectTyped()` function added to the API:

```kotlin
val dataset: Dataset<YourClass> = ...
val newDataset: Dataset<Pair<TypeOfA, TypeOfB>> = dataset.selectTyped(col(YourClass::colA), col(YourClass::colB))

// Alternatively, for instance when working with a Dataset<Row>
val typedDataset: Dataset<Pair<String, Int>> = otherDataset.selectTyped(col("a").`as`<String>(), col("b").`as`<Int>())
```

Overload resolution ambiguity

We had to implement the functions `reduceGroups` and `reduce` for Kotlin separately as `reduceGroupsK` and `reduceK` respectively, because otherwise it caused resolution ambiguity between the Kotlin, Scala, and Java APIs, which was quite hard to solve.

We have a special example of working with these functions in the Groups example.
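
A minimal sketch of the Kotlin-specific variant, assuming `reduceGroupsK` mirrors Spark's `reduceGroups` on a grouped Dataset:

```kotlin
withSpark {
    dsOf(1 to "a", 1 to "b", 2 to "c")
        .groupByKey { it.first }                                    // group pairs by their Int key
        .reduceGroupsK { a, b -> a.first to (a.second + b.second) } // merge values within a group
        .show()
}
```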

Examples

For more, check out the examples module. To get up and running quickly, check out this tutorial.

Reporting issues/Support

Please use GitHub issues for filing feature requests and bug reports. You are also welcome to join the kotlin-spark channel in the Kotlin Slack.

Code of Conduct

This project and the corresponding community are governed by the JetBrains Open Source and Community Code of Conduct. Please make sure you read it.

License

Kotlin for Apache Spark is licensed under the Apache 2.0 License.
