The fundamental definition of consensus already requires that some proposal will eventually (given sufficient communicating, non-faulty, non-malicious nodes) win, so that doesn't seem particularly novel: https://lamport.azurewebsites.net/pubs/lower-bound.pdf
The core problem here that it's impossible for any replica to tell whether its speculatively-executed operations were legal or not; it might have to admit "sorry, I lied to you", and back up at any point. That seems to be what they mean by "partial progress". You can guess that things might happen, but you will often be wrong.
This idea's been around for a while--Fekete et al outlined speculative execution at local replicas with eventual convergence on a Serializable history in their 1996 PODC paper "Eventually Serializable Data Services": https://groups.csail.mit.edu/tds/papers/Lynch/podc96-esds.pd.... These systems have essentially two tiers of operations: weak operations, which the system might re-order (which could cause totally different results, including turning successes to failures and vice-versa!), and strong operations, which are Serializable. I assume Cassandra does the same. Assuming it's correct, the strong operations work like any other consensus system: they must block (or abort) when there isn't sufficient communication with other replicas. The weak ones might give arbitrarily weird results. In CAP's terms, the strong operations are CP, the weak ones are AP.
You see similar dynamics at play in probabilistic consensus systems, like Bitcoin, by the way. In Bitcoin technically all operations are weak ones, but the probability of non-Serializable outcomes should decrease quickly over time.
Having a consensus system that merges conflicting proposals is a nice idea, but I don't think Cassandra is novel here either. I don't have a citation handy, but I recall a conversation with Heidi Howard at HPTS (maybe 2017?) where she explained that one of the advantages of leaderless Paxos is that when you're building a replicated state machine, you can treat what would normally be conflicting proposals from multiple replicas as sets of transitions. Instead of rejecting all but one proposal, you can union them, then apply them to the state machine in some (deterministic) order--say, by lexicographically sorting the proposals.
Kafka actually does call these transactions! However (and this is a loooong discussion I can't really dig into right now) there's sort of two ways to look at "exactly once". One is in the sense that DB transactions are "exactly once"; a transaction's effects shouldn't be duplicated or lost. But in another sense "exactly once" is a sort of dataflow graph property that relates messages across topic-partitions. That's a little more akin to ACID "consistency".
You can use transactions to get to that dataflow property, in the same sort of way that Serializable transaction systems guarantee certain kinds of domain-level consistency. For example, Serializability guarantees that any invariant preserved by a set of transactions, considered purely in isolation, is also preserved by concurrent histories of those transactions. I think you can argue Kafka intends to reach "exactly-once semantics" through transactions in that way.
Both are true, but we use "transactions" for clarity, since the semantics of consumers outside transactions is even murkier. Every read in this workload takes place in the context of a transaction, and goes through the transactional offset commit path.
Ah, got it; I was assuming that “transactions” was referring to the transactions mentioned as the subject of the previous sentence, not the transactions active in consumers observing those. My mistake!
If you are looking for fun targets, may I suggest KubeMQ too? Its author claims that it’s better than Kafka, Redis and RabbitMQ. It’s also "kubernetes native" but the open source version refuses to start if it detects kubernetes.
Not that I'm a Kafka user, but I greatly appreciate your posts, so thank you :)
Maybe Kafka users should do a crowdfund for it if the companies aren't willing. Realistically, what would the goal of the crowdfund have to be for you to consider it?
I don’t think ayphr would disagree with me when I say that FDB’s testing regime is the gold standard and Jepsen is trying to get there, not the other way around.
I'm not sure. I've worked on a few projects now which employed simulation testing and passed, only to discover serious bugs using Jepsen. State space exploration and oracle design are hard problems, and I'm not convinced there's a single, ideal path for DB testing that subsumes all others. I prefer more of a "complete breakfast" approach.
On another axis: Jepsen isn't "trying to get there [to FDB's testing]" because Jepsen and FDB's tests are solving different problems. Jepsen exists to test arbitrary, third-party databases without their cooperation, or even access to the source. FoundationDB's test suite is designed to test FoundationDB, and they have political and engineering buy-in to design the database from the ground up to cooperate with a deterministic (and, I suspect, protocol-aware) simulation framework.
To some extent Antithesis may be able to bridge the gap by rendering arbitrary distributed binaries deterministic. Something I'd like to explore!
Has your opinion changed on that in the last few years? I could have sworn you were on record as saying this about foundation in the past but I couldn’t find it in my links.
I don't think so, but I've said a lot about databases in the last fifteen years haha.
Sometimes I look at what people say about FDB and it feels like... folks are putting words in my mouth that I don't recognize. I was very impressed by a short phone conversation with their engineers ~12 years ago. That's good, but that's not, like, a substantive experimental evaluation. That's "I focus my unpaid efforts on databases which seem more likely to yield fun, interesting results".
Hey mate, think we interacted briefly on the Confluent Slack while you were working on this, something about outstanding TXes potentially interfering with consumption in the same process IIRC?
This isn't the first time you've discussed how parlous the Kafka tx spec is - not that that's even really a spec as such. I think this came up in your Redpanda analysis.
(And totally agree with you btw, some of the worst ever customer Kafka issues I dealt with at RH involved transactions.)
So was wondering what your ideal spec would look like, because I'd be interested in trying to capture the tx semantics in something like TLA+ as a learning experience - and because it would only help FOSS Kafka and FOSS clients improve, especially now that Confluent has withdrawn so much from Apache Kafka development.
I'm not really sure how to answer this question, but even a few chapters worth of clear prose would go a long way. We lay out a bunch of questions in the discussion section that would be really helpful in firming up intended txn semantics.
It is a little surprising, and I agree, the docs here are not doing a particularly good job of explaining it. It might help to ask: if you don't explicitly commit, how does Kafka know when you've processed the messages it gave you? It doesn't! It assumes any message it hands you is instantaneously processed.
Auto-commit is a bit like handing someone an ice cream cone, then immediately walking away and assuming they ate it. Sometimes people drop their ice cream immediately after you hand it to them, and never get a bite.
Information on the internet about this seems unreliable, confusing and contradictory... It's crazy for something so critical, especially when it's enabled by default.
Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that you must consume all data returned from each call to poll(Duration) before any subsequent calls, or before closing the consumer.
E.g. the following commits every 10s - on each call to `poll`, it doesn't automagically commit every 5 s.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "5000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Thread.sleep(10_000);
}
Just a note: I am not claiming it is working correctly, only saying there is a clear and documented way how the client knows when to commit, and that it works as expected in a simple scenario.
> if you don't explicitly commit, how does Kafka know when you've processed the messages it gave you?
I did expect that auto-commit still involved an explicit commit. I expected that it meant that the consumer side would commit _after_ processing a message/batch _if_ it had been >= autocommit_interval since the last commit. In other words, that it was a functionality baked into the Kafka client library (which does know when a message has been processed by the application). I don't know if it really makes sense, I never really thought hard about it before!
I'm still a bit skeptical... I'm pretty sure (although not positive) that I've seen consumers with autocommit being stuck because of timeouts that were much greater than the autocommit interval, and yet retrying the same message in a loop
Auto commit has always seemed super shady. Manual commit I have assumed is safe though - something something vector clocks - and it’d be really interesting to know if that trust is misplaced.
What is the process and cost for having you do a Jepsen test for something like that?
For this test, you can do it on pretty much any reasonable Linux machine. Longer histories can churn through more CPU and RAM--some of the more aggressive tests I ran for this work involved 20 GB heaps and 50 cores--but you can tune all that lower.
I think we're probably getting at the same thing, but I do want to clarify a bit. A Strict Serializable history, like a Serializable one, requires equivalence to a total order of transactions. That's clearly not true for etcd+jetcd: no possible order of transactions can allow (e.g.) a transaction to read from its own future. It's totally fine to submit non-idempotent transactions against a Serializable system: systems which actually provide Serializable will execute known-committed transactions exactly once.
Plenty of other databases pass this test; etcd+jetcd does not. This system is simply not Serializable.
Maybe what I should have said is "you can't just retry transactions against a strict serializable database and expect to still get strict serializability (from the applications's perspective)". This is true of distributed system APIs more generally, too.
The core problem here that it's impossible for any replica to tell whether its speculatively-executed operations were legal or not; it might have to admit "sorry, I lied to you", and back up at any point. That seems to be what they mean by "partial progress". You can guess that things might happen, but you will often be wrong.
This idea's been around for a while--Fekete et al outlined speculative execution at local replicas with eventual convergence on a Serializable history in their 1996 PODC paper "Eventually Serializable Data Services": https://groups.csail.mit.edu/tds/papers/Lynch/podc96-esds.pd.... These systems have essentially two tiers of operations: weak operations, which the system might re-order (which could cause totally different results, including turning successes to failures and vice-versa!), and strong operations, which are Serializable. I assume Cassandra does the same. Assuming it's correct, the strong operations work like any other consensus system: they must block (or abort) when there isn't sufficient communication with other replicas. The weak ones might give arbitrarily weird results. In CAP's terms, the strong operations are CP, the weak ones are AP.
You see similar dynamics at play in probabilistic consensus systems, like Bitcoin, by the way. In Bitcoin technically all operations are weak ones, but the probability of non-Serializable outcomes should decrease quickly over time.
Having a consensus system that merges conflicting proposals is a nice idea, but I don't think Cassandra is novel here either. I don't have a citation handy, but I recall a conversation with Heidi Howard at HPTS (maybe 2017?) where she explained that one of the advantages of leaderless Paxos is that when you're building a replicated state machine, you can treat what would normally be conflicting proposals from multiple replicas as sets of transitions. Instead of rejecting all but one proposal, you can union them, then apply them to the state machine in some (deterministic) order--say, by lexicographically sorting the proposals.
reply