Currying callbacks to reach the next level

Ricardo Miranda 20 December, 2020 | 2 min read

When talking with people new to functional programming, I find currying the hardest concept to explain. Newcomers are usually skeptical and keep asking "why is currying relevant?".

In this article, I will show an example of currying a callback. A callback is a function passed as an argument to another function (a higher-order function).
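As a minimal sketch of both ideas, the snippet below defines a higher-order function that takes a callback and a curried function with two parameter lists. The names forEachItem, greet, and hello are illustrative only and do not come from the Kafka example that follows.

```scala
object CurryingBasics {
  // Higher-order function: it receives a callback and applies it to each item.
  def forEachItem(items: List[String])(callback: String => Unit): Unit =
    items.foreach(callback)

  // Curried function: two parameter lists instead of one.
  def greet(greeting: String)(name: String): String = s"$greeting, $name!"

  // Supplying only the first parameter list yields a String => String function.
  val hello: String => String = greet("Hello")
}
```

Calling CurryingBasics.hello("Ada") supplies the second argument later, which is the same move the Kafka callback makes further down.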

A real-world example

I created a Kafka consumer that runs in its own thread. To process messages, the Kafka consumer receives a callback with a single argument: a Kafka message to be processed. This way my Kafka consumer abstracts how a message should be processed, postponing this decision to runtime.

Sometimes this callback function requires complex message processing, for example, sending a reply message to a Kafka topic. Let's refer to everything this complex processing needs as the processing environment. The environment description, the name of the reply topic, and everything else are known only at run time, after reading a configuration file.
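To make the idea of an environment known only at run time concrete, here is a small sketch that builds one from java.util.Properties. The Environment case class, its fields, and the property keys are assumptions made for illustration, and a string stands in for the configuration file.

```scala
import java.io.StringReader
import java.util.Properties

object EnvironmentSketch {
  // Hypothetical environment: the fields are illustrative, not taken from the article.
  case class Environment(description: String, replyTopic: String)

  // Builds the environment from properties that are only available at run time.
  def loadEnvironment(props: Properties): Environment =
    Environment(
      description = props.getProperty("environment.description", "unknown"),
      replyTopic = props.getProperty("reply.topic", "replies")
    )

  // Stand-in for the contents of a configuration file on disk.
  val configText: String =
    """environment.description=staging
      |reply.topic=example_replies
      |""".stripMargin

  // Parses the text as a properties file and builds the environment from it.
  def fromText(text: String): Environment = {
    val props = new Properties()
    props.load(new StringReader(text))
    loadEnvironment(props)
  }
}
```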

Currying to the rescue

The Kafka consumer problem calls for heavy use of currying. I find this solution aesthetically appealing and I would like to know your opinion.

First, I define a message with headers (its attributes) and a payload (Kafka's value):

/** Abstract message to be sent using a messaging system, for instance, Pub/Sub or Kafka 
 * attributes are message metadata, payload is the message content
 */
case class Message(attributes: Map[String, String], payload: String)

Here is the Kafka consumer code:

import java.time.Duration
import java.util.{Arrays, Properties}

import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}

import scala.collection.JavaConverters._

object KafkaSubscriber extends StrictLogging {

  /** Creates Properties for the Kafka consumer
   *
   * @param bootstrap_servers        Kafka bootstrap servers (comma-separated host:port list of brokers)
   * @param group_id                 Group ID to which the consumer belongs
   * @param key_deserializer_class   Key deserialization
   * @param value_deserializer_class Payload deserialization
   * @return Properties for the Kafka consumer
   */
  def kafkaConsumerSettings(bootstrap_servers: String,
                            group_id: String,
                            key_deserializer_class: String,
                            value_deserializer_class: String
                           ): Properties = {

    val properties: Properties = new Properties()

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group_id)
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class)

    properties
  }

  /** Subscribe to a topic consuming it with a callback.
   *
   * @param consumerSettings Properties for the consumer
   * @param topic            Topic to read from
   * @param callback         Function that performs an action over the message
   */
  def subscribe(consumerSettings: Properties, topic: String)(callback: Message => Unit): Unit = {

    logger.info(s"Starting consumer on Kafka topic: ${topic}.")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerSettings)
    consumer.subscribe(Arrays.asList(topic))

    try {
      while (true) {
        val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(100))

        records.asScala.foreach(rec => {
          val attributes: Map[String, String] =
            rec
              .headers()
              .toArray
              .map(x => Tuple2(x.key, new String(x.value)))
              .toMap
          val payload: String = rec.value()
          callback(Message(attributes = attributes, payload = payload))
        })
      }
    } finally {
      consumer.close()
    }
  }
}
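The callback shape can be tried without a broker. The sketch below replaces the polling loop with a fixed sequence of messages and runs the subscriber in its own thread. The in-memory subscribe and the collectPayloads helper are hypothetical stand-ins, not part of KafkaSubscriber.

```scala
import scala.collection.mutable.ListBuffer

object InMemorySubscriberSketch {
  // Same shape as the Message case class defined earlier.
  case class Message(attributes: Map[String, String], payload: String)

  // Hypothetical stand-in for KafkaSubscriber.subscribe: instead of polling a
  // broker, it walks a fixed sequence of messages and hands each to the callback.
  def subscribe(messages: Seq[Message])(callback: Message => Unit): Unit =
    messages.foreach(callback)

  // Runs the subscriber in its own thread, waits for it to finish,
  // and returns the payloads that the callback saw.
  def collectPayloads(): List[String] = {
    val seen = ListBuffer.empty[String]
    val input = Seq(
      Message(Map("id" -> "1"), "first"),
      Message(Map("id" -> "2"), "second")
    )
    val worker = new Thread(new Runnable {
      override def run(): Unit = subscribe(input)(msg => seen += msg.payload)
    })
    worker.start()
    worker.join() // join establishes happens-before, so reading `seen` is safe
    seen.toList
  }
}
```

The subscriber never knows what the callback does with a message. It only requires the Message => Unit shape.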

Let us look at the signature of a function to be sent as a callback to the above subscribe function. This callback takes the environment in its first parameter list and the message in its second:

/** Callback function to be passed to the Kafka consumer.
 *
 * @param environment This environment in real-world use case would be a Kafka producer
 *                    or any other complex processing to do in the message consumption.
 * @param message     The Message to be processed.
 */
def processMessage(environment: String) (message: Message): Unit = {
  ...
}

One possible use of the processMessage function is:

// `settings` is assumed to be a Properties value built beforehand with kafkaConsumerSettings(...)
subscribe(consumerSettings = settings, topic = "example_topic")(
  callback = processMessage(environment = "Hello World")
)
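Here is a sketch of why this works: supplying only the environment leaves a function of type Message => Unit, which is exactly what subscribe expects. The Environment case class, its send function (standing in for a Kafka producer), and the "ack" reply format are assumptions for illustration. In the article the environment is a plain String.

```scala
import scala.collection.mutable.ListBuffer

object CurriedCallbackSketch {
  // Same shape as the Message case class defined earlier.
  case class Message(attributes: Map[String, String], payload: String)

  // Hypothetical environment: a reply topic plus a `send` function
  // standing in for a Kafka producer.
  case class Environment(replyTopic: String, send: (String, Message) => Unit)

  // Curried callback: environment first, message second.
  def processMessage(environment: Environment)(message: Message): Unit = {
    val reply = Message(message.attributes, s"ack: ${message.payload}")
    environment.send(environment.replyTopic, reply)
  }

  // Records what would have been sent, so the sketch runs without a broker.
  val sent: ListBuffer[(String, String)] = ListBuffer.empty

  val environment: Environment =
    Environment("example_replies", (topic, msg) => sent += ((topic, msg.payload)))

  // Fixing the environment leaves the Message => Unit shape that subscribe expects.
  val handler: Message => Unit = processMessage(environment)
}
```

The environment is fixed once, when the handler is created, and the consumer supplies each message later without knowing the environment exists.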

Final remarks

I hope this article has shown the power of currying with higher-order functions. This technique provides the programmer with a clean and easy way to postpone decisions to run time.

Originally published on codeaddition.com
