IO & Logging Capabilities

Viktor Lövgren 25 September, 2018 | 13 min read

Using newtypes and type classes, we create a wrapper around IO with support for logging capabilities. This approach keeps log entries as values, enables log aggregation, and supports diagnostic contexts, while also making logging easily testable. For dispatching logs, any preferred library can be used.

Logging Challenges

When log entries aren't values, it's difficult to test logging. How would we test what gets output from println or logger.info? Hopefully we've defined a Logger[F] trait to enable testing and abstracting over the effect type F[_] (e.g. IO). Even with a trait in place, there are still a few things left to be desired.

  • Log entries are spread throughout the application. It's difficult to locate all of them. While this gives great flexibility, it can be painful to understand what and where an application is logging, especially when coming back to the code later on.
  • Logging code is not separated from other concerns. Since it's easy to put in an extra log expression, code for what and how entries get logged is often mixed with other concerns, instead of cleanly separating the different concerns.
  • Without log accumulation, it's tricky to understand how a request was processed. Most often, we'll need some identifier for the request, so we can later piece together all log entries for the request. This is especially true when we're processing several requests in parallel.
  • In tests, we're forced to assert on messages. It's a good idea to assert on what messages are dispatched, but we don't want to always assert on the message, since we'd then be forced to manually update several tests as we change the message.

The solution to these challenges is to model log entries as values. We'll also see how we can add explicit support for log accumulation, take full control over exactly when log messages are being dispatched, and make testing our logging easy.

Introducing MonadLog

The MonadLog type class, defined below, represents an immutable append-only log, where we can also replace the log with an empty one. Essentially, we can append log entries E with log, and then extract them again in context G[_] with extract. We can also clear all logs with clear. By combining extract and clear, we can write flush, which clears the logs after having dispatched them, using the provided dispatch function G[E] => F[Unit].

import cats.{Applicative, Monad, MonoidK}

trait MonadLog[F[_], G[_], E] {
  val monad: Monad[F]

  val applicative: Applicative[G]

  val monoidK: MonoidK[G]

  import monad.{as, flatMap}

  def log(e: E): F[Unit]

  def clear[A](fa: F[A]): F[A]

  def extract[A](fa: F[A]): F[(A, G[E])]

  def flush[A](fa: F[A])(f: G[E] => F[Unit]): F[A] =
    clear(flatMap(extract(fa)) { case (a, ge) => as(f(ge), a) })
}

If you're familiar with cats-mtl, log looks a bit like tell from FunctorTell, and extract reminds us of listen from FunctorListen. The main difference is that log is strictly an append operation as shown by the following laws. These laws are in addition to laws for Monad[F], Applicative[G], MonoidK[G].

import cats.laws.IsEqArrow
import cats.syntax.applicative._
import cats.syntax.apply._
import cats.syntax.functor._
import org.typelevel.discipline.Laws

trait MonadLogLaws[F[_], G[_], E] extends Laws {
  implicit val logInstance: MonadLog[F, G, E]
  implicit val monad: Monad[F] = logInstance.monad
  implicit val applicative: Applicative[G] = logInstance.applicative
  implicit val monoidK: MonoidK[G] = logInstance.monoidK

  import logInstance._
  private val G: MonoidK[G] = monoidK

  def clearRemovesLog(e: E) =
    clear(log(e)) <-> ().pure[F]

  // Same as FunctorListen#listenAddsNoEffects
  def extractAddsNoEffects[A](fa: F[A]) =
    extract(fa).map(_._1) <-> fa

  def extractRespectsClear[A](fa: F[A]) =
    extract(clear(fa)) <-> clear(fa).map((_, G.empty[E]))

  // Similar to FunctorListen#listenRespectsTell
  def extractRespectsLog(e: E) =
    extract(log(e)) <-> log(e).map((_, e.pure[G]))

  def extractRespectsLogs(e1: E, e2: E) =
    extract(log(e1) *> log(e2)) <-> (log(e1) *> log(e2))
      .map((_, G.combineK(e1.pure[G], e2.pure[G])))
}

As far as concrete types go, F[_] will be our IO wrapper type. MonadLog requires Applicative[G] and MonoidK[G], and we will have to require Foldable[G] to be able to write the dispatch function for flush. Ideally, G[_] should support constant time O(1) append and a single O(n) in-order fold. The recently added Chain in Cats does fit those requirements. The log entry type E is up to us to define, and we'll define it as LogEntry in the next section.
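To make the operations and laws more concrete, here is a minimal, dependency-free sketch that models F[_] as a plain pair of accumulated entries and a value, with Vector standing in for G[_]. The names LoggedModel and Logged are hypothetical and not part of the code in this post; a real instance would wrap an effect type rather than a pure value.

```scala
// A dependency-free model of the MonadLog operations (hypothetical names).
// Vector plays the role of G, and Logged[E, A] plays the role of F[A].
object LoggedModel {
  final case class Logged[E, A](logs: Vector[E], value: A) {
    def map[B](f: A => B): Logged[E, B] = Logged(logs, f(value))

    // Sequencing appends the logs of the second step to those of the first.
    def flatMap[B](f: A => Logged[E, B]): Logged[E, B] = {
      val next = f(value)
      Logged(logs ++ next.logs, next.value)
    }
  }

  def pure[E, A](a: A): Logged[E, A] = Logged(Vector.empty[E], a)

  // log: append a single entry.
  def log[E](e: E): Logged[E, Unit] = Logged(Vector(e), ())

  // clear: keep the value, replace the log with an empty one.
  def clear[E, A](fa: Logged[E, A]): Logged[E, A] =
    Logged(Vector.empty[E], fa.value)

  // extract: expose the accumulated entries next to the value, keeping the log.
  def extract[E, A](fa: Logged[E, A]): Logged[E, (A, Vector[E])] =
    fa.map(a => (a, fa.logs))

  // flush: hand the accumulated entries to the dispatch function, then clear.
  def flush[E, A](fa: Logged[E, A])(f: Vector[E] => Logged[E, Unit]): Logged[E, A] =
    clear(extract(fa).flatMap { case (a, ge) => f(ge).map(_ => a) })
}
```

In this model, clear(log(e)) equals pure(()), which mirrors clearRemovesLog, and extracting two sequenced log calls yields both entries in order, which mirrors extractRespectsLogs.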

Defining Log Entries

We will model log entries with LogEntry, which consists of a LogLevel and a NonEmptyString message. Let's start by defining LogLevel. Log levels (info, warning, error, ...) essentially form an ordered set. We use a wrapper around a refinement type for Int to represent the log levels. Values correspond to the severity levels used by syslog and log management systems like Graylog. Note that we could have used Byte, but there is no support for Byte literals in Scala, so the definition would not be as concise.

import cats.Order
import cats.instances.int._
import cats.instances.order._
import cats.syntax.contravariant._
import eu.timepit.refined.W
import eu.timepit.refined.api.Refined
import eu.timepit.refined.auto._
import eu.timepit.refined.cats.refTypeOrder
import eu.timepit.refined.numeric.Interval

object levels {
  sealed abstract class LogLevel(val value: LogLevel.Value)

  object LogLevel {
    type Value = Int Refined Interval.Closed[W.`0`.T, W.`7`.T]

    case object Error extends LogLevel(3)
    case object Warning extends LogLevel(4)
    case object Info extends LogLevel(6)
    case object Debug extends LogLevel(7)

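    implicit val logLevelOrder: Order[LogLevel] =
      // Assumed body (not shown in the source): order levels by their numeric value.
      Order[Value].contramap(_.value)
  }
}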

We can then define LogEntry as follows, together with any log entries we want. We've now effectively already solved the first logging challenge: log entries are now in a single place, and it should be rather easy to figure out where they're being used throughout the application.

import cats.syntax.show._
import cats.instances.string._
import eu.timepit.refined.cats.refTypeShow
import eu.timepit.refined.types.numeric.NonNegInt
import eu.timepit.refined.types.string.NonEmptyString
import levels._

object entries {
  sealed abstract class LogEntry {
    def level: LogLevel
    def message: NonEmptyString
  }

  object LogEntry {
    case object ApplicationStarted extends LogEntry {
      override def level: LogLevel = LogLevel.Info
      override def message: NonEmptyString = "Application started"
    }

    final case class ReceivedRequest(request: NonEmptyString) extends LogEntry {
      override def level: LogLevel = LogLevel.Info
      override def message: NonEmptyString =
        NonEmptyString.unsafeFrom(show"Received request: $request")
    }

    final case class RequestStillFailing(retries: NonNegInt) extends LogEntry {
      override def level: LogLevel = LogLevel.Error
      override def message: NonEmptyString =
        NonEmptyString.unsafeFrom(show"Request still failing after $retries retries")
    }
  }
}

For log entries without parameters, we can use case objects, and for entries with parameters, we use case classes. Note that we can vary both the log level and message depending on the arguments. This brings us to the second logging challenge: logic relating to how and what gets logged is now encapsulated in the LogEntry, rather than mixed with other concerns.

We use the show interpolator to generate Strings with Show instances, rather than relying on toString, for extra compile-time safety. With String interpolation, we cannot rely on compile-time refinement, so we have to wrap our message in NonEmptyString.unsafeFrom. It's still clear the message is not empty.

Finally, since we're looking to support diagnostic contexts, we'll define MdcEntry for that purpose.

object mdc {
  sealed abstract class MdcEntry(
    val key: NonEmptyString,
    val value: NonEmptyString
  )

  object MdcEntry {
    final case class TraceToken(override val value: NonEmptyString)
        extends MdcEntry("traceToken", value)
  }
}

A Logging Algebra

Now that we've defined the relevant logging models, let's focus our attention on functions. We're going to define an algebra in final tagless style, but before doing so, we're going to need functions for determining the LogLevel and NonEmptyString message from log entries G[LogEntry]. We are going to have to require Foldable[G] below to implement these functions.

We define the log level of G[LogEntry] as the most critical level among the log entries. This means we want the lowest level, since level 3 is for Error and level 7 is for Debug. We have already defined Order[LogLevel] and can make use of it here.

import cats.Foldable
import cats.syntax.foldable._
import entries._

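def logLevel[G[_]: Foldable](entries: G[LogEntry]): Option[LogLevel] =
  // Assumed body (not shown in the source): keep the lowest, most critical level.
  entries.foldLeft(Option.empty[LogLevel]) { (min, entry) =>
    min.map(Order[LogLevel].min(_, entry.level)).orElse(Some(entry.level))
  }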
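The "lowest value wins" rule can be tried out in isolation. The following is a small sketch using only the standard library; SeveritySketch, Level and mostCritical are hypothetical stand-ins, not the LogLevel and logLevel definitions from this post.

```scala
// Hypothetical stand-ins for the post's LogLevel values; lower means more severe.
object SeveritySketch {
  sealed abstract class Level(val value: Int)
  case object Error extends Level(3)
  case object Warning extends Level(4)
  case object Info extends Level(6)
  case object Debug extends Level(7)

  // The most critical level is the one with the lowest numeric value.
  // An empty list has no level at all, hence the Option.
  def mostCritical(levels: List[Level]): Option[Level] =
    levels.foldLeft(Option.empty[Level]) {
      case (Some(min), level) if min.value <= level.value => Some(min)
      case (_, level)                                     => Some(level)
    }
}
```

For example, SeveritySketch.mostCritical(List(Info, Error, Debug)) picks Error, since 3 is the lowest value among the three.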

For the log message of G[LogEntry], we have more options available, but will go with the following.

  • If G[LogEntry] is empty, there are no log entries, so return None.
  • If G[LogEntry] contains a single entry, return the message of that entry.
  • If G[LogEntry] has two or more entries, prepend - to the entries, and separate by newline.

Following is an example of a message for two log entries.

- Received request: test
- Request still failing after 3 retries

There are multiple ways to write such a function. One way, which uses a single fold but is likely quite slow due to the heavy use of String concatenation, is the following. Depending on your use case, you might want to explore writing more efficient variants of this function. Note that we make use of Eval to avoid constructing the NonEmptyString in cases where we're trying to log at a level which has been disabled.

import cats.{Always, Eval, Semigroup}
import cats.syntax.option._
import cats.syntax.semigroup._

implicit val nonEmptyStringSemigroup: Semigroup[NonEmptyString] =
  Semigroup.instance((a, b) => NonEmptyString.unsafeFrom(a.value ++ b.value))

def logMessage[G[_]: Foldable](entries: G[LogEntry]): Option[Eval[NonEmptyString]] = {
  def entry(e: LogEntry): NonEmptyString =
    ("- ": NonEmptyString) combine e.message

  def append(s: NonEmptyString, e: LogEntry): NonEmptyString =
    s combine "\n" combine entry(e)

  def combine(a: LogEntry, b: LogEntry): NonEmptyString =
    append(entry(a), b)

  entries
    .foldLeft[Option[(Option[LogEntry], Eval[NonEmptyString])]](None) {
      case (None, a)               => (a.some -> Always(a.message)).some
      case (Some((Some(a), _)), b) => (none -> Always(combine(a, b))).some
      case (Some((None, a)), b)    => (none -> a.map(append(_, b))).some
    }
    .map { case (_, message) => message }
}
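As one possible starting point for a more efficient variant, the three message rules can be sketched over plain Strings with the standard library only. MessageSketch and combineMessages are hypothetical names; unlike logMessage, this sketch is strict and does not defer construction with Eval.

```scala
// A sketch of the three message rules using plain Strings (hypothetical helper).
object MessageSketch {
  def combineMessages(messages: List[String]): Option[String] =
    messages match {
      case Nil           => None         // no entries, no message
      case single :: Nil => Some(single) // a single entry is returned as-is
      case many =>
        // Two or more entries: prefix each with "- " and separate by newline.
        // mkString assembles the result in one pass rather than by repeated
        // String concatenation.
        Some(many.map(m => "- " + m).mkString("\n"))
    }
}
```

To keep the lazy behaviour of the original, the result could be wrapped in a by-name parameter or a function, so the String is only built once the level is known to be enabled.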

We're now ready to define the Logging algebra as follows.

import mdc._

trait Logging[F[_], G[_]] {
  def log(entry: LogEntry): F[Unit]
  def logNow(entry: LogEntry, mdc: G[MdcEntry]): F[Unit]
  def dispatchLogs[A](fa: F[A], mdc: G[MdcEntry]): F[A]
  def extractLogs[A](fa: F[A]): F[(A, G[LogEntry])]
}

The log function accepts a LogEntry and stores it in an F[Unit]. The idea is that we compose several F[_] in our application, and accumulate the logs from included log expressions. At some point we'll call dispatchLogs, which accepts the diagnostic context, and dispatches the logs using flush on MonadLog. The logNow function logs the provided LogEntry, but immediately dispatches it with dispatchLogs, like how a normal log expression would work. Finally, we're including extractLogs as it can be useful to analyse logs, e.g. in order to derive certain metrics, or for testing purposes.

In practice, you might add more useful functions to Logging. For example, overloaded versions of the logNow and dispatchLogs functions which do not require you to pass an empty G[_] whenever you don't want a diagnostic context. We can easily add those functions by also requiring MonoidK[G]. Other candidates include logN and logNowN for logging multiple entries at once.

Let's take a look at how we could implement Logging. The main function of interest is dispatchLogs, which makes use of the logLevel and logMessage functions we wrote before. If there are no logs accumulated when dispatchLogs gets called, we'll simply not do anything. Assuming there are logs to dispatch, we dispatch them with the provided dispatch function. This function will finally log the accumulated logs with a logging library.

import cats.instances.option._

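object Logging {
  def create[F[_], G[_]](dispatch: (Eval[NonEmptyString], G[MdcEntry], LogLevel) => F[Unit])(
    implicit F: MonadLog[F, G, LogEntry],
    G: Foldable[G]
  ): Logging[F, G] =
    new Logging[F, G] {
      override def log(entry: LogEntry): F[Unit] =
        // Assumed body (not shown in the source): append via MonadLog.
        F.log(entry)

      override def logNow(entry: LogEntry, mdc: G[MdcEntry]): F[Unit] =
        dispatchLogs(log(entry), mdc)

      override def dispatchLogs[A](fa: F[A], mdc: G[MdcEntry]): F[A] =
        F.flush(fa) { entries =>
          // The last two lines are assumed (not shown in the source):
          // dispatch when there are logs, otherwise do nothing.
          (logMessage(entries), mdc.some, logLevel(entries))
            .mapN(dispatch)
            .getOrElse(F.monad.unit)
        }

      override def extractLogs[A](fa: F[A]): F[(A, G[LogEntry])] =
        // Assumed body (not shown in the source): extract via MonadLog.
        F.extract(fa)
    }
}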

The IOLog Newtype

What remains to do before we can use our Logging algebra? We need to define the dispatch function with our logging library of choice, and we need to implement MonadLog for our effect type. We'll start by implementing MonadLog for a wrapper type around IO, since MonadLog cannot directly be implemented for IO. We'll call this wrapper type IOLog.

import cats.data.Chain
import cats.effect.IO

object effects {
  import IOLog.Context

  final case class IOLog[A, E](toIO: IO[Context[A, E]]) extends AnyVal

  object IOLog {
    final case class Context[A, E](logs: Chain[E], value: A)
  }
}

There are a couple of things worth highlighting in the definition above.

  • IOLog is a wrapper around IO with a Context containing the accumulated logs. For simplicity, we're using extends AnyVal as the newtype here, but there are alternative newtype encodings without any kind of runtime overhead.
  • Having to wrap values with Context is the cost of achieving log accumulation. For simplicity, we're using a case class for Context, but we could use a more compact representation, like e.g. a tuple.

Notice that IO[Context[A, E]] looks like IO[(Chain[E], A)] without the case class. This happens to be the exact definition of WriterT. This means we can define IOLog using WriterT instead, and we have just saved ourselves having to write the newtype above.

import cats.data.WriterT

type IOLog[A, E] = WriterT[IO, Chain[E], A]

In our case, the log entry E in IOLog will be LogEntry, so we define a type alias AIO for it.

type AIO[A] = IOLog[A, LogEntry]

AIO can be thought of as 'Application IO' or 'All-In-One'.

Type Class Instances

To use IOLog as our F[_] in the application, we'll have to rely on the type classes in Cats Effect. Luckily, Cats Effect already provides type class instances for WriterT, so we only have to implement MonadLog for IOLog. We can go one step further and implement MonadLog for WriterT more generally.

implicit def writerTMonadLog[F[_], G[_], E](
  implicit
  M: Monad[WriterT[F, G[E], ?]],
  F: Applicative[F],
  A: Applicative[G],
  K: MonoidK[G]
): MonadLog[WriterT[F, G[E], ?], G, E] =
  new MonadLog[WriterT[F, G[E], ?], G, E] {
    override val monad: Monad[WriterT[F, G[E], ?]] = M

    override val applicative: Applicative[G] = A

    override val monoidK: MonoidK[G] = K

    override def log(e: E): WriterT[F, G[E], Unit] =
      // Assumed body (not shown in the source): write a single-entry log.
      WriterT.tell[F, G[E]](A.pure(e))

    override def clear[A](wt: WriterT[F, G[E], A]): WriterT[F, G[E], A] =
      wt.mapWritten(_ => K.empty)

    override def extract[A](wt: WriterT[F, G[E], A]): WriterT[F, G[E], (A, G[E])] =
      wt.mapBoth((ge, a) => (ge, (a, ge)))
  }

Be sure to check the MonadLog laws for the instance. Refer to the sources to see how it's done.

Dispatching Messages

We're now ready to create an instance of our Logging algebra. In our case, we'll make use of SLF4J and Logback to define the dispatch function, but you're free to implement it with whatever logging library, or way of dispatching logs, you would like.

import cats.effect.Sync
import org.slf4j.{LoggerFactory, MDC}

def createLogger[F[_], G[_]](
  implicit F: Sync[F],
  M: MonadLog[F, G, LogEntry],
  G: Foldable[G]
): F[Logging[F, G]] =
  F.delay(LoggerFactory.getLogger("App")).map { logger =>
    Logging.create {
      case (message, mdc, level) =>
        val levelEnabled = level match {
          case LogLevel.Error   => logger.isErrorEnabled
          case LogLevel.Warning => logger.isWarnEnabled
          case LogLevel.Info    => logger.isInfoEnabled
          case LogLevel.Debug   => logger.isDebugEnabled
        }

        if (levelEnabled) {
          // Put every diagnostic context entry into the MDC before logging.
          val setContext =
            mdc.foldLeft(F.unit) { (ms, m) =>
              ms *> F.delay(MDC.put(m.key.value, m.value.value))
            }

          // Remove the same entries again once logging is done.
          val resetContext =
            mdc.foldLeft(F.unit) { (ms, m) =>
              ms *> F.delay(MDC.remove(m.key.value)).void
            }

          F.bracket(setContext) { _ =>
            F.delay(level match {
              case LogLevel.Error   => logger.error(message.value.value)
              case LogLevel.Warning => logger.warn(message.value.value)
              case LogLevel.Info    => logger.info(message.value.value)
              case LogLevel.Debug   => logger.debug(message.value.value)
            })
          }(_ => resetContext)
        } else F.unit
    }
  }
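The set, log, reset pattern used with bracket above can be illustrated without any effect type or logging library. The sketch below is a synchronous analogue using try/finally, with a mutable map standing in for org.slf4j.MDC; ContextSketch and withContext are hypothetical names introduced only for illustration.

```scala
import scala.collection.mutable

// A stand-in for a diagnostic context: a mutable map instead of org.slf4j.MDC.
object ContextSketch {
  val context: mutable.Map[String, String] = mutable.Map.empty

  // Set the entries, run the action, and always remove the entries afterwards,
  // even when the action throws.
  def withContext[A](entries: List[(String, String)])(action: => A): A = {
    entries.foreach { case (k, v) => context.put(k, v) }
    try action
    finally entries.foreach { case (k, _) => context.remove(k) }
  }
}
```

Inside the action the entries are visible, and afterwards the context is empty again, whether the action succeeded or failed. This is the guarantee bracket provides in the effectful version.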

Putting It Together

We can now start to use our Logging algebra, either via functions, or as arguments to other algebras. In the example below, we've defined a Processing algebra which makes use of the Logging algebra to log some details about how the request was processed.

import cats.FlatMap
import cats.syntax.flatMap._
import LogEntry._

trait Processing[F[_]] {
  def processRequest(request: NonEmptyString): F[Unit]
}

object Processing {
  def create[F[_], G[_]](logging: Logging[F, G])(implicit F: FlatMap[F]): Processing[F] =
    new Processing[F] {
      import logging._

      override def processRequest(request: NonEmptyString): F[Unit] =
        for {
          _ <- log(ReceivedRequest(request))
          _ <- log(RequestStillFailing(3))
        } yield ()
    }
}

Finally, we're putting everything together inside IOApp.

import cats.effect.{ExitCode, IOApp}

object App extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val noMdc: Chain[MdcEntry] = Chain.empty

    (for {
      logging <- createLogger[AIO, Chain]
      _ <- logging.logNow(ApplicationStarted, noMdc)
      processing = Processing.create(logging)
      mdc = Chain.one(MdcEntry.TraceToken("traceToken"))
      process = processing.processRequest("test")
      _ <- logging.dispatchLogs(process, mdc)
    } yield ExitCode.Success).value
  }
}

The application outputs the following two log messages, the first at level Info and the second at level Error.

Application started
- Received request: test
- Request still failing after 3 retries

We've finally accomplished log accumulation and the third logging challenge: log entries relating to a single request can be accumulated, and logged together as a single message with diagnostic context.

Testing Logging

The fourth logging challenge relates to testing: how can we avoid always asserting on messages? When log entries are modelled as LogEntry values, this becomes rather easy. We'll start by defining a test implementation of the Logging algebra, which simply stores all dispatched messages in a Ref, instead of dispatching log messages with a logging library.

import cats.effect.concurrent.Ref
import cats.SemigroupK
import cats.syntax.semigroupk._

object TestLogging {
  def create[F[_], G[_]](logs: Ref[F, G[(NonEmptyString, G[MdcEntry], LogLevel)]])(
    implicit
    M: MonadLog[F, G, LogEntry],
    F: Foldable[G],
    S: SemigroupK[G],
    A: Applicative[G]
  ): Logging[F, G] =
    Logging.create {
      case (message, mdc, level) =>
        logs.modify(g => (g.combineK((message.value, mdc, level).pure[G]), ()))
    }
}

We can then write tests for the Processing algebra using the test Logging implementation. Note that we can assert both on accumulated log entries and on dispatched log messages. We're free to assert on accumulated logs by comparing LogEntry values, instead of always having to assert on messages.

import cats.effect.ContextShift
import cats.effect.laws.util.TestContext
import cats.instances.tuple._
import eu.timepit.refined.cats.refTypeEq
import org.scalatest.{Assertion, AsyncFunSpec}

final class ProcessingSpec extends AsyncFunSpec with AIOTestInstances {
  test("should accumulate two log entries") {
    case (_, logging, processing) =>
      import logging._
      import processing._

      val request: NonEmptyString = "test"

      // Assumed value (not shown in the source): the two entries that
      // processRequest logs, in order.
      val expected: Chain[LogEntry] =
        Chain(ReceivedRequest(request), RequestStillFailing(3))

      // Assumed expression (not shown in the source): extract the accumulated logs.
      extractLogs(processRequest(request))
        .map { case (_, logs) => assert(logs === expected) }
  }

  test("should dispatch a single log message") {
    case (logs, logging, processing) =>
      import logging._
      import processing._

      val request: NonEmptyString = "test"

      val mdc = Chain.one(MdcEntry.TraceToken("traceToken"))

      // The interpolator and the stripMargin/trim calls are assumed;
      // only the two message lines appear in the source.
      val message =
        NonEmptyString.unsafeFrom {
          show"""
                |- Received request: $request
                |- Request still failing after 3 retries
              """.stripMargin.trim
        }

      val expected = Chain.one((message, mdc, LogLevel.Error))

      for {
        _ <- dispatchLogs(processRequest(request), mdc)
        dispatchedLogs <- logs.get
      } yield assert(dispatchedLogs === expected)
  }

  type Dispatched = Chain[(NonEmptyString, Chain[MdcEntry], LogLevel)]

  type TestInput = (Ref[AIO, Dispatched], Logging[AIO, Chain], Processing[AIO])

  def test(name: String)(f: TestInput => AIO[Assertion]): Unit = {
    // Assumed right-hand side (not shown in the source).
    implicit val contextShift: ContextShift[IO] =
      TestContext().contextShift[IO]

    it(name) {
      (for {
        logs <- Ref.of[AIO, Dispatched](Chain.empty)
        logging = TestLogging.create(logs)
        processing = Processing.create(logging)
        assertion <- f((logs, logging, processing))
      } yield assertion).value.unsafeToFuture()
    }
  }
}
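The core idea of asserting on values rather than messages does not depend on the effect machinery. Here is a simplified, dependency-free sketch; the entries use plain String and Int instead of refined types, and RecordingLogger is a hypothetical, mutable stand-in for the Ref-based TestLogging above.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified entries; the post's LogEntry uses refined types.
object ValueAssertionSketch {
  sealed trait Entry { def message: String }

  final case class ReceivedRequest(request: String) extends Entry {
    def message: String = s"Received request: $request"
  }

  final case class RequestStillFailing(retries: Int) extends Entry {
    def message: String = s"Request still failing after $retries retries"
  }

  // A test logger that records entries instead of dispatching them.
  final class RecordingLogger {
    private val buffer = ListBuffer.empty[Entry]
    def log(entry: Entry): Unit = buffer += entry
    def entries: List[Entry] = buffer.toList
  }

  def processRequest(logger: RecordingLogger, request: String): Unit = {
    logger.log(ReceivedRequest(request))
    logger.log(RequestStillFailing(3))
  }
}
```

A test can then compare logger.entries with List(ReceivedRequest("test"), RequestStillFailing(3)). Rewording either message leaves such a test untouched, because the comparison uses case class equality and never looks at the message text.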


There is one important limitation to the outlined logging approach.

  • When an error occurs, any accumulated log entries are lost. This is a consequence of keeping logs as values. If that's unacceptable for some logs, then resort to logNow for those entries. This means you might have to piece together the log entries later using an identifier in the diagnostic context.
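Why the entries are lost can be seen in a small pure model, where Either stands in for an effect that can fail and the log travels alongside the successful value. This is a dependency-free sketch with hypothetical names (LostLogsSketch, Logged), not the WriterT-based IOLog itself.

```scala
// A dependency-free model of an effect that can fail while accumulating logs.
// Either stands in for a failing IO; the names here are hypothetical.
object LostLogsSketch {
  type Logged[A] = Either[String, (Vector[String], A)]

  def log(entry: String): Logged[Unit] = Right((Vector(entry), ()))

  def fail[A](error: String): Logged[A] = Left(error)

  def flatMap[A, B](fa: Logged[A])(f: A => Logged[B]): Logged[B] =
    fa match {
      case Left(error) => Left(error)
      case Right((logs, a)) =>
        f(a) match {
          // On failure only the error survives; `logs` is dropped.
          case Left(error)          => Left(error)
          case Right((moreLogs, b)) => Right((logs ++ moreLogs, b))
        }
    }

  // Both entries are accumulated when every step succeeds.
  val succeeded: Logged[Unit] =
    flatMap(log("Received request: test"))(_ => log("done"))

  // The first entry is gone once a later step fails.
  val failed: Logged[Unit] =
    flatMap(log("Received request: test"))(_ => fail[Unit]("boom"))
}
```

Since the log only exists inside the success case, there is nowhere for it to live once the computation fails, which is why logNow is the fallback for entries that must not be lost.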

There is also one technical limitation worth knowing about.

  • Logs captured in asyncF, and in the release of bracketCase (and bracket), will be discarded (#371).


The code shown throughout this post is also available on GitHub

Originally published on blog.vlovgr.se
