Category: System Architecture

ScyllaDB POC – (not so :) live blogging – Update #3.

Scylla attacking Odysseus’s ship

Hi all

It has been a long time (more than 4 months) since we last updated.

You can read the previous update here.

It is not that we abandoned the POC; we actually continued to invest time and effort in it, since there has been good progress. It is just that we have not finished it yet and have not gotten the proof we wanted. While there was a lot of progress on both the Outbrain system side and the ScyllaDB side in those 4 months, there is one thing holding us back from proving the main point of this POC: our current bottleneck is the network. The network in the datacenter where we are running the tests is 1Gbps Ethernet. We found that although Scylla is not loaded and works with good latencies, we are saturating the NICs. We made some improvements along the way to still show that Scylla behaves better than C*, but if we want to show that we can significantly reduce the number of nodes in the cluster, we need to upgrade to 10Gbps Ethernet.

This upgrade will come shortly.

 

This is where we currently stand. However, a lot was done in those 4 months and there are a lot of learnings I want to share. The rest of the post is the way Doron and the Scylla guys describe what happened. It looks more like a captain’s log, but it tells the story pretty well.

 

Scylla:

  • 23/6 – We created a special app server cluster to call Scylla, and delegated all calls to both the C* and Scylla clusters. We did that so there would be less coupling between the C* path and the Scylla path, and fewer mutual interruptions that would interfere with our conclusions. The app servers for Scylla were configured not to use cache, so the entire load (1.5M-3M RPM) went directly to Scylla. C* stayed behind cache and actually handled ~10% of the load. This went smoothly.
  • In the following ~3 weeks we tried to load the snapshot again from C* and ran into some difficulties, some related to bugs in Scylla, some to networking limits (1Gbps). During this time we had to stop the writes to Scylla for a few days, so the data went out of sync again. Some actions we took to resolve this:
    1. We investigated bursts of usage we had and decreased them in some use-cases (both for C* and Scylla). They caused the network usage to be very high for a few seconds, sometimes for a few tens of milliseconds. This also helped C* a lot. The tool is now open source.
    2. We added client-server compression (LZ4). It was supported by Scylla, but the client needed to be configured to use it.
    3. Scylla added server-server compression during this period.
    4. Changed the “multi” calls back to N parallel single calls (instead of one IN request) – this utilizes the network better.
    5. The Scylla client was (mistakenly) using the latency-aware policy over the token-aware one. This caused the app to go to the “wrong” node a lot, causing more traffic between Scylla nodes. Removing the latency-aware policy helped reduce the server-server network usage and the overall latency.
  • 14/7 – with all the above fixes (and more from Scylla) we were able to load the data and stabilize the cluster with the entire load.
  • Until 29/7 I saw many spikes in the latency. We are not sure what we did to fix it… but on 29/7 the spikes stopped and the latency has been stable ever since.
  • During this period we saw 1-5 errors from Scylla per minute. Those errors were explained by attempts to reach partitions coming from a very old C* version. This was verified by logging, on the app server side, the partitions we failed for. Scylla fixed that on 29/7.
  • 7/8-15/8 – we changed the consistency level of both Scylla and C* to local-one (to test C*) – this caused a slight decrease in the (already) low latencies.
  • Up to 19/8 we saw occasional momentary errors coming from Scylla (a few hundred every few hours). This has not happened since 19/8. I don’t think we can explain why.
  • Current latencies – Scylla holds over 2M RPM with latency of 2 ms (99%ile) for single requests and 40-50 ms (99%ile) for multi requests of ~100 partitions on average per request. Latencies are very stable with no apparent spikes. All latencies are measured from the app servers.
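
Item 4 in the list of fixes above (N parallel single calls instead of one IN request) can be sketched roughly as follows. This is a minimal illustration, not the code used in the POC: `fetch_one` is a hypothetical stand-in for a single-partition read issued through a driver, and the in-memory `FAKE_TABLE` replaces the database.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the database: partition key -> row.
FAKE_TABLE = {"doc-1": "row-1", "doc-2": "row-2", "doc-3": "row-3"}


def fetch_one(key):
    """Hypothetical single-partition read (one request per key)."""
    return FAKE_TABLE.get(key)


def fetch_many(keys, max_workers=8):
    """Issue N single-key reads in parallel instead of one IN query.

    With a token-aware client, each read can go directly to a replica
    that owns its partition, instead of one coordinator fanning out.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves the order of the input keys.
        return dict(zip(keys, pool.map(fetch_one, keys)))


result = fetch_many(["doc-1", "doc-3", "missing"])
print(result)
```

The trade-off is more requests in flight per “multi” call, in exchange for less server-to-server traffic.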

 

Next steps on Scylla:

  • Load the data again from C* to sync.
  • Repeat the data consistency check to verify the results from C* and Scylla are still the same.
  • Run repairs to see whether the cluster can hold while running heavy tasks in the background.
  • Try to understand the errors I mentioned above if they repeat.
  • Create the cluster in Outbrain’s new Sacramento datacenter, which has a 10Gbps network, with a minimum number of nodes (3?), and try the same load there.

 

Cassandra:

  • 7/8 – we changed consistency level to local-one and tried to remove cache from Cassandra. The test was successful and Cassandra handled the full load with latency increasing from 5 ms (99%ile for single requests) to 10-15ms in the peak hours.
  • 15/8 – we changed back to local-quorum (we do not like to have local-one for this cluster… we can explain in more detail why) and set the cache back.
  • 21/8 – we removed the cache again, this time with local-quorum. Cassandra handled it, but single request latency increased to 30-40 ms for the 99%ile in the peak hours. In addition, we started seeing timeouts from Cassandra (timeout is 1 second), up to 500 per minute in the peak hours.

 

Next steps on C*:

  • Run repairs to see whether the cluster can hold while running heavy tasks in the background.
  • Try compression (like we do with Scylla).
  • Try some additional tweaks by the C* expert.
  • In case errors continue, we will have to set the cache back.

 

Current status comparison – Aug 20th:

 

The following table shows a comparison under a load of 2M RPM in peak hours.

Latencies are in 99%ile.

 

| | Cassandra | ScyllaDB |
|---|---|---|
| Single call latency | 30-40 ms (spikes to 70) | 2 ms |
| Multi call latency | 150-200 ms (spikes to 600) | 40-50 ms |
| Errors (note: these are query times exceeding 1 second, not necessarily database failures) | Up to 150 timeouts per minute every few minutes, with some higher spikes every few days | A few hundred every few days |

 

Below are the graphs showing the differences.

Latency and errors graphs showing both C* and Scylla getting requests without cache (1M-2M RPM):

Latency comparison of single requests

Latency comparison of multi requests

Errors (timeouts for queries > 1 second)

 

Summary

There are very good signs that ScyllaDB does make a difference in throughput, but due to the network bottleneck we could not verify it. We will update as soon as we have results on a faster network. The Scylla guys are working on a solution for slower networks too.

 

Hope to update soon.

Micro Service Split

In this post I will describe a technical methodology we used to remove a piece of functionality from a Monolith/Service and turn it into a Micro-Service. I will try to reason about some of the decisions we have made and the path we took, as well as a more detailed description of internal tools, libraries and frameworks we use at Outbrain and in our team, to shed some light on the way we work in the team. And as a bonus you might learn from our mistakes!
Let’s start with the description of the original service and what it does.
Outbrain runs the largest content discovery platform. From the web surfer’s perspective, it means serving a list of recommended content that might interest her, in the form of ‘You might also like’ links. Some of those impression links are sponsored, i.e., when she clicks on a link, someone is paying for that click, and the revenue is shared between Outbrain and the owner of the page with the link on it. That is how Outbrain makes its revenue.

My team, among other things, is responsible for the routing of the user to the requested page after pressing the link, and for the bookkeeping and accounting that is required in order to calculate the cost of the click, who should be charged, etc.
In our case the service we are trying to split is the ‘Bookkeeper’. Its primary role is to manage the budget of paid impression links. After a budget is spent, the ‘Bookkeeper’ should notify Outbrain’s backend servers to refrain from showing the impression link again. And this has to be done as fast as possible. If not, people will click on links we cannot charge for because the budget was already spent. Technically, this is done by an update to a database record. However, there are other cases in which we might want to stop the exposure of impression links. One such example is a request from the customer paying for the future click to disable the impression link exposure. So for such cases we have an API endpoint that does exactly the same with the same code. That endpoint is actually part of the ‘Bookkeeper’ and is enabled by a feature toggle on specific machines. This ‘Activate-Impressionable’ endpoint, as we call it, is what was decided to split out of the ‘Bookkeeper’ into a designated Micro-Service.
In order to execute the split, we chose a step-by-step strategy that allows us to reduce the risk during execution and keep it as controlled and reversible as possible. From a bird’s eye view, I would describe it as a three-step process: Plan, Up and Running as fast as possible, and Refactor. The rest of the post describes these steps.

Plan (The who’s and why’s)

In my opinion this is the most important step. You don’t want to split a service just in order to split. Each Micro Service introduces maintenance and management overhead, with its own set of challenges[1]. On the other hand, Microservices architecture is known for its benefits such as code maintainability (for each Micro Service), the ability to scale out and improved resilience[2].
Luckily for me, someone already did that part for me and made the decision that ‘Activate-Impressionable’ should be split from the ‘Bookkeeper’. But still, let’s name some of the key factors of our planning step.
Basically, I would say that a good separation is a logical separation with its own non-overlapping RESTful endpoints and an isolated code base. The logical separation should be clear. You should think about what the functionality of the new service is, and how isolated it is. It is possible to analyze the code for inter-dependencies among classes and packages using tools such as Lattix. The bottom line is that it is important to have a clear definition of the responsibility of the new Micro Service.
In our case, the ‘Bookkeeper’ was eventually split so that it remained the bigger component, ‘Activate-Impressionable’ was smaller, and the common library was smaller than both. The exact lines of code can be seen in the table below.

Screen Shot 2016-02-21 at 11.21.56

Unfortunately I assessed it only after the split and not in the plan step. We might say that there is too much code in common when looking at those numbers. It is something worth considering when deciding what to split. A lot of common code implies low isolation level.

Of course, part of the planning is time estimation. Although I am not a big fan of “guesstimates”, I can tell that the task was planned for a couple of weeks and took about that long.
Now that we have a plan, let’s get to work.

Up and Running – A Step by Step guide

As in every good refactor, we want to do it in small baby steps, and remain ‘green’ all the time[3]. In continuous deployment that means we can and do deploy to production as often as possible to make sure it is ‘business as usual’. In this step we want to get to a point where the new service is working in parallel to the original service. At the end of this step we will point our load-balancers to the new service endpoints. In addition, the code remains shared in this step, which means we can always deploy the original, fully functioning ‘Bookkeeper’. We actually do that if we feel the latest changes carry any risk.
So let’s break it down into the actual phases:

| Step | Details |
|---|---|
| 0 | Starting phase. |
| 1 | Create the new empty Micro-Service ‘Activate-Impressionable’. At Outbrain we do it using the scaffolding of the ob1k framework. Ob1k is an open source Micro Services Framework that was developed in-house. |
| 2 | Create a new empty library that both the new ‘Activate-Impressionable’ service and the ‘Bookkeeper’ depend on. Ideally, if there is a full logical separation with no shared code between the services, that library will be deleted in the cleanup phase. |
| 3 | Move the relevant source code to the library. Luckily in our case, there was one directory that was clearly what we had to split out. Unluckily, that code also pulled in some more code it depended on, and this had to be done carefully so as not to pull too much nor too little. The good news is that this phase is pretty safe for statically typed languages such as Java, in which our service is written. The compiler protects us here with compilation errors, so the feedback loop is very short. Tip: don’t forget to move unit tests as well. |
| 4 | Move common resources to the library, such as Spring beans defined in XML files and our feature flags, which are defined in YAML files. This is the dangerous part. We don’t have the compiler here to help, so we actually test it in production. And when I say production I mean using staging/canary/any environment with production configuration but without real impact. Luckily again, both YAML and Spring beans are configured to fail fast, so if we did something wrong it will just blow up in our face and the service will refuse to go up. For this step I even ended up developing a one-liner bash script to assist with those wicked YAML files. |
| 5 | Copy and edit web resources (web.xml) to define the service endpoints. In our case web.xml can’t reside in a library so it has to be copied. Remember we still want the endpoints active in the ‘Bookkeeper’ at that phase. Lesson learned: inspect all files closely. In our case log4j.xml, which seems like an innocent file by its name, contains designated appenders that are consumed by other production services. I didn’t notice that and didn’t move the required appender, and it was found only later in production. |
| Deploy | Deploy the new service to production. What we did was deploy ‘Activate-Impressionable’ side-by-side on the same machines as the ‘Bookkeeper’, just with different ports and context path. Definitely makes you sleep better at night. |
| Up-And-Running | Now is a good time to test once again whether both ‘Bookkeeper’ and ‘Activate-Impressionable’ are working as expected. Generally, now we are up and running with only a few more things to do here. |
| Clients Redirect | Point clients of the service to the new endpoints (port + context path). This step might take some time, depending on the number of clients and the ability to redeploy them. At Outbrain we use HA-Proxy, so reconfiguring it did most of the work, but some clients did require code modifications. |
| (More) Validation | Move/copy simulator tests and monitors. In our team, we heavily rely on tests we call simulator tests. These are actually black-box tests written in JUnit that run against the service installed on a designated machine. These tests see the service as a black box and call its endpoints while mocking/emulating other services and data in the database for the test run. So usually a test run can look like: put something in the database, trigger the endpoint, and see the result in the database or in the HTTP response. There is also a question here of whether to test ‘Activate-Impressionable’ or the ‘Bookkeeper’. Ideally you will test them both (tests are duplicated for that phase), and that is what we did. |

 

Refactor, Disconnect & Cleanup

When we get here, the new service is working and we should expect no more behaviour changes from the endpoints’ point of view. But we still want the code to be fully split and the services to be independent of each other. In the previous step we performed the phases in a way that everything remains reversible with a simple feature toggle & deploy.

In this step we move to a state where the ‘Bookkeeper’ will no longer host the ‘Activate-Impressionable’ functionality. Sometimes it is a good idea to have a gap from the previous step to make sure that there are no rejections and backfires that we didn’t trace in our tests and monitoring.
The first thing, if it was not done up until now, is to deploy the ‘Bookkeeper’ without the service functionality and make sure everything is still working. And wait a little bit more…
And now we just have to push the sources and the resources from the library to the ‘Activate-Impressionable’ service. In the ideal case, where there is no common code, we can also delete the library. This was not how it was in our case. We still have a lot of common code we can’t separate for the time being.
Now is also the time to do resources cleanup, web.xml edits, etc.
And for the bold and OCD among us – packages rename and refactor of code with the new service naming conventions.

Conclusion

The entire process in our case took a couple of weeks. Part of the fun and advantage of such a process is the opportunity to get to know old code, its structure and its functionality better, without the need to modify something for a new feature with its constraints. Especially when someone else wrote it originally.
To perform such a process well, it is important to plan and to remain organized and on track. In case of a context switch, it is very important to keep a bookmark of where you need to return to in order to continue. In our team we even did that with a handoff of the task between developers. Extreme Programming, it is.
It is interesting to see the surprising results in terms of lines of code. Originally we thought of it as splitting a micro-service from a monolith. In retrospect, it looks to me more like splitting a service into two services. ‘Micro’ in this case is in the eye of the beholder.

References

[1] http://highscalability.com/blog/2014/4/8/microservices-not-a-free-lunch.html
[2] http://eugenedvorkin.com/seven-micro-services-architecture-advantages/
[3] http://blog.cleancoder.com/uncle-bob/2014/12/17/TheCyclesOfTDD.html

http://martinfowler.com/articles/microservices.html
https://github.com/outbrain/ob1k
http://www.yaml.org/
http://lattix.com/

Real Time Performance Monitoring @ Outbrain

Outbrain serves millions of requests per minute, based on a micro service architecture. Consequently, as you might expect, visibility and performance monitoring are crucial.

Serving millions of requests per minute, across multiple data centers, in a micro services environment, is not an easy task. Every request is routed to many applications, and may potentially stall or fail at every step in the flow. Identifying bottlenecks, troubleshooting failures and knowing our capacity limits are all difficult tasks. Yet, these are not things you can just give up on “because they’re hard”, but are rather tasks that every engineer must be able to tackle without too much overhead. It is clear that we have to aim for all engineers to be able to understand how their applications are doing at any given time.

Since we face all of these challenges every day, we’ve reached the point where a paradigm shift was required. For example, moving from the old, familiar “investigate the past” to the new, unfamiliar “investigate the present”. That’s only one of the requirements we came up with. Here are a few more:

 

Real time visibility

Sounds pretty straightforward, right? However, when using a persistent monitoring system, it always has at least a few minutes of delay. These few minutes might contain millions of errors that potentially affect your business. Aiming for low MTTR means cutting delays where possible, thus moving from minute-based granularity to second-based.

 

Throughput, Latency and error rate are linked

Some components might suffer from high latency, but maybe the amount of traffic they receive is negligible. Others might have low latency under high load, but that’s only because they fail fast for almost every request (we are reactive!). We wanted to view these metrics together, and rank them by importance.

 

Mathematical correctness at any given aggregation (Don’t lie!)

When dealing with latency, one should look at percentiles, not averages, as averages can be deceiving and might not tell the whole story. But what if we want to view latency per host, and then view it per data center? If we store only percentiles per host (which is highly common in our industry), it is not mathematically correct to average them! On the other hand, we have so much traffic that we can’t just store every measurement with its latency, and definitely not view them all in real time.
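
To make the point about averaging percentiles concrete, here is a small worked example. The two hosts and their latencies are made up for illustration, and the histograms are plain `{latency_ms: count}` counters rather than any particular library’s type.

```python
import math
from collections import Counter


def percentile_from_histogram(hist, p):
    """Nearest-rank percentile from a {latency_ms: count} histogram."""
    total = sum(hist.values())
    rank = max(1, math.ceil(total * p / 100))
    seen = 0
    for value in sorted(hist):
        seen += hist[value]
        if seen >= rank:
            return value


# Hypothetical per-host data: a busy fast host and a nearly idle slow one.
host_a = Counter({10: 1000})   # 1000 requests at 10 ms
host_b = Counter({500: 10})    # 10 requests at 500 ms

p99_a = percentile_from_histogram(host_a, 99)
p99_b = percentile_from_histogram(host_b, 99)
averaged = (p99_a + p99_b) / 2     # averaging per-host percentiles

merged = host_a + host_b           # adding bucket counts is exact
p99_merged = percentile_from_histogram(merged, 99)

print(averaged, p99_merged)
```

Averaging the two per-host 99th percentiles gives 255 ms, while the 99th percentile of all 1010 requests together is 10 ms. Summing histogram buckets keeps the aggregation correct at any grouping level.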

 

Latency resolution matters

JVM based systems tend to display crazy numbers when looking at the high percentiles (how crazy? With heavy GC storms and lock contention, there is no limit to how bad these values can get). It’s crucial for us to differentiate between latency in the 99.5 and 99.9 percentiles, while values at the 5 or 10 percentiles don’t really matter.

Summing up all of the requirements above, we reached a conclusion that our fancy persistent monitoring system, with its minute-based resolution, supporting millions of metrics per minute, doesn’t cut it anymore. We like it that every host can write thousands of metric values every minute, and we like being able to view historical data over long periods of time, but moving forward, it’s just not good enough. So, as we often do, we sat down to rethink our application-level metric collection and came up with a new, improved solution.

 

Our Monitoring Unit

First, consider metric collection from the application perspective. Logically, it is an application’s point-of-view of some component: a call to another application, to a backend or plain CPU bound computation. Therefore, for every component, we measure its number of requests, failures, timeouts and push backs along with a latency histogram over a short period of time.

In addition, we want to see the worst performing hosts in terms of any such metric (can be mean latency, number of errors, etc.).

To achieve this display for each measured component we decided to use these great technologies:

 

HDR Histograms

http://hdrhistogram.github.com/HdrHistogram/

HdrHistogram supports the recording and analysis of sampled data value counts, across a configurable value range, with configurable value precision within the range. It is designed for recording histograms of latency measurements in performance-sensitive applications.

Why is this important? Because when using such histograms to measure the latency of some component, it allows you to have good accuracy of the values in the high percentiles at the expense of the low percentiles.

So, we decided to store in memory instances of histograms (as well as counters for requests, errors, timeouts, push backs, etc) for each measured component. We then replace them each second and expose these histograms in the form of rx.Observable using our own OB1K application server capabilities.
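
The “replace them each second” idea can be sketched as a recorder that hands out the finished interval and starts a fresh one. This is only an illustration of the pattern under simplifying assumptions: a plain `Counter` stands in for a real HDR histogram, and the class name `IntervalRecorder` is made up.

```python
import threading
from collections import Counter


class IntervalRecorder:
    """Collects latencies for the current interval; swap() starts a new one."""

    def __init__(self):
        self._lock = threading.Lock()
        self._current = Counter()

    def record(self, latency_ms):
        with self._lock:
            self._current[latency_ms] += 1

    def swap(self):
        """Return the finished interval's histogram and start a fresh one."""
        with self._lock:
            finished, self._current = self._current, Counter()
        return finished


recorder = IntervalRecorder()
for latency in (3, 3, 120):
    recorder.record(latency)
first_second = recorder.swap()      # would be called once per second

recorder.record(5)
second_second = recorder.swap()

print(first_second, second_second)
```

Each value returned by `swap()` covers exactly one interval, so downstream consumers can add intervals together without double counting.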

All that is left is to aggregate and display.

Java Reactive extensions

https://github.com/ReactiveX/RxJava

rx is a great tool to merge and aggregate streams of data in memory. In our case, we built a service to merge raw streams of measured components; group them by the measured type, and aggregate them in a window of a few seconds. But here’s the trick – we do that on demand. This allows us to let the users view results grouped by any dimension they desire without losing the mathematical correctness of latency histograms aggregation.

Some examples of the operators we use to aggregate the multiple monitoring units:

 

merge

rx merge operator enables treating multiple streams as a single stream

 

window

rx window operator enables sliding window abstraction

 

scan

rx scan operator enables aggregation over each window

 

To simplify things, we can say that for each component we want to display, we connect to each machine to fetch the monitored stream endpoint, perform ‘merge’ to get a single stream abstraction, ‘window’ to get a result per time unit, and ‘scan’ to perform the aggregation.
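
The merge, window and scan pipeline can be imitated with plain Python iterators, which may help in seeing what each stage contributes. This is an analogy, not RxJava: the streams here are finite lists instead of live observables, the windows are consecutive (non-overlapping) rather than sliding, and the sample data is made up.

```python
import heapq
from collections import Counter
from itertools import accumulate, groupby

# Hypothetical per-host streams of (second, {latency_ms: count}) samples,
# each already ordered by time.
host_a = [(0, Counter({5: 3})), (1, Counter({5: 2, 50: 1})), (2, Counter({5: 4}))]
host_b = [(0, Counter({7: 1})), (1, Counter({7: 2})), (3, Counter({90: 1}))]

WINDOW_SECONDS = 2

# merge: treat several time-ordered streams as one.
merged = heapq.merge(host_a, host_b, key=lambda sample: sample[0])

# window: group samples into consecutive 2-second windows.
windows = groupby(merged, key=lambda sample: sample[0] // WINDOW_SECONDS)

# scan: running aggregation inside each window; the last value is the total.
totals = {}
for window_id, samples in windows:
    running = list(accumulate(hist for _, hist in samples))
    totals[window_id] = running[-1]

print(totals)
```

Because the aggregation adds histogram buckets rather than precomputed percentiles, the result stays correct no matter which hosts are merged together.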

 

Hystrix Dashboard

https://github.com/Netflix/Hystrix

The guys at Netflix found a great formula for displaying serving components’ status in a way that links between volume, error percentage and latency in a single view. We really liked that, so we adopted this UI to show our aggregated results.

The Hystrix dashboard view of a single measured component shows counters of successes, failures, timeouts and push backs, along with a latency histogram, information on the number of hosts, and more. In addition, it provides a balloon view, which grows/shrinks with traffic volume per component, and is color-coded by the error rate.

See below how this looks in the breakdown view of all request components. The user gets a view of all measured components, sorted by volume, with a listing of the worst performing hosts.

Another example shows the view of one application, with nothing but its entry points, grouped by data center. Our Operations guys find this extremely useful when needing to re-balance traffic across data centers.

 

OK, so far so good. Now let’s talk about what we actually do with it.

Troubleshooting

Sometimes an application doesn’t meet its SLA, be it in latency or error rate. The simple case is due to a broken internal component (for example, some backend went down and all calls to it result in failures). At this point we can view the application dashboard and easily locate the failing call. A more complex use case is an increase in the number of calls to a high latency component at the expense of a low latency one (for example, a drop in cache hit rate). Here our drill down will need to focus on the relative amount of traffic each component receives: we might be expecting a 1:2 ratio, while in reality we might observe a 1:3 ratio.

With enough alerting in place, this could be caught by an alert. Having the real time view will allow us to locate the root cause quickly even when the alert is a general one.

Performance comparison

In many cases we want to compare the performance of two groups of hosts doing the same operation, such as version upgrades or topology changes. We use tags to differentiate groups of machines (each datacenter is a tag, each environment, and even each hostname). We then can ask for a specific metric, grouped by tags, to get the following view:

 

Load testing

We conduct several types of load tests. One is where we shift as much traffic as possible to one data center, trying to hit the first system-wide bottleneck. Another is performed on specific applications. In both cases we use the application dashboard to view the bottlenecks, just like we would when troubleshooting unexpected events.

One thing to keep in mind is that when an application is under load, it is sometimes CPU-bound, and measurements are skewed because threads just don’t get CPU time. Another case where this happens is during GC. In such cases we must also measure the effects of this phenomenon.

The measured unit in this case is the ‘JVM hiccup’, which basically means taking one thread, letting it sleep for a while, and measuring how much longer than the requested sleep time it actually took to wake up. Low hiccups mean we can rely on the numbers presented by other metrics.
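
The hiccup measurement itself is simple enough to sketch in a few lines. The version below is in Python rather than on the JVM, purely to show the idea of timing a sleep and keeping the overshoot; the sleep length and sample count are arbitrary choices.

```python
import time


def measure_hiccup(sleep_seconds=0.001):
    """Sleep briefly and return how much longer than requested it took."""
    start = time.perf_counter()
    time.sleep(sleep_seconds)
    elapsed = time.perf_counter() - start
    # Anything beyond the requested sleep is time the thread could not run.
    return max(0.0, elapsed - sleep_seconds)


samples = [measure_hiccup() for _ in range(20)]
worst_ms = max(samples) * 1000
print(f"worst hiccup: {worst_ms:.3f} ms")
```

In a real service the measuring thread would run continuously and feed its overshoot values into the same kind of latency histogram as any other measured component.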

 

What’s next?

Real time monitoring holds a critical role in our everyday work, and we have many plans to leverage these measurements. From smarter metric driven load balancing in the client to canary deployments based on real application behavior – there is no limit to what you can achieve when you measure stuff in a fast, reliable manner.

Goodbye static CNAMEs, hello Consul

Nearly every large scale system becomes distributed at some point: a collection of many instances and services that compose the solution you provide. And as you scale horizontally to provide high availability, better load distribution, etc…, you find yourself spinning up multiple instances of services, or using systems that function in a clustered architecture. That’s all cool in theory, but soon after you ask yourself, “how do I manage all of this? How should these services communicate with each other? And how do they even know what instances (or machines) exist?”

Those are excellent questions!

What methods are in use today?

The naive approach, which we’d followed at Outbrain for many years, is to route all inter-service traffic via load balancers (HAProxy in our case). Every call to another system, such as a MySQL slave, is made to the load balancer (one in a pool of many), via an agreed upon name, such as a DNS CNAME. The load balancer, which holds a static configuration of all the different services and their instances, directs the call to one of those instances, based on the predefined policy.

backend be_onering_es   ## backend name
  balance leastconn     ## how to distribute load
  option httpchk GET /  ## service health check method
  option httpclose      ## add "Connection: close" header if missing
  option forwardfor     ## send client IP through XFF header
  server ringdb-20001 ringdb-20001:9200 check slowstart 10s weight 100   ## backend node 1
  server ringdb-20002 ringdb-20002:9200 check slowstart 10s weight 100   ## backend node 2

The load balancer is also responsible for checking service health, to make sure requests are routed only to live services, as dead ones are “kicked out of the pool”, and revived ones are brought back in.

An alternative to the load balancer method, used in high throughput systems such as Cassandra, is configuring CNAMEs that point to specific nodes in the cluster. We then use those CNAMEs in the consuming applications’ configuration. The client is then responsible for applying a policy of balancing between those nodes, both for load and availability.

OK, so what’s the problem here?

There are a few, actually:

  1. The mediator (Load balancer), as quick as it may be in processing requests (and HAProxy is really fast!), is another hop on the network. With many services talking to each other, this could prove a choke point in some network topologies. It’s also a shared resource between multiple services and if one service misbehaves, everyone pays the price. This is especially painful with big payloads.
  2. The world becomes very static! Moving services between hosts, scaling them out/in, adding new services – it all involves changing the mediator’s config, and in many cases this is done manually. Manual work requires expertise and is error prone. When the changes become frequent… it simply does not scale.
  3. When moving ahead to infrastructure that is based on containers and resource management, where instances of services and resources are allocated dynamically, the whole notion of HOSTNAME goes away and you cannot count on it in ANY configuration.

What this all adds up to is “the end of the static configuration era”. Goodbye static configs, hello Dynamic Service Discovery! And cue Consul.

What is Consul?

In a nutshell, Consul is a Service Discovery System, with a few interesting features:

  1. It’s a distributed system, made out of an agent in each node. Nodes talk to each other via a gossip protocol, making node discovery simple, robust, and dynamic. There’s no configuration file describing all members of a Consul cluster.
  2. It’s fault tolerant by design, and using concepts such as Anti Entropy, gracefully handles nodes disappearing and reappearing – a common scenario in VM/container based infrastructure.
  3. It has first-class treatment of datacenters, as self-contained, interconnected entities. This means that DC failure / disconnection would be self-contained. It also means that a node in one DC can query for information in another DC with as little knowledge as the remote DC’s name.
  4. It holds the location (URI) and health of every service on every host, and makes this data available via multiple channels, such as a REST API and GUI. The API also lets you make complex queries and get the service data segment you’re interested in. For example: Get me all the addresses of all instances of service ‘X’ from Datacenter ‘Y’ in ‘staging env’ (tag).
  5. There is a very simple way to get access to “Healthy” service instances by leveraging the Consul DNS interface. Perfect for those pesky 3rd party services whose code you can’t or don’t want to modify, or just to get up and running quickly without modifying any client code (disclaimer: doesn’t fit all scenarios).
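
To make the query in item 4 concrete, here is a minimal sketch of what such a call could look like against Consul’s HTTP health endpoint. The service name, datacenter, tag and sample response are placeholders, and the agent address assumes Consul’s default local HTTP port; the sketch only builds the URL and parses a response body, it does not contact a real agent.

```python
import json
from urllib.parse import urlencode


def health_query_url(service, datacenter, tag, agent="http://127.0.0.1:8500"):
    """Build a URL for Consul's /v1/health/service/<name> endpoint.

    'passing' asks Consul to return only instances whose checks pass.
    """
    params = urlencode({"dc": datacenter, "tag": tag, "passing": "true"})
    return f"{agent}/v1/health/service/{service}?{params}"


def extract_addresses(payload):
    """Return (address, port) pairs from a health-endpoint JSON response."""
    result = []
    for entry in json.loads(payload):
        service = entry["Service"]
        # Service.Address may be empty; the node address applies in that case.
        address = service.get("Address") or entry["Node"]["Address"]
        result.append((address, service["Port"]))
    return result


# Hand-written sample in the shape of a health response (not real data).
SAMPLE_RESPONSE = json.dumps([
    {"Node": {"Node": "host-a", "Address": "10.10.10.11"},
     "Service": {"Service": "X", "Address": "", "Port": 8080}},
    {"Node": {"Node": "host-b", "Address": "10.10.10.12"},
     "Service": {"Service": "X", "Address": "10.10.10.12", "Port": 8080}},
])

if __name__ == "__main__":
    print(health_query_url("X", "Y", "staging"))
    print(extract_addresses(SAMPLE_RESPONSE))
```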

How does Consul work?

You can read all about it here, but let me take you through a quick tour of the architecture:

As you can see, Consul has multi datacenter awareness built right in (you can read more about it here). But for our case, let’s keep it simple, and look at the case of a single datacenter (Datacenter 1 in the diagram).

What the diagram tags as “Clients” are actually “Consul agents”, running locally on every participating host. Those talk to each other, as well as the Consul servers (which are “agents” configured as Servers), through a “Gossip protocol”. If you’re familiar with Cassandra, and that rings a bell, then you’re right, it’s the same concept used by Cassandra nodes to find out which ones are up or down in a cluster. A Gossip protocol essentially makes sure “Everybody knows Everything about Everyone”. So within reasonable delay, all agents know (and propagate) state information about other agents. And you just so happen to have an agent running locally on your node, ready to share everything it knows via API, DNS or whatnot. How convenient!
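
To get a feel for why “within reasonable delay” holds, here is a toy simulation of push-style gossip. It is an illustration only and not Consul’s actual protocol (Consul’s gossip is built on Serf, which is based on SWIM); the node count, fanout and seed are arbitrary assumptions.

```python
import random


def gossip_rounds(node_count, fanout=2, seed=7):
    """Simulate push gossip starting from node 0.

    Each round, every informed node tells `fanout` random other nodes.
    Returns the number of informed nodes after each round, starting
    with the initial state (one informed node).
    """
    rng = random.Random(seed)
    informed = {0}
    history = [len(informed)]
    while len(informed) < node_count:
        newly_informed = set()
        for node in sorted(informed):
            peers = [p for p in range(node_count) if p != node]
            newly_informed.update(rng.sample(peers, fanout))
        informed |= newly_informed
        history.append(len(informed))
    return history


if __name__ == "__main__":
    # Prints how many of 20 nodes know the update after each round.
    print(gossip_rounds(20))
```

The number of informed nodes can at most triple per round with a fanout of 2, yet a few rounds are typically enough to cover the cluster, which is what makes gossip attractive for propagating state without a central coordinator.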

Agents are also the ones performing health checks on the services on the hosts they run on, and gossiping any health state changes. To make this work, every service must expose a means to query its health status, and when registered with its local Consul agent, also register its health check information. At Outbrain we use an HTTP based “SelfTest” endpoint that every one of our homegrown services exposes (through our OB1K container, practically for free!).
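
As a rough sketch of what registering a service together with an HTTP health check can look like, the snippet below builds a request for the local agent’s service registration endpoint. The service name, port and the `/SelfTest` path are made up for illustration (the post does not show the real URL), and the request is only constructed, not sent.

```python
import json
import urllib.request


def registration_payload(name, port, health_url, interval="10s", tags=()):
    """Build a service definition with an HTTP health check."""
    return {
        "Name": name,
        "Port": port,
        "Tags": list(tags),
        # The agent polls this URL every `interval`; a 2xx response is healthy.
        "Check": {"HTTP": health_url, "Interval": interval},
    }


def registration_request(payload, agent="http://127.0.0.1:8500"):
    """Wrap the payload in a PUT request to the local agent (not sent here)."""
    return urllib.request.Request(
        f"{agent}/v1/agent/service/register",
        data=json.dumps(payload).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Hypothetical service; the health URL path is an assumption.
payload = registration_payload(
    "my-service", 8080, "http://localhost:8080/SelfTest", tags=["prod"]
)
request = registration_request(payload)

if __name__ == "__main__":
    print(request.get_method(), request.full_url)
    print(json.dumps(payload, indent=2))
```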

Consul servers are also part of the gossip pool and thus propagate state in the cluster. However, they also maintain a quorum and elect a leader, which receives all updates (via RPC calls forwarded from the other servers) and registers them in its database. From here on, the data is replicated to the other servers and propagated to all the agents via Gossip. This method is a bit different from other Gossip based systems that have no servers and leaders, but it allows the system to support stronger consistency models.
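
The quorum requirement is why the number of servers matters. Assuming the usual majority rule, the arithmetic is simple enough to sketch:

```python
def quorum_size(servers):
    """Smallest majority of a server set."""
    return servers // 2 + 1


def failures_tolerated(servers):
    """How many servers can be lost while a majority still remains."""
    return servers - quorum_size(servers)


if __name__ == "__main__":
    for count in (1, 3, 4, 5):
        print(count, quorum_size(count), failures_tolerated(count))
```

Note that going from 3 to 4 servers does not buy any extra fault tolerance, which is why odd server counts are the common choice.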

There’s also a distributed key-value store we haven’t mentioned, rich ACLs, and a whole ecosystem of supporting and derived tools… but we said we’d keep it simple for now.

Where does that help with service discovery?

First, we took all of our systems that are already organized in clusters, such as Kafka, Zookeeper, Cassandra and others, and registered them with Consul. This allows us to select a live service node from a cluster, simply by calling a hostname through the Consul DNS interface. For example, take Graphite: Outbrain’s systems are currently generating ~4M metrics per minute. Getting all of these metrics through a load balancer, or even a cluster of LBs, would be suboptimal, to say the least. Consul allows us to have each host send metrics to a hostname, such as “graphite.service.consul”, which returns a random IP of a live graphite relay node. Want to add a couple more nodes to share the load? No problem, just register them with Consul and they automagically appear in the list the next time a client resolves that hostname. Which, as we mentioned, happens quite a few times a minute. No load balancers in the way to serve as choke points, no editing of static config files. Just simple, fast, out-of-band communication.

How do these 3rd party services register?

We’re heavy users of Chef, and have thus created a Chef cookbook to help us get the job done. Here’s a (simplified) code sample we use to register Graphite servers:

ob_consul 'graphite' do
  owner 'ops-vis'         ## add 'owner' tag to identify owning group
  port 1231               ## port the service is running on
  check_cmd "echo '' | nc localhost 1231 || exit 2"    ## health check shell command
  check_interval '60s'    ## health check execution interval
  template false          ## whether the health check command is a Chef template (for scripts)
  tags ['prod']           ## more tags
end

How do clients consume services?

Clients simply resolve the DNS record they’re interested in… and that’s it. Consul takes care of all the rest, including randomizing the results.

$ host graphite
graphite.dc_name.outbrain.com is an alias for relayng.service.consul.
relayng.service.consul has address 10.10.10.11
relayng.service.consul has address 10.10.10.12
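
From application code, the same lookup is just an ordinary DNS resolution. Below is a small sketch; the `pick_node` helper and the injectable resolver are illustrative assumptions (the injection is there so the function can be exercised without a live Consul DNS), not part of any Outbrain library.

```python
import random
import socket


def resolve_all(hostname, resolver=socket.gethostbyname_ex):
    """Return every IPv4 address the resolver reports for hostname."""
    _canonical_name, _aliases, addresses = resolver(hostname)
    return addresses


def pick_node(hostname, resolver=socket.gethostbyname_ex, rng=random):
    """Pick one address at random from the resolved set."""
    addresses = resolve_all(hostname, resolver)
    if not addresses:
        raise LookupError(f"no addresses found for {hostname}")
    return rng.choice(addresses)


def fake_resolver(hostname):
    """Stand-in resolver mimicking the `host graphite` output above."""
    return ("relayng.service.consul", [hostname], ["10.10.10.11", "10.10.10.12"])


if __name__ == "__main__":
    print(pick_node("graphite", resolver=fake_resolver))
```

Because Consul only returns healthy instances, resolving again on each connection attempt (rather than caching the address forever) is what lets clients follow nodes as they come and go.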

How does this data reach the DNS?

We’ve chosen to place Consul “behind” our internal DNS servers, and forward all requests for the “consul” domain name to a consul agent running on the DNS servers.

zone "consul" IN {
    type forward;
    forward only;
    forwarders { 127.0.0.1 port 8600; };
};

Note that there are other ways to go about this, such as routing all DNS requests to the local Consul agent running on each node, and having it forward everything “non-Consul” to your DNS servers. There are advantages and disadvantages to each approach. For our current needs, having an agent sit behind the DNS servers works quite well.

Where does the Consul implementation at Outbrain stand now?

At Outbrain we’re already using Consul for:

  • Graphite servers.
  • Hive Thrift servers that are Hive interfaces to the Hadoop cluster they’re running on. Here the Consul CNAME represents the actual Hadoop cluster you want your query to run on. We’ve also added a layer that enables accessing these clusters from different datacenters using Consul’s multi-DC support.
  • Kafka servers.
  • Elasticsearch servers.

And our roadmap for the near future:

  • MySQL slaves – so we can eliminate the use of HAProxy in that path.
  • Cassandra servers, where a list of active nodes maintained in the app configuration becomes stale over time.
  • Prometheus – our new monitoring and alerting system.
  • Zookeeper clusters.

 

But that’s not all! Stay tuned for more on Consul, client-side load balancing, and making your environment more dynamic.

So Long Spring XMLs

Like many Java projects these days, we use Spring at Outbrain for configuring our Java dependency wiring. Spring is a technology that started in order to solve a common, yet not so simple, issue – wiring all the dependencies in a Java project. This was done by utilizing the IoC (Inversion of Control) principles. Today Spring does a lot more than just wiring and bootstrapping, but in this post I will focus mainly on that.

When Spring first started, the only way to configure the wiring of an application was to use XMLs that defined the dependencies between different beans. As Spring continued to develop, two more methods were added to configure dependencies – the annotation method and the @Configuration method. At Outbrain we use XML configuration. I found that this method has a lot of pain points, which I found a remedy for in Spring’s @Configuration.

What is this @Configuration class?

You can think of a @Configuration class just like XML definitions, only defined by code. Using code instead of XMLs allows some advantages over XMLs which made me switch to this method:

  1. No typos – A mistyped class or bean name in an XML only shows up at runtime. In code, the compiler catches it: the code just won’t compile.
  2. Compile time check (fail fast) – With XMLs it’s possible to add an argument to a bean’s constructor but to forget to inject this argument when defining the bean in the XML. Again, this can’t happen with code. The code just won’t compile.
  3. IDE features come for free – Using code allows you to find usages of the bean’s constructor to easily find the contexts that use it. It allows you to jump back and forth between bean definitions, and basically everything you can do with code, you get for free.
  4. Feature flags – At Outbrain we use feature flags a lot. Due to the continuous-deployment culture of the company, code that is pushed to the trunk can find itself in production in a matter of minutes. Sometimes, when developing features, we use feature flags to enable/disable certain features. This is pretty easy to do by defining 2 different implementations of the same interface and deciding which one to load according to the flag. When using XMLs we had to use the alias feature, which makes creating feature flags not intuitive enough. With @Configuration, we can create a simple if clause for choosing the right implementation.

Read more >

Finding a needle in a Storm-stack


Using Storm for real time distributed computations has become a widely adopted approach, and today one can easily find more than a few posts on Storm’s architecture, internals, and what have you (e.g., Storm wiki, Understanding the parallelism of a storm topology, Understanding storm internal message buffers, etc).

So you read all these posts and got yourself a running Storm cluster. You even wrote a topology that does something you need, and managed to get it deployed. “How cool is this?”, you think to yourself. “Extremely cool”, you reply to yourself, sipping the morning coffee. The next step would probably be writing some sort of a validation procedure, to make sure your distributed Storm computation does what you think it does, and does it well. Here at Outbrain we have these validation processes running hourly, making sure our realtime layer data is consistent with our batch layer data – which we consider to be the source of truth.

It was when the validation of a newly written computation started failing that we embarked on a great journey to the land of “How does one go about debugging a distributed Storm computation?”. True story. The validation process was reporting intermittent inconsistencies, intermittent being the operative word here: it was not like the new topology was completely and utterly messed up; rather, it was failing to produce correct results for some of the input, all the time (by correct results I mean ones that match our source of truth).

Read more >

Leader Election with Zookeeper

Recently we had to implement active-passive redundancy for a singleton service in our production environment, where the general rule is to always have “more than one of anything”. The main motivation is to alleviate the need to manually monitor and manage these services, whose presence is crucial to the overall health of the site.

This means that we sometimes have a service installed on several machines for redundancy, but only one of them is active at any given moment. If the active service goes down for some reason, another instance rises to do its work. This is actually called leader election. One of the most prominent open source implementations facilitating the process of leader election is Zookeeper. So what is Zookeeper?

Originally developed by Yahoo Research, Zookeeper acts as a service providing reliable distributed coordination. It is highly concurrent, very fast and suitable mainly for read-heavy access patterns. Reads can be done against any node of a Zookeeper cluster, while writes are quorum-based. To reach a quorum, Zookeeper utilizes an atomic broadcast protocol. So how does it work?
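
As a taste of the idea, here is a toy, in-memory sketch of the commonly documented Zookeeper election recipe: every candidate registers a sequentially numbered entry, the lowest number is the leader, and when the leader’s entry disappears the next lowest takes over. This is an assumption about the general recipe, not the implementation described in the full post, and `ElectionGroup` is a stand-in class rather than a real Zookeeper client.

```python
import itertools


class ElectionGroup:
    """In-memory stand-in for an election znode with sequential children."""

    def __init__(self):
        self._sequence = itertools.count()
        self._members = {}  # sequence number -> candidate name

    def join(self, name):
        """Register a candidate; returns its sequence number."""
        sequence_number = next(self._sequence)
        self._members[sequence_number] = name
        return sequence_number

    def leave(self, sequence_number):
        """Remove a candidate, as happens when its session ends."""
        self._members.pop(sequence_number, None)

    def leader(self):
        """The candidate holding the lowest sequence number, if any."""
        if not self._members:
            return None
        return self._members[min(self._members)]


if __name__ == "__main__":
    group = ElectionGroup()
    first = group.join("service-on-host-a")
    group.join("service-on-host-b")
    print(group.leader())   # the first joiner is active
    group.leave(first)      # active instance goes down
    print(group.leader())   # the passive instance takes over
```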

Read more >

Under the Hood of Our Algorithmic Engine – How We Serve Content Recommendations

Outbrain Algorithms Team

Let me tell you a little about how we actually give content recommendations here at Outbrain. This will be only a short introduction. We might elaborate on some of the issues below in future posts.

Our main goal is to serve good content recommendations to readers on the Internet. The typical situation is a user reading a content page. We want to recommend content for further reading, and we want that to be a “good” recommendation.

Read more >