Migrating Elephants – How To Migrate Petabyte Scale Hadoop Clusters With Zero Downtime

 

Outbrain has been an early adopter of Hadoop and we, the team operating it, have acquired a lot of experience running it in production in terms of data ingestion, processing, monitoring, upgrading etc. This also means that we have a significant ecosystem around each cluster, with both open source and in-house systems.

 

A while back we decided to upgrade both the hardware and software versions of our Hadoop clusters.

“Why is that a big problem?” you might ask, so let me explain a bit about our current Hadoop architecture. We have two clusters of 300 machines in two different data centers, production and DR. Each cluster has a total dataset size of 1.5 PB, with 5 TB of compressed data loaded into it each day. There are ~10,000 job executions daily of about 1200 job definitions that were written by dozens of developers, data scientists and various other stakeholders within the company, spread across multiple teams around the globe. These jobs do everything from moving data into Hadoop (e.g. Sqoop or MySQL-to-Hive data loads), to processing in Hadoop (e.g. running Hive, Scalding or Pig jobs), to pushing the results into external data stores (e.g. Vertica, Cassandra, MySQL). An additional dimension of complexity originates from the dynamic nature of the system, since developers, data scientists and researchers push dozens of changes to how flows behave in production on a daily basis.

This system needed to be migrated to run on new hardware, using new versions of multiple components of the Hadoop ecosystem, without impacting production processes and active users. A partial list of the components and technologies that are currently in use and had to be taken into consideration: HDFS, Map-Reduce, Hive, Pig, Scalding and Sqoop. On top of that, of course, we have several more in-house services for data delivery, monitoring and retention that we have developed.

I’m sure you’ll agree that this is quite an elephant.

Storming Our Brains

We sat down with our users, and started thinking about a process to achieve this goal and quickly arrived at several guidelines that our selected process should abide by:

  1. Both Hadoop clusters (production and DR) should always be kept fully operational
  2. The migration process must be reversible
  3. Both value and risk should be incremental

After scratching our heads for quite a while, we came up with these options:

 

  1. In place: Migrate the existing cluster in place to the new version, then roll out the hardware upgrade by gradually adding new machines to the cluster and removing the old ones. This is the simplest approach, and if you can afford the risk you should have a very good reason to choose a different path. However, since upgrading the system in place would expose clients to a huge change in an uncontrolled manner, and is by no means an easily reversible process, we had to forgo this option.
  2. Flipping the switch: The second option is to create a new cluster on new hardware, sync the required data, stop processing on the old cluster and move it to the new one. The problem here is that we still couldn’t manage the risk, because we would be stopping all processing and moving it to the new cluster at once. We wouldn’t know whether the new cluster could handle the load or whether each flow’s code was compatible with the new components’ versions. As a matter of fact, there were so many unknowns that it became clear we had to split the problem into smaller pieces. The difficulty with splitting in this approach is that once you move a subset of the processing from the old cluster to the new one, its results are no longer accessible on the old cluster. This means that we would have had to migrate all dependencies of that initial subset as well. Since we have 1200 flow definitions with marvelous and beautiful interconnections between them, splitting them was not practical, and we very quickly found that we would have to migrate all flows together.
  3. Side by side execution: The 3rd option is to start processing on the new cluster without stopping the old cluster. This is a sort of an active-active approach, because both Hadoop clusters, new and old, will contain the processing results. This would allow us to migrate parts of the workload without risking interfering with any working pipeline in the old cluster. Sounds good, right?


  

First Steps

To better understand the chosen solution let’s take a look at our current architecture:

We have a framework that allows applications to push raw event data into multiple Hadoop clusters. For the sake of simplicity the diagram describes only one cluster. 

Once the data reaches Hadoop, processing begins to take place using a framework for orchestrating data flows we’ve developed in house that we like to call the Workflow Engine.

Each Workflow Engine belongs to a different business group. That Workflow Engine is responsible for triggering and orchestrating the execution of all flows developed and owned by that group. Each job execution can trigger more jobs on its current Workflow Engine or trigger jobs in other business groups’ Workflow Engines. We use this partitioning mainly for management and scale reasons but during the planning of the migration it provided us with a natural way to partition the workload, since there are very few dependencies between groups vs within each group.

 

Now that you have a better understanding of the existing layout you can see that the first step is to install a new Hadoop cluster with all required components of its ecosystem and begin pushing data into it.

To achieve this, we configured our dynamic data delivery pipeline system to send all events to the new cluster as well as the old, so now we have a new cluster with a fully operational data delivery pipeline:
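The dual delivery described above can be sketched in a few lines. This is only an illustration of the fan-out idea: `ClusterSink` and `deliver` are hypothetical names, and an in-memory buffer stands in for a cluster's real ingestion endpoint.

```scala
object DualDelivery {
  // Hypothetical stand-in for one Hadoop cluster's ingestion endpoint.
  final class ClusterSink(val name: String) {
    private val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
    def write(event: String): Unit = { buffer += event; () }
    def received: List[String] = buffer.toList
  }

  // Deliver every event to every configured cluster, old and new alike.
  def deliver(event: String, sinks: Seq[ClusterSink]): Unit =
    sinks.foreach(_.write(event))
}
```

Because the list of sinks is plain configuration in this sketch, adding the new cluster does not touch the applications that produce the events.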

 

Side by Side

Let’s think a bit about what options we had for running a side by side processing architecture.

We could use the same set of Workflow Engines to execute their jobs on both clusters, active and new. While this method would have the upside of saving machines and lowering operational costs, it would potentially double the load on each machine, since jobs are assigned to machines in a static manner: each Workflow Engine is assigned a business group, and all jobs that belong to this group are executed from it. To isolate the current production job executions from those for the new cluster, we decided to allocate independent machines for the new cluster.

 

Let the Processing Commence!

Now that we have a fully operational Hadoop cluster running alongside our production cluster, and we now have raw data delivered into it, you might be tempted to say: “Great! Bring up a set of Workflow Engines and let’s start side by side processing!”.

 

Well… not really.

 

Since there are so many jobs and they perform such varied types of operations, we can’t really assume that letting them run side by side is a good idea. For instance, if a job calculates some results and then pushes them to MySQL, these results will be pushed twice. Aside from doubling the load on the databases for no good reason, this may in some cases cause data corruption or inconsistencies due to race conditions. In essence, every job that writes to an external data source should be allowed to run only once.

 

So we defined two execution modes a Workflow Engine can have:

Leader: Run all the jobs!

Secondary: Run all jobs except those that might have a side effect external to that Hadoop cluster (e.g. writing to an external database or triggering an application service). The framework does this automatically, sparing the development teams any extra effort.

 

When a Workflow Engine is in secondary mode, jobs executed from it can read from any source, but write only to a specific Hadoop cluster. That way they are essentially filling it up and syncing (to a degree) with the other cluster.
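To make the two modes concrete, here is a minimal sketch of how a framework could decide which jobs to run. All names (`Mode`, `Job`, `hasExternalSideEffects`, `shouldRun`) are assumptions made for illustration; the real Workflow Engine is not shown in this post and may detect side effects differently.

```scala
object ExecutionModes {
  sealed trait Mode
  case object Leader extends Mode
  case object Secondary extends Mode

  // Hypothetical job model: a flag marks jobs that touch anything
  // outside their own Hadoop cluster (external DB, external service).
  final case class Job(name: String, hasExternalSideEffects: Boolean)

  // Leader runs everything; Secondary skips jobs with external side effects.
  def shouldRun(mode: Mode, job: Job): Boolean = mode match {
    case Leader    => true
    case Secondary => !job.hasExternalSideEffects
  }

  def runnable(mode: Mode, jobs: Seq[Job]): Seq[Job] =
    jobs.filter(job => shouldRun(mode, job))
}
```

Keeping the decision in one place, rather than in each job, is what lets the mode change without any work from the teams that own the jobs.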

 

Let’s Do This…

Phase 1 of the migration should look something like this:

 

Notice that I’ve only included a Workflow Engine for one group in the diagram for simplicity but it will look similar for all other groups.

So the idea is to bring up a new Workflow Engine and give it the role of a migration secondary. This way it will run all jobs except for those writing to external data stores, thus eliminating all side effects external to the new Hadoop cluster.

By doing so, we were able to achieve multiple goals:

  1. Test basic software integration with the new Hadoop cluster version and all services of the ecosystem (Hive, Pig, Scalding, etc.)
  2. Test the new cluster’s hardware and performance compared to the currently active cluster
  3. Safely upgrade each business group’s Workflow Engine separately without impacting other groups.

 

Since the new cluster is running on new hardware and with a new version of the Hadoop ecosystem, this is a huge milestone towards validating our new architecture. The fact that we managed to do so without risking any downtime that could have resulted from failing processing flows, wrong cluster configurations or any other potential issue was key in achieving our migration goals.

 

Once we were confident that all phase 1 jobs were operating properly on the new cluster we could continue to phase 2 in which a migration leader becomes secondary and the secondary becomes a leader. Like this:

 

In this phase all jobs begin running from the new Workflow Engine, impacting all production systems, while the old Workflow Engine only runs jobs that write data to the old cluster. This method offers a fairly easy way to roll back to the old cluster in case of any serious failure (even after a few days or weeks), since all intermediate data continues to be available on the old cluster.
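The role swap, and the reason rollback stays cheap, can be modeled as a tiny state change per business group. This is a sketch under assumed names (`GroupState`, `swap`, `phase1`, `phase2`), not the actual migration tooling.

```scala
object MigrationPhases {
  sealed trait Mode
  case object Leader extends Mode
  case object Secondary extends Mode

  // Modes of the two Workflow Engines serving one business group.
  final case class GroupState(oldEngine: Mode, newEngine: Mode) {
    // Exactly one engine may produce external side effects at any time.
    require(oldEngine != newEngine, "exactly one engine must be the leader")

    // Swapping roles moves phase 1 to phase 2; swapping again rolls back.
    def swap: GroupState = GroupState(oldEngine = newEngine, newEngine = oldEngine)
  }

  val phase1: GroupState = GroupState(oldEngine = Leader, newEngine = Secondary)
  val phase2: GroupState = phase1.swap
}
```

In this model a rollback is the same operation as the migration itself, which only works because the secondary engine keeps its cluster populated with intermediate data.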

 

The Overall Plan

The overall process is to push all Workflow Engines to phase 1 and then test and stabilize the system. We were able to run 70% (!) of our jobs in this phase. That’s 70% of our code, 70% of our integrations and APIs and at least 70% of the problems you would experience in a real live move. We were able to fix issues, analyze system performance and validate results. Only once everything seemed to be working properly could we start pushing the groups to phase 2, one by one, into a tested, stable new cluster.

Once again we benefit from the incremental nature of the process. Each business group can be pushed into phase 2 independently of other groups, thus reducing risk and increasing our ability to debug and analyze issues. Additionally, each business group can start leveraging the new cluster’s capabilities (e.g. features from newer versions, or improved performance) immediately after it has moved to phase 2, and not only after we have migrated every one of the ~1200 jobs to run on the new cluster. One pain point that can’t be ignored is that inter-group dependencies can make this a significantly more complicated feat, as you need to take into consideration the state of multiple groups when migrating.

 

What Did We Achieve?

  1. Incremental migration – Because we had an active-active migration that we could apply to each business group separately, we mitigated risk and gained value from the new system gradually.
  2. Reversible process – Since we kept all old Workflow Engines (which executed their jobs on the old Hadoop cluster) in secondary execution mode, all intermediate data was still being processed and was available in case we needed to revert groups independently of each other.
  3. Minimal impact on users – Since we defined an automated transition of jobs between secondary and leader modes, users didn’t need to duplicate any of their jobs.

 

What Now?

We have completed the upgrade and migration of our main cluster and have already started the migration of our DR cluster.

There are a lot more details and concerns to take into account when migrating a production system at this scale. However, the basic abstractions we’ve introduced here, and the capabilities we’ve infused our systems with, have equipped us with the tools to migrate elephants.

For more information about this project you can check out the video from Strata 2017 London where I discussed it in more detail.

Effective Testing with Loan Pattern in Scala

Tests are crucial in systems that rely on CI/CD as part of their release cycle. One of the challenges is to write stable tests that work for you without spending a lot of time on maintaining bad tests.

Tests are Hard

They’re hard to write, hard to maintain and it’s even harder to stabilize a flaky test. At Outbrain, we take special pride in our ability (for the most part) to deliver new features to production, and to do so with the confidence that only reliable tests can give you. These tests play a crucial role in our ability to deliver fast, good and stable code, making sure no regression bugs were introduced in the process. It is crucial, then, not only to maintain good test suites (unit tests, integration, and e2e) but also to fix any test that misbehaves (flaky tests).

We have a special environment to facilitate integration and e2e tests, called the simulation environment (it is only one of the tools we have for that purpose). This is a dedicated set of servers which we use to simulate our production environment. We deploy every new version of our services to that environment before we deploy to production, and run tests that check new flows of code, regressions, and interoperability with other services.

In order to write an effective test for a new feature, we sometimes need to set up the environment with entities that the feature under test requires. Suppose, for example, that our new feature registers a car to an owner (a Person entity). Before running the tests we need the required entities, a Car and a Person, in our database. We’re not trying to test the flow for creating a new car or a new person in this scenario, so there is no need to create the car and the person explicitly in the test before the actual test scenario happens. And to keep our tests as clear and succinct as possible, we don’t want to create this data explicitly in each and every test.

 

Bad Practices

So, it was a common practice (albeit a bad one) to have pre-existing data on which we would rely to run tests (for the whole simulation environment!). This led to two big (interconnected) problems:

  1. No test isolation – a test mistakenly deleting some or all of the pre-existing data, for example, would do so for all the tests that run in that environment
  2. Flaky tests – tests running concurrently are creating, deleting and generally changing data that affects others, which in turn would fail tests for no good reason — which makes it really hard to analyze and fix a failing test

 

We tackled this problem by creating the needed data before the tests in a test class and deleting it after the test run. This mitigated the problem somewhat, but the tests in the same class were still interconnected, and it added boilerplate to the test class. A test class now looked something like this (assuming these are entities autogenerated by Scalike for the relevant tables):

ScalaTest:

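import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} // ScalaTest 3.0.x package layout

// Matchers is needed for `shouldEqual`
class MyTestClass extends WordSpec with Matchers with BeforeAndAfterAll {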

  val person = Person(carId = None).save() // creating data for test
  val car = Car(color = "green", ownerId = None).save()

  override def afterAll(): Unit = {
    // Clean db after test.
    person.destroy()
    car.destroy()
  }

  "My service" should {
    "set the owner of the car" in {
      val service = new Service

      service.setCarOwner(carId = car.id, ownerId = person.id)

      service.getPerson(person.id).carId shouldEqual Some(car.id)
    }
  }
}

Specs2:

import org.specs2.mutable.SpecificationWithJUnit
import org.specs2.specification.AfterAll

class MyTestClass extends SpecificationWithJUnit with AfterAll {
  val person = Person(carId = None).save() // creating data for test
  val car = Car(color = "green", ownerId = None).save()

  override def afterAll(): Unit = {
    // Clean db after test.
    person.destroy()
    car.destroy()
  }

  "My service" should {
    "set the owner of the car" in {
      val service = new Service

      service.setCarOwner(carId = car.id, ownerId = person.id)

      service.getPerson(person.id).carId must be equalTo Some(car.id) 
    }
  }
}

Looking at this, we were presented with a challenge. First, the data is created for all the tests that run in a class, which must be deleted only after all tests have finished running — this means that the tests are not isolated one from another and potentially may become flaky. Second, we wanted an elegant way of creating and deleting the needed entities seamlessly in order to minimize the boilerplate for each test class.

Note: It is possible however, in Specs2, to make a better solution by using the ‘Scope’ trait like so:

import org.specs2.mutable.After
import org.specs2.specification.Scope

trait Context extends Scope with After {
  val person = Person(carId = None).save() // creating data for test
  val car = Car(color = "green", ownerId = None).save()

  override def after: Any = {
    // Clean db after test.
    person.destroy()
    car.destroy()
  }
}

And using it in a test like so:

class MyTestClass extends SpecificationWithJUnit {
  "My service" should {
    "set the owner of the car" in new Context {
      val service = new Service

      service.setCarOwner(carId = car.id, ownerId = person.id)

      service.getPerson(person.id).carId must be equalTo Some(car.id)
    }
  }
}


It’s a good solution, for a simpler problem than we faced. We needed the tests running in a single transaction, with a supplied session and a configurable db name (indicating a set of Scalike connection parameters).

Enter Loan Pattern

We first encountered this pattern when using ScalaTest and quickly moved to using it also in Specs2 (as most of our tests are written in Specs2). From ScalaTest documentation for Sharing fixtures:

“A test fixture is composed of the objects and other artifacts (files, sockets, database connections, etc.) tests use to do their work. When multiple tests need to work with the same fixtures, it is important to try and avoid duplicating the fixture code across those tests.”
“If you need to both pass a fixture object into a test and perform cleanup at the end of the test, you’ll need to use the loan pattern”

Which means, we can use fixtures to set up ‘artifacts’ for the tests to use, promoting the DRY principle by minimizing code duplication. It is also a good way to reduce boilerplate when writing tests. So, we wrote this one simple trait:

ScalaTest:

import scala.util.Random

trait TestDataSupport extends DefaultGenerator {

  def withTestData(testCode: DefaultObjects => Any): Unit = {
    val testData = createTestData()
    try {
      testCode(testData) // "loan" the fixture to the test
    }
    finally clearTestData(testData) // clean up the fixture
  }

  private def createTestData(): DefaultObjects = {
    DefaultObjects.create(name = Random.alphanumeric.take(10).mkString)
  }

  private def clearTestData(testData: DefaultObjects): Unit = {
    testData.cleanup
  }
}

Let’s go over what’s happening in this trait. We’re mixing in a custom trait called ‘DefaultGenerator’, which gives us ‘DefaultObjects’: the entities that need to be pre-created for our tests to run. We have two private methods. One calls ‘create’ on ‘DefaultObjects’ with a random name to generate the needed entities. The other calls ‘cleanup’ on the test data to clean the environment after the test has finished running. The star of this trait is the method (or fixture, if you will) ‘withTestData’: it takes the test function as a parameter, calls the private method ‘createTestData’, runs the test with the data we just generated and finally cleans up the generated data, whether the test passed or failed.
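Stripped of any test framework, the same create, loan and clean-up sequence can be tried out in isolation. The sketch below is a simplified stand-in, not our trait: `LoanPatternSketch`, `TestData` and the in-memory `db` set are hypothetical and replace the Scalike-backed entities so the example runs on its own.

```scala
import scala.collection.mutable

object LoanPatternSketch {
  // Hypothetical in-memory "database" so the example runs without Scalike.
  val db: mutable.Set[String] = mutable.Set.empty

  final case class TestData(personId: String, carId: String)

  // Create the fixture, loan it to the test body, and always clean up.
  def withTestData[A](testCode: TestData => A): A = {
    val suffix = scala.util.Random.alphanumeric.take(10).mkString
    val data = TestData(s"person-$suffix", s"car-$suffix")
    db ++= Seq(data.personId, data.carId)
    try testCode(data)                              // "loan" the fixture
    finally db --= Seq(data.personId, data.carId)   // runs on success and on failure
  }
}
```

Each call gets its own randomly named entities and removes them afterwards, so a test that uses the fixture leaves nothing behind even when its body throws.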

When mixing this trait in our test class, we get the following code:

import org.scalatest.{Matchers, WordSpec} // ScalaTest 3.0.x package layout

class MyTestClass extends WordSpec with Matchers with TestDataSupport {

  "My service" should {
    "set the owner of the car" in withTestData { testData =>
      val service = new Service

      service.setCarOwner(carId = testData.car.id, ownerId = testData.person.id)

      service.getPerson(testData.person.id).carId shouldEqual Some(testData.car.id) 
    }
  }
}


‘testData’ is the data generated in our ‘withTestData’ method (a car and a person in our case).

The Specs2 version of the Loan Pattern is a bit more complex, as we’ve added some more bells and whistles to make it easier for us to create those entities in our domain. We’re using Scalike to create the entities in a MySQL database, and we need somewhat more refined control over the session we’re using, the DB name, etc.

Specs2:

trait DataContextName {
  def className: String
}
trait DataContextDbName {
  val dbName: Symbol = 'default
}
import scalikejdbc.{DBSession, NamedDB}

package object testdata {
  private[testdata] implicit class NamedDbSession(namedDB: NamedDB) {
    def withSession[A](session: DBSession)(execution: DBSession => A): A =
      execution(session)
  }
}
import org.specs2.execute.{AsResult, Result}
import org.specs2.specification.ForEach
import scalikejdbc.{DB, DBSession, NamedDB}

import scala.util.Try

trait DefaultDataContext extends ForEach[DefaultObjects] 
  with DataContextDbName with DataContextName with DefaultGenerator {
  
  implicit lazy val session: DBSession = DB.autoCommitSession()

  override def foreach[R](f: DefaultObjects => R)(implicit ev: AsResult[R]): Result =
    NamedDB(dbName).withSession(session) { implicit session: DBSession =>

      val testData = DefaultObjects.create(name = randomName(className))

      val result = Try {
        AsResult(f(testData))
      }

      testData.cleanup(session)
      result.get
    }
}

It’s very similar to the ScalaTest flavor, but with several changes we needed to make to better facilitate our needs in the Specs2 tests. We have a mechanism to initialize a named DB connection, with a named connection pool and an explicit session. Besides these additions, it’s pretty similar to ScalaTest — generate the test data, run the test and clean the generated data.
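The Specs2 flavor guarantees cleanup with `Try` rather than `try`/`finally`: the outcome is captured, cleanup runs, and only then is the outcome returned or rethrown. A minimal sketch of that control flow in isolation, where `runThenCleanup` is a hypothetical helper name:

```scala
import scala.util.Try

object TryCleanupSketch {
  // Run `body`, always invoke `cleanup`, then surface the original outcome.
  def runThenCleanup[A](cleanup: () => Unit)(body: => A): A = {
    val result = Try(body) // captures a value or a non-fatal exception
    cleanup()              // reached in both cases
    result.get             // returns the value or rethrows the failure
  }
}
```

One difference worth knowing: `Try` only catches non-fatal exceptions, so unlike a `finally` block the cleanup here is skipped if the body throws a fatal error such as `OutOfMemoryError`.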

The test class now looks like this:

trait TestClassDataContext extends DefaultDataContext {
  override val dbName: Symbol = TestClassConnectionPoolName
  def className: String = "Test class name"
}
class MyTestClass extends SpecificationWithJUnit with TestClassDataContext {
  "My service" should {
    "set the owner of the car" >> { testData: DefaultObjects =>
      val service = new Service

      service.setCarOwner(carId = testData.car.id, ownerId = testData.person.id)

      service.getPerson(testData.person.id).carId must be equalTo Some(testData.car.id)
    }
  }
}

Summary

We tackled several issues our team faced on a day-to-day basis, which made our simulation environment unstable, hard to maintain and generally very frustrating to work on. By extracting data generation and cleanup to an external trait and using a clever mechanism to reduce boilerplate, we managed to clean up and simplify the test class, reduce code duplication and generally make our lives easier. Tests are still hard, but a bit easier to write and nicer to read. What do you think?