monotone

monotone Mtn Source Tree

Root/refiner.cc

1// Copyright (C) 2005 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include <algorithm>
12#include <set>
13#include <utility>
14
15#include <boost/shared_ptr.hpp>
16
17#include "refiner.hh"
18#include "vocab.hh"
19#include "merkle_tree.hh"
20#include "netcmd.hh"
21
22using std::inserter;
23using std::make_pair;
24using std::set;
25using std::set_difference;
26using std::string;
27
28using boost::dynamic_bitset;
29
30// Our goal is to learn the complete set of items to send. To do this
31// we exchange two types of refinement commands: queries and responses.
32//
33// - On receiving a 'query' refinement for a node (p,l) you have:
34// - Compare the query node to your node (p,l), noting all the leaves
35// you must send as a result of what you learn in comparison.
36// - For each slot, if you have a subtree where the peer does not
37// (or you both do, and yours differs) send a sub-query for that
38// node, incrementing your query-in-flight counter.
39// - Send a 'response' refinement carrying your node (p,l)
40//
41// - On receiving a 'query' refinement for a node (p,l) you don't have:
42// - Send a 'response' refinement carrying an empty synthetic node (p,l)
43//
44// - On receiving a 'response' refinement for (p,l)
45// - Compare the query node to your node (p,l), noting all the leaves
46// you must send as a result of what you learn in comparison.
47// - Decrement your query-in-flight counter.
48//
49// The client kicks the process off by sending a query refinement for the
50// root node. When the client's query-in-flight counter drops to zero,
51// the client sends a done command, stating how many items it will be
52// sending.
53//
54// When the server receives a done command, it echoes it back stating how
55// many items *it* is going to send.
56//
57// When either side receives a done command, it transitions to
58// streaming send mode, sending all the items it's calculated.
59
// Record ITEM as present on our side: insert it into the flat
// local_items set and as a leaf into the merkle trie held in 'table'.
void
refiner::note_local_item(id const & item)
{
  local_items.insert(item);
  insert_into_merkle_tree(table, type, item, 0);
}
66
// Recompute the hash codes of the merkle trie from the root prefix
// down, after a batch of note_local_item() calls. Callers (see the
// unit-test harness below) reindex before begin_refinement().
void
refiner::reindex_local_items()
{
  recalculate_merkle_codes(table, prefix(""), 0);
}
72
// Fetch our merkle node at (PREF, LEVEL) into NODE. The node must
// exist (invariant-checked); use merkle_node_exists() first when
// presence is uncertain.
void
refiner::load_merkle_node(size_t level, prefix const & pref,
                          merkle_ptr & node)
{
  merkle_table::const_iterator j = table.find(make_pair(pref, level));
  I(j != table.end());
  node = j->second;
}
81
82bool
83refiner::merkle_node_exists(size_t level,
84 prefix const & pref)
85{
86 merkle_table::const_iterator j = table.find(make_pair(pref, level));
87 return (j != table.end());
88}
89
// Compute items_to_send = local_items \ peer_items, i.e. everything we
// hold that we have not learned the peer also holds. Idempotent: runs
// at most once per refinement via the calculated_items_to_send flag.
void
refiner::calculate_items_to_send()
{
  if (calculated_items_to_send)
    return;

  items_to_send.clear();
  // Reset the receive count; process_done_command overwrites it with
  // the figure the peer announces.
  items_to_receive = 0;

  set_difference(local_items.begin(), local_items.end(),
                 peer_items.begin(), peer_items.end(),
                 inserter(items_to_send, items_to_send.begin()));

  // typestr feeds only the commented-out trace below.
  string typestr;
  netcmd_item_type_to_string(type, typestr);

  // L(FL("%s determined %d %s items to send")
  //   % voicestr() % items_to_send.size() % typestr);
  calculated_items_to_send = true;
}
110
111
// Descend one level: load our subtree node hanging off SLOT of
// OUR_NODE and queue it to the peer as a fresh query, bumping the
// query-in-flight counter.
void
refiner::send_subquery(merkle_node const & our_node, size_t slot)
{
  prefix subprefix;
  our_node.extended_raw_prefix(slot, subprefix);
  merkle_ptr our_subtree;
  load_merkle_node(our_node.level + 1, subprefix, our_subtree);
  // L(FL("%s queueing subquery on level %d\n") % voicestr() % (our_node.level + 1));
  cb.queue_refine_cmd(refinement_query, *our_subtree);
  ++queries_in_flight;
}
123
// Build a synthetic one-slot node one level below OUR_NODE containing
// just the value stored in SLOT, and send it to the peer as a query.
// Used (see process_refinement_command) when we hold a leaf where the
// peer holds a subtree: the extra query keeps the query/response
// bracketing balanced.
void
refiner::send_synthetic_subquery(merkle_node const & our_node, size_t slot)
{
  id val;
  size_t subslot;
  dynamic_bitset<unsigned char> subprefix;

  // Work out which slot and prefix VAL would occupy one level down.
  our_node.get_raw_slot(slot, val);
  pick_slot_and_prefix_for_value(val, our_node.level + 1, subslot, subprefix);

  merkle_node synth_node;
  synth_node.pref = subprefix;
  synth_node.level = our_node.level + 1;
  synth_node.type = our_node.type;
  synth_node.set_raw_slot(subslot, val);
  synth_node.set_slot_state(subslot, our_node.get_slot_state(slot));

  // L(FL("%s queueing synthetic subquery on level %d\n") % voicestr() % (our_node.level + 1));
  cb.queue_refine_cmd(refinement_query, synth_node);
  ++queries_in_flight;
}
145
// The peer's subtree hash at SLOT equals ours, so every item under
// that extended prefix is held by both sides: record them all as peer
// items so set_difference later excludes them from items_to_send.
void
refiner::note_subtree_shared_with_peer(merkle_node const & our_node, size_t slot)
{
  prefix pref;
  our_node.extended_raw_prefix(slot, pref);
  collect_items_in_subtree(table, pref, our_node.level+1, peer_items);
}
153
// Construct an idle refiner for one item TYPE, speaking as VOICE
// (client or server) and emitting outbound commands through CB. The
// merkle table is seeded with a single empty root node at
// (prefix(""), level 0) so load_merkle_node(0, ...) always succeeds.
refiner::refiner(netcmd_item_type type, protocol_voice voice, refiner_callbacks & cb)
  : type(type), voice (voice), cb(cb),
    sent_initial_query(false),
    queries_in_flight(0),
    calculated_items_to_send(false),
    done(false),
    items_to_receive(0)
{
  merkle_ptr root = merkle_ptr(new merkle_node());
  root->type = type;
  table.insert(make_pair(make_pair(prefix(""), 0), root));
}
166
// Record that the peer holds the leaf value stored in SLOT of
// THEIR_NODE, by inserting it into peer_items.
void
refiner::note_item_in_peer(merkle_node const & their_node, size_t slot)
{
  I(slot < constants::merkle_num_slots);
  id slotval;
  their_node.get_raw_slot(slot, slotval);
  peer_items.insert(slotval);

  // Write a debug message
  /*
  {
    id slotval;
    their_node.get_raw_slot(slot, slotval);

    hexenc<prefix> hpref;
    their_node.get_hex_prefix(hpref);

    string typestr;
    netcmd_item_type_to_string(their_node.type, typestr);

    L(FL("%s's peer has %s '%s' at slot %d (in node '%s', level %d)")
      % voicestr() % typestr % slotval
      % slot % hpref % their_node.level);
  }
  */
}
193
194
// Kick off refinement (the client side does this, per the protocol
// overview above): send the root node as the first query. Everything
// after this is driven by process_refinement_command until
// queries_in_flight drains back to zero.
void
refiner::begin_refinement()
{
  merkle_ptr root;
  load_merkle_node(0, prefix(""), root);
  // L(FL("%s queueing initial node\n") % voicestr());
  cb.queue_refine_cmd(refinement_query, *root);
  ++queries_in_flight;
  sent_initial_query = true;
  string typestr;
  netcmd_item_type_to_string(type, typestr);
  L(FL("Beginning %s refinement on %s.") % typestr % voicestr());
}
208
// Handle the peer's 'done' command announcing it will send N_ITEMS.
// Finalize our own send set, echo a 'done' back if we are the server,
// mark refinement complete, and release the merkle trie's memory.
void
refiner::process_done_command(size_t n_items)
{
  string typestr;
  netcmd_item_type_to_string(type, typestr);

  calculate_items_to_send();
  items_to_receive = n_items;

  L(FL("%s finished %s refinement: %d to send, %d to receive")
    % voicestr() % typestr % items_to_send.size() % items_to_receive);

  /*
    if (local_items.size() < 25)
    {
      // Debugging aid.
      L(FL("+++ %d items in %s") % local_items.size() % voicestr());
      for (set<id>::const_iterator i = local_items.begin();
           i != local_items.end(); ++i)
      {
        L(FL("%s item %s") % voicestr() % *i);
      }
      L(FL("--- items in %s") % voicestr());
    }
  */

  if (voice == server_voice)
    {
      // L(FL("server responding to [done %s %d] with [done %s %d]")
      //   % typestr % n_items % typestr % items_to_send.size());
      cb.queue_done_cmd(type, items_to_send.size());
    }

  done = true;

  // we can clear up the merkle trie's memory now
  table.clear();
}
247
248
// Core of the refinement state machine: handle one incoming 'query' or
// 'response' netcmd (TY) describing the peer's node THEIR_NODE. Per
// slot we note the peer's leaves, issue subqueries for mismatched
// subtrees, and handle the asymmetric leaf-vs-subtree cases; afterwards
// we answer queries with our own node, and on responses decrement the
// query-in-flight counter (possibly ending refinement on the client).
void
refiner::process_refinement_command(refinement_type ty,
                                    merkle_node const & their_node)
{
  prefix pref;
  hexenc<prefix> hpref;
  their_node.get_raw_prefix(pref);
  their_node.get_hex_prefix(hpref);
  string typestr;

  netcmd_item_type_to_string(their_node.type, typestr);
  // hpref, typestr and lev feed only the commented-out trace below.
  size_t lev = static_cast<size_t>(their_node.level);

  // L(FL("%s received refinement %s netcmd on %s node '%s', level %d") %
  //   voicestr() % (ty == refinement_query ? "query" : "response") %
  //   typestr % hpref % lev);

  merkle_ptr our_node;

  if (merkle_node_exists(their_node.level, pref))
    load_merkle_node(their_node.level, pref, our_node);
  else
    {
      // Synthesize empty node if we don't have one.
      our_node = merkle_ptr(new merkle_node);
      our_node->pref = their_node.pref;
      our_node->level = their_node.level;
      our_node->type = their_node.type;
    }

  for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
    {
      // Note any leaves they have.
      if (their_node.get_slot_state(slot) == leaf_state)
        note_item_in_peer(their_node, slot);

      if (ty == refinement_query)
        {
          // This block handles the interesting asymmetric cases of subtree
          // vs. leaf.
          //
          // Note that in general we're not allowed to send a new query
          // packet when we're looking at a response. This wrinkle is both
          // why this block appears to do slightly more work than necessary,
          // and why it's predicated on "ty == refinement_query". More detail
          // in the cases below.

          if (their_node.get_slot_state(slot) == leaf_state
              && our_node->get_slot_state(slot) == subtree_state)
            {
              // If they have a leaf and we have a subtree, we need to look
              // in our subtree to find if their leaf is present, and send
              // them a "query" that will inform them, in passing, of the
              // presence of our node.

              id their_slotval;
              their_node.get_raw_slot(slot, their_slotval);
              size_t snum;
              merkle_ptr mp;
              if (locate_item(table, their_slotval, snum, mp))
                {
                  cb.queue_refine_cmd(refinement_query, *mp);
                  ++queries_in_flight;
                }

            }

          else if (their_node.get_slot_state(slot) == subtree_state
                   && our_node->get_slot_state(slot) == leaf_state)
            {
              // If they have a subtree and we have a leaf, we need to
              // arrange for a subquery to explore the subtree looking for
              // the leaf in *their* subtree. The tricky part is that we
              // cannot have this subquery triggered by our response
              // packet. We need to initiate a new (redundant) query here to
              // prompt our peer to explore the subtree.
              //
              // This is purely for the sake of balancing the bracketing of
              // queries and responses: if they were to reply to our
              // response packet, our query-in-flight counter would have
              // temporarily dropped to zero and we'd have initiated
              // streaming send mode.
              //
              // Yes, the need to invert the sense of queries in this case
              // represents a misdesign in this generation of the netsync
              // protocol. It still contains much less hair than it used to,
              // so I'm willing to accept it.

              send_synthetic_subquery(*our_node, slot);
            }

          // Finally: if they had an empty slot in either case, there's no
          // subtree exploration to perform; the response packet will inform
          // the peer of everything relevant we know about this node: namely
          // that they're going to receive a complete subtree, we know
          // what's in it, and we'll tell them how many nodes to expect in
          // the aggregate count of the 'done' command.

        }

      // Compare any subtrees, if we both have subtrees.
      if (their_node.get_slot_state(slot) == subtree_state
          && our_node->get_slot_state(slot) == subtree_state)
        {
          id our_slotval, their_slotval;
          their_node.get_raw_slot(slot, their_slotval);
          our_node->get_raw_slot(slot, our_slotval);

          // Always note when you share a subtree.
          if (their_slotval == our_slotval)
            note_subtree_shared_with_peer(*our_node, slot);

          // Send subqueries when you have a different subtree
          // and you're answering a query message.
          else if (ty == refinement_query)
            send_subquery(*our_node, slot);
        }
    }

  if (ty == refinement_response)
    {
      E((queries_in_flight > 0),
        F("underflow on query-in-flight counter"));
      --queries_in_flight;

      // Possibly this signals the end of refinement.
      if (voice == client_voice && queries_in_flight == 0)
        {
          // NOTE: this typestr shadows the outer one; it feeds only the
          // commented-out trace below.
          string typestr;
          netcmd_item_type_to_string(their_node.type, typestr);
          calculate_items_to_send();
          // L(FL("client sending [done %s %d]") % typestr % items_to_send.size());
          cb.queue_done_cmd(type, items_to_send.size());
        }
    }
  else
    {
      // Always reply to every query with the current node.
      I(ty == refinement_query);
      // L(FL("%s queueing response to query on %d\n") % voicestr() % our_node->level);
      cb.queue_refine_cmd(refinement_response, *our_node);
    }
}
392
393#ifdef BUILD_UNIT_TESTS
394#include "randomizer.hh"
395#include "unit_tests.hh"
396
397#include <deque>
398#include <boost/shared_ptr.hpp>
399
400using std::deque;
401using boost::shared_ptr;
402
struct
refiner_pair
{
  // This structure acts as a mock netsync session. Its sole purpose is to
  // construct two refiners that are connected to one another, and route
  // refinement calls back and forth between them.

  struct
  refiner_pair_callbacks : refiner_callbacks
  {
    refiner_pair & p;
    bool is_client;
    refiner_pair_callbacks(refiner_pair & p, bool is_client)
      : p(p), is_client(is_client)
    {}

    // Outbound refine commands become msg events on the shared queue,
    // addressed to the opposite side.
    virtual void queue_refine_cmd(refinement_type ty,
                                  merkle_node const & our_node)
    {
      p.events.push_back(shared_ptr<msg>(new msg(is_client, ty, our_node)));
    }

    virtual void queue_done_cmd(netcmd_item_type ty,
                                size_t n_items)
    {
      p.events.push_back(shared_ptr<msg>(new msg(is_client, n_items)));
    }
    virtual ~refiner_pair_callbacks() {}
  };

  refiner_pair_callbacks client_cb;
  refiner_pair_callbacks server_cb;
  refiner client;
  refiner server;

  // One queued protocol event: either a refine command carrying a node,
  // or a done command carrying an item count.
  struct msg
  {
    msg(bool is_client, refinement_type ty, merkle_node const & node)
      : op(refine),
        ty(ty),
        send_to_client(!is_client),
        node(node)
    {}

    msg(bool is_client, size_t items)
      : op(done),
        send_to_client(!is_client),
        n_items(items)
    {}

    enum { refine, done } op;
    refinement_type ty;
    bool send_to_client;  // messages go to the side that did NOT send them
    size_t n_items;
    merkle_node node;
  };

  deque<shared_ptr<msg> > events;
  size_t n_msgs;

  // Pop and dispatch one queued event to the addressed refiner.
  // Precondition: events is non-empty (callers check before cranking).
  void crank()
  {

    shared_ptr<msg> m = events.front();
    events.pop_front();
    ++n_msgs;

    switch (m->op)
      {

      case msg::refine:
        if (m->send_to_client)
          client.process_refinement_command(m->ty, m->node);
        else
          server.process_refinement_command(m->ty, m->node);
        break;

      case msg::done:
        if (m->send_to_client)
          client.process_done_command(m->n_items);
        else
          server.process_done_command(m->n_items);
        break;
      }
  }

  // Build both refiners, seed them with the given item sets, run the
  // full refinement exchange to completion, then check the outcome.
  refiner_pair(set<id> const & client_items,
               set<id> const & server_items) :
    client_cb(*this, true),
    server_cb(*this, false),
    // The item type here really doesn't matter.
    client(file_item, client_voice, client_cb),
    server(file_item, server_voice, server_cb),
    n_msgs(0)
  {
    for (set<id>::const_iterator i = client_items.begin();
         i != client_items.end(); ++i)
      client.note_local_item(*i);

    for (set<id>::const_iterator i = server_items.begin();
         i != server_items.end(); ++i)
      server.note_local_item(*i);

    client.reindex_local_items();
    server.reindex_local_items();
    client.begin_refinement();

    while (! events.empty())
      crank();

    // Refinement should have completed by here.
    UNIT_TEST_CHECK(client.done);
    UNIT_TEST_CHECK(server.done);

    check_set_differences("client", client);
    check_set_differences("server", server);
    check_no_redundant_sends("client->server",
                             client.items_to_send,
                             server.get_local_items());
    check_no_redundant_sends("server->client",
                             server.items_to_send,
                             client.get_local_items());
    UNIT_TEST_CHECK(client.items_to_send.size() == server.items_to_receive);
    UNIT_TEST_CHECK(server.items_to_send.size() == client.items_to_receive);
    L(FL("stats: %d total, %d cs, %d sc, %d msgs")
      % (server.items_to_send.size() + client.get_local_items().size())
      % client.items_to_send.size()
      % server.items_to_send.size()
      % n_msgs);
  }

  // Log both sets element-by-element when they differ (diagnostic aid
  // for the UNIT_TEST_CHECKs that follow).
  void print_if_unequal(char const * context,
                        char const * name1,
                        set<id> const & set1,
                        char const * name2,
                        set<id> const & set2)
  {
    if (set1 != set2)
      {
        L(FL("WARNING: Unequal sets in %s!") % context);
        for (set<id>::const_iterator i = set1.begin(); i != set1.end(); ++i)
          {
            L(FL("%s: %s") % name1 % *i);
          }

        for (set<id>::const_iterator i = set2.begin(); i != set2.end(); ++i)
          {
            L(FL("%s: %s") % name2 % *i);
          }
        L(FL("end of unequal sets"));
      }
  }

  // Assert that no item scheduled for sending (SRC) is already present
  // on the receiving side (DST).
  void check_no_redundant_sends(char const * context,
                                set<id> const & src,
                                set<id> const & dst)
  {
    for (set<id>::const_iterator i = src.begin(); i != src.end(); ++i)
      {
        set<id>::const_iterator j = dst.find(*i);
        if (j != dst.end())
          {
            L(FL("WARNING: %s transmission will send redundant item %s")
              % context % *i);
          }
        UNIT_TEST_CHECK(j == dst.end());
      }
  }

  // Assert that R's computed send set equals local \ peer.
  void check_set_differences(char const * context, refiner const & r)
  {
    set<id> tmp;
    set_difference(r.get_local_items().begin(), r.get_local_items().end(),
                   r.get_peer_items().begin(), r.get_peer_items().end(),
                   inserter(tmp, tmp.begin()));
    print_if_unequal(context,
                     "diff(local,peer)", tmp,
                     "items_to_send", r.items_to_send);

    UNIT_TEST_CHECK(tmp == r.items_to_send);
  }
};
585
586
587void
588check_combinations_of_sets(set<id> const & s0,
589 set<id> const & a,
590 set<id> const & b)
591{
592 // Having composed our two input sets s0 and s1, we now construct the 2
593 // auxilary union-combinations of them -- {} and {s0 U s1} -- giving 4
594 // basic input sets. We then run 9 "interesting" pairwise combinations
595 // of these input sets.
596
597 set<id> e, u, v;
598 set_union(s0.begin(), s0.end(), a.begin(), a.end(), inserter(u, u.begin()));
599 set_union(s0.begin(), s0.end(), b.begin(), b.end(), inserter(v, v.begin()));
600
601 { refiner_pair x(e, u); } // a large initial transfer
602 { refiner_pair x(u, e); } // a large initial transfer
603
604 { refiner_pair x(s0, u); } // a mostly-shared superset/subset
605 { refiner_pair x(u, s0); } // a mostly-shared superset/subset
606
607 { refiner_pair x(a, u); } // a mostly-unshared superset/subset
608 { refiner_pair x(u, a); } // a mostly-unshared superset/subset
609
610 { refiner_pair x(u, v); } // things to send in both directions
611 { refiner_pair x(v, u); } // things to send in both directions
612
613 { refiner_pair x(u, u); } // a large no-op
614}
615
616
// Fill S with SZ random ids, each merkle_hash_length_in_bytes random
// bytes. When CLUMPY, roughly half the inserts are followed by a short
// run (up to 7) of near-neighbour ids made by repeatedly incrementing
// one byte of the fresh id -- exercising tries with dense prefixes as
// well as uniformly scattered ones.
void
build_random_set(set<id> & s, size_t sz, bool clumpy, randomizer & rng)
{
  while (s.size() < sz)
    {
      string str(constants::merkle_hash_length_in_bytes, ' ');
      for (size_t i = 0; i < constants::merkle_hash_length_in_bytes; ++i)
        str[i] = static_cast<char>(rng.uniform(0xff));
      s.insert(id(str));
      if (clumpy && rng.flip())
        {
          size_t clumpsz = rng.uniform(7) + 1;
          // Half the time mutate the last byte; otherwise pick any byte.
          size_t pos = rng.flip() ? str.size() - 1 : rng.uniform(str.size());
          for (size_t i = 0; s.size() < sz && i < clumpsz; ++i)
            {
              char c = str[pos];
              // Stop rather than wrap when the byte saturates at 0xff.
              if (c == static_cast<char>(0xff))
                break;
              ++c;
              str[pos] = c;
              s.insert(id(str));
            }
        }
    }
}
642
643size_t
644perturbed(size_t n, randomizer & rng)
645{
646 // we sometimes perturb sizes to deviate a bit from natural word-multiple sizes
647 if (rng.flip())
648 return n + rng.uniform(5);
649 return n;
650}
651
// Map index I onto a secondary-set size: indices 0..2 give the tiny
// sizes 1, 2 and 3; larger indices scale BASE_SET_SIZE by (I-2)/5
// (i.e. 0.2, 0.4, 0.6, ... of the base), truncated to an integer.
size_t
modulated_size(size_t base_set_size, size_t i)
{
  if (i >= 3)
    {
      double const fraction = static_cast<double>(i - 2) / 5.0;
      return static_cast<size_t>(fraction * static_cast<double>(base_set_size));
    }
  return i + 1;
}
661
662
// Run the full grid of refinement scenarios for one base set size:
// 2 clumpiness variants x 6 x 6 secondary-set sizes, each feeding 9
// sync scenarios via check_combinations_of_sets.
void
check_with_count(size_t base_set_size, randomizer & rng)
{
  if (base_set_size == 0)
    return;

  L(FL("running refinement check with base set size %d") % base_set_size);

  // Our goal here is to construct a base set of a given size, and two
  // secondary sets which will be combined with the base set in various
  // ways.
  //
  // The secondary sets will be built at the following sizes:
  //
  // 1 element
  // 2 elements
  // 3 elements
  // 0.2 * size of base set
  // 0.4 * size of base set
  // 0.8 * size of base set
  //
  // The base set is constructed in both clumpy and non-clumpy forms,
  // making 6 * 6 * 2 = 72 variations.
  //
  // Since each group of sets creates 9 sync scenarios, each "size" creates
  // 648 sync scenarios.

  for (size_t c = 0; c < 2; ++c)
    {
      set<id> s0;
      build_random_set(s0, perturbed(base_set_size, rng), c == 0, rng);

      for (size_t a = 0; a < 6; ++a)
        {
          set<id> sa;
          build_random_set(sa, modulated_size(perturbed(base_set_size, rng), a), false, rng);

          for (size_t b = 0; b < 6; ++b)
            {
              set<id> sb;
              build_random_set(sb, modulated_size(perturbed(base_set_size, rng), b), false, rng);
              check_combinations_of_sets(s0, sa, sb);
            }
        }
    }
}
709
// Top-level refinement unit test: one zero-zero sanity pair, then the
// full scenario grid at three base-set sizes.
UNIT_TEST(refiner, various_counts)
{
  {
    // Once with zero-zero, for good measure.
    set<id> s0;
    refiner_pair x(s0, s0);
  }

  // We run 3 primary counts, giving 1944 tests. Note that there is some
  // perturbation within the test, so we're not likely to feel side effects
  // of landing on such pleasant round numbers.

  randomizer rng;
  check_with_count(1, rng);
  check_with_count(128, rng);
  check_with_count(1024, rng);
}
727
728#endif
729
730// Local Variables:
731// mode: C++
732// fill-column: 76
733// c-file-style: "gnu"
734// indent-tabs-mode: nil
735// End:
736// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status