monotone

monotone Mtn Source Tree

Root/refiner.cc

1// Copyright (C) 2005 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include <algorithm>
11#include <set>
12#include <string>
13#include <utility>
14
15#include <boost/shared_ptr.hpp>
16
17#include "refiner.hh"
18#include "vocab.hh"
19#include "merkle_tree.hh"
20#include "netcmd.hh"
21#include "netsync.hh"
22
23using std::inserter;
24using std::make_pair;
25using std::set;
26using std::set_difference;
27using std::string;
28
29using boost::dynamic_bitset;
30
31// Our goal is to learn the complete set of items to send. To do this
32// we exchange two types of refinement commands: queries and responses.
33//
34// - On receiving a 'query' refinement for a node (p,l) you have:
35// - Compare the query node to your node (p,l), noting all the leaves
36// you must send as a result of what you learn in comparison.
37// - For each slot, if you have a subtree where the peer does not
38// (or you both do, and yours differs) send a sub-query for that
39// node, incrementing your query-in-flight counter.
40// - Send a 'response' refinement carrying your node (p,l)
41//
42// - On receiving a 'query' refinement for a node (p,l) you don't have:
43// - Send a 'response' refinement carrying an empty synthetic node (p,l)
44//
45// - On receiving a 'response' refinement for (p,l)
46// - Compare the query node to your node (p,l), noting all the leaves
47// you must send as a result of what you learn in comparison.
48// - Decrement your query-in-flight counter.
49//
50// The client kicks the process off by sending a query refinement for the
51// root node. When the client's query-in-flight counter drops to zero,
52// the client sends a done command, stating how many items it will be
53// sending.
54//
55// When the server receives a done command, it echoes it back stating how
56// many items *it* is going to send.
57//
58// When either side receives a done command, it transitions to
59// streaming send mode, sending all the items it's calculated.
60
61void
62refiner::note_local_item(id const & item)
63{
64 local_items.insert(item);
65 insert_into_merkle_tree(table, type, item, 0);
66}
67
// Recompute the hash codes of every node in the merkle tree, starting
// from the empty (root) prefix. Call this after a batch of
// note_local_item calls, before refinement begins.
void
refiner::reindex_local_items()
{
  recalculate_merkle_codes(table, prefix(""), 0);
}
73
74void
75refiner::load_merkle_node(size_t level, prefix const & pref,
76 merkle_ptr & node)
77{
78 merkle_table::const_iterator j = table.find(make_pair(pref, level));
79 I(j != table.end());
80 node = j->second;
81}
82
83bool
84refiner::merkle_node_exists(size_t level,
85 prefix const & pref)
86{
87 merkle_table::const_iterator j = table.find(make_pair(pref, level));
88 return (j != table.end());
89}
90
91void
92refiner::calculate_items_to_send()
93{
94 if (calculated_items_to_send)
95 return;
96
97 items_to_send.clear();
98 items_to_receive = 0;
99
100 set_difference(local_items.begin(), local_items.end(),
101 peer_items.begin(), peer_items.end(),
102 inserter(items_to_send, items_to_send.begin()));
103
104 string typestr;
105 netcmd_item_type_to_string(type, typestr);
106
107 // L(FL("%s determined %d %s items to send")
108 // % voicestr() % items_to_send.size() % typestr);
109 calculated_items_to_send = true;
110}
111
112
113void
114refiner::send_subquery(merkle_node const & our_node, size_t slot)
115{
116 prefix subprefix;
117 our_node.extended_raw_prefix(slot, subprefix);
118 merkle_ptr our_subtree;
119 load_merkle_node(our_node.level + 1, subprefix, our_subtree);
120 // L(FL("%s queueing subquery on level %d\n") % voicestr() % (our_node.level + 1));
121 cb.queue_refine_cmd(refinement_query, *our_subtree);
122 ++queries_in_flight;
123}
124
125void
126refiner::send_synthetic_subquery(merkle_node const & our_node, size_t slot)
127{
128 id val;
129 size_t subslot;
130 dynamic_bitset<unsigned char> subprefix;
131
132 our_node.get_raw_slot(slot, val);
133 pick_slot_and_prefix_for_value(val, our_node.level + 1, subslot, subprefix);
134
135 merkle_node synth_node;
136 synth_node.pref = subprefix;
137 synth_node.level = our_node.level + 1;
138 synth_node.type = our_node.type;
139 synth_node.set_raw_slot(subslot, val);
140 synth_node.set_slot_state(subslot, our_node.get_slot_state(slot));
141
142 // L(FL("%s queueing synthetic subquery on level %d\n") % voicestr() % (our_node.level + 1));
143 cb.queue_refine_cmd(refinement_query, synth_node);
144 ++queries_in_flight;
145}
146
147void
148refiner::note_subtree_shared_with_peer(merkle_node const & our_node, size_t slot)
149{
150 prefix pref;
151 our_node.extended_raw_prefix(slot, pref);
152 collect_items_in_subtree(table, pref, our_node.level+1, peer_items);
153}
154
155refiner::refiner(netcmd_item_type type, protocol_voice voice, refiner_callbacks & cb)
156 : type(type), voice (voice), cb(cb),
157 sent_initial_query(false),
158 queries_in_flight(0),
159 calculated_items_to_send(false),
160 done(false),
161 items_to_receive(0)
162{
163 merkle_ptr root = merkle_ptr(new merkle_node());
164 root->type = type;
165 table.insert(make_pair(make_pair(prefix(""), 0), root));
166}
167
168void
169refiner::note_item_in_peer(merkle_node const & their_node, size_t slot)
170{
171 I(slot < constants::merkle_num_slots);
172 id slotval;
173 their_node.get_raw_slot(slot, slotval);
174 peer_items.insert(slotval);
175
176 // Write a debug message
177 /*
178 {
179 hexenc<id> hslotval;
180 their_node.get_hex_slot(slot, hslotval);
181
182 hexenc<prefix> hpref;
183 their_node.get_hex_prefix(hpref);
184
185 string typestr;
186 netcmd_item_type_to_string(their_node.type, typestr);
187
188 L(FL("%s's peer has %s '%s' at slot %d (in node '%s', level %d)")
189 % voicestr() % typestr % hslotval % slot % hpref % their_node.level);
190 }
191 */
192}
193
194
195void
196refiner::begin_refinement()
197{
198 merkle_ptr root;
199 load_merkle_node(0, prefix(""), root);
200 // L(FL("%s queueing initial node\n") % voicestr());
201 cb.queue_refine_cmd(refinement_query, *root);
202 ++queries_in_flight;
203 sent_initial_query = true;
204 string typestr;
205 netcmd_item_type_to_string(type, typestr);
206 L(FL("Beginning %s refinement on %s.") % typestr % voicestr());
207}
208
209void
210refiner::process_done_command(size_t n_items)
211{
212 string typestr;
213 netcmd_item_type_to_string(type, typestr);
214
215 calculate_items_to_send();
216 items_to_receive = n_items;
217
218 L(FL("%s finished %s refinement: %d to send, %d to receive")
219 % voicestr() % typestr % items_to_send.size() % items_to_receive);
220
221 /*
222 if (local_items.size() < 25)
223 {
224 // Debugging aid.
225 L(FL("+++ %d items in %s") % local_items.size() % voicestr());
226 for (set<id>::const_iterator i = local_items.begin();
227 i != local_items.end(); ++i)
228 {
229 hexenc<id> hid;
230 encode_hexenc(*i, hid);
231 L(FL("%s item %s") % voicestr() % hid);
232 }
233 L(FL("--- items in %s") % voicestr());
234 }
235 */
236
237 if (voice == server_voice)
238 {
239 // L(FL("server responding to [done %s %d] with [done %s %d]")
240 // % typestr % n_items % typestr % items_to_send.size());
241 cb.queue_done_cmd(type, items_to_send.size());
242 }
243
244 done = true;
245}
246
247
// Core of the refinement protocol: handle one refinement netcmd (query
// or response) describing the peer's node 'their_node'. We compare it
// slot-by-slot against our node at the same (prefix, level) -- or an
// empty synthetic stand-in -- noting peer leaves, queueing subqueries
// for differing subtrees, and, for a query, replying with our node.
// For a response, we decrement the query-in-flight counter, which may
// signal the end of refinement on the client side.
void
refiner::process_refinement_command(refinement_type ty,
                                    merkle_node const & their_node)
{
  prefix pref;
  hexenc<prefix> hpref;
  their_node.get_raw_prefix(pref);
  their_node.get_hex_prefix(hpref);
  string typestr;

  netcmd_item_type_to_string(their_node.type, typestr);
  size_t lev = static_cast<size_t>(their_node.level);

  // L(FL("%s received refinement %s netcmd on %s node '%s', level %d") %
  //   voicestr() % (ty == refinement_query ? "query" : "response") %
  //   typestr % hpref % lev);

  merkle_ptr our_node;

  if (merkle_node_exists(their_node.level, pref))
    load_merkle_node(their_node.level, pref, our_node);
  else
    {
      // Synthesize empty node if we don't have one. All its slots read
      // as empty, so the comparison logic below needs no special case.
      our_node = merkle_ptr(new merkle_node);
      our_node->pref = their_node.pref;
      our_node->level = their_node.level;
      our_node->type = their_node.type;
    }

  for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
    {
      // Note any leaves they have.
      if (their_node.get_slot_state(slot) == leaf_state)
        note_item_in_peer(their_node, slot);

      if (ty == refinement_query)
        {
          // This block handles the interesting asymmetric cases of subtree
          // vs. leaf.
          //
          // Note that in general we're not allowed to send a new query
          // packet when we're looking at a response. This wrinkle is both
          // why this block appears to do slightly more work than necessary,
          // and why it's predicated on "ty == refinement_query". More detail
          // in the cases below.

          if (their_node.get_slot_state(slot) == leaf_state
              && our_node->get_slot_state(slot) == subtree_state)
            {
              // If they have a leaf and we have a subtree, we need to look
              // in our subtree to find if their leaf is present, and send
              // them a "query" that will inform them, in passing, of the
              // presence of our node.

              id their_slotval;
              their_node.get_raw_slot(slot, their_slotval);
              size_t snum;
              merkle_ptr mp;
              if (locate_item(table, their_slotval, snum, mp))
                {
                  cb.queue_refine_cmd(refinement_query, *mp);
                  ++queries_in_flight;
                }

            }

          else if (their_node.get_slot_state(slot) == subtree_state
                   && our_node->get_slot_state(slot) == leaf_state)
            {
              // If they have a subtree and we have a leaf, we need to
              // arrange for a subquery to explore the subtree looking for
              // the leaf in *their* subtree. The tricky part is that we
              // cannot have this subquery triggered by our response
              // packet. We need to initiate a new (redundant) query here to
              // prompt our peer to explore the subtree.
              //
              // This is purely for the sake of balancing the bracketing of
              // queries and responses: if they were to reply to our
              // response packet, our query-in-flight counter would have
              // temporarily dropped to zero and we'd have initiated
              // streaming send mode.
              //
              // Yes, the need to invert the sense of queries in this case
              // represents a misdesign in this generation of the netsync
              // protocol. It still contains much less hair than it used to,
              // so I'm willing to accept it.

              send_synthetic_subquery(*our_node, slot);
            }

          // Finally: if they had an empty slot in either case, there's no
          // subtree exploration to perform; the response packet will inform
          // the peer of everything relevant we know about this node: namely
          // that they're going to receive a complete subtree, we know
          // what's in it, and we'll tell them how many nodes to expect in
          // the aggregate count of the 'done' command.

        }

      // Compare any subtrees, if we both have subtrees.
      if (their_node.get_slot_state(slot) == subtree_state
          && our_node->get_slot_state(slot) == subtree_state)
        {
          id our_slotval, their_slotval;
          their_node.get_raw_slot(slot, their_slotval);
          our_node->get_raw_slot(slot, our_slotval);

          // Always note when you share a subtree.
          if (their_slotval == our_slotval)
            note_subtree_shared_with_peer(*our_node, slot);

          // Send subqueries when you have a different subtree
          // and you're answering a query message.
          else if (ty == refinement_query)
            send_subquery(*our_node, slot);
        }
    }

  if (ty == refinement_response)
    {
      E((queries_in_flight > 0),
        F("underflow on query-in-flight counter"));
      --queries_in_flight;

      // Possibly this signals the end of refinement.
      if (voice == client_voice && queries_in_flight == 0)
        {
          string typestr;
          netcmd_item_type_to_string(their_node.type, typestr);
          calculate_items_to_send();
          // L(FL("client sending [done %s %d]") % typestr % items_to_send.size());
          cb.queue_done_cmd(type, items_to_send.size());
        }
    }
  else
    {
      // Always reply to every query with the current node.
      I(ty == refinement_query);
      // L(FL("%s queueing response to query on %d\n") % voicestr() % our_node->level);
      cb.queue_refine_cmd(refinement_response, *our_node);
    }
}
391
392#ifdef BUILD_UNIT_TESTS
393#include "randomizer.hh"
394#include "unit_tests.hh"
395
396#include <deque>
397#include <boost/shared_ptr.hpp>
398
399using std::deque;
400using boost::shared_ptr;
401
struct
refiner_pair
{
  // This structure acts as a mock netsync session. Its only purpose is to
  // construct two refiners that are connected to one another, and route
  // refinement calls back and forth between them.

  // Callback sink handed to each refiner; instead of hitting the network,
  // every queued command becomes a msg pushed onto the shared event queue.
  struct
  refiner_pair_callbacks : refiner_callbacks
  {
    refiner_pair & p;          // owning session; holds the event queue
    bool is_client;            // which side these callbacks belong to
    refiner_pair_callbacks(refiner_pair & p, bool is_client)
      : p(p), is_client(is_client)
    {}

    virtual void queue_refine_cmd(refinement_type ty,
                                  merkle_node const & our_node)
    {
      p.events.push_back(shared_ptr<msg>(new msg(is_client, ty, our_node)));
    }

    virtual void queue_done_cmd(netcmd_item_type ty,
                                size_t n_items)
    {
      p.events.push_back(shared_ptr<msg>(new msg(is_client, n_items)));
    }
    virtual ~refiner_pair_callbacks() {}
  };

  refiner_pair_callbacks client_cb;
  refiner_pair_callbacks server_cb;
  refiner client;
  refiner server;

  // One in-flight protocol message: either a refine command carrying a
  // merkle node, or a done command carrying an item count. The sender's
  // identity determines the destination (messages always cross over).
  struct msg
  {
    msg(bool is_client, refinement_type ty, merkle_node const & node)
      : op(refine),
        ty(ty),
        send_to_client(!is_client),
        node(node)
    {}

    msg(bool is_client, size_t items)
      : op(done),
        send_to_client(!is_client),
        n_items(items)
    {}

    enum { refine, done } op;  // which command this message carries
    refinement_type ty;        // valid only when op == refine
    bool send_to_client;       // routing: opposite of the sender's side
    size_t n_items;            // valid only when op == done
    merkle_node node;          // valid only when op == refine
  };

  deque<shared_ptr<msg> > events; // FIFO of undelivered messages
  size_t n_msgs;                  // total messages delivered (for stats)

  // Deliver the oldest queued message to its destination refiner.
  // Precondition: events is non-empty.
  void crank()
  {

    shared_ptr<msg> m = events.front();
    events.pop_front();
    ++n_msgs;

    switch (m->op)
      {

      case msg::refine:
        if (m->send_to_client)
          client.process_refinement_command(m->ty, m->node);
        else
          server.process_refinement_command(m->ty, m->node);
        break;

      case msg::done:
        if (m->send_to_client)
          client.process_done_command(m->n_items);
        else
          server.process_done_command(m->n_items);
        break;
      }
  }

  // Build both refiners from the given item sets, run refinement to
  // completion by pumping the event queue, then check every postcondition:
  // both sides done, send-sets equal the set differences, no redundant
  // transmissions, and each side's receive count matches the other's
  // send count.
  refiner_pair(set<id> const & client_items,
               set<id> const & server_items) :
    client_cb(*this, true),
    server_cb(*this, false),
    // The item type here really doesn't matter.
    client(file_item, client_voice, client_cb),
    server(file_item, server_voice, server_cb),
    n_msgs(0)
  {
    for (set<id>::const_iterator i = client_items.begin();
         i != client_items.end(); ++i)
      client.note_local_item(*i);

    for (set<id>::const_iterator i = server_items.begin();
         i != server_items.end(); ++i)
      server.note_local_item(*i);

    client.reindex_local_items();
    server.reindex_local_items();
    client.begin_refinement();

    while (! events.empty())
      crank();

    // Refinement should have completed by here.
    BOOST_CHECK(client.done);
    BOOST_CHECK(server.done);

    check_set_differences("client", client);
    check_set_differences("server", server);
    check_no_redundant_sends("client->server",
                             client.items_to_send,
                             server.get_local_items());
    check_no_redundant_sends("server->client",
                             server.items_to_send,
                             client.get_local_items());
    BOOST_CHECK(client.items_to_send.size() == server.items_to_receive);
    BOOST_CHECK(server.items_to_send.size() == client.items_to_receive);
    L(FL("stats: %d total, %d cs, %d sc, %d msgs")
      % (server.items_to_send.size() + client.get_local_items().size())
      % client.items_to_send.size()
      % server.items_to_send.size()
      % n_msgs);
  }

  // Debugging aid: when two sets that should match do not, dump both
  // in hex so the difference can be inspected in the log.
  void print_if_unequal(char const * context,
                        char const * name1,
                        set<id> const & set1,
                        char const * name2,
                        set<id> const & set2)
  {
    if (set1 != set2)
      {
        L(FL("WARNING: Unequal sets in %s!") % context);
        for (set<id>::const_iterator i = set1.begin(); i != set1.end(); ++i)
          {
            hexenc<id> hid;
            encode_hexenc(*i, hid);
            L(FL("%s: %s") % name1 % hid);
          }

        for (set<id>::const_iterator i = set2.begin(); i != set2.end(); ++i)
          {
            hexenc<id> hid;
            encode_hexenc(*i, hid);
            L(FL("%s: %s") % name2 % hid);
          }
        L(FL("end of unequal sets"));
      }
  }

  // Check that no item scheduled for sending (src) is already present on
  // the receiving side (dst); such a send would be redundant traffic.
  void check_no_redundant_sends(char const * context,
                                set<id> const & src,
                                set<id> const & dst)
  {
    for (set<id>::const_iterator i = src.begin(); i != src.end(); ++i)
      {
        set<id>::const_iterator j = dst.find(*i);
        if (j != dst.end())
          {
            hexenc<id> hid;
            encode_hexenc(*i, hid);
            L(FL("WARNING: %s transmission will send redundant item %s")
              % context % hid);
          }
        BOOST_CHECK(j == dst.end());
      }
  }

  // Check that the refiner's computed send-set is exactly
  // (local items) minus (peer items).
  void check_set_differences(char const * context, refiner const & r)
  {
    set<id> tmp;
    set_difference(r.get_local_items().begin(), r.get_local_items().end(),
                   r.get_peer_items().begin(), r.get_peer_items().end(),
                   inserter(tmp, tmp.begin()));
    print_if_unequal(context,
                     "diff(local,peer)", tmp,
                     "items_to_send", r.items_to_send);

    BOOST_CHECK(tmp == r.items_to_send);
  }
};
590
591
592void
593check_combinations_of_sets(set<id> const & s0,
594 set<id> const & a,
595 set<id> const & b)
596{
597 // Having composed our two input sets s0 and s1, we now construct the 2
598 // auxilary union-combinations of them -- {} and {s0 U s1} -- giving 4
599 // basic input sets. We then run 9 "interesting" pairwise combinations
600 // of these input sets.
601
602 set<id> e, u, v;
603 set_union(s0.begin(), s0.end(), a.begin(), a.end(), inserter(u, u.begin()));
604 set_union(s0.begin(), s0.end(), b.begin(), b.end(), inserter(v, v.begin()));
605
606 { refiner_pair x(e, u); } // a large initial transfer
607 { refiner_pair x(u, e); } // a large initial transfer
608
609 { refiner_pair x(s0, u); } // a mostly-shared superset/subset
610 { refiner_pair x(u, s0); } // a mostly-shared superset/subset
611
612 { refiner_pair x(a, u); } // a mostly-unshared superset/subset
613 { refiner_pair x(u, a); } // a mostly-unshared superset/subset
614
615 { refiner_pair x(u, v); } // things to send in both directions
616 { refiner_pair x(v, u); } // things to send in both directions
617
618 { refiner_pair x(u, u); } // a large no-op
619}
620
621
622void
623build_random_set(set<id> & s, size_t sz, bool clumpy, randomizer & rng)
624{
625 while (s.size() < sz)
626 {
627 string str(constants::merkle_hash_length_in_bytes, ' ');
628 for (size_t i = 0; i < constants::merkle_hash_length_in_bytes; ++i)
629 str[i] = static_cast<char>(rng.uniform(0xff));
630 s.insert(id(str));
631 if (clumpy && rng.flip())
632 {
633 size_t clumpsz = rng.uniform(7) + 1;
634 size_t pos = rng.flip() ? str.size() - 1 : rng.uniform(str.size());
635 for (size_t i = 0; s.size() < sz && i < clumpsz; ++i)
636 {
637 char c = str[pos];
638 if (c == static_cast<char>(0xff))
639 break;
640 ++c;
641 str[pos] = c;
642 s.insert(id(str));
643 }
644 }
645 }
646}
647
648size_t
649perturbed(size_t n, randomizer & rng)
650{
651 // we sometimes perturb sizes to deviate a bit from natural word-multiple sizes
652 if (rng.flip())
653 return n + rng.uniform(5);
654 return n;
655}
656
// Map step index i to a secondary-set size: the first three steps give
// the tiny sizes 1, 2, 3; later steps scale base_set_size by 0.2, 0.4,
// 0.6, ...
size_t
modulated_size(size_t base_set_size, size_t i)
{
  if (i < 3)
    return i + 1;

  double scale = static_cast<double>(i - 2) / 5.0;
  return static_cast<size_t>(scale * static_cast<double>(base_set_size));
}
666
667
668void
669check_with_count(size_t base_set_size, randomizer & rng)
670{
671 if (base_set_size == 0)
672 return;
673
674 L(FL("running refinement check with base set size %d") % base_set_size);
675
676 // Our goal here is to construct a base set of a given size, and two
677 // secondary sets which will be combined with the base set in various
678 // ways.
679 //
680 // The secondary sets will be built at the following sizes:
681 //
682 // 1 element
683 // 2 elements
684 // 3 elements
685 // 0.2 * size of base set
686 // 0.4 * size of base set
687 // 0.8 * size of base set
688 //
689 // The base set is constructed in both clumpy and non-clumpy forms,
690 // making 6 * 6 * 2 = 72 variations.
691 //
692 // Since each group of sets creates 9 sync scenarios, each "size" creates
693 // 648 sync scenarios.
694
695 for (size_t c = 0; c < 2; ++c)
696 {
697 set<id> s0;
698 build_random_set(s0, perturbed(base_set_size, rng), c == 0, rng);
699
700 for (size_t a = 0; a < 6; ++a)
701 {
702 set<id> sa;
703 build_random_set(sa, modulated_size(perturbed(base_set_size, rng), a), false, rng);
704
705 for (size_t b = 0; b < 6; ++b)
706 {
707 set<id> sb;
708 build_random_set(sb, modulated_size(perturbed(base_set_size, rng), b), false, rng);
709 check_combinations_of_sets(s0, sa, sb);
710 }
711 }
712 }
713}
714
// Top-level unit test: exercise full client/server refinement across
// many randomized set sizes and overlap patterns. Every refiner_pair
// constructed (directly or via check_with_count) runs one complete
// sync and verifies its postconditions.
UNIT_TEST(refiner, various_counts)
{
  {
    // Once with zero-zero, for good measure.
    set<id> s0;
    refiner_pair x(s0, s0);
  }

  // We run 3 primary counts, giving 1944 tests. Note that there is some
  // perturbation within the test, so we're not likely to feel side effects
  // of landing on such pleasant round numbers.

  randomizer rng;
  check_with_count(1, rng);
  check_with_count(128, rng);
  check_with_count(1024, rng);
}
732
733#endif
734
735// Local Variables:
736// mode: C++
737// fill-column: 76
738// c-file-style: "gnu"
739// indent-tabs-mode: nil
740// End:
741// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status