At Netflix, many Media Algorithm teams work hand in hand with content creators and editors to advertise and recommend content to users as effectively as possible. Several of these algorithms aim to improve manual workflows so that we show the user a personalized promotional image, trailer, or show.
These media-focused machine learning algorithms, along with other teams, generate a lot of data from media files. As described in our previous blog, this data is stored as annotations in Marken. We designed a unique concept called Annotation Operations, which allows teams to create data pipelines and simply write annotations without worrying about how different applications access their data.
Let's pick an example use case: identifying objects (like trees, cars, etc.) in a video file. As described in the picture above:
- During the first run of the algorithm, it identified 500 objects in a particular video file. These 500 objects were stored in Marken as annotations of a specific schema type, let's say Objects.
- The Algorithm team improved their algorithm. When we re-ran the algorithm on the same video file, it created 600 annotations of schema type Objects and stored them in our service.
Notice that we cannot update the annotations from previous runs, because we don't know how many annotations a new algorithm run will produce. It is also very expensive for us to keep track of which annotations need to be updated.
The goal is that when a client searches for annotations of type Objects for the given video file, the following should happen:
- Before Algo run 1, a search should not find anything.
- After the completion of Algo run 1, the query should find the first set of 500 annotations.
- While Algo run 2 is creating the set of 600 annotations, client searches should still return the older 500 annotations.
- When all 600 annotations have been successfully created, they should replace the older set of 500.
- From then on, when clients search for annotations of type Objects, they should get 600 annotations.
Does this remind you of something? This seems very similar (though not exactly the same) to a distributed transaction.
Typically, an algorithm run can have 2k-5k annotations. There are many naive solutions possible for this problem, for instance:
- Write different runs to different databases. This is obviously very expensive.
- Write algo runs into files. But we cannot search files or provide low-latency retrievals from them.
- Etc.
Instead, our challenge was to implement this feature on top of the Cassandra and ElasticSearch databases, because that is what Marken uses. The solution we present in this blog is not limited to annotations and can be used in any other domain that uses ES and Cassandra.
Marken's architecture diagram is as follows. We refer the reader to our previous blog article for details. We use Cassandra as the source of truth where we store the annotations, while we index annotations in ElasticSearch to offer rich search functionality.
Our goal was to help teams at Netflix create data pipelines without thinking about how that data is made available to readers or client teams. Similarly, client teams don't need to worry about when or how the data is written. This is what we call decoupling producer flows from the clients of the data.
The lifecycle of a movie goes through a lot of creative stages. We have many temporary files that are delivered before we get to the final file of the movie. Similarly, a movie has many different languages, and each of those languages can have different files delivered. Teams generally want to run algorithms and create annotations using all of those media files.
Since algorithms can be run on different permutations of how the media files are created and delivered, we can simplify an algorithm run as follows:
- Annotation Schema Type: identifies the schema for the annotation generated by the algorithm.
- Annotation Schema Version: identifies the schema version of the annotation generated by the algorithm.
- PivotId: a unique string identifier that identifies the file or method used to generate the annotations. This could be the SHA hash of the file or simply the movie identifier number.
Given the above, we can describe the data model for an annotation operation as follows.
{
  "annotationOperationKeys": [
    {
      "annotationType": "string", ❶
      "annotationTypeVersion": "integer",
      "pivotId": "string",
      "operationNumber": "integer" ❷
    }
  ],
  "id": "UUID",
  "operationStatus": "STARTED", ❸
  "isActive": true ❹
}
- ❶ We already explained AnnotationType, AnnotationTypeVersion and PivotId above.
- ❷ OperationNumber is an auto-incremented number for each new operation.
- ❸ OperationStatus: an operation goes through three phases: STARTED, FINISHED and CANCELED.
- ❹ IsActive: whether an operation and its associated annotations are active and searchable.
As you can see from the data model, the producer of an annotation has to choose an AnnotationOperationKey, which lets them define how they want to UPSERT annotations in an AnnotationOperation. Inside the AnnotationOperationKey, the important field is pivotId and how it is generated.
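The data model above can be mirrored in code roughly as follows. This is a minimal Python sketch, not Marken's actual implementation: the class and field names simply follow the JSON fields, and `pivot_id_for_file` with its choice of SHA-256 is an assumption (the text only says the pivotId could be a SHA hash of the file or the movie identifier).

```python
import hashlib
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import List


class OperationStatus(Enum):
    """The three phases an operation goes through."""
    STARTED = "STARTED"
    FINISHED = "FINISHED"
    CANCELED = "CANCELED"


@dataclass(frozen=True)
class AnnotationOperationKey:
    annotation_type: str
    annotation_type_version: int
    pivot_id: str
    operation_number: int  # auto-incremented for each new operation


@dataclass
class AnnotationOperation:
    keys: List[AnnotationOperationKey]
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    operation_status: OperationStatus = OperationStatus.STARTED
    is_active: bool = True


def pivot_id_for_file(content: bytes) -> str:
    """Hypothetical helper: derive a pivotId from file bytes (SHA-256 is an assumption)."""
    return hashlib.sha256(content).hexdigest()


# Example: a new operation for annotations of type Objects on one media file.
key = AnnotationOperationKey("Objects", 1, pivot_id_for_file(b"video-bytes"), 1)
operation = AnnotationOperation(keys=[key])
```

Because the pivotId is part of the key, two runs over the same file bytes map to the same key, which is what lets a later run replace an earlier one.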
Our source of truth for all objects in Marken is Cassandra. To store Annotation Operations we have the following main tables.
- AnnotationOperationById: stores the AnnotationOperations.
- AnnotationIdByAnnotationOperationId: stores the IDs of all annotations in an operation.
Since Cassandra is NoSQL, we have more tables that help us create reverse indices and run admin jobs, so that we can scan all annotation operations whenever there is a need.
Each annotation in Marken is also indexed in ElasticSearch to power various searches. To record the relationship between annotation and operation, we also index two fields:
- annotationOperationId — The ID of the operation to which this annotation belongs
- isAnnotationOperationActive — Whether the operation is in an ACTIVE state.
We provide three APIs to our users. In the following sections we describe the APIs and the state management done within them.
When the API that starts an operation is called, we store the operation with its OperationKey (the tuple of annotationType, annotationTypeVersion and pivotId) in our database. This new operation is marked as being in the STARTED state. We store all OperationIDs that are in the STARTED state in a distributed cache (EVCache) for fast access during searches.
Users call the second API to upsert the annotations in an operation. They pass the annotations along with the OperationID. We store the annotations and also record the relationship between the annotation IDs and the Operation ID in Cassandra. During this phase operations are in the isAnnotationOperationActive = ACTIVE and operationStatus = STARTED state.
Note that a single operation run typically creates 2k to 5k annotations. Clients can call this API from many different machines or threads for fast upserts.
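As an illustration of calling the upsert API from many threads, here is a small Python sketch that splits one run's annotations into batches and sends them concurrently. The callable `upsert_batch(operation_id, batch)` is a hypothetical stand-in for the real client call, and the batch size and worker count are arbitrary assumptions.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def upsert_in_parallel(upsert_batch, operation_id, annotations,
                       batch_size=200, workers=8):
    """Send all annotations of one operation in batches from several threads.

    `upsert_batch` is a hypothetical, thread-safe client call.
    Returns the number of annotations sent.
    """
    batches = list(chunked(annotations, batch_size))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() waits for every batch and re-raises the first failure, if any.
        list(pool.map(lambda batch: upsert_batch(operation_id, batch), batches))
    return sum(len(batch) for batch in batches)
```

Since every batch carries the same OperationID, the order in which batches arrive does not matter; nothing becomes searchable until the operation is finished.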
Once the annotations have been created in an operation, clients call FinishAnnotationOperation, which changes the following:
- Marks the current operation (let's say with ID2) as operationStatus = FINISHED and isAnnotationOperationActive=ACTIVE.
- We remove ID2 from Memcache (EVCache), since it is no longer in the STARTED state.
- Any previous operation (let's say with ID1) which was ACTIVE is now marked isAnnotationOperationActive=FALSE in Cassandra.
- Finally, we call the updateByQuery API in ElasticSearch. This API finds all Elasticsearch documents with ID1 and marks isAnnotationOperationActive=FALSE.
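The state transitions of the three APIs can be walked through with a toy in-memory model. This is only a sketch: plain dicts, sets and lists stand in for Cassandra, EVCache and Elasticsearch, all method names are hypothetical, and the `search` helper simply applies the read-side rule implied by the goals stated earlier (hide inactive operations and operations still in STARTED state). Deactivating only previously FINISHED operations with the same key is an assumption.

```python
import uuid


class InMemoryMarken:
    """Toy stand-in for the operation lifecycle; not the real service."""

    def __init__(self):
        self.operations = {}      # Cassandra stand-in: op_id -> state
        self.started_ids = set()  # EVCache stand-in: ids in STARTED state
        self.documents = []       # Elasticsearch stand-in: indexed annotations

    def start_operation(self, key):
        op_id = str(uuid.uuid4())
        self.operations[op_id] = {"key": key, "status": "STARTED", "active": True}
        self.started_ids.add(op_id)
        return op_id

    def upsert_annotations(self, op_id, annotations):
        # Simplification: append only, no real upsert semantics.
        for annotation in annotations:
            self.documents.append({
                "annotation": annotation,
                "annotationOperationId": op_id,
                "isAnnotationOperationActive": True,
            })

    def finish_operation(self, op_id):
        current = self.operations[op_id]
        current["status"] = "FINISHED"
        self.started_ids.discard(op_id)
        for other_id, other in self.operations.items():
            is_previous = (other_id != op_id and other["key"] == current["key"]
                           and other["status"] == "FINISHED" and other["active"])
            if is_previous:
                other["active"] = False
                # Stand-in for the updateByQuery call on the old operation id.
                for doc in self.documents:
                    if doc["annotationOperationId"] == other_id:
                        doc["isAnnotationOperationActive"] = False

    def search(self, key):
        return [
            doc["annotation"] for doc in self.documents
            if self.operations[doc["annotationOperationId"]]["key"] == key
            and doc["isAnnotationOperationActive"]
            and doc["annotationOperationId"] not in self.started_ids
        ]


store = InMemoryMarken()
key = ("Objects", 1, "movie-123")
run1 = store.start_operation(key)
store.upsert_annotations(run1, [f"run1-{i}" for i in range(500)])
store.finish_operation(run1)
run2 = store.start_operation(key)
store.upsert_annotations(run2, [f"run2-{i}" for i in range(600)])
during_run2 = len(store.search(key))  # run 2 is still STARTED
store.finish_operation(run2)
after_run2 = len(store.search(key))
```

In this walk-through a search during run 2 still sees only the 500 annotations from run 1, and the 600 new ones become visible together once run 2 is finished.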
This is the key part for our readers. When a client calls our search API, we must exclude
- any annotations that belong to operations with isAnnotationOperationActive=FALSE, or
- any annotations whose operations are currently in the STARTED state.
We do this by applying the following exclusions to all queries in our system:
- We add a filter to our ES query to exclude annotations for which isAnnotationOperationActive is FALSE.
- We query EVCache to find all operations that are in the STARTED state. Then we exclude all annotations whose annotationOperationId is found in memcache. Using memcache allows us to keep our search latencies low (most of our queries take less than 100 ms).
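The two exclusions above could be expressed as an Elasticsearch bool query along these lines. This is a sketch of the request body only: `annotationOperationId` and `isAnnotationOperationActive` are the indexed fields named in the text, while `annotationType`, `pivotId` and the function name are assumptions made for illustration.

```python
def build_search_query(annotation_type, pivot_id, started_operation_ids):
    """Build an Elasticsearch query body that hides inactive and in-flight operations.

    `started_operation_ids` is the set of STARTED operation ids read from the cache.
    """
    must_not = [
        # Annotations of operations that were replaced by a newer run.
        {"term": {"isAnnotationOperationActive": False}},
    ]
    if started_operation_ids:
        # Annotations of operations that are still being written.
        must_not.append(
            {"terms": {"annotationOperationId": sorted(started_operation_ids)}}
        )
    return {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"annotationType": annotation_type}},  # assumed field
                    {"term": {"pivotId": pivot_id}},                # assumed field
                ],
                "must_not": must_not,
            }
        }
    }


query = build_search_query("Objects", "movie-123", {"op-2", "op-1"})
```

Reading the STARTED ids from the cache and passing them in as a `terms` clause keeps the check inside a single search request, so no second lookup per hit is needed.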
Cassandra is our source of truth, so if an error happens there we fail the client call. However, once we commit to Cassandra we must handle Elasticsearch errors. In our experience, all errors have happened when the Elasticsearch database was having some issue. For that case, we created retry logic for updateByQuery calls to ElasticSearch. If the call fails, we push a message to SQS so that we can retry in an automated fashion after some interval.
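A retry-then-queue pattern of this kind might look like the sketch below. The callables `update_by_query` and `enqueue_for_later` are hypothetical stand-ins for the Elasticsearch client call and the SQS producer; the number of attempts, the exponential backoff and the message shape are assumptions, not details from the text. The request body uses the standard update-by-query form of a query plus a Painless script.

```python
import time


def build_deactivate_request(operation_id):
    """Update-by-query body that marks all documents of one operation inactive."""
    return {
        "query": {"term": {"annotationOperationId": operation_id}},
        "script": {
            "lang": "painless",
            "source": "ctx._source.isAnnotationOperationActive = false",
        },
    }


def deactivate_with_retry(update_by_query, enqueue_for_later, operation_id,
                          attempts=3, base_delay=0.5, sleep=time.sleep):
    """Try the update a few times; on repeated failure, queue it for a later retry.

    Returns True if the update succeeded, False if it was queued instead.
    """
    body = build_deactivate_request(operation_id)
    for attempt in range(attempts):
        try:
            update_by_query(body)
            return True
        except Exception:
            if attempt < attempts - 1:
                sleep(base_delay * (2 ** attempt))  # assumed backoff policy
    # All attempts failed: hand over to the asynchronous retry path.
    enqueue_for_later({"operationId": operation_id, "action": "deactivate"})
    return False
```

Because the update only sets a flag to a fixed value, replaying the queued message later is safe even if an earlier attempt partially succeeded.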
In the near term, we want to write a high-level abstraction: a single API that our clients can call instead of calling three APIs. For example, they could store the annotations in blob storage like S3 and give us a link to the file as part of the single API call.