Stream processing is quite a popular topic nowadays. Companies deal with thousands of events which need to be processed in real time or near-real time. Businesses need to analyze customer behaviour, transactions, stock price changes or even self-driving car sensor readings. Today, however, we would like to focus on one specific flavour of stream processing: Complex Event Processing.

What is Complex Event Processing (CEP)

Event streams can be processed in different ways. In the simplest situations it is sufficient to analyze single events independently: if the value from the temperature sensor is higher than 50°C, then send an alert. In more advanced cases, groups or windows of events are analyzed together, e.g. if a 5-minute average of temperature sensor readings is higher than 50°C, then send an alert. This allows us to filter out outliers and false readings. CEP, however, covers even more complex cases, where streams are searched for specific patterns or trends in the data. For example, we look for 5 temperature readings with growing values that exceed the target limit while, at the same time, the smoke sensor turns on.
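The three levels described above can be sketched in plain Scala, without any streaming engine. This is only an illustration under our own assumptions: the names ProcessingLevels, singleEventAlerts, windowAlerts and risingPattern are made up for this post, the window is counted in readings rather than minutes, and "exceeding the limit" is interpreted as the last reading of the rising run being above the limit.

```scala
object ProcessingLevels {
  val Limit = 50.0 // alert threshold in °C, taken from the example in the text

  // Level 1: every reading is judged on its own.
  def singleEventAlerts(readings: Seq[Double]): Seq[Double] =
    readings.filter(_ > Limit)

  // Level 2: alert on the mean of a sliding window.
  // The window is count-based here (assumption); size must be positive.
  def windowAlerts(readings: Seq[Double], size: Int): Seq[Double] =
    readings
      .sliding(size)
      .filter(_.size == size) // ignore a trailing window that is too short
      .map(w => w.sum / w.size)
      .filter(_ > Limit)
      .toSeq

  // Level 3: a pattern. `run` strictly increasing readings, the last one
  // above the limit, while the smoke sensor is on.
  def risingPattern(readings: Seq[Double], smokeOn: Boolean, run: Int = 5): Boolean =
    smokeOn && readings.sliding(run).exists { w =>
      w.size == run &&
      w.zip(w.tail).forall { case (a, b) => a < b } &&
      w.last > Limit
    }

  def main(args: Array[String]): Unit = {
    val withOutlier = Seq(20.0, 21.0, 95.0, 22.0, 23.0)
    println(singleEventAlerts(withOutlier)) // the outlier alone triggers an alert
    println(windowAlerts(withOutlier, 5))   // the window mean stays below the limit
    println(risingPattern(Seq(47.0, 48.0, 49.0, 50.0, 51.0), smokeOn = true))
  }
}
```

With the single outlier of 95°C, the first rule fires while the windowed rule does not, which is exactly the filtering effect mentioned above.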

CEP allows us to define various rules. We may try to find events of type A followed by type B. We may also want to find event A that is not followed by event B. There may be requirements related to order, time or the number of occurrences. Some of them may be relaxed, meaning that events of other types are allowed to occur in between. This may sound simple for a human, but at Big Data scale it becomes a major computational challenge. What is more, in some applications, such as fire detection, the result is needed almost instantly, because people's lives may depend on it.
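To make these rule types more concrete, here is a minimal sketch over a finite, already ordered list of events. It is not how a CEP engine works internally; the object RuleSketch and its functions are hypothetical names used only for illustration. On an unbounded stream, the "A without B" rule would additionally need a time limit, otherwise we could never be sure that B will not arrive later.

```scala
object RuleSketch {
  sealed trait Ev
  case object A extends Ev
  case object B extends Ev
  case object C extends Ev // any other, unrelated event type

  // "A followed by B", relaxed: other events may occur between the two.
  def followedByRelaxed(events: Seq[Ev]): Boolean = {
    val firstA = events.indexOf(A)
    firstA >= 0 && events.drop(firstA + 1).contains(B)
  }

  // "A followed by B", strict: B must be the very next event after some A.
  def followedByStrict(events: Seq[Ev]): Boolean =
    events.zip(events.drop(1)).contains((A, B))

  // "A without B": an A occurs and no B comes after it.
  def aWithoutB(events: Seq[Ev]): Boolean = {
    val firstA = events.indexOf(A)
    firstA >= 0 && !events.drop(firstA + 1).contains(B)
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(A, C, B)
    println(followedByRelaxed(events)) // C in between is tolerated
    println(followedByStrict(events))  // C in between breaks the strict rule
    println(aWithoutB(events))
  }
}
```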

In the era of Industry 4.0, complex analysis no longer has to be performed only in big data centers. Edge AI and edge analytics are attracting growing interest. Specialized IoT microchips are deployed together with sensors, so that analysis can be performed locally, avoiding the latency of transferring data to the cloud. This way, appropriate action can be taken even faster.

Read also: “What’s the Difference Between ESP and CEP?” – article by David Luckham about ESP and CEP history and potential future.

Complex Event Processing Use Cases

Which use cases are the most appropriate for CEP? Well, any in which we look for specific patterns in data streams. To explain more, let's look at a few examples.

Algorithmic stock trading

In the financial world, technical analysis is used to forecast the direction of stock prices. A lot of information can be used for this, but the most important inputs are transaction prices and volumes. There are many patterns based on just those two metrics: various triangles, wedges, head and shoulders, double top/bottom and others. Some of them actually look quite simple, but not everyone has time to manually track charts for the whole stock market. There are also more complicated ones, like those related to the Wyckoff Method. CEP is used to build systems that support traders, as well as fully algorithmic trading systems.

Fraud detection

The most common topic in this area is credit card fraud. Banks try to detect whether your card details could have been stolen, so that they don’t lose money. What can be included in suspicious patterns? For example, transactions at unusually late hours or in locations in different parts of the world within a short time. If there is any doubt, the customer is contacted to confirm the transactions.
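One of the suspicious patterns mentioned above, transactions in distant locations, can be sketched as follows. This is a simplified illustration and not how any bank actually does it: the Tx record, its field names and the rule "two consecutive transactions of the same card in different countries within a given number of minutes" are our own assumptions.

```scala
object FraudSketch {
  // Hypothetical transaction record; field names are illustrative only.
  final case class Tx(cardId: String, country: String, epochMinute: Long)

  // Returns pairs of consecutive transactions of the same card that were
  // made in different countries less than maxGapMinutes apart.
  def suspiciousPairs(txs: Seq[Tx], maxGapMinutes: Long): Seq[(Tx, Tx)] =
    txs.groupBy(_.cardId).values.toSeq.flatMap { perCard =>
      val ordered = perCard.sortBy(_.epochMinute)
      ordered.zip(ordered.drop(1)).filter { case (a, b) =>
        a.country != b.country && b.epochMinute - a.epochMinute < maxGapMinutes
      }
    }

  def main(args: Array[String]): Unit = {
    val txs = Seq(
      Tx("card-1", "EE", 0),
      Tx("card-1", "BR", 30), // another continent half an hour later
      Tx("card-2", "PL", 0),
      Tx("card-2", "PL", 10)
    )
    suspiciousPairs(txs, maxGapMinutes = 60).foreach(println)
  }
}
```

A real system would combine many such rules and confirm doubtful cases with the customer, as described above.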

SEB bank in Estonia leverages Apama Complex Event Processing for payment fraud detection.

Systems security

Security is a difficult topic that concerns many areas of an organization. Available solutions focus mainly on anomaly or intrusion detection and network monitoring. Among the sources of information, the most important ones are network logs, which can be analyzed for suspicious patterns.

Further reading: Application of the Complex Event Processing system for anomaly detection and network monitoring

Predictive maintenance

Unexpected hardware failures are always costly. They result in downtime and complications in the production schedule. How can we avoid them? Some techniques are based only on failure statistics for specific models. However, those are just statistics, which will never be 100% accurate. The more modern way is to monitor devices and gather readings from various sensors. Patterns in the data may indicate that the equipment will soon fail.
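As a very rough illustration of such a pattern, the sketch below flags a device when its most recent readings drift noticeably above its own earlier baseline. The rule itself, the names MaintenanceSketch and drifting, and the parameters are assumptions made for this example; it also assumes positive readings, such as a vibration amplitude.

```scala
object MaintenanceSketch {
  private def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // True when the mean of the last `recent` readings exceeds the mean of all
  // earlier readings by more than `tolerance` (a fraction: 0.2 means 20%).
  def drifting(readings: Seq[Double], recent: Int, tolerance: Double): Boolean = {
    val (baseline, latest) = readings.splitAt(readings.size - recent)
    baseline.nonEmpty && latest.size == recent &&
      mean(latest) > mean(baseline) * (1 + tolerance)
  }

  def main(args: Array[String]): Unit = {
    val vibration = Seq(1.0, 1.0, 1.0, 1.0, 2.0, 2.0)
    println(drifting(vibration, recent = 2, tolerance = 0.2))
  }
}
```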

Other use cases

We have described just a few specific areas; many more use cases are possible. Complex Event Processing is also mentioned in the context of business intelligence, weather reports, business activity monitoring, click-stream analysis or autonomous vehicles. The scope is quite broad and, with the growth of industrial IoT and 5G, more and more data will need to be analyzed for complex scenarios.

Technologies used for Complex Event Processing

Because Complex Event Processing can be applied to such a broad range of domains, there is a wide choice of products and tools implementing this technique. Some of them are specialized for specific use cases, and some are available only as paid products. Major vendors have products related to CEP, e.g. Azure Stream Analytics, Microsoft StreamInsight, SAP ESP, TIBCO BusinessEvents & Streambase. Let’s take a deeper look at one promising open source solution, Apache Flink.

Apache Flink

Apache Flink is a tool used for both batch and stream processing. It is often compared to Apache Spark, as it offers quite similar features. There is, however, one major difference from the CEP perspective: Flink comes with a separate module and DSL for Complex Event Processing, called FlinkCEP.

Let’s say we want to send an alert after at least three events with a temperature higher than 50°C, followed by smoke.

First we need to define the event types which will represent the messages processed by our system.

  sealed trait Event

  case class TemperatureEvent(value: Double) extends Event
  case class SmokeEvent() extends Event
  case class LightEvent(value: Double) extends Event

The code above (in Scala) declares an Event trait and three possible types of events: one for temperature, one for smoke and an additional one, LightEvent, which introduces some noise into the event stream.

Then let’s define the most important part – the pattern:

  val pattern = Pattern.begin[Event]("temperature")
    .subtype(classOf[TemperatureEvent]).where(_.value > 50.0) // strictly higher than 50°C
    .timesOrMore(3)                                           // at least three such readings
    .followedBy("smoke").subtype(classOf[SmokeEvent])         // other events may come in between

The pattern starts with at least three events of type TemperatureEvent, each above the limit, followed by a SmokeEvent. followedBy expresses relaxed contiguity: other events, such as LightEvent, may happen in between without breaking the match.

The last part is related to running and reporting detected patterns:

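  // Imports assumed for Flink's Scala API (flink-streaming-scala and flink-cep-scala)
  import java.util
  import org.apache.flink.cep.functions.PatternProcessFunction
  import org.apache.flink.cep.scala.CEP
  import org.apache.flink.cep.scala.pattern.Pattern
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.util.Collector

  val env = StreamExecutionEnvironment.createLocalEnvironment()

  val patternStream = CEP.pattern(input, pattern) // input: the DataStream[Event] to analyze

  case class Alert(message: String) // just for presenting results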

  val result: DataStream[Alert] = patternStream.process(
    new PatternProcessFunction[Event, Alert]() {
      override def processMatch(
                                 `match`: util.Map[String, util.List[Event]],
                                 ctx: PatternProcessFunction.Context,
                                 out: Collector[Alert]): Unit = {
        out.collect(Alert(`match`.toString)) // match map converted to string
      }
    })

  result.print()  // let’s display the results

  env.execute() // run the flow

We said that this was the last part, but we have forgotten about the input! It has to be declared before the call to CEP.pattern. Let’s say it will be:

  val input: DataStream[Event] = env.fromElements(
    TemperatureEvent(49.0),
    TemperatureEvent(51.0),
    LightEvent(100),
    TemperatureEvent(52.0),
    TemperatureEvent(53.0),
    LightEvent(125),
    SmokeEvent(),
    LightEvent(135)
  )

Result?

Alert({
    temperature=[TemperatureEvent(51.0), TemperatureEvent(52.0), TemperatureEvent(53.0)],
    smoke=[SmokeEvent()]
})

Everything worked as expected and the alert was reported. The presented example is not perfect, however: the current code is not limited by any time frame, so the temperature readings and the smoke event could be arbitrarily far apart. It can be improved by adding timestamps to the events, together with a maximum time interval within which the input sequence has to match the pattern; FlinkCEP supports this through the within clause of a pattern. If you’d like to learn more about FlinkCEP, take a look at its documentation.
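The effect of such a time limit can be illustrated without Flink. The sketch below is plain Scala, not FlinkCEP code: the ts field (seconds), the object TimeBoundSketch and the function alert are hypothetical, and the rule is simplified to "at least minHot hot readings no more than windowSec seconds before a smoke event".

```scala
object TimeBoundSketch {
  sealed trait Event { def ts: Long } // ts: hypothetical event time in seconds
  final case class Temperature(value: Double, ts: Long) extends Event
  final case class Smoke(ts: Long) extends Event

  // True if some smoke event is preceded by at least `minHot` readings above
  // `limit` that all happened at most `windowSec` seconds before it.
  def alert(events: Seq[Event], limit: Double, minHot: Int, windowSec: Long): Boolean =
    events.exists {
      case Smoke(smokeTs) =>
        val hot = events.count {
          case Temperature(v, t) => v > limit && t <= smokeTs && smokeTs - t <= windowSec
          case _                 => false
        }
        hot >= minHot
      case _ => false
    }

  def main(args: Array[String]): Unit = {
    val events = Seq(Temperature(51.0, 0), Temperature(52.0, 10), Temperature(53.0, 20), Smoke(30))
    println(alert(events, limit = 50.0, minHot = 3, windowSec = 60)) // all readings fit in the window
    println(alert(events, limit = 50.0, minHot = 3, windowSec = 25)) // the first reading is too old
  }
}
```

The same events trigger an alert with a 60-second limit but not with a 25-second one, which is the behaviour a time-bounded pattern is meant to provide.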


Conclusions

Complex Event Processing has been around for many years. It concerns various domains and can be applied in many areas. The CEP tools on the market add an additional layer on top of standard stream processing, which makes complex patterns easier to define. So far, the majority of production-ready Complex Event Processing tools have been closed-source and available only commercially.

Apache Flink is a reliable tool for CEP. It already has some adoption in batch and stream processing, so its additional CEP module gives wider access to this technology, allowing us to discover potentially new use cases.
