monotone

monotone Mtn Source Tree

Root/netsync.cc

1// Copyright (C) 2008 Stephen Leake <stephen_leake@stephe-leake.org>
2// Copyright (C) 2004 Graydon Hoare <graydon@pobox.com>
3//
4// This program is made available under the GNU GPL version 2.0 or
5// greater. See the accompanying file COPYING for details.
6//
7// This program is distributed WITHOUT ANY WARRANTY; without even the
8// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
9// PURPOSE.
10
11#include "base.hh"
12#include <map>
13#include <cstdlib>
14#include <memory>
15#include <list>
16#include <deque>
17#include <stack>
18
19#include <time.h>
20
21#include "lexical_cast.hh"
22#include <boost/scoped_ptr.hpp>
23#include <boost/shared_ptr.hpp>
24#include <boost/bind.hpp>
25
26#include "lua_hooks.hh"
27#include "key_store.hh"
28#include "project.hh"
29#include "database.hh"
30#include "cert.hh"
31#include "constants.hh"
32#include "enumerator.hh"
33#include "keys.hh"
34#include "lua.hh"
35#include "merkle_tree.hh"
36#include "netcmd.hh"
37#include "netio.hh"
38#include "numeric_vocab.hh"
39#include "refiner.hh"
40#include "revision.hh"
41#include "sanity.hh"
42#include "transforms.hh"
43#include "ui.hh"
44#include "xdelta.hh"
45#include "epoch.hh"
46#include "platform.hh"
47#include "hmac.hh"
48#include "globish.hh"
49#include "uri.hh"
50#include "options.hh"
51
52#include "botan/botan.h"
53
54#include "netxx/address.h"
55#include "netxx/peer.h"
56#include "netxx/probe.h"
57#include "netxx/socket.h"
58#include "netxx/sockopt.h"
59#include "netxx/stream.h"
60#include "netxx/streamserver.h"
61#include "netxx/timeout.h"
62#include "netxx_pipe.hh"
63// TODO: things to do that will break protocol compatibility
64// -- need some way to upgrade anonymous to keyed pull, without user having
65// to explicitly specify which they want
66// just having a way to respond "access denied, try again" might work
67// but perhaps better to have the anonymous command include a note "I
68// _could_ use key <...> if you prefer", and if that would lead to more
69// access, could reply "I do prefer". (Does this lead to too much
70// information exposure? Allows anonymous people to probe what branches
71// a key has access to.)
72// -- "warning" packet type?
73// -- Richard Levitte wants, when you (e.g.) request '*' but don't have
74// access to all of it, you just get the parts you have access to
75// (maybe with warnings about skipped branches). to do this right,
76// should have a way for the server to send back to the client "right,
77// you're not getting the following branches: ...", so the client will
78// not include them in its merkle trie.
79// -- add some sort of vhost field to the client's first packet, saying who
80// they expect to talk to
81
82//
83// This is the "new" network synchronization (netsync) system in
84// monotone. It is based on synchronizing pairs of merkle trees over an
85// interactive connection.
86//
87// A netsync process between peers treats each peer as either a source, a
88// sink, or both. When a peer is only a source, it will not write any new
// items to its database. When a peer is only a sink, it will not send any
90// items from its database. When a peer is both a source and sink, it may
91// send and write items freely.
92//
93// The post-state of a netsync is that each sink contains a superset of the
94// items in its corresponding source; when peers are behaving as both
95// source and sink, this means that the post-state of the sync is for the
96// peers to have identical item sets.
97//
98//
99// Data structure
100// --------------
101//
102// Each node in a merkle tree contains a fixed number of slots. this number
103// is derived from a global parameter of the protocol -- the tree fanout --
104// such that the number of slots is 2^fanout. For now we will assume that
// fanout is 4, so there are 16 slots in a node, because this makes
106// illustration easier. The other parameter of the protocol is the size of
107// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
108//
109// Each slot in a merkle tree node is in one of 3 states:
110//
111// - empty
112// - leaf
113// - subtree
114//
115// In addition, each leaf contains a hash code which identifies an element
116// of the set being synchronized. Each subtree slot contains a hash code of
117// the node immediately beneath it in the merkle tree. Empty slots contain
118// no hash codes.
119//
120// Since empty slots have no hash code, they are represented implicitly by
121// a bitmap at the head of each merkle tree node. As an additional
122// integrity check, each merkle tree node contains a label indicating its
123// prefix in the tree, and a hash of its own contents.
124//
125// In total, then, the byte-level representation of a <160,4> merkle tree
126// node is as follows:
127//
128// 20 bytes - hash of the remaining bytes in the node
129// 1 byte - type of this node (manifest, file, key, mcert, fcert)
130// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
131// 0-20 bytes - the prefix of this node, 4 bits * level,
132// rounded up to a byte
133// 1-N bytes - number of leaves under this node (uleb128)
134// 4 bytes - slot-state bitmap of the node
135// 0-320 bytes - between 0 and 16 live slots in the node
136//
137// So, in the worst case such a node is 367 bytes, with these parameters.
138//
139//
140// Protocol
141// --------
142//
143// The protocol is a binary command-packet system over TCP; each packet
144// consists of a single byte which identifies the protocol version, a byte
145// which identifies the command name inside that version, a size_t sent as
146// a uleb128 indicating the length of the packet, that many bytes of
147// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
148// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
149// 20-byte random key chosen by the client after authentication (discussed
150// below). Decoding involves simply buffering until a sufficient number of
151// bytes are received, then advancing the buffer pointer. Any time an
152// integrity check (the HMAC) fails, the protocol is assumed to have lost
153// synchronization, and the connection is dropped. The parties are free to
154// drop the TCP stream at any point, if too much data is received or too
155// much idle time passes; no commitments or transactions are made.
156//
157//
158// Authentication and setup
159// ------------------------
160//
161// The exchange begins in a non-authenticated state. The server sends a
162// "hello <id> <nonce>" command, which identifies the server's RSA key and
163// issues a nonce which must be used for a subsequent authentication.
164//
165// The client then responds with either:
166//
167// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
168// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
169// role it wishes to play in the synchronization, identifies the pattern it
170// wishes to sync with, signs the previous nonce with its own key, and informs
171// the server of the HMAC key it wishes to use for this session (encrypted
172// with the server's public key); or
173//
174// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
175// <hmac key>" command, which identifies the role it wishes to play in the
176// synchronization, the pattern it wishes to sync with, and the HMAC key it
177// wishes to use for this session (also encrypted with the server's public
178// key).
179//
180// The server then replies with a "confirm" command, which contains no
181// other data but will only have the correct HMAC integrity code if the
182// server received and properly decrypted the HMAC key offered by the
183// client. This transitions the peers into an authenticated state and
184// begins epoch refinement. If epoch refinement and epoch transmission
185// succeed, the peers switch to data refinement and data transmission.
186//
187//
188// Refinement
189// ----------
190//
191// Refinement is executed by "refiners"; there is a refiner for each
192// set of 'items' being exchanged: epochs, keys, certs, and revisions.
193// When refinement starts, each party knows only their own set of
194// items; when refinement completes, each party has learned of the
195// complete set of items it needs to send, and a count of items it's
196// expecting to receive.
197//
198// For more details on the refinement process, see refiner.cc.
199//
200//
201// Transmission
202// ------------
203//
204// Once the set of items to send has been determined (for keys, certs, and
205// revisions) each peer switches into a transmission mode. This mode
206// involves walking the revision graph in ancestry-order and sending all
207// the items the local peer has which the remote one does not. Since the
208// remote and local peers both know all the items which need to be
209// transferred (they learned during refinement) they know what to wait for
210// and what to send. The mechanisms of the transmission phase (notably,
211// enumerator.cc) simply ensure that things are sent in the proper order,
212// and without over-filling the output buffer too much.
213//
214//
215// Shutdown
216// --------
217//
218// After transmission completes, one special command, "bye", is used to
219// shut down a connection gracefully. The shutdown sequence based on "bye"
220// commands is documented below in session::process_bye_cmd.
221//
222//
223// Note on epochs
224// --------------
225//
// One refinement and transmission phase precedes all the others: epochs.
227// Epochs are exchanged and compared in order to be sure that further
228// refinement and transmission (on certs and revisions) makes sense; they
229// are a sort of "immune system" to prevent incompatible databases (say
230// between rebuilds due to bugs in monotone) from cross-contaminating. The
231// later refinements are only kicked off *after* all epochs are received
232// and compare correctly.
233//
234//
235// Note on dense coding
236// --------------------
237//
238// This protocol is "raw binary" (non-text) because coding density is
239// actually important here, and each packet consists of very
240// information-dense material that you wouldn't have a hope of typing in,
// or interpreting manually anyway.
242//
243
244using std::auto_ptr;
245using std::deque;
246using std::make_pair;
247using std::map;
248using std::min;
249using std::pair;
250using std::set;
251using std::string;
252using std::vector;
253
254using boost::shared_ptr;
255using boost::lexical_cast;
256
// One sync that a Lua hook asked the server to start on its own initiative
// (queued by the server_request_sync Lua extension). All fields hold the
// strings exactly as passed from Lua.
struct server_initiated_sync_request
{
  string what;     // first Lua argument; presumably the kind of sync
  string address;  // peer address to contact
  string include;  // include pattern
  string exclude;  // exclude pattern
};

