Translog Checksums


Issue

See: https://github.com/elasticsearch/elasticsearch/issues/6554

This file was last exported: 2014-10-21 Tue 16:30

We would like to add transaction log checksums for each entry, so we can tell whether an operation has been corrupted by an external factor (disk, malicious user, mistake, etc).

Currently the format is written like this:

Table 1: Translog with operation format
size      content
int32     op size
int32     version
variable  operation

Inside, each operation writes something like this (this is the Create operation):

Table 2: Translog.Create operation format
size       content               comment
byte       Operation type id     <- all Operations write this
vint       serialization format  Usually '6' for latest version
string     id
string     type
bytesref   source
optString  routing               boolean + string
optString  parent                boolean + string
long       version
long       timestamp
long       ttl
byte       version type          versioning type, internal or external

Ideas

checksums written by translog itself

Have the translog itself write a checksum before writing each operation. Inside FsTranslog.add, we would write the checksum before each operation automatically. The reader would need additional logic (extra versioning checks) so that it can handle either a size with no checksum or a size with a checksum added.

Basically, I think this could be handled by adding versioned translogs and writing a special integer, 0b11111111_11111111_00000000_00000001, for the version, so we can say that anything matching the mask 0xffff0000 (all upper 16 bits set) is versioned.
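As a rough illustration of the versioned-translog idea, the sketch below checks whether the first int32 of a file is a version marker. It assumes a marker is recognized by all of its upper 16 bits being set, which is one reading of the note above. The class and method names are hypothetical and not part of Elasticsearch. A legacy file starts with an int32 op size, which should never be negative, so it should not be mistaken for a marker.

```java
// Hypothetical helper; names are illustrative, not Elasticsearch APIs.
final class TranslogVersionMarker {
    // Assumption: a marker has all of its upper 16 bits set.
    static final int MARKER_MASK = 0xFFFF0000;
    // The special integer from the text: marker bits plus version number 1.
    static final int VERSION_1 = 0b11111111_11111111_00000000_00000001;

    private TranslogVersionMarker() {}

    /** True if the first int32 of the file looks like a version marker. */
    static boolean isVersioned(int firstInt) {
        return (firstInt & MARKER_MASK) == MARKER_MASK;
    }

    /** Extracts the version number stored in the lower 16 bits. */
    static int versionOf(int firstInt) {
        return firstInt & 0x0000FFFF;
    }
}
```

With this layout, a reader would peek at the first four bytes and fall back to the legacy parser whenever isVersioned returns false.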

Pros

  • written only in a single place
  • no need for operations to even know about checksums

Cons

  • additional versioning checks would be needed
  • tricky to be generic for future version changes

checksums written by individual Operation

Have each Operation write out a checksum, either through the abstract Operation class, or implemented individually.

This would mean incrementing each Operation's internal version from 6 to 7 and adding a checksum-calculation method for each operation to implement.

It means adding an additional long to each operation and using CRC32 for the checksum: after reading all the prior values, the reader recomputes the checksum over them and compares it with the stored value.
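A minimal sketch of that per-operation checksum, using java.util.zip.CRC32 from the JDK. It assumes the operation's fields have already been serialized to a byte array and that the checksum is appended as a trailing long. OperationChecksum and its methods are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical helper; not an Elasticsearch class.
final class OperationChecksum {
    private OperationChecksum() {}

    /** Returns the serialized fields followed by their CRC32 stored as a long. */
    static byte[] appendChecksum(byte[] serializedFields) {
        CRC32 crc = new CRC32();
        crc.update(serializedFields, 0, serializedFields.length);
        return ByteBuffer.allocate(serializedFields.length + Long.BYTES)
                .put(serializedFields)
                .putLong(crc.getValue())
                .array();
    }

    /** Recomputes the CRC32 over the leading bytes and compares it with the trailing long. */
    static boolean verify(byte[] withChecksum) {
        int bodyLength = withChecksum.length - Long.BYTES;
        if (bodyLength < 0) {
            return false; // too short to even hold a checksum
        }
        CRC32 crc = new CRC32();
        crc.update(withChecksum, 0, bodyLength);
        long stored = ByteBuffer.wrap(withChecksum, bodyLength, Long.BYTES).getLong();
        return stored == crc.getValue();
    }
}
```

A CRC32 value fits in 32 bits, so storing it as a long spends four extra bytes per operation; an int32 would also be enough.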

Pros

  • already have a versioning infrastructure (big +)
  • operations could write checksums differently if desired

Cons

  • may have code duplication for multiple Operations (could be combated by factoring into a helper method)
  • additional version checks
  • would this hamper us from adding fields in the future? It could make it hard to add additional fields, because the checksum would weirdly be in the middle of the operation

Future ideas

In the future, I'd like to have markers between each written translog operation, so that we can seek forward to the next 0xdeadbeef marker if a translog operation is corrupted.

Not that this has to be implemented in this phase, but it would be nice if whatever choice we made was flexible enough to accommodate this in the future.
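To make the marker idea concrete, here is a small sketch of scanning forward for the next 0xdeadbeef marker. It assumes the translog bytes are available in a ByteBuffer and that the marker is written as a big-endian int32. MarkerScanner is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Hypothetical helper; not an Elasticsearch class.
final class MarkerScanner {
    static final int MARKER = 0xDEADBEEF;

    private MarkerScanner() {}

    /** Returns the index of the first marker at or after 'from', or -1 if there is none. */
    static int nextMarker(ByteBuffer buf, int from) {
        // Absolute reads leave the buffer's position untouched.
        for (int i = Math.max(from, 0); i + Integer.BYTES <= buf.limit(); i++) {
            if (buf.getInt(i) == MARKER) {
                return i;
            }
        }
        return -1;
    }
}
```

One caveat with this approach: the same four bytes can also occur inside an operation's payload, so a reader would still want to verify the checksum of whatever follows a marker before trusting it.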

Decision

Talked with Shay about it, and we decided to go with a header written at the beginning of the translog file. When initially opening a translog file, we can check for the presence of a header and decide how to parse the translog (whether each operation has a checksum or not).

Looking at the code, the translog code is spread all over the place, so I will need to update it to be in a single, consistent place.

The new format is written like this:

Table 3: Translog version 1 format
size      content
long64    header
variable  operation
int32     prev operation checksum
variable  operation
int32     prev operation checksum

With the same operation format as before:

Table 4: Translog version 1 operation
size      content
int32     op size
int32     version
variable  operation
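The version 1 layout can be sketched with plain JDK streams. This is only an illustration under several assumptions: the header value is made up, the checksum is taken to be a CRC32 over the whole preceding entry (op size, version, and operation bytes), and the operation itself is treated as opaque bytes. V1TranslogSketch is a hypothetical name.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical sketch of the version 1 layout; not the Elasticsearch implementation.
final class V1TranslogSketch {
    // Assumed header value; the real header contents are not specified in these notes.
    static final long HEADER = 0xFFFF_0000_0000_0001L;

    private V1TranslogSketch() {}

    static void writeHeader(DataOutputStream out) throws IOException {
        out.writeLong(HEADER);
    }

    /** Writes op size, version, operation bytes, then an int32 checksum of that entry. */
    static void writeEntry(DataOutputStream out, int version, byte[] op) throws IOException {
        byte[] entry = entryBytes(op.length, version, op);
        out.write(entry);
        out.writeInt(checksum(entry));
    }

    /** Reads one entry and returns the operation bytes, failing on a checksum mismatch. */
    static byte[] readEntry(DataInputStream in) throws IOException {
        int size = in.readInt();
        if (size < 0) {
            throw new IOException("negative op size: " + size);
        }
        int version = in.readInt();
        byte[] op = new byte[size];
        in.readFully(op);
        if (in.readInt() != checksum(entryBytes(size, version, op))) {
            throw new IOException("translog entry checksum mismatch");
        }
        return op;
    }

    private static byte[] entryBytes(int size, int version, byte[] op) {
        return ByteBuffer.allocate(2 * Integer.BYTES + op.length)
                .putInt(size).putInt(version).put(op).array();
    }

    private static int checksum(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return (int) crc.getValue(); // a CRC32 value fits in 32 bits
    }
}
```

Writing the checksum after the operation means the writer never has to seek backwards, which matches the "prev operation checksum" rows in Table 3.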

How the translog works

First, there are currently two different types of translogs in ES, each sharing the FsTranslogFile interface: SimpleFsTranslogFile and BufferedFsTranslogFile.

The details are not that important. The real difference is that the "Simple" one writes to the internal Channel immediately, while the "Buffered" one writes to a byte array buffer which is flushed to the channel either during a sync() call or when the buffer size is exceeded.
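The buffered behaviour can be illustrated with a small sketch. It is not the BufferedFsTranslogFile code: the class name is hypothetical, it writes to any WritableByteChannel, and it leaves out the fsync that a real sync() would also perform on a file channel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical sketch of a buffered translog writer; not the Elasticsearch class.
final class BufferedWriterSketch {
    private final WritableByteChannel channel;
    private final ByteBuffer buffer;

    BufferedWriterSketch(WritableByteChannel channel, int bufferSize) {
        this.channel = channel;
        this.buffer = ByteBuffer.allocate(bufferSize);
    }

    /** Buffers the bytes, flushing first if they would exceed the buffer size. */
    void add(byte[] data) throws IOException {
        if (data.length > buffer.remaining()) {
            flush();
        }
        if (data.length > buffer.capacity()) {
            writeFully(ByteBuffer.wrap(data)); // too large to buffer at all
        } else {
            buffer.put(data);
        }
    }

    /** Pushes everything buffered so far to the channel (fsync omitted in this sketch). */
    void sync() throws IOException {
        flush();
    }

    private void flush() throws IOException {
        buffer.flip();
        writeFully(buffer);
        buffer.clear();
    }

    private void writeFully(ByteBuffer src) throws IOException {
        while (src.hasRemaining()) {
            channel.write(src);
        }
    }
}
```

A "Simple" variant would amount to calling writeFully directly from add, with no intermediate buffer.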

Translog operations when a node is started up

See: LocalIndexShardGateway.java

The recover method is where all the Magic™ happens.

  • read the SegmentInfos for the index

The segment info should contain the translog id, which is used to tell what the filename of the translog is, because translogs are named translog-<id>.

If there's no translog file, we're done with recovery, because we've already recovered all the segments for this shard!

If there is a translog file, we do a few things:

Using the ID of the translog, we try to rename it to translog-<id>.recovering, retrying up to 3 times. If for some reason we can't rename it, we continue recovery and ignore the translog.

Now that the recovery translog has been renamed, we start a loop that reads a Translog.Operation from the translog, tries to perform the Operation, and repeats. At this point we expect one of 4 things:

  • EOFException: translog file wasn't properly written
  • IOException: translog file wasn't properly written
  • A RestStatus.BAD_REQUEST from applying the operation
  • We successfully go through all operations and the loop finishes

For the first 3, we assume that something happened to corrupt the translog, like ES was killed in the middle of writing it, or an fsync() didn't happen in time, etc, so they are ignored.

After this, if there were no other unexpected Exceptions, we delete the recovery translog.
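The replay loop described above can be sketched as follows. This is a simplified stand-in rather than the LocalIndexShardGateway code: each entry is assumed to be an int32 op size followed by that many opaque bytes, the rename step and the RestStatus.BAD_REQUEST case are not modeled, and a clean end of file is also detected through EOFException. RecoveryLoopSketch is a hypothetical name.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.function.Consumer;

// Hypothetical sketch of the recovery replay loop; not the Elasticsearch implementation.
final class RecoveryLoopSketch {
    private RecoveryLoopSketch() {}

    /** Applies entries until the stream ends or is truncated/corrupt; returns how many were applied. */
    static int replay(DataInputStream in, Consumer<byte[]> apply) {
        int recovered = 0;
        try {
            while (true) {
                int size = in.readInt();      // EOFException here ends the loop
                if (size < 0) {
                    throw new IOException("negative op size: " + size);
                }
                byte[] op = new byte[size];
                in.readFully(op);             // EOFException if the entry was cut short
                apply.accept(op);
                recovered++;
            }
        } catch (EOFException e) {
            // End of file, or the translog wasn't properly written: stop quietly.
        } catch (IOException e) {
            // Treated the same way: keep what was recovered and ignore the rest.
        }
        return recovered;
    }
}
```

Everything applied before the failure is kept, which mirrors the "ignore and continue" behaviour described above.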

Translog operations when recovering a replica from a primary

Shay: when a recovery starts from a primary, ES flush is no longer allowed, to make sure we record all the changes that happen. It tracks where in the translog it is. The first recovery phase is moving over segments. Then, once it's done, a replay of all the translog operations that happened since the first phase started is done, and then a 3rd catch-up phase (today under a lock, should be able to remove it). During the recovery phase, we make sure to replicate data to initializing replicas as well, to make sure we don't miss anything, and rely on versioning (or just shard not ready) to resolve conflicts.

The "transient" translog

Shay: the transient translog is a feature from another story :)

Shay: when we do a Lucene commit, we can't have an atomic operation of Lucene commit + start new translog. Well, we could, but then we would block all operations during that time

Shay: so, we create a transient translog before the commit starts, and play the operations to both "orig" one and "transient" one during the Lucene commit, without the need to block, once the commit is done, we move the transient to be the "real" translog, and the old one gets deleted
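A toy model of the dual-write scheme Shay describes, with in-memory lists standing in for translog files. makeTransientCurrent is named after the translog method mentioned in these notes; the class and the other method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the transient translog; not the Elasticsearch implementation.
final class TransientTranslogSketch {
    private List<String> current = new ArrayList<>();
    private List<String> transientLog; // non-null only while a commit is in progress

    /** Records an operation in the current translog and, during a commit, in the transient one too. */
    synchronized void add(String op) {
        current.add(op);
        if (transientLog != null) {
            transientLog.add(op);
        }
    }

    /** Called before the Lucene commit starts. */
    synchronized void newTransientTranslog() {
        transientLog = new ArrayList<>();
    }

    /** Called after the commit: the transient translog becomes the real one and the old one is dropped. */
    synchronized void makeTransientCurrent() {
        if (transientLog == null) {
            throw new IllegalStateException("no transient translog");
        }
        current = transientLog;
        transientLog = null;
    }

    synchronized List<String> currentOps() {
        return new ArrayList<>(current);
    }
}
```

Operations that arrive during the commit end up in both logs, so nothing is lost whichever log is current when a failure happens, and writers only hold the lock for the duration of a single add.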

See: InternalEngine.java

There are 3 different types of flushes:

  • NEW_WRITER

A flush that causes a new writer to be created. Also creates a new translog and causes the old one to be deleted. Does not create a transient translog in the meantime.

This is used when updating the indexWriter settings, like: index.index_concurrency, index.codec, index.fail_on_merge_failure, and index.codec.bloom.load.

  • COMMIT_TRANSLOG

A flush that does a Lucene commit as well as clearing the translog. This creates the transient translog that Shay described above, making it the current translog with translog.makeTransientCurrent() after the commit.

This is the default if you issue a flush request with no options.

  • COMMIT

A flush that just does a Lucene commit, no translog cleanup. This doesn't create a new translog, so when the index is opened (recovery or _open), the translog will replay against an index that already has some of the actions. That's okay though, because that's handled and not a bad thing in the recovery.

Used when updating the indexing buffer size and generating snapshots of an index.

Translog operations during a "realtime" Get

See: InternalEngine.java

When doing a Get, the operation can be marked as realtime (the default). If it is, we check the versionMap to see if the operation has been committed to a segment yet. If the document is not in the versionMap, it means it's already in a segment, so nothing to do with the translog!

If the uid is in the versionMap, it means the document's _source is in the translog also, so we use the Translog.Location from the version map to read the Translog.Source directly from the translog (if the user asked for the source, if they did not, we skip retrieving it from the translog).

If there was an error retrieving it from the translog, it means it must have been flushed to a segment, so we can get it from the reader instead.
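The lookup order for a realtime Get can be summarized in a short sketch. Plain maps stand in for the versionMap, the translog, and the segment reader, and a missing translog entry stands in for a read error. All names here are hypothetical simplifications of what InternalEngine does.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the realtime Get lookup order; not the InternalEngine code.
final class RealtimeGetSketch {
    private RealtimeGetSketch() {}

    static Optional<String> get(String uid,
                                Map<String, Long> versionMap,   // uid -> translog location
                                Map<Long, String> translog,     // location -> _source
                                Map<String, String> segments) { // uid -> _source in a segment
        Long location = versionMap.get(uid);
        if (location != null) {
            String source = translog.get(location);
            if (source != null) {
                return Optional.of(source); // served straight from the translog
            }
            // Could not read it from the translog: it must have been flushed to a segment.
        }
        return Optional.ofNullable(segments.get(uid));
    }
}
```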

Translog operations during indexing

Translog interface in 1.4

What do we actually need in a translog interface:

LocalIndexShardGateway -> TranslogStream : translogStreamFor(File)
LocalIndexShardGateway -> TranslogStream : read(StreamInput)

RecoveryTranslogOperationsRequest -> TranslogStream : read(StreamInput)
RecoveryTranslogOperationsRequest -> TranslogStream : write(StreamOutput)

InternalEngine -> Translog : newTranslog(long)

translog-interface-1.4-1.png

New Translog service design

The current TranslogStreams interface is a bit strange because it treats translogs as both stateful and stateless at the same time. On one hand it's stateful in that it provides openers for things like translogStreamFor, but it also dispatches to other static methods using the readTranslogOperation and writeTranslogOperation methods.

What is needed is a unified, better interface for translogs, which should be part of the TranslogService. TranslogService should also be the one providing FsChannelSnapshot instances.

interface TranslogStream

TranslogStream <|-- LegacyTranslogStream
TranslogStream <|-- ChecksummedTranslogStream

ChecksummedTranslogStream <|-- GreedyChecksummedTranslogStream

TranslogSnapshot <|-- FsChannelSnapshot

class TranslogService {
  +TranslogStream openTranslogStream(File)
}

class TranslogStream {
  Translog.Operation read(StreamInput)
  void write(StreamOutput, Translog.Operation)
}

class LegacyTranslogStream {
  Translog.Operation read(StreamInput)
  void write(StreamOutput, Translog.Operation)
}

class ChecksummedTranslogStream {
  Translog.Operation read(StreamInput)
  void write(StreamOutput, Translog.Operation)
}

class GreedyChecksummedTranslogStream {
  Translog.Operation read(StreamInput)
  void write(StreamOutput, Translog.Operation)
}

class Snapshot {
  long translogId()
  long position()
  int estimatedNumberOfOperations()
  Translog.Operation next()
  void seekTo(long)
  long lengthInBytes()
}

translog-checksums-uml-1.png

Problems

  • How to create an FsChannelSnapshot in the TranslogService

Creating a V1 translog

DELETE /test
{}

POST /test
{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "refresh_interval": -1,
      "index.translog.interval": "100m",
      "index.gateway.local.sync": "1s"
    }
  }
}

POST /test/doc/
{"body": "foo"}

Responses, in request order:

{"acknowledged":true}
{"acknowledged":true}
{"_index":"test","_type":"doc","_id":"AUh-8RMht62h-S9lNjb0","_version":1,"created":true}

Author: Lee Hinman

Created: 2014-10-21 Tue 16:30
