monotone

monotone Mtn Source Tree

Root/netsync.cc

1// copyright (C) 2004 graydon hoare <graydon@pobox.com>
2// all rights reserved.
3// licensed to the public under the terms of the GNU GPL (>= 2)
4// see the file COPYING for details
5
6#include <map>
7#include <string>
8
9#include <time.h>
10
11#include <boost/dynamic_bitset.hpp>
12#include <boost/lexical_cast.hpp>
13#include <boost/scoped_ptr.hpp>
14#include <boost/shared_ptr.hpp>
15
16#include "app_state.hh"
17#include "cert.hh"
18#include "constants.hh"
19#include "keys.hh"
20#include "merkle_tree.hh"
21#include "netcmd.hh"
22#include "netio.hh"
23#include "netsync.hh"
24#include "numeric_vocab.hh"
25#include "patch_set.hh"
26#include "sanity.hh"
27#include "transforms.hh"
28#include "ui.hh"
29#include "xdelta.hh"
30
31#include "cryptopp/osrng.h"
32
33#include "Netxx/Address.h"
34#include "Netxx/Peer.h"
35#include "Netxx/Probe.h"
36#include "Netxx/Stream.h"
37#include "Netxx/StreamServer.h"
38#include "Netxx/Timeout.h"
39#include "Netxx/Types.h"
40
41//
42// this is the "new" network synchronization (netsync) system in
43// monotone. it is based on synchronizing a pair of merkle trees over an
44// interactive connection.
45//
46// a netsync process between peers treats each peer as either a source, a
47// sink, or both. when a peer is only a source, it will not write any new
48// items to its database. when a peer is only a sink, it will not send any
49// items from its database. when a peer is both a source and sink, it may
50// send and write items freely.
51//
52// the post-state of a netsync is that each sink contains a superset of the
53// items in its corresponding source; when peers are behaving as both
54// source and sink, this means that the post-state of the sync is for the
55// peers to have identical item sets.
56//
57// a peer can be a sink in at most one netsync process at a time; it can
58// however be a source for multiple netsyncs simultaneously.
59//
60//
61// data structure
62// --------------
63//
64// each node in a merkle tree contains a fixed number of slots. this number
65// is derived from a global parameter of the protocol -- the tree fanout --
66// such that the number of slots is 2^fanout. for now we will assume that
67// fanout is 4 thus there are 16 slots in a node, because this makes
68// illustration easier. the other parameter of the protocol is the size of
69// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
70//
71// each slot in a merkle tree node is in one of 4 states:
72//
73// - empty
74// - live leaf
75// - dead leaf
76// - subtree
77//
78// in addition, each live or dead leaf contains a hash code which
79// identifies an element of the set being synchronized. each subtree slot
80// contains a hash code of the node immediately beneath it in the merkle
81// tree. empty slots contain no hash codes.
82//
83// each node also summarizes, for sake of statistic-gathering, the number
84// of set elements and total number of bytes in all of its subtrees, each
85// stored as a size_t and sent as a uleb128.
86//
87// since empty slots have no hash code, they are represented implicitly by
88// a bitmap at the head of each merkle tree node. as an additional
89// integrity check, each merkle tree node contains a label indicating its
90// prefix in the tree, and a hash of its own contents.
91//
92// in total, then, the byte-level representation of a <160,4> merkle tree
93// node is as follows:
94//
95// 20 bytes - hash of the remaining bytes in the node
96// 1 byte - type of this node (manifest, file, key, mcert, fcert)
97// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
98// 0-20 bytes - the prefix of this node, 4 bits * level,
99// rounded up to a byte
100// 1-N bytes - number of leaves under this node (uleb128)
101// 4 bytes - slot-state bitmap of the node
102// 0-320 bytes - between 0 and 16 live slots in the node
103//
104// so, in the worst case such a node is 367 bytes, with these parameters.
105//
106//
107// protocol
108// --------
109//
110// the protocol is a simple binary command-packet system over tcp; each
111// packet consists of a byte which identifies the protocol version, a byte
112// which identifies the command name inside that version, a size_t sent as
113// a uleb128 indicating the length of the packet, and then that many bytes
114// of payload, and finally 4 bytes of adler32 checksum (in LSB order) over
115// the payload. decoding involves simply buffering until a sufficient
116// number of bytes are received, then advancing the buffer pointer. any
117// time an adler32 check fails, the protocol is assumed to have lost
118// synchronization, and the connection is dropped. the parties are free to
119// drop the tcp stream at any point, if too much data is received or too
120// much idle time passes; no commitments or transactions are made.
121//
122// one special command, "bye", is used to shut down a connection
123// gracefully. once each side has received all the data they want, they
124// can send a "bye" command to the other side. as soon as either side has
125// both sent and received a "bye" command, they drop the connection. if
126// either side sees an i/o failure (dropped connection) after they have
127// sent a "bye" command, they consider the shutdown successful.
128//
129// the exchange begins in a non-authenticated state. the server sends a
130// "hello <id> <nonce>" command, which identifies the server's RSA key and
131// issues a nonce which must be used for a subsequent authentication.
132//
133// the client can then respond with an "auth (source|sink|both)
134// <collection> <id> <nonce1> <nonce2> <sig>" command which identifies its
135// RSA key, notes the role it wishes to play in the synchronization,
136// identifies the collection it wishes to sync with, signs the previous
137// nonce with its own key, and issues a nonce of its own for mutual
138// authentication.
139//
140// the server can then respond with a "confirm <sig>" command, which is
141// the signature of the second nonce sent by the client. this
142// transitions the peers into an authenticated state and begins refinement.
143//
144// refinement begins with the client sending its root public key and
145// manifest certificate merkle nodes to the server. the server then
146// compares the root to each slot in *its* root node, and for each slot
147// either sends refined subtrees to the client, or (if it detects a missing
148// item in one collection or the other) sends either "data" or "send_data"
149// commands corresponding to the role of the missing item (source or
150// sink). the client then receives each refined subtree and compares it
151// with its own, performing similar description/request behavior depending
152// on role, and the cycle continues.
153//
154// detecting the end of refinement is subtle: after sending the refinement
155// of the root node, the server sends a "done 0" command (queued behind all
156// the other refinement traffic). when either peer receives a "done N"
157// command it immediately responds with a "done N+1" command. when two done
// commands for a given merkle tree arrive with no intervening refinements,
159// the entire merkle tree is considered complete.
160//
161// any "send_data" command received prompts a "data" command in response,
162// if the requested item exists. if an item does not exist, a "nonexistant"
163// response command is sent.
164//
165// once a response is received for each requested key and manifest cert
166// (either data or nonexistant) the requesting party walks the graph of
167// received manifest certs and transmits send_data or send_delta commands
// for all the manifests mentioned in the certs which it does not already
169// have in its database.
170//
171// for each manifest edge it receives, the recipient builds a patch_set
172// out of the manifests and then requests all the file data or deltas
173// described in that patch_set.
174//
175// once all requested files, manifests and certs are received (or noted as
176// nonexistant), the recipient closes its connection.
177//
178// (aside: this protocol is raw binary because coding density is actually
179// important here, and each packet consists of very information-dense
180// material that you wouldn't have a hope of typing in manually anyways)
181//
182
183using namespace Netxx;
184using namespace boost;
185using namespace std;
186
187static inline void require(bool check, string const & context)
188{
189 if (!check)
190 throw bad_decode(F("check of '%s' failed") % context);
191}
192
// refinement progress flags, kept per item type in
// session::done_refinements. both flags start out false.
struct done_marker
{
  // consulted and cleared again whenever a 'done' command is processed
  bool current_level_had_refinements;
  // set once refinement for the item type is considered finished
  bool tree_is_done;

