monotone

monotone Mtn Source Tree

Root/refiner.cc

1// Copyright (C) 2005 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include <algorithm>
11#include <set>
12#include <string>
13#include <utility>
14
15#include <boost/shared_ptr.hpp>
16
17#include "refiner.hh"
18#include "vocab.hh"
19#include "merkle_tree.hh"
20#include "netcmd.hh"
21#include "netsync.hh"
22
23using std::inserter;
24using std::make_pair;
25using std::set;
26using std::set_difference;
27using std::string;
28
29using boost::dynamic_bitset;
30
31// Our goal is to learn the complete set of items to send. To do this
32// we exchange two types of refinement commands: queries and responses.
33//
34// - On receiving a 'query' refinement for a node (p,l) you have:
35// - Compare the query node to your node (p,l), noting all the leaves
36// you must send as a result of what you learn in comparison.
37// - For each slot, if you have a subtree where the peer does not
38// (or you both do, and yours differs) send a sub-query for that
39// node, incrementing your query-in-flight counter.
40// - Send a 'response' refinement carrying your node (p,l)
41//
42// - On receiving a 'query' refinement for a node (p,l) you don't have:
43// - Send a 'response' refinement carrying an empty synthetic node (p,l)
44//
45// - On receiving a 'response' refinement for (p,l)
46// - Compare the query node to your node (p,l), noting all the leaves
47// you must send as a result of what you learn in comparison.
48// - Decrement your query-in-flight counter.
49//
50// The client kicks the process off by sending a query refinement for the
51// root node. When the client's query-in-flight counter drops to zero,
52// the client sends a done command, stating how many items it will be
53// sending.
54//
55// When the server receives a done command, it echoes it back stating how
56// many items *it* is going to send.
57//
58// When either side receives a done command, it transitions to
59// streaming send mode, sending all the items it's calculated.
60
void
refiner::note_local_item(id const & item)
{
  // Record that we hold 'item' locally: add it to the flat set used
  // later for set-difference calculations, and insert it into the
  // merkle tree (the trailing 0 is presumably the starting level --
  // see merkle_tree.hh).
  local_items.insert(item);
  insert_into_merkle_tree(table, type, item, 0);
}
67
void
refiner::reindex_local_items()
{
  // Recalculate merkle hash codes over the whole table, starting from
  // the empty (root) prefix at level 0.  Called after a batch of
  // note_local_item calls (see the unit-test harness below).
  recalculate_merkle_codes(table, prefix(""), 0);
}
73
74void
75refiner::load_merkle_node(size_t level, prefix const & pref,
76 merkle_ptr & node)
77{
78 merkle_table::const_iterator j = table.find(make_pair(pref, level));
79 I(j != table.end());
80 node = j->second;
81}
82
83bool
84refiner::merkle_node_exists(size_t level,
85 prefix const & pref)
86{
87 merkle_table::const_iterator j = table.find(make_pair(pref, level));
88 return (j != table.end());
89}
90
91void
92refiner::calculate_items_to_send()
93{
94 if (calculated_items_to_send)
95 return;
96
97 items_to_send.clear();
98 items_to_receive = 0;
99
100 set_difference(local_items.begin(), local_items.end(),
101 peer_items.begin(), peer_items.end(),
102 inserter(items_to_send, items_to_send.begin()));
103
104 string typestr;
105 netcmd_item_type_to_string(type, typestr);
106
107 // L(FL("%s determined %d %s items to send")
108 // % voicestr() % items_to_send.size() % typestr);
109 calculated_items_to_send = true;
110}
111
112
113void
114refiner::send_subquery(merkle_node const & our_node, size_t slot)
115{
116 prefix subprefix;
117 our_node.extended_raw_prefix(slot, subprefix);
118 merkle_ptr our_subtree;
119 load_merkle_node(our_node.level + 1, subprefix, our_subtree);
120 // L(FL("%s queueing subquery on level %d\n") % voicestr() % (our_node.level + 1));
121 cb.queue_refine_cmd(refinement_query, *our_subtree);
122 ++queries_in_flight;
123}
124
125void
126refiner::send_synthetic_subquery(merkle_node const & our_node, size_t slot)
127{
128 id val;
129 size_t subslot;
130 dynamic_bitset<unsigned char> subprefix;
131
132 our_node.get_raw_slot(slot, val);
133 pick_slot_and_prefix_for_value(val, our_node.level + 1, subslot, subprefix);
134
135 merkle_node synth_node;
136 synth_node.pref = subprefix;
137 synth_node.level = our_node.level + 1;
138 synth_node.type = our_node.type;
139 synth_node.set_raw_slot(subslot, val);
140 synth_node.set_slot_state(subslot, our_node.get_slot_state(slot));
141
142 // L(FL("%s queueing synthetic subquery on level %d\n") % voicestr() % (our_node.level + 1));
143 cb.queue_refine_cmd(refinement_query, synth_node);
144 ++queries_in_flight;
145}
146
147void
148refiner::note_subtree_shared_with_peer(merkle_node const & our_node, size_t slot)
149{
150 prefix pref;
151 our_node.extended_raw_prefix(slot, pref);
152 collect_items_in_subtree(table, pref, our_node.level+1, peer_items);
153}
154
155refiner::refiner(netcmd_item_type type, protocol_voice voice, refiner_callbacks & cb)
156 : type(type), voice (voice), cb(cb),
157 sent_initial_query(false),
158 queries_in_flight(0),
159 calculated_items_to_send(false),
160 done(false),
161 items_to_receive(0)
162{
163 merkle_ptr root = merkle_ptr(new merkle_node());
164 root->type = type;
165 table.insert(make_pair(make_pair(prefix(""), 0), root));
166}
167
void
refiner::note_item_in_peer(merkle_node const & their_node, size_t slot)
{
  // Record that the peer holds the leaf value stored in 'slot' of
  // their_node.  Items accumulated here are subtracted from
  // local_items (in calculate_items_to_send) to decide what to send.
  I(slot < constants::merkle_num_slots);
  id slotval;
  their_node.get_raw_slot(slot, slotval);
  peer_items.insert(slotval);

  // Write a debug message
  /*
  {
    hexenc<id> hslotval;
    their_node.get_hex_slot(slot, hslotval);

    hexenc<prefix> hpref;
    their_node.get_hex_prefix(hpref);

    string typestr;
    netcmd_item_type_to_string(their_node.type, typestr);

    L(FL("%s's peer has %s '%s' at slot %d (in node '%s', level %d)")
      % voicestr() % typestr % hslotval % slot % hpref % their_node.level);
  }
  */
}
193
194
195void
196refiner::begin_refinement()
197{
198 merkle_ptr root;
199 load_merkle_node(0, prefix(""), root);
200 // L(FL("%s queueing initial node\n") % voicestr());
201 cb.queue_refine_cmd(refinement_query, *root);
202 ++queries_in_flight;
203 sent_initial_query = true;
204 string typestr;
205 netcmd_item_type_to_string(type, typestr);
206 L(FL("Beginning %s refinement on %s.") % typestr % voicestr());
207}
208
void
refiner::process_done_command(size_t n_items)
{
  // The peer has finished refining and told us how many items it will
  // stream to us.  Finalize our own send-set, remember the expected
  // receive count, and (server side only) echo a 'done' back carrying
  // our own count so the client learns it too.
  string typestr;
  netcmd_item_type_to_string(type, typestr);

  // Idempotent: computes the local-minus-peer difference only once.
  calculate_items_to_send();
  items_to_receive = n_items;

  L(FL("%s finished %s refinement: %d to send, %d to receive")
    % voicestr() % typestr % items_to_send.size() % items_to_receive);

  /*
  if (local_items.size() < 25)
    {
      // Debugging aid.
      L(FL("+++ %d items in %s") % local_items.size() % voicestr());
      for (set<id>::const_iterator i = local_items.begin();
           i != local_items.end(); ++i)
        {
          hexenc<id> hid;
          encode_hexenc(*i, hid);
          L(FL("%s item %s") % voicestr() % hid);
        }
      L(FL("--- items in %s") % voicestr());
    }
  */

  if (voice == server_voice)
    {
      // L(FL("server responding to [done %s %d] with [done %s %d]")
      //   % typestr % n_items % typestr % items_to_send.size());
      cb.queue_done_cmd(type, items_to_send.size());
    }

  done = true;
}
246
247
void
refiner::process_refinement_command(refinement_type ty,
                                    merkle_node const & their_node)
{
  // Core of the refinement protocol (see the overview at the top of
  // this file): compare the peer's node against our node for the same
  // (prefix, level), note any leaves the peer holds, issue subqueries
  // where the trees differ, and -- when 'ty' is a query -- answer with
  // our own node.  When 'ty' is a response, decrement the in-flight
  // counter and possibly finish refinement.
  prefix pref;
  hexenc<prefix> hpref;
  their_node.get_raw_prefix(pref);
  their_node.get_hex_prefix(hpref);
  string typestr;

  netcmd_item_type_to_string(their_node.type, typestr);
  // 'lev' (and 'hpref'/'typestr') are only consumed by the
  // commented-out trace below.
  size_t lev = static_cast<size_t>(their_node.level);

  // L(FL("%s received refinement %s netcmd on %s node '%s', level %d") %
  //   voicestr() % (ty == refinement_query ? "query" : "response") %
  //   typestr % hpref % lev);

  merkle_ptr our_node;

  if (merkle_node_exists(their_node.level, pref))
    load_merkle_node(their_node.level, pref, our_node);
  else
    {
      // Synthesize empty node if we don't have one.  All its slots
      // read as empty, so the comparison logic below still applies.
      our_node = merkle_ptr(new merkle_node);
      our_node->pref = their_node.pref;
      our_node->level = their_node.level;
      our_node->type = their_node.type;
    }

  for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
    {
      // Note any leaves they have.
      if (their_node.get_slot_state(slot) == leaf_state)
        note_item_in_peer(their_node, slot);

      if (ty == refinement_query)
        {
          // This block handles the interesting asymmetric cases of subtree
          // vs. leaf.
          //
          // Note that in general we're not allowed to send a new query
          // packet when we're looking at a response. This wrinkle is both
          // why this block appears to do slightly more work than necessary,
          // and why it's predicated on "ty == refinement_query". More detail
          // in the cases below.

          if (their_node.get_slot_state(slot) == leaf_state
              && our_node->get_slot_state(slot) == subtree_state)
            {
              // If they have a leaf and we have a subtree, we need to look
              // in our subtree to find if their leaf is present, and send
              // them a "query" that will inform them, in passing, of the
              // presence of our node.

              id their_slotval;
              their_node.get_raw_slot(slot, their_slotval);
              size_t snum;
              merkle_ptr mp;
              if (locate_item(table, their_slotval, snum, mp))
                {
                  cb.queue_refine_cmd(refinement_query, *mp);
                  ++queries_in_flight;
                }

            }

          else if (their_node.get_slot_state(slot) == subtree_state
                   && our_node->get_slot_state(slot) == leaf_state)
            {
              // If they have a subtree and we have a leaf, we need to
              // arrange for a subquery to explore the subtree looking for
              // the leaf in *their* subtree. The tricky part is that we
              // cannot have this subquery triggered by our response
              // packet. We need to initiate a new (redundant) query here to
              // prompt our peer to explore the subtree.
              //
              // This is purely for the sake of balancing the bracketing of
              // queries and responses: if they were to reply to our
              // response packet, our query-in-flight counter would have
              // temporarily dropped to zero and we'd have initiated
              // streaming send mode.
              //
              // Yes, the need to invert the sense of queries in this case
              // represents a misdesign in this generation of the netsync
              // protocol. It still contains much less hair than it used to,
              // so I'm willing to accept it.

              send_synthetic_subquery(*our_node, slot);
            }

          // Finally: if they had an empty slot in either case, there's no
          // subtree exploration to perform; the response packet will inform
          // the peer of everything relevant we know about this node: namely
          // that they're going to receive a complete subtree, we know
          // what's in it, and we'll tell them how many nodes to expect in
          // the aggregate count of the 'done' command.

        }

      // Compare any subtrees, if we both have subtrees.
      if (their_node.get_slot_state(slot) == subtree_state
          && our_node->get_slot_state(slot) == subtree_state)
        {
          id our_slotval, their_slotval;
          their_node.get_raw_slot(slot, their_slotval);
          our_node->get_raw_slot(slot, our_slotval);

          // Always note when you share a subtree.
          if (their_slotval == our_slotval)
            note_subtree_shared_with_peer(*our_node, slot);

          // Send subqueries when you have a different subtree
          // and you're answering a query message.
          else if (ty == refinement_query)
            send_subquery(*our_node, slot);
        }
    }

  if (ty == refinement_response)
    {
      E((queries_in_flight > 0),
        F("underflow on query-in-flight counter"));
      --queries_in_flight;

      // Possibly this signals the end of refinement.
      if (voice == client_voice && queries_in_flight == 0)
        {
          string typestr;
          netcmd_item_type_to_string(their_node.type, typestr);
          calculate_items_to_send();
          // L(FL("client sending [done %s %d]") % typestr % items_to_send.size());
          cb.queue_done_cmd(type, items_to_send.size());
        }
    }
  else
    {
      // Always reply to every query with the current node.
      I(ty == refinement_query);
      // L(FL("%s queueing response to query on %d\n") % voicestr() % our_node->level);
      cb.queue_refine_cmd(refinement_response, *our_node);
    }
}
391
392#ifdef BUILD_UNIT_TESTS
393#include "randomizer.hh"
394#include "unit_tests.hh"
395
396#include <deque>
397#include <boost/shared_ptr.hpp>
398
399using std::deque;
400using boost::shared_ptr;
401using randomizer::flip;
402using randomizer::uniform;
403using randomizer::bernoulli;
404
struct
refiner_pair
{
  // This structure acts as a mock netsync session. Its only purpose is to
  // construct two refiners that are connected to one another, and route
  // refinement calls back and forth between them.

  struct
  refiner_pair_callbacks : refiner_callbacks
  {
    // Callback sink for one side of the session: every queued command
    // is appended to the pair's shared event queue, tagged with which
    // side produced it.
    refiner_pair & p;
    bool is_client;
    refiner_pair_callbacks(refiner_pair & p, bool is_client)
      : p(p), is_client(is_client)
    {}

    virtual void queue_refine_cmd(refinement_type ty,
                                  merkle_node const & our_node)
    {
      p.events.push_back(shared_ptr<msg>(new msg(is_client, ty, our_node)));
    }

    virtual void queue_done_cmd(netcmd_item_type ty,
                                size_t n_items)
    {
      p.events.push_back(shared_ptr<msg>(new msg(is_client, n_items)));
    }
    virtual ~refiner_pair_callbacks() {}
  };

  refiner_pair_callbacks client_cb;
  refiner_pair_callbacks server_cb;
  refiner client;
  refiner server;

  // One in-flight protocol message: either a 'refine' command carrying
  // a merkle node, or a 'done' command carrying an item count.  The
  // destination is the side opposite the sender.
  struct msg
  {
    msg(bool is_client, refinement_type ty, merkle_node const & node)
      : op(refine),
        ty(ty),
        send_to_client(!is_client),
        node(node)
    {}

    msg(bool is_client, size_t items)
      : op(done),
        send_to_client(!is_client),
        n_items(items)
    {}

    enum { refine, done } op;
    refinement_type ty;
    bool send_to_client;
    size_t n_items;
    merkle_node node;
  };

  deque<shared_ptr<msg> > events;
  // Count of messages routed so far; reported in the stats log line.
  size_t n_msgs;

  // Deliver the oldest queued message to its destination refiner.
  // Precondition: events is non-empty (the constructor's loop checks).
  void crank()
  {

    shared_ptr<msg> m = events.front();
    events.pop_front();
    ++n_msgs;

    switch (m->op)
      {

      case msg::refine:
        if (m->send_to_client)
          client.process_refinement_command(m->ty, m->node);
        else
          server.process_refinement_command(m->ty, m->node);
        break;

      case msg::done:
        if (m->send_to_client)
          client.process_done_command(m->n_items);
        else
          server.process_done_command(m->n_items);
        break;
      }
  }

  // Construct both refiners, seed them with the given item sets, run
  // the refinement exchange to completion, and check the outcome:
  // both sides done, send-sets equal to set differences, no redundant
  // sends, and each side's receive count matching the other's send
  // count.
  refiner_pair(set<id> const & client_items,
               set<id> const & server_items) :
    client_cb(*this, true),
    server_cb(*this, false),
    // The item type here really doesn't matter.
    client(file_item, client_voice, client_cb),
    server(file_item, server_voice, server_cb),
    n_msgs(0)
  {
    for (set<id>::const_iterator i = client_items.begin();
         i != client_items.end(); ++i)
      client.note_local_item(*i);

    for (set<id>::const_iterator i = server_items.begin();
         i != server_items.end(); ++i)
      server.note_local_item(*i);

    client.reindex_local_items();
    server.reindex_local_items();
    client.begin_refinement();

    while (! events.empty())
      crank();

    // Refinement should have completed by here.
    BOOST_CHECK(client.done);
    BOOST_CHECK(server.done);

    check_set_differences("client", client);
    check_set_differences("server", server);
    check_no_redundant_sends("client->server",
                             client.items_to_send,
                             server.get_local_items());
    check_no_redundant_sends("server->client",
                             server.items_to_send,
                             client.get_local_items());
    BOOST_CHECK(client.items_to_send.size() == server.items_to_receive);
    BOOST_CHECK(server.items_to_send.size() == client.items_to_receive);
    L(FL("stats: %d total, %d cs, %d sc, %d msgs")
      % (server.items_to_send.size() + client.get_local_items().size())
      % client.items_to_send.size()
      % server.items_to_send.size()
      % n_msgs);
  }

  // Debugging aid: if the two sets differ, log every member of each.
  void print_if_unequal(char const * context,
                        char const * name1,
                        set<id> const & set1,
                        char const * name2,
                        set<id> const & set2)
  {
    if (set1 != set2)
      {
        L(FL("WARNING: Unequal sets in %s!") % context);
        for (set<id>::const_iterator i = set1.begin(); i != set1.end(); ++i)
          {
            hexenc<id> hid;
            encode_hexenc(*i, hid);
            L(FL("%s: %s") % name1 % hid);
          }

        for (set<id>::const_iterator i = set2.begin(); i != set2.end(); ++i)
          {
            hexenc<id> hid;
            encode_hexenc(*i, hid);
            L(FL("%s: %s") % name2 % hid);
          }
        L(FL("end of unequal sets"));
      }
  }

  // Assert that nothing scheduled for sending (src) is already present
  // on the receiving side (dst); log any offender before failing.
  void check_no_redundant_sends(char const * context,
                                set<id> const & src,
                                set<id> const & dst)
  {
    for (set<id>::const_iterator i = src.begin(); i != src.end(); ++i)
      {
        set<id>::const_iterator j = dst.find(*i);
        if (j != dst.end())
          {
            hexenc<id> hid;
            encode_hexenc(*i, hid);
            L(FL("WARNING: %s transmission will send redundant item %s")
              % context % hid);
          }
        BOOST_CHECK(j == dst.end());
      }
  }

  // Assert that the refiner's computed send-set equals the true set
  // difference local_items - peer_items.
  void check_set_differences(char const * context, refiner const & r)
  {
    set<id> tmp;
    set_difference(r.get_local_items().begin(), r.get_local_items().end(),
                   r.get_peer_items().begin(), r.get_peer_items().end(),
                   inserter(tmp, tmp.begin()));
    print_if_unequal(context,
                     "diff(local,peer)", tmp,
                     "items_to_send", r.items_to_send);

    BOOST_CHECK(tmp == r.items_to_send);
  }
};
593
594
void
check_combinations_of_sets(set<id> const & s0,
                           set<id> const & a,
                           set<id> const & b)
{
  // Given the base set s0 and two secondary sets a and b, construct
  // the auxiliary union-combinations u = s0 U a and v = s0 U b (plus
  // the empty set e), giving 4 basic input sets. We then run 9
  // "interesting" pairwise sync scenarios over these input sets.

  set<id> e, u, v;
  set_union(s0.begin(), s0.end(), a.begin(), a.end(), inserter(u, u.begin()));
  set_union(s0.begin(), s0.end(), b.begin(), b.end(), inserter(v, v.begin()));

  { refiner_pair x(e, u); } // a large initial transfer
  { refiner_pair x(u, e); } // a large initial transfer

  { refiner_pair x(s0, u); } // a mostly-shared superset/subset
  { refiner_pair x(u, s0); } // a mostly-shared superset/subset

  { refiner_pair x(a, u); } // a mostly-unshared superset/subset
  { refiner_pair x(u, a); } // a mostly-unshared superset/subset

  { refiner_pair x(u, v); } // things to send in both directions
  { refiner_pair x(v, u); } // things to send in both directions

  { refiner_pair x(u, u); } // a large no-op
}
623
624
void
build_random_set(set<id> & s, size_t sz, bool clumpy)
{
  // Fill 's' until it holds 'sz' random ids of merkle-hash length.
  // When 'clumpy' is set and flip() comes up true, each draw is
  // followed by a short run of "neighbor" ids differing only by
  // increments of one byte position, producing clusters of values.
  while (s.size() < sz)
    {
      string str(constants::merkle_hash_length_in_bytes, ' ');
      for (size_t i = 0; i < constants::merkle_hash_length_in_bytes; ++i)
        str[i] = static_cast<char>(uniform(0xff));
      s.insert(id(str));
      if (clumpy && flip())
        {
          size_t clumpsz = uniform(7) + 1;
          // Mutate either the final byte or a random byte position.
          size_t pos = flip() ? str.size() - 1 : uniform(str.size());
          for (size_t i = 0; s.size() < sz && i < clumpsz; ++i)
            {
              char c = str[pos];
              // Stop the clump rather than wrap past 0xff.
              if (c == static_cast<char>(0xff))
                break;
              ++c;
              str[pos] = c;
              s.insert(id(str));
            }
        }
    }
}
650
651size_t
652perturbed(size_t n)
653{
654 // we sometimes perturb sizes to deviate a bit from natural word-multiple sizes
655 if (flip())
656 return n + uniform(5);
657 return n;
658}
659
size_t
modulated_size(size_t base_set_size, size_t i)
{
  // Map step index 'i' to a secondary-set size: the first three steps
  // give the tiny sizes 1, 2, 3; later steps scale as a growing
  // fraction (0.2, 0.4, 0.6, ...) of the base set size.
  if (i >= 3)
    {
      double fraction = static_cast<double>(i - 2) / 5.0;
      return static_cast<size_t>(fraction
                                 * static_cast<double>(base_set_size));
    }
  return i + 1;
}
669
670
void
check_with_count(size_t base_set_size)
{
  // Driver: build a base set of (roughly) the given size and exercise
  // refinement over many combinations of it with smaller secondary
  // sets.  A zero size is a no-op.
  if (base_set_size == 0)
    return;

  L(FL("running refinement check with base set size %d") % base_set_size);

  // Our goal here is to construct a base set of a given size, and two
  // secondary sets which will be combined with the base set in various
  // ways.
  //
  // The secondary sets will be built at the following sizes:
  //
  // 1 element
  // 2 elements
  // 3 elements
  // 0.2 * size of base set
  // 0.4 * size of base set
  // 0.8 * size of base set
  //
  // The base set is constructed in both clumpy and non-clumpy forms,
  // making 6 * 6 * 2 = 72 variations.
  //
  // Since each group of sets creates 9 sync scenarios, each "size" creates
  // 648 sync scenarios.

  for (size_t c = 0; c < 2; ++c)
    {
      set<id> s0;
      // c == 0 builds the clumpy variant of the base set.
      build_random_set(s0, perturbed(base_set_size), c == 0);

      for (size_t a = 0; a < 6; ++a)
        {
          set<id> sa;
          build_random_set(sa, modulated_size(perturbed(base_set_size), a), false);

          for (size_t b = 0; b < 6; ++b)
            {
              set<id> sb;
              build_random_set(sb, modulated_size(perturbed(base_set_size), b), false);
              check_combinations_of_sets(s0, sa, sb);
            }
        }
    }
}
717
void
check_various_counts()
{
  // Top-level unit test entry: run refinement over empty sets, then
  // over three base-set sizes spanning tiny to moderately large.
  {
    // Once with zero-zero, for good measure.
    set<id> s0;
    refiner_pair x(s0, s0);
  }

  // We run 3 primary counts, giving 1944 tests. Note that there is some
  // perturbation within the test, so we're not likely to feel side effects
  // of landing on such pleasant round numbers.

  check_with_count(1);
  check_with_count(128);
  check_with_count(1024);
}
735
void
add_refiner_tests(test_suite * suite)
{
  // Register this file's single test case with the Boost test suite.
  suite->add(BOOST_TEST_CASE(&check_various_counts));
}
741
742#endif
743
744// Local Variables:
745// mode: C++
746// fill-column: 76
747// c-file-style: "gnu"
748// indent-tabs-mode: nil
749// End:
750// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status