In a previous post, I demonstrated how to consume a Kafka topic using Spark in a resilient manner. The resiliency code was written in Scala. Now, I want to leverage that Scala code to connect Spark to Kafka in a PySpark application. We will see how we can call Scala code from Python and what the restrictions are.
Basic method call through Py4J
PySpark relies on Py4J to execute Python code that can call objects that reside in the JVM. To do that, Py4J uses a gateway between the JVM and the Python interpreter, and PySpark sets it up for you.
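As a quick illustration (assuming the pyspark shell, where the sc variable already holds a SparkContext), the gateway and its view of the JVM are exposed as internal fields of the SparkContext, and any class available on the JVM's classpath can be reached through them:
>>> sc._gateway                                        # the Py4J JavaGateway that PySpark set up
>>> jvm = sc._jvm                                      # the gateway's view of the JVM
>>> jvm.java.lang.System.getProperty("java.version")   # call a method on a standard JVM class
This _jvm field is the entry point we will use throughout this post.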
Let’s see how we can make a basic method call. We first create a minimal Scala object with a single method:
package com.ippontech
object Hello {
  def hello = println("hello")
}
We need to package this class in a JAR. I’m reusing the spark-kafka-source project from the previous post but any Maven/SBT/… project should work.
$ mvn package
...
Building jar: .../target/spark-kafka-source-0.2.0-SNAPSHOT.jar
We can now launch the PySpark console and add the JAR to the classpath:
$ pyspark --driver-class-path .../target/spark-kafka-source-0.2.0-SNAPSHOT.jar
>>>
From there, JVM objects are accessible through the _jvm field of the SparkContext object (sc._jvm):
>>> sc._jvm.com.ippontech.Hello.hello()
hello
Good, we can call Scala code from Python. That’s a good start.
Notice that we can also assign a reference to a JVM object to a Python variable, then make the method call on that variable:
>>> h = sc._jvm.com.ippontech.Hello
>>> h.hello()
hello
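Py4J is not limited to calling methods on Scala objects (singletons): we can also instantiate JVM classes from Python by calling them like constructors. Here is a small sketch using a standard JDK class (not part of our project); we will rely on the same mechanism later to create an OffsetsStore from Python:
>>> sb = sc._jvm.java.lang.StringBuilder("hello")   # instantiate a JVM class, passing a constructor argument
>>> sb.append(" from Python")                       # call an instance method on the JVM object
>>> sb.toString()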
Real-life method call
Here is the method we want to call:
object KafkaSource extends LazyLogging {
  def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
      (ssc: StreamingContext, brokers: String, offsetsStore: OffsetsStore, topic: String): InputDStream[(K, V)] = {
    ...
There are a few challenges here:
- This method has generic parameters. At this point, I'm not sure we can pass them from Python. We will need to create a helper method that doesn't have generic arguments.
- The StreamingContext parameter and the InputDStream return value are objects from the Spark framework. Notice that PySpark works with Python wrappers around the Java version of Spark objects, not around the Scala version. We will have to wrap/unwrap objects accordingly.
- The brokers and topic parameters are strings. Python strings and Java strings are interchangeable.
- The OffsetsStore parameter is an object from our code base. We will have to create it on the Python side and simply pass the reference as a parameter.
Let’s create a helper class and helper method to lift the constraints described above:
object KafkaSourcePythonHelper {
  def kafkaStream(jssc: JavaStreamingContext, brokers: String, offsetsStore: OffsetsStore,
                  topic: String): JavaDStream[(String, String)] = {
    val dstream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](jssc.ssc, brokers, offsetsStore, topic)
    val jdstream = new JavaDStream(dstream)
    jdstream
  }
}
Here are the things to notice:
- We have removed the generic parameters from the helper method’s signature and hard-coded them when calling the actual method.
- We expect a JavaStreamingContext instance and unwrap it by calling jssc.ssc to get the StreamingContext instance.
- We wrap the DStream into a JavaDStream that will be returned to the Python caller.
We can now compile this code. Notice that, since our code depends on external libraries, we need those dependencies to be included in the JAR. I'm using a Maven profile and the Maven Shade plugin to do so:
$ mvn package -Puber-jar
We can now restart PySpark and test our code. We start by creating a StreamingContext:
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
Before calling the kafkaStream method, we need to create an instance of an OffsetsStore:
>>> zkStore = sc._jvm.com.ippontech.kafka.stores.ZooKeeperOffsetsStore("localhost:2181", "/my_topic/offsets")
Notice that the zkStore instance is just a pointer to an object that resides in the JVM:
>>> zkStore
JavaObject id=o21
Now it is time to call the kafkaStream method:
>>> jstream = sc._jvm.com.ippontech.kafka.KafkaSourcePythonHelper.kafkaStream(ssc._jssc, "localhost:9092", zkStore, "my_topic")
Notice we unwrapped the Python StreamingContext to get the JavaStreamingContext by calling ssc._jssc.
The last thing we need to do is to wrap the JavaDStream into a Python DStream. We have to provide a deserializer for the messages:
>>> from pyspark.serializers import PairDeserializer, NoOpSerializer
>>> from pyspark.streaming import DStream
>>> ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
>>> stream = DStream(jstream, ssc, ser)
We can now define our transformations and actions on the DStream and start the application:
>>> stream.pprint()
>>> ssc.start()
>>> -------------------------------------------
Time: 2016-09-01 15:21:38
-------------------------------------------
-------------------------------------------
Time: 2016-09-01 15:21:39
-------------------------------------------
...
Now, if you send some data into Kafka, you should see it in PySpark:
$ echo "foo bar" | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
-------------------------------------------
Time: 2016-09-01 15:26:18
-------------------------------------------
(None, 'foo bar')
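Of course, printing the batches is just a smoke test. Any regular DStream transformation can be used instead, as long as it is defined before ssc.start() is called. As a minimal sketch (assuming the message values arrive as plain strings, as in the output above), a per-batch word count over the values would look like this:
>>> words = stream.flatMap(lambda kv: kv[1].split())
>>> counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
>>> counts.pprint()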
Conclusion
We have seen it is fairly easy to call Scala code from Python. In a context where Data Scientists write Python code but Software Engineers prefer to write Java/Scala code, that’s a good thing because we can share the responsibilities within the team. That makes me happy because I’m not a big fan of shipping code written by Data Scientists to production, and that also means we will have less code duplication between languages.
In the context of Spark, the key thing to remember is to appropriately wrap/unwrap your objects. Spark has 3 versions of each class (Scala, Java, Python) and the Python classes are wrappers over the Java classes. This means:
- When passing a Spark object as a parameter to a Scala method:
  - On the Python side, unwrap the Python object to get the Java object (e.g. unwrap a Python StreamingContext into a Java JavaStreamingContext).
  - On the Scala side, unwrap the Java object to get the Scala object (e.g. unwrap a Java JavaStreamingContext into a Scala StreamingContext).
- When returning a Spark object from the Scala method:
  - On the Scala side, wrap the object into the corresponding Java object (e.g. wrap a Scala DStream into a Java JavaDStream).
  - On the Python side, wrap the Java object into the corresponding Python object (e.g. wrap a Java JavaDStream into a Python DStream).
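To recap this pattern with the names used in this post, here is the Python side of the round trip we implemented above, with the wrap/unwrap steps annotated:
>>> jstream = sc._jvm.com.ippontech.kafka.KafkaSourcePythonHelper.kafkaStream(
...     ssc._jssc,   # unwrap: Python StreamingContext -> Java JavaStreamingContext
...     "localhost:9092", zkStore, "my_topic")
>>> stream = DStream(jstream, ssc, ser)   # wrap: Java JavaDStream -> Python DStream
On the Scala side, the helper performs the mirror operations: it calls jssc.ssc to unwrap the JavaStreamingContext, and wraps the resulting DStream into a JavaDStream before returning it.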