Writing a simple Telegram bot with tagless final, http4s and fs2

Pavkin Vladimir 26 June, 2018 | 12 min read

Hello all!

Recently I’ve started diving into fs2 and http4s. They looked so awesome that, to get properly acquainted with them, I decided to implement something interesting. A bot for the Telegram messenger seemed like a nice idea. So here’s a small tutorial on how to implement a bot in a purely functional way, using tagless final encoding, fs2 for streaming and the http4s client to talk to the Telegram API.

Most of the code snippets here have comments that explain in more detail what’s going on. I encourage you not to skip them: many small details are covered only there, because otherwise the text would be twice as long.

Disclaimer 1: There are feature-rich Telegram bot libraries out there. If you need something done quickly and you don’t mind bringing Akka into your dependencies, you’ll be better off with those solutions.

Disclaimer 2: I’m by no means an expert in either fs2 or http4s, so if you find a better way to do something presented here, please leave me a comment! 🙂

“Gimme da code!” Here’s the GitHub repo.

So what are we going to build?

A todo-list bot. To avoid overloading the tutorial with specifics of the Telegram Bot API, we’ll keep it very simple:

  • It will keep a separate todo-list for each chat (either personal or group)
  • The bot will “long poll” for messages sent to it. I chose long polling because it’s simpler to implement and, more importantly, it works during local development, when no public webhook endpoint is available.
  • The interface will look like this:
      • /show command will make the bot answer with the current todo-list
      • /clear command will erase all the tasks for this chat
      • any other message will be interpreted as an instruction to add a new todo-list item (with the message content used as the item text)

So in the end interaction with the bot should look like this:

(Screenshot: telegram1.png, an example conversation with the bot)

Let’s proceed to designing the algebras.

Designing Algebras

Despite the simplicity of the bot, there are quite a few algebras we’re going to operate with:

  • A simplified Telegram Bot API algebra that only contains the requests we need
  • A storage algebra that handles the storage of our todo-lists
  • A logger algebra for purely functional logging
  • A higher-level todo-list bot algebra, composed out of the other algebras above

Logger algebra

We’ll start with the simplest one. Logging is quite a common task, and there are already projects that provide logging algebras out of the box. We’ll use log4cats.

For those who are wondering, a simplified Logger algebra would look like this:

trait Logger[F[_]]{

  def error(message: => String): F[Unit]
  def error(t: Throwable)(message: => String): F[Unit]
  def warn (message: => String): F[Unit]
  def info (message: => String): F[Unit]
  def debug(message: => String): F[Unit]
  def trace(message: => String): F[Unit]
}
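To make the idea of an “interpreter” a bit more concrete, here is a minimal sketch of a test interpreter for the simplified algebra above (leaving out the Throwable overload). The identity effect Id and the RecordingLogger class are hypothetical helpers for illustration, not part of log4cats: this interpreter just records messages in memory, whereas a real one, such as the log4cats slf4j implementation, writes to a logging backend.

```scala
import scala.collection.mutable.ListBuffer

object LoggerSketch {
  // The simplified algebra from above, minus the Throwable overload
  trait Logger[F[_]] {
    def error(message: => String): F[Unit]
    def warn(message: => String): F[Unit]
    def info(message: => String): F[Unit]
    def debug(message: => String): F[Unit]
    def trace(message: => String): F[Unit]
  }

  type Id[A] = A // hypothetical identity effect: calls run immediately

  // Test interpreter: records "LEVEL: message" lines instead of writing to a backend
  final class RecordingLogger extends Logger[Id] {
    val lines: ListBuffer[String] = ListBuffer.empty
    private def log(level: String, message: String): Unit = {
      lines += s"$level: $message"
      ()
    }
    def error(message: => String): Unit = log("ERROR", message)
    def warn(message: => String): Unit = log("WARN", message)
    def info(message: => String): Unit = log("INFO", message)
    def debug(message: => String): Unit = log("DEBUG", message)
    def trace(message: => String): Unit = log("TRACE", message)
  }
}
```

The program code only ever sees Logger[F], so swapping this interpreter for the slf4j one requires no changes to the code that logs.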

Log4cats provides several implementations; we’ll pick the slf4j one.

Storage algebra

Storage algebra should also be quite simple to digest:

/**
  * Algebra for managing storage of todo-list items
  */
trait TodoListStorage[F[_]] {
  def addItem(  chatId: ChatId, item: Item): F[Unit]
  def getItems( chatId: ChatId): F[List[Item]]
  def clearList(chatId: ChatId): F[Unit]
}

Basically, for each particular chat, it allows us to add items, query the whole list and erase it.

In a real-world app, this algebra would be interpreted into some database storage service, but for our purposes an in-memory implementation is enough.

For purely functional, asynchronous and concurrent access to shared state, we’ll use fs2.async.Ref. We could have used Ref from cats-effect 1.0.0-RC2, but at the time of writing that wasn’t possible, because http4s depended on an incompatible version of cats-effect.

Ref will store a Map[ChatId, List[Item]], which is essentially what we want to store. Here’s the implementation:

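import cats.Functor
import cats.implicits._ // .map and .void syntax on F
import fs2.async.Ref    // the fs2 0.10.x Ref used throughout this article

/**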
  * Simple in-memory implementation of [[TodoListStorage]] algebra, using [[Ref]].
  * In real world this would go to some database of sort.
  */
class InMemoryTodoListStorage[F[_] : Functor](
  private val ref: Ref[F, Map[ChatId, List[Item]]]) extends TodoListStorage[F] {

  def addItem(chatId: ChatId, item: Item): F[Unit] =
    ref.modify(m => m.updated(chatId, item :: m.getOrElse(chatId, Nil))).void

  def getItems(chatId: ChatId): F[List[Item]] =
    ref.get.map(_.getOrElse(chatId, Nil))

  def clearList(chatId: ChatId): F[Unit] =
    ref.modify(_ - chatId).void
}
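Under the hood, each storage operation is just a pure transformation of the Map held in the Ref. The sketch below replays those same transformations on a plain immutable Map, assuming ChatId = Long and Item = String (these aliases are assumptions; the article doesn’t show them). It also makes one detail visible: addItem prepends, so /show lists the newest item first.

```scala
object StorageModel {
  type ChatId = Long // assumed alias
  type Item = String // assumed alias
  type State = Map[ChatId, List[Item]]

  // The function InMemoryTodoListStorage.addItem passes to ref.modify: prepend the item
  def addItem(chatId: ChatId, item: Item)(m: State): State =
    m.updated(chatId, item :: m.getOrElse(chatId, Nil))

  // What getItems reads via ref.get: a missing chat means an empty list
  def getItems(chatId: ChatId)(m: State): List[Item] =
    m.getOrElse(chatId, Nil)

  // The function clearList passes to ref.modify: drop the chat's entry
  def clearList(chatId: ChatId)(m: State): State =
    m - chatId
}
```

Because Ref applies these functions atomically, concurrent updates from different chats can’t overwrite each other.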

Telegram Bot API

The Telegram Bot API algebra looks like this:

/**
  * Simplified bot api algebra that exposes only APIs required for this project
  */
trait BotAPI[F[_], S[_]] {
  /**
    * Send a message to specified chat
    */
  def sendMessage(chatId: ChatId, message: String): F[Unit]
 
  /**
    * Stream all updates for this bot using long polling.
    *
    * @param fromOffset offset of the first message to start polling from
    */
  def pollUpdates(fromOffset: Offset): S[BotUpdate]
}

Yep, for this bot we’ll only need to poll incoming messages and post responses.

Notice that I introduced a separate effect S[_] for the streamed result. The idea came from this recent article. While in this particular example we’ll only use fs2.Stream[F, ?] as the streaming effect, it makes sense to abstract it out anyway. As far as I understand, abstracting over streaming effects in Scala is still an open area of research, so in the future we may find better ways to work with streams in tagless final encoding.
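One practical benefit of abstracting over S[_] is that tests don’t need a real streaming library at all. Here is a sketch of a fake interpreter that picks a hypothetical identity effect Id for F and a plain List as a finite “stream” for S. The ChatId/Offset aliases and the two-field BotUpdate are simplifying assumptions, not the article’s real types.

```scala
import scala.collection.mutable.ListBuffer

object FakeBotApiSketch {
  type ChatId = Long // assumed alias
  type Offset = Long // assumed alias
  final case class BotUpdate(update_id: Long, text: String) // simplified, hypothetical

  // The algebra from above, with the simplified types
  trait BotAPI[F[_], S[_]] {
    def sendMessage(chatId: ChatId, message: String): F[Unit]
    def pollUpdates(fromOffset: Offset): S[BotUpdate]
  }

  type Id[A] = A // hypothetical identity effect: "effects" run immediately

  // Test interpreter: F = Id, S = List (a finite, pre-recorded "stream" of updates)
  final class FakeBotAPI(updates: List[BotUpdate]) extends BotAPI[Id, List] {
    val sent: ListBuffer[(ChatId, String)] = ListBuffer.empty
    def sendMessage(chatId: ChatId, message: String): Unit = {
      sent += ((chatId, message))
      ()
    }
    // Only updates newer than the given offset, as the real poller requests offset + 1
    def pollUpdates(fromOffset: Offset): List[BotUpdate] =
      updates.filter(_.update_id > fromOffset)
  }
}
```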

Here comes one of the most exciting parts of this post. Let’s implement the bot API algebra!

First let’s see what it is constructed from. There’s a comment for each constructor parameter.

// `Stream[F, ?]` is kind-projector syntax for the type lambda ({ type L[A] = Stream[F, A] })#L
trait StreamingBotAPI[F[_]] extends BotAPI[F, Stream[F, ?]]

class Http4SBotAPI[F[_]](
  token: String,      // bot auth token, received from @BotFather
  client: Client[F],  // http4s HTTP client
  logger: Logger[F])( // we'll need to log errors
  implicit
  F: Sync[F],         // F has to be able to suspend effects
                      // and we need a response body decoder for http4s client to pick up
  D: EntityDecoder[F, BotResponse[List[BotUpdate]]]) extends StreamingBotAPI[F] {

  def sendMessage(chatId: ChatId, message: String): F[Unit] = ???
  def pollUpdates(fromOffset: Offset): Stream[F, BotUpdate] = ???
  
}

Now let’s implement the logic using this toolkit. sendMessage is a simple one, so we’ll start with it. It’s going to call the endpoint with the same name:

private val botApiUri: Uri = Uri.uri("https://api.telegram.org") / s"bot$token"

def sendMessage(chatId: ChatId, message: String): F[Unit] = {

  // safely build a uri to query
  val uri = botApiUri / "sendMessage" =? Map(
    "chat_id" -> Seq(chatId.toString),
    "parse_mode" -> Seq("Markdown"),
    "text" -> Seq(message)
  )

  client.expect[Unit](uri) // run the http request and ignore the result body.
}

Here we’re using the query-parameter flavour of the API, which is not very convenient in general. For production bots there are other options: the Bot API also accepts request parameters as a JSON body, among other formats.

Also, we don’t care about the result body here, so we pass Unit as the type parameter to client.expect.
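For illustration, here is roughly what such a query-parameter request looks like on the wire. This is a hypothetical stdlib helper, not the http4s code above: it encodes values with java.net.URLEncoder, which writes spaces as + (http4s may percent-encode some characters differently), and the token in the usage example is a placeholder.

```scala
import java.net.URLEncoder

object SendMessageUrlSketch {
  // Hypothetical helper: builds roughly the GET URL that
  // `botApiUri / "sendMessage" =? Map(...)` produces in the article
  def sendMessageUrl(token: String, chatId: Long, text: String): String = {
    def enc(s: String): String = URLEncoder.encode(s, "UTF-8")
    val params = List(
      "chat_id" -> chatId.toString,
      "parse_mode" -> "Markdown",
      "text" -> text
    )
    val query = params.map { case (k, v) => s"${enc(k)}=${enc(v)}" }.mkString("&")
    s"https://api.telegram.org/bot$token/sendMessage?$query"
  }
}
```

For example, sendMessageUrl("123:ABC", 42L, "buy milk & eggs") gives https://api.telegram.org/bot123:ABC/sendMessage?chat_id=42&parse_mode=Markdown&text=buy+milk+%26+eggs. Note that the bot token ends up in the URL, so request URLs shouldn’t be logged verbatim.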

Now to the juice — streaming updates.

Telegram Bot API — stream updates

In plain words, what we’re going to do is to repeatedly call the getUpdates endpoint. Since getUpdates returns a list of updates, we’ll also have to flatten the result of each call. Finally, our stream has to be aware of the last requested offset — we don’t want to receive any duplicates.

So let’s begin step by step.

repeatedly call

Here we create a stream of a single Unit value, repeat it forever and lift it into our effect F using covary.

def pollUpdates(fromOffset: Offset): Stream[F, BotUpdate] =
  Stream(()).repeat.covary[F]
  // ...

We don’t add any delay between stream elements here, because we’ll rely on the long polling timeout for throttling instead, as recommended in the API docs. In case you’re wondering, throttling an fs2 stream is very simple as well.

Next step.

call the getUpdates endpoint and be aware of the last requested offset

This feels like a stateful stream stage. Moreover, it requires running side effects to obtain the new state: the new offset can only be obtained from the batch of fresh updates.

Let’s dive a little deeper here. For pure, stateless mapping, fs2 defines a map function on streams:

def map[O2](f: O => O2): Stream[F, O2]

Then, if we need to run an effectful mapping, we use evalMap (notice that the mapping function is now a Kleisli-like effectful function):

def evalMap[O2](f: O => F[O2]): Stream[F, O2]

If we want to go from a simple map to a stateful map, there’s mapAccumulate. Now you need to specify an initial state and a way to obtain the new state after mapping each element:

def mapAccumulate[S, O2](init: S)(f: (S, O) => (S, O2)): Stream[F, (S, O2)]

Notice also that, for each incoming element, the resulting stream emits both the state after the mapping and the mapped element itself.
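A small List-based model can help build intuition for these semantics. The mapAccumulate below is a hypothetical stand-in written with foldLeft, not fs2’s implementation; like the fs2 version, it emits the updated state together with each mapped element.

```scala
object MapAccumulateSketch {
  // Threads a state S through the elements and emits (stateAfterStep, mappedElement)
  def mapAccumulate[S, O, O2](xs: List[O])(init: S)(f: (S, O) => (S, O2)): List[(S, O2)] =
    xs.foldLeft((init, List.empty[(S, O2)])) { case ((s, acc), o) =>
      val (s2, o2) = f(s, o)
      (s2, (s2, o2) :: acc)
    }._2.reverse
}
```

For example, with a running total of word lengths as the state, mapAccumulate(List("a", "bb", "ccc"))(0)((total, w) => (total + w.length, w.toUpperCase)) yields List((1, "A"), (3, "BB"), (6, "CCC")).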

You can probably see where this is going: we need both! Each poll needs the latest offset, but to update the offset for the next request we have to execute the poll. So we want something like mapAccumulate, but with an effectful calculation step.

And there is such a combinator, called (quite expectedly) evalMapAccumulate:

def evalMapAccumulate[S, O2](s: S)(f: (S, O) => F[(S, O2)]): Stream[F, (S, O2)]
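Continuing the List-based model, here is a hypothetical evalMapAccumulate whose step is effectful. Option stands in for F, so a failing step (None) aborts the whole run, much like an unrecovered error in F fails an fs2 stream. The usage below treats the state as a polling offset, which is how the stream of updates uses this combinator.

```scala
object EvalMapAccumulateSketch {
  // Like mapAccumulate, but each step returns its (newState, output) inside an effect.
  // Here the effect is Option: the first None short-circuits the whole result.
  def evalMapAccumulate[S, O, O2](xs: List[O])(init: S)(f: (S, O) => Option[(S, O2)]): Option[List[(S, O2)]] = {
    @annotation.tailrec
    def loop(rest: List[O], s: S, acc: List[(S, O2)]): Option[List[(S, O2)]] = rest match {
      case Nil => Some(acc.reverse)
      case o :: tail =>
        f(s, o) match {
          case None           => None
          case Some((s2, o2)) => loop(tail, s2, (s2, o2) :: acc)
        }
    }
    loop(xs, init, Nil)
  }
}
```

With a fake “server” mapping an offset to the update ids it would return, three polls starting from offset 0 produce the offsets 2, 3 and 3 alongside the batches List(1, 2), List(3) and an empty list.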

Now we’re ready to proceed with our stream of updates:

def pollUpdates(fromOffset: Offset): Stream[F, BotUpdate] =
    Stream(()).repeat.covary[F]
      // initial state is the fromOffset parameter
      // requestUpdates returns an F[(Offset, BotResponse[List[BotUpdate]])]
      .evalMapAccumulate(fromOffset) { case (offset, _) => requestUpdates(offset) }
      // ...

The requestUpdates stage has to do two things: get the new messages from the Telegram Bot API and calculate the new offset:


private def requestUpdates(offset: Offset): F[(Offset, BotResponse[List[BotUpdate]])] = {

  val uri = botApiUri / "getUpdates" =? Map(
    "offset" -> Seq((offset + 1).toString),
    "timeout" -> Seq("1"), // long polling timeout in seconds (an integer in the Bot API docs), throttles the polling
    "allowed_updates" -> Seq("""["message"]""")
  )

  // this is where we use the implicit decoder parameter, passed to the algebra
  client.expect[BotResponse[List[BotUpdate]]](uri)
    .map(response => (lastOffset(response).getOrElse(offset), response))
    .recoverWith {
      // in case of a failed request, we just log the error and recover the stage.
      // if it was some glitch, then one of the next updates eventually succeeds.
      case ex => logger.error(ex)("Failed to poll updates").as(offset -> BotResponse(ok = true, Nil))
    }
}

// just get the maximum id out of all received updates
private def lastOffset(response: BotResponse[List[BotUpdate]]): Option[Offset] =
  response.result match {
    case Nil => None
    case nonEmpty => Some(nonEmpty.maxBy(_.update_id).update_id)
  }

Good! One last step remains.

flatten the result

Each response contains a list of updates. For a better API user experience we’d like to have each update as a standalone stream element. This is quite simple to achieve with flatMap:

def pollUpdates(fromOffset: Offset): Stream[F, BotUpdate] =
  Stream(()).repeat.covary[F]
    .evalMapAccumulate(fromOffset) { case (offset, _) => requestUpdates(offset) }
    .flatMap { case (_, response) => Stream.emits(response.result) }

Basically, we turn each polled batch of updates into a separate stream of updates (using Stream.emits), and flatMap concatenates them into a single flattened stream.
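To check that the offset bookkeeping yields no duplicates, here is a hypothetical, effect-free simulation of the whole pipeline. It has three parts: a fake server that applies Telegram’s offset rule (return updates whose id is at least the requested offset) in small batches and fails on chosen calls, a step mirroring requestUpdates and lastOffset (including the “log and recover with an empty batch” branch), and a fold that flattens the batches. All names are illustrative, not from the article’s repo.

```scala
object PollingLoopSketch {
  final case class BotUpdate(update_id: Long, text: String) // hypothetical, simplified

  // Hypothetical fake of getUpdates: at most `batch` updates with id >= requestedOffset,
  // failing on the call numbers listed in `failOnCall`
  final class FakeServer(all: List[BotUpdate], batch: Int, failOnCall: Set[Int]) {
    private var calls = 0
    def getUpdates(requestedOffset: Long): Either[String, List[BotUpdate]] = {
      calls += 1
      if (failOnCall(calls)) Left(s"glitch on call $calls")
      else Right(all.filter(_.update_id >= requestedOffset).take(batch))
    }
  }

  // Mirrors requestUpdates + lastOffset: request offset + 1, move to the highest id seen,
  // keep the old offset on an empty batch or an error
  def step(server: FakeServer, offset: Long): (Long, List[BotUpdate]) =
    server.getUpdates(offset + 1) match {
      case Left(_)        => (offset, Nil) // the article logs the error and recovers like this
      case Right(Nil)     => (offset, Nil)
      case Right(updates) => (updates.map(_.update_id).max, updates)
    }

  // Runs `polls` iterations (the repeated stream) and flattens the batches
  def poll(server: FakeServer, fromOffset: Long, polls: Int): List[BotUpdate] = {
    val (_, batches) = (1 to polls).foldLeft((fromOffset, List.empty[List[BotUpdate]])) {
      case ((offset, acc), _) =>
        val (next, batch) = step(server, offset)
        (next, batch :: acc)
    }
    batches.reverse.flatten
  }
}
```

With five updates, batches of two and a failure on the second call, five polls still deliver ids 1 to 5 exactly once: the failed call keeps the offset unchanged, and the next call picks up where the last successful one left off.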

And that’s it. The types line up, and our stream of updates is ready to be processed by domain-specific algebras.

Todo-list bot logic

Now that we have all the lower level machinery in place, let’s develop the business logic.

First of all, let’s define all the possible inputs to our bot. We’ll also need a way to convert raw incoming messages into domain specific commands:

sealed trait BotCommand

object BotCommand {

  case class ShowHelp(chatId: ChatId) extends BotCommand
  case class ClearTodoList(chatId: ChatId) extends BotCommand
  case class ShowTodoList(chatId: ChatId) extends BotCommand
  case class AddEntry(chatId: ChatId, content: String) extends BotCommand

  def fromRawMessage(chatId: ChatId, message: String): BotCommand = message match {
    case `help` | "/start" => ShowHelp(chatId)
    case `show` => ShowTodoList(chatId)
    case `clear` => ClearTodoList(chatId)
    case _ => AddEntry(chatId, message)
  }

  val help = "?"
  val show = "/show"
  val clear = "/clear"
}
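Because fromRawMessage is a pure function, it can be checked without any effect machinery. Below is a self-contained copy with ChatId assumed to be Long. The checks show that matching is exact, so, for example, "/show " with a trailing space is treated as a new todo item.

```scala
object BotCommandSketch {
  type ChatId = Long // assumed alias, not shown in the article

  sealed trait BotCommand
  final case class ShowHelp(chatId: ChatId) extends BotCommand
  final case class ClearTodoList(chatId: ChatId) extends BotCommand
  final case class ShowTodoList(chatId: ChatId) extends BotCommand
  final case class AddEntry(chatId: ChatId, content: String) extends BotCommand

  val help = "?"
  val show = "/show"
  val clear = "/clear"

  // Same parser as BotCommand.fromRawMessage: exact string matches, otherwise a new item
  def fromRawMessage(chatId: ChatId, message: String): BotCommand = message match {
    case `help` | "/start" => ShowHelp(chatId)
    case `show`            => ShowTodoList(chatId)
    case `clear`           => ClearTodoList(chatId)
    case _                 => AddEntry(chatId, message)
  }
}
```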

Now we have everything we need to define the todo-list bot algebra:

class TodoListBot[F[_]](
  api: StreamingBotAPI[F],     // lower level bot API algebra
  storage: TodoListStorage[F], // storage algebra to persist todo-lists
  logger: Logger[F])(
  implicit F: Sync[F]) {

  /**
    * Launches the bot process.
    * Launched process periodically polls incoming commands
    * and processes them using todo-list storage algebra.
    */
  def launch: Stream[F, Unit] = pollCommands.evalMap(handleCommand)
  
}

This is a higher level overview. The only interaction this algebra provides is to launch the bot process. The process itself is a simple stream that polls the updates (using the lower level bot API) and handles each update with an effect — hence the evalMap we’re already familiar with.

Let’s take a look at both stages.

pollCommands is quite simple: we start long polling from zero offset and map all text messages into domain commands. The conversion takes a few hoops, since both update.message and message.text are Options.

private def pollCommands: Stream[F, BotCommand] = for {
  update <- api.pollUpdates(0)
  chatIdAndMessage <- Stream.emits(update.message.flatMap(a => a.text.map(a.chat.id -> _)).toSeq)
} yield BotCommand.fromRawMessage(chatIdAndMessage._1, chatIdAndMessage._2)
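Those Option hoops are easier to see in isolation. The case classes below are a hypothetical, simplified model of Telegram’s update JSON (the field names follow the Bot API, but these are not the article’s exact types). The sketch shows the expression used in pollCommands next to an equivalent for-comprehension over Option.

```scala
object PollCommandsSketch {
  // Hypothetical, simplified model of Telegram's update JSON
  final case class Chat(id: Long)
  final case class BotMessage(message_id: Long, chat: Chat, text: Option[String])
  final case class BotUpdate(update_id: Long, message: Option[BotMessage])

  // The expression used in pollCommands: Some only if there is a message and it has text
  def chatIdAndText(update: BotUpdate): Option[(Long, String)] =
    update.message.flatMap(m => m.text.map(m.chat.id -> _))

  // Equivalent for-comprehension over Option
  def chatIdAndTextFor(update: BotUpdate): Option[(Long, String)] =
    for {
      m    <- update.message
      text <- m.text
    } yield (m.chat.id, text)
}
```

Updates without a message, or messages without text (a sticker, for instance), simply produce None and are dropped by Stream.emits(... .toSeq).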

In a real-world implementation, we’d also persist the offset after each processed command, so that a restarted bot would resume from it instead of from zero. In practice, though, it’s not that bad: once getUpdates has been called with some offset, Telegram considers all updates with a lower update_id confirmed and no longer serves them, even if you later call the API with zero offset.

handleCommand is where we invoke our storage algebra. It should be pretty clear, despite the verbosity:

private def handleCommand(command: BotCommand): F[Unit] = command match {
  case c: ClearTodoList => clearTodoList(c.chatId)
  case c: ShowTodoList => showTodoList(c.chatId)
  case c: AddEntry => addItem(c.chatId, c.content)
  case c: ShowHelp => api.sendMessage(c.chatId, List(
    "This bot stores your todo-list. Just write a task and the bot will store it! Other commands:",
    s"`$help` - show this help message",
    s"`$show` - show current todo-list",
    s"`$clear` - clear current list (vacation!)",
  ).mkString("\n"))
}

private def clearTodoList(chatId: ChatId): F[Unit] = for {
  _ <- storage.clearList(chatId)
  _ <- logger.info(s"todo list cleared for chat $chatId") *> api.sendMessage(chatId, "Your todo-list was cleared!")
} yield ()

private def showTodoList(chatId: ChatId): F[Unit] = for {
  items <- storage.getItems(chatId)
  _ <- logger.info(s"todo list queried for chat $chatId") *> api.sendMessage(chatId,
    if (items.isEmpty) "You have no tasks planned!"
    else ("Your todo-list:" :: "" :: items.map(" - " + _)).mkString("\n"))
} yield ()

private def addItem(chatId: ChatId, item: Item): F[Unit] = for {
  _ <- storage.addItem(chatId, item)
  response <- F.suspend(F.catchNonFatal(Random.shuffle(List("Ok!", "Sure!", "Noted", "Certainly!")).head))
  _ <- logger.info(s"entry added for chat $chatId") *> api.sendMessage(chatId, response)
} yield ()

All commands are handled in a similar fashion: update (or query) the storage, log the change and reply to the chat.
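Since the text of the /show reply depends only on the stored items, one option is to pull it out into a pure function that can be checked in isolation. The name render is hypothetical; its body is the same expression that showTodoList passes to sendMessage.

```scala
object TodoListFormatSketch {
  // Same message text that showTodoList sends
  def render(items: List[String]): String =
    if (items.isEmpty) "You have no tasks planned!"
    else ("Your todo-list:" :: "" :: items.map(" - " + _)).mkString("\n")
}
```

For two items this produces a header line, an empty line and one " - item" line per task.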

There’s one small caveat with addItem. I want the bot to be a little less boring than a stone, so why not give it several different answers to choose from when an item is added? This is exactly the kind of case where we should carefully track and suspend side effects. Random number generation is a side effect, so we use Sync[F] to suspend it until the “end of the universe”. Also, List.head is unsafe, so we use F.catchNonFatal to lift a potential error into F. That’s possible because Sync[F] extends MonadError[F, Throwable]:

F.suspend(F.catchNonFatal(Random.shuffle(List("Ok!", "Sure!", "Noted", "Certainly!")).head))
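An alternative to suspending the global Random is to pass the source of randomness in explicitly. The sketch below is hypothetical and not the article’s approach: pickResponse takes a scala.util.Random and indexes the list with nextInt instead of shuffling it (it still assumes the list is non-empty). In an effectful program, creating and using the Random would still be wrapped in F.delay; the benefit is that a seeded instance makes the choice reproducible in tests.

```scala
import scala.util.Random

object ResponsePickerSketch {
  val responses: List[String] = List("Ok!", "Sure!", "Noted", "Certainly!")

  // Picks one reply using the given randomness source; assumes `responses` is non-empty
  def pickResponse(rnd: Random): String =
    responses(rnd.nextInt(responses.size))
}
```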

Wiring up and “launching” the bot process

Now we have everything to construct our bot.

import io.circe.generic.auto._
import org.http4s.circe._

class TodoListBotProcess[F[_]](
  token: String)(
  implicit F: Effect[F]) {

  // derive the streaming decoder in F using circe decoder
  implicit val decoder: EntityDecoder[F, BotResponse[List[BotUpdate]]] = jsonOf[F, BotResponse[List[BotUpdate]]]

  /**
    * Wires up everything that is needed to launch a [[TodoListBot]] and launches it.
    */
  def run: Stream[F, Unit] = Http1Client.stream[F]().flatMap { client =>
    
    // We use Http1Client.stream so that client is safely released after the stream completes or crashes.
    
    val streamF = for {
      logger <- Slf4jLogger.create[F]  // creation of logger is a side-effect
      storage <- Ref(Map.empty[ChatId, List[Item]]).map(new InMemoryTodoListStorage(_)) // creation of a Ref is also a side-effect
      botAPI <- F.delay(new Http4SBotAPI(token, client, logger)) // create the telegram bot API...
      todoListBot <- F.delay(new TodoListBot(botAPI, storage, logger)) // ... and the bot itself
    } yield todoListBot.launch 

    Stream.force(streamF)
  }
}

Quite a lot going on here:

  • We derive the JSON decoder for API response using auto derivation from circe generic
  • Http client is created safely using Http1Client.stream. This produces a stream of a single element, and guarantees that all allocated resources (connection pool) are released when the stream either finishes or crashes.
  • We flatMap the http client stream into our todo-list bot process stream. To create it, we cook all the ingredients. Some of these “cooking” steps are also side-effectful: for example the Logger interpreter or the Ref instance for our storage algebra.
  • Stream.force at the end lets us go from an F[Stream[F, A]] to the Stream[F, A] that we’d like to return

End of the universe

We can be really proud of ourselves: we managed to develop a nice, useful computation without even specifying our effect type, let alone executing any side effects! That’s cool.

Now we’re ready to pull the trigger: specify our effect type and actually run the computation. We’ll use the fs2.StreamApp[IO] helper, so that we don’t even have to write “unsafe” with our own hands.

object App extends StreamApp[IO] {

  def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, StreamApp.ExitCode] = for {
    token <- Stream.eval(IO(System.getenv("TODOLIST_BOT_TOKEN")))
    exitCode <- new TodoListBotProcess[IO](token).run.last.map(_ => StreamApp.ExitCode.Success)
  } yield exitCode
}

To test the bot, you’ll have to create one for yourself. It’s a no-brainer: just follow the instructions here. Once you get a token, plug it in and you’re ready to go!

To avoid exposing my test bot token, the app reads it from the TODOLIST_BOT_TOKEN environment variable. If that doesn’t suit you, just pass your token directly to the TodoListBotProcess constructor instead.
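One caveat: System.getenv returns null when the variable is missing, and that null would then silently end up in the request URL as "botnull". A hypothetical helper that looks the token up in an explicit environment map makes this failure visible instead. In the app, it could be called with sys.env inside Stream.eval(IO(...)), failing the stream with a clear message on Left.

```scala
object TokenSketch {
  // Hypothetical helper: an explicit environment map keeps this pure and testable
  def readToken(env: Map[String, String]): Either[String, String] =
    env.get("TODOLIST_BOT_TOKEN")
      .filter(_.nonEmpty) // treat an empty value like a missing one
      .toRight("TODOLIST_BOT_TOKEN is not set")
}
```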

Conclusion

So it turns out that you don’t need Akka or other complex frameworks to build a Telegram bot. Moreover, you can do it in a purely functional way, and it actually looks beautiful! And as a bonus, writing a tagless final program is a real joy; I definitely recommend trying it out 🙂

Some ideas to implement as an exercise for interested readers:

  • Allow editing items (by editing the original messages in the chat)
  • Wrap the bot with an administration http api, to send announcements for example

That’s all I have this time, thanks for reading!

Originally published on pavkin.ru

Pavkin Vladimir
Scala Developer @ Evolution Gaming
    Scala
    Functional Programming
    Haskell
    Scala.js
