Hands-on InfluxDB
We are often interested in how things change over time: we want to analyze these changes to understand them and to anticipate future events. In this article, we will do some time-oriented analytics, and for that we'll need storage. The best choice is a database that puts time first. We will see what a time series is, how it is modeled and stored in InfluxDB, and how to query and visualize it. We will use real-world cryptocurrency trade and price data. Read on!
What is a time series?
Let me quote this definition from the InfluxData website:
Time series data, also referred to as time-stamped data, is a sequence of data points indexed in time order. Time-stamped is data collected at different points in time.
These data points typically consist of successive measurements made from the same source over a time interval and are used to track change over time.
I believe each of us can already think of an example here; to name just a few, it might be weather readings from a given measuring station or CPU usage from a server instance.
Time series databases are designed to work with that kind of data. They are usually optimized for high read/write workloads, where deletes are mostly bulk processes triggered after a retention period and where updates are very rare. After all, how can one rewind time?
Among today's time series databases, InfluxDB is the most popular one according to the db-engines ranking as of the 18th of July 2022. And we are going to get our hands on it today!
Use case
Let's imagine we are cryptocurrency trade rookies. We know that the market is very complex and changing constantly. We would like to know how it behaves and what is driving the changes we observe.
We are going to analyze cryptocurrency trades and prices. The data comes from the CoinCap WebSocket API. We are going to subscribe to two WebSocket channels: the first is a stream of live trades from the Binance market, and the second is a live stream of the latest crypto prices.
Data model
We will have two measurements - trades and prices, and we will store them in a bucket called crypto. A bucket is a named location where we store time series data and it can be configured with a specific retention period.
In InfluxDB, data records are referred to as points. A point consists of:
- timestamp
- measurement name
- tag set (optional)
- field set (at least one field key and value)
The best way to understand this structure is by looking at specific examples.
The WebSocket payload for the prices subscription looks like this:
{
"bitcoin":"6389.06",
"ethereum":"192.01",
...
}
So it's a map with currency names as keys and floating point numbers as values representing the price in USD. It doesn't contain a timestamp, but we will create one ourselves. Let's put what we have so far into a classic table of the kind we might end up with in a relational database:
timestamp | measurement | currency | price |
---|---|---|---|
2022-07-18 09:00 | prices | bitcoin | 6389.06 |
2022-07-18 09:00 | prices | ethereum | 192.01 |
2022-07-18 09:10 | prices | bitcoin | 6380.01 |
2022-07-18 09:10 | prices | ethereum | 180.21 |
Let's translate that to the time series model. When we check the price of a given currency, we will look it up by its name. If the lookup has to be fast, the name has to be indexed, and that is what tags are for in InfluxDB. The price itself is just a regular value, one that varies over time, therefore we will model it as a field.
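To make the tag/field split concrete, here is a minimal sketch that turns one prices payload into points and renders each of them in InfluxDB's line protocol text format (measurement, tag set, field set, timestamp). SimplePoint, pricesToPoints and toLine are hypothetical helpers written only for this illustration; they are not part of the client library, and escaping of special characters is left out.

```scala
object LineProtocolSketch {
  // Hypothetical, simplified model of a point (NOT the client library's Point class).
  final case class SimplePoint(
    measurement: String,
    tags: Map[String, String],   // indexed metadata, e.g. the currency name
    fields: Map[String, Double], // the values that vary over time
    timestampMs: Long
  )

  // One point per currency; the payload has no timestamp, so we supply our own.
  def pricesToPoints(prices: Map[String, Double], timestampMs: Long): Seq[SimplePoint] =
    prices.toSeq.sortBy(_._1).map { case (currency, price) =>
      SimplePoint("prices", Map("currency" -> currency), Map("price" -> price), timestampMs)
    }

  // Renders: measurement,tagKey=tagValue fieldKey=fieldValue timestamp
  // Assumption: keys and values contain no spaces, commas or equals signs.
  def toLine(p: SimplePoint): String = {
    val tagPart   = p.tags.toSeq.sortBy(_._1).map { case (k, v) => s",$k=$v" }.mkString
    val fieldPart = p.fields.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString(",")
    s"${p.measurement}$tagPart $fieldPart ${p.timestampMs}"
  }
}
```

Because the timestamp here is in milliseconds, a write of such lines would also have to declare millisecond precision.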
Time series are built of keys and values. The series key consists of measurement, tag set, and field key. Keys in our example will look as follows:
- K1:
[prices, currency=bitcoin, price]
- K2:
[prices, currency=ethereum, price]
A series value consists of a timestamp and a field value, therefore:
- K1-V1:
[2022-07-18 09:00, 6389.06]
- K1-V2:
[2022-07-18 09:10, 6380.01]
- K2-V1:
[2022-07-18 09:00, 192.01]
- K2-V2:
[2022-07-18 09:10, 180.21]
We can read the series in plain language like this:
The prices measurement indicates that the price of bitcoin was 6389.06 at 2022-07-18 09:00 and 6380.01 at 2022-07-18 09:10.
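The same grouping can be sketched in a few lines of plain Scala: rows from the table are grouped by series key, and each key ends up with its time-ordered values. Row and SeriesKey are hypothetical types introduced for this example, and timestamps are kept as strings only to mirror the table.

```scala
object SeriesSketch {
  // Hypothetical flat record mirroring one row of the table.
  final case class Row(timestamp: String, measurement: String, currency: String, price: Double)

  // Series key: measurement + tag set + field key.
  final case class SeriesKey(measurement: String, tags: Map[String, String], fieldKey: String)

  // Group rows by series key; each series holds (timestamp, field value) pairs ordered by time.
  def toSeries(rows: Seq[Row]): Map[SeriesKey, Seq[(String, Double)]] =
    rows
      .groupBy(r => SeriesKey(r.measurement, Map("currency" -> r.currency), "price"))
      .map { case (key, rs) => key -> rs.sortBy(_.timestamp).map(r => (r.timestamp, r.price)) }
}
```

With the four example rows, this yields two series, one per currency, each holding two values.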
We can clearly see that cryptocurrency prices are a perfect example of a measurement as it appears in the time series definition. But what about trades? A single trade on a market does not "measure" anything; it's more like an event, right? That's true, but we are going to model our trades measurement in a way that will allow us to aggregate and therefore measure the volume of buying or selling a given currency in a given period of time. That is also a valid use case of time series data!
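As a rough illustration of how individual trade events become a measurable quantity, the sketch below sums the traded volume per currency and direction within fixed time windows. The Trade shape (base, quote, direction, volume, millisecond timestamp) and the volumePerWindow helper are assumptions made for this example.

```scala
object TradeVolumeSketch {
  // Assumed trade shape: currency pair, "buy" or "sell", traded volume, epoch milliseconds.
  final case class Trade(base: String, quote: String, direction: String, volume: Double, timestamp: Long)

  // Sum the volume per (base currency, direction, window start) for windows of windowMs milliseconds.
  def volumePerWindow(trades: Seq[Trade], windowMs: Long): Map[(String, String, Long), Double] =
    trades
      .groupBy(t => (t.base, t.direction, t.timestamp / windowMs * windowMs))
      .map { case (key, ts) => key -> ts.map(_.volume).sum }
}
```

Each single trade says little on its own, but the per-window sums form a series that changes over time, just like a price does.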
The next question that should come to mind is: how is this data actually stored? InfluxDB uses a storage engine built from several components.
The first one is the Write Ahead Log (WAL). It is persistent, append-only storage that retains the state across engine restarts; all newly written data goes there first. The second component is an in-memory cache, used for series key-based lookups and updated after each write. The third component is the Time-Structured Merge Tree (TSM). At runtime, snapshots of data from the cache are written to TSM files, where data is compacted and grouped by series key, and values are ordered by time. After those files are safely stored, the WAL is truncated and the cache is cleared. For more on the storage engine, I will refer you to the official documentation.
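The write path described above can be pictured with a toy model. This is only a sketch of the idea, not how InfluxDB is implemented: ToyEngine and Entry are hypothetical, everything lives in memory, and the snapshot is triggered by a simple entry count, which is an assumption made for brevity.

```scala
import scala.collection.mutable

object StorageSketch {
  final case class Entry(seriesKey: String, timestamp: Long, value: Double)

  // Toy model of the WAL -> cache -> TSM flow; snapshotThreshold is an assumed trigger.
  final class ToyEngine(snapshotThreshold: Int) {
    private val wal = mutable.ArrayBuffer.empty[Entry]               // stands in for the on-disk WAL
    private val cache = mutable.Map.empty[String, Vector[Entry]]     // lookups by series key
    private var tsmFiles = Vector.empty[Map[String, Vector[Entry]]]  // stands in for TSM files

    def write(e: Entry): Unit = {
      wal += e // 1. new data goes to the WAL first
      cache.update(e.seriesKey, cache.getOrElse(e.seriesKey, Vector.empty) :+ e) // 2. then the cache
      if (wal.size >= snapshotThreshold) snapshot()
    }

    // 3. write a snapshot grouped by series key and ordered by time, then truncate WAL and cache
    private def snapshot(): Unit = {
      val file = cache.toMap.map { case (k, es) => k -> es.sortBy(_.timestamp) }
      tsmFiles = tsmFiles :+ file
      wal.clear()
      cache.clear()
    }

    // Reads merge what is already in the "files" with what is still in the cache.
    def read(seriesKey: String): Vector[Entry] =
      (tsmFiles.flatMap(_.getOrElse(seriesKey, Vector.empty)) ++
        cache.getOrElse(seriesKey, Vector.empty)).sortBy(_.timestamp)

    def walSize: Int = wal.size
    def tsmFileCount: Int = tsmFiles.size
  }
}
```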
Hopefully, now you can see how series are built from points. This is enough for us to move to the next step - data collection.
Data collection
Finally, we will see some code. Let's subscribe to the CoinCap WebSockets, transform the data, and write it to InfluxDB.
The response model which we read from WebSocket frames is quite simple. A Trade consists of base and quote currency names, a direction (either sell or buy), a volume, and a timestamp.
case class Trade(
  base: String,
  quote: String,
  direction: String,
  volume: Double,
  timestamp: Long
)

type Prices = Map[String, Double]
Let's define the subscription logic. We will use sttp and Akka Streams for that.
val subscribeTrades = basicRequest
  .response(asWebSocketStream(AkkaStreams)(processFrame(subscription = "trades")))
  .get(uri"wss://ws.coincap.io/trades/binance")

val subscribePrices = basicRequest
  .response(asWebSocketStream(AkkaStreams)(processFrame(subscription = "prices")))
  .get(uri"wss://ws.coincap.io/prices?assets=ALL")
Processing a single frame places it on a separate queue for further processing.
def processFrame(subscription: String): AkkaStreams.Pipe[WebSocketFrame.Data[_], WebSocketFrame] =
  Flow.fromFunction {
    // Text frames carry the payload: queue it, tagged with the subscription name
    case WebSocketFrame.Text(payload, _, _) =>
      framesQueue.offer(Frame(subscription, payload))
      WebSocketFrame.pong
    // Any other frame is ignored
    case _ => WebSocketFrame.pong
  }
The framesQueue buffers frames and does the heavy lifting: it decodes each payload into either a Trade or a Prices structure and converts it into Points, the InfluxDB client library's representation of the point we described in the previous section. Finally, the points are batched into groups of 1000 and sent to the InfluxDB instance.
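The decode-then-batch step can be sketched with plain Scala iterators. This is not the article's actual queue code: the Frame shape follows the processFrame snippet, while toBatches and the decodeTrades/decodePrices functions are hypothetical stand-ins for JSON decoding plus conversion to points.

```scala
object FramePipelineSketch {
  final case class Frame(subscription: String, payload: String)

  // Route each frame to the decoder for its subscription, then cut the points into batches.
  // decodeTrades and decodePrices are hypothetical: payload in, zero or more points out.
  def toBatches[P](
    frames: Iterator[Frame],
    decodeTrades: String => Seq[P],
    decodePrices: String => Seq[P],
    batchSize: Int = 1000
  ): Iterator[Seq[P]] =
    frames
      .flatMap {
        case Frame("trades", payload) => decodeTrades(payload)
        case Frame("prices", payload) => decodePrices(payload)
        case _                        => Seq.empty[P] // unknown subscription: drop the frame
      }
      .grouped(batchSize) // the last batch may be smaller than batchSize
      .map(_.toSeq)
}
```

With a live stream, a purely size-based batch can wait a long time to fill up, so a time-bounded variant (for example Akka Streams' groupedWithin) may be a better fit; which one the original code uses is not shown here.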
Let's look at how we define the trades measurement.
Point
  .measurement("trades")
  .addTag("base", trade.base)
  .addTag("direction", trade.direction)
  .addTag("quote", trade.quote)
  .addField("volume", trade.volume)
  .time(trade.timestamp, WritePrecision.MS)
There are three tags since we will be interested in the buy/sell volume of a given currency, so we want our series key to be built using those. The value that varies over time for a given trade "key" is the volume of the transaction, hence we model that as a field.
The code for connecting and talking to InfluxDB is as simple as:
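import akka.NotUsed
import akka.stream.scaladsl.Flow
import com.influxdb.client.{InfluxDBClientFactory, InfluxDBClientOptions}
import com.influxdb.client.write.Point
import scala.jdk.CollectionConverters._

class Influxdb {
  private val options = InfluxDBClientOptions
    .builder()
    // set default bucket, user, password, url, log level
    .build()

  // Create the client and ping the server once at construction time
  private val client = {
    val client = InfluxDBClientFactory.create(options)
    client.ping()
    client
  }

  private val writeApi = client.makeWriteApi()

  // Each incoming batch of points is handed to the write API
  val write: Flow[Seq[Point], Unit, NotUsed] =
    Flow[Seq[Point]].map(points => writeApi.writePoints(points.asJava))

  def close(): Unit = client.close()
}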
Data analytics
After running the code with both WebSocket subscriptions for one hour, we collected almost one million data points in our crypto bucket, most of them trades.
We will look at the InfluxDB console, which comes with the official Docker image, and see if we can find out something about trading from our data. Let's ask a simple question: which currency was purchased the most over that time?
Our database uses its own query language, Flux. It's referred to as a functional data scripting language and is designed to work with different data sources (MySQL, PostgreSQL, and CSV, among others, are supported today).
Here is an example query that computes the one-minute mean price of bitcoin. I think you will agree that it's quite self-explanatory.
from(bucket: "crypto")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "prices")
  |> filter(fn: (r) => r["currency"] == "bitcoin")
  |> filter(fn: (r) => r["_field"] == "price")
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
  |> yield(name: "mean")
Most Flux queries include:
- source - the crypto bucket in our example
- filter - several here: first we say we are interested in the prices measurement, then in bitcoin in particular, and finally in the value of price
- shape - usually a window or a group-by statement
- process - it can be an aggregate like here, but it can also be, for example, a map
Steps are chained with the pipe-forward operator |>. The flow looks very familiar if you have worked with the Java Streams API or a Scala streaming library like Akka Streams. A lot is going on under the hood, obviously; I encourage you to read about the Flux data model if you want to know more.
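To underline the similarity, here is roughly what the same pipeline could look like over an in-memory Scala collection. It is an analogy only, not how Flux executes a query: Record and meanPerMinute are hypothetical, and for simplicity each window is labelled with its start time.

```scala
object FluxAnalogySketch {
  // Hypothetical in-memory record: measurement, currency tag, field key, epoch millis, value.
  final case class Record(measurement: String, currency: String, field: String, time: Long, value: Double)

  // Mean bitcoin price per one-minute window, following source -> filter -> shape -> process.
  def meanPerMinute(records: Seq[Record], start: Long, stop: Long): Seq[(Long, Double)] = {
    val windowMs = 60000L
    records                                                 // source
      .filter(r => r.time >= start && r.time < stop)        // range: start inclusive, stop exclusive
      .filter(r => r.measurement == "prices")               // filter
      .filter(r => r.currency == "bitcoin")
      .filter(r => r.field == "price")
      .groupBy(r => r.time / windowMs * windowMs)           // shape: one-minute windows
      .toSeq
      .sortBy(_._1)
      .map { case (windowStart, rs) => (windowStart, rs.map(_.value).sum / rs.size) } // process: mean
  }
}
```

Windows without any records simply do not appear in the result, which corresponds to createEmpty: false in the query.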
After opening localhost:8086 and navigating to Data Explorer, we can play around with Flux queries and some visualizations. We query for buys and aggregate every 5 minutes by counting the trades. Remember that our series key is built using tags, so it contains base and quote currency names.
It looks like everyone was mostly buying tether during the one-hour period we observed. The most popular type of trade was buying tether with bitcoin. Other currencies, visible below with lower trade rates, are almost all swapped for tether as well. The price of tether shouldn't be a surprise, as it should always stay around 1 USD; it seems people are falling back on stablecoins these days.
Let's compare it with bitcoin prices. We will also plot additional timed moving averages for 30- and 60-minute periods.
This can be further used to detect moving average crossovers, a technique for spotting the end or reversal of a trend. You would probably look at moving averages over days rather than minutes, but we don't have that much data. It is a good case for trying out InfluxDB alerts.
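A crossover check itself is simple enough to sketch. The version below is an assumption-laden illustration: it works on evenly spaced samples and uses a count-based simple moving average rather than the timed moving averages plotted in the console, and movingAverage and crossovers are hypothetical helper names.

```scala
object CrossoverSketch {
  // Simple moving average over the last `window` samples; None until enough samples exist.
  def movingAverage(values: Vector[Double], window: Int): Vector[Option[Double]] =
    values.indices.toVector.map { i =>
      if (i + 1 < window) None
      else Some(values.slice(i + 1 - window, i + 1).sum / window)
    }

  // Indices at which the short average crosses the long one, with the direction of the cross.
  def crossovers(values: Vector[Double], short: Int, long: Int): Vector[(Int, String)] = {
    // Difference short - long wherever both averages are defined.
    val diffs = movingAverage(values, short).zip(movingAverage(values, long)).map {
      case (Some(s), Some(l)) => Some(s - l)
      case _                  => None
    }
    // A sign change between two consecutive differences marks a crossover.
    diffs.zipWithIndex.sliding(2).collect {
      case Vector((Some(prev), _), (Some(cur), i)) if prev <= 0 && cur > 0 => (i, "up")
      case Vector((Some(prev), _), (Some(cur), i)) if prev >= 0 && cur < 0 => (i, "down")
    }.toVector
  }
}
```

An "up" cross means the short average has moved above the long one, which is commonly read as the start of an upward trend; a "down" cross is the opposite.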
Summary
In this article, we've introduced the time series database InfluxDB. We learned some basics about its engine and the way it stores series, and we wrote some simple Flux queries.
What we've seen is not rocket science. I wanted to show you some example queries using real data, introduce you to time series, and provide an easy-to-start "hello world" project.
The InfluxDB ecosystem is quite big, but it is well supported by the console. There are several ways we can load our data using Telegraf plugins, but I decided to write some custom code to get my hands on the client library.
As always, the full code is available on my GitHub account - here. I encourage you to run it and play with the InfluxDB console yourself ;) Stay safe!