FullContact is a people company and a data company. We believe that businesses can deliver higher-quality products through better relationships and one-to-one customized service. Our Identity Resolution products (both batch and API) are what enable that. We have several different APIs, but they boil down to two simple use cases:
- Resolve – Who does this identifier connect to?
- Enrich – What other information do you know about that person that can help the business better connect?
Customers embed our API when they want to make real-time decisions about their own customers and users: what types of content might this person be interested in, have they talked about our business on social media, and so on. These real-time queries are made to our API, api.fullcontact.com. The client passes one or more identifiers (email, social handle, etc.) and receives a response with several pieces of additional information about the person. These enrichment pieces can be things like age, interests, and demographics, and are grouped together into Data Packs.
Batch enrichment works much the same way, but it is asynchronous and capable of dealing with millions of records at once. A client securely ships us a file of identifiers. We process it, match it to our graph, and append additional columns of enrichment data as defined by the agreement with the customer and the Data Packs they want to pay for.
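As a rough illustration of the request shape described above, the sketch below builds (but does not send) a POST request that carries an identifier and an API token. The `/v3/person.enrich` path, the bearer-token header, and the JSON body layout are assumptions for illustration and should be checked against the current API documentation; `build_enrich_request` is a hypothetical helper, not part of any FullContact client library.

```python
import json
import urllib.request

API_BASE = "https://api.fullcontact.com"
ENRICH_PATH = "/v3/person.enrich"  # assumed path; confirm against the API docs


def build_enrich_request(api_token: str, **identifiers: str) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying one or more identifiers."""
    if not identifiers:
        raise ValueError("at least one identifier is required")
    body = json.dumps(identifiers).encode("utf-8")
    return urllib.request.Request(
        API_BASE + ENRICH_PATH,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        },
    )


request = build_enrich_request("YOUR_API_TOKEN", email="jane@example.com")
# Passing `request` to urllib.request.urlopen would actually send it.
```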
In both of these cases, API and batch, we had similar problems that needed to be solved:
- How many calls did a client make to us over a given time period? We needed to find out how many 200s (matches) and 404s (no match) they received.
- Out of the data that was returned to them how many of each Data Pack did they receive?
- Based on the Data Packs returned, how much data has the client consumed towards their committed plan?
- Out of the Data Packs returned, how much do we owe our upstream data providers?
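To make the first two questions concrete, here is a minimal in-memory sketch of the aggregation they imply: counting calls by status and counting Data Packs returned for one account. The `UsageEvent` shape and its field names are hypothetical and only stand in for whatever a real usage record contains.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Iterable, List


@dataclass
class UsageEvent:
    account_id: str
    status: int  # 200 = match, 404 = no match
    data_packs: List[str] = field(default_factory=list)


def summarize(events: Iterable[UsageEvent], account_id: str) -> dict:
    """Count calls by status and Data Packs returned for a single account."""
    calls: Counter = Counter()
    packs: Counter = Counter()
    for event in events:
        if event.account_id != account_id:
            continue
        calls[event.status] += 1
        packs.update(event.data_packs)
    return {"calls_by_status": dict(calls), "data_packs": dict(packs)}


events = [
    UsageEvent("acct-1", 200, ["demographics", "interests"]),
    UsageEvent("acct-1", 404),
    UsageEvent("acct-1", 200, ["demographics"]),
    UsageEvent("acct-2", 200, ["interests"]),
]
summary = summarize(events, "acct-1")
```

Doing this over every raw event on each dashboard request does not scale, which is the motivation for pre-aggregating in an OLAP store.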
While we could store all of the data needed to compute these things in S3 and run large aggregation jobs using something like Hive or Spark, we wanted to do better. Our new usage dashboard requires fast access to this usage data and the ability to aggregate across several dimensions: account ID, Data Pack, time range, and more. To meet all of the above requirements, we built a streaming architecture that unifies usage data from both API and batch onto a Kafka topic and eventually lands it in Druid, a high-performance OLAP system where we can slice and dice the data in nearly any way we want.
Let’s examine the life of a usage event and how it flows through our architecture.
First, from the API side:
- A client with an authenticated FullContact API token makes a call to our API to resolve an email.
- Our internal API layer processes the request, figures out the result, and returns it to the client.
- As part of this process, an Avro message is emitted to a Kafka usage topic.
- This Avro message is registered in the schema registry.
- Secor (an open source project from Pinterest) runs in our Kubernetes cluster as a stateful set. It reads from the Kafka topic and archives the data to S3 in columnar Parquet format.
- AWS Glue runs regular crawlers on this S3 data and registers tables on top of it, making it available for later querying if needed in Athena and other tools.
- A Druid Kafka Indexer supervisor runs on Druid and launches the Peon workers on the data nodes that do the actual ingestion. These jobs read the data, apply basic transformations, roll up aggregations, and store the results in the appropriate segments.
From the batch side:
- A client securely delivers their bulk file with IDs to be enriched.
- A result file is created by the data pipeline system.
- A usage summary report file is generated that has summarized statistics for each row in the results file describing how many Data Packs were returned on that row and the source of each Data Pack. This file is persisted in S3.
- When the batch file is approved and delivered back to the customer, a process reads the summary report file, creates standard Avro usage messages, and streams them to the same Kafka topic used on the API side.
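The last batch step can be sketched as a small conversion function that turns summary-report rows into usage messages of the same shape as API events. All field names here are hypothetical, as is the choice to mark a row with no Data Packs as a 404; plain dicts stand in for the Avro records, and producing to Kafka is left out.

```python
from typing import Dict, Iterable, Iterator


def summary_to_usage_messages(
    account_id: str, delivered_at: str, rows: Iterable[Dict]
) -> Iterator[Dict]:
    """Yield one usage message per summary-report row."""
    for row in rows:
        packs = row.get("data_packs", [])
        yield {
            "accountId": account_id,
            "timestamp": delivered_at,
            "channel": "batch",
            # Assumption: a row that returned no Data Packs counts as a non-match.
            "status": 200 if packs else 404,
            "dataPacks": [
                {"name": pack["name"], "source": pack["source"]} for pack in packs
            ],
        }


rows = [
    {"row": 1, "data_packs": [{"name": "demographics", "source": "provider-a"}]},
    {"row": 2, "data_packs": []},
]
messages = list(summary_to_usage_messages("acct-1", "2019-08-01T00:00:00Z", rows))
```

Because batch and API events share one message shape and one topic, everything downstream (archival, ingestion, dashboards) needs no batch-specific code path.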
From the side of a user viewing their usage on our developer dashboard at dashboard.fullcontact.com:
- The user logs in to the usage dashboard.
- Our web service backend passes their account ID and requested time ranges to our usage middle tier service.
- The usage middle tier service has several canned Druid JSON queries that are filled in and then submitted to the Druid Query Broker.
- Results from the Druid Query Broker are passed back to the client dashboard and rendered in various charts.
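A "canned" query of the kind described above could look like the following sketch: a Druid native groupBy query template with the account ID and time range filled in. The data source name (`usage`), the dimension names, and the `count` metric are assumptions for illustration; the overall query layout follows Druid's native JSON query format, which the Broker accepts via POST at `/druid/v2`.

```python
import json
from typing import Dict


def usage_by_data_pack_query(account_id: str, start: str, end: str) -> Dict:
    """Fill in a groupBy template: daily event counts per Data Pack for one account."""
    return {
        "queryType": "groupBy",
        "dataSource": "usage",  # hypothetical data source name
        "granularity": "day",
        "intervals": [f"{start}/{end}"],
        "dimensions": ["dataPack"],
        "filter": {"type": "selector", "dimension": "accountId", "value": account_id},
        # With rollup enabled, summing the ingested "count" metric gives event totals.
        "aggregations": [{"type": "longSum", "name": "events", "fieldName": "count"}],
    }


query = usage_by_data_pack_query("acct-1", "2019-07-01", "2019-08-01")
body = json.dumps(query)  # request body to POST to the Broker
```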
Deploying, Running and Maintaining Druid
As with any new technology, we went through several phases of trying out and experimenting with Druid before getting it to a state we felt comfortable taking to production.
The Druid project offers a “quick start” configuration that is meant to be run on a small local instance. We found this to be useful to stand up Druid locally on our laptops to try out very simple configurations.
Once we graduated from that and wanted to ingest from a larger Kafka topic, we manually configured Druid on an i2.xlarge instance for which we had unused reservation capacity. Since this instance is quite a bit larger (more ephemeral disk, CPU, and memory), we modified some of the quickstart JVM parameters to use the additional capacity. In this initial configuration we were using only local deep storage, a local ZooKeeper, and a local in-memory database for the metastore, all configurations you probably shouldn't rely on in production.
Before going to production we wanted our infrastructure to have a few properties:
- Be completely automated and immutable: if we want to change something, we spin up new instances.
- Be able to replace or scale a single component without losing data.
- Be monitored and have automated alerts in case of error conditions.
We went with the common deployment approach of grouping the Druid processes into three types of nodes:
- Master: Coordinator (port 8081) and Overlord (port 8090)
- Query: Broker (port 8082) and Router (port 8088)
- Data: Historical (port 8083) and Middle Manager (port 8091)
The three kinds of nodes are described using Ansible playbooks and roles. We bake a single Druid AMI that contains the configuration and JVMs to support any one of the three node types, then supply user data variables that the systemd services use to decide which components to launch.
We configured an RDS MySQL database for the metastore and a completely external, highly available three-node ZooKeeper cluster for coordination. Externalizing these components and adding S3 deep storage means we can completely replace any of the nodes above without losing any data.
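The "one AMI, role chosen at boot" idea can be sketched as a lookup from node type to the processes that should start. The Master/Query/Data grouping follows Druid's usual deployment layout and the ports are the ones listed above; the `druid-<process>.service` unit names and the `services_for` helper are hypothetical.

```python
from typing import Dict, List

# Node type -> Druid processes (and their ports) that run on it.
NODE_TYPES: Dict[str, Dict[str, int]] = {
    "master": {"coordinator": 8081, "overlord": 8090},
    "query": {"broker": 8082, "router": 8088},
    "data": {"historical": 8083, "middleManager": 8091},
}


def services_for(node_type: str) -> List[str]:
    """Return the (hypothetical) systemd units to enable for a node type."""
    try:
        processes = NODE_TYPES[node_type]
    except KeyError:
        raise ValueError(f"unknown node type: {node_type!r}") from None
    return [f"druid-{name}.service" for name in processes]


units = services_for("master")
```

Keeping the mapping in one place means replacing or scaling a node is just launching the same image with a different user data value.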
For monitoring our shiny new Druid instance, we set up the following Prometheus components:
- Prometheus Node Exporter: provides common system metrics like CPU, memory, network, and disk usage
- Prometheus Druid Exporter: exports Druid-specific metrics like number of queries, query durations, failed queries, and missing segments
We have a simple dashboard in Grafana that shows us some of the important Druid metrics being scraped into Prometheus.
For alerting, we use Prometheus Alertmanager to notify us when the missing segments count (the number of segments that the Historical process has been unable to register) climbs above a specified value.
Once we have this Druid cluster set up, we can submit our ingestion spec to Druid, which defines how to ingest the stream from Kafka. In our initial setup, which stored segments per day with no rollup, we were storing about 2GB per day. Once we started rolling up similar events occurring during the same minute, using HLLSketchBuild, we were able to drop this to around 150MB. Given this fairly low data footprint and the option to define more aggressive rollup windows as data ages, we feel confident we will be able to store all the usage data we need at a relatively low cost.
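For orientation, a Kafka supervisor spec with daily segments and minute-level rollup might be assembled roughly as below. This is a sketch in the 0.15-era spec layout, assuming the Avro and DataSketches extensions are loaded; the data source, dimension, and metric names are hypothetical and not taken from our actual spec. Specs of this kind are submitted to the Overlord with a POST to `/druid/indexer/v1/supervisor`.

```python
import json
from typing import Dict


def kafka_supervisor_spec(topic: str, brokers: str, registry_url: str) -> Dict:
    """Build a Kafka supervisor spec with daily segments and minute-level rollup."""
    return {
        "type": "kafka",
        "dataSchema": {
            "dataSource": "usage",  # hypothetical name
            "parser": {
                "type": "avro_stream",
                "avroBytesDecoder": {"type": "schema_registry", "url": registry_url},
                "parseSpec": {
                    "format": "avro",
                    "timestampSpec": {"column": "timestamp", "format": "auto"},
                    # Hypothetical dimensions; rows equal on these within a minute merge.
                    "dimensionsSpec": {"dimensions": ["accountId", "dataPack", "status"]},
                },
            },
            "metricsSpec": [
                {"type": "count", "name": "count"},
                # Sketch column for approximate distinct counts (hypothetical field).
                {"type": "HLLSketchBuild", "name": "uniqueRequests", "fieldName": "requestId"},
            ],
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "DAY",
                "queryGranularity": "MINUTE",
                "rollup": True,
            },
        },
        "tuningConfig": {"type": "kafka"},
        "ioConfig": {
            "topic": topic,
            "consumerProperties": {"bootstrap.servers": brokers},
        },
    }


spec = kafka_supervisor_spec("usage", "kafka:9092", "http://schema-registry:8081")
payload = json.dumps(spec)
```

The storage savings come from `queryGranularity` and `rollup`: events that share a minute and the same dimension values collapse into a single stored row.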
Why Avro, why Schema registry?
FullContact historically uses Google Protobuf wrapped in a few extra layers of serialization for passing messages around on Kafka topics. Early on in the usage system design we decided to go with Avro. Why Avro?
One of the out-of-the-box tools you get with the open source Confluent Kafka distribution is the Schema Registry. The Schema Registry is a lightweight REST service that can run on the Kafka brokers and keeps track of the schema definition that each topic should use. Several Kafka clients and other third-party tools, like Secor and Druid, integrate nicely with the Schema Registry. Going this route let us avoid writing any custom Ser/De logic.
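To show what the registry-aware serializers handle for you, the sketch below frames and unframes a message in the Confluent wire format: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded payload. The payload bytes here are a placeholder rather than a real Avro record, and the helper names are hypothetical.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0
HEADER = ">bI"  # 1-byte magic + 4-byte big-endian schema ID
HEADER_SIZE = struct.calcsize(HEADER)


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an Avro-encoded payload with the registry header."""
    return struct.pack(HEADER, MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, avro_payload)."""
    if len(message) < HEADER_SIZE:
        raise ValueError("message too short to contain a registry header")
    magic, schema_id = struct.unpack(HEADER, message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER_SIZE:]


framed = frame(42, b"\x02hi")  # placeholder payload bytes
schema_id, payload = unframe(framed)
```

A consumer uses the schema ID to fetch the writer schema from the registry, which is why tools that understand this framing can decode a topic without custom code.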
Druid Migration v0.14 to v0.15
We initially started the three-node Druid cluster on v0.14, which was soon superseded by v0.15. We tried to plan the migration steps so that we could keep our existing data with no downtime to the cluster. We decided to bring up a v0.15 cluster in parallel and migrate the existing data to the new cluster. One of the major changes in Druid v0.15 is the way segment information is stored.
Prior to v0.15 there were two places where the segment information was stored:
- As descriptor.json in deep storage
- As loadSpec in the metadata store
With v0.15, Druid stores this information only in the metadata store. This led to the deprecation of the tool `insert-segments-to-db-tool`, which made manual migration of metadata a challenge, since we could no longer follow the guides we were able to find online.
To work around this, we created a new Druid cluster with its own S3 deep storage and metadata store. Then we copied the old deep storage and metastore to the new instances. At this point the metastore contains a loadSpec for each segment that still points to the old deep storage.
If you examine the metadata for the segments, you can see it all points to the old deep storage location. To update the segment metadata to point to the new deep storage location, we created a simple Python script that scans through the database and updates the records:
import json
import MySQLdb

OLD_BUCKET = "druid-deepstorage-data"       # old deep storage
NEW_BUCKET = "druid-deepstorage-0-15-data"  # new deep storage

db = MySQLdb.connect(host="druid-**.qpcxrads.us-east-1.rds.amazonaws.com",  # your host
                     user="druid",   # your username
                     passwd="**",    # your password
                     db="druid")     # name of the database
# A cursor lets us execute queries against the metastore
cur = db.cursor()
# Fetch the id and JSON payload of every segment
cur.execute("SELECT id, payload FROM druid_segments")
for segment_id, raw_payload in cur.fetchall():
    payload = json.loads(raw_payload)
    load_spec = payload["loadSpec"]
    if load_spec.get("bucket") == OLD_BUCKET:
        # Point the segment at the new deep storage bucket
        load_spec["bucket"] = NEW_BUCKET
        sql = "UPDATE druid_segments SET payload = %s WHERE id = %s"
        cur.execute(sql, (json.dumps(payload), segment_id))
# Persist the updates and release the connection
db.commit()
db.close()
Once this change is made, starting the Druid Master, Data, and Query servers brings up the cluster with the old data. Since migrating from v0.14 to v0.15, we have experimented with a few more data migration scenarios with Druid during our most recent Hack Week:
- Standing up a secondary “sandbox cluster” that shares the same deep storage as prod for all historical data and its own deep storage for new data
- Ingesting batch data from Parquet (work in progress)
Summary and Next Steps
Learning Druid has been a fun and informative process. This new journey feels similar to back in the day when I was learning the basics of Hadoop: so much to learn and so much potential in the technology. As you have probably noticed from our writeup, the scale of our Druid cluster is fairly small (there are descriptions of larger organizations running 1000+ node clusters) and we are using only some of the most basic features. We are excited to explore the additional features of Druid that can give us even more real-time streaming insight into our data.
FullContact is hiring! We have several engineering roles open on our careers page. One of those is on the Foundation Integrations team (the same one that is working on the systems described in this blog). So if you want to join a fast-moving team working on Druid and JVM microservices (Clojure and Java), all deployed on AWS, apply now and shoot me a note at email@example.com.