Migrating Elephants – How To Migrate Petabyte Scale Hadoop Clusters With Zero Downtime


Outbrain has been an early adopter of Hadoop and we, the team operating it, have acquired a lot of experience running it in production in terms of data ingestion, processing, monitoring, upgrading etc. This also means that we have a significant ecosystem around each cluster, with both open source and in-house systems.


A while back we decided to upgrade both the hardware and software versions of our Hadoop clusters.

“Why is that a big problem?” you might ask, so let me explain a bit about our current Hadoop architecture. We have two clusters of 300 machines in two different data centers, production and DR. Each cluster has a total dataset size of 1.5 PB, with 5 TB of compressed data loaded into it each day. There are ~10,000 job executions daily of about 1200 job definitions that were written by dozens of developers, data scientists and various other stakeholders within the company, spread across multiple teams around the globe. These jobs do everything from moving data into Hadoop (e.g. Sqoop or MySQL to Hive data loads), to processing in Hadoop (e.g. running Hive, Scalding or Pig jobs), to pushing the results into external data stores (e.g. Vertica, Cassandra, MySQL etc.). An additional dimension of complexity originates from the dynamic nature of the system, since developers, data scientists and researchers are pushing dozens of changes to how flows behave in production on a daily basis.

This system needed to be migrated to run on new hardware, using new versions of multiple components of the Hadoop ecosystem, without impacting production processes and active users. A partial list of the components and technologies that are currently in use and had to be taken into consideration: HDFS, Map-Reduce, Hive, Pig, Scalding and Sqoop. On top of that, of course, we have several more in-house services for data delivery, monitoring and retention that we have developed.

I’m sure you’ll agree that this is quite an elephant.

Storming Our Brains

We sat down with our users, and started thinking about a process to achieve this goal and quickly arrived at several guidelines that our selected process should abide by:

  1. Both Hadoop clusters (production and DR) should always be kept fully operational
  2. The migration process must be reversible
  3. Both value and risk should be incremental

After scratching our heads for quite a while, we came up with these options:


  1. In place: In-place migration of the existing cluster to the new version, followed by a rolling hardware upgrade that gradually pushes new machines into the cluster and removes the old machines. This is the simplest approach and you should probably have a very good reason to choose a different path if you can afford the risk. However, since upgrading the system in place would expose clients to a huge change in an uncontrolled manner and is not by any means an easily reversible process, we had to forgo this option.
  2. Flipping the switch: The second option is to create a new cluster on new hardware, sync the required data, stop processing on the old cluster and move it to the new one. The problem here is that we still couldn’t manage the risk, because we would be stopping all processing and moving it to the new cluster. We wouldn’t know if the new cluster can handle the load or if each flow’s code is compatible with the new component versions. As a matter of fact, there are a lot of unknowns that made it clear we had to split the problem into smaller pieces. The difficulty with splitting in this approach is that once you move a subset of the processing from the old cluster to the new, its results will no longer be accessible on the old cluster. This means that we would have had to migrate all dependencies of that initial subset. Since we have 1200 flow definitions with marvelous and beautiful interconnections between them, the task of splitting them would not have been practical, and very quickly we found that we would have to migrate all flows together.
  3. Side by side execution: The 3rd option is to start processing on the new cluster without stopping the old cluster. This is a sort of an active-active approach, because both Hadoop clusters, new and old, will contain the processing results. This would allow us to migrate parts of the workload without risking interfering with any working pipeline in the old cluster. Sounds good, right?


First Steps

To better understand the chosen solution let’s take a look at our current architecture:

We have a framework that allows applications to push raw event data into multiple Hadoop clusters. For the sake of simplicity the diagram describes only one cluster. 

Once the data reaches Hadoop, processing begins to take place using a framework for orchestrating data flows we’ve developed in house that we like to call the Workflow Engine.

Each Workflow Engine belongs to a different business group. That Workflow Engine is responsible for triggering and orchestrating the execution of all flows developed and owned by that group. Each job execution can trigger more jobs on its current Workflow Engine or trigger jobs in other business groups’ Workflow Engines. We use this partitioning mainly for management and scale reasons but during the planning of the migration it provided us with a natural way to partition the workload, since there are very few dependencies between groups vs within each group.


Now that you have a better understanding of the existing layout you can see that the first step is to install a new Hadoop cluster with all required components of its ecosystem and begin pushing data into it.

To achieve this, we configured our dynamic data delivery pipeline system to send all events to the new cluster as well as the old, so now we have a new cluster with a fully operational data delivery pipeline:
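As an illustration only, the dual delivery described above can be sketched as a small fan-out component. This is not the actual delivery pipeline: the `FanOutDelivery` class, the sink callables and the event shape are all hypothetical names invented for this sketch, and in-memory lists stand in for the two clusters.

```python
from typing import Callable, Dict, List

Event = Dict[str, str]
Sink = Callable[[Event], None]


class FanOutDelivery:
    """Delivers every event to all registered cluster sinks (hypothetical API)."""

    def __init__(self) -> None:
        self._sinks: Dict[str, Sink] = {}

    def register(self, cluster_name: str, sink: Sink) -> None:
        self._sinks[cluster_name] = sink

    def deliver(self, event: Event) -> List[str]:
        """Send the event to every cluster and return the names that failed."""
        failed = []
        for name, sink in self._sinks.items():
            try:
                sink(event)
            except Exception:
                # A failure on one cluster must not block delivery to the others.
                failed.append(name)
        return failed


# In-memory lists stand in for the old and new Hadoop clusters.
old_cluster: List[Event] = []
new_cluster: List[Event] = []

delivery = FanOutDelivery()
delivery.register("old", old_cluster.append)
delivery.register("new", new_cluster.append)
failed = delivery.deliver({"type": "click", "id": "42"})
```

Isolating failures per sink matters in a setup like this: a misconfigured new cluster should never be able to interrupt delivery to the cluster that still serves production.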


Side by Side

Let’s think a bit about what options we had for running a side by side processing architecture.

We could use the same set of Workflow Engines to execute their jobs on both clusters, active and new. While this method would have the upside of saving machines and lower operational costs it would potentially double the load on each machine since jobs are assigned to machines in a static manner. This is due to the fact that each Workflow Engine is assigned a business group and all jobs that belong to this group are executed from it. To isolate the current production jobs execution from the ones for the new cluster we decided to allocate independent machines for the new cluster.


Let the Processing Commence!

Now that we have a fully operational Hadoop cluster running alongside our production cluster, and we now have raw data delivered into it, you might be tempted to say: “Great! Bring up a set of Workflow Engines and let’s start side by side processing!”.


Well… not really.


Since there are so many jobs and they perform such varied types of operations, we can’t really assume that letting them run side by side is a good idea. For instance, if a job calculates some results and then pushes them to MySQL, these results will be pushed twice. Aside from doubling the load on the databases for no good reason, in some cases it may cause corruption or inconsistencies in the data due to race conditions. In essence, every job that writes to an external data source should be allowed to run only once.


So we’ve defined two execution modes a Workflow Engine can have:

Leader: Run all the jobs!

Secondary: Run all jobs except those that might have a side effect external to that Hadoop cluster (e.g. writing to an external database or triggering an applicative service). This is done automatically by the framework, sparing the development teams any extra effort.


When a Workflow Engine is in secondary mode, jobs executed from it can read from any source, but write only to a specific Hadoop cluster. That way they are essentially filling it up and syncing it (to a degree) with the other cluster.
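The two execution modes can be sketched roughly as a filter over job definitions. This is a minimal illustration, not the Workflow Engine’s real code: the `Job` class, the `has_external_side_effects` flag and the job names are assumptions made up for the example, and the real framework presumably derives that flag from the job type automatically.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class Mode(Enum):
    LEADER = "leader"
    SECONDARY = "secondary"


@dataclass(frozen=True)
class Job:
    name: str
    # Hypothetical flag: True if the job writes to an external database
    # or triggers an external service.
    has_external_side_effects: bool


def runnable_jobs(jobs: List[Job], mode: Mode) -> List[Job]:
    """Return the jobs an engine in the given mode is allowed to execute."""
    if mode is Mode.LEADER:
        return list(jobs)
    # Secondary: skip anything whose effects leave the Hadoop cluster.
    return [job for job in jobs if not job.has_external_side_effects]


jobs = [
    Job("load_events_to_hive", False),
    Job("aggregate_clicks", False),
    Job("export_report_to_mysql", True),
]
leader_names = [job.name for job in runnable_jobs(jobs, Mode.LEADER)]
secondary_names = [job.name for job in runnable_jobs(jobs, Mode.SECONDARY)]
```

Keeping the decision inside the engine, rather than in each job definition, is what lets the same set of job definitions run unchanged on both clusters.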


Let’s Do This…

Phase 1 of the migration should look something like this:


Notice that I’ve only included a Workflow Engine for one group in the diagram for simplicity but it will look similar for all other groups.

So the idea is to bring up a new Workflow Engine and give it the role of a migration secondary. This way it will run all jobs except for those writing to external data stores, thus eliminating all side effects external to the new Hadoop cluster.

By doing so, we were able to achieve multiple goals:

  1. Test basic software integration with the new Hadoop cluster version and all services of the ecosystem (hive, pig, scalding, etc.)
  2. Test new cluster’s hardware and performance compared to the currently active cluster
  3. Safely upgrade each business group’s Workflow Engine separately without impacting other groups.


Since the new cluster is running on new hardware and with a new version of the Hadoop ecosystem, this is a huge milestone towards validating our new architecture. The fact that we managed to do so without risking any downtime that could have resulted from failing processing flows, wrong cluster configurations or any other potential issue was key in achieving our migration goals.


Once we were confident that all phase 1 jobs were operating properly on the new cluster we could continue to phase 2 in which a migration leader becomes secondary and the secondary becomes a leader. Like this:


In this phase all jobs will begin running from the new Workflow Engine, impacting all production systems, while the old Workflow Engine will only run jobs that write data to the old cluster. This method actually offers a fairly easy way to roll back to the old cluster in case of any serious failure (even after a few days or weeks), since all intermediate data will continue to be available on the old cluster.
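The role swap between phase 1 and phase 2, and the rollback it enables, can be sketched per business group as follows. The `GroupMigration` class, its method names and the group name are hypothetical and exist only for this illustration.

```python
from dataclasses import dataclass


@dataclass
class GroupMigration:
    """Tracks which Workflow Engine leads for one business group (hypothetical)."""

    group: str
    old_engine_mode: str = "leader"     # phase 1: the old engine still leads
    new_engine_mode: str = "secondary"  # phase 1: the new engine only fills the new cluster

    @property
    def phase(self) -> int:
        return 2 if self.new_engine_mode == "leader" else 1

    def _swap(self) -> None:
        self.old_engine_mode, self.new_engine_mode = (
            self.new_engine_mode,
            self.old_engine_mode,
        )

    def promote_new_cluster(self) -> None:
        """Phase 1 -> phase 2: the new engine becomes the leader."""
        if self.phase == 1:
            self._swap()

    def rollback(self) -> None:
        """Phase 2 -> phase 1: hand leadership back to the old engine."""
        if self.phase == 2:
            self._swap()


analytics = GroupMigration("analytics")
analytics.promote_new_cluster()
phase_after_promotion = analytics.phase
analytics.rollback()
phase_after_rollback = analytics.phase
```

Because the demoted engine keeps running in secondary mode, a rollback is just the same swap in reverse; no data needs to be copied back first.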


The Overall Plan

The overall process is to push all Workflow Engines to phase 1 and then test and stabilize the system. We were able to run 70% (!) of our jobs in this phase. That’s 70% of our code, 70% of our integrations and APIs and at least 70% of the problems you would experience in a real live move. We were able to fix issues, analyze system performance and validate results. Only once everything seems to be working properly can we start pushing the groups to phase 2, one by one, into a tested, stable new cluster.

Once again we benefit from the incremental nature of the process. Each business group can be pushed into phase 2 independently of other groups, thus reducing risk and increasing our ability to debug and analyze issues. Additionally, each business group can start leveraging the new cluster’s capabilities (e.g. features from newer versions, or improved performance) immediately after it has moved to phase 2, and not after we have migrated every one of the ~1200 jobs to run on the new cluster. One pain point that can’t be ignored is that inter-group dependencies can make this a significantly more complicated feat, as you need to take into consideration the state of multiple groups when migrating.


What Did We Achieve?

  1. Incremental migration – Because we had an active-active migration that we could apply to each business group, we benefited in terms of mitigating risk and gaining value from the new system gradually.
  2. Reversible process – Since we kept all old Workflow Engines (that executed their jobs on the old Hadoop cluster) in secondary execution mode, all intermediate data was still being processed and was available in case we needed to revert groups independently from each other.
  3. Minimal impact on users – Since we defined an automated transition of jobs between secondary and leader modes, users didn’t need to duplicate any of their jobs.


What Now?

We have completed the upgrade and migration of our main cluster and have already started the migration of our DR cluster.

There are a lot more details and concerns to bring into account when migrating a production system at this scale. However, the basic abstractions we’ve introduced here, and the capabilities we’ve infused our systems with have equipped us with the tools to migrate elephants.

For more information about this project you can check out the video from Strata 2017 London where I discussed it in more detail.

Failure Testing for your private cloud – Introducing GomJabbar


TL;DR Chaos Drills can contribute a lot to your services’ resilience, and it’s actually quite a fun activity. We’ve built a tool called GomJabbar to help you run those drills.

Here at Outbrain we manage quite a large scale deployment of hundreds of services / modules, and thousands of hosts. We practice CI/CD, and implemented quite a sound infrastructure, which we believe is scalable, performant, and resilient. We do however experience many production issues on a daily basis, just like any other large scale organization. You simply can’t ensure a 100% fault free system. Servers will crash, run out of disk space, and lose connectivity to the network. Software will experience bugs, and erroneous conditions. Our job as software engineers is to anticipate these conditions, and design our code to handle them gracefully.

For quite a long time we were looking into ways of improving our resilience and validating our assumptions, using a tool like Netflix’s Chaos Monkey. We also wanted to make sure our alerting system actually triggers when things go wrong. The main problem we were facing is that Chaos Monkey is a tool that was designed to work with cloud infrastructure, while we maintain our own private cloud.

The main motivation for developing such a tool is that failures have a tendency to occur when you’re least prepared, and at the least desirable time, e.g. Friday nights, when you’re out having a pint with your buddies. Now, to be honest with ourselves, when things fail at inconvenient times, we don’t always roll up our sleeves and dive in to look for the root cause. Many times the incident will end after a service restart, and once the alerts clear we forget about it.

Wouldn’t it be great if we could have “chaos drills”, where we could practice handling failures, test and validate our assumptions, and learn how to improve our infrastructure?

Chaos Drills at Outbrain

We built GomJabbar exactly for the reasons specified above. Once a week, at a well known time, mid day, we randomly select a few targets where we trigger failures. At this point, the system should either auto-detect the failures, and auto-heal, or bypass them. In some cases alerts should be triggered to let teams know that a manual intervention is required.

After each chaos drill we conduct a quick take-in session for each of the triggered failures, and ask ourselves the following questions:

  1. Did the system handle the failure case correctly?
  2. Was our alerting strategy effective?
  3. Did the team have the knowledge to handle, and troubleshoot the failure?
  4. Was the issue investigated thoroughly?

These take-ins lead to super valuable inputs, which we probably wouldn’t collect any other way.

How did we kick this off?

Before we started running the chaos drills, there were a lot of concerns about the value of such drills, and the time they would require. Well, since eliminating our fear of production is one of the key goals of this activity, we had to take care of that first.

"I must not fear.
 Fear is the mind-killer.
 Fear is the little-death that brings total obliteration.
 I will face my fear.
 I will permit it to pass over me and through me.
 And when it has gone past I will turn the inner eye to see its path.
 Where the fear has gone there will be nothing. Only I will remain."

(Litany Against Fear - Frank Herbert - Dune)

So we started a series of chats with the teams, in order to understand what was bothering them, and found ways to mitigate it. So here goes:

  • There’s an obvious need to avoid unnecessary damage.
    • We’ve created filters to ensure only approved targets get to participate in the drills.
      This has a side effect of pre-marking areas in the code we need to take care of.
    • We currently schedule drills via, so teams know when to be ready, and if the time is inappropriate,
      we reschedule.
    • When we introduce a new kind of fault, we let everybody know, and explain what they should prepare for in advance.
    • We started out from minor faults like graceful shutdowns, continued to graceless shutdowns,
      and moved on to more interesting testing like faulty network emulation.
  • We’ve measured the time teams spent on these drills, and it turned out to be negligible.
    Most of the time was spent on preparations. For example ensuring we have proper alerting,
    and correct resilience features in the clients.
    This is actually something you need to do anyway. At the end of the day, we’ve heard no complaints about interruptions or wasted time.
  • We’ve made sure teams, and engineers on call were not left on their own. We wanted everybody to learn
    from this drill, and when they weren’t sure how to proceed, we jumped in to help. It’s important
    to make everyone feel safe about this drill, and remind everybody that we only want to learn and improve.

All that said, it’s important to remember that we basically simulate failures that occur on a daily basis. It’s only that when we do that in a controlled manner, it’s easier to observe where our blind spots are, what knowledge we are lacking, and what we need to improve.

Our roadmap – What next?

  • Up until now, this drill was executed in a semi-automatic procedure. The next level is to let the teams run this drill on a fixed interval, at a well known time.
  • Add new kinds of failures, like disk space issues, power failures, etc.
  • So far, we were only brave enough to run this on applicative nodes, and there’s no reason to stop there. Data-stores, load-balancers, network switches, and the like are also on our radar in the near future.
  • Multi-target failure injection. For example, inject a failure to a percentage of the instances of some module in a random cluster. Yes, even a full cluster outage should be tested at some point, in case you were asking yourself.

The GomJabbar Internals

GomJabbar is basically an integration between a discovery system, a (fault) command execution scheduler, and your desired configuration. The configuration contains mostly the target filtering rules, and fault commands.

The fault commands are completely up to you. Out of the box we provide the following example commands, (but you can really write your own script to do what suits your platform, needs, and architecture):

  • Graceful shutdowns of service instances.
  • Graceless shutdowns of service instances.
  • Faulty Network Emulation (high latency, and packet-loss).

Upon startup, GomJabbar drills down via the discovery system, fetches the clusters, modules, and their instances, and passes each via the filters provided in the configuration files. This process is also performed periodically. We currently support discovery via consul, but adding other methods of discovery is quite trivial.

When a user wishes to trigger faults, GomJabbar selects a random target, and returns it to the user, along with a token that identifies this target. The user can then trigger one of the configured fault commands, or scripts, on the random target. At this point GomJabbar uses the configured CommandExecutor in order to execute the remote commands on the target hosts.
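The filter, select and token flow described above can be sketched in a few lines. This is not GomJabbar’s actual code or API: `Target`, `DrillSelector` and the filter callables are hypothetical names for this illustration, and the discovery step is replaced by a hard-coded list.

```python
import random
import uuid
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Target:
    cluster: str
    module: str
    host: str


TargetFilter = Callable[[Target], bool]


class DrillSelector:
    """Keeps only approved targets and hands out random ones with a token."""

    def __init__(
        self,
        discovered: List[Target],
        filters: List[TargetFilter],
        rng: Optional[random.Random] = None,
    ) -> None:
        # A target participates only if every configured filter approves it.
        self._candidates = [t for t in discovered if all(f(t) for f in filters)]
        self._rng = rng or random.Random()
        self._issued: Dict[str, Target] = {}

    def select(self) -> Tuple[str, Target]:
        """Pick a random approved target (raises IndexError if none are approved)."""
        target = self._rng.choice(self._candidates)
        token = uuid.uuid4().hex
        self._issued[token] = target
        return token, target

    def resolve(self, token: str) -> Target:
        """Look up the target a previously issued token refers to."""
        return self._issued[token]


discovered = [
    Target("dc1", "recommendations", "host-1"),
    Target("dc1", "recommendations", "host-2"),
    Target("dc1", "database", "host-3"),
]
selector = DrillSelector(discovered, [lambda t: t.module != "database"])
token, target = selector.select()
```

Handing back a token rather than acting immediately gives the operator a chance to review the chosen target before any fault command is triggered against it.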

GomJabbar also maintains an audit log of all executions, which allows you to revert quickly in the face of a real production issue, or an unexpected catastrophe caused by this tool.

What have we learned so far?

If you’ve read so far, you may be asking yourself what’s in it for me? What kind of lessons can I learn from these drills?

We’ve actually found and fixed many issues by running these drills, and here’s what we can share:

  1. We had broken monitoring and alerting around the detection of the integrity of our production environment. We wanted to make sure that everything that runs in our data-centers is managed, and in a well-known state (version, health, etc.). We’ve found that we didn’t compute the difference between the desired state and the actual state properly, due to reliance on bogus data-sources. This sort of bug attacked us from two sides: once when we triggered graceful shutdowns, and once for graceless shutdowns.
  2. We’ve found services that had no owner, became obsolete, and were basically running unattended in production. The horror.
  3. During the faulty network emulations, we’ve found that we had clients that didn’t implement proper resilience features, and caused cascading failures in the consumers several layers up our service stack. We’ve also noticed that in some cases, the high latency also cascaded. This was fixed by adding proper timeouts, double-dispatch, and circuit-breakers.
  4. We’ve also found that these drills motivated developers to improve their knowledge about the metrics we expose, logs, and the troubleshooting tools we provide.
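As a rough illustration of one of the resilience features mentioned in item 3, here is a minimal circuit breaker. It is a generic sketch of the pattern, not the implementation used in these services; the class name, thresholds and the injectable clock are assumptions for the example.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 3,
        reset_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._failure_threshold = failure_threshold
        self._reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def call(self, func: Callable[[], T]) -> T:
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self._reset_timeout:
                # Fail fast instead of piling more load on a struggling service.
                raise CircuitOpenError("circuit is open")
            # Timeout elapsed: fall through and allow one trial call.
        try:
            result = func()
        except Exception:
            self._failures += 1
            trial_failed = self._opened_at is not None
            if trial_failed or self._failures >= self._failure_threshold:
                self._opened_at = self._clock()  # open (or re-open) the circuit
            raise
        # Success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

Failing fast is what stops the cascade: callers several layers up get an immediate error they can handle, instead of queueing behind a slow dependency until their own timeouts fire.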


We’ve found the chaos drills to be an incredibly useful technique, which helps us improve our resilience and integrity, while helping everybody learn about how things work. We’re by no means anywhere near perfection. We’re actually pretty sure we’ll find many many more issues we need to take care of. We’re hoping this exciting new tool will help us move to the next level, and we hope you find it useful too 😉

ScyllaDB POC – (not so :) live blogging – Update #3.

Scylla attacking Odysseus’s ship

Hi all

It has been a long time (more than 4 months) since we last updated.

You can read the previous update here.

It is not that we abandoned the POC; we actually continued to invest time and effort in it, since there is good progress. It is just that we have not finished it yet and do not yet have the proof we wanted. While there was a lot of progress on both the Outbrain system side and the ScyllaDB side in those 4 months, there is one thing holding us back from proving the main point of this POC: our current bottleneck is the network. The network in the datacenter where we are running the tests is a 1Gbps Ethernet network. We found that although Scylla is not loaded and works with good latencies, we are saturating the NICs. We made some improvements along the way to still show that Scylla behaves better than C*, but if we want to show that we can significantly reduce the number of nodes in the cluster, we need to upgrade to 10Gbps Ethernet.

This upgrade will come shortly.


This is where we currently stand. However, a lot was done in those 4 months and there are a lot of learnings I want to share. The rest of the post is the way Doron and the Scylla guys describe what happened. It looks more like a captain’s log, but it tells the story pretty well.



  • 23/6 – We created a special app server cluster to call Scylla, and delegated all calls to both the C* and Scylla clusters. We wanted to do that so there would be less coupling between the C* path and the Scylla path, and fewer mutual interruptions that would interfere with our conclusions. The app servers for Scylla were configured not to use cache, so the entire load (1.5M-3M RPM) went directly to Scylla. C* stayed behind the cache and actually handled ~10% of the load. This went smoothly.
  • In the following ~3 weeks we tried to load the snapshot again from C* and ran into some difficulties, some related to bugs in Scylla, some to networking limits (1Gbps). During this time we had to stop the writes to Scylla for a few days, so the data was out of sync again. Some actions we took to resolve this:
    1. We investigated bursts of usage we had and decreased them in some use-cases (both for C* and Scylla). They caused the network usage to be very high for a few seconds, sometimes for a few tens of milliseconds. This also helped C* a lot. The tool is now open source.
    2. We added client-server compression (LZ4). It was supported by Scylla, but the client needed to configure it.
    3. Scylla added server-server compression during this period.
    4. We changed the “multi” calls back to N parallel single calls (instead of one IN request), which utilizes the network better.
    5. The Scylla client was (mistakenly) using the latency-aware policy over the token-aware one. This caused the app to go to the “wrong” node a lot, causing more traffic between Scylla nodes. Removing the latency-aware policy helped reduce the server-server network usage and the overall latency.
  • 14/7 – with all the above fixes (and more from Scylla) we were able to load the data and stabilize the cluster with the entire load.
  • Until 29/7 we saw many spikes in the latency. We are not sure what we did to fix it… but on 29/7 the spikes stopped and the latency has been stable until today.
  • During this period we saw 1-5 errors from Scylla per minute. Those errors were explained by attempts to reach partitions coming from a very old C* version. This was verified by logging the partitions we failed on, on the app server side. Scylla fixed that on 29/7.
  • 7/8-15/8 – we changed the consistency level of both Scylla and C* to local-one (to test C*) – this caused a slight decrease in the (already) low latencies.
  • Up to 19/8 we saw occasional momentary errors coming from Scylla (a few hundred every few hours). This has not happened since 19/8. I don’t think we can explain why.
  • Current latencies – Scylla holds over 2M RPM with latency of 2 ms (99%ile) for single requests and 40-50 ms (99%ile) for multi requests of ~100 partitions on average per request. Latencies are very stable with no apparent spikes. All latencies are measured from the app servers.
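The change from one “multi” (IN) request to N parallel single calls can be sketched as below. This is an assumption-laden illustration rather than the app servers’ real code: `fetch_many` and the `fetch_one` callable are hypothetical, and a plain dictionary stands in for the database client. The idea is that with a token-aware client each single-key request can be routed directly to a node that owns that partition, instead of one coordinator gathering all partitions over the server-server network.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable, Optional


def fetch_many(
    keys: Iterable[str],
    fetch_one: Callable[[str], Optional[str]],
    max_workers: int = 16,
) -> Dict[str, Optional[str]]:
    """Issue one single-key request per key in parallel and collect the results."""
    key_list = list(keys)
    if not key_list:
        return {}
    workers = min(max_workers, len(key_list))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map preserves input order, so zip pairs each key with its value.
        values = list(pool.map(fetch_one, key_list))
    return dict(zip(key_list, values))


# A dictionary lookup stands in for a single-partition query.
store = {"a": "1", "b": "2"}
result = fetch_many(["a", "b", "missing"], store.get)
```

A real client would more likely use the driver’s asynchronous execution API than a thread pool; the thread pool here only keeps the sketch self-contained.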


Next steps on Scylla:

  • Load the data again from C* to sync.
  • Repeat the data consistency check to verify the results from C* and Scylla are still the same.
  • Run repairs to see whether the cluster can hold while running heavy tasks in the background.
  • Try to understand the errors I mentioned above if they repeat.
  • Create the cluster in Outbrain’s new Sacramento datacenter, which has a 10Gbps network, with a minimal number of nodes (3?), and try the same load there.



  • 7/8 – we changed consistency level to local-one and tried to remove cache from Cassandra. The test was successful and Cassandra handled the full load with latency increasing from 5 ms (99%ile for single requests) to 10-15ms in the peak hours.
  • 15/8 – we changed back to local-quorum (we do not like to have local-one for this cluster… we can explain in more detail why) and set the cache back.
  • 21/8 – we removed the cache again, this time with local-quorum. Cassandra handled it, but single request latency increased to 30-40 ms for the 99%ile in the peak hours. In addition, we started seeing timeouts from Cassandra (timeout is 1 second) – up to 500 per minute, in the peak hours.


Next steps on C*:

  • Run repairs to see whether the cluster can hold while running heavy tasks in the background.
  • Try compression (like we do with Scylla).
  • Try some additional tweaks by the C* expert.
  • In case errors continue, we will have to set the cache back.


Current status comparison – Aug 20th:


The following table shows a comparison under a load of 2M RPM in peak hours.

Latencies are in 99%ile.


Metric | Cassandra | ScyllaDB
Single call latency | 30-40 ms (spikes to 70) | 2 ms
Multi call latency | 150-200 ms (spikes to 600) | 40-50 ms
Errors (note: these are query times exceeding 1 second, not necessarily database failures) | Up to 150 timeouts per minute every few minutes, with some higher spikes every few days | A few hundred every few days


Below are the graphs showing the differences.

Latency and errors graphs showing both C* and Scylla getting requests without cache (1M-2M RPM):

Latency comparison of single requests


Latency comparison of multi requests


Errors (timeouts for queries > 1 second)




There are very good signs that ScyllaDB does make a difference in throughput, but due to the network bottleneck we could not verify it. We will update as soon as we have results on a faster network. The Scylla guys are working on a solution for slower networks too.


Hope to update soon.

Micro Service Split


In this post I will describe a technical methodology we used to remove a piece of functionality from a Monolith/Service and turn it into a Micro-Service. I will try to reason about some of the decisions we have made and the path we took, as well as a more detailed description of internal tools, libraries and frameworks we use at Outbrain and in our team, to shed some light on the way we work in the team. And as a bonus you might learn from our mistakes!
Let’s start with the description of the original service and what it does.
Outbrain runs the largest content discovery platform. From the web surfer’s perspective it means serving a list of recommended content that might interest her, in the form of ‘You might also like’ links. Some of those impression links are sponsored, i.e. when she clicks on a link, someone is paying for that click, and the revenue is shared between Outbrain and the owner of the page with the link on it. That is how Outbrain makes its revenue.

My team, among other things, is responsible for the routing of the user to the requested page after pressing the link, and for the bookkeeping and accounting that is required in order to calculate the cost of the click, who should be charged, etc.
In our case the service we are trying to split is the ‘Bookkeeper’. Its primary role is to manage the budget of the paid impression links. After a budget is spent, the ‘Bookkeeper’ should notify Outbrain’s backend servers to refrain from showing the impression link again. And this has to be done as fast as possible. If not, people will click on links we cannot charge for, because the budget was already spent. Technically, this is done by an update to a database record. However, there are other cases in which we might want to stop the exposure of impression links. One such example is a request from the customer paying for the future click to disable the impression link exposure. So for such cases we have an API endpoint that does exactly the same with the same code. That endpoint is actually part of the ‘Bookkeeper’ and is enabled by a feature toggle on specific machines. This ‘Activate-Impressionable’ endpoint, as we call it, is what was decided to split out of the ‘Bookkeeper’ into a designated Micro-Service.
In order to execute the split, we have chosen a step-by-step strategy that allows us to reduce the risk during execution and keep it as controlled and reversible as possible. From a bird’s eye view I would describe it as a three-step process: Plan, Up and Running as fast as possible, and Refactor. The rest of the post describes these steps.

Plan (The who’s and why’s)

In my opinion this is the most important step. You don’t want to split a service just in order to split. Each Micro Service introduces maintenance and management overhead, with its own set of challenges[1]. On the other hand, Microservices architecture is known for its benefits such as code maintainability (for each Micro Service), the ability to scale out and improved resilience[2].
Luckily for me, someone already did that part for me and took the decision that ‘Activate-Impressionable’ should be split from the ‘Bookkeeper’. But still, let’s name some of the key factors of our planning step.
Basically I would say that a good separation is a logical separation with its own non-overlapping RESTful endpoints and an isolated code base. The logical separation should be clear. You should think about what the functionality of the new service is, and how isolated it is. It is possible to analyze the code for inter-dependencies among classes and packages using tools such as Lattix. The bottom line is that it is important to have a clear definition of the responsibility of the new Micro Service.
In our case, the ‘Bookkeeper’ was eventually split so that it remained the bigger component, ‘Activate-Impressionable’ was smaller, and the common library was smaller than both. The exact lines of code can be seen in the table below.

[Table: lines of code in the ‘Bookkeeper’, ‘Activate-Impressionable’ and the common library]

Unfortunately I assessed it only after the split and not in the plan step. Looking at those numbers, we might say that there is too much code in common. It is something worth considering when deciding what to split. A lot of common code implies a low level of isolation.

Of course part of the planning is time estimation. Although I am not a big fan of “guestimates”, I can say that the task was planned for a couple of weeks and took about that long.
Now that we have a plan, let’s get to work.

Up and Running – A Step by Step guide

As in every good refactor, we want to do it in small baby steps and remain ‘green’ all the time[3]. In continuous deployment that means we can and do deploy to production as often as possible to make sure it is ‘business as usual’. In this step we want to get to a point where the new service is working in parallel to the original service. At the end of this step we will point our load-balancers to the new service endpoints. In addition, the code remains shared in this step, which means we can always deploy the original, fully functioning ‘Bookkeeper’. We actually do that if we feel the latest changes carry any risk.
So let’s break it down into the actual phases:

Step 0: Starting phase.

Step 1: Create the new empty Micro-Service ‘Activate-Impressionable’. At Outbrain we do it using the scaffolding of the ob1k framework. Ob1k is an open source Micro Services framework that was developed in-house.

Step 2: Create a new empty library that both the new ‘Activate-Impressionable’ service and the ‘Bookkeeper’ depend on. Ideally, if there is a full logical separation with no shared code between the services, that library will be deleted in the cleanup phase.

Step 3: Move the relevant source code to the library. Luckily in our case, there was one directory that was clearly what we had to split out. Unluckily, that code also pulled in some more code it depended on, and this had to be done carefully so as not to pull too much or too little. The good news is that this phase is pretty safe for statically typed languages such as Java, in which our service is written. The compiler protects us here with compilation errors, so the feedback loop is very short. Tip: don’t forget to move unit tests as well.

Step 4: Move common resources to the library, such as Spring beans defined in XML files and our feature flag files, which are defined in YAML files. This is the dangerous part. We don’t have the compiler here to help, so we actually test it in production. And when I say production I mean using staging/canary/any environment with production configuration but without real impact. Luckily again, both YAML and Spring beans are configured to fail fast, so if we did something wrong it will just blow up in our face and the service will refuse to go up. For this step I even ended up developing a one-liner bash script to assist with those wicked YAML files.

Step 5: Copy and edit web resources (web.xml) to define the service endpoints. In our case web.xml can’t reside in a library so it has to be copied. Remember we still want the endpoints active in the ‘Bookkeeper’ at that phase. Lesson learned: inspect all files closely. In our case log4j.xml, which seems like an innocent file by its name, contains designated appenders that are consumed by other production services. I didn’t notice that and didn’t move the required appender, and it was found only later in production.

Deploy: Deploy the new service to production. What we did was deploy ‘Activate-Impressionable’ side-by-side on the same machines as the ‘Bookkeeper’, just with a different port and context path. It definitely makes you sleep better at night.

Up-And-Running: Now is a good time to test once again whether both the ‘Bookkeeper’ and ‘Activate-Impressionable’ are working as expected. Generally we are now up and running, with only a few more things to do here.

Clients Redirect: Point clients of the service to the new endpoints (port + context path). This step might take some time, depending on the number of clients and the ability to redeploy them. At Outbrain we use HA-Proxy, so reconfiguring it did most of the work, but some clients did require code modifications.

(More) Validation: Move/copy simulator tests and monitors. In our team, we rely heavily on tests we call simulator tests. These are actually black-box tests written in JUnit that run against the service installed on a designated machine. These tests see the service as a black box and call its endpoints while mocking/emulating other services and data in the database for the test run. So a typical test run looks like this: put something in the database, trigger the endpoint, and check the result in the database or in the HTTP response. There is also a question here of whether to test ‘Activate-Impressionable’ or the ‘Bookkeeper’. Ideally you will test them both (tests are duplicated for that phase), and that is what we did.


Refactor, Disconnect & Cleanup

At this point the new service is working and we should expect no more behaviour changes from the endpoints’ point of view. But we still want the code to be fully split and the services to be independent of each other. In the previous step we performed the phases in a way that everything remains reversible with a simple feature toggle & deploy.

In this step we move to a state where the ‘Bookkeeper’ will no longer host the ‘Activate-Impressionable’ functionality. Sometimes it is a good idea to have a gap from the previous step to make sure that there are no rejections and backfires that we didn’t trace in our tests and monitoring.
The first thing, if it was not done up until now, is to deploy the ‘Bookkeeper’ without the service functionality and make sure everything is still working. And wait a little bit more…
And now we just have to push the sources and the resources from the library to the ‘Activate-Impressionable’ service. In the ideal case, where there is no common code, we can also delete the library. This was not how it was in our case. We still have a lot of common code we can’t separate for the time being.
Now is also the time to clean up resources, edit web.xml, etc.
And for the bold and OCD among us – renaming packages and refactoring the code to match the new service naming conventions.


The entire process in our case took a couple of weeks. Part of the fun and advantage of such a process is the opportunity to get to know old code, its structure and its functionality better, without the need to modify something for a new feature with its constraints. This is especially true when someone else wrote it originally.
To perform such a process well, it is important to plan and remain organized and on track. In case of a context switch it is very important to keep a bookmark of where you need to return to in order to continue. In our team we even did that with a handoff of the task between developers. Extreme Programming, it is.
It is interesting to see the surprising results in terms of lines of code. Originally we thought of it as splitting a micro-service from a monolith. In retrospect, it looks to me more like splitting a service into two services. ‘Micro’ in this case is in the eye of the beholder.



Real Time Performance Monitoring @ Outbrain

Outbrain serves millions of requests per minute, based on a micro service architecture. Consequently, as you might expect, visibility and performance monitoring are crucial.

Serving millions of requests per minute, across multiple data centers, in a micro services environment, is not an easy task. Every request is routed to many applications, and may potentially stall or fail at every step in the flow. Identifying bottlenecks, troubleshooting failures and knowing our capacity limits are all difficult tasks. Yet, these are not things you can just give up on “because they’re hard”, but are rather tasks that every engineer must be able to tackle without too much overhead. It is clear that we have to aim for all engineers to be able to understand how their applications are doing at any given time.

Since we face all of these challenges every day, we’ve reached the point where a paradigm shift was required. For example, we had to move from the old, familiar “investigate the past” to the new, unfamiliar “investigate the present”. That’s only one of the requirements we came up with. Here are a few more:


Real time visibility

Sounds pretty straightforward, right? However, when using a persistent monitoring system, it always has at least a few minutes of delay. These few minutes might contain millions of errors that potentially affect your business. Aiming for low MTTR means cutting delays where possible, thus moving from minute-based granularity to second-based.


Throughput, Latency and error rate are linked

Some components might suffer from high latency, but maybe the amount of traffic they receive is negligible. Others might have low latency under high load, but that’s only because they fail fast for almost every request (we are reactive!). We wanted to view these metrics together, and rank them by importance.


Mathematical correctness at any given aggregation (Don’t lie!)

When dealing with latency, one should look at percentiles, not averages, as averages can be deceiving and might not tell the whole story. But what if we want to view latency per host, and then view it per data center? If we store only percentiles per host (which is highly common in our industry), it is not mathematically correct to average them! On the other hand, we have so much traffic that we can’t just store every measurement with its latency, and we definitely can’t view them all in real time.
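To make the point about averaging percentiles concrete, here is a minimal sketch in Python. The two hosts, their traffic volumes and their latency ranges are made up for illustration, and the fixed 1 ms buckets are a simplification rather than what we run in production. It compares the average of two per-host 99th percentiles with the 99th percentile of the combined traffic, and shows that per-host histograms, unlike per-host percentiles, can simply be added together.

```python
import math
import random
from collections import Counter

def percentile(values, p):
    """Nearest-rank percentile (p in 0..100) of a list of samples."""
    ordered = sorted(values)
    rank = max(1, math.ceil(len(ordered) * p / 100))
    return ordered[rank - 1]

def to_histogram(values, bucket_ms=1):
    """Count samples per fixed-width latency bucket."""
    return Counter(int(v // bucket_ms) for v in values)

def histogram_percentile(hist, p, bucket_ms=1):
    """Percentile estimated from bucket counts (upper edge of the bucket)."""
    total = sum(hist.values())
    rank = max(1, math.ceil(total * p / 100))
    seen = 0
    for bucket in sorted(hist):
        seen += hist[bucket]
        if seen >= rank:
            return (bucket + 1) * bucket_ms

rng = random.Random(42)
# Hypothetical latencies in ms: a busy, healthy host and a quiet, slow one.
fast_host = [rng.uniform(1, 10) for _ in range(10_000)]
slow_host = [rng.uniform(100, 500) for _ in range(100)]

# Averaging per-host percentiles ignores how much traffic each host served.
avg_of_p99 = (percentile(fast_host, 99) + percentile(slow_host, 99)) / 2
# The real p99 of all requests, computed from the raw samples.
true_p99 = percentile(fast_host + slow_host, 99)
# Histograms merge by adding counts, so the aggregate stays correct.
merged = to_histogram(fast_host) + to_histogram(slow_host)
merged_p99 = histogram_percentile(merged, 99)

print(avg_of_p99, true_p99, merged_p99)
```

The averaged value lands in the hundreds of milliseconds even though 99% of all requests completed within 10 ms, while the merged histogram stays within one bucket width of the true value.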


Latency resolution matters

JVM based systems tend to display crazy numbers when looking at the high percentiles (how crazy? With heavy GC storms and lock contention there is no limit to how bad these values can get). It’s crucial for us to differentiate between latency in the 99.5 and 99.9 percentiles, while values at the 5 or 10 percentiles don’t really matter.

Summing up all of the requirements above, we reached a conclusion that our fancy persistent monitoring system, with its minute-based resolution, supporting millions of metrics per minute, doesn’t cut it anymore. We like it that every host can write thousands of metric values every minute, and we like being able to view historical data over long periods of time, but moving forward, it’s just not good enough. So, as we often do, we sat down to rethink our application-level metric collection and came up with a new, improved solution.


Our Monitoring Unit

First, consider metric collection from the application perspective. Logically, it is an application’s point-of-view of some component: a call to another application, to a backend or plain CPU bound computation. Therefore, for every component, we measure its number of requests, failures, timeouts and push backs along with a latency histogram over a short period of time.

In addition, we want to see the worst performing hosts in terms of any such metric (mean latency, number of errors, etc.).


To achieve this display for each measured component we decided to use these great technologies:


HDR Histograms

HdrHistogram supports the recording and analysis of sampled data value counts, across a configurable value range, with configurable value precision within the range. It is designed for recording histograms of latency measurements in performance-sensitive applications.

Why is this important? Because when using such histograms to measure the latency of some component, it allows you to have good accuracy of the values in the high percentiles at the expense of the low percentiles.

So, we decided to store in memory instances of histograms (as well as counters for requests, errors, timeouts, push backs, etc) for each measured component. We then replace them each second and expose these histograms in the form of rx.Observable using our own OB1K application server capabilities.
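The idea of keeping a monitoring unit in memory and replacing it every second can be sketched roughly as follows. This is an illustration only: the class and method names are hypothetical, a plain `Counter` with 1 ms buckets stands in for a real HdrHistogram, and the rx.Observable exposure is left out.

```python
import threading
from collections import Counter

class MonitoringUnit:
    """Counters plus a latency histogram for one measured component."""

    def __init__(self):
        self._lock = threading.Lock()
        self._reset()

    def _reset(self):
        self._counters = Counter()  # outcome name -> number of calls
        self._latency = Counter()   # latency bucket in ms -> number of calls

    def record(self, outcome, latency_ms):
        """Record one call with its outcome, e.g. 'success' or 'timeout'."""
        with self._lock:
            self._counters[outcome] += 1
            self._latency[int(latency_ms)] += 1

    def swap(self):
        """Return what was collected so far and start a fresh interval."""
        with self._lock:
            snapshot = {"counters": self._counters, "latency": self._latency}
            self._reset()
        return snapshot

unit = MonitoringUnit()
unit.record("success", 3.2)
unit.record("timeout", 250)
print(unit.swap())  # a timer would call this once per second and publish it
```

Swapping instead of clearing in place means the published snapshot is never modified after it has been handed out, so whoever reads it needs no further locking.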

All that is left is to aggregate and display.

Java Reactive extensions

rx is a great tool to merge and aggregate streams of data in memory. In our case, we built a service to merge raw streams of measured components; group them by the measured type, and aggregate them in a window of a few seconds. But here’s the trick – we do that on demand. This allows us to let the users view results grouped by any dimension they desire without losing the mathematical correctness of latency histograms aggregation.

Some examples on the operators we use to aggregate the multiple monitoring units:



rx merge operator enables treating multiple streams as a single stream



rx window operator enables sliding window abstraction



rx scan operator enables aggregation over each window


To simplify things, we can say that for each component we want to display, we connect to each machine to fetch the monitored stream endpoint, perform ‘merge’ to get a single stream abstraction, ‘window’ to get a result per time unit, and ‘scan’ to perform the aggregation.
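The merge, window and scan pipeline can be approximated without rx. The sketch below uses plain Python generators over in-memory lists instead of live streams, and it simplifies two things: the windows are consecutive rather than sliding, and `scan` returns only the final aggregate of a window instead of emitting every intermediate one. The event shape (timestamp in seconds, latency histogram) is an assumption made for the example.

```python
import heapq
from collections import Counter
from itertools import groupby

def merge(*streams):
    """Treat several time-ordered streams as one time-ordered stream."""
    return heapq.merge(*streams, key=lambda event: event[0])

def window(stream, seconds):
    """Group events into consecutive windows of `seconds` seconds."""
    def window_start(event):
        return event[0] // seconds * seconds
    for start, events in groupby(stream, key=window_start):
        yield start, list(events)

def scan(events):
    """Fold the histograms of one window into a single aggregate."""
    total = Counter()
    for _, histogram in events:
        total += histogram
    return total

# Hypothetical per-second output of two hosts: (timestamp, {latency_ms: count})
host_a = [(0, Counter({5: 2})), (1, Counter({5: 1})), (2, Counter({7: 1}))]
host_b = [(0, Counter({5: 1, 900: 1})), (1, Counter({6: 3})), (3, Counter({5: 1}))]

result = [(start, scan(events))
          for start, events in window(merge(host_a, host_b), 2)]
print(result)
```

Because the aggregation adds histogram counts rather than combining percentiles, the same pipeline can be grouped by host, data center or any other dimension and still produce correct latency distributions.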


Hystrix Dashboard

The guys at Netflix found a great formula for displaying serving components’ status in a way that links between volume, error percentage and latency in a single view. We really liked that, so we adopted this UI to show our aggregated results.

The Hystrix dashboard view of a single measured component shows counters of successes, failures, timeouts and push backs, along with a latency histogram, information on the number of hosts, and more. In addition, it provides a balloon view, which grows/shrinks with traffic volume per component, and is color-coded by the error rate.

See below how this looks in the breakdown view of all request components. The user gets a view of all measured components, sorted by volume, with a listing of the worst performing hosts.


Another example shows the view of one application, with nothing but its entry points, grouped by data center. Our Operations guys find this extremely useful when needing to re-balance traffic across data centers.



OK, so far so good. Now let’s talk about what we actually do with it.


Sometimes an application doesn’t meet its SLA, be it in latency or error rate. The simple case is due to a broken internal component (for example, some backend went down and all calls to it result in failures). At this point we can view the application dashboard and easily locate the failing call. A more complex use case is an increase in the amount of calls to a high latency component at the expense of a low latency one (for example, cache hit rate drop). Here our drill down will need to focus on the relative amount of traffic each component receives – we might be expecting a 1:2 ratio, while in reality we might observe a 1:3 ratio.

With enough alerting in place, this could be caught by an alert. Having the real time view will allow us to locate the root cause quickly even when the alert is a general one.


Performance comparison

In many cases we want to compare the performance of two groups of hosts doing the same operation, such as version upgrades or topology changes. We use tags to differentiate groups of machines (each datacenter is a tag, each environment, and even each hostname). We then can ask for a specific metric, grouped by tags, to get the following view:



Load testing

We conduct several types of load tests. One is where we shift as much traffic as possible to one data center, trying to hit the first system-wide bottleneck. Another is performed on specific applications. In both cases we use the application dashboard to view the bottlenecks, just like we would when troubleshooting unexpected events.

One thing to keep in mind is that when an application is under load, it sometimes becomes CPU bound and measurements are skewed because threads just don’t get CPU time. Another case where this happens is during GC. In such cases we must also measure the effects of this phenomenon.

The measured unit in this case is the ‘jvm hiccup’, which basically means taking one thread, letting it sleep for a while and measuring how much longer than the requested sleep time the sleep actually took. Low hiccups mean we can rely on the numbers presented by other metrics.
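The hiccup measurement itself is simple enough to sketch. The version below is written in Python for brevity, so it measures process and OS scheduling stalls rather than JVM ones; the function names and the 1 ms sleep interval are arbitrary choices for the example.

```python
import time

def measure_hiccup(sleep_seconds=0.001):
    """Sleep briefly and return how much longer than requested it took."""
    start = time.perf_counter()
    time.sleep(sleep_seconds)
    elapsed = time.perf_counter() - start
    return max(0.0, elapsed - sleep_seconds)

def worst_hiccup(samples=50, sleep_seconds=0.001):
    """Largest overshoot observed over a number of short sleeps."""
    return max(measure_hiccup(sleep_seconds) for _ in range(samples))

print(f"worst hiccup: {worst_hiccup() * 1000:.3f} ms")
```

On an idle machine the overshoot should stay small. When it grows, latency numbers reported by the same process during that interval should be read with suspicion.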



What’s next?

Real time monitoring holds a critical role in our everyday work, and we have many plans to leverage these measurements. From smarter metric driven load balancing in the client to canary deployments based on real application behavior – there is no limit to what you can achieve when you measure stuff in a fast, reliable manner.

Goodbye static CNAMEs, hello Consul

Nearly every large scale system becomes distributed at some point: a collection of many instances and services that compose the solution you provide. And as you scale horizontally to provide high availability, better load distribution, etc…, you find yourself spinning up multiple instances of services, or using systems that function in a clustered architecture. That’s all cool in theory, but soon after you ask yourself, “how do I manage all of this? How should these services communicate with each other? And how do they even know what instances (or machines) exist?”

Those are excellent questions!

What methods are in use today?

The naive approach, which we’d followed in Outbrain for many years, is to route all inter-service traffic via load balancers (HAProxy in our case). Every call to another system, such as a MySql slave, is done to the load balancer (one in a pool of many), via an agreed upon name, such as a DNS CNAME. The load balancer, which holds a static configuration of all the different services and their instances, directs the call to one of those instances, based on the predefined policy.

backend be_onering_es   ## backend name
  balance leastconn     ## how to distribute load
  option httpchk GET /  ## service health check method
  option httpclose      ## add “Connection: close” header if missing
  option forwardfor     ## send client IP through XFF header
  server ringdb-20001 ringdb-20001:9200 check slowstart 10s weight 100   ## backend node 1
  server ringdb-20002 ringdb-20002:9200 check slowstart 10s weight 100   ## backend node 2

The load balancer is also responsible for checking service health, to make sure requests are routed only to live services, as dead ones are “kicked out of the pool”, and revived ones are brought back in.

An alternative to the load balancer method, used in high throughput systems such as Cassandra, is configuring CNAMEs that point to specific nodes in the cluster. We then use those CNAMEs in the consuming application’s configuration. The client is then responsible for applying a policy of balancing between those nodes, both for load and availability.

OK, so what’s the problem here?

There are a few, actually:

  1. The mediator (Load balancer), as quick as it may be in processing requests (and HAProxy is really fast!), is another hop on the network. With many services talking to each other, this could prove a choke point in some network topologies. It’s also a shared resource between multiple services and if one service misbehaves, everyone pays the price. This is especially painful with big payloads.
  2. The world becomes very static! Moving services between hosts, scaling them out/in, adding new services – it all involves changing the mediator’s config, and in many cases this is done manually. Manual work requires expertise and is error prone. When the changes become frequent… it simply does not scale.
  3. When moving ahead to infrastructure that is based on containers and resource management, where instances of services and resources are allocated dynamically, the whole notion of HOSTNAME goes away and you cannot count on it in ANY configuration.

What this all adds up to is “the end of the static configuration era”. Goodbye static configs, hello Dynamic Service Discovery! And cue Consul.

What is Consul?

In a nutshell, Consul is a Service Discovery System, with a few interesting features:

  1. It’s a distributed system, made out of an agent in each node. Nodes talk to each other via a gossip protocol, making node discovery simple, robust, and dynamic. There’s no configuration file describing all members of a Consul cluster.
  2. It’s fault tolerant by design, and using concepts such as Anti Entropy, gracefully handles nodes disappearing and reappearing – a common scenario in VM/container based infrastructure.
  3. It has first-class treatment of datacenters, as self-contained, interconnected entities. This means that DC failure / disconnection would be self-contained. It also means that a node in one DC can query for information in another DC with as little knowledge as the remote DC’s name.
  4. It holds the location (URI) and health of every service on every host, and makes this data available via multiple channels, such as a REST API and GUI. The API also lets you make complex queries and get the service data segment you’re interested in. For example: Get me all the addresses of all instances of service ‘X’ from Datacenter ‘Y’ in ‘staging env’ (tag).
  5. There is a very simple way to get access to “Healthy” service instances by leveraging the Consul DNS interface. Perfect for those pesky 3rd party services whose code you can’t or don’t want to modify, or just to get up and running quickly without modifying any client code (disclaimer: doesn’t fit all scenarios).
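As a rough illustration of the kind of query described in item 4, the sketch below builds a request to Consul's HTTP health endpoint and extracts addresses from a response. It assumes the `/v1/health/service/<name>` endpoint with its `dc`, `tag` and `passing` parameters on the default agent port 8500, and a response in which each entry carries `Node` and `Service` objects; check the Consul API documentation for your version before relying on it. The service name, datacenter, tag and sample payload are all made up.

```python
import json
from urllib.parse import urlencode

def health_query_url(service, datacenter, tag, agent="http://localhost:8500"):
    """URL asking the local agent for healthy instances of a service."""
    query = urlencode({"dc": datacenter, "tag": tag, "passing": "true"})
    return f"{agent}/v1/health/service/{service}?{query}"

def extract_addresses(response_body):
    """Return (address, port) pairs from a health endpoint response."""
    entries = json.loads(response_body)
    return [
        # Fall back to the node address when the service has none of its own.
        (e["Service"].get("Address") or e["Node"]["Address"], e["Service"]["Port"])
        for e in entries
    ]

# Hypothetical response body, trimmed to the fields used above.
sample = json.dumps([
    {"Node": {"Address": "10.0.0.1"}, "Service": {"Address": "", "Port": 9200}},
    {"Node": {"Address": "10.0.0.2"}, "Service": {"Address": "10.0.0.22", "Port": 9200}},
])

print(health_query_url("X", "Y", "staging"))
print(extract_addresses(sample))
```

In real use the URL would be fetched with an HTTP client and the body passed to `extract_addresses`; the sample payload only stands in for that response.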

How does Consul work?

You can read all about it here, but let me take you through a quick tour of the architecture:


As you can see, Consul has multi datacenter awareness built right in (you can read more about it here). But for our case, let’s keep it simple, and look at the case of a single datacenter (Datacenter 1 in the diagram).

What the diagram tags as “Clients” are actually “Consul agents”, running locally on every participating host. Those talk to each other, as well as the Consul servers (which are “agents” configured as Servers), through a “Gossip protocol”. If you’re familiar with Cassandra, and that rings a bell, then you’re right, it’s the same concept used by Cassandra nodes to find out which ones are up or down in a cluster. A Gossip protocol essentially makes sure “Everybody knows Everything about Everyone”. So within reasonable delay, all agents know (and propagate) state information about other agents. And you just so happen to have an agent running locally on your node, ready to share everything it knows via API, DNS or whatnot. How convenient!

Agents are also the ones performing health checks to the services on the hosts they run on, and gossiping any health state changes. To make this work, every service must expose a means to query its health status, and when registered with its local Consul agent, also register its health check information. At Outbrain we use an HTTP based “SelfTest” endpoint that every one of our homegrown services exposes (through our OB1K container, practically for free!).

Consul servers are also part of the gossip pool and thus propagate state in the cluster. However, they also maintain a quorum and elect a leader, who receives all updates (via RPC calls forwarded from the other servers) and registers them in its database. From here on, the data is replicated to the other servers and propagated to all the agents via Gossip. This method is a bit different from other Gossip based systems that have no servers and leaders, but it allows the system to support stronger consistency models.

There’s also a distributed key-value store we haven’t mentioned, rich ACLs, and a whole ecosystem of supporting and derived tools… but we said we’d keep it simple for now.

Where does that help with service discovery?

First, what we’ve done is take all of our systems that are already organized in clusters, such as Kafka, Zookeeper, Cassandra and others, and register them with Consul. This allows us to select a live service node from a cluster, simply by calling a hostname through the Consul DNS interface. For example, take Graphite: Outbrain’s systems are currently generating ~4M metrics per minute. Getting all of these metrics through a load balancer, or even a cluster of LBs, would be suboptimal, to say the least. Consul allows us to have each host send metrics to a hostname, such as “graphite.service.consul”, which returns a random IP of a live graphite relay node. Want to add a couple more nodes to share the load? No problem, just register them with Consul and they automagically appear in the list the next time a client resolves that hostname. Which, as we mentioned, happens quite a few times a minute. No load balancers in the way to serve as choke points, no editing of static config files. Just simple, fast, out-of-band communication.

How do these 3rd party services register?

We’re heavy users of Chef, and have thus created a chef cookbook to help us get the job done. Here’s a (simplified) code sample we use to register Graphite servers:

ob_consul 'graphite' do
  owner 'ops-vis'         ## add 'owner' tag to identify owning group
  port 1231               ## port the service is running on
  check_cmd "echo '' | nc localhost 1231 || exit 2"    ## health check shell command
  check_interval '60s'    ## health check execution interval
  template false          ## whether the health check command is a Chef template (for scripts)
  tags ['prod']           ## more tags
end

How do clients consume services?

Clients simply resolve the DNS record they’re interested in… and that’s it. Consul takes care of all the rest, including randomizing the results.

$ host graphite is an alias for relayng.service.consul.
relayng.service.consul has address
relayng.service.consul has address

How does this data reach the DNS?

We’ve chosen to place Consul “behind” our internal DNS servers, and forward all requests for the “consul” domain name to a consul agent running on the DNS servers.

zone "consul" IN {
    type forward;
    forward only;
    forwarders { port 8600; };
};

Note that there are other ways to go about this, such as routing all DNS requests to the local Consul agent running on each node, and having it forward everything “non-Consul” to your DNS servers. There are advantages and disadvantages to each approach. For our current needs, having an agent sit behind the DNS servers works quite well.

Where does the Consul implementation at Outbrain stand now?

At Outbrain we’re already using Consul for:

  • Graphite servers.
  • Hive Thrift servers that are Hive interfaces to the Hadoop cluster they’re running on. Here the Consul CNAME represents the actual Hadoop cluster you want your query to run on. We’ve also added a layer that enables accessing these clusters from different datacenters using Consul’s multi-DC support.
  • Kafka servers.
  • Elasticsearch servers.

And our roadmap for the near future:

  • MySql Slaves – so we can eliminate the use of HAProxy in that path.
  • Cassandra servers where maintaining a list of active nodes in the app configuration becomes stale over time.
  • Prometheus – our new monitoring and alerting system.
  • Zookeeper clusters.


But that’s not all! Stay tuned for more on Consul, client-side load balancing, and making your environment more dynamic.

So Long Spring XMLs

Like many Java projects these days, we use Spring at Outbrain for wiring our Java dependencies. Spring is a technology that started in order to solve a common, yet not so simple, issue – wiring all the dependencies in a Java project. This was done by utilizing the IoC (Inversion of Control) principles. Today Spring does a lot more than just wiring and bootstrapping, but in this post I will focus mainly on that.

When Spring just started, the only way to configure the wiring of an application was to use XMLs which defined the dependencies between different beans. As Spring continued to develop, two more methods were added to configure dependencies – the annotation method and the @Configuration method. At Outbrain we use XML configuration. I found that this method has a lot of pain points, which I found a remedy for using Spring @Configuration.

What is this @Configuration class?

You can think of a @Configuration class just like XML definitions, only defined by code. Using code instead of XMLs allows some advantages over XMLs which made me switch to this method:

  1. No typos – You can’t have a typo in code. The code just won’t compile.
  2. Compile time check (fail fast) – With XMLs it’s possible to add an argument to a bean’s constructor but forget to inject this argument when defining the bean in the XML. Again, this can’t happen with code. The code just won’t compile.
  3. IDE features come for free – Using code allows you to find usages of the bean’s constructor to easily find out which contexts use it. It also allows you to jump back and forth between bean definitions. Basically, everything you can do with code, you get for free.
  4. Feature flags – At Outbrain we use feature flags a lot. Due to the continuous-deployment culture of the company, code that is pushed to the trunk can find itself in production in a matter of minutes. Sometimes, when developing features, we use feature flags to enable/disable certain features. This is pretty easy to do by defining two different implementations of the same interface and deciding which one to load according to the flag. When using XMLs we had to use the alias feature, which makes creating feature flags less intuitive. With @Configuration, we can create a simple if clause for choosing the right implementation.


Finding a needle in a Storm-stack

Using Storm for real time distributed computations has become a widely adopted approach, and today one can easily find more than a few posts on Storm’s architecture, internals, and what have you (e.g., Storm wiki, Understanding the parallelism of a storm topology, Understanding storm internal message buffers, etc).

So you read all these posts and got yourself a running Storm cluster. You even wrote a topology that does something you need, and managed to get it deployed. “How cool is this?”, you think to yourself. “Extremely cool”, you reply to yourself, sipping the morning coffee. The next step would probably be writing some sort of a validation procedure, to make sure your distributed Storm computation does what you think it does, and does it well. Here at Outbrain we have these validation processes running hourly, making sure our realtime layer data is consistent with our batch layer data – which we consider to be the source of truth.

It was when the validation of a newly written computation started failing that we embarked on a great journey to the land of “How does one go about debugging a distributed Storm computation?”. True story. The validation process was reporting intermittent inconsistencies, intermittent being the operative word here, since it was not like the new topology was completely and utterly messed up; rather, it was failing to produce correct results for some of the input, all the time (by correct results I mean such that match our source of truth).


Leader Election with Zookeeper


Recently we had to implement an active-passive redundancy of a singleton service in our production environment, where the general rule is to always have “more than one of anything”. The main motivation is to alleviate the need to manually monitor and manage these services, whose presence is crucial to the overall health of the site.

This means that we sometimes have a service installed on several machines for redundancy, but only one of them is active at any given moment. If the active service goes down for some reason, another one rises to do its work. This is actually called leader election. One of the most prominent open source implementations facilitating the process of leader election is Zookeeper. So what is Zookeeper?

Originally developed by Yahoo Research, Zookeeper acts as a service providing reliable distributed coordination. It is highly concurrent, very fast and suitable mainly for read-heavy access patterns. Reads can be done against any node of a Zookeeper cluster while writes are quorum-based. To reach a quorum, Zookeeper utilizes an atomic broadcast protocol. So how does it work?


Under the Hood of Our Algorithmic Engine – How We Serve Content Recommendations

Outbrain Algorithms Team

Let me tell you a little about how we actually give content recommendations here at Outbrain. This will be only a short introduction. We might elaborate on some of the issues below in future posts.

Our main goal is to serve good content recommendations to readers on the Internet. The typical situation is a user reading a content page. We want to recommend content for further reading, which is a “good” recommendation.
