By: Dean Markwick's Blog -- Julia
Re-posted from: https://dm13450.github.io/2021/08/05/questdb-part-1.html
QuestDB is a timeseries database that is well suited for financial
data. It is built with timestamps in mind both when storing the data
and also when getting the data out. This makes it the ideal candidate
as a storage system for crypto trades.
You might have heard of KDB (or q, the language used to work with KDB), which is a popular commercial timeseries database. You’ll see it being used all over the finance industry. Unfortunately, KDB costs money if you go beyond its personal license terms, whereas QuestDB is free and open source.
So this is a blog post on how to get started with QuestDB. We will be
connecting to Coinbase WebSockets in Julia to download trades that
will be stored in a QuestDB table.
Enjoy these types of posts? Then you should sign up to my newsletter. It’s a short monthly recap of anything and everything I’ve found interesting recently, plus any posts I’ve written. So sign up and stay informed!
I’ve written about getting data using Coinbase’s REST APIs
here and
this WebSocket method is an alternative way to get the same
data. Using a WebSocket, though, has other benefits. With the REST API it is impolite to constantly query the endpoint to pull data in real time. Plus, you don’t know when something has changed: you pull the data, compare it to what you already have and then see if there has been an update. With a WebSocket, the server tells you when something has changed, such as when a new trade has occurred. This makes it great for building up a database: you can just connect to the firehose and save down all the trades that come through. Open the flood gates rather than always asking if something has changed.
Before we get started, though, you need to download and install QuestDB. As I’m on a Mac I downloaded it using Homebrew and started the process with questdb start.
You can get more information on downloading QuestDB here for your system.
The rest of the blog post will walk you through the following steps.
- The Producer/Consumer pattern
- Using WebSockets in Julia
- Putting data into QuestDB
In part two, which I will post sometime next week, I will show you how
we get data out of the database and use some of the specialised
features for timeseries data.
The Producer/Consumer Pattern
I am setting the processes up using a programming pattern called ‘producer/consumer’. This means building one process that produces data and a separate process that consumes data. Keeping the two functions separate allows for better scalability if you want to add in more exchanges or workers. It also means that there is a degree of independence between the two processes, and reducing the coupling should make for an easier development experience.
To set this up in Julia we need to create a RemoteChannel, which is how the producer and consumer processes will communicate. It will be filled up with the type Trade that we will also create.
struct Trade
    id::String
    time::String
    price::Float64
    size::Float64
    side::Int64      # 1 = buy, -1 = sell
    exchange::String
end
Trade() = Trade("", "", NaN, NaN, 0, "")
Base.isempty(x::Trade) = x.id == ""
After creating the struct we also add in a null constructor and a method for checking whether a trade is empty. Overall it is a simple type that just contains the relevant information for each trade found.
The RemoteChannel
comes from the Distributed
package.
using Distributed
const trades = RemoteChannel(()->Channel{Trade}(500));
It can store 500 trades and we will fill it up by connecting to the CoinbasePro WebSocket feed. Any of the producer processes would be able to add to this trades
channel if needed.
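To make the pattern concrete, here is a minimal local sketch (using a plain Channel rather than a RemoteChannel and some made-up trades, so purely for illustration) showing a producer task filling the channel while a consumer task drains it:
# Illustration only: a producer task fills a channel while a consumer task drains it.
demo_trades = Channel{Trade}(10)

# Producer: push five fake trades onto the channel.
@async for i in 1:5
    put!(demo_trades, Trade(string(i), "2021-01-01T12:00:00.000000Z", 100.0 + i, 0.1, 1, "Demo"))
end

# Consumer: take each trade off the channel as it arrives.
@async for i in 1:5
    t = take!(demo_trades)
    println("Consumed trade $(t.id) at price $(t.price)")
end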
WebSockets in Julia
A WebSocket needs a URL and a callback function to run on the connection. In our case we want to connect to the WebSocket, subscribe to the market data and parse the incoming messages.
Parsing the message is simple. As it is a JSON object it gets converted to a Julia dictionary, so we can just pull out the appropriate fields and parse them to numbers where needed.
This can be accomplished as so:
using JSON
using Dates
using WebSockets
coinbase_url = "wss://ws-feed.pro.coinbase.com"
coinbase_subscribe_string = JSON.json(Dict(:type=>"subscribe",
                                           :product_ids=>["BTC-USD"],
                                           :channels=>["ticker", "heartbeat"]))
function parse_coinbase_data(x)
    if (get(x, "type", "") == "heartbeat") || (haskey(x, "channels"))
        println("Worker $(myid()): Coinbase Heartbeat")
        return Trade()
    end

    ts = get(x, "time", "")
    side = get(x, "side", "")
    tradedprice = parse(Float64, get(x, "price", "NaN"))
    size = parse(Float64, get(x, "last_size", "NaN"))
    id = get(x, "trade_id", "")

    Trade(string(id), ts, tradedprice, size, lowercase(side) == "buy" ? 1 : -1, "Coinbase")
end
function save_coinbase_trades(coinbase_url, coinbase_subscribe_string)
    WebSockets.open(coinbase_url) do ws
        write(ws, coinbase_subscribe_string)
        # The first message back is the subscription confirmation, so read it and discard it.
        data, success = readguarded(ws)
        println("Entering Loop")
        while true
            data, success = readguarded(ws)
            jdata = JSON.parse(String(data))
            clean_data = parse_coinbase_data(jdata)
            if !isempty(clean_data)
                put!(trades, clean_data)
            end
        end
    end
end
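As a quick sanity check on the parser, you can hand parse_coinbase_data a dictionary shaped like a Coinbase ticker message (the values below are made up) and confirm it comes back as a Trade:
# A made-up ticker-style message, just to exercise the parser.
sample_msg = Dict("type" => "ticker",
                  "time" => "2021-08-01T12:00:00.123456Z",
                  "side" => "buy",
                  "price" => "39000.50",
                  "last_size" => "0.01",
                  "trade_id" => 123456789)

parse_coinbase_data(sample_msg)
# Trade("123456789", "2021-08-01T12:00:00.123456Z", 39000.5, 0.01, 1, "Coinbase")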
We subscribe to the ticker channel, which gives us trades as they occur and we also use the heartbeat channel to keep the WebSocket alive.
Once the message has been parsed and we have created a Trade
object, we can then add it to the queue for the database writer to pick up and save down.
This finishes the producer part. We can now move onto the consumer process.
Getting Julia to Talk to QuestDB
We’ve connected to the WebSocket and our RemoteChannel is filling up. How do we get this into a database? QuestDB exposes a socket (a normal socket, not a WebSocket!) that Julia can connect to. So we simply connect to that exposed port and send data to QuestDB.
QuestDB uses the
InfluxDB line protocol
to ingest data. This is as easy as sending a string down the connection and QuestDB does the parsing to place it into the database table. The string needs to take a specific format: the table name and any symbol (tag) columns come first, separated by commas, then a space, then the remaining columns as key=value pairs, then another space and finally the timestamp:
table_name,symbol_column=value numeric_column_1=value,numeric_column_2=value timestamp
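For example, a hypothetical row (table and column names purely for illustration) could look like:
trades,exchange=Coinbase price=39000.5,size=0.01 1627819200123456000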
We build this using an IOBuffer
to incrementally add to the payload string.
The timestamp is the number of nanoseconds since the UNIX epoch. The timestamp of the trade from Coinbase does have this precision, but unfortunately Julia’s DateTime type does not support nanoseconds, whereas the Time type does. So we have to be a bit creative.
The timestamp looks like 2021-01-01T12:00:00.123456. I split on the . to get the datetime up to seconds and the fractional part separately. The datetime gets easily parsed into epoch time, which we turn into nanoseconds since the epoch by multiplying by 1e9. For the fractional part, we drop the trailing Z and right pad it with zeros to make sure it is 9 digits long, so it can be read directly as nanoseconds. Then it is as simple as adding the two values together and using @sprintf to get the full integer number without scientific notation.
using Printf
function parse_timestamp(ts::String)
    p1, p2 = split(ts, ".")
    # Seconds since the epoch, scaled up to nanoseconds.
    ut = datetime2unix(DateTime(p1)) * 1e9
    # Drop the trailing "Z" and pad the fraction to 9 digits of nanoseconds.
    ns = Nanosecond(rpad(chop(String(p2), tail=1), 9, "0"))
    @sprintf "%.0f" ut + ns.value
end
function build_payload(x::Trade)
    buff = IOBuffer()
    write(buff, "coinbase_trades2,")
    write(buff, "exchange=$(getfield(x, :exchange)), ")
    for field in [:id, :price, :size]
        val = getfield(x, field)
        write(buff, "$(field)=$(val),")
    end
    write(buff, "side=$(getfield(x, :side)) ")
    tspretty = parse_timestamp(getfield(x, :time))
    write(buff, tspretty)
    write(buff, "\n")
    String(take!(buff))
end
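To see what actually gets sent down the wire, here is a hypothetical trade run through both functions (the exact nanosecond digits may shift slightly because of the Float64 arithmetic in parse_timestamp):
# A made-up trade, just to inspect the payload string.
t = Trade("123456789", "2021-08-01T12:00:00.123456Z", 39000.5, 0.01, 1, "Coinbase")

parse_timestamp(t.time)
# "1627819200123456000"

build_payload(t)
# "coinbase_trades2,exchange=Coinbase, id=123456789,price=39000.5,size=0.01,side=1 1627819200123456000\n"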
The build_payload function takes in a trade and outputs a string to write to QuestDB. We connect to port 9009 and continuously take trades from the trades RemoteChannel and write them to the database.
using Sockets
function save_trades_quest(trades)
    cs = connect("localhost", 9009)
    while true
        payload = build_payload(take!(trades))
        write(cs, payload)
    end
    close(cs)
end
All pretty simple. The annoying thing is that it won’t give any indication as to whether it was successful in writing the data or not. You can check by looking at the QuestDB web interface and seeing if your values appear after querying the database, or you can check the QuestDB logs to see if any errors have occurred. You’ll find the logs at /usr/local/var/questdb on a Mac.
Now we’ve got everything sorted we just need to get both processes running. We will kick off the WebSocket process asynchronously so that it runs in the background, and likewise the save_trades_quest function, so that it doesn’t lock your computer up if you are running the code alongside it. With scaling in mind, you could run both of these processes on different threads or cores if needed. But in this case, both processes are light enough to be run asynchronously on the main thread.
@async save_coinbase_trades(coinbase_url, coinbase_subscribe_string)
@async save_trades_quest(trades)
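If you want to check that the pipeline is still alive from the REPL, isready tells you whether the channel currently holds trades that the consumer has not yet taken (usually false if the consumer is keeping up):
# true when there are unconsumed trades sitting in the channel
isready(trades)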
This is now saving down into the database every time a new trade is seen. If you go to localhost:9000 you can query the data and see how it is evolving. QuestDB uses an SQL-like query language, so you can write things like
select * from coinbase_trades
and it will show you all the trades saved down so far. Or
select min(timestamp), max(timestamp) from coinbase_trades
to see the earliest and latest timestamps with traditional SQL. Or, using the timeseries database features:
select * from coinbase_trades
latest by exchange
which will pull out the most recent row for each exchange.
Summary
That’s the end of part 1. You should hopefully have QuestDB installed and slowly filling up with trades from Coinbase now. In the next part, I’ll show you how to connect to the database and pull the data out to do some analysis.
If you want something extra to do, why not try extending the program to pull in other cryptocurrencies? You’ll want to edit the subscribe string and how each message is parsed.
You can also connect to QuestDB using Grafana and build out some dashboards to monitor the trades without needing any other code.
Questions? Feedback? Comments? Let me know below!
Version Info
- QuestDB 6.0.4
- Julia 1.6