  done_marker()
    : current_level_had_refinements(false), tree_is_done(false)
  {
  }
};
202
203struct session
204{
205 protocol_role const role;
206 protocol_voice const voice;
207 vector<utf8> const & collections;
208 set<string> const & all_collections;
209 app_state & app;
210
211 string peer_id;
212 socket_type fd;
213 Stream stream;
214
215 string inbuf;
216 string outbuf;
217
218 netcmd cmd;
219 bool armed;
220 bool arm();
221
222 utf8 collection;
223 id remote_peer_key_hash;
224 bool authenticated;
225
226 time_t last_io_time;
227
228 map<netcmd_item_type, done_marker> done_refinements;
229 set< pair<netcmd_item_type, id> > requested_items;
230 multimap<id,id> ancestry_edges;
231
232 id saved_nonce;
233 bool received_goodbye;
234 bool sent_goodbye;
235 boost::scoped_ptr<CryptoPP::AutoSeededRandomPool> prng;
236
237 session(protocol_role role,
238 protocol_voice voice,
239 vector<utf8> const & collections,
240 set<string> const & all_collections,
241 app_state & app,
242 string const & peer,
243 socket_type sock,
244 Timeout const & to);
245
246 id mk_nonce();
247 void mark_recent_io();
248 bool done_all_refinements();
249 bool got_all_data();
250 void maybe_say_goodbye();
251 void analyze_ancestry_graph();
252 void analyze_manifest(manifest_map const & man);
253 void analyze_manifest_edge(manifest_map const & parent,
254 manifest_map const & child);
255 void request_manifests_recursive(id const & i, set<id> & visited);
256
257 Probe::ready_type which_events() const;
258 bool read_some(ticker * t = NULL);
259 bool write_some(ticker * t = NULL);
260 void update_merkle_trees(netcmd_item_type type,
261 hexenc<id> const & hident,
262 bool live_p);
263
264 void queue_bye_cmd();
265 void queue_error_cmd(string const & errmsg);
266 void queue_done_cmd(size_t level, netcmd_item_type type);
267 void queue_hello_cmd(id const & server,
268 id const & nonce);
269 void queue_anonymous_cmd(protocol_role role,
270 string const & collection,
271 id const & nonce2);
272 void queue_auth_cmd(protocol_role role,
273 string const & collection,
274 id const & client,
275 id const & nonce1,
276 id const & nonce2,
277 string const & signature);
278 void queue_confirm_cmd(string const & signature);
279 void queue_refine_cmd(merkle_node const & node);
280 void queue_send_data_cmd(netcmd_item_type type,
281 id const & item);
282 void queue_send_delta_cmd(netcmd_item_type type,
283 id const & base,
284 id const & ident);
285 void queue_data_cmd(netcmd_item_type type,
286 id const & item,
287 string const & dat);
288 void queue_delta_cmd(netcmd_item_type type,
289 id const & base,
290 id const & ident,
291 delta const & del);
292 void queue_nonexistant_cmd(netcmd_item_type type,
293 id const & item);
294
295 bool process_bye_cmd();
296 bool process_error_cmd(string const & errmsg);
297 bool process_done_cmd(size_t level, netcmd_item_type type);
298 bool process_hello_cmd(id const & server,
299 id const & nonce);
300 bool process_anonymous_cmd(protocol_role role,
301 string const & collection,
302 id const & nonce2);
303 bool process_auth_cmd(protocol_role role,
304string const & collection,
305id const & client,
306id const & nonce1,
307id const & nonce2,
308string const & signature);
309 bool process_confirm_cmd(string const & signature);
310 bool process_refine_cmd(merkle_node const & node);
311 bool process_send_data_cmd(netcmd_item_type type,
312 id const & item);
313 bool process_send_delta_cmd(netcmd_item_type type,
314 id const & base,
315 id const & ident);
316 bool process_data_cmd(netcmd_item_type type,
317id const & item,
318string const & dat);
319 bool process_delta_cmd(netcmd_item_type type,
320 id const & base,
321 id const & ident,
322 delta const & del);
323 bool process_nonexistant_cmd(netcmd_item_type type,
324 id const & item);
325
326
327 bool dispatch_payload(netcmd const & cmd);
328 void begin_service();
329 bool process();
330};
331
332static inline string tohex(string const & s)
333{
334 return lowercase(xform<CryptoPP::HexEncoder>(s));
335}
336
337
338struct root_prefix
339{
340 hexenc<prefix> val;
341 root_prefix()
342 {
343 encode_hexenc(prefix(""), val);
344 }
345};
346static root_prefix ROOT_PREFIX;
347
348
349session::session(protocol_role role,
350 protocol_voice voice,
351 vector<utf8> const & collections,
352 set<string> const & all_coll,
353 app_state & app,
354 string const & peer,
355 socket_type sock,
356 Timeout const & to) :
357 role(role),
358 voice(voice),
359 collections(collections),
360 all_collections(all_coll),
361 app(app),
362 peer_id(peer),
363 fd(sock),
364 stream(sock, to),
365 inbuf(""),
366 outbuf(""),
367 armed(false),
368 collection(""),
369 remote_peer_key_hash(""),
370 authenticated(false),
371 last_io_time(::time(NULL)),
372 saved_nonce(""),
373 received_goodbye(false),
374 sent_goodbye(false)
375{
376 if (voice == client_voice)
377 {
378 N(collections.size() == 1,
379"client can only sync one collection at a time");
380 this->collection = idx(collections, 0);
381 }
382
383 // we will panic here if the user doesn't like urandom and we can't give
384 // them a real entropy-driven random.
385 bool request_blocking_rng = false;
386 if (!app.lua.hook_non_blocking_rng_ok())
387 {
388#ifndef BLOCKING_RNG_AVAILABLE
389 throw oops("no blocking RNG available and non-blocking RNG rejected");
390#else
391 request_blocking_rng = true;
392#endif
393 }
394 prng.reset(new CryptoPP::AutoSeededRandomPool(request_blocking_rng));
395
396 done_refinements.insert(make_pair(mcert_item, done_marker()));
397 done_refinements.insert(make_pair(fcert_item, done_marker()));
398 done_refinements.insert(make_pair(key_item, done_marker()));
399}
400
401id session::mk_nonce()
402{
403 I(this->saved_nonce().size() == 0);
404 char buf[constants::merkle_hash_length_in_bytes];
405 prng->GenerateBlock(reinterpret_cast<byte *>(buf), constants::merkle_hash_length_in_bytes);
406 this->saved_nonce = string(buf, buf + constants::merkle_hash_length_in_bytes);
407 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
408 return this->saved_nonce;
409}
410
411void session::mark_recent_io()
412{
413 last_io_time = ::time(NULL);
414}
415
416bool session::done_all_refinements()
417{
418 bool all = true;
419 for(map< netcmd_item_type, done_marker>::const_iterator j = done_refinements.begin();
420 j != done_refinements.end(); ++j)
421 {
422 if (j->second.tree_is_done == false)
423all = false;
424 }
425 return all;
426}
427
428bool session::got_all_data()
429{
430 return requested_items.empty();
431}
432
433struct always_true_version_check :
434 public version_existence_check
435{
436 virtual bool check(file_id i)
437 {
438 return true;
439 }
440};
441
442void session::analyze_manifest_edge(manifest_map const & parent,
443 manifest_map const & child)
444{
445 L(F("analyzing %d parent, %d child manifest entries\n")
446 % parent.size() % child.size());
447
448 always_true_version_check atc;
449 patch_set ps;
450 manifests_to_patch_set(parent, child, this->app, atc, ps);
451
452 for (set<patch_delta>::const_iterator i = ps.f_deltas.begin();
453 i != ps.f_deltas.end(); ++i)
454 {
455 if (this->app.db.file_version_exists(i->id_new))
456{
457 L(F("file delta target '%s' already exists on our side\n"));
458}
459 else
460{
461 L(F("requesting file delta from '%s' -> '%s' (path %s)\n")
462 % i->id_old % i->id_new % i->path);
463 id tmp1, tmp2;
464 decode_hexenc(i->id_old.inner(), tmp1);
465 decode_hexenc(i->id_new.inner(), tmp2);
466 queue_send_delta_cmd(file_item, tmp1, tmp2);
467}
468 }
469
470 for (set<patch_addition>::const_iterator i = ps.f_adds.begin();
471 i != ps.f_adds.end(); ++i)
472 {
473 if (this->app.db.file_version_exists(i->ident))
474{
475 L(F("added file version '%s' already exists on our side\n"));
476}
477 else
478{
479 L(F("requesting missing data for file '%s' (path %s)\n")
480 % i->ident % i->path);
481 id tmp;
482 decode_hexenc(i->ident.inner(), tmp);
483 queue_send_data_cmd(file_item, tmp);
484}
485 }
486}
487
488void session::analyze_manifest(manifest_map const & man)
489{
490 L(F("analyzing %d entries in manifest\n") % man.size());
491 for (manifest_map::const_iterator i = man.begin();
492 i != man.end(); ++i)
493 {
494 path_id_pair pip(i);
495 if (! this->app.db.file_version_exists(pip.ident()))
496{
497 id tmp;
498 decode_hexenc(pip.ident().inner(), tmp);
499 queue_send_data_cmd(file_item, tmp);
500}
501 }
502}
503
504void session::request_manifests_recursive(id const & i, set<id> & visited)
505{
506 if (visited.find(i) != visited.end())
507 return;
508
509 visited.insert(i);
510
511 hexenc<id> hid;
512 encode_hexenc(i, hid);
513
514 L(F("visiting manifest '%s'\n") % hid);
515
516 typedef multimap<id,id>::const_iterator ite;
517
518 if (ancestry_edges.find(i) == ancestry_edges.end())
519 {
520 // we are at a root, request full data
521 if (this->app.db.manifest_version_exists(manifest_id(hid)))
522{
523 L(F("not requesting manifest '%s' as we already have it\n") % hid);
524}
525 else
526{
527 queue_send_data_cmd(manifest_item, i);
528}
529 }
530 else
531 {
532 // first make sure we've requested enough to get to here by
533 // calling ourselves recursively
534 pair<ite,ite> range = ancestry_edges.equal_range(i);
535 for (ite p = range.first; p != range.second; ++p)
536{
537 id const & child = p->first;
538 id const & parent = p->second;
539 I(i == child);
540 request_manifests_recursive(parent, visited);
541}
542
543 // then perhaps request the edge that leads from an arbitrary parent
544 // to here. we'll pick the first parent, why not?
545 id const & child = range.first->first;
546 id const & parent = range.first->second;
547 I(i == child);
548 if (this->app.db.manifest_version_exists(manifest_id(hid)))
549{
550 L(F("not requesting manifest delta to '%s' as we already have it\n") % hid);
551}
552 else
553{
554 queue_send_delta_cmd(manifest_item, parent, child);
555}
556 }
557}
558
559void session::analyze_ancestry_graph()
560{
561 set<id> heads;
562
563 L(F("analyzing %d ancestry edges\n") % ancestry_edges.size());
564
565 // each ancestry edge goes from child -> parent
566
567 for (multimap<id,id>::const_iterator i = ancestry_edges.begin();
568 i != ancestry_edges.end(); ++i)
569 {
570 // first we add all children we're aware of to the heads set
571 heads.insert(i->first);
572 }
573
574 for (multimap<id,id>::const_iterator i = ancestry_edges.begin();
575 i != ancestry_edges.end(); ++i)
576 {
577 // then we remove any which are also parents
578 heads.erase(i->second);
579 }
580
581 L(F("isolated %d heads\n") % heads.size());
582
583 // then we walk the graph upwards, recursively, starting from
584 // each of the heads
585
586 set<id> visited;
587 for (set<id>::const_iterator i = heads.begin();
588 i != heads.end(); ++i)
589 {
590 hexenc<id> hid;
591 encode_hexenc(*i, hid);
592 L(F("walking upwards from '%s'\n") % hid);
593 request_manifests_recursive(*i, visited);
594 }
595}
596
597Probe::ready_type session::which_events() const
598{
599 if (outbuf.empty())
600 {
601 if (inbuf.size() < constants::netcmd_maxsz)
602return Probe::ready_read | Probe::ready_oobd;
603 else
604return Probe::ready_oobd;
605 }
606 else
607 {
608 if (inbuf.size() < constants::netcmd_maxsz)
609return Probe::ready_write | Probe::ready_read | Probe::ready_oobd;
610 else
611return Probe::ready_write | Probe::ready_oobd;
612 }
613}
614
615bool session::read_some(ticker * tick)
616{
617 I(inbuf.size() < constants::netcmd_maxsz);
618 char tmp[constants::bufsz];
619 signed_size_type count = stream.read(tmp, sizeof(tmp));
620 if(count > 0)
621 {
622 L(F("read %d bytes from fd %d (peer %s)\n") % count % fd % peer_id);
623 inbuf.append(string(tmp, tmp + count));
624 mark_recent_io();
625 if (tick != NULL)
626(*tick) += count;
627 return true;
628 }
629 else
630 return false;
631}
632
633bool session::write_some(ticker * tick)
634{
635 I(!outbuf.empty());
636 signed_size_type count = stream.write(outbuf.data(),
637std::min(outbuf.size(), constants::bufsz));
638 if(count > 0)
639 {
640 outbuf.erase(0, count);
641 L(F("wrote %d bytes to fd %d (peer %s), %d remain in output buffer\n")
642% count % fd % peer_id % outbuf.size());
643 mark_recent_io();
644 if (tick != NULL)
645(*tick) += count;
646 return true;
647 }
648 else
649 return false;
650}
651
652// senders
653
654void session::queue_bye_cmd()
655{
656 L(F("queueing 'bye' command\n"));
657 netcmd cmd;
658 cmd.cmd_code = bye_cmd;
659 write_netcmd(cmd, outbuf);
660 this->sent_goodbye = true;
661}
662
663void session::queue_error_cmd(string const & errmsg)
664{
665 L(F("queueing 'error' command\n"));
666 netcmd cmd;
667 cmd.cmd_code = error_cmd;
668 write_error_cmd_payload(errmsg, cmd.payload);
669 write_netcmd(cmd, outbuf);
670}
671
672void session::queue_done_cmd(size_t level, netcmd_item_type type)
673{
674 string typestr;
675 netcmd_item_type_to_string(type, typestr);
676 L(F("queueing 'done' command for %s level %s\n") % typestr % level);
677 netcmd cmd;
678 cmd.cmd_code = done_cmd;
679 write_done_cmd_payload(level, type, cmd.payload);
680 write_netcmd(cmd, outbuf);
681}
682
683void session::queue_hello_cmd(id const & server,
684 id const & nonce)
685{
686 netcmd cmd;
687 cmd.cmd_code = hello_cmd;
688 write_hello_cmd_payload(server, nonce, cmd.payload);
689 write_netcmd(cmd, outbuf);
690}
691
692void session::queue_anonymous_cmd(protocol_role role,
693 string const & collection,
694 id const & nonce2)
695{
696 netcmd cmd;
697 cmd.cmd_code = anonymous_cmd;
698 write_anonymous_cmd_payload(role, collection, nonce2, cmd.payload);
699 write_netcmd(cmd, outbuf);
700}
701
702void session::queue_auth_cmd(protocol_role role,
703 string const & collection,
704 id const & client,
705 id const & nonce1,
706 id const & nonce2,
707 string const & signature)
708{
709 netcmd cmd;
710 cmd.cmd_code = auth_cmd;
711 write_auth_cmd_payload(role, collection, client,
712 nonce1, nonce2, signature,
713 cmd.payload);
714 write_netcmd(cmd, outbuf);
715}
716
717void session::queue_confirm_cmd(string const & signature)
718{
719 netcmd cmd;
720 cmd.cmd_code = confirm_cmd;
721 write_confirm_cmd_payload(signature, cmd.payload);
722 write_netcmd(cmd, outbuf);
723}
724
725void session::queue_refine_cmd(merkle_node const & node)
726{
727 string typestr;
728 hexenc<prefix> hpref;
729 node.get_hex_prefix(hpref);
730 netcmd_item_type_to_string(node.type, typestr);
731 L(F("queueing request for refinement of %s node '%s', level %d\n")
732 % typestr % hpref % static_cast<int>(node.level));
733 netcmd cmd;
734 cmd.cmd_code = refine_cmd;
735 write_refine_cmd_payload(node, cmd.payload);
736 write_netcmd(cmd, outbuf);
737}
738
739void session::queue_send_data_cmd(netcmd_item_type type,
740 id const & item)
741{
742
743 string typestr;
744 netcmd_item_type_to_string(type, typestr);
745
746 if (this->requested_items.find(make_pair(type, item)) !=
747 this->requested_items.end())
748 {
749 hexenc<id> hid;
750 encode_hexenc(item, hid);
751 L(F("not queueing request for %s '%s' as we already requested it\n")
752% typestr % hid);
753 return;
754 }
755
756 L(F("queueing request for data of %s item '%s'\n")
757 % typestr % tohex(item()));
758 netcmd cmd;
759 cmd.cmd_code = send_data_cmd;
760 write_send_data_cmd_payload(type, item, cmd.payload);
761 write_netcmd(cmd, outbuf);
762 this->requested_items.insert(make_pair(type, item));
763}
764
765void session::queue_send_delta_cmd(netcmd_item_type type,
766 id const & base,
767 id const & ident)
768{
769
770 string typestr;
771 netcmd_item_type_to_string(type, typestr);
772 I(type == manifest_item || type == file_item);
773
774 if (this->requested_items.find(make_pair(type, ident)) !=
775 this->requested_items.end())
776 {
777 hexenc<id> base_hid;
778 encode_hexenc(base, base_hid);
779 hexenc<id> ident_hid;
780 encode_hexenc(ident, ident_hid);
781 L(F("not queueing request for %s delta '%s' -> '%s' as we already requested the target\n")
782% typestr % base_hid % ident_hid);
783 return;
784 }
785
786 L(F("queueing request for contents of %s delta '%s' -> '%s'\n")
787 % typestr % tohex(base()) % tohex(ident()));
788 netcmd cmd;
789 cmd.cmd_code = send_delta_cmd;
790 write_send_delta_cmd_payload(type, base, ident, cmd.payload);
791 write_netcmd(cmd, outbuf);
792 this->requested_items.insert(make_pair(type, ident));
793}
794
795void session::queue_data_cmd(netcmd_item_type type,
796 id const & item,
797 string const & dat)
798{
799 string typestr;
800 netcmd_item_type_to_string(type, typestr);
801 L(F("queueing %d bytes of data for %s item '%s'\n")
802 % dat.size() % typestr % tohex(item()));
803 netcmd cmd;
804 cmd.cmd_code = data_cmd;
805 write_data_cmd_payload(type, item, dat, cmd.payload);
806 write_netcmd(cmd, outbuf);
807}
808
809void session::queue_delta_cmd(netcmd_item_type type,
810 id const & base,
811 id const & ident,
812 delta const & del)
813{
814 string typestr;
815 netcmd_item_type_to_string(type, typestr);
816 L(F("queueing %s delta '%s' -> '%s'\n")
817 % typestr % tohex(base()) % tohex(ident()));
818 netcmd cmd;
819 cmd.cmd_code = delta_cmd;
820 write_delta_cmd_payload(type, base, ident, del, cmd.payload);
821 write_netcmd(cmd, outbuf);
822}
823
824void session::queue_nonexistant_cmd(netcmd_item_type type,
825 id const & item)
826{
827 string typestr;
828 netcmd_item_type_to_string(type, typestr);
829 L(F("queueing note of nonexistance of %s item '%s'\n")
830 % typestr % tohex(item()));
831 netcmd cmd;
832 cmd.cmd_code = nonexistant_cmd;
833 write_nonexistant_cmd_payload(type, item, cmd.payload);
834 write_netcmd(cmd, outbuf);
835}
836
837// processors
838
839bool session::process_bye_cmd()
840{
841 L(F("received 'bye' netcmd\n"));
842 this->received_goodbye = true;
843 return true;
844}
845
846bool session::process_error_cmd(string const & errmsg)
847{
848 W(F("received network error: %s\n") % errmsg);
849 this->received_goodbye = true;
850 return true;
851}
852
853bool session::process_done_cmd(size_t level, netcmd_item_type type)
854{
855
856 map< netcmd_item_type, done_marker>::iterator i = done_refinements.find(type);
857 I(i != done_refinements.end());
858
859 string typestr;
860 netcmd_item_type_to_string(type, typestr);
861
862 if ((! i->second.current_level_had_refinements) || (level >= 0xff))
863 {
864 // we received *no* refinements on this level -- or we ran out of
865 // levels -- so refinement for this type is finished.
866 L(F("received 'done' for empty %s level %d, marking as complete\n")
867% typestr % static_cast<int>(level));
868
869 // possibly echo it back one last time, for shutdown purposes
870 if (!i->second.tree_is_done)
871queue_done_cmd(level + 1, type);
872
873 // tombstone it
874 i->second.current_level_had_refinements = false;
875 i->second.tree_is_done = true;
876
877 // if it's mcerts, look over the ancestry graph
878 if (type == mcert_item)
879analyze_ancestry_graph();
880 }
881
882 else if (i->second.current_level_had_refinements
883 && (! i->second.tree_is_done))
884 {
885 // we *did* receive some refinements on this level, reset to zero and
886 // queue an echo of the 'done' marker.
887 L(F("received 'done' for %s level %d, which had refinements; "
888 "sending echo of done for level %d\n")
889% typestr
890% static_cast<int>(level)
891% static_cast<int>(level + 1));
892 i->second.current_level_had_refinements = false;
893 queue_done_cmd(level + 1, type);
894 return true;
895 }
896 return true;
897}
898
899bool session::process_hello_cmd(id const & server,
900id const & nonce)
901{
902 I(this->remote_peer_key_hash().size() == 0);
903 I(this->saved_nonce().size() == 0);
904
905 hexenc<id> hnonce;
906 encode_hexenc(nonce, hnonce);
907 hexenc<id> their_key_hash;
908 encode_hexenc(server, their_key_hash);
909
910 L(F("received 'hello' netcmd from server '%s' with nonce '%s'\n")
911 % their_key_hash % hnonce);
912
913 if (app.db.public_key_exists(their_key_hash))
914 {
915 // save their identity
916 this->remote_peer_key_hash = server;
917
918 if (app.signing_key() != "")
919{
920 // get our public key for its hash identifier
921 base64<rsa_pub_key> our_pub;
922 hexenc<id> our_key_hash;
923 id our_key_hash_raw;
924 app.db.get_key(app.signing_key, our_pub);
925 key_hash_code(app.signing_key, our_pub, our_key_hash);
926 decode_hexenc(our_key_hash, our_key_hash_raw);
927
928 // get our private key and make a signature
929 base64<rsa_sha1_signature> sig;
930 rsa_sha1_signature sig_raw;
931 base64< arc4<rsa_priv_key> > our_priv;
932 app.db.get_key(app.signing_key, our_priv);
933 make_signature(app.lua, app.signing_key, our_priv, nonce(), sig);
934 decode_base64(sig, sig_raw);
935
936 // make a new nonce of our own and send off the 'auth'
937 queue_auth_cmd(this->role, this->collection(), our_key_hash_raw,
938 nonce, mk_nonce(), sig_raw());
939}
940 else
941{
942 queue_anonymous_cmd(this->role, this->collection(), mk_nonce());
943}
944 return true;
945 }
946 else
947 {
948 W(F("unknown server key\n"));
949 }
950 return false;
951}
952
953bool session::process_anonymous_cmd(protocol_role role,
954 string const & collection,
955 id const & nonce2)
956{
957 hexenc<id> hnonce2;
958 encode_hexenc(nonce2, hnonce2);
959
960 L(F("received 'anonymous' netcmd from client for collection '%s' "
961 "in %s mode with nonce2 '%s'\n")
962 % collection % (role == source_and_sink_role ? "source and sink" :
963 (role == source_role ? "source " : "sink"))
964 % hnonce2);
965
966 // check they're asking for a collection we're serving
967 bool collection_ok = false;
968 for (vector<utf8>::const_iterator i = collections.begin();
969 i != collections.end(); ++i)
970 {
971 if (*i == collection)
972{
973 collection_ok = true;
974 break;
975}
976 }
977 if (!collection_ok)
978 {
979 W(F("not currently serving requested collection '%s'\n") % collection);
980 this->saved_nonce = id("");
981 return false;
982 }
983
984 //
985 // internally netsync thinks in terms of sources and sinks. users like
986 // thinking of repositories as "readonly", "readwrite", or "writeonly".
987 //
988 // we therefore use the read/write terminology when dealing with the UI:
989 // if the user asks to run a "read only" service, this means they are
990 // willing to be a source but not a sink.
991 //
992 // nb: the "role" here is the role the *client* wants to play
993 // so we need to check that the opposite role is allowed for us,
994 // in our this->role field.
995 //
996
997 if (role != sink_role)
998 {
999 W(F("rejected attempt at anonymous connection for write\n"));
1000 this->saved_nonce = id("");
1001 return false;
1002 }
1003
1004 if (! ((this->role == source_role || this->role == source_and_sink_role)
1005 && app.lua.hook_get_netsync_anonymous_read_permitted(collection)))
1006 {
1007 W(F("anonymous read permission denied for '%s'\n") % collection);
1008 this->saved_nonce = id("");
1009 return false;
1010 }
1011
1012 // get our private key and sign back
1013 L(F("anonymous read permitted, signing back nonce\n"));
1014 base64<rsa_sha1_signature> sig;
1015 rsa_sha1_signature sig_raw;
1016 base64< arc4<rsa_priv_key> > our_priv;
1017 app.db.get_key(app.signing_key, our_priv);
1018 make_signature(app.lua, app.signing_key, our_priv, nonce2(), sig);
1019 decode_base64(sig, sig_raw);
1020 queue_confirm_cmd(sig_raw());
1021 this->collection = collection;
1022 this->authenticated = true;
1023 return true;
1024}
1025
1026bool session::process_auth_cmd(protocol_role role,
1027 string const & collection,
1028 id const & client,
1029 id const & nonce1,
1030 id const & nonce2,
1031 string const & signature)
1032{
1033 I(this->remote_peer_key_hash().size() == 0);
1034 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1035
1036 hexenc<id> hnonce1, hnonce2;
1037 encode_hexenc(nonce1, hnonce1);
1038 encode_hexenc(nonce2, hnonce2);
1039 hexenc<id> their_key_hash;
1040 encode_hexenc(client, their_key_hash);
1041
1042 L(F("received 'auth' netcmd from client '%s' for collection '%s' "
1043 "in %s mode with nonce1 '%s' and nonce2 '%s'\n")
1044 % their_key_hash % collection % (role == source_and_sink_role ? "source and sink" :
1045 (role == source_role ? "source " : "sink"))
1046 % hnonce1 % hnonce2);
1047
1048 // check that they replied with the nonce we asked for
1049 if (!(nonce1 == this->saved_nonce))
1050 {
1051 W(F("detected replay attack in auth netcmd\n"));
1052 this->saved_nonce = id("");
1053 return false;
1054 }
1055
1056 // check they're asking for a collection we're serving
1057 bool collection_ok = false;
1058 for (vector<utf8>::const_iterator i = collections.begin();
1059 i != collections.end(); ++i)
1060 {
1061 if (*i == collection)
1062{
1063 collection_ok = true;
1064 break;
1065}
1066 }
1067 if (!collection_ok)
1068 {
1069 W(F("not currently serving requested collection '%s'\n") % collection);
1070 this->saved_nonce = id("");
1071 return false;
1072 }
1073
1074 //
1075 // internally netsync thinks in terms of sources and sinks. users like
1076 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1077 //
1078 // we therefore use the read/write terminology when dealing with the UI:
1079 // if the user asks to run a "read only" service, this means they are
1080 // willing to be a source but not a sink.
1081 //
1082 // nb: the "role" here is the role the *client* wants to play
1083 // so we need to check that the opposite role is allowed for us,
1084 // in our this->role field.
1085 //
1086
1087 if (!app.db.public_key_exists(their_key_hash))
1088 {
1089 W(F("unknown key hash '%s'\n") % their_key_hash);
1090 this->saved_nonce = id("");
1091 return false;
1092 }
1093
1094 // get their public key
1095 rsa_keypair_id their_id;
1096 base64<rsa_pub_key> their_key;
1097 app.db.get_pubkey(their_key_hash, their_id, their_key);
1098
1099 if (role == sink_role || role == source_and_sink_role)
1100 {
1101 if (! ((this->role == source_role || this->role == source_and_sink_role)
1102 && app.lua.hook_get_netsync_read_permitted(collection,
1103their_id())))
1104{
1105 W(F("read permission denied for '%s'\n") % collection);
1106 this->saved_nonce = id("");
1107 return false;
1108}
1109 }
1110
1111 if (role == source_role || role == source_and_sink_role)
1112 {
1113 if (! ((this->role == sink_role || this->role == source_and_sink_role)
1114 && app.lua.hook_get_netsync_write_permitted(collection,
1115 their_id())))
1116{
1117 W(F("write permission denied for '%s'\n") % collection);
1118 this->saved_nonce = id("");
1119 return false;
1120}
1121 }
1122
1123 // save their identity
1124 this->remote_peer_key_hash = client;
1125
1126 // check the signature
1127 base64<rsa_sha1_signature> sig;
1128 encode_base64(rsa_sha1_signature(signature), sig);
1129 if (check_signature(app.lua, their_id, their_key, nonce1(), sig))
1130 {
1131 // get our private key and sign back
1132 L(F("client signature OK, accepting authentication\n"));
1133 base64<rsa_sha1_signature> sig;
1134 rsa_sha1_signature sig_raw;
1135 base64< arc4<rsa_priv_key> > our_priv;
1136 app.db.get_key(app.signing_key, our_priv);
1137 make_signature(app.lua, app.signing_key, our_priv, nonce2(), sig);
1138 decode_base64(sig, sig_raw);
1139 queue_confirm_cmd(sig_raw());
1140 this->collection = collection;
1141 this->authenticated = true;
1142 return true;
1143 }
1144 else
1145 {
1146 W(F("bad client signature\n"));
1147 }
1148 return false;
1149}
1150
1151bool session::process_confirm_cmd(string const & signature)
1152{
1153 I(this->remote_peer_key_hash().size() == constants::merkle_hash_length_in_bytes);
1154 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1155
1156 hexenc<id> their_key_hash;
1157 encode_hexenc(id(remote_peer_key_hash), their_key_hash);
1158
1159 // nb. this->role is our role, the server is in the opposite role
1160 L(F("received 'confirm' netcmd from server '%s' for collection '%s' in %s mode\n")
1161 % their_key_hash % this->collection % (this->role == source_and_sink_role ? "source and sink" :
1162 (this->role == source_role ? "sink" : "source")));
1163
1164 // check their signature
1165 if (app.db.public_key_exists(their_key_hash))
1166 {
1167 // get their public key and check the signature
1168 rsa_keypair_id their_id;
1169 base64<rsa_pub_key> their_key;
1170 app.db.get_pubkey(their_key_hash, their_id, their_key);
1171 base64<rsa_sha1_signature> sig;
1172 encode_base64(rsa_sha1_signature(signature), sig);
1173 if (check_signature(app.lua, their_id, their_key, this->saved_nonce(), sig))
1174{
1175 L(F("server signature OK, accepting authentication\n"));
1176 this->authenticated = true;
1177 merkle_node root;
1178 load_merkle_node(app, key_item, this->collection, 0, ROOT_PREFIX.val, root);
1179 queue_refine_cmd(root);
1180 queue_done_cmd(0, key_item);
1181
1182 load_merkle_node(app, mcert_item, this->collection, 0, ROOT_PREFIX.val, root);
1183 queue_refine_cmd(root);
1184 queue_done_cmd(0, mcert_item);
1185
1186 load_merkle_node(app, fcert_item, this->collection, 0, ROOT_PREFIX.val, root);
1187 queue_refine_cmd(root);
1188 queue_done_cmd(0, fcert_item);
1189 return true;
1190}
1191 else
1192{
1193 W(F("bad server signature\n"));
1194}
1195 }
1196 else
1197 {
1198 W(F("unknown server key\n"));
1199 }
1200 return false;
1201}
1202
1203static bool data_exists(netcmd_item_type type,
1204id const & item,
1205app_state & app)
1206{
1207 hexenc<id> hitem;
1208 encode_hexenc(item, hitem);
1209 switch (type)
1210 {
1211 case key_item:
1212 return app.db.public_key_exists(hitem);
1213 case fcert_item:
1214 return app.db.file_cert_exists(hitem);
1215 case mcert_item:
1216 return app.db.manifest_cert_exists(hitem);
1217 case manifest_item:
1218 return app.db.manifest_version_exists(manifest_id(hitem));
1219 case file_item:
1220 return app.db.file_version_exists(file_id(hitem));
1221 }
1222 return false;
1223}
1224
1225static void load_data(netcmd_item_type type,
1226 id const & item,
1227 app_state & app,
1228 string & out)
1229{
1230 string typestr;
1231 netcmd_item_type_to_string(type, typestr);
1232 hexenc<id> hitem;
1233 encode_hexenc(item, hitem);
1234 switch (type)
1235 {
1236 case key_item:
1237 if (app.db.public_key_exists(hitem))
1238{
1239 rsa_keypair_id keyid;
1240 base64<rsa_pub_key> pub_encoded;
1241 app.db.get_pubkey(hitem, keyid, pub_encoded);
1242 L(F("public key '%s' is also called '%s'\n") % hitem % keyid);
1243 write_pubkey(keyid, pub_encoded, out);
1244}
1245 else
1246{
1247 throw bad_decode(F("public key '%s' does not exist in our database") % hitem);
1248}
1249 break;
1250
1251 case manifest_item:
1252 if (app.db.manifest_version_exists(manifest_id(hitem)))
1253{
1254 manifest_data mdat;
1255 data dat;
1256 app.db.get_manifest_version(manifest_id(hitem), mdat);
1257 unpack(mdat.inner(), dat);
1258 out = dat();
1259}
1260 else
1261{
1262 throw bad_decode(F("manifest '%s' does not exist in our database") % hitem);
1263}
1264 break;
1265
1266 case file_item:
1267 if (app.db.file_version_exists(file_id(hitem)))
1268{
1269 file_data fdat;
1270 data dat;
1271 app.db.get_file_version(file_id(hitem), fdat);
1272 unpack(fdat.inner(), dat);
1273 out = dat();
1274}
1275 else
1276{
1277 throw bad_decode(F("file '%s' does not exist in our database") % hitem);
1278}
1279 break;
1280
1281 case mcert_item:
1282 if(app.db.manifest_cert_exists(hitem))
1283{
1284 manifest<cert> c;
1285 app.db.get_manifest_cert(hitem, c);
1286 string tmp;
1287 write_cert(c.inner(), out);
1288}
1289 else
1290{
1291 throw bad_decode(F("mcert '%s' does not exist in our database") % hitem);
1292}
1293 break;
1294
1295 case fcert_item:
1296 if(app.db.file_cert_exists(hitem))
1297{
1298 file<cert> c;
1299 app.db.get_file_cert(hitem, c);
1300 string tmp;
1301 write_cert(c.inner(), out);
1302}
1303 else
1304{
1305 throw bad_decode(F("fcert '%s' does not exist in our database") % hitem);
1306}
1307 break;
1308 }
1309}
1310
1311
1312bool session::process_refine_cmd(merkle_node const & their_node)
1313{
1314 hexenc<prefix> hpref;
1315 their_node.get_hex_prefix(hpref);
1316 string typestr;
1317
1318 netcmd_item_type_to_string(their_node.type, typestr);
1319 size_t lev = static_cast<size_t>(their_node.level);
1320
1321 L(F("received 'refine' netcmd on %s node '%s', level %d\n")
1322 % typestr % hpref % lev);
1323
1324 if (!app.db.merkle_node_exists(typestr, this->collection,
1325 their_node.level, hpref))
1326 {
1327 L(F("no corresponding %s merkle node for prefix '%s', level %d\n")
1328% typestr % hpref % lev);
1329
1330 for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
1331{
1332 switch (their_node.get_slot_state(slot))
1333 {
1334 case empty_state:
1335 {
1336// we agree, this slot is empty
1337L(F("(#0) they have an empty slot %d (in a %s node '%s', level %d, we do not have)\n")
1338 % slot % typestr % hpref % lev);
1339continue;
1340 }
1341 break;
1342 case live_leaf_state:
1343 {
1344// we want what *they* have
1345id slotval;
1346hexenc<id> hslotval;
1347their_node.get_raw_slot(slot, slotval);
1348their_node.get_hex_slot(slot, hslotval);
1349L(F("(#0) they have a live leaf at slot %d (in a %s node '%s', level %d, we do not have)\n")
1350 % slot % typestr % hpref % lev);
1351L(F("(#0) requesting their %s leaf %s\n") % typestr % hslotval);
1352queue_send_data_cmd(their_node.type, slotval);
1353 }
1354 break;
1355 case dead_leaf_state:
1356 {
1357// we cannot ask for what they have, it is dead
1358L(F("(#0) they have a dead leaf at slot %d (in a %s node '%s', level %d, we do not have)\n")
1359 % slot % typestr % hpref % lev);
1360continue;
1361 }
1362 break;
1363 case subtree_state:
1364 {
1365// they have a subtree; might as well ask for that
1366L(F("(#0) they have a subtree at slot %d (in a %s node '%s', level %d, we do not have)\n")
1367 % slot % typestr % hpref % lev);
1368merkle_node our_fake_subtree;
1369their_node.extended_prefix(slot, our_fake_subtree.pref);
1370our_fake_subtree.level = their_node.level + 1;
1371our_fake_subtree.type = their_node.type;
1372queue_refine_cmd(our_fake_subtree);
1373 }
1374 break;
1375 }
1376}
1377 }
1378 else
1379 {
1380 // we have a corresponding merkle node. there are 16 branches
1381 // to the following switch condition. it is awful. sorry.
1382 L(F("found corresponding %s merkle node for prefix '%s', level %d\n")
1383% typestr % hpref % lev);
1384 merkle_node our_node;
1385 load_merkle_node(app, their_node.type, this->collection,
1386 their_node.level, hpref, our_node);
1387 for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
1388{
1389 switch (their_node.get_slot_state(slot))
1390 {
1391 case empty_state:
1392 switch (our_node.get_slot_state(slot))
1393{
1394
1395case empty_state:
1396 // 1: theirs == empty, ours == empty
1397 L(F("(#1) they have an empty slot %d in %s node '%s', level %d, and so do we\n")
1398 % slot % typestr % hpref % lev);
1399 continue;
1400 break;
1401
1402case live_leaf_state:
1403 // 2: theirs == empty, ours == live
1404 L(F("(#2) they have an empty slot %d in %s node '%s', level %d, we have a live leaf\n")
1405 % slot % typestr % hpref % lev);
1406 {
1407 I(their_node.type == our_node.type);
1408 string tmp;
1409 id slotval;
1410 our_node.get_raw_slot(slot, slotval);
1411 load_data(their_node.type, slotval, this->app, tmp);
1412 queue_data_cmd(their_node.type, slotval, tmp);
1413 }
1414 break;
1415
1416case dead_leaf_state:
1417 // 3: theirs == empty, ours == dead
1418 L(F("(#3) they have an empty slot %d in %s node '%s', level %d, we have a dead leaf\n")
1419 % slot % typestr % hpref % lev);
1420 continue;
1421 break;
1422
1423case subtree_state:
1424 // 4: theirs == empty, ours == subtree
1425 L(F("(#4) they have an empty slot %d in %s node '%s', level %d, we have a subtree\n")
1426 % slot % typestr % hpref % lev);
1427 {
1428 hexenc<prefix> subprefix;
1429 our_node.extended_hex_prefix(slot, subprefix);
1430 merkle_node our_subtree;
1431 I(our_node.type == their_node.type);
1432 load_merkle_node(app, their_node.type, this->collection,
1433 our_node.level + 1, subprefix, our_subtree);
1434 I(our_node.type == our_subtree.type);
1435 queue_refine_cmd(our_subtree);
1436 }
1437 break;
1438
1439}
1440 break;
1441
1442
1443 case live_leaf_state:
1444 switch (our_node.get_slot_state(slot))
1445{
1446
1447case empty_state:
1448 // 5: theirs == live, ours == empty
1449 L(F("(#5) they have a live leaf at slot %d in %s node '%s', level %d, we have nothing\n")
1450 % slot % typestr % hpref % lev);
1451 {
1452 id slotval;
1453 their_node.get_raw_slot(slot, slotval);
1454 queue_send_data_cmd(their_node.type, slotval);
1455 }
1456 break;
1457
1458case live_leaf_state:
1459 // 6: theirs == live, ours == live
1460 L(F("(#6) they have a live leaf at slot %d in %s node '%s', and so do we\n")
1461 % slot % typestr % hpref);
1462 {
1463 id our_slotval, their_slotval;
1464 their_node.get_raw_slot(slot, their_slotval);
1465 our_node.get_raw_slot(slot, our_slotval);
1466 if (their_slotval == our_slotval)
1467 {
1468hexenc<id> hslotval;
1469their_node.get_hex_slot(slot, hslotval);
1470L(F("(#6) we both have live %s leaf '%s'\n") % typestr % hslotval);
1471continue;
1472 }
1473 else
1474 {
1475I(their_node.type == our_node.type);
1476string tmp;
1477id our_slotval, their_slotval;
1478our_node.get_raw_slot(slot, our_slotval);
1479our_node.get_raw_slot(slot, their_slotval);
1480load_data(our_node.type, our_slotval, this->app, tmp);
1481queue_send_data_cmd(their_node.type, their_slotval);
1482queue_data_cmd(our_node.type, our_slotval, tmp);
1483 }
1484 }
1485 break;
1486
1487case dead_leaf_state:
1488 // 7: theirs == live, ours == dead
1489 L(F("(#7) they have a live leaf at slot %d in %s node %s, level %d, we have a dead one\n")
1490 % slot % typestr % hpref % lev);
1491 {
1492 id our_slotval, their_slotval;
1493 our_node.get_raw_slot(slot, our_slotval);
1494 their_node.get_raw_slot(slot, their_slotval);
1495 if (their_slotval == our_slotval)
1496 {
1497hexenc<id> hslotval;
1498their_node.get_hex_slot(slot, hslotval);
1499L(F("(#7) it's the same %s leaf '%s', but ours is dead\n")
1500 % typestr % hslotval);
1501continue;
1502 }
1503 else
1504 {
1505queue_send_data_cmd(their_node.type, their_slotval);
1506 }
1507 }
1508 break;
1509
1510case subtree_state:
1511 // 8: theirs == live, ours == subtree
1512 L(F("(#8) they have a live leaf in slot %d of %s node '%s', level %d, we have a subtree\n")
1513 % slot % typestr % hpref % lev);
1514 {
1515 hexenc<prefix> subprefix;
1516 our_node.extended_hex_prefix(slot, subprefix);
1517 merkle_node our_subtree;
1518 load_merkle_node(app, our_node.type, this->collection,
1519 our_node.level + 1, subprefix, our_subtree);
1520 queue_refine_cmd(our_subtree);
1521 }
1522 break;
1523}
1524 break;
1525
1526
1527 case dead_leaf_state:
1528 switch (our_node.get_slot_state(slot))
1529{
1530case empty_state:
1531 // 9: theirs == dead, ours == empty
1532 L(F("(#9) they have a dead leaf at slot %d in %s node '%s', level %d, we have nothing\n")
1533 % slot % typestr % hpref % lev);
1534 continue;
1535 break;
1536
1537case live_leaf_state:
1538 // 10: theirs == dead, ours == live
1539 L(F("(#10) they have a dead leaf at slot %d in %s node '%s', level %d, we have a live one\n")
1540 % slot % typestr % hpref % lev);
1541 {
1542 id our_slotval, their_slotval;
1543 their_node.get_raw_slot(slot, their_slotval);
1544 our_node.get_raw_slot(slot, our_slotval);
1545 hexenc<id> hslotval;
1546 our_node.get_hex_slot(slot, hslotval);
1547 if (their_slotval == our_slotval)
1548 {
1549L(F("(#10) we both have %s leaf %s, theirs is dead\n")
1550 % typestr % hslotval);
1551continue;
1552 }
1553 else
1554 {
1555I(their_node.type == our_node.type);
1556string tmp;
1557load_data(our_node.type, our_slotval, this->app, tmp);
1558queue_data_cmd(our_node.type, our_slotval, tmp);
1559 }
1560 }
1561 break;
1562
1563case dead_leaf_state:
1564 // 11: theirs == dead, ours == dead
1565 L(F("(#11) they have a dead leaf at slot %d in %s node '%s', level %d, so do we\n")
1566 % slot % typestr % hpref % lev);
1567 continue;
1568 break;
1569
1570case subtree_state:
1571 // theirs == dead, ours == subtree
1572 L(F("(#12) they have a dead leaf in slot %d of %s node '%s', we have a subtree\n")
1573 % slot % typestr % hpref % lev);
1574 {
1575 hexenc<prefix> subprefix;
1576 our_node.extended_hex_prefix(slot, subprefix);
1577 merkle_node our_subtree;
1578 load_merkle_node(app, our_node.type, this->collection,
1579 our_node.level + 1, subprefix, our_subtree);
1580 queue_refine_cmd(our_subtree);
1581 }
1582 break;
1583}
1584 break;
1585
1586
1587 case subtree_state:
1588 switch (our_node.get_slot_state(slot))
1589{
1590case empty_state:
1591 // 13: theirs == subtree, ours == empty
1592 L(F("(#13) they have a subtree at slot %d in %s node '%s', level %d, we have nothing\n")
1593 % slot % typestr % hpref % lev);
1594 {
1595 merkle_node our_fake_subtree;
1596 their_node.extended_prefix(slot, our_fake_subtree.pref);
1597 our_fake_subtree.level = their_node.level + 1;
1598 our_fake_subtree.type = their_node.type;
1599 queue_refine_cmd(our_fake_subtree);
1600 }
1601 break;
1602
1603case live_leaf_state:
1604 // 14: theirs == subtree, ours == live
1605 L(F("(#14) they have a subtree at slot %d in %s node '%s', level %d, we have a live leaf\n")
1606 % slot % typestr % hpref % lev);
1607 {
1608 size_t subslot;
1609 id our_slotval;
1610 merkle_node our_fake_subtree;
1611 our_node.get_raw_slot(slot, our_slotval);
1612 pick_slot_and_prefix_for_value(our_slotval, our_node.level + 1, subslot,
1613 our_fake_subtree.pref);
1614 our_fake_subtree.type = their_node.type;
1615 our_fake_subtree.level = our_node.level + 1;
1616 our_fake_subtree.set_raw_slot(subslot, our_slotval);
1617 our_fake_subtree.set_slot_state(subslot, our_node.get_slot_state(slot));
1618 queue_refine_cmd(our_fake_subtree);
1619 }
1620 break;
1621
1622case dead_leaf_state:
1623 // 15: theirs == subtree, ours == dead
1624 L(F("(#15) they have a subtree at slot %d in %s node '%s', level %d, we have a dead leaf\n")
1625 % slot % typestr % hpref % lev);
1626 {
1627 size_t subslot;
1628 id our_slotval;
1629 merkle_node our_fake_subtree;
1630 our_node.get_raw_slot(slot, our_slotval);
1631 pick_slot_and_prefix_for_value(our_slotval, our_node.level + 1, subslot,
1632 our_fake_subtree.pref);
1633 our_fake_subtree.type = their_node.type;
1634 our_fake_subtree.level = our_node.level + 1;
1635 our_fake_subtree.set_raw_slot(subslot, our_slotval);
1636 our_fake_subtree.set_slot_state(subslot, our_node.get_slot_state(slot));
1637 queue_refine_cmd(our_fake_subtree);
1638 }
1639 break;
1640
1641case subtree_state:
1642 // 16: theirs == subtree, ours == subtree
1643 L(F("(#16) they have a subtree at slot %d in %s node '%s', level %d, and so do we\n")
1644 % slot % typestr % hpref % lev);
1645 {
1646 id our_slotval, their_slotval;
1647 hexenc<id> hslotval;
1648 their_node.get_raw_slot(slot, their_slotval);
1649 our_node.get_raw_slot(slot, our_slotval);
1650 our_node.get_hex_slot(slot, hslotval);
1651 if (their_slotval == our_slotval)
1652 {
1653L(F("(#16) we both have %s subtree '%s'\n") % typestr % hslotval);
1654continue;
1655 }
1656 else
1657 {
1658L(F("(#16) %s subtrees at slot %d differ, refining ours\n") % typestr % slot);
1659hexenc<prefix> subprefix;
1660our_node.extended_hex_prefix(slot, subprefix);
1661merkle_node our_subtree;
1662load_merkle_node(app, our_node.type, this->collection,
1663 our_node.level + 1, subprefix, our_subtree);
1664queue_refine_cmd(our_subtree);
1665 }
1666 }
1667 break;
1668}
1669 break;
1670 }
1671}
1672 }
1673 return true;
1674}
1675
1676
1677bool session::process_send_data_cmd(netcmd_item_type type,
1678 id const & item)
1679{
1680 string typestr;
1681 netcmd_item_type_to_string(type, typestr);
1682 hexenc<id> hitem;
1683 encode_hexenc(item, hitem);
1684 L(F("received 'send_data' netcmd requesting %s '%s'\n")
1685 % typestr % hitem);
1686 if (data_exists(type, item, this->app))
1687 {
1688 string out;
1689 load_data(type, item, this->app, out);
1690 queue_data_cmd(type, item, out);
1691 }
1692 else
1693 {
1694 queue_nonexistant_cmd(type, item);
1695 }
1696 return true;
1697}
1698
1699bool session::process_send_delta_cmd(netcmd_item_type type,
1700 id const & base,
1701 id const & ident)
1702{
1703 string typestr;
1704 netcmd_item_type_to_string(type, typestr);
1705 delta del;
1706
1707 hexenc<id> hbase, hident;
1708 encode_hexenc(base, hbase);
1709 encode_hexenc(ident, hident);
1710
1711 L(F("received 'send_delta' netcmd requesting %s edge '%s' -> '%s'\n")
1712 % typestr % hbase % hident);
1713
1714 switch (type)
1715 {
1716 case file_item:
1717 {
1718file_id fbase(hbase), fident(hident);
1719file_delta fdel;
1720if (this->app.db.file_version_exists(fbase)
1721 && this->app.db.file_version_exists(fident))
1722 {
1723 file_data base_fdat, ident_fdat;
1724 data base_dat, ident_dat;
1725 this->app.db.get_file_version(fbase, base_fdat);
1726 this->app.db.get_file_version(fident, ident_fdat);
1727 string tmp;
1728 unpack(base_fdat.inner(), base_dat);
1729 unpack(ident_fdat.inner(), ident_dat);
1730 compute_delta(base_dat(), ident_dat(), tmp);
1731 del = delta(tmp);
1732 }
1733else
1734 {
1735 queue_nonexistant_cmd(type, ident);
1736 }
1737 }
1738 break;
1739
1740 case manifest_item:
1741 {
1742manifest_id mbase(hbase), mident(hident);
1743manifest_delta mdel;
1744if (this->app.db.manifest_version_exists(mbase)
1745 && this->app.db.manifest_version_exists(mident))
1746 {
1747 manifest_data base_mdat, ident_mdat;
1748 data base_dat, ident_dat;
1749 this->app.db.get_manifest_version(mbase, base_mdat);
1750 this->app.db.get_manifest_version(mident, ident_mdat);
1751 string tmp;
1752 unpack(base_mdat.inner(), base_dat);
1753 unpack(ident_mdat.inner(), ident_dat);
1754 compute_delta(base_dat(), ident_dat(), tmp);
1755 del = delta(tmp);
1756 }
1757else
1758 {
1759 queue_nonexistant_cmd(type, ident);
1760 }
1761 }
1762 break;
1763
1764 default:
1765 throw bad_decode(F("delta requested for item type %s\n") % typestr);
1766 }
1767 queue_delta_cmd(type, base, ident, del);
1768 return true;
1769}
1770
1771void session::update_merkle_trees(netcmd_item_type type,
1772 hexenc<id> const & hident,
1773 bool live_p)
1774{
1775 id raw_id;
1776 decode_hexenc(hident, raw_id);
1777 string typestr;
1778 netcmd_item_type_to_string(type, typestr);
1779 for (set<string>::const_iterator i = this->all_collections.begin();
1780 i != this->all_collections.end(); ++i)
1781 {
1782 if (this->collection().find(*i) == 0)
1783{
1784 L(F("updating %s collection '%s' with item %s\n")
1785 % typestr % *i % hident);
1786 insert_into_merkle_tree(this->app, live_p, type, *i, raw_id(), 0);
1787}
1788 }
1789}
1790
1791bool session::process_data_cmd(netcmd_item_type type,
1792 id const & item,
1793 string const & dat)
1794{
1795 hexenc<id> hitem;
1796 encode_hexenc(item, hitem);
1797
1798 // it's ok if we received something we didn't ask for; it might
1799 // be a spontaneous transmission from refinement
1800 requested_items.erase(make_pair(type, item));
1801
1802 switch (type)
1803 {
1804 case key_item:
1805 if (this->app.db.public_key_exists(hitem))
1806L(F("public key '%s' already exists in our database\n") % hitem);
1807 else
1808{
1809 rsa_keypair_id keyid;
1810 base64<rsa_pub_key> pub;
1811 read_pubkey(dat, keyid, pub);
1812 hexenc<id> tmp;
1813 key_hash_code(keyid, pub, tmp);
1814 if (! (tmp == hitem))
1815 throw bad_decode(F("hash check failed for public key '%s' (%s);"
1816 " wanted '%s' got '%s'")
1817 % hitem % keyid % hitem % tmp);
1818 this->app.db.put_key(keyid, pub);
1819 update_merkle_trees(key_item, tmp, true);
1820}
1821 break;
1822
1823 case mcert_item:
1824 if (this->app.db.manifest_cert_exists(hitem))
1825L(F("manifest cert '%s' already exists in our database\n") % hitem);
1826 else
1827{
1828 cert c;
1829 read_cert(dat, c);
1830 hexenc<id> tmp;
1831 cert_hash_code(c, tmp);
1832 if (! (tmp == hitem))
1833 throw bad_decode(F("hash check failed for manifest cert '%s'") % hitem);
1834 this->app.db.put_manifest_cert(manifest<cert>(c));
1835 update_merkle_trees(mcert_item, tmp, true);
1836 if (c.name == ancestor_cert_name)
1837 {
1838 cert_value tmp_value;
1839 hexenc<id> tmp_parent;
1840 id child, parent;
1841
1842 decode_base64(c.value, tmp_value);
1843 tmp_parent = tmp_value();
1844 decode_hexenc(c.ident, child);
1845 decode_hexenc(tmp_parent, parent);
1846 L(F("noticed ancestry edge from '%s' -> '%s'\n") % tmp_parent % c.ident);
1847 this->ancestry_edges.insert(make_pair(child, parent));
1848 }
1849}
1850 break;
1851
1852 case fcert_item:
1853 if (this->app.db.file_cert_exists(hitem))
1854L(F("file cert '%s' already exists in our database\n") % hitem);
1855 else
1856{
1857 cert c;
1858 read_cert(dat, c);
1859 hexenc<id> tmp;
1860 cert_hash_code(c, tmp);
1861 if (! (tmp == hitem))
1862 throw bad_decode(F("hash check failed for file cert '%s'") % hitem);
1863 this->app.db.put_file_cert(file<cert>(c));
1864 update_merkle_trees(fcert_item, tmp, true);
1865}
1866 break;
1867
1868 case manifest_item:
1869 {
1870manifest_id mid(hitem);
1871if (this->app.db.manifest_version_exists(mid))
1872 L(F("manifest version '%s' already exists in our database\n") % hitem);
1873else
1874 {
1875 base64< gzip<data> > packed_dat;
1876 pack(data(dat), packed_dat);
1877 this->app.db.put_manifest(mid, manifest_data(packed_dat));
1878 manifest_map man;
1879 read_manifest_map(data(dat), man);
1880 analyze_manifest(man);
1881 }
1882 }
1883 break;
1884
1885 case file_item:
1886 {
1887file_id fid(hitem);
1888if (this->app.db.file_version_exists(fid))
1889 L(F("file version '%s' already exists in our database\n") % hitem);
1890else
1891 {
1892 base64< gzip<data> > packed_dat;
1893 pack(data(dat), packed_dat);
1894 this->app.db.put_file(fid, file_data(packed_dat));
1895 }
1896 }
1897 break;
1898
1899 }
1900 return true;
1901}
1902
1903bool session::process_delta_cmd(netcmd_item_type type,
1904id const & base,
1905id const & ident,
1906delta const & del)
1907{
1908 string typestr;
1909 netcmd_item_type_to_string(type, typestr);
1910 hexenc<id> hbase, hident;
1911 encode_hexenc(base, hbase);
1912 encode_hexenc(ident, hident);
1913
1914 // it's ok if we received something we didn't ask for; it might
1915 // be a spontaneous transmission from refinement
1916 requested_items.erase(make_pair(type, ident));
1917
1918 switch (type)
1919 {
1920 case manifest_item:
1921 {
1922manifest_id old_manifest(hbase), new_manifest(hident);
1923if (! this->app.db.manifest_version_exists(old_manifest))
1924 L(F("manifest delta base '%s' does not exist in our database\n")
1925 % hbase);
1926else if (this->app.db.manifest_version_exists(new_manifest))
1927 L(F("manifest delta head '%s' already exists in our database\n")
1928 % hident);
1929else
1930 {
1931 manifest_data old_dat;
1932 this->app.db.get_manifest_version(old_manifest, old_dat);
1933 data old_unpacked;
1934 unpack(old_dat.inner(), old_unpacked);
1935 string tmp;
1936 apply_delta(old_unpacked(), del(), tmp);
1937 hexenc<id> confirm;
1938 calculate_ident(data(tmp), confirm);
1939 if (!(confirm == hident))
1940 {
1941L(F("reconstructed manifest from delta '%s' -> '%s' has wrong id '%s'\n")
1942 % hbase % hident % confirm);
1943 }
1944 else
1945 {
1946base64< gzip<delta> > packed_del;
1947pack(del, packed_del);
1948this->app.db.put_manifest_version(old_manifest, new_manifest,
1949 manifest_delta(packed_del));
1950manifest_map parent_man, child_man;
1951read_manifest_map(old_unpacked, parent_man);
1952read_manifest_map(data(tmp), child_man);
1953analyze_manifest_edge(parent_man, child_man);
1954 }
1955 }
1956 }
1957 break;
1958
1959 case file_item:
1960 {
1961file_id old_file(hbase), new_file(hident);
1962if (! this->app.db.file_version_exists(old_file))
1963 L(F("file delta base '%s' does not exist in our database\n")
1964 % hbase);
1965else if (this->app.db.file_version_exists(new_file))
1966 L(F("file delta head '%s' already exists in our database\n")
1967 % hident);
1968else
1969 {
1970 file_data old_dat;
1971 this->app.db.get_file_version(old_file, old_dat);
1972 data old_unpacked;
1973 unpack(old_dat.inner(), old_unpacked);
1974 string tmp;
1975 apply_delta(old_unpacked(), del(), tmp);
1976 hexenc<id> confirm;
1977 calculate_ident(data(tmp), confirm);
1978 if (!(confirm == hident))
1979 {
1980L(F("reconstructed file from delta '%s' -> '%s' has wrong id '%s'\n")
1981 % hbase % hident % confirm);
1982 }
1983 else
1984 {
1985base64< gzip<delta> > packed_del;
1986pack(del, packed_del);
1987this->app.db.put_file_version(old_file, new_file, file_delta(packed_del));
1988 }
1989 }
1990 }
1991 break;
1992
1993 default:
1994 L(F("ignoring delta received for item type %s\n") % typestr);
1995 break;
1996 }
1997 return true;
1998}
1999
2000bool session::process_nonexistant_cmd(netcmd_item_type type,
2001 id const & item)
2002{
2003 string typestr;
2004 netcmd_item_type_to_string(type, typestr);
2005 hexenc<id> hitem;
2006 encode_hexenc(item, hitem);
2007 L(F("received 'nonexistant' netcmd for %s '%s'\n")
2008 % typestr % hitem);
2009 requested_items.erase(make_pair(type, item));
2010 return true;
2011}
2012
2013
2014
2015bool session::dispatch_payload(netcmd const & cmd)
2016{
2017
2018 switch (cmd.cmd_code)
2019 {
2020
2021 case bye_cmd:
2022 return process_bye_cmd();
2023 break;
2024
2025 case error_cmd:
2026 {
2027string errmsg;
2028read_error_cmd_payload(cmd.payload, errmsg);
2029return process_error_cmd(errmsg);
2030 }
2031 break;
2032
2033 case hello_cmd:
2034 require(! authenticated, "hello netcmd received when not authenticated");
2035 require(voice == client_voice, "hello netcmd received in client voice");
2036 {
2037id server, nonce;
2038read_hello_cmd_payload(cmd.payload, server, nonce);
2039return process_hello_cmd(server, nonce);
2040 }
2041 break;
2042
2043 case anonymous_cmd:
2044 require(! authenticated, "anonymous netcmd received when not authenticated");
2045 require(voice == server_voice, "anonymous netcmd received in server voice");
2046 require(role == source_role ||
2047 role == source_and_sink_role,
2048 "anonymous netcmd received in source or source/sink role");
2049 {
2050protocol_role role;
2051string collection;
2052id nonce2;
2053read_anonymous_cmd_payload(cmd.payload, role, collection, nonce2);
2054return process_anonymous_cmd(role, collection, nonce2);
2055 }
2056 break;
2057
2058 case auth_cmd:
2059 require(! authenticated, "auth netcmd received when not authenticated");
2060 require(voice == server_voice, "auth netcmd received in server voice");
2061 {
2062protocol_role role;
2063string collection, signature;
2064id client, nonce1, nonce2;
2065read_auth_cmd_payload(cmd.payload, role, collection, client, nonce1, nonce2, signature);
2066return process_auth_cmd(role, collection, client, nonce1, nonce2, signature);
2067 }
2068 break;
2069
2070 case confirm_cmd:
2071 require(! authenticated, "confirm netcmd received when not authenticated");
2072 require(voice == client_voice, "confirm netcmd received in client voice");
2073 {
2074string signature;
2075read_confirm_cmd_payload(cmd.payload, signature);
2076return process_confirm_cmd(signature);
2077 }
2078 break;
2079
2080 case refine_cmd:
2081 require(authenticated, "refine netcmd received when authenticated");
2082 {
2083merkle_node node;
2084read_refine_cmd_payload(cmd.payload, node);
2085map< netcmd_item_type, done_marker>::iterator i = done_refinements.find(node.type);
2086require(i != done_refinements.end(), "refinement netcmd refers to valid type");
2087require(i->second.tree_is_done == false, "refinement netcmd received when tree is live");
2088i->second.current_level_had_refinements = true;
2089return process_refine_cmd(node);
2090 }
2091 break;
2092
2093 case done_cmd:
2094 require(authenticated, "done netcmd received when authenticated");
2095 {
2096size_t level;
2097netcmd_item_type type;
2098read_done_cmd_payload(cmd.payload, level, type);
2099return process_done_cmd(level, type);
2100 }
2101 break;
2102
2103 case send_data_cmd:
2104 require(authenticated, "send_data netcmd received when authenticated");
2105 require(role == source_role ||
2106 role == source_and_sink_role,
2107 "send_data netcmd received in source or source/sink role");
2108 {
2109netcmd_item_type type;
2110id item;
2111read_send_data_cmd_payload(cmd.payload, type, item);
2112return process_send_data_cmd(type, item);
2113 }
2114 break;
2115
2116 case send_delta_cmd:
2117 require(authenticated, "send_delta netcmd received when authenticated");
2118 require(role == source_role ||
2119 role == source_and_sink_role,
2120 "send_delta netcmd received in source or source/sink role");
2121 {
2122netcmd_item_type type;
2123id base, ident;
2124read_send_delta_cmd_payload(cmd.payload, type, base, ident);
2125return process_send_delta_cmd(type, base, ident);
2126 }
2127
2128 case data_cmd:
2129 require(authenticated, "data netcmd received when authenticated");
2130 require(role == sink_role ||
2131 role == source_and_sink_role,
2132 "data netcmd received in source or source/sink role");
2133 {
2134netcmd_item_type type;
2135id item;
2136string dat;
2137read_data_cmd_payload(cmd.payload, type, item, dat);
2138return process_data_cmd(type, item, dat);
2139 }
2140 break;
2141
2142 case delta_cmd:
2143 require(authenticated, "delta netcmd received when authenticated");
2144 require(role == sink_role ||
2145 role == source_and_sink_role,
2146 "delta netcmd received in source or source/sink role");
2147 {
2148netcmd_item_type type;
2149id base, ident;
2150delta del;
2151read_delta_cmd_payload(cmd.payload, type, base, ident, del);
2152return process_delta_cmd(type, base, ident, del);
2153 }
2154 break;
2155
2156 case nonexistant_cmd:
2157 require(authenticated, "nonexistant netcmd received when authenticated");
2158 require(role == sink_role ||
2159 role == source_and_sink_role,
2160 "nonexistant netcmd received in sink or source/sink role");
2161 {
2162netcmd_item_type type;
2163id item;
2164read_nonexistant_cmd_payload(cmd.payload, type, item);
2165return process_nonexistant_cmd(type, item);
2166 }
2167 break;
2168 }
2169 return false;
2170}
2171
2172// this kicks off the whole cascade starting from "hello"
2173void session::begin_service()
2174{
2175 base64<rsa_pub_key> pub_encoded;
2176 app.db.get_key(app.signing_key, pub_encoded);
2177 hexenc<id> keyhash;
2178 id keyhash_raw;
2179 key_hash_code(app.signing_key, pub_encoded, keyhash);
2180 decode_hexenc(keyhash, keyhash_raw);
2181 queue_hello_cmd(keyhash_raw(), mk_nonce());
2182}
2183
2184void session::maybe_say_goodbye()
2185{
2186 if (done_all_refinements() &&
2187 got_all_data())
2188 queue_bye_cmd();
2189}
2190
2191bool session::arm()
2192{
2193 if (!armed)
2194 {
2195 if (read_netcmd(inbuf, cmd))
2196{
2197 inbuf.erase(0, cmd.encoded_size());
2198 armed = true;
2199}
2200 }
2201 return armed;
2202}
2203
2204bool session::process()
2205{
2206 try
2207 {
2208 if (!arm())
2209return true;
2210
2211 transaction_guard guard(app.db);
2212 armed = false;
2213 L(F("processing %d byte input buffer from peer %s\n") % inbuf.size() % peer_id);
2214 bool ret = dispatch_payload(cmd);
2215 if (inbuf.size() >= constants::netcmd_maxsz)
2216W(F("input buffer for peer %s is overfull after netcmd dispatch\n") % peer_id);
2217 guard.commit();
2218 maybe_say_goodbye();
2219 return ret;
2220 }
2221 catch (bad_decode & bd)
2222 {
2223 W(F("caught bad_decode exception processing peer %s: '%s'\n") % peer_id % bd.what);
2224 return false;
2225 }
2226}
2227
2228
2229static void call_server(protocol_role role,
2230vector<utf8> const & collections,
2231set<string> const & all_collections,
2232app_state & app,
2233utf8 const & address,
2234port_type default_port,
2235unsigned long timeout_seconds)
2236{
2237 Probe probe;
2238 Timeout timeout(static_cast<long>(timeout_seconds)), instant(0,1);
2239
2240 // FIXME: split into labels and convert to ace here.
2241
2242 P(F("connecting to %s\n") % address());
2243 Stream server(address().c_str(), default_port, timeout);
2244 session sess(role, client_voice, collections, all_collections, app,
2245 address(), server.get_socketfd(), timeout);
2246
2247 ticker input("bytes in"), output("bytes out");
2248
2249 while (true)
2250 {
2251 bool armed = false;
2252 try
2253{
2254 armed = sess.arm();
2255}
2256 catch (bad_decode & bd)
2257{
2258 W(F("caught bad_decode exception decoding input from peer %s: '%s'\n")
2259 % sess.peer_id % bd.what);
2260 return;
2261}
2262
2263 probe.clear();
2264 probe.add(sess.stream, sess.which_events());
2265 Probe::result_type res = probe.ready(armed ? instant : timeout);
2266 Probe::ready_type event = res.second;
2267 socket_type fd = res.first;
2268
2269 if (fd == -1 && !armed)
2270{
2271 P(F("timed out waiting for I/O with peer %s, disconnecting\n") % sess.peer_id);
2272 return;
2273}
2274
2275 if (event & Probe::ready_read)
2276{
2277 if (sess.read_some(&input))
2278 {
2279 try
2280{
2281 armed = sess.arm();
2282}
2283 catch (bad_decode & bd)
2284{
2285 W(F("caught bad_decode exception decoding input from peer %s: '%s'\n")
2286 % sess.peer_id % bd.what);
2287 return;
2288}
2289 }
2290 else
2291 {
2292 if (sess.sent_goodbye)
2293P(F("read from fd %d (peer %s) closed OK after goodbye\n") % fd % sess.peer_id);
2294 else
2295P(F("read from fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id);
2296 return;
2297 }
2298}
2299
2300 if (event & Probe::ready_write)
2301{
2302 if (! sess.write_some(&output))
2303 {
2304 if (sess.sent_goodbye)
2305P(F("write on fd %d (peer %s) closed OK after goodbye\n") % fd % sess.peer_id);
2306 else
2307P(F("write on fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id);
2308 return;
2309 }
2310}
2311
2312 if (event & Probe::ready_oobd)
2313{
2314 P(F("got OOB data on fd %d (peer %s), disconnecting\n")
2315 % fd % sess.peer_id);
2316 return;
2317}
2318
2319 if (armed)
2320{
2321 if (!sess.process())
2322 {
2323 P(F("processing on fd %d (peer %s) finished, disconnecting\n")
2324% fd % sess.peer_id);
2325 return;
2326 }
2327}
2328
2329 if (sess.sent_goodbye && sess.outbuf.empty() && sess.received_goodbye)
2330{
2331 P(F("exchanged goodbyes and flushed output on fd %d (peer %s), disconnecting\n")
2332 % fd % sess.peer_id);
2333 return;
2334}
2335 }
2336}
2337
2338static void
2339arm_sessions_and_calculate_probe(Probe & probe,
2340 map<socket_type, shared_ptr<session> > & sessions,
2341 set<socket_type> & armed_sessions)
2342{
2343 set<socket_type> arm_failed;
2344 for (map<socket_type, shared_ptr<session> >::const_iterator i = sessions.begin();
2345 i != sessions.end(); ++i)
2346 {
2347 try
2348{
2349 if (i->second->arm())
2350 {
2351 L(F("fd %d is armed\n") % i->first);
2352 armed_sessions.insert(i->first);
2353 }
2354 probe.add(i->second->stream, i->second->which_events());
2355}
2356 catch (bad_decode & bd)
2357{
2358 W(F("caught bad_decode exception decoding input from peer %s: '%s', marking as bad\n")
2359 % i->second->peer_id % bd.what);
2360 arm_failed.insert(i->first);
2361}
2362 }
2363 for (set<socket_type>::const_iterator i = arm_failed.begin();
2364 i != arm_failed.end(); ++i)
2365 {
2366 sessions.erase(*i);
2367 }
2368}
2369
2370static void
2371handle_new_connection(Address & addr,
2372 StreamServer & server,
2373 Timeout & timeout,
2374 protocol_role role,
2375 vector<utf8> const & collections,
2376 set<string> const & all_collections,
2377 map<socket_type, shared_ptr<session> > & sessions,
2378 app_state & app)
2379{
2380 L(F("accepting new connection on %s : %d\n")
2381 % addr.get_name() % addr.get_port());
2382 Peer client = server.accept_connection();
2383
2384 if (!client)
2385 {
2386 L(F("accept() returned a dead client\n"));
2387 }
2388 else
2389 {
2390 P(F("accepted new client connection from %s\n") % client);
2391 shared_ptr<session> sess(new session(role, server_voice, collections,
2392 all_collections, app,
2393 lexical_cast<string>(client),
2394 client.get_socketfd(), timeout));
2395 sess->begin_service();
2396 sessions.insert(make_pair(client.get_socketfd(), sess));
2397 }
2398}
2399
2400static void
2401handle_read_available(socket_type fd,
2402 shared_ptr<session> sess,
2403 map<socket_type, shared_ptr<session> > & sessions,
2404 set<socket_type> & armed_sessions)
2405{
2406 if (sess->read_some())
2407 {
2408 try
2409{
2410 if (sess->arm())
2411 armed_sessions.insert(fd);
2412}
2413 catch (bad_decode & bd)
2414{
2415 W(F("caught bad_decode exception decoding input from peer %s: '%s', disconnecting\n")
2416 % sess->peer_id % bd.what);
2417 sessions.erase(fd);
2418}
2419 }
2420 else
2421 {
2422 P(F("fd %d (peer %s) read failed, disconnecting\n")
2423% fd % sess->peer_id);
2424 sessions.erase(fd);
2425 }
2426}
2427
2428
2429static void
2430handle_write_available(socket_type fd,
2431 shared_ptr<session> sess,
2432 map<socket_type, shared_ptr<session> > & sessions)
2433{
2434 if (! sess->write_some())
2435 {
2436 P(F("fd %d (peer %s) write failed, disconnecting\n")
2437% fd % sess->peer_id);
2438 sessions.erase(fd);
2439 }
2440}
2441
2442static void
2443process_armed_sessions(map<socket_type, shared_ptr<session> > & sessions,
2444 set<socket_type> & armed_sessions)
2445{
2446 for (set<socket_type>::const_iterator i = armed_sessions.begin();
2447 i != armed_sessions.end(); ++i)
2448 {
2449 map<socket_type, shared_ptr<session> >::iterator j;
2450 j = sessions.find(*i);
2451 if (j == sessions.end())
2452continue;
2453 else
2454{
2455 socket_type fd = j->first;
2456 shared_ptr<session> sess = j->second;
2457 if (!sess->process())
2458 {
2459 P(F("fd %d (peer %s) processing finished, disconnecting\n")
2460% fd % sess->peer_id);
2461 sessions.erase(j);
2462 }
2463}
2464 }
2465}
2466
2467static void
2468reap_dead_sessions(map<socket_type, shared_ptr<session> > & sessions,
2469 unsigned long timeout_seconds)
2470{
2471 // kill any clients which haven't done any i/o inside the timeout period
2472 // or who have said goodbye and flushed their output buffers
2473 set<socket_type> dead_clients;
2474 time_t now = ::time(NULL);
2475 for (map<socket_type, shared_ptr<session> >::const_iterator i = sessions.begin();
2476 i != sessions.end(); ++i)
2477 {
2478 if (static_cast<unsigned long>(i->second->last_io_time + timeout_seconds)
2479 < static_cast<unsigned long>(now))
2480{
2481 P(F("fd %d (peer %s) has been idle too long, disconnecting\n")
2482 % i->first % i->second->peer_id);
2483 dead_clients.insert(i->first);
2484}
2485 if (i->second->sent_goodbye && i->second->outbuf.empty() && i->second->received_goodbye)
2486{
2487 P(F("fd %d (peer %s) exchanged goodbyes and flushed output, disconnecting\n")
2488 % i->first % i->second->peer_id);
2489 dead_clients.insert(i->first);
2490}
2491 }
2492 for (set<socket_type>::const_iterator i = dead_clients.begin();
2493 i != dead_clients.end(); ++i)
2494 {
2495 sessions.erase(*i);
2496 }
2497}
2498
2499static void
2500serve_connections(protocol_role role,
2501 vector<utf8> const & collections,
2502 set<string> const & all_collections,
2503 app_state & app,
2504 utf8 const & address,
2505 port_type default_port,
2506 unsigned long timeout_seconds,
2507 unsigned long session_limit)
2508{
2509 Probe probe;
2510
2511 Timeout
2512 forever,
2513 timeout(static_cast<long>(timeout_seconds)),
2514 instant(0,1);
2515
2516 Address addr(address().c_str(), default_port, true);
2517 StreamServer server(addr, timeout);
2518
2519 map<socket_type, shared_ptr<session> > sessions;
2520 set<socket_type> armed_sessions;
2521
2522 P(F("beginning service on %s : %d\n")
2523 % addr.get_name() % addr.get_port());
2524
2525 while (true)
2526 {
2527 probe.clear();
2528 armed_sessions.clear();
2529
2530 if (sessions.size() >= session_limit)
2531W(F("session limit %d reached, some connections will be refused\n") % session_limit);
2532 else
2533probe.add(server);
2534
2535 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions);
2536
2537 L(F("i/o probe with %d armed\n") % armed_sessions.size());
2538 Probe::result_type res = probe.ready(sessions.empty() ? forever
2539 : (armed_sessions.empty() ? timeout
2540 : instant));
2541 Probe::ready_type event = res.second;
2542 socket_type fd = res.first;
2543
2544 if (fd == -1)
2545{
2546 if (armed_sessions.empty())
2547 L(F("timed out waiting for I/O (listening on %s : %d)\n")
2548 % addr.get_name() % addr.get_port());
2549}
2550
2551 // we either got a new connection
2552 else if (fd == server)
2553handle_new_connection(addr, server, timeout, role,
2554 collections, all_collections, sessions, app);
2555
2556 // or an existing session woke up
2557 else
2558{
2559 map<socket_type, shared_ptr<session> >::iterator i;
2560 i = sessions.find(fd);
2561 if (i == sessions.end())
2562 {
2563 L(F("got woken up for action on unknown fd %d\n") % fd);
2564 }
2565 else
2566 {
2567 shared_ptr<session> sess = i->second;
2568 if (event & Probe::ready_read)
2569handle_read_available(fd, sess, sessions, armed_sessions);
2570
2571 if (event & Probe::ready_write)
2572handle_write_available(fd, sess, sessions);
2573
2574 if (event & Probe::ready_oobd)
2575{
2576 P(F("got some OOB data on fd %d (peer %s), disconnecting\n")
2577 % fd % sess->peer_id);
2578 sessions.erase(i);
2579}
2580 }
2581}
2582 process_armed_sessions(sessions, armed_sessions);
2583 reap_dead_sessions(sessions, timeout_seconds);
2584 }
2585}
2586
2587
2588/////////////////////////////////////////////////
2589//
2590// layer 4: monotone interface layer
2591//
2592/////////////////////////////////////////////////
2593
2594void rebuild_merkle_trees(app_state & app,
2595 utf8 const & collection)
2596{
2597 transaction_guard guard(app.db);
2598
2599 P(F("rebuilding merkle trees for collection %s\n") % collection);
2600
2601 string typestr;
2602 merkle_node empty_root_node;
2603
2604 empty_root_node.type = mcert_item;
2605 netcmd_item_type_to_string(mcert_item, typestr);
2606 app.db.erase_merkle_nodes(typestr, collection);
2607 store_merkle_node(app, collection, empty_root_node);
2608
2609 empty_root_node.type = fcert_item;
2610 netcmd_item_type_to_string(fcert_item, typestr);
2611 app.db.erase_merkle_nodes(typestr, collection);
2612 store_merkle_node(app, collection, empty_root_node);
2613
2614 empty_root_node.type = key_item;
2615 netcmd_item_type_to_string(key_item, typestr);
2616 app.db.erase_merkle_nodes(typestr, collection);
2617 store_merkle_node(app, collection, empty_root_node);
2618
2619 // FIXME: do fcerts later
2620 // ticker fcerts("fcerts");
2621
2622 ticker mcerts("mcerts");
2623 ticker keys("keys");
2624
2625 set<manifest_id> manifest_ids;
2626 set<rsa_keypair_id> inserted_keys;
2627
2628 {
2629 // get all matching branch names
2630 vector< manifest<cert> > certs;
2631 app.db.get_manifest_certs(branch_cert_name, certs);
2632 for (size_t i = 0; i < certs.size(); ++i)
2633 {
2634cert_value name;
2635decode_base64(idx(certs, i).inner().value, name);
2636if (name().find(collection()) == 0)
2637 {
2638 manifest_ids.insert(manifest_id(idx(certs, i).inner().ident));
2639 }
2640 }
2641
2642 // insert all certs and keys reachable via these manifests
2643 for (set<manifest_id>::const_iterator man = manifest_ids.begin();
2644 man != manifest_ids.end(); ++man)
2645 {
2646app.db.get_manifest_certs(*man, certs);
2647for (size_t i = 0; i < certs.size(); ++i)
2648 {
2649 hexenc<id> certhash;
2650 id raw_id;
2651 cert_hash_code(idx(certs, i).inner(), certhash);
2652 decode_hexenc(certhash, raw_id);
2653 insert_into_merkle_tree(app, true, mcert_item, collection, raw_id(), 0);
2654 ++mcerts;
2655 rsa_keypair_id const & k = idx(certs, i).inner().key;
2656 if (inserted_keys.find(k) == inserted_keys.end())
2657 {
2658if (app.db.public_key_exists(k))
2659 {
2660 base64<rsa_pub_key> pub_encoded;
2661 app.db.get_key(k, pub_encoded);
2662 hexenc<id> keyhash;
2663 key_hash_code(k, pub_encoded, keyhash);
2664 decode_hexenc(keyhash, raw_id);
2665 insert_into_merkle_tree(app, true, key_item, collection, raw_id(), 0);
2666 ++keys;
2667 }
2668inserted_keys.insert(k);
2669 }
2670 }
2671 }
2672 }
2673 guard.commit();
2674}
2675
2676static void ensure_merkle_tree_ready(app_state & app,
2677 utf8 const & collection)
2678{
2679 string mcert_item_str, fcert_item_str, key_item_str;
2680 netcmd_item_type_to_string(mcert_item, mcert_item_str);
2681 netcmd_item_type_to_string(mcert_item, fcert_item_str);
2682 netcmd_item_type_to_string(mcert_item, key_item_str);
2683
2684// if (! (app.db.merkle_node_exists(mcert_item_str, collection, 0, ROOT_PREFIX.val)
2685// && app.db.merkle_node_exists(fcert_item_str, collection, 0, ROOT_PREFIX.val)
2686// && app.db.merkle_node_exists(key_item_str, collection, 0, ROOT_PREFIX.val)))
2687// {
2688
2689 // FIXME: for now we always rebuild merkle trees. that's a bit coarse but it
2690 // saves us having to hunt down all the possible write conditions in the packet
2691 // writers and make sure they update the indices properly
2692
2693 rebuild_merkle_trees(app, collection);
2694
2695// }
2696}
2697
2698void run_netsync_protocol(protocol_voice voice,
2699 protocol_role role,
2700 utf8 const & addr,
2701 vector<utf8> collections,
2702 app_state & app)
2703{
2704 for (vector<utf8>::const_iterator i = collections.begin();
2705 i != collections.end(); ++i)
2706 ensure_merkle_tree_ready(app, *i);
2707
2708 set<string> all_collections;
2709 for (vector<utf8>::const_iterator j = collections.begin();
2710 j != collections.end(); ++j)
2711 {
2712 all_collections.insert((*j)());
2713 }
2714
2715 vector< manifest<cert> > certs;
2716 app.db.get_manifest_certs(branch_cert_name, certs);
2717 for (vector< manifest<cert> >::const_iterator i = certs.begin();
2718 i != certs.end(); ++i)
2719 {
2720 cert_value name;
2721 decode_base64(i->inner().value, name);
2722 for (vector<utf8>::const_iterator j = collections.begin();
2723 j != collections.end(); ++j)
2724{
2725 if ((*j)().find(name()) == 0
2726 && all_collections.find(name()) == all_collections.end())
2727 {
2728 if (name() != (*j)())
2729P(F("%s included in collection %s\n") % (*j) % name);
2730 all_collections.insert(name());
2731 }
2732}
2733 }
2734
2735
2736 if (voice == server_voice)
2737 {
2738 serve_connections(role, collections, all_collections, app,
2739addr, static_cast<port_type>(constants::netsync_default_port),
2740static_cast<unsigned long>(constants::netsync_timeout_seconds),
2741static_cast<unsigned long>(constants::netsync_connection_limit));
2742 }
2743 else
2744 {
2745 I(voice == client_voice);
2746 call_server(role, collections, all_collections, app,
2747 addr, static_cast<port_type>(constants::netsync_default_port),
2748 static_cast<unsigned long>(constants::netsync_timeout_seconds));
2749 }
2750}
2751

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status