CEP is the term used to describe systems that process streams of events. In this post, we will use the the data structures created in a previous post and a GenEvent server to create a simple CEP processor in Elixir.
The application we are creating is going to recieve stock quotes (“ticks”) for different stock symbols and output hourly average prices for each stock. Rather than connect to a live stock feed we will simulate a real time stock feed using random data. Our application design is as follows:
A GenEvent server receives events and passes them on to any process that has registered as a handler. Every registered handler receives every message (we can’‘t subscribe to a subset of topics like you can with message queues) so our application has two GenEvent servers, one that recieves input tick events from our “sources” and one that recieves output average events from our workers and passes them on to our “sinks”.
Our broker recieves ticks from the input GenEvent server. It then looks up the pid of the process registered for that tick symbol, if it finds it passes the event on to the worker, otherwise it passes it on to the worker factory.
Our worker factory recieves tick events and starts a new worker process to handle the event. Finally, our workers have a timed window data structure that they update every time they recieve an event. They then pass on the 60 minute average to the output GenEvent server. Ok, lets see some code.
(All the code can be found in the GitHub repo but I recommended typing the code as you go and only referencing the repo if you get stuck.)
Our source just generates random data to test our application so I won’‘t go into it in any detail. A real world source would read this data from a feed or a file (if you were doing backtesting for example). It gets passed the input GenEvent process on startup and after every “interval” period, it sends it a tick event.
Our broker is the process that will recieve tick events and decide where to send them.
Our Broker recieves the factory process on start up and sends events to it if it can’‘t find the correct worker process.
Our worker code, will be fairly simple. One startup it creates a new timed window, every time it recieves a raw tick it adds it to the timed window and then sends the hourly average to the ouput GenEvent process.
Like the source module, our sink module is just a simple dummy useful for development. In real life you would probably use a sink to store the data to a database or a file (or both).
Tieing it all together
Not that all our code is inplace we need to tied everything together and start our processes, we do this in our Application module.
Ok, with everything in place we should now be able to test our application. If you run
iex -S mix
and after 5 seconds you should start seeing averages being printed to the console. Play around with variables in the application module and see how hard you can make your CPU work :)
We have seen how using GenEvent servers can decouple consumers/workers and producers. In the future this would allow us to easily add more “tick” consumers without having to modify the existing sources or sinks.