// Pending server-initiated requests, appended in arrival order.
deque<server_initiated_sync_request> server_initiated_sync_requests;
265LUAEXT(server_request_sync, )
266{
267 char const * w = luaL_checkstring(L, 1);
268 char const * a = luaL_checkstring(L, 2);
269 char const * i = luaL_checkstring(L, 3);
270 char const * e = luaL_checkstring(L, 4);
271 server_initiated_sync_request request;
272 request.what = string(w);
273 request.address = string(a);
274 request.include = string(i);
275 request.exclude = string(e);
276 server_initiated_sync_requests.push_back(request);
277 return 0;
278}
279
280static inline void
281require(bool check, string const & context)
282{
283 if (!check)
284 throw bad_decode(F("check of '%s' failed") % context);
285}
286
287static void
288read_pubkey(string const & in,
289 rsa_keypair_id & id,
290 rsa_pub_key & pub)
291{
292 string tmp_id, tmp_key;
293 size_t pos = 0;
294 extract_variable_length_string(in, tmp_id, pos, "pubkey id");
295 extract_variable_length_string(in, tmp_key, pos, "pubkey value");
296 id = rsa_keypair_id(tmp_id);
297 pub = rsa_pub_key(tmp_key);
298}
299
300static void
301write_pubkey(rsa_keypair_id const & id,
302 rsa_pub_key const & pub,
303 string & out)
304{
305 insert_variable_length_string(id(), out);
306 insert_variable_length_string(pub(), out);
307}
308
// Exception-style carrier for a netsync failure message.
struct netsync_error
{
  string msg;  // human-readable description of the failure
  netsync_error(string const & message)
    : msg(message)
  {}
};
314
315class reactable
316{
317 static unsigned int count;
318protected:
319 static unsigned int num_reactables() { return count; }
320public:
321 reactable() { ++count; }
322 virtual ~reactable()
323 {
324 I(count != 0);
325 --count;
326 }
327
328 // Handle an I/O event.
329 virtual bool do_io(Netxx::Probe::ready_type event) = 0;
330 // Can we timeout after being idle for a long time?
331 virtual bool can_timeout() = 0;
332 // Have we been idle for too long?
333 virtual bool timed_out(time_t now) = 0;
334 // Do one unit of work.
335 virtual bool do_work(transaction_guard & guard) = 0;
336 // Is there any work waiting to be done?
337 virtual bool arm() = 0;
338 // Are we a pipe pair (as opposed to a socket)?
339 // Netxx::PipeCompatibleProbe acts slightly differently, depending.
340 virtual bool is_pipe_pair() = 0;
341 // Netxx::Probe::ready() returns sockets, reactor needs to be
342 // able to map them back to reactables.
343 virtual vector<Netxx::socket_type> get_sockets() = 0;
344 // Netxx::StreamBase and Netxx::StreamServer don't have a
345 // common base, so we don't have anything we can expose to
346 // let the reactor add us to the probe itself.
347 virtual void add_to_probe(Netxx::PipeCompatibleProbe & probe) = 0;
348 virtual void remove_from_probe(Netxx::PipeCompatibleProbe & probe) = 0;
349 // Where are we talking to / listening on?
350 virtual string name() = 0;
351};
352unsigned int reactable::count = 0;
353
354class session_base : public reactable
355{
356 bool read_some();
357 bool write_some();
358 void mark_recent_io()
359 {
360 last_io_time = ::time(NULL);
361 }
362protected:
363 virtual void note_bytes_in(int count) { return; }
364 virtual void note_bytes_out(int count) { return; }
365 string_queue inbuf;
366private:
367 deque< pair<string, size_t> > outbuf;
368 size_t outbuf_size; // so we can avoid queueing up too much stuff
369protected:
370 void queue_output(string const & s)
371 {
372 outbuf.push_back(make_pair(s, 0));
373 outbuf_size += s.size();
374 }
375 bool output_overfull() const
376 {
377 return outbuf.size() > constants::bufsz * 10;
378 }
379public:
380 string peer_id;
381 string name() { return peer_id; }
382private:
383 shared_ptr<Netxx::StreamBase> str;
384 time_t last_io_time;
385public:
386
387 enum
388 {
389 working_state,
390 shutdown_state,
391 confirmed_state
392 }
393 protocol_state;
394
395 bool encountered_error;
396
397 session_base(string const & peer_id,
398 shared_ptr<Netxx::StreamBase> str) :
399 outbuf_size(0),
400 peer_id(peer_id), str(str),
401 last_io_time(::time(NULL)),
402 protocol_state(working_state),
403 encountered_error(false)
404 { }
405 virtual ~session_base()
406 { }
407 virtual bool arm() = 0;
408 virtual bool do_work(transaction_guard & guard) = 0;
409
410private:
411 Netxx::Probe::ready_type which_events();
412public:
413 virtual bool do_io(Netxx::Probe::ready_type);
414 bool can_timeout() { return true; }
415 bool timed_out(time_t now)
416 {
417 return static_cast<unsigned long>(last_io_time + constants::netsync_timeout_seconds)
418 < static_cast<unsigned long>(now);
419 }
420
421 bool is_pipe_pair()
422 {
423 return str->get_socketfd() == -1;
424 }
425 vector<Netxx::socket_type> get_sockets()
426 {
427 vector<Netxx::socket_type> out;
428 Netxx::socket_type fd = str->get_socketfd();
429 if (fd == -1)
430 {
431 shared_ptr<Netxx::PipeStream> pipe =
432 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(str);
433 I(pipe);
434 out.push_back(pipe->get_readfd());
435 out.push_back(pipe->get_writefd());
436 }
437 else
438 out.push_back(fd);
439 return out;
440 }
441 void add_to_probe(Netxx::PipeCompatibleProbe & probe)
442 {
443 probe.add(*str, which_events());
444 }
445 void remove_from_probe(Netxx::PipeCompatibleProbe & probe)
446 {
447 I(!is_pipe_pair());
448 probe.remove(*str);
449 }
450};
451
452Netxx::Probe::ready_type
453session_base::which_events()
454{
455 Netxx::Probe::ready_type ret = Netxx::Probe::ready_oobd;
456 if (!outbuf.empty())
457 {
458 L(FL("probing write on %s") % peer_id);
459 ret = ret | Netxx::Probe::ready_write;
460 }
461 // Only ask to read if we're not armed, don't go storing
462 // 128 MB at a time unless we think we need to.
463 if (inbuf.size() < constants::netcmd_maxsz && !arm())
464 {
465 L(FL("probing read on %s") % peer_id);
466 ret = ret | Netxx::Probe::ready_read;
467 }
468 return ret;
469}
470
471bool
472session_base::read_some()
473{
474 I(inbuf.size() < constants::netcmd_maxsz);
475 char tmp[constants::bufsz];
476 Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
477 if (count > 0)
478 {
479 L(FL("read %d bytes from fd %d (peer %s)")
480 % count % str->get_socketfd() % peer_id);
481 if (encountered_error)
482 {
483 L(FL("in error unwind mode, so throwing them into the bit bucket"));
484 return true;
485 }
486 inbuf.append(tmp,count);
487 mark_recent_io();
488 note_bytes_in(count);
489 return true;
490 }
491 else
492 return false;
493}
494
495bool
496session_base::write_some()
497{
498 I(!outbuf.empty());
499 size_t writelen = outbuf.front().first.size() - outbuf.front().second;
500 Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
501 min(writelen,
502 constants::bufsz));
503 if (count > 0)
504 {
505 if ((size_t)count == writelen)
506 {
507 outbuf_size -= outbuf.front().first.size();
508 outbuf.pop_front();
509 }
510 else
511 {
512 outbuf.front().second += count;
513 }
514 L(FL("wrote %d bytes to fd %d (peer %s)")
515 % count % str->get_socketfd() % peer_id);
516 mark_recent_io();
517 note_bytes_out(count);
518 if (encountered_error && outbuf.empty())
519 {
520 // we've flushed our error message, so it's time to get out.
521 L(FL("finished flushing output queue in error unwind mode, disconnecting"));
522 return false;
523 }
524 return true;
525 }
526 else
527 return false;
528}
529
530bool
531session_base::do_io(Netxx::Probe::ready_type what)
532{
533 bool ok = true;
534 try
535 {
536 if (what & Netxx::Probe::ready_read)
537 {
538 if (!read_some())
539 ok = false;
540 }
541 if (what & Netxx::Probe::ready_write)
542 {
543 if (!write_some())
544 ok = false;
545 }
546
547 if (what & Netxx::Probe::ready_oobd)
548 {
549 P(F("got OOB from peer %s, disconnecting")
550 % peer_id);
551 ok = false;
552 }
553 else if (!ok)
554 {
555 switch (protocol_state)
556 {
557 case working_state:
558 P(F("peer %s IO failed in working state (error)")
559 % peer_id);
560 break;
561
562 case shutdown_state:
563 P(F("peer %s IO failed in shutdown state "
564 "(possibly client misreported error)")
565 % peer_id);
566 break;
567
568 case confirmed_state:
569 P(F("peer %s IO failed in confirmed state (success)")
570 % peer_id);
571 break;
572 }
573 }
574 }
575 catch (Netxx::Exception & e)
576 {
577 P(F("Network error on peer %s, disconnecting")
578 % peer_id);
579 ok = false;
580 }
581 return ok;
582}
583
584////////////////////////////////////////////////////////////////////////
585
// One netsync conversation with a single peer. Builds on session_base's
// buffered stream I/O and implements the refiner and enumerator callback
// interfaces. It records which keys, revisions and certs were written to
// or sent from the local database; the destructor reports them to the
// Lua netsync hooks.
class
session:
  public refiner_callbacks,
  public enumerator_callbacks,
  public session_base
{
  // Role and voice given to the constructor; the voice is also passed to
  // every refiner.
  protocol_role role;
  protocol_voice const voice;
  // Our include/exclude patterns and the matcher built from them.
  globish our_include_pattern;
  globish our_exclude_pattern;
  globish_matcher our_matcher;

  project_t & project;
  key_store & keys;
  lua_hooks & lua;
  // Copied from options; also handed to both HMAC chains.
  bool use_transport_auth;
  // NOTE(review): these bind to members of the `options` object passed to
  // the constructor, so that object must outlive the session.
  rsa_keypair_id const & signing_key;
  vector<rsa_keypair_id> const & keys_to_push;

  netcmd cmd;
  bool armed;
public:
  bool arm();
private:

  bool received_remote_key;
  rsa_keypair_id remote_peer_key_name;
  // Current session key; set_session_key() re-keys both HMAC chains from it.
  netsync_session_key session_key;
  chained_hmac read_hmac;
  chained_hmac write_hmac;
  bool authenticated;

  // Progress tickers; all start out NULL.
  auto_ptr<ticker> byte_in_ticker;
  auto_ptr<ticker> byte_out_ticker;
  auto_ptr<ticker> cert_in_ticker;
  auto_ptr<ticker> cert_out_ticker;
  auto_ptr<ticker> revision_in_ticker;
  auto_ptr<ticker> revision_out_ticker;
  // Transfer counters, reported to the note_netsync_end hook.
  size_t bytes_in, bytes_out;
  size_t certs_in, certs_out;
  size_t revs_in, revs_out;
  size_t keys_in, keys_out;
  // used to identify this session to the netsync hooks.
  // We can't just use saved_nonce, because that's blank for all
  // anonymous connections and could lead to confusion.
  size_t session_id;
  // Sessions created so far; each new session_id is the incremented value.
  static size_t session_count;

  // Items received from the peer and written to the local database
  vector<revision_id> written_revisions;
  vector<rsa_keypair_id> written_keys;
  vector<cert> written_certs;

  // Items sent to the peer
  vector<revision_id> sent_revisions;
  vector<rsa_keypair_id> sent_keys;
  vector<cert> sent_certs;

  // Nonce generated by mk_nonce(); empty until then.
  id saved_nonce;

  // Status codes. error_code starts as no_transfer and is passed to the
  // note_netsync_end hook by the destructor.
  static const int no_error = 200;
  static const int partial_transfer = 211;
  static const int no_transfer = 212;

  static const int not_permitted = 412;
  static const int unknown_key = 422;
  static const int mixing_versions = 432;

  static const int role_mismatch = 512;
  static const int bad_command = 521;

  static const int failed_identification = 532;
  //static const int bad_data = 541;

  int error_code;

  bool set_totals;

  // Interface to refinement: one refiner per item type.
  refiner epoch_refiner;
  refiner key_refiner;
  refiner cert_refiner;
  refiner rev_refiner;

  // Interface to ancestry grovelling.
  revision_enumerator rev_enumerator;

  // Enumerator_callbacks methods.
  // Files already queued for sending, so each is sent at most once.
  set<file_id> file_items_sent;
  bool process_this_rev(revision_id const & rev);
  bool queue_this_cert(id const & c);
  bool queue_this_file(id const & f);
  void note_file_data(file_id const & f);
  void note_file_delta(file_id const & src, file_id const & dst);
  void note_rev(revision_id const & rev);
  void note_cert(id const & c);

public:
  session(options & opts,
          lua_hooks & lua,
          project_t & project,
          key_store & keys,
          protocol_role role,
          protocol_voice voice,
          globish const & our_include_pattern,
          globish const & our_exclude_pattern,
          string const & peer,
          shared_ptr<Netxx::StreamBase> sock,
          bool initiated_by_server = false);

  // Reports the session's results to the Lua netsync hooks.
  virtual ~session();
private:

  id mk_nonce();

  void set_session_key(string const & key);
  void set_session_key(rsa_oaep_sha_data const & key_encrypted);

  void setup_client_tickers();
  bool done_all_refinements();
  bool queued_all_items();
  bool received_all_items();
  bool finished_working();
  void maybe_step();
  void maybe_say_goodbye(transaction_guard & guard);

  void note_item_arrived(netcmd_item_type ty, id const & i);
  void maybe_note_epochs_finished();
  void note_item_sent(netcmd_item_type ty, id const & i);

public:
  bool do_work(transaction_guard & guard);
private:
  void note_bytes_in(int count);
  void note_bytes_out(int count);

  void error(int errcode, string const & errmsg);

  void write_netcmd_and_try_flush(netcmd const & cmd);

  // Outgoing queue-writers.
  void queue_bye_cmd(u8 phase);
  void queue_error_cmd(string const & errmsg);
  void queue_done_cmd(netcmd_item_type type, size_t n_items);
  void queue_hello_cmd(rsa_keypair_id const & key_name,
                       rsa_pub_key const & pub_encoded,
                       id const & nonce);
  void queue_anonymous_cmd(protocol_role role,
                           globish const & include_pattern,
                           globish const & exclude_pattern,
                           id const & nonce2);
  void queue_auth_cmd(protocol_role role,
                      globish const & include_pattern,
                      globish const & exclude_pattern,
                      id const & client,
                      id const & nonce1,
                      id const & nonce2,
                      rsa_sha1_signature const & signature);
  void queue_confirm_cmd();
  void queue_refine_cmd(refinement_type ty, merkle_node const & node);
  void queue_data_cmd(netcmd_item_type type,
                      id const & item,
                      string const & dat);
  void queue_delta_cmd(netcmd_item_type type,
                       id const & base,
                       id const & ident,
                       delta const & del);

  // Incoming dispatch-called methods.
  bool process_error_cmd(string const & errmsg);
  bool process_hello_cmd(rsa_keypair_id const & server_keyname,
                         rsa_pub_key const & server_key,
                         id const & nonce);
  bool process_bye_cmd(u8 phase, transaction_guard & guard);
  bool process_anonymous_cmd(protocol_role role,
                             globish const & their_include_pattern,
                             globish const & their_exclude_pattern);
  bool process_auth_cmd(protocol_role role,
                        globish const & their_include_pattern,
                        globish const & their_exclude_pattern,
                        id const & client,
                        id const & nonce1,
                        rsa_sha1_signature const & signature);
  bool process_refine_cmd(refinement_type ty, merkle_node const & node);
  bool process_done_cmd(netcmd_item_type type, size_t n_items);
  bool process_data_cmd(netcmd_item_type type,
                        id const & item,
                        string const & dat);
  bool process_delta_cmd(netcmd_item_type type,
                         id const & base,
                         id const & ident,
                         delta const & del);
  bool process_usher_cmd(utf8 const & msg);

  // The incoming dispatcher.
  bool dispatch_payload(netcmd const & cmd,
                        transaction_guard & guard);

  // Various helpers.
  void assume_corresponding_role(protocol_role their_role);
  void respond_to_confirm_cmd();
  bool data_exists(netcmd_item_type type,
                   id const & item);
  void load_data(netcmd_item_type type,
                 id const & item,
                 string & out);

  void rebuild_merkle_trees(set<branch_name> const & branches);

  void send_all_data(netcmd_item_type ty, set<id> const & items);
public:
  void begin_service();
private:
  bool process(transaction_guard & guard);

  bool initiated_by_server;
};
size_t session::session_count = 0;
804
// Build a session with `peer` over `sock`. From `opts` we copy the
// transport-auth flag and keep references to the signing key and the list
// of keys to push. Both HMAC chains start keyed with
// constants::netsync_key_initializer. session_id is the next value of the
// static session_count. error_code starts as no_transfer. The refiners and
// the revision enumerator receive *this as their callback object while it
// is still being constructed.
// NOTE: keep the initializers below in member declaration order.
session::session(options & opts,
                 lua_hooks & lua,
                 project_t & project,
                 key_store & keys,
                 protocol_role role,
                 protocol_voice voice,
                 globish const & our_include_pattern,
                 globish const & our_exclude_pattern,
                 string const & peer,
                 shared_ptr<Netxx::StreamBase> sock,
                 bool initiated_by_server) :
  session_base(peer, sock),
  role(role),
  voice(voice),
  our_include_pattern(our_include_pattern),
  our_exclude_pattern(our_exclude_pattern),
  // Built from the constructor parameters (same values as the members).
  our_matcher(our_include_pattern, our_exclude_pattern),
  project(project),
  keys(keys),
  lua(lua),
  use_transport_auth(opts.use_transport_auth),
  signing_key(opts.signing_key),
  keys_to_push(opts.keys_to_push),
  armed(false),
  received_remote_key(false),
  remote_peer_key_name(""),
  session_key(constants::netsync_key_initializer),
  read_hmac(netsync_session_key(constants::netsync_key_initializer),
            use_transport_auth),
  write_hmac(netsync_session_key(constants::netsync_key_initializer),
             use_transport_auth),
  authenticated(false),
  byte_in_ticker(NULL),
  byte_out_ticker(NULL),
  cert_in_ticker(NULL),
  cert_out_ticker(NULL),
  revision_in_ticker(NULL),
  revision_out_ticker(NULL),
  bytes_in(0), bytes_out(0),
  certs_in(0), certs_out(0),
  revs_in(0), revs_out(0),
  keys_in(0), keys_out(0),
  session_id(++session_count),
  saved_nonce(""),
  error_code(no_transfer),
  set_totals(false),
  epoch_refiner(epoch_item, voice, *this),
  key_refiner(key_item, voice, *this),
  cert_refiner(cert_item, voice, *this),
  rev_refiner(revision_item, voice, *this),
  rev_enumerator(project, *this),
  initiated_by_server(initiated_by_server)
{}
858
859session::~session()
860{
861 if (protocol_state == confirmed_state)
862 error_code = no_error;
863 else if (error_code == no_transfer &&
864 (revs_in || revs_out ||
865 certs_in || certs_out ||
866 keys_in || keys_out))
867 error_code = partial_transfer;
868
869 vector<cert> unattached_written_certs;
870 map<revision_id, vector<cert> > rev_written_certs;
871 for (vector<revision_id>::iterator i = written_revisions.begin();
872 i != written_revisions.end(); ++i)
873 rev_written_certs.insert(make_pair(*i, vector<cert>()));
874 for (vector<cert>::iterator i = written_certs.begin();
875 i != written_certs.end(); ++i)
876 {
877 map<revision_id, vector<cert> >::iterator j;
878 j = rev_written_certs.find(revision_id(i->ident));
879 if (j == rev_written_certs.end())
880 unattached_written_certs.push_back(*i);
881 else
882 j->second.push_back(*i);
883 }
884
885 if (!written_keys.empty()
886 || !written_revisions.empty()
887 || !written_certs.empty())
888 {
889
890 //Keys
891 for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
892 i != written_keys.end(); ++i)
893 {
894 lua.hook_note_netsync_pubkey_received(*i, session_id);
895 }
896
897 //Revisions
898 for (vector<revision_id>::iterator i = written_revisions.begin();
899 i != written_revisions.end(); ++i)
900 {
901 vector<cert> & ctmp(rev_written_certs[*i]);
902 set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
903 for (vector<cert>::const_iterator j = ctmp.begin();
904 j != ctmp.end(); ++j)
905 certs.insert(make_pair(j->key, make_pair(j->name, j->value)));
906
907 revision_data rdat;
908 project.db.get_revision(*i, rdat);
909 lua.hook_note_netsync_revision_received(*i, rdat, certs,
910 session_id);
911 }
912
913 //Certs (not attached to a new revision)
914 for (vector<cert>::iterator i = unattached_written_certs.begin();
915 i != unattached_written_certs.end(); ++i)
916 lua.hook_note_netsync_cert_received(revision_id(i->ident), i->key,
917 i->name, i->value, session_id);
918 }
919
920 if (!sent_keys.empty()
921 || !sent_revisions.empty()
922 || !sent_certs.empty())
923 {
924
925 vector<cert> unattached_sent_certs;
926 map<revision_id, vector<cert> > rev_sent_certs;
927 for (vector<revision_id>::iterator i = sent_revisions.begin();
928 i != sent_revisions.end(); ++i)
929 rev_sent_certs.insert(make_pair(*i, vector<cert>()));
930 for (vector<cert>::iterator i = sent_certs.begin();
931 i != sent_certs.end(); ++i)
932 {
933 map<revision_id, vector<cert> >::iterator j;
934 j = rev_sent_certs.find(revision_id(i->ident));
935 if (j == rev_sent_certs.end())
936 unattached_sent_certs.push_back(*i);
937 else
938 j->second.push_back(*i);
939 }
940
941 //Keys
942 for (vector<rsa_keypair_id>::iterator i = sent_keys.begin();
943 i != sent_keys.end(); ++i)
944 {
945 lua.hook_note_netsync_pubkey_sent(*i, session_id);
946 }
947
948 //Revisions
949 for (vector<revision_id>::iterator i = sent_revisions.begin();
950 i != sent_revisions.end(); ++i)
951 {
952 vector<cert> & ctmp(rev_sent_certs[*i]);
953 set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
954 for (vector<cert>::const_iterator j = ctmp.begin();
955 j != ctmp.end(); ++j)
956 certs.insert(make_pair(j->key, make_pair(j->name, j->value)));
957
958 revision_data rdat;
959 project.db.get_revision(*i, rdat);
960 lua.hook_note_netsync_revision_sent(*i, rdat, certs,
961 session_id);
962 }
963
964 //Certs (not attached to a new revision)
965 for (vector<cert>::iterator i = unattached_sent_certs.begin();
966 i != unattached_sent_certs.end(); ++i)
967 lua.hook_note_netsync_cert_sent(revision_id(i->ident), i->key,
968 i->name, i->value, session_id);
969 }
970
971 lua.hook_note_netsync_end(session_id, error_code,
972 bytes_in, bytes_out,
973 certs_in, certs_out,
974 revs_in, revs_out,
975 keys_in, keys_out);
976}
977
978bool
979session::process_this_rev(revision_id const & rev)
980{
981 return (rev_refiner.items_to_send.find(rev.inner())
982 != rev_refiner.items_to_send.end());
983}
984
985bool
986session::queue_this_cert(id const & c)
987{
988 return (cert_refiner.items_to_send.find(c)
989 != cert_refiner.items_to_send.end());
990}
991
992bool
993session::queue_this_file(id const & f)
994{
995 return file_items_sent.find(file_id(f)) == file_items_sent.end();
996}
997
998void
999session::note_file_data(file_id const & f)
1000{
1001 if (role == sink_role)
1002 return;
1003 file_data fd;
1004 project.db.get_file_version(f, fd);
1005 queue_data_cmd(file_item, f.inner(), fd.inner()());
1006 file_items_sent.insert(f);
1007}
1008
1009void
1010session::note_file_delta(file_id const & src, file_id const & dst)
1011{
1012 if (role == sink_role)
1013 return;
1014 file_delta fdel;
1015 project.db.get_arbitrary_file_delta(src, dst, fdel);
1016 queue_delta_cmd(file_item, src.inner(), dst.inner(), fdel.inner());
1017 file_items_sent.insert(dst);
1018}
1019
1020void
1021session::note_rev(revision_id const & rev)
1022{
1023 if (role == sink_role)
1024 return;
1025 revision_t rs;
1026 project.db.get_revision(rev, rs);
1027 data tmp;
1028 write_revision(rs, tmp);
1029 queue_data_cmd(revision_item, rev.inner(), tmp());
1030 sent_revisions.push_back(rev);
1031}
1032
1033void
1034session::note_cert(id const & c)
1035{
1036 if (role == sink_role)
1037 return;
1038 revision<cert> cert;
1039 string str;
1040 project.db.get_revision_cert(c, cert);
1041 write_cert(cert.inner(), str);
1042 queue_data_cmd(cert_item, c, str);
1043 sent_certs.push_back(cert.inner());
1044}
1045
1046
1047id
1048session::mk_nonce()
1049{
1050 I(this->saved_nonce().empty());
1051 char buf[constants::merkle_hash_length_in_bytes];
1052 keys.get_rng().randomize(reinterpret_cast<Botan::byte *>(buf),
1053 constants::merkle_hash_length_in_bytes);
1054 this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes));
1055 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1056 return this->saved_nonce;
1057}
1058
1059void
1060session::set_session_key(string const & key)
1061{
1062 session_key = netsync_session_key(key);
1063 read_hmac.set_key(session_key);
1064 write_hmac.set_key(session_key);
1065}
1066
1067void
1068session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
1069{
1070 if (use_transport_auth)
1071 {
1072 string hmac_key;
1073 keys.decrypt_rsa(signing_key, hmac_key_encrypted, hmac_key);
1074 set_session_key(hmac_key);
1075 }
1076}
1077
1078void
1079session::setup_client_tickers()
1080{
1081 // xgettext: please use short message and try to avoid multibytes chars
1082 byte_in_ticker.reset(new ticker(N_("bytes in"), ">", 1024, true));
1083 // xgettext: please use short message and try to avoid multibytes chars
1084 byte_out_ticker.reset(new ticker(N_("bytes out"), "<", 1024, true));
1085 if (role == sink_role)
1086 {
1087 // xgettext: please use short message and try to avoid multibytes chars
1088 cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3));
1089 // xgettext: please use short message and try to avoid multibytes chars
1090 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
1091 }
1092 else if (role == source_role)
1093 {
1094 // xgettext: please use short message and try to avoid multibytes chars
1095 cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3));
1096 // xgettext: please use short message and try to avoid multibytes chars
1097 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
1098 }
1099 else
1100 {
1101 I(role == source_and_sink_role);
1102 // xgettext: please use short message and try to avoid multibytes chars
1103 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
1104 // xgettext: please use short message and try to avoid multibytes chars
1105 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
1106 }
1107}
1108
1109bool
1110session::done_all_refinements()
1111{
1112 bool all = rev_refiner.done
1113 && cert_refiner.done
1114 && key_refiner.done
1115 && epoch_refiner.done;
1116
1117 if (all && !set_totals)
1118 {
1119 L(FL("All refinements done for peer %s") % peer_id);
1120 if (cert_out_ticker.get())
1121 cert_out_ticker->set_total(cert_refiner.items_to_send.size());
1122
1123 if (revision_out_ticker.get())
1124 revision_out_ticker->set_total(rev_refiner.items_to_send.size());
1125
1126 if (cert_in_ticker.get())
1127 cert_in_ticker->set_total(cert_refiner.items_to_receive);
1128
1129 if (revision_in_ticker.get())
1130 revision_in_ticker->set_total(rev_refiner.items_to_receive);
1131
1132 set_totals = true;
1133 }
1134 return all;
1135}
1136
1137
1138
1139bool
1140session::received_all_items()
1141{
1142 if (role == source_role)
1143 return true;
1144 bool all = rev_refiner.items_to_receive == 0
1145 && cert_refiner.items_to_receive == 0
1146 && key_refiner.items_to_receive == 0
1147 && epoch_refiner.items_to_receive == 0;
1148 return all;
1149}
1150
1151bool
1152session::finished_working()
1153{
1154 bool all = done_all_refinements()
1155 && received_all_items()
1156 && queued_all_items()
1157 && rev_enumerator.done();
1158 return all;
1159}
1160
1161bool
1162session::queued_all_items()
1163{
1164 if (role == sink_role)
1165 return true;
1166 bool all = rev_refiner.items_to_send.empty()
1167 && cert_refiner.items_to_send.empty()
1168 && key_refiner.items_to_send.empty()
1169 && epoch_refiner.items_to_send.empty();
1170 return all;
1171}
1172
1173
1174void
1175session::maybe_note_epochs_finished()
1176{
1177 // Maybe there are outstanding epoch requests.
1178 // These only matter if we're in sink or source-and-sink mode.
1179 if (!(epoch_refiner.items_to_receive == 0) && !(role == source_role))
1180 return;
1181
1182 // And maybe we haven't even finished the refinement.
1183 if (!epoch_refiner.done)
1184 return;
1185
1186 // If we ran into an error -- say a mismatched epoch -- don't do any
1187 // further refinements.
1188 if (encountered_error)
1189 return;
1190
1191 // But otherwise, we're ready to go. Start the next
1192 // set of refinements.
1193 if (voice == client_voice)
1194 {
1195 L(FL("epoch refinement finished; beginning other refinements"));
1196 key_refiner.begin_refinement();
1197 cert_refiner.begin_refinement();
1198 rev_refiner.begin_refinement();
1199 }
1200 else
1201 L(FL("epoch refinement finished"));
1202}
1203
1204static void
1205decrement_if_nonzero(netcmd_item_type ty,
1206 size_t & n)
1207{
1208 if (n == 0)
1209 {
1210 string typestr;
1211 netcmd_item_type_to_string(ty, typestr);
1212 E(false, F("underflow on count of %s items to receive") % typestr);
1213 }
1214 --n;
1215 if (n == 0)
1216 {
1217 string typestr;
1218 netcmd_item_type_to_string(ty, typestr);
1219 L(FL("count of %s items to receive has reached zero") % typestr);
1220 }
1221}
1222
1223void
1224session::note_item_arrived(netcmd_item_type ty, id const & ident)
1225{
1226 switch (ty)
1227 {
1228 case cert_item:
1229 decrement_if_nonzero(ty, cert_refiner.items_to_receive);
1230 if (cert_in_ticker.get() != NULL)
1231 ++(*cert_in_ticker);
1232 ++certs_in;
1233 break;
1234 case revision_item:
1235 decrement_if_nonzero(ty, rev_refiner.items_to_receive);
1236 if (revision_in_ticker.get() != NULL)
1237 ++(*revision_in_ticker);
1238 ++revs_in;
1239 break;
1240 case key_item:
1241 decrement_if_nonzero(ty, key_refiner.items_to_receive);
1242 ++keys_in;
1243 break;
1244 case epoch_item:
1245 decrement_if_nonzero(ty, epoch_refiner.items_to_receive);
1246 break;
1247 default:
1248 // No ticker for other things.
1249 break;
1250 }
1251}
1252
1253
1254
1255void
1256session::note_item_sent(netcmd_item_type ty, id const & ident)
1257{
1258 switch (ty)
1259 {
1260 case cert_item:
1261 cert_refiner.items_to_send.erase(ident);
1262 if (cert_out_ticker.get() != NULL)
1263 ++(*cert_out_ticker);
1264 ++certs_out;
1265 break;
1266 case revision_item:
1267 rev_refiner.items_to_send.erase(ident);
1268 if (revision_out_ticker.get() != NULL)
1269 ++(*revision_out_ticker);
1270 ++revs_out;
1271 break;
1272 case key_item:
1273 key_refiner.items_to_send.erase(ident);
1274 ++keys_out;
1275 break;
1276 case epoch_item:
1277 epoch_refiner.items_to_send.erase(ident);
1278 break;
1279 default:
1280 // No ticker for other things.
1281 break;
1282 }
1283}
1284
1285void
1286session::write_netcmd_and_try_flush(netcmd const & cmd)
1287{
1288 if (!encountered_error)
1289 {
1290 string buf;
1291 cmd.write(buf, write_hmac);
1292 queue_output(buf);
1293 }
1294 else
1295 L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
1296}
1297
1298// This method triggers a special "error unwind" mode to netsync. In this
1299// mode, all received data is ignored, and no new data is queued. We simply
1300// stay connected long enough for the current write buffer to be flushed, to
1301// ensure that our peer receives the error message.
1302// Affects read_some, write_some, and process .
1303void
1304session::error(int errcode, string const & errmsg)
1305{
1306 error_code = errcode;
1307 throw netsync_error(errmsg);
1308}
1309
1310bool
1311session::do_work(transaction_guard & guard)
1312{
1313 if (process(guard))
1314 {
1315 maybe_step();
1316 maybe_say_goodbye(guard);
1317 return true;
1318 }
1319 else
1320 return false;
1321}
1322
1323void
1324session::note_bytes_in(int count)
1325{
1326 if (byte_in_ticker.get() != NULL)
1327 (*byte_in_ticker) += count;
1328 bytes_in += count;
1329}
1330
1331void
1332session::note_bytes_out(int count)
1333{
1334 if (byte_out_ticker.get() != NULL)
1335 (*byte_out_ticker) += count;
1336 bytes_out += count;
1337}
1338
1339// senders
1340
1341void
1342session::queue_error_cmd(string const & errmsg)
1343{
1344 L(FL("queueing 'error' command"));
1345 netcmd cmd;
1346 cmd.write_error_cmd(errmsg);
1347 write_netcmd_and_try_flush(cmd);
1348}
1349
1350void
1351session::queue_bye_cmd(u8 phase)
1352{
1353 L(FL("queueing 'bye' command, phase %d")
1354 % static_cast<size_t>(phase));
1355 netcmd cmd;
1356 cmd.write_bye_cmd(phase);
1357 write_netcmd_and_try_flush(cmd);
1358}
1359
1360void
1361session::queue_done_cmd(netcmd_item_type type,
1362 size_t n_items)
1363{
1364 string typestr;
1365 netcmd_item_type_to_string(type, typestr);
1366 L(FL("queueing 'done' command for %s (%d items)")
1367 % typestr % n_items);
1368 netcmd cmd;
1369 cmd.write_done_cmd(type, n_items);
1370 write_netcmd_and_try_flush(cmd);
1371}
1372
1373void
1374session::queue_hello_cmd(rsa_keypair_id const & key_name,
1375 rsa_pub_key const & pub,
1376 id const & nonce)
1377{
1378 if (use_transport_auth)
1379 cmd.write_hello_cmd(key_name, pub, nonce);
1380 else
1381 cmd.write_hello_cmd(key_name, rsa_pub_key(), nonce);
1382 write_netcmd_and_try_flush(cmd);
1383}
1384
1385void
1386session::queue_anonymous_cmd(protocol_role role,
1387 globish const & include_pattern,
1388 globish const & exclude_pattern,
1389 id const & nonce2)
1390{
1391 netcmd cmd;
1392 rsa_oaep_sha_data hmac_key_encrypted;
1393 if (use_transport_auth)
1394 project.db.encrypt_rsa(remote_peer_key_name, nonce2(), hmac_key_encrypted);
1395 cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
1396 hmac_key_encrypted);
1397 write_netcmd_and_try_flush(cmd);
1398 set_session_key(nonce2());
1399}
1400
1401void
1402session::queue_auth_cmd(protocol_role role,
1403 globish const & include_pattern,
1404 globish const & exclude_pattern,
1405 id const & client,
1406 id const & nonce1,
1407 id const & nonce2,
1408 rsa_sha1_signature const & signature)
1409{
1410 netcmd cmd;
1411 rsa_oaep_sha_data hmac_key_encrypted;
1412 I(use_transport_auth);
1413 project.db.encrypt_rsa(remote_peer_key_name, nonce2(), hmac_key_encrypted);
1414 cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
1415 nonce1, hmac_key_encrypted, signature);
1416 write_netcmd_and_try_flush(cmd);
1417 set_session_key(nonce2());
1418}
1419
1420void
1421session::queue_confirm_cmd()
1422{
1423 netcmd cmd;
1424 cmd.write_confirm_cmd();
1425 write_netcmd_and_try_flush(cmd);
1426}
1427
1428void
1429session::queue_refine_cmd(refinement_type ty, merkle_node const & node)
1430{
1431 string typestr;
1432 hexenc<prefix> hpref;
1433 node.get_hex_prefix(hpref);
1434 netcmd_item_type_to_string(node.type, typestr);
1435 L(FL("queueing refinement %s of %s node '%s', level %d")
1436 % (ty == refinement_query ? "query" : "response")
1437 % typestr % hpref() % static_cast<int>(node.level));
1438 netcmd cmd;
1439 cmd.write_refine_cmd(ty, node);
1440 write_netcmd_and_try_flush(cmd);
1441}
1442
1443void
1444session::queue_data_cmd(netcmd_item_type type,
1445 id const & item,
1446 string const & dat)
1447{
1448 string typestr;
1449 netcmd_item_type_to_string(type, typestr);
1450 hexenc<id> hid;
1451
1452 if (global_sanity.debug_p())
1453 encode_hexenc(item, hid);
1454
1455 if (role == sink_role)
1456 {
1457 L(FL("not queueing %s data for '%s' as we are in pure sink role")
1458 % typestr % hid());
1459 return;
1460 }
1461
1462 L(FL("queueing %d bytes of data for %s item '%s'")
1463 % dat.size() % typestr % hid());
1464
1465 netcmd cmd;
1466 // TODO: This pair of functions will make two copies of a large
1467 // file, the first in cmd.write_data_cmd, and the second in
1468 // write_netcmd_and_try_flush when the data is copied from the
1469 // cmd.payload variable to the string buffer for output. This
1470 // double copy should be collapsed out, it may be better to use
1471 // a string_queue for output as well as input, as that will reduce
1472 // the amount of mallocs that happen when the string queue is large
1473 // enough to just store the data.
1474 cmd.write_data_cmd(type, item, dat);
1475 write_netcmd_and_try_flush(cmd);
1476 note_item_sent(type, item);
1477}
1478
1479void
1480session::queue_delta_cmd(netcmd_item_type type,
1481 id const & base,
1482 id const & ident,
1483 delta const & del)
1484{
1485 I(type == file_item);
1486 string typestr;
1487 netcmd_item_type_to_string(type, typestr);
1488 hexenc<id> base_hid,
1489 ident_hid;
1490
1491 if (global_sanity.debug_p())
1492 {
1493 encode_hexenc(base, base_hid);
1494 encode_hexenc(ident, ident_hid);
1495 }
1496
1497 if (role == sink_role)
1498 {
1499 L(FL("not queueing %s delta '%s' -> '%s' as we are in pure sink role")
1500 % typestr % base_hid() % ident_hid());
1501 return;
1502 }
1503
1504 L(FL("queueing %s delta '%s' -> '%s'")
1505 % typestr % base_hid() % ident_hid());
1506 netcmd cmd;
1507 cmd.write_delta_cmd(type, base, ident, del);
1508 write_netcmd_and_try_flush(cmd);
1509 note_item_sent(type, ident);
1510}
1511
1512
1513// processors
1514
1515bool
1516session::process_error_cmd(string const & errmsg)
1517{
1518 // "xxx string" with xxx being digits means there's an error code
1519 if (errmsg.size() > 4 && errmsg.substr(3,1) == " ")
1520 {
1521 try
1522 {
1523 int err = boost::lexical_cast<int>(errmsg.substr(0,3));
1524 if (err >= 100)
1525 {
1526 error_code = err;
1527 throw bad_decode(F("received network error: %s")
1528 % errmsg.substr(4));
1529 }
1530 }
1531 catch (boost::bad_lexical_cast)
1532 { // ok, so it wasn't a number
1533 }
1534 }
1535 throw bad_decode(F("received network error: %s") % errmsg);
1536}
1537
1538static const var_domain known_servers_domain = var_domain("known-servers");
1539
1540bool
1541session::process_hello_cmd(rsa_keypair_id const & their_keyname,
1542 rsa_pub_key const & their_key,
1543 id const & nonce)
1544{
1545 I(!this->received_remote_key);
1546 I(this->saved_nonce().empty());
1547
1548 if (use_transport_auth)
1549 {
1550 id their_key_hash;
1551 key_hash_code(their_keyname, their_key, their_key_hash);
1552 var_value printable_key_hash(encode_hexenc(their_key_hash()));
1553 L(FL("server key has name %s, hash %s")
1554 % their_keyname % printable_key_hash);
1555 var_key their_key_key(known_servers_domain, var_name(peer_id));
1556 if (project.db.var_exists(their_key_key))
1557 {
1558 var_value expected_key_hash;
1559 project.db.get_var(their_key_key, expected_key_hash);
1560 if (expected_key_hash != printable_key_hash)
1561 {
1562 P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1563 "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
1564 "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1565 "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
1566 "it is also possible that the server key has just been changed\n"
1567 "remote host sent key %s\n"
1568 "I expected %s\n"
1569 "'%s unset %s %s' overrides this check")
1570 % printable_key_hash
1571 % expected_key_hash
1572 % ui.prog_name % their_key_key.first % their_key_key.second);
1573 E(false, F("server key changed"));
1574 }
1575 }
1576 else
1577 {
1578 P(F("first time connecting to server %s\n"
1579 "I'll assume it's really them, but you might want to double-check\n"
1580 "their key's fingerprint: %s")
1581 % peer_id
1582 % printable_key_hash);
1583 project.db.set_var(their_key_key, printable_key_hash);
1584 }
1585
1586 if (project.db.public_key_exists(their_keyname))
1587 {
1588 rsa_pub_key tmp;
1589 project.db.get_key(their_keyname, tmp);
1590
1591 E(keys_match(their_keyname, tmp, their_keyname, their_key),
1592 F("the server sent a key with the key id '%s'\n"
1593 "which is already in use in your database. you may want to execute\n"
1594 " %s dropkey %s\n"
1595 "on your local database before you run this command again,\n"
1596 "assuming that key currently present in your database does NOT have\n"
1597 "a private counterpart (or in other words, is one of YOUR keys)")
1598 % their_keyname % ui.prog_name % their_keyname);
1599 }
1600 else
1601 {
1602 // this should now always return true since we just checked
1603 // for the existence of this particular key
1604 I(project.db.put_key(their_keyname, their_key));
1605 W(F("saving public key for %s to database") % their_keyname);
1606 }
1607
1608 {
1609 hexenc<id> hnonce;
1610 encode_hexenc(nonce, hnonce);
1611 L(FL("received 'hello' netcmd from server '%s' with nonce '%s'")
1612 % printable_key_hash % hnonce);
1613 }
1614
1615 I(project.db.public_key_exists(their_key_hash));
1616
1617 // save their identity
1618 this->received_remote_key = true;
1619 this->remote_peer_key_name = their_keyname;
1620 }
1621
1622 // clients always include in the synchronization set, every branch that the
1623 // user requested
1624 set<branch_name> all_branches, ok_branches;
1625 project.get_branch_list(all_branches);
1626 for (set<branch_name>::const_iterator i = all_branches.begin();
1627 i != all_branches.end(); i++)
1628 {
1629 if (our_matcher((*i)()))
1630 ok_branches.insert(*i);
1631 }
1632 rebuild_merkle_trees(ok_branches);
1633
1634 if (!initiated_by_server)
1635 setup_client_tickers();
1636
1637 if (use_transport_auth && signing_key() != "")
1638 {
1639 // get our key pair
1640 load_key_pair(keys, signing_key);
1641
1642 // make a signature with it;
1643 // this also ensures our public key is in the database
1644 rsa_sha1_signature sig;
1645 keys.make_signature(project.db, signing_key, nonce(), sig);
1646
1647 // get the hash identifier for our pubkey
1648 rsa_pub_key our_pub;
1649 project.db.get_key(signing_key, our_pub);
1650 id our_key_hash_raw;
1651 key_hash_code(signing_key, our_pub, our_key_hash_raw);
1652
1653 // make a new nonce of our own and send off the 'auth'
1654 queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
1655 our_key_hash_raw, nonce, mk_nonce(), sig);
1656 }
1657 else
1658 {
1659 queue_anonymous_cmd(this->role, our_include_pattern,
1660 our_exclude_pattern, mk_nonce());
1661 }
1662
1663 lua.hook_note_netsync_start(session_id, "client", this->role,
1664 peer_id, their_keyname,
1665 our_include_pattern, our_exclude_pattern);
1666 return true;
1667}
1668
1669bool
1670session::process_anonymous_cmd(protocol_role their_role,
1671 globish const & their_include_pattern,
1672 globish const & their_exclude_pattern)
1673{
1674 // Internally netsync thinks in terms of sources and sinks. Users like
1675 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1676 //
1677 // We therefore use the read/write terminology when dealing with the UI:
1678 // if the user asks to run a "read only" service, this means they are
1679 // willing to be a source but not a sink.
1680 //
1681 // nb: The "role" here is the role the *client* wants to play
1682 // so we need to check that the opposite role is allowed for us,
1683 // in our this->role field.
1684 //
1685
1686 lua.hook_note_netsync_start(session_id, "server", their_role,
1687 peer_id, rsa_keypair_id(),
1688 their_include_pattern, their_exclude_pattern);
1689
1690 // Client must be a sink and server must be a source (anonymous
1691 // read-only), unless transport auth is disabled.
1692 //
1693 // If running in no-transport-auth mode, we operate anonymously and
1694 // permit adoption of any role.
1695
1696 if (use_transport_auth)
1697 {
1698 if (their_role != sink_role)
1699 {
1700 this->saved_nonce = id("");
1701 error(not_permitted,
1702 F("rejected attempt at anonymous connection for write").str());
1703 }
1704
1705 if (this->role == sink_role)
1706 {
1707 this->saved_nonce = id("");
1708 error(role_mismatch,
1709 F("rejected attempt at anonymous connection while running as sink").str());
1710 }
1711 }
1712
1713 set<branch_name> all_branches, ok_branches;
1714 project.get_branch_list(all_branches);
1715 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1716 for (set<branch_name>::const_iterator i = all_branches.begin();
1717 i != all_branches.end(); i++)
1718 {
1719 if (their_matcher((*i)()))
1720 {
1721 if (use_transport_auth &&
1722 !lua.hook_get_netsync_read_permitted((*i)()))
1723 {
1724 error(not_permitted,
1725 (F("anonymous access to branch '%s' denied by server")
1726 % *i).str());
1727 }
1728 else
1729 ok_branches.insert(*i);
1730 }
1731 }
1732
1733 if (use_transport_auth)
1734 {
1735 P(F("allowed anonymous read permission for '%s' excluding '%s'")
1736 % their_include_pattern % their_exclude_pattern);
1737 this->role = source_role;
1738 }
1739 else
1740 {
1741 P(F("allowed anonymous read/write permission for '%s' excluding '%s'")
1742 % their_include_pattern % their_exclude_pattern);
1743 assume_corresponding_role(their_role);
1744 }
1745
1746 rebuild_merkle_trees(ok_branches);
1747
1748 this->remote_peer_key_name = rsa_keypair_id("");
1749 this->authenticated = true;
1750 return true;
1751}
1752
1753void
1754session::assume_corresponding_role(protocol_role their_role)
1755{
1756 // Assume the (possibly degraded) opposite role.
1757 switch (their_role)
1758 {
1759 case source_role:
1760 I(this->role != source_role);
1761 this->role = sink_role;
1762 break;
1763
1764 case source_and_sink_role:
1765 I(this->role == source_and_sink_role);
1766 break;
1767
1768 case sink_role:
1769 I(this->role != sink_role);
1770 this->role = source_role;
1771 break;
1772 }
1773}
1774
1775bool
1776session::process_auth_cmd(protocol_role their_role,
1777 globish const & their_include_pattern,
1778 globish const & their_exclude_pattern,
1779 id const & client,
1780 id const & nonce1,
1781 rsa_sha1_signature const & signature)
1782{
1783 I(!this->received_remote_key);
1784 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1785
1786 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1787
1788 if (!project.db.public_key_exists(client))
1789 {
1790 // If it's not in the db, it still could be in the keystore if we
1791 // have the private key that goes with it.
1792 rsa_keypair_id their_key_id;
1793 keypair their_keypair;
1794 if (keys.maybe_get_key_pair(client, their_key_id, their_keypair))
1795 project.db.put_key(their_key_id, their_keypair.pub);
1796 else
1797 {
1798 return process_anonymous_cmd(their_role,
1799 their_include_pattern,
1800 their_exclude_pattern);
1801 /*
1802 this->saved_nonce = id("");
1803
1804 lua.hook_note_netsync_start(session_id, "server", their_role,
1805 peer_id, rsa_keypair_id("-unknown-"),
1806 their_include_pattern,
1807 their_exclude_pattern);
1808 error(unknown_key,
1809 (F("remote public key hash '%s' is unknown")
1810 % encode_hexenc(client())).str());
1811 */
1812 }
1813 }
1814
1815 // Get their public key.
1816 rsa_keypair_id their_id;
1817 rsa_pub_key their_key;
1818 project.db.get_pubkey(client, their_id, their_key);
1819
1820 lua.hook_note_netsync_start(session_id, "server", their_role,
1821 peer_id, their_id,
1822 their_include_pattern, their_exclude_pattern);
1823
1824 // Check that they replied with the nonce we asked for.
1825 if (!(nonce1 == this->saved_nonce))
1826 {
1827 this->saved_nonce = id("");
1828 error(failed_identification,
1829 F("detected replay attack in auth netcmd").str());
1830 }
1831
1832 // Internally netsync thinks in terms of sources and sinks. users like
1833 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1834 //
1835 // We therefore use the read/write terminology when dealing with the UI:
1836 // if the user asks to run a "read only" service, this means they are
1837 // willing to be a source but not a sink.
1838 //
1839 // nb: The "their_role" here is the role the *client* wants to play
1840 // so we need to check that the opposite role is allowed for us,
1841 // in our this->role field.
1842
1843 // Client as sink, server as source (reading).
1844
1845 if (their_role == sink_role || their_role == source_and_sink_role)
1846 {
1847 if (this->role != source_role && this->role != source_and_sink_role)
1848 {
1849 this->saved_nonce = id("");
1850 error(not_permitted,
1851 (F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink")
1852 % their_id % their_include_pattern % their_exclude_pattern).str());
1853 }
1854 }
1855
1856 set<branch_name> all_branches, ok_branches;
1857 project.get_branch_list(all_branches);
1858 for (set<branch_name>::const_iterator i = all_branches.begin();
1859 i != all_branches.end(); i++)
1860 {
1861 if (their_matcher((*i)()))
1862 {
1863 if (!lua.hook_get_netsync_read_permitted((*i)(), their_id))
1864 {
1865 error(not_permitted,
1866 (F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'")
1867 % their_id % their_include_pattern % their_exclude_pattern % *i).str());
1868 }
1869 else
1870 ok_branches.insert(*i);
1871 }
1872 }
1873
1874 // If we're source_and_sink_role, continue even with no branches readable
1875 // eg. serve --db=empty.db
1876 P(F("allowed '%s' read permission for '%s' excluding '%s'")
1877 % their_id % their_include_pattern % their_exclude_pattern);
1878
1879 // Client as source, server as sink (writing).
1880
1881 if (their_role == source_role || their_role == source_and_sink_role)
1882 {
1883 if (this->role != sink_role && this->role != source_and_sink_role)
1884 {
1885 this->saved_nonce = id("");
1886 error(not_permitted,
1887 (F("denied '%s' write permission for '%s' excluding '%s' while running as pure source")
1888 % their_id % their_include_pattern % their_exclude_pattern).str());
1889 }
1890
1891 if (!lua.hook_get_netsync_write_permitted(their_id))
1892 {
1893 this->saved_nonce = id("");
1894 error(not_permitted,
1895 (F("denied '%s' write permission for '%s' excluding '%s'")
1896 % their_id % their_include_pattern % their_exclude_pattern).str());
1897 }
1898
1899 P(F("allowed '%s' write permission for '%s' excluding '%s'")
1900 % their_id % their_include_pattern % their_exclude_pattern);
1901 }
1902
1903 rebuild_merkle_trees(ok_branches);
1904
1905 this->received_remote_key = true;
1906
1907 // Check the signature.
1908 if (project.db.check_signature(their_id, nonce1(), signature) == cert_ok)
1909 {
1910 // Get our private key and sign back.
1911 L(FL("client signature OK, accepting authentication"));
1912 this->authenticated = true;
1913 this->remote_peer_key_name = their_id;
1914
1915 assume_corresponding_role(their_role);
1916 return true;
1917 }
1918 else
1919 {
1920 error(failed_identification, (F("bad client signature")).str());
1921 }
1922 return false;
1923}
1924
1925bool
1926session::process_refine_cmd(refinement_type ty, merkle_node const & node)
1927{
1928 string typestr;
1929 netcmd_item_type_to_string(node.type, typestr);
1930 L(FL("processing refine cmd for %s node at level %d")
1931 % typestr % node.level);
1932
1933 switch (node.type)
1934 {
1935 case file_item:
1936 W(F("Unexpected 'refine' command on non-refined item type"));
1937 break;
1938
1939 case key_item:
1940 key_refiner.process_refinement_command(ty, node);
1941 break;
1942
1943 case revision_item:
1944 rev_refiner.process_refinement_command(ty, node);
1945 break;
1946
1947 case cert_item:
1948 cert_refiner.process_refinement_command(ty, node);
1949 break;
1950
1951 case epoch_item:
1952 epoch_refiner.process_refinement_command(ty, node);
1953 break;
1954 }
1955 return true;
1956}
1957
1958bool
1959session::process_bye_cmd(u8 phase,
1960 transaction_guard & guard)
1961{
1962
1963// Ideal shutdown
1964// ~~~~~~~~~~~~~~~
1965//
1966// I/O events state transitions
1967// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
1968// client: C_WORKING
1969// server: S_WORKING
1970// 0. [refinement, data, deltas, etc.]
1971// client: C_SHUTDOWN
1972// (client checkpoints here)
1973// 1. client -> "bye 0"
1974// 2. "bye 0" -> server
1975// server: S_SHUTDOWN
1976// (server checkpoints here)
1977// 3. "bye 1" <- server
1978// 4. client <- "bye 1"
1979// client: C_CONFIRMED
1980// 5. client -> "bye 2"
1981// 6. "bye 2" -> server
1982// server: S_CONFIRMED
1983// 7. [server drops connection]
1984//
1985//
1986// Affects of I/O errors or disconnections
1987// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1988// C_WORKING: report error and fault
1989// S_WORKING: report error and recover
1990// C_SHUTDOWN: report error and fault
1991// S_SHUTDOWN: report success and recover
1992// (and warn that client might falsely see error)
1993// C_CONFIRMED: report success
1994// S_CONFIRMED: report success
1995
1996 switch (phase)
1997 {
1998 case 0:
1999 if (voice == server_voice &&
2000 protocol_state == working_state)
2001 {
2002 protocol_state = shutdown_state;
2003 guard.do_checkpoint();
2004 queue_bye_cmd(1);
2005 }
2006 else
2007 error(bad_command, "unexpected bye phase 0 received");
2008 break;
2009
2010 case 1:
2011 if (voice == client_voice &&
2012 protocol_state == shutdown_state)
2013 {
2014 protocol_state = confirmed_state;
2015 queue_bye_cmd(2);
2016 }
2017 else
2018 error(bad_command, "unexpected bye phase 1 received");
2019 break;
2020
2021 case 2:
2022 if (voice == server_voice &&
2023 protocol_state == shutdown_state)
2024 {
2025 protocol_state = confirmed_state;
2026 return false;
2027 }
2028 else
2029 error(bad_command, "unexpected bye phase 2 received");
2030 break;
2031
2032 default:
2033 error(bad_command, (F("unknown bye phase %d received") % phase).str());
2034 }
2035
2036 return true;
2037}
2038
2039bool
2040session::process_done_cmd(netcmd_item_type type, size_t n_items)
2041{
2042 string typestr;
2043 netcmd_item_type_to_string(type, typestr);
2044 L(FL("received 'done' command for %s (%s items)") % typestr % n_items);
2045 switch (type)
2046 {
2047 case file_item:
2048 W(F("Unexpected 'done' command on non-refined item type"));
2049 break;
2050
2051 case key_item:
2052 key_refiner.process_done_command(n_items);
2053 if (key_refiner.done && role != sink_role)
2054 send_all_data(key_item, key_refiner.items_to_send);
2055 break;
2056
2057 case revision_item:
2058 rev_refiner.process_done_command(n_items);
2059 break;
2060
2061 case cert_item:
2062 cert_refiner.process_done_command(n_items);
2063 break;
2064
2065 case epoch_item:
2066 epoch_refiner.process_done_command(n_items);
2067 if (epoch_refiner.done)
2068 {
2069 send_all_data(epoch_item, epoch_refiner.items_to_send);
2070 maybe_note_epochs_finished();
2071 }
2072 break;
2073 }
2074 return true;
2075}
2076
2077void
2078session::respond_to_confirm_cmd()
2079{
2080 epoch_refiner.begin_refinement();
2081}
2082
2083bool
2084session::data_exists(netcmd_item_type type,
2085 id const & item)
2086{
2087 switch (type)
2088 {
2089 case key_item:
2090 return key_refiner.local_item_exists(item)
2091 || project.db.public_key_exists(item);
2092 case file_item:
2093 return project.db.file_version_exists(file_id(item));
2094 case revision_item:
2095 return rev_refiner.local_item_exists(item)
2096 || project.db.revision_exists(revision_id(item));
2097 case cert_item:
2098 return cert_refiner.local_item_exists(item)
2099 || project.db.revision_cert_exists(revision_id(item));
2100 case epoch_item:
2101 return epoch_refiner.local_item_exists(item)
2102 || project.db.epoch_exists(epoch_id(item));
2103 }
2104 return false;
2105}
2106
2107void
2108session::load_data(netcmd_item_type type,
2109 id const & item,
2110 string & out)
2111{
2112 string typestr;
2113 netcmd_item_type_to_string(type, typestr);
2114 hexenc<id> hitem;
2115 encode_hexenc(item, hitem);
2116
2117 if (!data_exists(type, item))
2118 throw bad_decode(F("%s with hash '%s' does not exist in our database")
2119 % typestr % hitem());
2120
2121 switch (type)
2122 {
2123 case epoch_item:
2124 {
2125 branch_name branch;
2126 epoch_data epoch;
2127 project.db.get_epoch(epoch_id(item), branch, epoch);
2128 write_epoch(branch, epoch, out);
2129 }
2130 break;
2131 case key_item:
2132 {
2133 rsa_keypair_id keyid;
2134 rsa_pub_key pub;
2135 project.db.get_pubkey(item, keyid, pub);
2136 L(FL("public key '%s' is also called '%s'") % hitem() % keyid);
2137 write_pubkey(keyid, pub, out);
2138 sent_keys.push_back(keyid);
2139 }
2140 break;
2141
2142 case revision_item:
2143 {
2144 revision_data mdat;
2145 data dat;
2146 project.db.get_revision(revision_id(item), mdat);
2147 out = mdat.inner()();
2148 }
2149 break;
2150
2151 case file_item:
2152 {
2153 file_data fdat;
2154 data dat;
2155 project.db.get_file_version(file_id(item), fdat);
2156 out = fdat.inner()();
2157 }
2158 break;
2159
2160 case cert_item:
2161 {
2162 revision<cert> c;
2163 project.db.get_revision_cert(item, c);
2164 string tmp;
2165 write_cert(c.inner(), out);
2166 }
2167 break;
2168 }
2169}
2170
2171bool
2172session::process_data_cmd(netcmd_item_type type,
2173 id const & item,
2174 string const & dat)
2175{
2176 hexenc<id> hitem;
2177 encode_hexenc(item, hitem);
2178
2179 string typestr;
2180 netcmd_item_type_to_string(type, typestr);
2181
2182 note_item_arrived(type, item);
2183 if (data_exists(type, item))
2184 {
2185 L(FL("%s '%s' already exists in our database") % typestr % hitem());
2186 if (type == epoch_item)
2187 maybe_note_epochs_finished();
2188 return true;
2189 }
2190
2191 switch (type)
2192 {
2193 case epoch_item:
2194 {
2195 branch_name branch;
2196 epoch_data epoch;
2197 read_epoch(dat, branch, epoch);
2198 L(FL("received epoch %s for branch %s")
2199 % encode_hexenc(epoch.inner()()) % branch);
2200 map<branch_name, epoch_data> epochs;
2201 project.db.get_epochs(epochs);
2202 map<branch_name, epoch_data>::const_iterator i;
2203 i = epochs.find(branch);
2204 if (i == epochs.end())
2205 {
2206 L(FL("branch %s has no epoch; setting epoch to %s")
2207 % branch % encode_hexenc(epoch.inner()()));
2208 project.db.set_epoch(branch, epoch);
2209 }
2210 else
2211 {
2212 L(FL("branch %s already has an epoch; checking") % branch);
2213 // If we get here, then we know that the epoch must be
2214 // different, because if it were the same then the
2215 // if (epoch_exists()) branch up above would have been taken.
2216 // if somehow this is wrong, then we have broken epoch
2217 // hashing or something, which is very dangerous, so play it
2218 // safe...
2219 I(!(i->second == epoch));
2220
2221 // It is safe to call 'error' here, because if we get here,
2222 // then the current netcmd packet cannot possibly have
2223 // written anything to the database.
2224 error(mixing_versions,
2225 (F("Mismatched epoch on branch %s."
2226 " Server has '%s', client has '%s'.")
2227 % branch
2228 % encode_hexenc((voice == server_voice
2229 ? i->second: epoch).inner()())
2230 % encode_hexenc((voice == server_voice
2231 ? epoch : i->second).inner()())).str());
2232 }
2233 }
2234 maybe_note_epochs_finished();
2235 break;
2236
2237 case key_item:
2238 {
2239 rsa_keypair_id keyid;
2240 rsa_pub_key pub;
2241 read_pubkey(dat, keyid, pub);
2242 id tmp;
2243 key_hash_code(keyid, pub, tmp);
2244 if (! (tmp == item))
2245 {
2246 throw bad_decode(F("hash check failed for public key '%s' (%s);"
2247 " wanted '%s' got '%s'")
2248 % hitem() % keyid % hitem()
2249 % encode_hexenc(tmp()));
2250 }
2251 if (project.db.put_key(keyid, pub))
2252 written_keys.push_back(keyid);
2253 else
2254 error(partial_transfer,
2255 (F("Received duplicate key %s") % keyid).str());
2256 }
2257 break;
2258
2259 case cert_item:
2260 {
2261 cert c;
2262 read_cert(dat, c);
2263 id tmp;
2264 cert_hash_code(c, tmp);
2265 if (! (tmp == item))
2266 throw bad_decode(F("hash check failed for revision cert '%s'") % hitem());
2267 if (project.db.put_revision_cert(revision<cert>(c)))
2268 written_certs.push_back(c);
2269 }
2270 break;
2271
2272 case revision_item:
2273 {
2274 L(FL("received revision '%s'") % hitem());
2275 revision_t rev;
2276 read_revision(data(dat, made_from_network), rev);
2277 if (project.db.put_revision(revision_id(item), rev))
2278 written_revisions.push_back(revision_id(item));
2279 }
2280 break;
2281
2282 case file_item:
2283 {
2284 L(FL("received file '%s'") % hitem());
2285 project.db.put_file(file_id(item),
2286 file_data(dat, made_from_network));
2287 }
2288 break;
2289 }
2290 return true;
2291}
2292
2293bool
2294session::process_delta_cmd(netcmd_item_type type,
2295 id const & base,
2296 id const & ident,
2297 delta const & del)
2298{
2299 string typestr;
2300 netcmd_item_type_to_string(type, typestr);
2301
2302 pair<id,id> id_pair = make_pair(base, ident);
2303
2304 note_item_arrived(type, ident);
2305
2306 switch (type)
2307 {
2308 case file_item:
2309 {
2310 file_id src_file(base), dst_file(ident);
2311 project.db.put_file_version(src_file, dst_file, file_delta(del));
2312 }
2313 break;
2314
2315 default:
2316 L(FL("ignoring delta received for item type %s") % typestr);
2317 break;
2318 }
2319 return true;
2320}
2321
2322bool
2323session::process_usher_cmd(utf8 const & msg)
2324{
2325 if (msg().size())
2326 {
2327 if (msg()[0] == '!')
2328 P(F("Received warning from usher: %s") % msg().substr(1));
2329 else
2330 L(FL("Received greeting from usher: %s") % msg().substr(1));
2331 }
2332 netcmd cmdout;
2333 cmdout.write_usher_reply_cmd(utf8(peer_id), our_include_pattern);
2334 write_netcmd_and_try_flush(cmdout);
2335 L(FL("Sent reply."));
2336 return true;
2337}
2338
2339
2340void
2341session::send_all_data(netcmd_item_type ty, set<id> const & items)
2342{
2343 string typestr;
2344 netcmd_item_type_to_string(ty, typestr);
2345
2346 // Use temporary; passed arg will be invalidated during iteration.
2347 set<id> tmp = items;
2348
2349 for (set<id>::const_iterator i = tmp.begin();
2350 i != tmp.end(); ++i)
2351 {
2352 if (data_exists(ty, *i))
2353 {
2354 string out;
2355 load_data(ty, *i, out);
2356 queue_data_cmd(ty, *i, out);
2357 }
2358 }
2359}
2360
2361bool
2362session::dispatch_payload(netcmd const & cmd,
2363 transaction_guard & guard)
2364{
2365
2366 switch (cmd.get_cmd_code())
2367 {
2368
2369 case error_cmd:
2370 {
2371 string errmsg;
2372 cmd.read_error_cmd(errmsg);
2373 return process_error_cmd(errmsg);
2374 }
2375 break;
2376
2377 case hello_cmd:
2378 require(! authenticated, "hello netcmd received when not authenticated");
2379 require(voice == client_voice, "hello netcmd received in client voice");
2380 {
2381 rsa_keypair_id server_keyname;
2382 rsa_pub_key server_key;
2383 id nonce;
2384 cmd.read_hello_cmd(server_keyname, server_key, nonce);
2385 return process_hello_cmd(server_keyname, server_key, nonce);
2386 }
2387 break;
2388
2389 case bye_cmd:
2390 require(authenticated, "bye netcmd received when not authenticated");
2391 {
2392 u8 phase;
2393 cmd.read_bye_cmd(phase);
2394 return process_bye_cmd(phase, guard);
2395 }
2396 break;
2397
2398 case anonymous_cmd:
2399 require(! authenticated, "anonymous netcmd received when not authenticated");
2400 require(voice == server_voice, "anonymous netcmd received in server voice");
2401 require(role == source_role ||
2402 role == source_and_sink_role,
2403 "anonymous netcmd received in source or source/sink role");
2404 {
2405 protocol_role role;
2406 globish their_include_pattern, their_exclude_pattern;
2407 rsa_oaep_sha_data hmac_key_encrypted;
2408 cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted);
2409 L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
2410 "in %s mode\n")
2411 % their_include_pattern % their_exclude_pattern
2412 % (role == source_and_sink_role ? _("source and sink") :
2413 (role == source_role ? _("source") : _("sink"))));
2414
2415 set_session_key(hmac_key_encrypted);
2416 if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern))
2417 return false;
2418 queue_confirm_cmd();
2419 return true;
2420 }
2421 break;
2422
2423 case auth_cmd:
2424 require(! authenticated, "auth netcmd received when not authenticated");
2425 require(voice == server_voice, "auth netcmd received in server voice");
2426 {
2427 protocol_role role;
2428 rsa_sha1_signature signature;
2429 globish their_include_pattern, their_exclude_pattern;
2430 id client, nonce1, nonce2;
2431 rsa_oaep_sha_data hmac_key_encrypted;
2432 cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2433 client, nonce1, hmac_key_encrypted, signature);
2434
2435 hexenc<id> their_key_hash;
2436 encode_hexenc(client, their_key_hash);
2437 hexenc<id> hnonce1;
2438 encode_hexenc(nonce1, hnonce1);
2439
2440 L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
2441 "exclude '%s' in %s mode with nonce1 '%s'\n")
2442 % their_key_hash % their_include_pattern % their_exclude_pattern
2443 % (role == source_and_sink_role ? _("source and sink") :
2444 (role == source_role ? _("source") : _("sink")))
2445 % hnonce1);
2446
2447 set_session_key(hmac_key_encrypted);
2448
2449 if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2450 client, nonce1, signature))
2451 return false;
2452 queue_confirm_cmd();
2453 return true;
2454 }
2455 break;
2456
2457 case confirm_cmd:
2458 require(! authenticated, "confirm netcmd received when not authenticated");
2459 require(voice == client_voice, "confirm netcmd received in client voice");
2460 {
2461 string signature;
2462 cmd.read_confirm_cmd();
2463 this->authenticated = true;
2464 respond_to_confirm_cmd();
2465 return true;
2466 }
2467 break;
2468
2469 case refine_cmd:
2470 require(authenticated, "refine netcmd received when authenticated");
2471 {
2472 merkle_node node;
2473 refinement_type ty;
2474 cmd.read_refine_cmd(ty, node);
2475 return process_refine_cmd(ty, node);
2476 }
2477 break;
2478
2479 case done_cmd:
2480 require(authenticated, "done netcmd received when not authenticated");
2481 {
2482 size_t n_items;
2483 netcmd_item_type type;
2484 cmd.read_done_cmd(type, n_items);
2485 return process_done_cmd(type, n_items);
2486 }
2487 break;
2488
2489 case data_cmd:
2490 require(authenticated, "data netcmd received when not authenticated");
2491 require(role == sink_role ||
2492 role == source_and_sink_role,
2493 "data netcmd received in source or source/sink role");
2494 {
2495 netcmd_item_type type;
2496 id item;
2497 string dat;
2498 cmd.read_data_cmd(type, item, dat);
2499 return process_data_cmd(type, item, dat);
2500 }
2501 break;
2502
2503 case delta_cmd:
2504 require(authenticated, "delta netcmd received when not authenticated");
2505 require(role == sink_role ||
2506 role == source_and_sink_role,
2507 "delta netcmd received in source or source/sink role");
2508 {
2509 netcmd_item_type type;
2510 id base, ident;
2511 delta del;
2512 cmd.read_delta_cmd(type, base, ident, del);
2513 return process_delta_cmd(type, base, ident, del);
2514 }
2515 break;
2516
2517 case usher_cmd:
2518 {
2519 utf8 greeting;
2520 cmd.read_usher_cmd(greeting);
2521 return process_usher_cmd(greeting);
2522 }
2523 break;
2524
2525 case usher_reply_cmd:
2526 return false; // Should not happen.
2527 break;
2528 }
2529 return false;
2530}
2531
2532// This kicks off the whole cascade starting from "hello".
2533void
2534session::begin_service()
2535{
2536 keypair kp;
2537 if (use_transport_auth)
2538 keys.get_key_pair(signing_key, kp);
2539 queue_hello_cmd(signing_key, kp.pub, mk_nonce());
2540}
2541
2542void
2543session::maybe_step()
2544{
2545 while (done_all_refinements()
2546 && !rev_enumerator.done()
2547 && !output_overfull())
2548 {
2549 rev_enumerator.step();
2550 }
2551}
2552
2553void
2554session::maybe_say_goodbye(transaction_guard & guard)
2555{
2556 if (voice == client_voice
2557 && protocol_state == working_state
2558 && finished_working())
2559 {
2560 protocol_state = shutdown_state;
2561 guard.do_checkpoint();
2562 queue_bye_cmd(0);
2563 }
2564}
2565
2566bool
2567session::arm()
2568{
2569 if (!armed)
2570 {
2571 // Don't pack the buffer unnecessarily.
2572 if (output_overfull())
2573 return false;
2574
2575 if (cmd.read(inbuf, read_hmac))
2576 {
2577 armed = true;
2578 }
2579 }
2580 return armed;
2581}
2582
2583bool session::process(transaction_guard & guard)
2584{
2585 if (encountered_error)
2586 return true;
2587 try
2588 {
2589 if (!arm())
2590 return true;
2591
2592 armed = false;
2593 L(FL("processing %d byte input buffer from peer %s")
2594 % inbuf.size() % peer_id);
2595
2596 size_t sz = cmd.encoded_size();
2597 bool ret = dispatch_payload(cmd, guard);
2598
2599 if (inbuf.size() >= constants::netcmd_maxsz)
2600 W(F("input buffer for peer %s is overfull "
2601 "after netcmd dispatch") % peer_id);
2602
2603 guard.maybe_checkpoint(sz);
2604
2605 if (!ret)
2606 L(FL("peer %s finishing processing with '%d' packet")
2607 % peer_id % cmd.get_cmd_code());
2608 return ret;
2609 }
2610 catch (bad_decode & bd)
2611 {
2612 W(F("protocol error while processing peer %s: '%s'")
2613 % peer_id % bd.what);
2614 return false;
2615 }
2616 catch (netsync_error & err)
2617 {
2618 W(F("error: %s") % err.msg);
2619 queue_error_cmd(boost::lexical_cast<string>(error_code) + " " + err.msg);
2620 encountered_error = true;
2621 return true; // Don't terminate until we've send the error_cmd.
2622 }
2623}
2624
2625static shared_ptr<Netxx::StreamServer>
2626make_server(std::list<utf8> const & addresses,
2627 Netxx::port_type default_port,
2628 Netxx::Timeout timeout,
2629 bool use_ipv6,
2630 Netxx::Address & addr)
2631{
2632 try
2633 {
2634 addr = Netxx::Address(use_ipv6);
2635
2636 if (addresses.empty())
2637 addr.add_all_addresses(default_port);
2638 else
2639 {
2640 for (std::list<utf8>::const_iterator it = addresses.begin();
2641 it != addresses.end(); ++it)
2642 {
2643 const utf8 & address = *it;
2644 if (!address().empty())
2645 {
2646 size_t l_colon = address().find(':');
2647 size_t r_colon = address().rfind(':');
2648
2649 if (l_colon == r_colon && l_colon == 0)
2650 {
2651 // can't be an IPv6 address as there is only one colon
2652 // must be a : followed by a port
2653 string port_str = address().substr(1);
2654 addr.add_all_addresses(std::atoi(port_str.c_str()));
2655 }
2656 else
2657 addr.add_address(address().c_str(), default_port);
2658 }
2659 }
2660 }
2661 shared_ptr<Netxx::StreamServer> ret(new Netxx::StreamServer(addr, timeout));
2662
2663 char const * name;
2664 name = addr.get_name();
2665 P(F("beginning service on %s : %s")
2666 % (name != NULL ? name : _("<all interfaces>"))
2667 % lexical_cast<string>(addr.get_port()));
2668
2669 return ret;
2670 }
2671 // If we use IPv6 and the initialisation of server fails, we want
2672 // to try again with IPv4. The reason is that someone may have
2673 // downloaded a IPv6-enabled monotone on a system that doesn't
2674 // have IPv6, and which might fail therefore.
2675 catch(Netxx::NetworkException & e)
2676 {
2677 if (use_ipv6)
2678 return make_server(addresses, default_port, timeout, false, addr);
2679 else
2680 throw;
2681 }
2682 catch(Netxx::Exception & e)
2683 {
2684 if (use_ipv6)
2685 return make_server(addresses, default_port, timeout, false, addr);
2686 else
2687 throw;
2688 }
2689}
2690
2691class reactor;
2692
2693class listener_base : public reactable
2694{
2695protected:
2696 shared_ptr<Netxx::StreamServer> srv;
2697public:
2698 listener_base(shared_ptr<Netxx::StreamServer> srv)
2699 : srv(srv)
2700 {
2701 }
2702 virtual ~listener_base()
2703 {
2704 }
2705 virtual bool do_io(Netxx::Probe::ready_type event) = 0;
2706 bool timed_out(time_t now) { return false; }
2707 bool do_work(transaction_guard & guard) { return true; }
2708 bool arm() { return false; }
2709 bool can_timeout() { return false; }
2710
2711 string name() { return ""; } // FIXME
2712
2713 bool is_pipe_pair()
2714 {
2715 return false;
2716 }
2717 vector<Netxx::socket_type> get_sockets()
2718 {
2719 return srv->get_probe_info()->get_sockets();
2720 }
2721 void add_to_probe(Netxx::PipeCompatibleProbe & probe)
2722 {
2723 if (num_reactables() >= constants::netsync_connection_limit)
2724 {
2725 W(F("session limit %d reached, some connections "
2726 "will be refused") % constants::netsync_connection_limit);
2727 }
2728 else
2729 {
2730 probe.add(*srv);
2731 }
2732 }
2733 void remove_from_probe(Netxx::PipeCompatibleProbe & probe)
2734 {
2735 probe.remove(*srv);
2736 }
2737};
2738
2739class listener : public listener_base
2740{
2741 options & opts;
2742 lua_hooks & lua;
2743 project_t & project;
2744 key_store & keys;
2745
2746 reactor & react;
2747
2748 protocol_role role;
2749 Netxx::Timeout timeout;
2750
2751 shared_ptr<transaction_guard> &guard;
2752 Netxx::Address addr;
2753public:
2754
2755 listener(options & opts,
2756 lua_hooks & lua,
2757 project_t & project,
2758 key_store & keys,
2759 reactor & react,
2760 protocol_role role,
2761 std::list<utf8> const & addresses,
2762 shared_ptr<transaction_guard> &guard,
2763 bool use_ipv6)
2764 : listener_base(shared_ptr<Netxx::StreamServer>()),
2765 opts(opts), lua(lua), project(project), keys(keys),
2766 react(react), role(role),
2767 timeout(static_cast<long>(constants::netsync_timeout_seconds)),
2768 guard(guard),
2769 addr(use_ipv6)
2770 {
2771 srv = make_server(addresses, constants::netsync_default_port,
2772 timeout, use_ipv6, addr);
2773 }
2774
2775 bool do_io(Netxx::Probe::ready_type event);
2776};
2777
2778class reactor
2779{
2780 bool have_pipe;
2781 Netxx::Timeout forever, timeout, instant;
2782 bool can_have_timeout;
2783
2784 Netxx::PipeCompatibleProbe probe;
2785 set<shared_ptr<reactable> > items;
2786
2787 map<Netxx::socket_type, shared_ptr<reactable> > lookup;
2788
2789 bool readying;
2790 int have_armed;
2791 void ready_for_io(shared_ptr<reactable> item, transaction_guard & guard)
2792 {
2793 if (item->do_work(guard))
2794 {
2795 try
2796 {
2797 if (item->arm())
2798 {
2799 ++have_armed;
2800 }
2801 item->add_to_probe(probe);
2802 vector<Netxx::socket_type> ss = item->get_sockets();
2803 for (vector<Netxx::socket_type>::iterator i = ss.begin();
2804 i != ss.end(); ++i)
2805 {
2806 lookup.insert(make_pair(*i, item));
2807 }
2808 if (item->can_timeout())
2809 can_have_timeout = true;
2810 }
2811 catch (bad_decode & bd)
2812 {
2813 W(F("protocol error while processing peer %s: '%s'")
2814 % item->name() % bd.what);
2815 remove(item);
2816 }
2817 }
2818 else
2819 {
2820 remove(item);
2821 }
2822 }
2823public:
2824 reactor()
2825 : have_pipe(false),
2826 timeout(static_cast<long>(constants::netsync_timeout_seconds)),
2827 instant(0,1),
2828 readying(false),
2829 have_armed(0)
2830 { }
2831 void add(shared_ptr<reactable> item, transaction_guard & guard)
2832 {
2833 I(!have_pipe);
2834 if (item->is_pipe_pair())
2835 {
2836 I(items.size() == 0);
2837 have_pipe = true;
2838 }
2839 items.insert(item);
2840 if (readying)
2841 ready_for_io(item, guard);
2842 }
2843 void remove(shared_ptr<reactable> item)
2844 {
2845 set<shared_ptr<reactable> >::iterator i = items.find(item);
2846 if (i != items.end())
2847 {
2848 items.erase(i);
2849 have_pipe = false;
2850 }
2851 }
2852
2853 int size() const
2854 {
2855 return items.size();
2856 }
2857
2858 void ready(transaction_guard & guard)
2859 {
2860 readying = true;
2861 have_armed = 0;
2862 can_have_timeout = false;
2863
2864 probe.clear();
2865 lookup.clear();
2866 set<shared_ptr<reactable> > todo = items;
2867 for (set<shared_ptr<reactable> >::iterator i = todo.begin();
2868 i != todo.end(); ++i)
2869 {
2870 ready_for_io(*i, guard);
2871 }
2872 }
2873 bool do_io()
2874 {
2875 // so it doesn't get reset under us if we drop the session
2876 bool pipe = have_pipe;
2877 readying = false;
2878 bool timed_out = true;
2879 Netxx::Timeout how_long;
2880 if (!can_have_timeout)
2881 how_long = forever;
2882 else if (have_armed > 0)
2883 {
2884 how_long = instant;
2885 timed_out = false;
2886 }
2887 else
2888 how_long = timeout;
2889
2890 L(FL("i/o probe with %d armed") % have_armed);
2891 Netxx::socket_type fd;
2892 do
2893 {
2894 Netxx::Probe::result_type res = probe.ready(how_long);
2895 how_long = instant;
2896 fd = res.first;
2897 Netxx::Probe::ready_type event = res.second;
2898
2899 if (fd == -1)
2900 break;
2901
2902 timed_out = false;
2903
2904 map<Netxx::socket_type, shared_ptr<reactable> >::iterator r
2905 = lookup.find(fd);
2906 if (r != lookup.end())
2907 {
2908 if (items.find(r->second) != items.end())
2909 {
2910 if (!r->second->do_io(event))
2911 {
2912 remove(r->second);
2913 }
2914 }
2915 else
2916 {
2917 L(FL("Got i/o on dead peer %s") % r->second->name());
2918 }
2919 if (!pipe)
2920 r->second->remove_from_probe(probe);
2921 }
2922 else
2923 {
2924 L(FL("got woken up for action on unknown fd %d") % fd);
2925 }
2926 }
2927 while (fd != -1 && !pipe);
2928 return !timed_out;
2929 }
2930 void prune()
2931 {
2932 time_t now = ::time(NULL);
2933 set<shared_ptr<reactable> > todo = items;
2934 for (set<shared_ptr<reactable> >::iterator i = todo.begin();
2935 i != todo.end(); ++i)
2936 {
2937 if ((*i)->timed_out(now))
2938 {
2939 P(F("peer %s has been idle too long, disconnecting")
2940 % (*i)->name());
2941 remove(*i);
2942 }
2943 }
2944 }
2945};
2946
2947bool
2948listener::do_io(Netxx::Probe::ready_type event)
2949{
2950 L(FL("accepting new connection on %s : %s")
2951 % (addr.get_name()?addr.get_name():"") % lexical_cast<string>(addr.get_port()));
2952 Netxx::Peer client = srv->accept_connection();
2953
2954 if (!client)
2955 {
2956 L(FL("accept() returned a dead client"));
2957 }
2958 else
2959 {
2960 P(F("accepted new client connection from %s : %s")
2961 % client.get_address() % lexical_cast<string>(client.get_port()));
2962
2963 // 'false' here means not to revert changes when the SockOpt
2964 // goes out of scope.
2965 Netxx::SockOpt socket_options(client.get_socketfd(), false);
2966 socket_options.set_non_blocking();
2967
2968 shared_ptr<Netxx::Stream> str =
2969 shared_ptr<Netxx::Stream>
2970 (new Netxx::Stream(client.get_socketfd(), timeout));
2971
2972 shared_ptr<session> sess(new session(opts, lua, project, keys,
2973 role, server_voice,
2974 globish("*"), globish(""),
2975 lexical_cast<string>(client), str));
2976 sess->begin_service();
2977 I(guard);
2978 react.add(sess, *guard);
2979 }
2980 return true;
2981}
2982
2983
2984static shared_ptr<Netxx::StreamBase>
2985build_stream_to_server(options & opts, lua_hooks & lua,
2986 netsync_connection_info info,
2987 Netxx::port_type default_port,
2988 Netxx::Timeout timeout)
2989{
2990 shared_ptr<Netxx::StreamBase> server;
2991
2992 if (info.client.use_argv)
2993 {
2994 I(!info.client.argv.empty());
2995 string cmd = info.client.argv[0];
2996 info.client.argv.erase(info.client.argv.begin());
2997 return shared_ptr<Netxx::StreamBase>
2998 (new Netxx::PipeStream(cmd, info.client.argv));
2999 }
3000 else
3001 {
3002#ifdef USE_IPV6
3003 bool use_ipv6=true;
3004#else
3005 bool use_ipv6=false;
3006#endif
3007 string host(info.client.u.host);
3008 if (host.empty())
3009 host = info.client.unparsed();
3010 if (!info.client.u.port.empty())
3011 default_port = lexical_cast<Netxx::port_type>(info.client.u.port);
3012 Netxx::Address addr(info.client.unparsed().c_str(),
3013 default_port, use_ipv6);
3014 return shared_ptr<Netxx::StreamBase>
3015 (new Netxx::Stream(addr, timeout));
3016 }
3017}
3018
3019static void
3020call_server(options & opts,
3021 lua_hooks & lua,
3022 project_t & project,
3023 key_store & keys,
3024 protocol_role role,
3025 netsync_connection_info const & info)
3026{
3027 Netxx::PipeCompatibleProbe probe;
3028 transaction_guard guard(project.db);
3029
3030 Netxx::Timeout timeout(static_cast<long>(constants::netsync_timeout_seconds)),
3031 instant(0,1);
3032
3033 P(F("connecting to %s") % info.client.unparsed);
3034
3035 shared_ptr<Netxx::StreamBase> server
3036 = build_stream_to_server(opts, lua,
3037 info, constants::netsync_default_port,
3038 timeout);
3039
3040
3041 // 'false' here means not to revert changes when the SockOpt
3042 // goes out of scope.
3043 Netxx::SockOpt socket_options(server->get_socketfd(), false);
3044 socket_options.set_non_blocking();
3045
3046 shared_ptr<session> sess(new session(opts, lua, project, keys,
3047 role, client_voice,
3048 info.client.include_pattern,
3049 info.client.exclude_pattern,
3050 info.client.unparsed(), server));
3051
3052 reactor react;
3053 react.add(sess, guard);
3054
3055 while (true)
3056 {
3057 react.ready(guard);
3058
3059 if (react.size() == 0)
3060 {
3061 // Commit whatever work we managed to accomplish anyways.
3062 guard.commit();
3063
3064 // We failed during processing. This should only happen in
3065 // client voice when we have a decode exception, or received an
3066 // error from our server (which is translated to a decode
3067 // exception). We call these cases E() errors.
3068 E(false, F("processing failure while talking to "
3069 "peer %s, disconnecting")
3070 % sess->peer_id);
3071 return;
3072 }
3073
3074 bool io_ok = react.do_io();
3075
3076 E(io_ok, (F("timed out waiting for I/O with "
3077 "peer %s, disconnecting")
3078 % sess->peer_id));
3079
3080 if (react.size() == 0)
3081 {
3082 // Commit whatever work we managed to accomplish anyways.
3083 guard.commit();
3084
3085 // We had an I/O error. We must decide if this represents a
3086 // user-reported error or a clean disconnect. See protocol
3087 // state diagram in session::process_bye_cmd.
3088
3089 if (sess->protocol_state == session::confirmed_state)
3090 {
3091 P(F("successful exchange with %s")
3092 % sess->peer_id);
3093 return;
3094 }
3095 else if (sess->encountered_error)
3096 {
3097 P(F("peer %s disconnected after we informed them of error")
3098 % sess->peer_id);
3099 return;
3100 }
3101 else
3102 E(false, (F("I/O failure while talking to "
3103 "peer %s, disconnecting")
3104 % sess->peer_id));
3105 }
3106 }
3107}
3108
3109static shared_ptr<session>
3110session_from_server_sync_item(options & opts,
3111 lua_hooks & lua,
3112 project_t & project,
3113 key_store & keys,
3114 server_initiated_sync_request const & request)
3115{
3116 netsync_connection_info info;
3117 info.client.unparsed = utf8(request.address);
3118 info.client.include_pattern = globish(request.include);
3119 info.client.exclude_pattern = globish(request.exclude);
3120 info.client.use_argv = false;
3121 parse_uri(info.client.unparsed(), info.client.u);
3122
3123 try
3124 {
3125 P(F("connecting to %s") % info.client.unparsed);
3126 shared_ptr<Netxx::StreamBase> server
3127 = build_stream_to_server(opts, lua,
3128 info, constants::netsync_default_port,
3129 Netxx::Timeout(constants::netsync_timeout_seconds));
3130
3131 // 'false' here means not to revert changes when
3132 // the SockOpt goes out of scope.
3133 Netxx::SockOpt socket_options(server->get_socketfd(), false);
3134 socket_options.set_non_blocking();
3135
3136 protocol_role role = source_and_sink_role;
3137 if (request.what == "sync")
3138 role = source_and_sink_role;
3139 else if (request.what == "push")
3140 role = source_role;
3141 else if (request.what == "pull")
3142 role = sink_role;
3143
3144 shared_ptr<session> sess(new session(opts, lua,
3145 project, keys,
3146 role, client_voice,
3147 info.client.include_pattern,
3148 info.client.exclude_pattern,
3149 info.client.unparsed(),
3150 server, true));
3151
3152 return sess;
3153 }
3154 catch (Netxx::NetworkException & e)
3155 {
3156 P(F("Network error: %s") % e.what());
3157 return shared_ptr<session>();
3158 }
3159}
3160
3161static void
3162serve_connections(options & opts,
3163 lua_hooks & lua,
3164 project_t & project,
3165 key_store & keys,
3166 protocol_role role,
3167 std::list<utf8> const & addresses)
3168{
3169#ifdef USE_IPV6
3170 bool use_ipv6=true;
3171#else
3172 bool use_ipv6=false;
3173#endif
3174
3175 shared_ptr<transaction_guard> guard(new transaction_guard(project.db));
3176
3177 reactor react;
3178 shared_ptr<listener> listen(new listener(opts, lua, project, keys,
3179 react, role, addresses,
3180 guard, use_ipv6));
3181 react.add(listen, *guard);
3182
3183
3184 while (true)
3185 {
3186 if (!guard)
3187 guard = shared_ptr<transaction_guard>
3188 (new transaction_guard(project.db));
3189 I(guard);
3190
3191 react.ready(*guard);
3192
3193 while (!server_initiated_sync_requests.empty())
3194 {
3195 server_initiated_sync_request request
3196 = server_initiated_sync_requests.front();
3197 server_initiated_sync_requests.pop_front();
3198 shared_ptr<session> sess
3199 = session_from_server_sync_item(opts, lua, project, keys,
3200 request);
3201
3202 if (sess)
3203 {
3204 react.add(sess, *guard);
3205 L(FL("Opened connection to %s") % sess->peer_id);
3206 }
3207 }
3208
3209 react.do_io();
3210
3211 react.prune();
3212
3213 if (react.size() == 1 /* 1 listener + 0 sessions */)
3214 {
3215 // Let the guard die completely if everything's gone quiet.
3216 guard->commit();
3217 guard.reset();
3218 }
3219 }
3220}
3221
3222static void
3223serve_single_connection(project_t & project,
3224 shared_ptr<session> sess)
3225{
3226 sess->begin_service();
3227 P(F("beginning service on %s") % sess->peer_id);
3228
3229 transaction_guard guard(project.db);
3230
3231 reactor react;
3232 react.add(sess, guard);
3233
3234 while (react.size() > 0)
3235 {
3236 react.ready(guard);
3237 react.do_io();
3238 react.prune();
3239 }
3240 guard.commit();
3241}
3242
3243
3244void
3245insert_with_parents(revision_id rev,
3246 refiner & ref,
3247 revision_enumerator & rev_enumerator,
3248 set<revision_id> & revs,
3249 ticker & revisions_ticker)
3250{
3251 deque<revision_id> work;
3252 work.push_back(rev);
3253 while (!work.empty())
3254 {
3255 revision_id rid = work.front();
3256 work.pop_front();
3257
3258 if (!null_id(rid) && revs.find(rid) == revs.end())
3259 {
3260 revs.insert(rid);
3261 ++revisions_ticker;
3262 ref.note_local_item(rid.inner());
3263 vector<revision_id> parents;
3264 rev_enumerator.get_revision_parents(rid, parents);
3265 for (vector<revision_id>::const_iterator i = parents.begin();
3266 i != parents.end(); ++i)
3267 {
3268 work.push_back(*i);
3269 }
3270 }
3271 }
3272}
3273
3274void
3275session::rebuild_merkle_trees(set<branch_name> const & branchnames)
3276{
3277 P(F("finding items to synchronize:"));
3278 for (set<branch_name>::const_iterator i = branchnames.begin();
3279 i != branchnames.end(); ++i)
3280 L(FL("including branch %s") % *i);
3281
3282 // xgettext: please use short message and try to avoid multibytes chars
3283 ticker revisions_ticker(N_("revisions"), "r", 64);
3284 // xgettext: please use short message and try to avoid multibytes chars
3285 ticker certs_ticker(N_("certificates"), "c", 256);
3286 // xgettext: please use short message and try to avoid multibytes chars
3287 ticker keys_ticker(N_("keys"), "k", 1);
3288
3289 set<revision_id> revision_ids;
3290 set<rsa_keypair_id> inserted_keys;
3291
3292 {
3293 for (set<branch_name>::const_iterator i = branchnames.begin();
3294 i != branchnames.end(); ++i)
3295 {
3296 // Get branch certs.
3297 vector< revision<cert> > certs;
3298 project.get_branch_certs(*i, certs);
3299 for (vector< revision<cert> >::const_iterator j = certs.begin();
3300 j != certs.end(); j++)
3301 {
3302 revision_id rid(j->inner().ident);
3303 insert_with_parents(rid, rev_refiner, rev_enumerator,
3304 revision_ids, revisions_ticker);
3305 // Branch certs go in here, others later on.
3306 id item;
3307 cert_hash_code(j->inner(), item);
3308 cert_refiner.note_local_item(item);
3309 rev_enumerator.note_cert(rid, item);
3310 if (inserted_keys.find(j->inner().key) == inserted_keys.end())
3311 inserted_keys.insert(j->inner().key);
3312 }
3313 }
3314 }
3315
3316 {
3317 map<branch_name, epoch_data> epochs;
3318 project.db.get_epochs(epochs);
3319
3320 epoch_data epoch_zero(string(constants::epochlen_bytes, '\x00'));
3321 for (set<branch_name>::const_iterator i = branchnames.begin();
3322 i != branchnames.end(); ++i)
3323 {
3324 branch_name const & branch(*i);
3325 map<branch_name, epoch_data>::const_iterator j;
3326 j = epochs.find(branch);
3327
3328 // Set to zero any epoch which is not yet set.
3329 if (j == epochs.end())
3330 {
3331 L(FL("setting epoch on %s to zero") % branch);
3332 epochs.insert(make_pair(branch, epoch_zero));
3333 project.db.set_epoch(branch, epoch_zero);
3334 }
3335
3336 // Then insert all epochs into merkle tree.
3337 j = epochs.find(branch);
3338 I(j != epochs.end());
3339 epoch_id eid;
3340 epoch_hash_code(j->first, j->second, eid);
3341 epoch_refiner.note_local_item(eid.inner());
3342 }
3343 }
3344
3345 {
3346 typedef vector< pair<revision_id,
3347 pair<revision_id, rsa_keypair_id> > > cert_idx;
3348
3349 cert_idx idx;
3350 project.db.get_revision_cert_nobranch_index(idx);
3351
3352 // Insert all non-branch certs reachable via these revisions
3353 // (branch certs were inserted earlier).
3354
3355 for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i)
3356 {
3357 revision_id const & hash = i->first;
3358 revision_id const & ident = i->second.first;
3359 rsa_keypair_id const & key = i->second.second;
3360
3361 rev_enumerator.note_cert(ident, hash.inner());
3362
3363 if (revision_ids.find(ident) == revision_ids.end())
3364 continue;
3365
3366 cert_refiner.note_local_item(hash.inner());
3367 ++certs_ticker;
3368 if (inserted_keys.find(key) == inserted_keys.end())
3369 inserted_keys.insert(key);
3370 }
3371 }
3372
3373 // Add any keys specified on the command line.
3374 for (vector<rsa_keypair_id>::const_iterator key
3375 = keys_to_push.begin();
3376 key != keys_to_push.end(); ++key)
3377 {
3378 if (inserted_keys.find(*key) == inserted_keys.end())
3379 {
3380 if (!project.db.public_key_exists(*key))
3381 {
3382 keypair kp;
3383 if (keys.maybe_get_key_pair(*key, kp))
3384 project.db.put_key(*key, kp.pub);
3385 else
3386 W(F("Cannot find key '%s'") % *key);
3387 }
3388 inserted_keys.insert(*key);
3389 }
3390 }
3391
3392 // Insert all the keys.
3393 for (set<rsa_keypair_id>::const_iterator key = inserted_keys.begin();
3394 key != inserted_keys.end(); key++)
3395 {
3396 if (project.db.public_key_exists(*key))
3397 {
3398 rsa_pub_key pub;
3399 project.db.get_key(*key, pub);
3400 id keyhash;
3401 key_hash_code(*key, pub, keyhash);
3402
3403 if (global_sanity.debug_p())
3404 L(FL("noting key '%s' = '%s' to send")
3405 % *key
3406 % encode_hexenc(keyhash()));
3407
3408 key_refiner.note_local_item(keyhash);
3409 ++keys_ticker;
3410 }
3411 }
3412
3413 rev_refiner.reindex_local_items();
3414 cert_refiner.reindex_local_items();
3415 key_refiner.reindex_local_items();
3416 epoch_refiner.reindex_local_items();
3417}
3418
3419void
3420run_netsync_protocol(options & opts, lua_hooks & lua,
3421 project_t & project, key_store & keys,
3422 protocol_voice voice,
3423 protocol_role role,
3424 netsync_connection_info const & info)
3425{
3426 if (info.client.include_pattern().find_first_of("'\"") != string::npos)
3427 {
3428 W(F("include branch pattern contains a quote character:\n"
3429 "%s") % info.client.include_pattern());
3430 }
3431
3432 if (info.client.exclude_pattern().find_first_of("'\"") != string::npos)
3433 {
3434 W(F("exclude branch pattern contains a quote character:\n"
3435 "%s") % info.client.exclude_pattern());
3436 }
3437
3438 // We do not want to be killed by SIGPIPE from a network disconnect.
3439 ignore_sigpipe();
3440
3441 try
3442 {
3443 if (voice == server_voice)
3444 {
3445 if (opts.bind_stdio)
3446 {
3447 shared_ptr<Netxx::PipeStream> str(new Netxx::PipeStream(0,1));
3448 shared_ptr<session> sess(new session(opts, lua, project, keys,
3449 role, server_voice,
3450 globish("*"), globish(""),
3451 "stdio", str));
3452 serve_single_connection(project, sess);
3453 }
3454 else
3455 serve_connections(opts, lua, project, keys,
3456 role, info.server.addrs);
3457 }
3458 else
3459 {
3460 I(voice == client_voice);
3461 call_server(opts, lua, project, keys,
3462 role, info);
3463 }
3464 }
3465 catch (Netxx::NetworkException & e)
3466 {
3467 throw informative_failure((F("network error: %s") % e.what()).str());
3468 }
3469 catch (Netxx::Exception & e)
3470 {
3471 throw oops((F("network error: %s") % e.what()).str());;
3472 }
3473}
3474
3475// Local Variables:
3476// mode: C++
3477// fill-column: 76
3478// c-file-style: "gnu"
3479// indent-tabs-mode: nil
3480// End:
3481// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status