How to run Apache Spark Streaming against Apache Kafka
This guide shows how to set up Apache Spark for structured streaming with Apache Kafka.
As a prerequisite, Juju must be installed, together with a Kubernetes-based Juju controller.
Setup
First, create a fresh Juju model to be used as a workspace for spark-streaming experiments:
juju add-model spark-streaming
Deploy the Apache ZooKeeper and Apache Kafka K8s charms. A single unit of each is enough.
juju deploy zookeeper-k8s --series=jammy --channel=edge
juju deploy kafka-k8s --series=jammy --channel=edge
juju integrate kafka-k8s zookeeper-k8s
Deploy a test producer application, to write messages to Charmed Apache Kafka:
juju deploy kafka-test-app --series=jammy --channel=edge --config role=producer --config topic_name=spark-streaming-store --config num_messages=1000
juju integrate kafka-test-app kafka-k8s
To consume these messages, Apache Spark needs a connection to Apache Kafka, which requires credentials.
Deploy the data-integrator charm, which retrieves these credentials:
juju deploy data-integrator --series=jammy --channel=edge --config extra-user-roles=consumer,admin --config topic-name=spark-streaming-store
juju integrate data-integrator kafka-k8s
juju run data-integrator/0 get-credentials
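The username and password printed by the action are later passed to Spark as Kafka client options. As a minimal sketch, assuming the SCRAM-SHA-512 over SASL_PLAINTEXT setup used later in this guide, the hypothetical helper `kafka_options` below collects them into a dictionary that could be handed to `spark.readStream.format("kafka").options(**opts)`. The helper name and the placeholder credentials are illustrative and not part of any charm or Spark API.

```python
def kafka_options(username, password, bootstrap_servers, topic):
    """Build Kafka source options for Spark from retrieved credentials.

    Hypothetical helper for illustration only.
    """
    # JAAS configuration for SCRAM authentication; the values are quoted,
    # which is the usual JAAS form.
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.sasl.mechanism": "SCRAM-SHA-512",
        "kafka.security.protocol": "SASL_PLAINTEXT",
        "kafka.sasl.jaas.config": jaas,
        "subscribe": topic,
    }


# Placeholder credentials; substitute the values returned by get-credentials.
opts = kafka_options(
    "example-user",
    "example-password",
    "kafka-k8s-0.kafka-k8s-endpoints:9092",
    "spark-streaming-store",
)
print(opts["kafka.sasl.jaas.config"])
```

Keeping the options in one dictionary makes it easier to swap credentials without editing a long chain of `.option(...)` calls.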
Note
We are using the service account set up in the previous examples.
Set up the environment in a Kubernetes pod launched in the same namespace as the Juju model (i.e. spark-streaming in this example).
The pod specification YAML is as follows:
apiVersion: v1
kind: Pod
metadata:
  name: testpod
spec:
  containers:
  - image: ghcr.io/canonical/charmed-spark:3.5-22.04_stable
    name: spark
    ports:
    - containerPort: 18080
    command: ["sleep"]
    args: ["3600"]
Note
Please make sure to use the correct tag for the version of Apache Spark that you’d like to use.
For instance, if you want to use Apache Spark 4.0, you should use the image ghcr.io/canonical/charmed-spark:4.0-22.04_stable instead.
Create the pod in the same namespace as the Juju model.
Launch a Bash shell inside the test pod.
kubectl apply -f ./testpod.yaml --namespace=spark-streaming
kubectl exec -it testpod -n spark-streaming -- /bin/bash
Within the test pod shell session, create a Kubernetes cluster configuration so that spark-client can reach the cluster.
Then launch a pyspark shell to read the structured stream from Apache Kafka.
cd /home/spark
mkdir .kube
cat > .kube/config << EOF
<<KUBECONFIG CONTENTS>>
EOF
spark-client.service-account-registry create --username hello --namespace spark-streaming
spark-client.service-account-registry list
spark-client.pyspark --username hello --namespace spark-streaming --conf spark.executor.instances=1 --conf spark.jars.ivy=/tmp --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.8,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.8
Note
Please make sure to use the correct package for the version of Apache Spark (e.g. 3.4.4, 3.5.8, 4.0.2) and Scala (e.g. 2.12, 2.13) that you’d like to use.
Please refer to the Sonatype query page for spark-streaming-kafka and spark-sql-kafka for the list of different versions.
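The package coordinates follow the pattern `org.apache.spark:<artifact>_<scala version>:<spark version>`. As a small sketch, the hypothetical helper `kafka_packages` below assembles the value for `--packages` from the two versions. It only formats the string; whether a given combination is actually published still has to be checked on the package index.

```python
def kafka_packages(spark_version, scala_version):
    """Return the comma-separated --packages value for the Kafka connectors.

    Hypothetical helper: it formats Maven coordinates and does not verify
    that the requested versions exist.
    """
    artifacts = ("spark-streaming-kafka-0-10", "spark-sql-kafka-0-10")
    return ",".join(
        f"org.apache.spark:{artifact}_{scala_version}:{spark_version}"
        for artifact in artifacts
    )


# Reproduces the coordinates used in the pyspark command above.
packages = kafka_packages("3.5.8", "2.12")
print(packages)
```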
Within the pyspark shell, use the credentials retrieved previously to read the stream from Apache Kafka.
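from json import loads
from pyspark.sql.functions import col, udf
# Credentials returned by the data-integrator get-credentials action
username="relation-8"
password="iGvE6HrCru1vqEsUdgRTsZKlOLqbebMJ"
# Subscribe to the topic, authenticating with SASL/SCRAM
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-k8s-0.kafka-k8s-endpoints:9092") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
    .option("kafka.security.protocol", "SASL_PLAINTEXT") \
    .option("kafka.sasl.jaas.config", f'org.apache.kafka.common.security.scram.ScramLoginModule required username={username} password={password};') \
    .option("subscribe", "spark-streaming-store") \
    .option("includeHeaders", "true") \
    .load()
# Extract the "origin" field from each JSON-encoded message value
get_origin = udf(lambda x: loads(x)["origin"])
# Keep "partition" in the projection so it can be used as a grouping key
count = lines.withColumn("origin", get_origin(col("value"))).select("origin", "partition")\
    .groupBy("origin", "partition")\
    .count()
# Start the streaming query and print the running counts to the console
query = count.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()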
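The `get_origin` UDF assumes that every message value is a JSON object with an `origin` field, and it raises an error on anything else. As a hedged sketch, the plain Python function below shows a more tolerant variant that returns `None` for missing or malformed values. The function name `extract_origin` and the sample payload (including its `content` field) are made up for illustration; the real message layout produced by the test application may differ.

```python
from json import loads


def extract_origin(value):
    """Return the "origin" field of a JSON-encoded record value, or None."""
    if value is None:
        return None
    try:
        record = loads(value)  # accepts str, bytes and bytearray
    except ValueError:
        # Covers invalid JSON as well as bytes that cannot be decoded
        return None
    if not isinstance(record, dict):
        return None
    return record.get("origin")


# Hypothetical payload; Kafka record values arrive as binary data.
sample = bytearray(b'{"origin": "producer-0", "content": "hello"}')
print(extract_origin(sample))
print(extract_origin(b"not json"))
```

In the pyspark shell, this function could be wrapped with `udf(extract_origin)` and used in place of the lambda, so that a single malformed message does not fail the streaming query.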