Translog Checksums
Issue
See: https://github.com/elasticsearch/elasticsearch/issues/6554
This file was last exported: 2014-10-21 Tue 16:30
We would like to add transaction log checksums for each entry, so we can tell whether an operation has been corrupted by an external factor (disk, malicious user, mistake, etc).
Currently the format is written like this:
| size | content |
|---|---|
| int32 | op size |
| int32 | version |
| variable | operation |
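For illustration, a minimal standalone sketch of writing one entry in this format, using plain `java.io` instead of ES's `StreamOutput`; the operation bytes and version value here are placeholders:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CurrentFormatSketch {
    public static void main(String[] args) throws IOException {
        byte[] operation = new byte[] {1, 2, 3}; // placeholder operation bytes
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(operation.length); // int32: op size
        out.writeInt(1);                // int32: version (placeholder value)
        out.write(operation);           // variable: the serialized operation
        out.flush();
        System.out.println("entry is " + bytes.size() + " bytes");
    }
}
```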
Inside, each operation writes something like this (this is the Create
operation):
| size | content | comment |
|---|---|---|
| byte | Operation type id | <- all Operations write this |
| vint | serialization format | usually '6' for the latest version |
| string | id | |
| string | type | |
| bytesref | source | |
| optString | routing | boolean + string |
| optString | parent | boolean + string |
| long | version | |
| long | timestamp | |
| long | ttl | |
| byte | version type | versioning type, internal or external |
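A field-by-field sketch of that layout, again with `java.io` stand-ins (`writeUTF` and a manual length prefix instead of ES's `writeString`/`writeBytesRef`, a single byte for the vint), so the bytes are illustrative rather than wire-exact:

```java
import java.io.DataOutputStream;
import java.io.IOException;

// Writes the Create operation fields in the order shown in the table above.
class CreateOpSketch {
    static void write(DataOutputStream out, byte[] source) throws IOException {
        out.writeByte(1);                          // byte: operation type id
        out.writeByte(6);                          // vint: serialization format ('6')
        out.writeUTF("some-id");                   // string: id
        out.writeUTF("doc");                       // string: type
        out.writeInt(source.length);               // bytesref: length + bytes
        out.write(source);
        out.writeBoolean(false);                   // optString: routing (absent)
        out.writeBoolean(false);                   // optString: parent (absent)
        out.writeLong(1L);                         // long: version
        out.writeLong(System.currentTimeMillis()); // long: timestamp
        out.writeLong(-1L);                        // long: ttl
        out.writeByte(0);                          // byte: version type
    }
}
```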
Ideas
checksums written by translog itself
Have the transaction log itself handle writing a checksum before writing each operation.
Inside of `FsTranslog.add`, we would write the checksum before each operation automatically. Additional logic would have to be added to make sure that when reading, the translog would handle either a size with no checksum, or a size with a checksum added (additional versioning checks).
Basically I think this could be handled by adding versioned translogs and writing a special integer, `0b11111111_11111111_00000000_00000001`, for the version, so we can treat anything that matches the `& 0xff00` mask as versioned.
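A sketch of that marker check; the exact mask used here is my assumption, not a settled choice:

```java
// Dispatch on the first int32 of the file: a version marker vs. an op size.
class VersionMarkerCheck {
    // high two bytes all ones, low bytes encode "version 1"
    static final int VERSIONED_MARKER = 0b11111111_11111111_00000000_00000001;

    static boolean isVersioned(int firstInt) {
        // Assumption: no sane op size has the high bytes set, so a masked
        // match means we are looking at a version marker rather than a size.
        return (firstInt & 0xFFFF0000) == 0xFFFF0000;
    }

    static int version(int marker) {
        return marker & 0x0000FFFF; // the low bytes carry the version number
    }
}
```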
Pros
- written only in a single place
- no need for operations to even know about checksums
Cons
- additional versioning checks would be needed
- tricky to be generic for future version changes
checksums written by individual Operation
Have each Operation write out a checksum, either through the abstract Operation class, or implemented individually.
This would mean incrementing each Operation's internal version from 6 to 7 and adding a checksum-calculation method for each Operation to implement.
It means adding an additional `long` to each operation and using `CRC32` for the checksum, so after reading all prior values, we try to checksum them.
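A minimal sketch of that using `java.util.zip.CRC32` (note `getValue()` returns a long, matching the "additional long" above):

```java
import java.util.zip.CRC32;

// Per-operation checksum sketch: serialize the operation's fields, run CRC32
// over those bytes, and append the result after the fields.
class OperationChecksum {
    static long checksum(byte[] serializedFields) {
        CRC32 crc = new CRC32();
        crc.update(serializedFields, 0, serializedFields.length);
        return crc.getValue();
    }
}
```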
Pros
- already have a versioning infrastructure (big +)
- operations could write checksums differently if desired
Cons
- may have code duplication for multiple Operations (could be combated by factoring into a helper method)
- additional version checks
- would this hamper adding fields in the future? It could make it hard to add additional fields, because the checksum would end up weirdly in the middle of the operation
Future ideas
In the future, I'd like to have markers between each written translog operation, so that we can seek forward to the next `0xdeadbeef` marker if a translog operation is corrupted.
Not that this has to be implemented in this phase, but it would be nice if whatever choice we made was flexible enough to accommodate this in the future.
Decision
Talked with Shay about it; we decided to go with a header for the translog file, written at the beginning. Then, when initially opening a translog file, we can read the presence of a header (or not) and decide how to parse the translog (whether each operation has a checksum or not).
Looking at the code, the translog code is spread all over the place, so I will need to update it to be in a single, consistent place.
The new format is written like this:
| size | content |
|---|---|
| long64 | header |
| variable | operation |
| int32 | prev operation checksum |
| variable | operation |
| int32 | prev operation checksum |
| … | … |
With the same operation format as before:
| size | content |
|---|---|
| int32 | op size |
| int32 | version |
| variable | operation |
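A sketch of the open-time dispatch this enables: peek at the first long and compare it to the header value. `HEADER_MAGIC` is a hypothetical placeholder; the caller would rewind and use the legacy parser when this returns false:

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

class HeaderDispatchSketch {
    static final long HEADER_MAGIC = 0x7472616E736C6F67L; // hypothetical value

    static boolean hasChecksummedHeader(DataInputStream in) throws IOException {
        try {
            return in.readLong() == HEADER_MAGIC;
        } catch (EOFException e) {
            return false; // empty or truncated file: treat it as the old format
        }
    }
}
```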
How the translog works
First, there are currently two different types of translogs in ES, each sharing the `FsTranslogFile` interface: `SimpleFsTranslogFile` and `BufferedFsTranslogFile`.
It's not as important to understand the difference between them; the real difference is that the "Simple" one writes to the internal Channel immediately, while the "Buffered" one writes to a byte array buffer which is flushed to the channel either during a `sync()` call or when the buffer size is exceeded.
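A rough sketch of the "Buffered" behavior (names are illustrative, not the real `BufferedFsTranslogFile`, and the sketch assumes no single write exceeds the buffer):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Writes accumulate in memory and only hit the channel on sync() or when the
// buffer fills.
class BufferedWriteSketch {
    private final FileChannel channel;
    private final ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);

    BufferedWriteSketch(FileChannel channel) {
        this.channel = channel;
    }

    void add(byte[] bytes) throws IOException {
        if (buffer.remaining() < bytes.length) {
            flush(); // buffer size exceeded: push what we have to the channel
        }
        buffer.put(bytes); // assumes bytes fit in an empty buffer
    }

    void sync() throws IOException {
        flush();
        channel.force(false); // fsync the file contents
    }

    private void flush() throws IOException {
        buffer.flip();
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
        buffer.clear();
    }
}
```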
Translog operations when a node is started up
See: LocalIndexShardGateway.java
The `recover` method is where all the Magic™ happens.
- read the `SegmentInfos` for the index

The segment info should contain the translog id, which is used to tell what the filename of the translog is, because translogs are named `translog-<id>`.
If there's no translog file, we're done with recovery, because we've already recovered all the segments for this shard!
If there is a translog file, we do a few things:
Using the ID of the translog, we try to rename it to `translog-<id>.recovering`; we attempt the rename up to 3 times. If for some reason we can't rename it, we continue recovery and ignore the translog.
Now that the recovery translog has been renamed, we start a loop that reads a `Translog.Operation` from the translog, tries to perform the Operation, and repeats. At this point we expect one of 4 things:
- `EOFException`: translog file wasn't properly written
- `IOException`: translog file wasn't properly written
- A `RestStatus.BAD_REQUEST` from applying the operation
- We successfully go through all operations and the loop finishes
For the first 3, we assume that something happened to corrupt the translog, like ES was killed in the middle of writing it, or an `fsync()` didn't happen in time, etc., so they are ignored.
After this, if there were no other unexpected Exceptions, we delete the recovery translog.
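Condensed into code, the replay loop looks roughly like this (the types and names are stand-ins, not copied from the real `LocalIndexShardGateway`):

```java
import java.io.IOException;

class ReplaySketch {
    interface OperationReader {
        Object next() throws IOException; // reads one Translog.Operation, null at end
    }

    static int replay(OperationReader reader) {
        int applied = 0;
        while (true) {
            Object op;
            try {
                op = reader.next();
            } catch (IOException e) { // covers EOFException as well
                // translog wasn't fully written (node killed mid-write, missed
                // fsync, ...): stop replaying and ignore the remainder
                break;
            }
            if (op == null) {
                break; // all operations read: the loop finishes normally
            }
            // ... apply the operation here; a RestStatus.BAD_REQUEST from
            // applying it is likewise swallowed in the real code ...
            applied++;
        }
        return applied;
    }
}
```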
Translog operations when recovering a replica from a primary
Shay: when a recovery starts from a primary, ES flush is no longer allowed, to make sure we record all the changes that happen. It tracks where in the translog it is: first-phase recovery is moving over the segments; then, once it's done, a replay of all the translog operations that happened since the first phase started is done; and then there is a 3rd catch-up phase (today under a lock, we should be able to remove it). During the recovery phase, we make sure to replicate data to initializing replicas as well, to make sure we don't miss anything, and rely on versioning (or just the shard not being ready) to resolve conflicts.
The "transient" translog
Shay: the transient translog is a feature from another story :)
Shay: when we do a Lucene commit, we can't have an atomic operation of Lucene commit + start new translog. Well, we could, but then we would block all operations during that time.
Shay: so, we create a transient translog before the commit starts, and play the operations to both "orig" one and "transient" one during the Lucene commit, without the need to block, once the commit is done, we move the transient to be the "real" translog, and the old one gets deleted
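A toy sketch of that dual-write window; a `StringBuilder` stands in for a translog file, and `commitDone()` plays the role of `makeTransientCurrent()` (mentioned below):

```java
class TransientTranslogSketch {
    private StringBuilder current = new StringBuilder();
    private StringBuilder transientLog = null;

    void beginCommit() {
        transientLog = new StringBuilder(); // created before the Lucene commit starts
    }

    void add(String op) {
        current.append(op).append('\n');
        if (transientLog != null) {
            transientLog.append(op).append('\n'); // dual write while the commit runs
        }
    }

    void commitDone() {
        current = transientLog; // the transient becomes the "real" translog
        transientLog = null;    // and the old one would get deleted
    }
}
```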
See: InternalEngine.java
There are 3 different types of flushes:
NEW_WRITER
A flush that causes a new writer to be created. Also creates a new translog and causes the old one to be deleted. Does not create a transient translog in the meantime.
This is used when updating the `indexWriter` settings, like: `index.index_concurrency`, `index.codec`, `index.fail_on_merge_failure`, and `index.codec.bloom.load`.
COMMIT_TRANSLOG
A flush that does a Lucene commit as well as clearing the translog. This creates the transient translog that Shay described above, making it the current translog with `translog.makeTransientCurrent()` after the commit.
This is the default if you issue a flush request with no options.
COMMIT
A flush that just does a Lucene commit, with no translog cleanup. This doesn't create a new translog, so when the index is opened (recovery or `_open`), the translog will replay against an index that already has some of the actions. That's okay though, because that's handled and not a bad thing in the recovery.
Used when updating the indexing buffer size and generating snapshots of an index.
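Summarized as a sketch (these are the doc's three types; the real constants live on the flush request in `InternalEngine` and may differ in detail):

```java
enum FlushTypeSketch {
    NEW_WRITER,      // new IndexWriter + new translog, old one deleted, no transient
    COMMIT_TRANSLOG, // Lucene commit + transient translog, then makeTransientCurrent()
    COMMIT           // Lucene commit only; translog kept and replayed on open
}
```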
Translog operations during a "realtime" Get
See: InternalEngine.java
When doing a `Get`, the operation can be marked as `realtime` (the default). If this is marked, we check the `versionMap` to see if the operation has been committed to a segment yet. If the document is not in the versionMap, it means it's already in a segment, so there is nothing to do with the translog!
If the `uid` is in the versionMap, it means the document's `_source` is in the translog also, so we use the `Translog.Location` from the version map to read the `Translog.Source` directly from the translog (if the user asked for the source; if they did not, we skip retrieving it from the translog).
If there was an error retrieving it from the translog, it means it must have been flushed to a segment, so we can get it from the reader instead.
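A sketch of that flow; the `Long` location and the two reader interfaces are simplified stand-ins for the real `InternalEngine` classes:

```java
import java.util.Map;

// Realtime Get: consult the version map first; a hit means the freshest
// _source still lives in the translog.
class RealtimeGetSketch {
    interface TranslogReader { byte[] readSource(long location) throws Exception; }
    interface SegmentReader { byte[] getSource(String uid); }

    static byte[] get(String uid, Map<String, Long> versionMap,
                      TranslogReader translog, SegmentReader reader) {
        Long location = versionMap.get(uid);
        if (location == null) {
            return reader.getSource(uid); // already refreshed into a segment
        }
        try {
            return translog.readSource(location); // read _source from the translog
        } catch (Exception e) {
            return reader.getSource(uid); // flushed meanwhile: fall back to the reader
        }
    }
}
```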
Translog operations during indexing
Translog interface in 1.4
What do we actually need in a translog interface:
```
LocalIndexShardGateway -> TranslogStream : translogStreamFor(File)
LocalIndexShardGateway -> TranslogStream : read(StreamInput)
RecoveryTranslogOperationsRequest -> TranslogStream : read(StreamInput)
RecoveryTranslogOperationsRequest -> TranslogStream : write(StreamOutput)
InternalEngine -> Translog : newTranslog(long)
```
New Translog service design
The current `TranslogStreams` interface is a bit strange because it treats translogs as both stateful and stateless at the same time. On one hand it's stateful in that it provides openers for things like `translogStreamFor`, but it also dispatches to other static methods using the `readTranslogOperation` and `writeTranslogOperation` methods.
What is needed is a unified, better interface for translogs; these should be part of the `TranslogService`. `TranslogService` should also be the one providing `FsChannelSnapshot` instances.
```
interface TranslogStream
TranslogStream <|-- LegacyTranslogStream
TranslogStream <|-- ChecksummedTranslogStream
ChecksummedTranslogStream <|-- GreedyChecksummedTranslogStream
TranslogSnapshot <|-- FsChannelSnapshot

class TranslogService {
  +TranslogStream openTranslogStream(File)
}
class TranslogStream {
  Translog.Operation read(StreamInput)
  void write(StreamOutput, Translog.Operation)
}
class LegacyTranslogStream {
  Translog.Operation read(StreamInput)
  void write(StreamOutput, Translog.Operation)
}
class ChecksummedTranslogStream {
  Translog.Operation read(StreamInput)
  void write(StreamOutput, Translog.Operation)
}
class GreedyChecksummedTranslogStream {
  Translog.Operation read(StreamInput)
  void write(StreamOutput, Translog.Operation)
}
class Snapshot {
  long translogId()
  long position()
  int estimatedNumberOfOperations()
  Translog.Operation next()
  void seekTo(long)
  long lengthInBytes()
}
```
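The stream side of that hierarchy, sketched as Java with `java.io` stand-ins for ES's `StreamInput`/`StreamOutput` (`Operation` is a placeholder for `Translog.Operation`):

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

interface Operation {} // placeholder for Translog.Operation

interface TranslogStream {
    Operation read(DataInput in) throws IOException;
    void write(DataOutput out, Operation op) throws IOException;
}

// Reads v0 files: no header, no per-operation checksums.
abstract class LegacyTranslogStream implements TranslogStream {}

// Reads and writes v1 files, verifying a checksum after each operation.
abstract class ChecksummedTranslogStream implements TranslogStream {}

// The "Greedy" variant from the diagram; its exact behavior isn't spelled
// out in these notes.
abstract class GreedyChecksummedTranslogStream extends ChecksummedTranslogStream {}
```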
Problems
- How to create an `FsChannelSnapshot` in the `TranslogService`
Creating a V1 translog
```
DELETE /test
{}

POST /test
{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "refresh_interval": -1,
      "index.translog.interval": "100m",
      "index.gateway.local.sync": "1s"
    }
  }
}

POST /test/doc/
{"body": "foo"}
```
{"acknowledged":true} {"acknowledged":true} {"_index":"test","_type":"doc","_id":"AUh-8RMht62h-S9lNjb0","_version":1,"created":true}