monotone

monotone Mtn Source Tree

Root/netsync.cc

1// Copyright (C) 2008 Stephen Leake <stephen_leake@stephe-leake.org>
2// Copyright (C) 2004 Graydon Hoare <graydon@pobox.com>
3//
4// This program is made available under the GNU GPL version 2.0 or
5// greater. See the accompanying file COPYING for details.
6//
7// This program is distributed WITHOUT ANY WARRANTY; without even the
8// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
9// PURPOSE.
10
11#include "base.hh"
12#include <map>
13#include <cstdlib>
14#include <memory>
15#include <list>
16#include <deque>
17#include <stack>
18
19#include <time.h>
20
21#include "lexical_cast.hh"
22#include <boost/scoped_ptr.hpp>
23#include <boost/shared_ptr.hpp>
24#include <boost/bind.hpp>
25
26#include "lua_hooks.hh"
27#include "key_store.hh"
28#include "project.hh"
29#include "database.hh"
30#include "cert.hh"
31#include "constants.hh"
32#include "enumerator.hh"
33#include "keys.hh"
34#include "lua.hh"
35#include "merkle_tree.hh"
36#include "netcmd.hh"
37#include "netio.hh"
38#include "numeric_vocab.hh"
39#include "refiner.hh"
40#include "revision.hh"
41#include "sanity.hh"
42#include "transforms.hh"
43#include "ui.hh"
44#include "xdelta.hh"
45#include "epoch.hh"
46#include "platform.hh"
47#include "hmac.hh"
48#include "globish.hh"
49#include "uri.hh"
50#include "options.hh"
51
52#include "botan/botan.h"
53
54#include "netxx/address.h"
55#include "netxx/peer.h"
56#include "netxx/probe.h"
57#include "netxx/socket.h"
58#include "netxx/sockopt.h"
59#include "netxx/stream.h"
60#include "netxx/streamserver.h"
61#include "netxx/timeout.h"
62#include "netxx_pipe.hh"
63// TODO: things to do that will break protocol compatibility
64// -- need some way to upgrade anonymous to keyed pull, without user having
65// to explicitly specify which they want
66// just having a way to respond "access denied, try again" might work
67// but perhaps better to have the anonymous command include a note "I
68// _could_ use key <...> if you prefer", and if that would lead to more
69// access, could reply "I do prefer". (Does this lead to too much
70// information exposure? Allows anonymous people to probe what branches
71// a key has access to.)
72// -- "warning" packet type?
73// -- Richard Levitte wants, when you (e.g.) request '*' but don't have
74// access to all of it, you just get the parts you have access to
75// (maybe with warnings about skipped branches). to do this right,
76// should have a way for the server to send back to the client "right,
77// you're not getting the following branches: ...", so the client will
78// not include them in its merkle trie.
79// -- add some sort of vhost field to the client's first packet, saying who
80// they expect to talk to
81
82//
83// This is the "new" network synchronization (netsync) system in
84// monotone. It is based on synchronizing pairs of merkle trees over an
85// interactive connection.
86//
87// A netsync process between peers treats each peer as either a source, a
88// sink, or both. When a peer is only a source, it will not write any new
89// items to its database. When a peer is only a sink, it will not send any
90// items from its database. When a peer is both a source and sink, it may
91// send and write items freely.
92//
93// The post-state of a netsync is that each sink contains a superset of the
94// items in its corresponding source; when peers are behaving as both
95// source and sink, this means that the post-state of the sync is for the
96// peers to have identical item sets.
97//
98//
99// Data structure
100// --------------
101//
102// Each node in a merkle tree contains a fixed number of slots. This number
103// is derived from a global parameter of the protocol -- the tree fanout --
104// such that the number of slots is 2^fanout. For now we will assume that
105// fanout is 4 thus there are 16 slots in a node, because this makes
106// illustration easier. The other parameter of the protocol is the size of
107// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
108//
109// Each slot in a merkle tree node is in one of 3 states:
110//
111// - empty
112// - leaf
113// - subtree
114//
115// In addition, each leaf contains a hash code which identifies an element
116// of the set being synchronized. Each subtree slot contains a hash code of
117// the node immediately beneath it in the merkle tree. Empty slots contain
118// no hash codes.
119//
120// Since empty slots have no hash code, they are represented implicitly by
121// a bitmap at the head of each merkle tree node. As an additional
122// integrity check, each merkle tree node contains a label indicating its
123// prefix in the tree, and a hash of its own contents.
124//
125// In total, then, the byte-level representation of a <160,4> merkle tree
126// node is as follows:
127//
128// 20 bytes - hash of the remaining bytes in the node
129// 1 byte - type of this node (manifest, file, key, mcert, fcert)
130// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
131// 0-20 bytes - the prefix of this node, 4 bits * level,
132// rounded up to a byte
133// 1-N bytes - number of leaves under this node (uleb128)
134// 4 bytes - slot-state bitmap of the node
135// 0-320 bytes - between 0 and 16 live slots in the node
136//
137// So, in the worst case such a node is 367 bytes, with these parameters.
138//
139//
140// Protocol
141// --------
142//
143// The protocol is a binary command-packet system over TCP; each packet
144// consists of a single byte which identifies the protocol version, a byte
145// which identifies the command name inside that version, a size_t sent as
146// a uleb128 indicating the length of the packet, that many bytes of
147// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
148// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
149// 20-byte random key chosen by the client after authentication (discussed
150// below). Decoding involves simply buffering until a sufficient number of
151// bytes are received, then advancing the buffer pointer. Any time an
152// integrity check (the HMAC) fails, the protocol is assumed to have lost
153// synchronization, and the connection is dropped. The parties are free to
154// drop the TCP stream at any point, if too much data is received or too
155// much idle time passes; no commitments or transactions are made.
156//
157//
158// Authentication and setup
159// ------------------------
160//
161// The exchange begins in a non-authenticated state. The server sends a
162// "hello <id> <nonce>" command, which identifies the server's RSA key and
163// issues a nonce which must be used for a subsequent authentication.
164//
165// The client then responds with either:
166//
167// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
168// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
169// role it wishes to play in the synchronization, identifies the pattern it
170// wishes to sync with, signs the previous nonce with its own key, and informs
171// the server of the HMAC key it wishes to use for this session (encrypted
172// with the server's public key); or
173//
174// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
175// <hmac key>" command, which identifies the role it wishes to play in the
176// synchronization, the pattern it wishes to sync with, and the HMAC key it
177// wishes to use for this session (also encrypted with the server's public
178// key).
179//
180// The server then replies with a "confirm" command, which contains no
181// other data but will only have the correct HMAC integrity code if the
182// server received and properly decrypted the HMAC key offered by the
183// client. This transitions the peers into an authenticated state and
184// begins epoch refinement. If epoch refinement and epoch transmission
185// succeed, the peers switch to data refinement and data transmission.
186//
187//
188// Refinement
189// ----------
190//
191// Refinement is executed by "refiners"; there is a refiner for each
192// set of 'items' being exchanged: epochs, keys, certs, and revisions.
193// When refinement starts, each party knows only their own set of
194// items; when refinement completes, each party has learned of the
195// complete set of items it needs to send, and a count of items it's
196// expecting to receive.
197//
198// For more details on the refinement process, see refiner.cc.
199//
200//
201// Transmission
202// ------------
203//
204// Once the set of items to send has been determined (for keys, certs, and
205// revisions) each peer switches into a transmission mode. This mode
206// involves walking the revision graph in ancestry-order and sending all
207// the items the local peer has which the remote one does not. Since the
208// remote and local peers both know all the items which need to be
209// transferred (they learned during refinement) they know what to wait for
210// and what to send. The mechanisms of the transmission phase (notably,
211// enumerator.cc) simply ensure that things are sent in the proper order,
212// and without over-filling the output buffer too much.
213//
214//
215// Shutdown
216// --------
217//
218// After transmission completes, one special command, "bye", is used to
219// shut down a connection gracefully. The shutdown sequence based on "bye"
220// commands is documented below in session::process_bye_cmd.
221//
222//
223// Note on epochs
224// --------------
225//
226// One refinement and transmission phase precedes all the others: epochs.
227// Epochs are exchanged and compared in order to be sure that further
228// refinement and transmission (on certs and revisions) makes sense; they
229// are a sort of "immune system" to prevent incompatible databases (say
230// between rebuilds due to bugs in monotone) from cross-contaminating. The
231// later refinements are only kicked off *after* all epochs are received
232// and compare correctly.
233//
234//
235// Note on dense coding
236// --------------------
237//
238// This protocol is "raw binary" (non-text) because coding density is
239// actually important here, and each packet consists of very
240// information-dense material that you wouldn't have a hope of typing in,
241// or interpreting manually anyways.
242//
243
244using std::auto_ptr;
245using std::deque;
246using std::make_pair;
247using std::map;
248using std::min;
249using std::pair;
250using std::set;
251using std::string;
252using std::vector;
253
254using boost::shared_ptr;
255using boost::lexical_cast;
256
// One request recorded by the Lua-callable server_request_sync extension.
// The four fields hold the extension's four string arguments verbatim, in
// argument order; their interpretation is left to whoever drains the queue
// (not visible in this part of the file).
struct server_initiated_sync_request
{
  string what;     // Lua argument 1
  string address;  // Lua argument 2
  string include;  // Lua argument 3
  string exclude;  // Lua argument 4
};

// Requests recorded so far, oldest first (server_request_sync appends to
// the back).
deque<server_initiated_sync_request> server_initiated_sync_requests;
265LUAEXT(server_request_sync, )
266{
267 char const * w = luaL_checkstring(L, 1);
268 char const * a = luaL_checkstring(L, 2);
269 char const * i = luaL_checkstring(L, 3);
270 char const * e = luaL_checkstring(L, 4);
271 server_initiated_sync_request request;
272 request.what = string(w);
273 request.address = string(a);
274 request.include = string(i);
275 request.exclude = string(e);
276 server_initiated_sync_requests.push_back(request);
277 return 0;
278}
279
280static inline void
281require(bool check, string const & context)
282{
283 if (!check)
284 throw bad_decode(F("check of '%s' failed") % context);
285}
286
287static void
288read_pubkey(string const & in,
289 rsa_keypair_id & id,
290 rsa_pub_key & pub)
291{
292 string tmp_id, tmp_key;
293 size_t pos = 0;
294 extract_variable_length_string(in, tmp_id, pos, "pubkey id");
295 extract_variable_length_string(in, tmp_key, pos, "pubkey value");
296 id = rsa_keypair_id(tmp_id);
297 pub = rsa_pub_key(tmp_key);
298}
299
300static void
301write_pubkey(rsa_keypair_id const & id,
302 rsa_pub_key const & pub,
303 string & out)
304{
305 insert_variable_length_string(id(), out);
306 insert_variable_length_string(pub(), out);
307}
308
// Exception object thrown by session::error; carries only the
// human-readable message.
struct netsync_error
{
  string msg;

  netsync_error(string const & text)
    : msg(text)
  {}
};
314
315struct
316session:
317 public refiner_callbacks,
318 public enumerator_callbacks
319{
320 protocol_role role;
321 protocol_voice const voice;
322 globish our_include_pattern;
323 globish our_exclude_pattern;
324 globish_matcher our_matcher;
325
326 project_t & project;
327 key_store & keys;
328 lua_hooks & lua;
329 bool use_transport_auth;
330 rsa_keypair_id const & signing_key;
331 vector<rsa_keypair_id> const & keys_to_push;
332
333 string peer_id;
334 shared_ptr<Netxx::StreamBase> str;
335
336 string_queue inbuf;
337 // deque of pair<string data, size_t cur_pos>
338 deque< pair<string,size_t> > outbuf;
339 // the total data stored in outbuf - this is
340 // used as a valve to stop too much data
341 // backing up
342 size_t outbuf_size;
343
344 netcmd cmd;
345 bool armed;
346 bool arm();
347
348 bool received_remote_key;
349 rsa_keypair_id remote_peer_key_name;
350 netsync_session_key session_key;
351 chained_hmac read_hmac;
352 chained_hmac write_hmac;
353 bool authenticated;
354
355 time_t last_io_time;
356 auto_ptr<ticker> byte_in_ticker;
357 auto_ptr<ticker> byte_out_ticker;
358 auto_ptr<ticker> cert_in_ticker;
359 auto_ptr<ticker> cert_out_ticker;
360 auto_ptr<ticker> revision_in_ticker;
361 auto_ptr<ticker> revision_out_ticker;
362 size_t bytes_in, bytes_out;
363 size_t certs_in, certs_out;
364 size_t revs_in, revs_out;
365 size_t keys_in, keys_out;
366 // used to identify this session to the netsync hooks.
367 // We can't just use saved_nonce, because that's blank for all
368 // anonymous connections and could lead to confusion.
369 size_t session_id;
370 static size_t session_count;
371
372 // These are read from the server, written to the local database
373 vector<revision_id> written_revisions;
374 vector<rsa_keypair_id> written_keys;
375 vector<cert> written_certs;
376
377 // These are sent to the server
378 vector<revision_id> sent_revisions;
379 vector<rsa_keypair_id> sent_keys;
380 vector<cert> sent_certs;
381
382 id saved_nonce;
383
384 enum
385 {
386 working_state,
387 shutdown_state,
388 confirmed_state
389 }
390 protocol_state;
391
392 bool encountered_error;
393
394 static const int no_error = 200;
395 static const int partial_transfer = 211;
396 static const int no_transfer = 212;
397
398 static const int not_permitted = 412;
399 static const int unknown_key = 422;
400 static const int mixing_versions = 432;
401
402 static const int role_mismatch = 512;
403 static const int bad_command = 521;
404
405 static const int failed_identification = 532;
406 //static const int bad_data = 541;
407
408 int error_code;
409
410 bool set_totals;
411
412 // Interface to refinement.
413 refiner epoch_refiner;
414 refiner key_refiner;
415 refiner cert_refiner;
416 refiner rev_refiner;
417
418 // Interface to ancestry grovelling.
419 revision_enumerator rev_enumerator;
420
421 // Enumerator_callbacks methods.
422 set<file_id> file_items_sent;
423 bool process_this_rev(revision_id const & rev);
424 bool queue_this_cert(id const & c);
425 bool queue_this_file(id const & f);
426 void note_file_data(file_id const & f);
427 void note_file_delta(file_id const & src, file_id const & dst);
428 void note_rev(revision_id const & rev);
429 void note_cert(id const & c);
430
431 session(options & opts,
432 lua_hooks & lua,
433 project_t & project,
434 key_store & keys,
435 protocol_role role,
436 protocol_voice voice,
437 globish const & our_include_pattern,
438 globish const & our_exclude_pattern,
439 string const & peer,
440 shared_ptr<Netxx::StreamBase> sock,
441 bool initiated_by_server = false);
442
443 virtual ~session();
444
445 id mk_nonce();
446 void mark_recent_io();
447
448 void set_session_key(string const & key);
449 void set_session_key(rsa_oaep_sha_data const & key_encrypted);
450
451 void setup_client_tickers();
452 bool done_all_refinements();
453 bool queued_all_items();
454 bool received_all_items();
455 bool finished_working();
456 void maybe_step();
457 void maybe_say_goodbye(transaction_guard & guard);
458
459 void note_item_arrived(netcmd_item_type ty, id const & i);
460 void maybe_note_epochs_finished();
461 void note_item_sent(netcmd_item_type ty, id const & i);
462
463 Netxx::Probe::ready_type which_events() const;
464 bool read_some();
465 bool write_some();
466
467 void error(int errcode, string const & errmsg);
468
469 void write_netcmd_and_try_flush(netcmd const & cmd);
470
471 // Outgoing queue-writers.
472 void queue_bye_cmd(u8 phase);
473 void queue_error_cmd(string const & errmsg);
474 void queue_done_cmd(netcmd_item_type type, size_t n_items);
475 void queue_hello_cmd(rsa_keypair_id const & key_name,
476 rsa_pub_key const & pub_encoded,
477 id const & nonce);
478 void queue_anonymous_cmd(protocol_role role,
479 globish const & include_pattern,
480 globish const & exclude_pattern,
481 id const & nonce2);
482 void queue_auth_cmd(protocol_role role,
483 globish const & include_pattern,
484 globish const & exclude_pattern,
485 id const & client,
486 id const & nonce1,
487 id const & nonce2,
488 rsa_sha1_signature const & signature);
489 void queue_confirm_cmd();
490 void queue_refine_cmd(refinement_type ty, merkle_node const & node);
491 void queue_data_cmd(netcmd_item_type type,
492 id const & item,
493 string const & dat);
494 void queue_delta_cmd(netcmd_item_type type,
495 id const & base,
496 id const & ident,
497 delta const & del);
498
499 // Incoming dispatch-called methods.
500 bool process_error_cmd(string const & errmsg);
501 bool process_hello_cmd(rsa_keypair_id const & server_keyname,
502 rsa_pub_key const & server_key,
503 id const & nonce);
504 bool process_bye_cmd(u8 phase, transaction_guard & guard);
505 bool process_anonymous_cmd(protocol_role role,
506 globish const & their_include_pattern,
507 globish const & their_exclude_pattern);
508 bool process_auth_cmd(protocol_role role,
509 globish const & their_include_pattern,
510 globish const & their_exclude_pattern,
511 id const & client,
512 id const & nonce1,
513 rsa_sha1_signature const & signature);
514 bool process_refine_cmd(refinement_type ty, merkle_node const & node);
515 bool process_done_cmd(netcmd_item_type type, size_t n_items);
516 bool process_data_cmd(netcmd_item_type type,
517 id const & item,
518 string const & dat);
519 bool process_delta_cmd(netcmd_item_type type,
520 id const & base,
521 id const & ident,
522 delta const & del);
523 bool process_usher_cmd(utf8 const & msg);
524
525 // The incoming dispatcher.
526 bool dispatch_payload(netcmd const & cmd,
527 transaction_guard & guard);
528
529 // Various helpers.
530 void assume_corresponding_role(protocol_role their_role);
531 void respond_to_confirm_cmd();
532 bool data_exists(netcmd_item_type type,
533 id const & item);
534 void load_data(netcmd_item_type type,
535 id const & item,
536 string & out);
537
538 void rebuild_merkle_trees(set<branch_name> const & branches);
539
540 void send_all_data(netcmd_item_type ty, set<id> const & items);
541 void begin_service();
542 bool process(transaction_guard & guard);
543
544 bool initiated_by_server;
545};
546size_t session::session_count = 0;
547
548session::session(options & opts,
549 lua_hooks & lua,
550 project_t & project,
551 key_store & keys,
552 protocol_role role,
553 protocol_voice voice,
554 globish const & our_include_pattern,
555 globish const & our_exclude_pattern,
556 string const & peer,
557 shared_ptr<Netxx::StreamBase> sock,
558 bool initiated_by_server) :
559 role(role),
560 voice(voice),
561 our_include_pattern(our_include_pattern),
562 our_exclude_pattern(our_exclude_pattern),
563 our_matcher(our_include_pattern, our_exclude_pattern),
564 project(project),
565 keys(keys),
566 lua(lua),
567 use_transport_auth(opts.use_transport_auth),
568 signing_key(opts.signing_key),
569 keys_to_push(opts.keys_to_push),
570 peer_id(peer),
571 str(sock),
572 inbuf(),
573 outbuf_size(0),
574 armed(false),
575 received_remote_key(false),
576 remote_peer_key_name(""),
577 session_key(constants::netsync_key_initializer),
578 read_hmac(netsync_session_key(constants::netsync_key_initializer),
579 use_transport_auth),
580 write_hmac(netsync_session_key(constants::netsync_key_initializer),
581 use_transport_auth),
582 authenticated(false),
583 last_io_time(::time(NULL)),
584 byte_in_ticker(NULL),
585 byte_out_ticker(NULL),
586 cert_in_ticker(NULL),
587 cert_out_ticker(NULL),
588 revision_in_ticker(NULL),
589 revision_out_ticker(NULL),
590 bytes_in(0), bytes_out(0),
591 certs_in(0), certs_out(0),
592 revs_in(0), revs_out(0),
593 keys_in(0), keys_out(0),
594 session_id(++session_count),
595 saved_nonce(""),
596 protocol_state(working_state),
597 encountered_error(false),
598 error_code(no_transfer),
599 set_totals(false),
600 epoch_refiner(epoch_item, voice, *this),
601 key_refiner(key_item, voice, *this),
602 cert_refiner(cert_item, voice, *this),
603 rev_refiner(revision_item, voice, *this),
604 rev_enumerator(project, *this),
605 initiated_by_server(initiated_by_server)
606{}
607
608session::~session()
609{
610 if (protocol_state == confirmed_state)
611 error_code = no_error;
612 else if (error_code == no_transfer &&
613 (revs_in || revs_out ||
614 certs_in || certs_out ||
615 keys_in || keys_out))
616 error_code = partial_transfer;
617
618 vector<cert> unattached_written_certs;
619 map<revision_id, vector<cert> > rev_written_certs;
620 for (vector<revision_id>::iterator i = written_revisions.begin();
621 i != written_revisions.end(); ++i)
622 rev_written_certs.insert(make_pair(*i, vector<cert>()));
623 for (vector<cert>::iterator i = written_certs.begin();
624 i != written_certs.end(); ++i)
625 {
626 map<revision_id, vector<cert> >::iterator j;
627 j = rev_written_certs.find(revision_id(i->ident));
628 if (j == rev_written_certs.end())
629 unattached_written_certs.push_back(*i);
630 else
631 j->second.push_back(*i);
632 }
633
634 if (!written_keys.empty()
635 || !written_revisions.empty()
636 || !written_certs.empty())
637 {
638
639 //Keys
640 for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
641 i != written_keys.end(); ++i)
642 {
643 lua.hook_note_netsync_pubkey_received(*i, session_id);
644 }
645
646 //Revisions
647 for (vector<revision_id>::iterator i = written_revisions.begin();
648 i != written_revisions.end(); ++i)
649 {
650 vector<cert> & ctmp(rev_written_certs[*i]);
651 set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
652 for (vector<cert>::const_iterator j = ctmp.begin();
653 j != ctmp.end(); ++j)
654 certs.insert(make_pair(j->key, make_pair(j->name, j->value)));
655
656 revision_data rdat;
657 project.db.get_revision(*i, rdat);
658 lua.hook_note_netsync_revision_received(*i, rdat, certs,
659 session_id);
660 }
661
662 //Certs (not attached to a new revision)
663 for (vector<cert>::iterator i = unattached_written_certs.begin();
664 i != unattached_written_certs.end(); ++i)
665 lua.hook_note_netsync_cert_received(revision_id(i->ident), i->key,
666 i->name, i->value, session_id);
667 }
668
669 if (!sent_keys.empty()
670 || !sent_revisions.empty()
671 || !sent_certs.empty())
672 {
673
674 vector<cert> unattached_sent_certs;
675 map<revision_id, vector<cert> > rev_sent_certs;
676 for (vector<revision_id>::iterator i = sent_revisions.begin();
677 i != sent_revisions.end(); ++i)
678 rev_sent_certs.insert(make_pair(*i, vector<cert>()));
679 for (vector<cert>::iterator i = sent_certs.begin();
680 i != sent_certs.end(); ++i)
681 {
682 map<revision_id, vector<cert> >::iterator j;
683 j = rev_sent_certs.find(revision_id(i->ident));
684 if (j == rev_sent_certs.end())
685 unattached_sent_certs.push_back(*i);
686 else
687 j->second.push_back(*i);
688 }
689
690 //Keys
691 for (vector<rsa_keypair_id>::iterator i = sent_keys.begin();
692 i != sent_keys.end(); ++i)
693 {
694 lua.hook_note_netsync_pubkey_sent(*i, session_id);
695 }
696
697 //Revisions
698 for (vector<revision_id>::iterator i = sent_revisions.begin();
699 i != sent_revisions.end(); ++i)
700 {
701 vector<cert> & ctmp(rev_sent_certs[*i]);
702 set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
703 for (vector<cert>::const_iterator j = ctmp.begin();
704 j != ctmp.end(); ++j)
705 certs.insert(make_pair(j->key, make_pair(j->name, j->value)));
706
707 revision_data rdat;
708 project.db.get_revision(*i, rdat);
709 lua.hook_note_netsync_revision_sent(*i, rdat, certs,
710 session_id);
711 }
712
713 //Certs (not attached to a new revision)
714 for (vector<cert>::iterator i = unattached_sent_certs.begin();
715 i != unattached_sent_certs.end(); ++i)
716 lua.hook_note_netsync_cert_sent(revision_id(i->ident), i->key,
717 i->name, i->value, session_id);
718 }
719
720 lua.hook_note_netsync_end(session_id, error_code,
721 bytes_in, bytes_out,
722 certs_in, certs_out,
723 revs_in, revs_out,
724 keys_in, keys_out);
725}
726
727bool
728session::process_this_rev(revision_id const & rev)
729{
730 return (rev_refiner.items_to_send.find(rev.inner())
731 != rev_refiner.items_to_send.end());
732}
733
734bool
735session::queue_this_cert(id const & c)
736{
737 return (cert_refiner.items_to_send.find(c)
738 != cert_refiner.items_to_send.end());
739}
740
741bool
742session::queue_this_file(id const & f)
743{
744 return file_items_sent.find(file_id(f)) == file_items_sent.end();
745}
746
747void
748session::note_file_data(file_id const & f)
749{
750 if (role == sink_role)
751 return;
752 file_data fd;
753 project.db.get_file_version(f, fd);
754 queue_data_cmd(file_item, f.inner(), fd.inner()());
755 file_items_sent.insert(f);
756}
757
758void
759session::note_file_delta(file_id const & src, file_id const & dst)
760{
761 if (role == sink_role)
762 return;
763 file_delta fdel;
764 project.db.get_arbitrary_file_delta(src, dst, fdel);
765 queue_delta_cmd(file_item, src.inner(), dst.inner(), fdel.inner());
766 file_items_sent.insert(dst);
767}
768
769void
770session::note_rev(revision_id const & rev)
771{
772 if (role == sink_role)
773 return;
774 revision_t rs;
775 project.db.get_revision(rev, rs);
776 data tmp;
777 write_revision(rs, tmp);
778 queue_data_cmd(revision_item, rev.inner(), tmp());
779 sent_revisions.push_back(rev);
780}
781
782void
783session::note_cert(id const & c)
784{
785 if (role == sink_role)
786 return;
787 revision<cert> cert;
788 string str;
789 project.db.get_revision_cert(c, cert);
790 write_cert(cert.inner(), str);
791 queue_data_cmd(cert_item, c, str);
792 sent_certs.push_back(cert.inner());
793}
794
795
796id
797session::mk_nonce()
798{
799 I(this->saved_nonce().size() == 0);
800 char buf[constants::merkle_hash_length_in_bytes];
801 Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf),
802 constants::merkle_hash_length_in_bytes);
803 this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes));
804 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
805 return this->saved_nonce;
806}
807
808void
809session::mark_recent_io()
810{
811 last_io_time = ::time(NULL);
812}
813
814void
815session::set_session_key(string const & key)
816{
817 session_key = netsync_session_key(key);
818 read_hmac.set_key(session_key);
819 write_hmac.set_key(session_key);
820}
821
822void
823session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
824{
825 if (use_transport_auth)
826 {
827 string hmac_key;
828 keys.decrypt_rsa(signing_key, hmac_key_encrypted, hmac_key);
829 set_session_key(hmac_key);
830 }
831}
832
833void
834session::setup_client_tickers()
835{
836 // xgettext: please use short message and try to avoid multibytes chars
837 byte_in_ticker.reset(new ticker(N_("bytes in"), ">", 1024, true));
838 // xgettext: please use short message and try to avoid multibytes chars
839 byte_out_ticker.reset(new ticker(N_("bytes out"), "<", 1024, true));
840 if (role == sink_role)
841 {
842 // xgettext: please use short message and try to avoid multibytes chars
843 cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3));
844 // xgettext: please use short message and try to avoid multibytes chars
845 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
846 }
847 else if (role == source_role)
848 {
849 // xgettext: please use short message and try to avoid multibytes chars
850 cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3));
851 // xgettext: please use short message and try to avoid multibytes chars
852 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
853 }
854 else
855 {
856 I(role == source_and_sink_role);
857 // xgettext: please use short message and try to avoid multibytes chars
858 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
859 // xgettext: please use short message and try to avoid multibytes chars
860 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
861 }
862}
863
864bool
865session::done_all_refinements()
866{
867 bool all = rev_refiner.done
868 && cert_refiner.done
869 && key_refiner.done
870 && epoch_refiner.done;
871
872 if (all && !set_totals)
873 {
874 if (cert_out_ticker.get())
875 cert_out_ticker->set_total(cert_refiner.items_to_send.size());
876
877 if (revision_out_ticker.get())
878 revision_out_ticker->set_total(rev_refiner.items_to_send.size());
879
880 if (cert_in_ticker.get())
881 cert_in_ticker->set_total(cert_refiner.items_to_receive);
882
883 if (revision_in_ticker.get())
884 revision_in_ticker->set_total(rev_refiner.items_to_receive);
885
886 set_totals = true;
887 }
888 return all;
889}
890
891
892
893bool
894session::received_all_items()
895{
896 if (role == source_role)
897 return true;
898 bool all = rev_refiner.items_to_receive == 0
899 && cert_refiner.items_to_receive == 0
900 && key_refiner.items_to_receive == 0
901 && epoch_refiner.items_to_receive == 0;
902 return all;
903}
904
905bool
906session::finished_working()
907{
908 bool all = done_all_refinements()
909 && received_all_items()
910 && queued_all_items()
911 && rev_enumerator.done();
912 return all;
913}
914
915bool
916session::queued_all_items()
917{
918 if (role == sink_role)
919 return true;
920 bool all = rev_refiner.items_to_send.empty()
921 && cert_refiner.items_to_send.empty()
922 && key_refiner.items_to_send.empty()
923 && epoch_refiner.items_to_send.empty();
924 return all;
925}
926
927
928void
929session::maybe_note_epochs_finished()
930{
931 // Maybe there are outstanding epoch requests.
932 // These only matter if we're in sink or source-and-sink mode.
933 if (!(epoch_refiner.items_to_receive == 0) && !(role == source_role))
934 return;
935
936 // And maybe we haven't even finished the refinement.
937 if (!epoch_refiner.done)
938 return;
939
940 // If we ran into an error -- say a mismatched epoch -- don't do any
941 // further refinements.
942 if (encountered_error)
943 return;
944
945 // But otherwise, we're ready to go. Start the next
946 // set of refinements.
947 if (voice == client_voice)
948 {
949 L(FL("epoch refinement finished; beginning other refinements"));
950 key_refiner.begin_refinement();
951 cert_refiner.begin_refinement();
952 rev_refiner.begin_refinement();
953 }
954 else
955 L(FL("epoch refinement finished"));
956}
957
958static void
959decrement_if_nonzero(netcmd_item_type ty,
960 size_t & n)
961{
962 if (n == 0)
963 {
964 string typestr;
965 netcmd_item_type_to_string(ty, typestr);
966 E(false, F("underflow on count of %s items to receive") % typestr);
967 }
968 --n;
969 if (n == 0)
970 {
971 string typestr;
972 netcmd_item_type_to_string(ty, typestr);
973 L(FL("count of %s items to receive has reached zero") % typestr);
974 }
975}
976
977void
978session::note_item_arrived(netcmd_item_type ty, id const & ident)
979{
980 switch (ty)
981 {
982 case cert_item:
983 decrement_if_nonzero(ty, cert_refiner.items_to_receive);
984 if (cert_in_ticker.get() != NULL)
985 ++(*cert_in_ticker);
986 ++certs_in;
987 break;
988 case revision_item:
989 decrement_if_nonzero(ty, rev_refiner.items_to_receive);
990 if (revision_in_ticker.get() != NULL)
991 ++(*revision_in_ticker);
992 ++revs_in;
993 break;
994 case key_item:
995 decrement_if_nonzero(ty, key_refiner.items_to_receive);
996 ++keys_in;
997 break;
998 case epoch_item:
999 decrement_if_nonzero(ty, epoch_refiner.items_to_receive);
1000 break;
1001 default:
1002 // No ticker for other things.
1003 break;
1004 }
1005}
1006
1007
1008
1009void
1010session::note_item_sent(netcmd_item_type ty, id const & ident)
1011{
1012 switch (ty)
1013 {
1014 case cert_item:
1015 cert_refiner.items_to_send.erase(ident);
1016 if (cert_out_ticker.get() != NULL)
1017 ++(*cert_out_ticker);
1018 ++certs_out;
1019 break;
1020 case revision_item:
1021 rev_refiner.items_to_send.erase(ident);
1022 if (revision_out_ticker.get() != NULL)
1023 ++(*revision_out_ticker);
1024 ++revs_out;
1025 break;
1026 case key_item:
1027 key_refiner.items_to_send.erase(ident);
1028 ++keys_out;
1029 break;
1030 case epoch_item:
1031 epoch_refiner.items_to_send.erase(ident);
1032 break;
1033 default:
1034 // No ticker for other things.
1035 break;
1036 }
1037}
1038
1039void
1040session::write_netcmd_and_try_flush(netcmd const & cmd)
1041{
1042 if (!encountered_error)
1043 {
1044 string buf;
1045 cmd.write(buf, write_hmac);
1046 outbuf.push_back(make_pair(buf, 0));
1047 outbuf_size += buf.size();
1048 }
1049 else
1050 L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
1051 // FIXME: this helps keep the protocol pipeline full but it seems to
1052 // interfere with initial and final sequences. careful with it.
1053 // write_some();
1054 // read_some();
1055}
1056
// This method triggers a special "error unwind" mode to netsync. In this
// mode, all received data is ignored, and no new data is queued. We simply
// stay connected long enough for the current write buffer to be flushed, to
// ensure that our peer receives the error message.
// Affects read_some, write_some, and process.
//
// errcode: stored in error_code before throwing.
// errmsg:  message carried by the thrown netsync_error.
//
// This function never returns normally; it always throws netsync_error.
void
session::error(int errcode, string const & errmsg)
{
  error_code = errcode;
  throw netsync_error(errmsg);
}
1068
1069Netxx::Probe::ready_type
1070session::which_events() const
1071{
1072 // Only ask to read if we're not armed.
1073 if (outbuf.empty())
1074 {
1075 if (inbuf.size() < constants::netcmd_maxsz && !armed)
1076 return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1077 else
1078 return Netxx::Probe::ready_oobd;
1079 }
1080 else
1081 {
1082 if (inbuf.size() < constants::netcmd_maxsz && !armed)
1083 return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1084 else
1085 return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
1086 }
1087}
1088
1089bool
1090session::read_some()
1091{
1092 I(inbuf.size() < constants::netcmd_maxsz);
1093 char tmp[constants::bufsz];
1094 Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
1095 if (count > 0)
1096 {
1097 L(FL("read %d bytes from fd %d (peer %s)")
1098 % count % str->get_socketfd() % peer_id);
1099 if (encountered_error)
1100 {
1101 L(FL("in error unwind mode, so throwing them into the bit bucket"));
1102 return true;
1103 }
1104 inbuf.append(tmp,count);
1105 mark_recent_io();
1106 if (byte_in_ticker.get() != NULL)
1107 (*byte_in_ticker) += count;
1108 bytes_in += count;
1109 return true;
1110 }
1111 else
1112 return false;
1113}
1114
1115bool
1116session::write_some()
1117{
1118 I(!outbuf.empty());
1119 size_t writelen = outbuf.front().first.size() - outbuf.front().second;
1120 Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
1121 min(writelen,
1122 constants::bufsz));
1123 if (count > 0)
1124 {
1125 if ((size_t)count == writelen)
1126 {
1127 outbuf_size -= outbuf.front().first.size();
1128 outbuf.pop_front();
1129 }
1130 else
1131 {
1132 outbuf.front().second += count;
1133 }
1134 L(FL("wrote %d bytes to fd %d (peer %s)")
1135 % count % str->get_socketfd() % peer_id);
1136 mark_recent_io();
1137 if (byte_out_ticker.get() != NULL)
1138 (*byte_out_ticker) += count;
1139 bytes_out += count;
1140 if (encountered_error && outbuf.empty())
1141 {
1142 // we've flushed our error message, so it's time to get out.
1143 L(FL("finished flushing output queue in error unwind mode, disconnecting"));
1144 return false;
1145 }
1146 return true;
1147 }
1148 else
1149 return false;
1150}
1151
1152// senders
1153
1154void
1155session::queue_error_cmd(string const & errmsg)
1156{
1157 L(FL("queueing 'error' command"));
1158 netcmd cmd;
1159 cmd.write_error_cmd(errmsg);
1160 write_netcmd_and_try_flush(cmd);
1161}
1162
1163void
1164session::queue_bye_cmd(u8 phase)
1165{
1166 L(FL("queueing 'bye' command, phase %d")
1167 % static_cast<size_t>(phase));
1168 netcmd cmd;
1169 cmd.write_bye_cmd(phase);
1170 write_netcmd_and_try_flush(cmd);
1171}
1172
1173void
1174session::queue_done_cmd(netcmd_item_type type,
1175 size_t n_items)
1176{
1177 string typestr;
1178 netcmd_item_type_to_string(type, typestr);
1179 L(FL("queueing 'done' command for %s (%d items)")
1180 % typestr % n_items);
1181 netcmd cmd;
1182 cmd.write_done_cmd(type, n_items);
1183 write_netcmd_and_try_flush(cmd);
1184}
1185
1186void
1187session::queue_hello_cmd(rsa_keypair_id const & key_name,
1188 rsa_pub_key const & pub,
1189 id const & nonce)
1190{
1191 if (use_transport_auth)
1192 cmd.write_hello_cmd(key_name, pub, nonce);
1193 else
1194 cmd.write_hello_cmd(key_name, rsa_pub_key(), nonce);
1195 write_netcmd_and_try_flush(cmd);
1196}
1197
1198void
1199session::queue_anonymous_cmd(protocol_role role,
1200 globish const & include_pattern,
1201 globish const & exclude_pattern,
1202 id const & nonce2)
1203{
1204 netcmd cmd;
1205 rsa_oaep_sha_data hmac_key_encrypted;
1206 if (use_transport_auth)
1207 project.db.encrypt_rsa(remote_peer_key_name, nonce2(), hmac_key_encrypted);
1208 cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
1209 hmac_key_encrypted);
1210 write_netcmd_and_try_flush(cmd);
1211 set_session_key(nonce2());
1212}
1213
1214void
1215session::queue_auth_cmd(protocol_role role,
1216 globish const & include_pattern,
1217 globish const & exclude_pattern,
1218 id const & client,
1219 id const & nonce1,
1220 id const & nonce2,
1221 rsa_sha1_signature const & signature)
1222{
1223 netcmd cmd;
1224 rsa_oaep_sha_data hmac_key_encrypted;
1225 I(use_transport_auth);
1226 project.db.encrypt_rsa(remote_peer_key_name, nonce2(), hmac_key_encrypted);
1227 cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
1228 nonce1, hmac_key_encrypted, signature);
1229 write_netcmd_and_try_flush(cmd);
1230 set_session_key(nonce2());
1231}
1232
1233void
1234session::queue_confirm_cmd()
1235{
1236 netcmd cmd;
1237 cmd.write_confirm_cmd();
1238 write_netcmd_and_try_flush(cmd);
1239}
1240
1241void
1242session::queue_refine_cmd(refinement_type ty, merkle_node const & node)
1243{
1244 string typestr;
1245 hexenc<prefix> hpref;
1246 node.get_hex_prefix(hpref);
1247 netcmd_item_type_to_string(node.type, typestr);
1248 L(FL("queueing refinement %s of %s node '%s', level %d")
1249 % (ty == refinement_query ? "query" : "response")
1250 % typestr % hpref() % static_cast<int>(node.level));
1251 netcmd cmd;
1252 cmd.write_refine_cmd(ty, node);
1253 write_netcmd_and_try_flush(cmd);
1254}
1255
1256void
1257session::queue_data_cmd(netcmd_item_type type,
1258 id const & item,
1259 string const & dat)
1260{
1261 string typestr;
1262 netcmd_item_type_to_string(type, typestr);
1263 hexenc<id> hid;
1264
1265 if (global_sanity.debug_p())
1266 encode_hexenc(item, hid);
1267
1268 if (role == sink_role)
1269 {
1270 L(FL("not queueing %s data for '%s' as we are in pure sink role")
1271 % typestr % hid());
1272 return;
1273 }
1274
1275 L(FL("queueing %d bytes of data for %s item '%s'")
1276 % dat.size() % typestr % hid());
1277
1278 netcmd cmd;
1279 // TODO: This pair of functions will make two copies of a large
1280 // file, the first in cmd.write_data_cmd, and the second in
1281 // write_netcmd_and_try_flush when the data is copied from the
1282 // cmd.payload variable to the string buffer for output. This
1283 // double copy should be collapsed out, it may be better to use
1284 // a string_queue for output as well as input, as that will reduce
1285 // the amount of mallocs that happen when the string queue is large
1286 // enough to just store the data.
1287 cmd.write_data_cmd(type, item, dat);
1288 write_netcmd_and_try_flush(cmd);
1289 note_item_sent(type, item);
1290}
1291
1292void
1293session::queue_delta_cmd(netcmd_item_type type,
1294 id const & base,
1295 id const & ident,
1296 delta const & del)
1297{
1298 I(type == file_item);
1299 string typestr;
1300 netcmd_item_type_to_string(type, typestr);
1301 hexenc<id> base_hid,
1302 ident_hid;
1303
1304 if (global_sanity.debug_p())
1305 {
1306 encode_hexenc(base, base_hid);
1307 encode_hexenc(ident, ident_hid);
1308 }
1309
1310 if (role == sink_role)
1311 {
1312 L(FL("not queueing %s delta '%s' -> '%s' as we are in pure sink role")
1313 % typestr % base_hid() % ident_hid());
1314 return;
1315 }
1316
1317 L(FL("queueing %s delta '%s' -> '%s'")
1318 % typestr % base_hid() % ident_hid());
1319 netcmd cmd;
1320 cmd.write_delta_cmd(type, base, ident, del);
1321 write_netcmd_and_try_flush(cmd);
1322 note_item_sent(type, ident);
1323}
1324
1325
1326// processors
1327
1328bool
1329session::process_error_cmd(string const & errmsg)
1330{
1331 // "xxx string" with xxx being digits means there's an error code
1332 if (errmsg.size() > 4 && errmsg.substr(3,1) == " ")
1333 {
1334 try
1335 {
1336 int err = boost::lexical_cast<int>(errmsg.substr(0,3));
1337 if (err >= 100)
1338 {
1339 error_code = err;
1340 throw bad_decode(F("received network error: %s")
1341 % errmsg.substr(4));
1342 }
1343 }
1344 catch (boost::bad_lexical_cast)
1345 { // ok, so it wasn't a number
1346 }
1347 }
1348 throw bad_decode(F("received network error: %s") % errmsg);
1349}
1350
// Database variable domain in which process_hello_cmd records, per
// peer_id, the hex-encoded hash of the key a server presented.
static const var_domain known_servers_domain = var_domain("known-servers");
1352
// Handle the 'hello' netcmd.
//
// their_keyname / their_key: the key name and public key the remote
//   side announced.
// nonce: the nonce sent along with the hello; it is signed and echoed
//   back in the 'auth' reply when we have a signing key.
//
// With transport auth in use, the announced key is checked against the
// hash remembered for this peer (if any) and stored in the database if
// it is not there yet.  Afterwards the merkle trees are rebuilt for the
// branches matched by our_matcher and either an 'auth' or an
// 'anonymous' command is queued.  Always returns true; failures are
// reported through E()/I().
bool
session::process_hello_cmd(rsa_keypair_id const & their_keyname,
                           rsa_pub_key const & their_key,
                           id const & nonce)
{
  // A hello is only acceptable before any remote key has been received
  // and while no nonce has been saved.
  I(!this->received_remote_key);
  I(this->saved_nonce().size() == 0);

  if (use_transport_auth)
    {
      id their_key_hash;
      key_hash_code(their_keyname, their_key, their_key_hash);
      var_value printable_key_hash(encode_hexenc(their_key_hash()));
      L(FL("server key has name %s, hash %s")
        % their_keyname % printable_key_hash);
      // The expected key hash for a server is remembered in a database
      // variable keyed by peer_id.
      var_key their_key_key(known_servers_domain, var_name(peer_id));
      if (project.db.var_exists(their_key_key))
        {
          var_value expected_key_hash;
          project.db.get_var(their_key_key, expected_key_hash);
          if (expected_key_hash != printable_key_hash)
            {
              // The server presented a different key than last time:
              // warn loudly and refuse to continue.
              P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
                  "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
                  "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
                  "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
                  "it is also possible that the server key has just been changed\n"
                  "remote host sent key %s\n"
                  "I expected %s\n"
                  "'%s unset %s %s' overrides this check")
                % printable_key_hash
                % expected_key_hash
                % ui.prog_name % their_key_key.first % their_key_key.second);
              E(false, F("server key changed"));
            }
        }
      else
        {
          // First contact: remember the presented key hash for next time.
          P(F("first time connecting to server %s\n"
              "I'll assume it's really them, but you might want to double-check\n"
              "their key's fingerprint: %s")
            % peer_id
            % printable_key_hash);
          project.db.set_var(their_key_key, printable_key_hash);
        }

      if (project.db.public_key_exists(their_keyname))
        {
          // A key with this name is already stored; it must be the same
          // key the server just sent.
          rsa_pub_key tmp;
          project.db.get_key(their_keyname, tmp);

          E(keys_match(their_keyname, tmp, their_keyname, their_key),
            F("the server sent a key with the key id '%s'\n"
              "which is already in use in your database. you may want to execute\n"
              " %s dropkey %s\n"
              "on your local database before you run this command again,\n"
              "assuming that key currently present in your database does NOT have\n"
              "a private counterpart (or in other words, is one of YOUR keys)")
            % their_keyname % ui.prog_name % their_keyname);
        }
      else
        {
          // this should now always return true since we just checked
          // for the existence of this particular key
          I(project.db.put_key(their_keyname, their_key));
          W(F("saving public key for %s to database") % their_keyname);
        }

      {
        hexenc<id> hnonce;
        encode_hexenc(nonce, hnonce);
        L(FL("received 'hello' netcmd from server '%s' with nonce '%s'")
          % printable_key_hash % hnonce);
      }

      // By now the key must be present in the database under its hash.
      I(project.db.public_key_exists(their_key_hash));

      // save their identity
      this->received_remote_key = true;
      this->remote_peer_key_name = their_keyname;
    }

  // clients always include in the synchronization set, every branch that the
  // user requested
  set<branch_name> all_branches, ok_branches;
  project.get_branch_list(all_branches);
  for (set<branch_name>::const_iterator i = all_branches.begin();
       i != all_branches.end(); i++)
    {
      if (our_matcher((*i)()))
        ok_branches.insert(*i);
    }
  rebuild_merkle_trees(ok_branches);

  if (!initiated_by_server)
    setup_client_tickers();

  // Authenticate with our signing key when we have one (and transport
  // auth is in use); otherwise connect anonymously.
  if (use_transport_auth && signing_key() != "")
    {
      // get our key pair
      load_key_pair(keys, signing_key);

      // make a signature with it;
      // this also ensures our public key is in the database
      rsa_sha1_signature sig;
      keys.make_signature(project.db, signing_key, nonce(), sig);

      // get the hash identifier for our pubkey
      rsa_pub_key our_pub;
      project.db.get_key(signing_key, our_pub);
      id our_key_hash_raw;
      key_hash_code(signing_key, our_pub, our_key_hash_raw);

      // make a new nonce of our own and send off the 'auth'
      queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
                     our_key_hash_raw, nonce, mk_nonce(), sig);
    }
  else
    {
      queue_anonymous_cmd(this->role, our_include_pattern,
                          our_exclude_pattern, mk_nonce());
    }

  lua.hook_note_netsync_start(session_id, "client", this->role,
                              peer_id, their_keyname,
                              our_include_pattern, our_exclude_pattern);
  return true;
}
1481
1482bool
1483session::process_anonymous_cmd(protocol_role their_role,
1484 globish const & their_include_pattern,
1485 globish const & their_exclude_pattern)
1486{
1487 // Internally netsync thinks in terms of sources and sinks. Users like
1488 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1489 //
1490 // We therefore use the read/write terminology when dealing with the UI:
1491 // if the user asks to run a "read only" service, this means they are
1492 // willing to be a source but not a sink.
1493 //
1494 // nb: The "role" here is the role the *client* wants to play
1495 // so we need to check that the opposite role is allowed for us,
1496 // in our this->role field.
1497 //
1498
1499 lua.hook_note_netsync_start(session_id, "server", their_role,
1500 peer_id, rsa_keypair_id(),
1501 their_include_pattern, their_exclude_pattern);
1502
1503 // Client must be a sink and server must be a source (anonymous
1504 // read-only), unless transport auth is disabled.
1505 //
1506 // If running in no-transport-auth mode, we operate anonymously and
1507 // permit adoption of any role.
1508
1509 if (use_transport_auth)
1510 {
1511 if (their_role != sink_role)
1512 {
1513 this->saved_nonce = id("");
1514 error(not_permitted,
1515 F("rejected attempt at anonymous connection for write").str());
1516 }
1517
1518 if (this->role == sink_role)
1519 {
1520 this->saved_nonce = id("");
1521 error(role_mismatch,
1522 F("rejected attempt at anonymous connection while running as sink").str());
1523 }
1524 }
1525
1526 set<branch_name> all_branches, ok_branches;
1527 project.get_branch_list(all_branches);
1528 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1529 for (set<branch_name>::const_iterator i = all_branches.begin();
1530 i != all_branches.end(); i++)
1531 {
1532 if (their_matcher((*i)()))
1533 {
1534 if (use_transport_auth &&
1535 !lua.hook_get_netsync_read_permitted((*i)()))
1536 {
1537 error(not_permitted,
1538 (F("anonymous access to branch '%s' denied by server")
1539 % *i).str());
1540 }
1541 else
1542 ok_branches.insert(*i);
1543 }
1544 }
1545
1546 if (use_transport_auth)
1547 {
1548 P(F("allowed anonymous read permission for '%s' excluding '%s'")
1549 % their_include_pattern % their_exclude_pattern);
1550 this->role = source_role;
1551 }
1552 else
1553 {
1554 P(F("allowed anonymous read/write permission for '%s' excluding '%s'")
1555 % their_include_pattern % their_exclude_pattern);
1556 assume_corresponding_role(their_role);
1557 }
1558
1559 rebuild_merkle_trees(ok_branches);
1560
1561 this->remote_peer_key_name = rsa_keypair_id("");
1562 this->authenticated = true;
1563 return true;
1564}
1565
1566void
1567session::assume_corresponding_role(protocol_role their_role)
1568{
1569 // Assume the (possibly degraded) opposite role.
1570 switch (their_role)
1571 {
1572 case source_role:
1573 I(this->role != source_role);
1574 this->role = sink_role;
1575 break;
1576
1577 case source_and_sink_role:
1578 I(this->role == source_and_sink_role);
1579 break;
1580
1581 case sink_role:
1582 I(this->role != sink_role);
1583 this->role = source_role;
1584 break;
1585 }
1586}
1587
// Handle an 'auth' netcmd.
//
// their_role: the role the client wants to play.
// their_include_pattern / their_exclude_pattern: branches requested.
// client: hash identifying the client's public key.
// nonce1: must equal the nonce we saved earlier.
// signature: the client's signature over nonce1.
//
// Returns true once the client is authenticated.  If the key is known
// neither to the database nor to the key store, the request is handed
// to process_anonymous_cmd instead.  Rejections go through error().
bool
session::process_auth_cmd(protocol_role their_role,
                          globish const & their_include_pattern,
                          globish const & their_exclude_pattern,
                          id const & client,
                          id const & nonce1,
                          rsa_sha1_signature const & signature)
{
  // No remote key may have been received yet, and a full-length nonce
  // must have been saved beforehand.
  I(!this->received_remote_key);
  I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);

  globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);

  if (!project.db.public_key_exists(client))
    {
      // If it's not in the db, it still could be in the keystore if we
      // have the private key that goes with it.
      rsa_keypair_id their_key_id;
      keypair their_keypair;
      if (keys.maybe_get_key_pair(client, their_key_id, their_keypair))
        project.db.put_key(their_key_id, their_keypair.pub);
      else
        {
          // Unknown key: treat the peer as an anonymous client.  The
          // commented-out code below would reject the connection
          // instead.
          return process_anonymous_cmd(their_role,
                                       their_include_pattern,
                                       their_exclude_pattern);
          /*
          this->saved_nonce = id("");

          lua.hook_note_netsync_start(session_id, "server", their_role,
                                      peer_id, rsa_keypair_id("-unknown-"),
                                      their_include_pattern,
                                      their_exclude_pattern);
          error(unknown_key,
                (F("remote public key hash '%s' is unknown")
                 % encode_hexenc(client())).str());
          */
        }
    }

  // Get their public key.
  rsa_keypair_id their_id;
  rsa_pub_key their_key;
  project.db.get_pubkey(client, their_id, their_key);

  lua.hook_note_netsync_start(session_id, "server", their_role,
                              peer_id, their_id,
                              their_include_pattern, their_exclude_pattern);

  // Check that they replied with the nonce we asked for.
  if (!(nonce1 == this->saved_nonce))
    {
      this->saved_nonce = id("");
      error(failed_identification,
            F("detected replay attack in auth netcmd").str());
    }

  // Internally netsync thinks in terms of sources and sinks. users like
  // thinking of repositories as "readonly", "readwrite", or "writeonly".
  //
  // We therefore use the read/write terminology when dealing with the UI:
  // if the user asks to run a "read only" service, this means they are
  // willing to be a source but not a sink.
  //
  // nb: The "their_role" here is the role the *client* wants to play
  //     so we need to check that the opposite role is allowed for us,
  //     in our this->role field.

  // Client as sink, server as source (reading).

  if (their_role == sink_role || their_role == source_and_sink_role)
    {
      if (this->role != source_role && this->role != source_and_sink_role)
        {
          this->saved_nonce = id("");
          error(not_permitted,
                (F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink")
                 % their_id % their_include_pattern % their_exclude_pattern).str());
        }
    }

  // Every requested branch must pass the read-permission hook for this
  // key; a single denied branch rejects the whole request.
  set<branch_name> all_branches, ok_branches;
  project.get_branch_list(all_branches);
  for (set<branch_name>::const_iterator i = all_branches.begin();
       i != all_branches.end(); i++)
    {
      if (their_matcher((*i)()))
        {
          if (!lua.hook_get_netsync_read_permitted((*i)(), their_id))
            {
              error(not_permitted,
                    (F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'")
                     % their_id % their_include_pattern % their_exclude_pattern % *i).str());
            }
          else
            ok_branches.insert(*i);
        }
    }

  // If we're source_and_sink_role, continue even with no branches readable
  // eg. serve --db=empty.db
  P(F("allowed '%s' read permission for '%s' excluding '%s'")
    % their_id % their_include_pattern % their_exclude_pattern);

  // Client as source, server as sink (writing).

  if (their_role == source_role || their_role == source_and_sink_role)
    {
      if (this->role != sink_role && this->role != source_and_sink_role)
        {
          this->saved_nonce = id("");
          error(not_permitted,
                (F("denied '%s' write permission for '%s' excluding '%s' while running as pure source")
                 % their_id % their_include_pattern % their_exclude_pattern).str());
        }

      if (!lua.hook_get_netsync_write_permitted(their_id))
        {
          this->saved_nonce = id("");
          error(not_permitted,
                (F("denied '%s' write permission for '%s' excluding '%s'")
                 % their_id % their_include_pattern % their_exclude_pattern).str());
        }

      P(F("allowed '%s' write permission for '%s' excluding '%s'")
        % their_id % their_include_pattern % their_exclude_pattern);
    }

  rebuild_merkle_trees(ok_branches);

  this->received_remote_key = true;

  // Check the signature.
  // NOTE(review): the permission checks and the merkle tree rebuild
  // above run before the signature is verified -- confirm this ordering
  // is intended.
  if (project.db.check_signature(their_id, nonce1(), signature) == cert_ok)
    {
      // Get our private key and sign back.
      L(FL("client signature OK, accepting authentication"));
      this->authenticated = true;
      this->remote_peer_key_name = their_id;

      assume_corresponding_role(their_role);
      return true;
    }
  else
    {
      error(failed_identification, (F("bad client signature")).str());
    }
  // Not reached when error() throws, as it does in this file.
  return false;
}
1737
1738bool
1739session::process_refine_cmd(refinement_type ty, merkle_node const & node)
1740{
1741 string typestr;
1742 netcmd_item_type_to_string(node.type, typestr);
1743 L(FL("processing refine cmd for %s node at level %d")
1744 % typestr % node.level);
1745
1746 switch (node.type)
1747 {
1748 case file_item:
1749 W(F("Unexpected 'refine' command on non-refined item type"));
1750 break;
1751
1752 case key_item:
1753 key_refiner.process_refinement_command(ty, node);
1754 break;
1755
1756 case revision_item:
1757 rev_refiner.process_refinement_command(ty, node);
1758 break;
1759
1760 case cert_item:
1761 cert_refiner.process_refinement_command(ty, node);
1762 break;
1763
1764 case epoch_item:
1765 epoch_refiner.process_refinement_command(ty, node);
1766 break;
1767 }
1768 return true;
1769}
1770
1771bool
1772session::process_bye_cmd(u8 phase,
1773 transaction_guard & guard)
1774{
1775
1776// Ideal shutdown
1777// ~~~~~~~~~~~~~~~
1778//
1779// I/O events state transitions
1780// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
1781// client: C_WORKING
1782// server: S_WORKING
1783// 0. [refinement, data, deltas, etc.]
1784// client: C_SHUTDOWN
1785// (client checkpoints here)
1786// 1. client -> "bye 0"
1787// 2. "bye 0" -> server
1788// server: S_SHUTDOWN
1789// (server checkpoints here)
1790// 3. "bye 1" <- server
1791// 4. client <- "bye 1"
1792// client: C_CONFIRMED
1793// 5. client -> "bye 2"
1794// 6. "bye 2" -> server
1795// server: S_CONFIRMED
1796// 7. [server drops connection]
1797//
1798//
1799// Affects of I/O errors or disconnections
1800// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1801// C_WORKING: report error and fault
1802// S_WORKING: report error and recover
1803// C_SHUTDOWN: report error and fault
1804// S_SHUTDOWN: report success and recover
1805// (and warn that client might falsely see error)
1806// C_CONFIRMED: report success
1807// S_CONFIRMED: report success
1808
1809 switch (phase)
1810 {
1811 case 0:
1812 if (voice == server_voice &&
1813 protocol_state == working_state)
1814 {
1815 protocol_state = shutdown_state;
1816 guard.do_checkpoint();
1817 queue_bye_cmd(1);
1818 }
1819 else
1820 error(bad_command, "unexpected bye phase 0 received");
1821 break;
1822
1823 case 1:
1824 if (voice == client_voice &&
1825 protocol_state == shutdown_state)
1826 {
1827 protocol_state = confirmed_state;
1828 queue_bye_cmd(2);
1829 }
1830 else
1831 error(bad_command, "unexpected bye phase 1 received");
1832 break;
1833
1834 case 2:
1835 if (voice == server_voice &&
1836 protocol_state == shutdown_state)
1837 {
1838 protocol_state = confirmed_state;
1839 return false;
1840 }
1841 else
1842 error(bad_command, "unexpected bye phase 2 received");
1843 break;
1844
1845 default:
1846 error(bad_command, (F("unknown bye phase %d received") % phase).str());
1847 }
1848
1849 return true;
1850}
1851
1852bool
1853session::process_done_cmd(netcmd_item_type type, size_t n_items)
1854{
1855 string typestr;
1856 netcmd_item_type_to_string(type, typestr);
1857 L(FL("received 'done' command for %s (%s items)") % typestr % n_items);
1858 switch (type)
1859 {
1860 case file_item:
1861 W(F("Unexpected 'done' command on non-refined item type"));
1862 break;
1863
1864 case key_item:
1865 key_refiner.process_done_command(n_items);
1866 if (key_refiner.done && role != sink_role)
1867 send_all_data(key_item, key_refiner.items_to_send);
1868 break;
1869
1870 case revision_item:
1871 rev_refiner.process_done_command(n_items);
1872 break;
1873
1874 case cert_item:
1875 cert_refiner.process_done_command(n_items);
1876 break;
1877
1878 case epoch_item:
1879 epoch_refiner.process_done_command(n_items);
1880 if (epoch_refiner.done)
1881 {
1882 send_all_data(epoch_item, epoch_refiner.items_to_send);
1883 maybe_note_epochs_finished();
1884 }
1885 break;
1886 }
1887 return true;
1888}
1889
// Respond to a 'confirm' netcmd by starting the epoch refinement.  The
// key, cert and revision refinements are started later, by
// maybe_note_epochs_finished.
void
session::respond_to_confirm_cmd()
{
  epoch_refiner.begin_refinement();
}
1895
1896bool
1897session::data_exists(netcmd_item_type type,
1898 id const & item)
1899{
1900 switch (type)
1901 {
1902 case key_item:
1903 return key_refiner.local_item_exists(item)
1904 || project.db.public_key_exists(item);
1905 case file_item:
1906 return project.db.file_version_exists(file_id(item));
1907 case revision_item:
1908 return rev_refiner.local_item_exists(item)
1909 || project.db.revision_exists(revision_id(item));
1910 case cert_item:
1911 return cert_refiner.local_item_exists(item)
1912 || project.db.revision_cert_exists(revision_id(item));
1913 case epoch_item:
1914 return epoch_refiner.local_item_exists(item)
1915 || project.db.epoch_exists(epoch_id(item));
1916 }
1917 return false;
1918}
1919
1920void
1921session::load_data(netcmd_item_type type,
1922 id const & item,
1923 string & out)
1924{
1925 string typestr;
1926 netcmd_item_type_to_string(type, typestr);
1927 hexenc<id> hitem;
1928 encode_hexenc(item, hitem);
1929
1930 if (!data_exists(type, item))
1931 throw bad_decode(F("%s with hash '%s' does not exist in our database")
1932 % typestr % hitem());
1933
1934 switch (type)
1935 {
1936 case epoch_item:
1937 {
1938 branch_name branch;
1939 epoch_data epoch;
1940 project.db.get_epoch(epoch_id(item), branch, epoch);
1941 write_epoch(branch, epoch, out);
1942 }
1943 break;
1944 case key_item:
1945 {
1946 rsa_keypair_id keyid;
1947 rsa_pub_key pub;
1948 project.db.get_pubkey(item, keyid, pub);
1949 L(FL("public key '%s' is also called '%s'") % hitem() % keyid);
1950 write_pubkey(keyid, pub, out);
1951 sent_keys.push_back(keyid);
1952 }
1953 break;
1954
1955 case revision_item:
1956 {
1957 revision_data mdat;
1958 data dat;
1959 project.db.get_revision(revision_id(item), mdat);
1960 out = mdat.inner()();
1961 }
1962 break;
1963
1964 case file_item:
1965 {
1966 file_data fdat;
1967 data dat;
1968 project.db.get_file_version(file_id(item), fdat);
1969 out = fdat.inner()();
1970 }
1971 break;
1972
1973 case cert_item:
1974 {
1975 revision<cert> c;
1976 project.db.get_revision_cert(item, c);
1977 string tmp;
1978 write_cert(c.inner(), out);
1979 }
1980 break;
1981 }
1982}
1983
// Handle a 'data' netcmd carrying the payload 'dat' for 'item'.
//
// The arrival is always counted first.  Items we already have are
// skipped; everything else is decoded, checked where a hash check
// applies, and written to the database.  Always returns true; problems
// are reported through error() or by throwing bad_decode.
bool
session::process_data_cmd(netcmd_item_type type,
                          id const & item,
                          string const & dat)
{
  hexenc<id> hitem;
  encode_hexenc(item, hitem);

  string typestr;
  netcmd_item_type_to_string(type, typestr);

  note_item_arrived(type, item);
  if (data_exists(type, item))
    {
      L(FL("%s '%s' already exists in our database") % typestr % hitem());
      // An epoch we already have still counts towards finishing the
      // epoch phase.
      if (type == epoch_item)
        maybe_note_epochs_finished();
      return true;
    }

  switch (type)
    {
    case epoch_item:
      {
        branch_name branch;
        epoch_data epoch;
        read_epoch(dat, branch, epoch);
        L(FL("received epoch %s for branch %s")
          % encode_hexenc(epoch.inner()()) % branch);
        map<branch_name, epoch_data> epochs;
        project.db.get_epochs(epochs);
        map<branch_name, epoch_data>::const_iterator i;
        i = epochs.find(branch);
        if (i == epochs.end())
          {
            L(FL("branch %s has no epoch; setting epoch to %s")
              % branch % encode_hexenc(epoch.inner()()));
            project.db.set_epoch(branch, epoch);
          }
        else
          {
            L(FL("branch %s already has an epoch; checking") % branch);
            // If we get here, then we know that the epoch must be
            // different, because if it were the same then the
            // if (epoch_exists()) branch up above would have been taken.
            // if somehow this is wrong, then we have broken epoch
            // hashing or something, which is very dangerous, so play it
            // safe...
            I(!(i->second == epoch));

            // It is safe to call 'error' here, because if we get here,
            // then the current netcmd packet cannot possibly have
            // written anything to the database.
            //
            // i->second is our local epoch and 'epoch' the received
            // one; which of the two is "server" and which "client" in
            // the message depends on our voice.
            error(mixing_versions,
                  (F("Mismatched epoch on branch %s."
                     " Server has '%s', client has '%s'.")
                   % branch
                   % encode_hexenc((voice == server_voice
                                    ? i->second: epoch).inner()())
                   % encode_hexenc((voice == server_voice
                                    ? epoch : i->second).inner()())).str());
          }
      }
      maybe_note_epochs_finished();
      break;

    case key_item:
      {
        rsa_keypair_id keyid;
        rsa_pub_key pub;
        read_pubkey(dat, keyid, pub);
        // The received key must hash to the id it was announced under.
        id tmp;
        key_hash_code(keyid, pub, tmp);
        if (! (tmp == item))
          {
            throw bad_decode(F("hash check failed for public key '%s' (%s);"
                               " wanted '%s' got '%s'")
                             % hitem() % keyid % hitem()
                             % encode_hexenc(tmp()));
          }
        if (project.db.put_key(keyid, pub))
          written_keys.push_back(keyid);
        else
          error(partial_transfer,
                (F("Received duplicate key %s") % keyid).str());
      }
      break;

    case cert_item:
      {
        cert c;
        read_cert(dat, c);
        // The received cert must hash to the id it was announced under.
        id tmp;
        cert_hash_code(c, tmp);
        if (! (tmp == item))
          throw bad_decode(F("hash check failed for revision cert '%s'") % hitem());
        if (project.db.put_revision_cert(revision<cert>(c)))
          written_certs.push_back(c);
      }
      break;

    case revision_item:
      {
        L(FL("received revision '%s'") % hitem());
        if (project.db.put_revision(revision_id(item), revision_data(dat)))
          written_revisions.push_back(revision_id(item));
      }
      break;

    case file_item:
      {
        L(FL("received file '%s'") % hitem());
        project.db.put_file(file_id(item), file_data(dat));
      }
      break;
    }
  return true;
}
2102
2103bool
2104session::process_delta_cmd(netcmd_item_type type,
2105 id const & base,
2106 id const & ident,
2107 delta const & del)
2108{
2109 string typestr;
2110 netcmd_item_type_to_string(type, typestr);
2111
2112 pair<id,id> id_pair = make_pair(base, ident);
2113
2114 note_item_arrived(type, ident);
2115
2116 switch (type)
2117 {
2118 case file_item:
2119 {
2120 file_id src_file(base), dst_file(ident);
2121 project.db.put_file_version(src_file, dst_file, file_delta(del));
2122 }
2123 break;
2124
2125 default:
2126 L(FL("ignoring delta received for item type %s") % typestr);
2127 break;
2128 }
2129 return true;
2130}
2131
2132bool
2133session::process_usher_cmd(utf8 const & msg)
2134{
2135 if (msg().size())
2136 {
2137 if (msg()[0] == '!')
2138 P(F("Received warning from usher: %s") % msg().substr(1));
2139 else
2140 L(FL("Received greeting from usher: %s") % msg().substr(1));
2141 }
2142 netcmd cmdout;
2143 cmdout.write_usher_reply_cmd(utf8(peer_id), our_include_pattern);
2144 write_netcmd_and_try_flush(cmdout);
2145 L(FL("Sent reply."));
2146 return true;
2147}
2148
2149
2150void
2151session::send_all_data(netcmd_item_type ty, set<id> const & items)
2152{
2153 string typestr;
2154 netcmd_item_type_to_string(ty, typestr);
2155
2156 // Use temporary; passed arg will be invalidated during iteration.
2157 set<id> tmp = items;
2158
2159 for (set<id>::const_iterator i = tmp.begin();
2160 i != tmp.end(); ++i)
2161 {
2162 if (data_exists(ty, *i))
2163 {
2164 string out;
2165 load_data(ty, *i, out);
2166 queue_data_cmd(ty, *i, out);
2167 }
2168 }
2169}
2170
2171bool
2172session::dispatch_payload(netcmd const & cmd,
2173 transaction_guard & guard)
2174{
2175
2176 switch (cmd.get_cmd_code())
2177 {
2178
2179 case error_cmd:
2180 {
2181 string errmsg;
2182 cmd.read_error_cmd(errmsg);
2183 return process_error_cmd(errmsg);
2184 }
2185 break;
2186
2187 case hello_cmd:
2188 require(! authenticated, "hello netcmd received when not authenticated");
2189 require(voice == client_voice, "hello netcmd received in client voice");
2190 {
2191 rsa_keypair_id server_keyname;
2192 rsa_pub_key server_key;
2193 id nonce;
2194 cmd.read_hello_cmd(server_keyname, server_key, nonce);
2195 return process_hello_cmd(server_keyname, server_key, nonce);
2196 }
2197 break;
2198
2199 case bye_cmd:
2200 require(authenticated, "bye netcmd received when not authenticated");
2201 {
2202 u8 phase;
2203 cmd.read_bye_cmd(phase);
2204 return process_bye_cmd(phase, guard);
2205 }
2206 break;
2207
2208 case anonymous_cmd:
2209 require(! authenticated, "anonymous netcmd received when not authenticated");
2210 require(voice == server_voice, "anonymous netcmd received in server voice");
2211 require(role == source_role ||
2212 role == source_and_sink_role,
2213 "anonymous netcmd received in source or source/sink role");
2214 {
2215 protocol_role role;
2216 globish their_include_pattern, their_exclude_pattern;
2217 rsa_oaep_sha_data hmac_key_encrypted;
2218 cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted);
2219 L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
2220 "in %s mode\n")
2221 % their_include_pattern % their_exclude_pattern
2222 % (role == source_and_sink_role ? _("source and sink") :
2223 (role == source_role ? _("source") : _("sink"))));
2224
2225 set_session_key(hmac_key_encrypted);
2226 if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern))
2227 return false;
2228 queue_confirm_cmd();
2229 return true;
2230 }
2231 break;
2232
2233 case auth_cmd:
2234 require(! authenticated, "auth netcmd received when not authenticated");
2235 require(voice == server_voice, "auth netcmd received in server voice");
2236 {
2237 protocol_role role;
2238 rsa_sha1_signature signature;
2239 globish their_include_pattern, their_exclude_pattern;
2240 id client, nonce1, nonce2;
2241 rsa_oaep_sha_data hmac_key_encrypted;
2242 cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2243 client, nonce1, hmac_key_encrypted, signature);
2244
2245 hexenc<id> their_key_hash;
2246 encode_hexenc(client, their_key_hash);
2247 hexenc<id> hnonce1;
2248 encode_hexenc(nonce1, hnonce1);
2249
2250 L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
2251 "exclude '%s' in %s mode with nonce1 '%s'\n")
2252 % their_key_hash % their_include_pattern % their_exclude_pattern
2253 % (role == source_and_sink_role ? _("source and sink") :
2254 (role == source_role ? _("source") : _("sink")))
2255 % hnonce1);
2256
2257 set_session_key(hmac_key_encrypted);
2258
2259 if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2260 client, nonce1, signature))
2261 return false;
2262 queue_confirm_cmd();
2263 return true;
2264 }
2265 break;
2266
2267 case confirm_cmd:
2268 require(! authenticated, "confirm netcmd received when not authenticated");
2269 require(voice == client_voice, "confirm netcmd received in client voice");
2270 {
2271 string signature;
2272 cmd.read_confirm_cmd();
2273 this->authenticated = true;
2274 respond_to_confirm_cmd();
2275 return true;
2276 }
2277 break;
2278
2279 case refine_cmd:
2280 require(authenticated, "refine netcmd received when authenticated");
2281 {
2282 merkle_node node;
2283 refinement_type ty;
2284 cmd.read_refine_cmd(ty, node);
2285 return process_refine_cmd(ty, node);
2286 }
2287 break;
2288
2289 case done_cmd:
2290 require(authenticated, "done netcmd received when not authenticated");
2291 {
2292 size_t n_items;
2293 netcmd_item_type type;
2294 cmd.read_done_cmd(type, n_items);
2295 return process_done_cmd(type, n_items);
2296 }
2297 break;
2298
2299 case data_cmd:
2300 require(authenticated, "data netcmd received when not authenticated");
2301 require(role == sink_role ||
2302 role == source_and_sink_role,
2303 "data netcmd received in source or source/sink role");
2304 {
2305 netcmd_item_type type;
2306 id item;
2307 string dat;
2308 cmd.read_data_cmd(type, item, dat);
2309 return process_data_cmd(type, item, dat);
2310 }
2311 break;
2312
2313 case delta_cmd:
2314 require(authenticated, "delta netcmd received when not authenticated");
2315 require(role == sink_role ||
2316 role == source_and_sink_role,
2317 "delta netcmd received in source or source/sink role");
2318 {
2319 netcmd_item_type type;
2320 id base, ident;
2321 delta del;
2322 cmd.read_delta_cmd(type, base, ident, del);
2323 return process_delta_cmd(type, base, ident, del);
2324 }
2325 break;
2326
2327 case usher_cmd:
2328 {
2329 utf8 greeting;
2330 cmd.read_usher_cmd(greeting);
2331 return process_usher_cmd(greeting);
2332 }
2333 break;
2334
2335 case usher_reply_cmd:
2336 return false; // Should not happen.
2337 break;
2338 }
2339 return false;
2340}
2341
2342// This kicks off the whole cascade starting from "hello".
2343void
2344session::begin_service()
2345{
2346 keypair kp;
2347 if (use_transport_auth)
2348 keys.get_key_pair(signing_key, kp);
2349 queue_hello_cmd(signing_key, kp.pub, mk_nonce());
2350}
2351
2352void
2353session::maybe_step()
2354{
2355 while (done_all_refinements()
2356 && !rev_enumerator.done()
2357 && outbuf_size < constants::bufsz * 10)
2358 {
2359 rev_enumerator.step();
2360 }
2361}
2362
2363void
2364session::maybe_say_goodbye(transaction_guard & guard)
2365{
2366 if (voice == client_voice
2367 && protocol_state == working_state
2368 && finished_working())
2369 {
2370 protocol_state = shutdown_state;
2371 guard.do_checkpoint();
2372 queue_bye_cmd(0);
2373 }
2374}
2375
2376bool
2377session::arm()
2378{
2379 if (!armed)
2380 {
2381 // Don't pack the buffer unnecessarily.
2382 if (outbuf_size > constants::bufsz * 10)
2383 return false;
2384
2385 if (cmd.read(inbuf, read_hmac))
2386 {
2387 armed = true;
2388 }
2389 }
2390 return armed;
2391}
2392
2393bool session::process(transaction_guard & guard)
2394{
2395 if (encountered_error)
2396 return true;
2397 try
2398 {
2399 if (!arm())
2400 return true;
2401
2402 armed = false;
2403 L(FL("processing %d byte input buffer from peer %s")
2404 % inbuf.size() % peer_id);
2405
2406 size_t sz = cmd.encoded_size();
2407 bool ret = dispatch_payload(cmd, guard);
2408
2409 if (inbuf.size() >= constants::netcmd_maxsz)
2410 W(F("input buffer for peer %s is overfull "
2411 "after netcmd dispatch") % peer_id);
2412
2413 guard.maybe_checkpoint(sz);
2414
2415 if (!ret)
2416 L(FL("finishing processing with '%d' packet")
2417 % cmd.get_cmd_code());
2418 return ret;
2419 }
2420 catch (bad_decode & bd)
2421 {
2422 W(F("protocol error while processing peer %s: '%s'")
2423 % peer_id % bd.what);
2424 return false;
2425 }
2426 catch (netsync_error & err)
2427 {
2428 W(F("error: %s") % err.msg);
2429 queue_error_cmd(boost::lexical_cast<string>(error_code) + " " + err.msg);
2430 encountered_error = true;
2431 return true; // Don't terminate until we've send the error_cmd.
2432 }
2433}
2434
2435
2436static shared_ptr<Netxx::StreamBase>
2437build_stream_to_server(options & opts, lua_hooks & lua,
2438 netsync_connection_info info,
2439 Netxx::port_type default_port,
2440 Netxx::Timeout timeout)
2441{
2442 shared_ptr<Netxx::StreamBase> server;
2443
2444 if (info.client.use_argv)
2445 {
2446 I(info.client.argv.size() > 0);
2447 string cmd = info.client.argv[0];
2448 info.client.argv.erase(info.client.argv.begin());
2449 return shared_ptr<Netxx::StreamBase>
2450 (new Netxx::PipeStream(cmd, info.client.argv));
2451 }
2452 else
2453 {
2454#ifdef USE_IPV6
2455 bool use_ipv6=true;
2456#else
2457 bool use_ipv6=false;
2458#endif
2459 string host(info.client.u.host);
2460 if (host.empty())
2461 host = info.client.unparsed();
2462 if (!info.client.u.port.empty())
2463 default_port = lexical_cast<Netxx::port_type>(info.client.u.port);
2464 Netxx::Address addr(info.client.unparsed().c_str(),
2465 default_port, use_ipv6);
2466 return shared_ptr<Netxx::StreamBase>
2467 (new Netxx::Stream(addr, timeout));
2468 }
2469}
2470
2471static void
2472call_server(options & opts,
2473 lua_hooks & lua,
2474 project_t & project,
2475 key_store & keys,
2476 protocol_role role,
2477 netsync_connection_info const & info,
2478 Netxx::port_type default_port,
2479 unsigned long timeout_seconds)
2480{
2481 Netxx::PipeCompatibleProbe probe;
2482 transaction_guard guard(project.db);
2483
2484 Netxx::Timeout timeout(static_cast<long>(timeout_seconds)), instant(0,1);
2485
2486 P(F("connecting to %s") % info.client.unparsed);
2487
2488 shared_ptr<Netxx::StreamBase> server
2489 = build_stream_to_server(opts, lua,
2490 info, default_port,
2491 timeout);
2492
2493
2494 // 'false' here means not to revert changes when the SockOpt
2495 // goes out of scope.
2496 Netxx::SockOpt socket_options(server->get_socketfd(), false);
2497 socket_options.set_non_blocking();
2498
2499 session sess(opts, lua, project, keys,
2500 role, client_voice,
2501 info.client.include_pattern,
2502 info.client.exclude_pattern,
2503 info.client.unparsed(), server);
2504
2505 while (true)
2506 {
2507 bool armed = false;
2508 try
2509 {
2510 armed = sess.arm();
2511 }
2512 catch (bad_decode & bd)
2513 {
2514 E(false, F("protocol error while processing peer %s: '%s'")
2515 % sess.peer_id % bd.what);
2516 }
2517
2518 sess.maybe_step();
2519 sess.maybe_say_goodbye(guard);
2520
2521 probe.clear();
2522 probe.add(*(sess.str), sess.which_events());
2523 Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout);
2524 Netxx::Probe::ready_type event = res.second;
2525 Netxx::socket_type fd = res.first;
2526
2527 if (fd == -1 && !armed)
2528 {
2529 E(false, (F("timed out waiting for I/O with "
2530 "peer %s, disconnecting")
2531 % sess.peer_id));
2532 }
2533
2534 bool all_io_clean = (event != Netxx::Probe::ready_oobd);
2535
2536 if (event & Netxx::Probe::ready_read)
2537 all_io_clean = all_io_clean && sess.read_some();
2538
2539 if (event & Netxx::Probe::ready_write)
2540 all_io_clean = all_io_clean && sess.write_some();
2541
2542 if (armed)
2543 if (!sess.process(guard))
2544 {
2545 // Commit whatever work we managed to accomplish anyways.
2546 guard.commit();
2547
2548 // We failed during processing. This should only happen in
2549 // client voice when we have a decode exception, or received an
2550 // error from our server (which is translated to a decode
2551 // exception). We call these cases E() errors.
2552 E(false, F("processing failure while talking to "
2553 "peer %s, disconnecting")
2554 % sess.peer_id);
2555 return;
2556 }
2557
2558 if (!all_io_clean)
2559 {
2560 // Commit whatever work we managed to accomplish anyways.
2561 guard.commit();
2562
2563 // We had an I/O error. We must decide if this represents a
2564 // user-reported error or a clean disconnect. See protocol
2565 // state diagram in session::process_bye_cmd.
2566
2567 if (sess.protocol_state == session::confirmed_state)
2568 {
2569 P(F("successful exchange with %s")
2570 % sess.peer_id);
2571 return;
2572 }
2573 else if (sess.encountered_error)
2574 {
2575 P(F("peer %s disconnected after we informed them of error")
2576 % sess.peer_id);
2577 return;
2578 }
2579 else
2580 E(false, (F("I/O failure while talking to "
2581 "peer %s, disconnecting")
2582 % sess.peer_id));
2583 }
2584 }
2585}
2586
2587static void
2588drop_session_associated_with_fd(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2589 Netxx::socket_type fd)
2590{
2591 // This is a bit of a hack. Initially all "file descriptors" in
2592 // netsync were full duplex, so we could get away with indexing
2593 // sessions by their file descriptor.
2594 //
2595 // When using pipes in unix, it's no longer true: a session gets
2596 // entered in the session map under its read pipe fd *and* its write
2597 // pipe fd. When we're in such a situation the socket fd is "-1" and
2598 // we downcast to a PipeStream and use its read+write fds.
2599 //
2600 // When using pipes in windows, we use a full duplex pipe (named
2601 // pipe) so the socket-like abstraction holds.
2602
2603 I(fd != -1);
2604 map<Netxx::socket_type, shared_ptr<session> >::const_iterator i = sessions.find(fd);
2605 I(i != sessions.end());
2606 shared_ptr<session> sess = i->second;
2607 fd = sess->str->get_socketfd();
2608 if (fd != -1)
2609 {
2610 sessions.erase(fd);
2611 }
2612 else
2613 {
2614 shared_ptr<Netxx::PipeStream> pipe =
2615 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
2616 I(static_cast<bool>(pipe));
2617 I(pipe->get_writefd() != -1);
2618 I(pipe->get_readfd() != -1);
2619 sessions.erase(pipe->get_readfd());
2620 sessions.erase(pipe->get_writefd());
2621 }
2622}
2623
2624static void
2625arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe,
2626 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2627 set<Netxx::socket_type> & armed_sessions,
2628 transaction_guard & guard)
2629{
2630 set<Netxx::socket_type> arm_failed;
2631 for (map<Netxx::socket_type,
2632 shared_ptr<session> >::const_iterator i = sessions.begin();
2633 i != sessions.end(); ++i)
2634 {
2635 i->second->maybe_step();
2636 i->second->maybe_say_goodbye(guard);
2637 try
2638 {
2639 if (i->second->arm())
2640 {
2641 L(FL("fd %d is armed") % i->first);
2642 armed_sessions.insert(i->first);
2643 }
2644 probe.add(*i->second->str, i->second->which_events());
2645 }
2646 catch (bad_decode & bd)
2647 {
2648 W(F("protocol error while processing peer %s: '%s', marking as bad")
2649 % i->second->peer_id % bd.what);
2650 arm_failed.insert(i->first);
2651 }
2652 }
2653 for (set<Netxx::socket_type>::const_iterator i = arm_failed.begin();
2654 i != arm_failed.end(); ++i)
2655 {
2656 drop_session_associated_with_fd(sessions, *i);
2657 }
2658}
2659
2660static void
2661handle_new_connection(options & opts,
2662 lua_hooks & lua,
2663 project_t & project,
2664 key_store & keys,
2665 Netxx::Address & addr,
2666 Netxx::StreamServer & server,
2667 Netxx::Timeout & timeout,
2668 protocol_role role,
2669 map<Netxx::socket_type, shared_ptr<session> > & sessions)
2670{
2671 L(FL("accepting new connection on %s : %s")
2672 % (addr.get_name()?addr.get_name():"") % lexical_cast<string>(addr.get_port()));
2673 Netxx::Peer client = server.accept_connection();
2674
2675 if (!client)
2676 {
2677 L(FL("accept() returned a dead client"));
2678 }
2679 else
2680 {
2681 P(F("accepted new client connection from %s : %s")
2682 % client.get_address() % lexical_cast<string>(client.get_port()));
2683
2684 // 'false' here means not to revert changes when the SockOpt
2685 // goes out of scope.
2686 Netxx::SockOpt socket_options(client.get_socketfd(), false);
2687 socket_options.set_non_blocking();
2688
2689 shared_ptr<Netxx::Stream> str =
2690 shared_ptr<Netxx::Stream>
2691 (new Netxx::Stream(client.get_socketfd(), timeout));
2692
2693 shared_ptr<session> sess(new session(opts, lua, project, keys,
2694 role, server_voice,
2695 globish("*"), globish(""),
2696 lexical_cast<string>(client), str));
2697 sess->begin_service();
2698 sessions.insert(make_pair(client.get_socketfd(), sess));
2699 }
2700}
2701
2702static void
2703handle_read_available(Netxx::socket_type fd,
2704 shared_ptr<session> sess,
2705 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2706 set<Netxx::socket_type> & armed_sessions,
2707 bool & live_p)
2708{
2709 if (sess->read_some())
2710 {
2711 try
2712 {
2713 if (sess->arm())
2714 armed_sessions.insert(fd);
2715 }
2716 catch (bad_decode & bd)
2717 {
2718 W(F("protocol error while processing peer %s: '%s', disconnecting")
2719 % sess->peer_id % bd.what);
2720 drop_session_associated_with_fd(sessions, fd);
2721 live_p = false;
2722 }
2723 }
2724 else
2725 {
2726 switch (sess->protocol_state)
2727 {
2728 case session::working_state:
2729 P(F("peer %s read failed in working state (error)")
2730 % sess->peer_id);
2731 break;
2732
2733 case session::shutdown_state:
2734 P(F("peer %s read failed in shutdown state "
2735 "(possibly client misreported error)")
2736 % sess->peer_id);
2737 break;
2738
2739 case session::confirmed_state:
2740 P(F("peer %s read failed in confirmed state (success)")
2741 % sess->peer_id);
2742 break;
2743 }
2744 drop_session_associated_with_fd(sessions, fd);
2745 live_p = false;
2746 }
2747}
2748
2749
2750static void
2751handle_write_available(Netxx::socket_type fd,
2752 shared_ptr<session> sess,
2753 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2754 bool & live_p)
2755{
2756 if (!sess->write_some())
2757 {
2758 switch (sess->protocol_state)
2759 {
2760 case session::working_state:
2761 P(F("peer %s write failed in working state (error)")
2762 % sess->peer_id);
2763 break;
2764
2765 case session::shutdown_state:
2766 P(F("peer %s write failed in shutdown state "
2767 "(possibly client misreported error)")
2768 % sess->peer_id);
2769 break;
2770
2771 case session::confirmed_state:
2772 P(F("peer %s write failed in confirmed state (success)")
2773 % sess->peer_id);
2774 break;
2775 }
2776
2777 drop_session_associated_with_fd(sessions, fd);
2778 live_p = false;
2779 }
2780}
2781
2782static void
2783process_armed_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2784 set<Netxx::socket_type> & armed_sessions,
2785 transaction_guard & guard)
2786{
2787 for (set<Netxx::socket_type>::const_iterator i = armed_sessions.begin();
2788 i != armed_sessions.end(); ++i)
2789 {
2790 map<Netxx::socket_type, shared_ptr<session> >::iterator j;
2791 j = sessions.find(*i);
2792 if (j == sessions.end())
2793 continue;
2794 else
2795 {
2796 shared_ptr<session> sess = j->second;
2797 if (!sess->process(guard))
2798 {
2799 P(F("peer %s processing finished, disconnecting")
2800 % sess->peer_id);
2801 drop_session_associated_with_fd(sessions, *i);
2802 }
2803 }
2804 }
2805}
2806
2807static void
2808reap_dead_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2809 unsigned long timeout_seconds)
2810{
2811 // Kill any clients which haven't done any i/o inside the timeout period
2812 // or who have exchanged all items and flushed their output buffers.
2813 set<Netxx::socket_type> dead_clients;
2814 time_t now = ::time(NULL);
2815 for (map<Netxx::socket_type, shared_ptr<session> >::const_iterator
2816 i = sessions.begin(); i != sessions.end(); ++i)
2817 {
2818 if (static_cast<unsigned long>(i->second->last_io_time + timeout_seconds)
2819 < static_cast<unsigned long>(now))
2820 {
2821 P(F("fd %d (peer %s) has been idle too long, disconnecting")
2822 % i->first % i->second->peer_id);
2823 dead_clients.insert(i->first);
2824 }
2825 }
2826 for (set<Netxx::socket_type>::const_iterator i = dead_clients.begin();
2827 i != dead_clients.end(); ++i)
2828 {
2829 drop_session_associated_with_fd(sessions, *i);
2830 }
2831}
2832
2833static void
2834serve_connections(options & opts,
2835 lua_hooks & lua,
2836 project_t & project,
2837 key_store & keys,
2838 protocol_role role,
2839 std::list<utf8> const & addresses,
2840 Netxx::port_type default_port,
2841 unsigned long timeout_seconds,
2842 unsigned long session_limit)
2843{
2844 Netxx::PipeCompatibleProbe probe;
2845
2846 Netxx::Timeout
2847 forever,
2848 timeout(static_cast<long>(timeout_seconds)),
2849 instant(0,1);
2850
2851#ifdef USE_IPV6
2852 bool use_ipv6=true;
2853#else
2854 bool use_ipv6=false;
2855#endif
2856 // This will be true when we try to bind while using IPv6. See comments
2857 // further down.
2858 bool try_again=false;
2859
2860 do
2861 {
2862 try
2863 {
2864 try_again = false;
2865
2866 Netxx::Address addr(use_ipv6);
2867
2868 if (addresses.empty())
2869 addr.add_all_addresses(default_port);
2870 else
2871 {
2872 for (std::list<utf8>::const_iterator it = addresses.begin(); it != addresses.end(); ++it)
2873 {
2874 const utf8 & address = *it;
2875 if (!address().empty())
2876 {
2877 size_t l_colon = address().find(':');
2878 size_t r_colon = address().rfind(':');
2879
2880 if (l_colon == r_colon && l_colon == 0)
2881 {
2882 // can't be an IPv6 address as there is only one colon
2883 // must be a : followed by a port
2884 string port_str = address().substr(1);
2885 addr.add_all_addresses(std::atoi(port_str.c_str()));
2886 }
2887 else
2888 addr.add_address(address().c_str(), default_port);
2889 }
2890 }
2891 }
2892
2893 // If se use IPv6 and the initialisation of server fails, we want
2894 // to try again with IPv4. The reason is that someone may have
2895 // downloaded a IPv6-enabled monotone on a system that doesn't
2896 // have IPv6, and which might fail therefore.
2897 // On failure, Netxx::NetworkException is thrown, and we catch
2898 // it further down.
2899 try_again=use_ipv6;
2900
2901 Netxx::StreamServer server(addr, timeout);
2902
2903 // If we came this far, whatever we used (IPv6 or IPv4) was
2904 // accepted, so we don't need to try again any more.
2905 try_again=false;
2906
2907 const char *name = addr.get_name();
2908 P(F("beginning service on %s : %s")
2909 % (name != NULL ? name : _("<all interfaces>"))
2910 % lexical_cast<string>(addr.get_port()));
2911
2912 map<Netxx::socket_type, shared_ptr<session> > sessions;
2913 set<Netxx::socket_type> armed_sessions;
2914
2915 shared_ptr<transaction_guard> guard;
2916
2917 while (true)
2918 {
2919 probe.clear();
2920 armed_sessions.clear();
2921
2922 if (sessions.size() >= session_limit)
2923 W(F("session limit %d reached, some connections "
2924 "will be refused") % session_limit);
2925 else
2926 probe.add(server);
2927
2928 if (!guard)
2929 guard = shared_ptr<transaction_guard>
2930 (new transaction_guard(project.db));
2931
2932 I(guard);
2933
2934 while (!server_initiated_sync_requests.empty())
2935 {
2936 server_initiated_sync_request request
2937 = server_initiated_sync_requests.front();
2938 server_initiated_sync_requests.pop_front();
2939
2940 netsync_connection_info info;
2941 info.client.unparsed = utf8(request.address);
2942 info.client.include_pattern = globish(request.include);
2943 info.client.exclude_pattern = globish(request.exclude);
2944 info.client.use_argv = false;
2945 parse_uri(info.client.unparsed(), info.client.u);
2946
2947 try
2948 {
2949 P(F("connecting to %s") % info.client.unparsed);
2950 shared_ptr<Netxx::StreamBase> server
2951 = build_stream_to_server(opts, lua,
2952 info, default_port,
2953 timeout);
2954
2955 // 'false' here means not to revert changes when
2956 // the SockOpt goes out of scope.
2957 Netxx::SockOpt socket_options(server->get_socketfd(), false);
2958 socket_options.set_non_blocking();
2959
2960 protocol_role role = source_and_sink_role;
2961 if (request.what == "sync")
2962 role = source_and_sink_role;
2963 else if (request.what == "push")
2964 role = source_role;
2965 else if (request.what == "pull")
2966 role = sink_role;
2967
2968 shared_ptr<session> sess(new session(opts, lua,
2969 project, keys,
2970 role, client_voice,
2971 info.client.include_pattern,
2972 info.client.exclude_pattern,
2973 info.client.unparsed(),
2974 server, true));
2975
2976 sessions.insert(make_pair(server->get_socketfd(), sess));
2977 }
2978 catch (Netxx::NetworkException & e)
2979 {
2980 P(F("Network error: %s") % e.what());
2981 }
2982 }
2983
2984 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, *guard);
2985
2986 L(FL("i/o probe with %d armed") % armed_sessions.size());
2987 Netxx::socket_type fd;
2988 Netxx::Timeout how_long;
2989 if (sessions.empty())
2990 how_long = forever;
2991 else if (armed_sessions.empty())
2992 how_long = timeout;
2993 else
2994 how_long = instant;
2995 do
2996 {
2997 Netxx::Probe::result_type res = probe.ready(how_long);
2998 how_long = instant;
2999 Netxx::Probe::ready_type event = res.second;
3000 fd = res.first;
3001
3002 if (fd == -1)
3003 {
3004 if (armed_sessions.empty())
3005 L(FL("timed out waiting for I/O (listening on %s : %s)")
3006 % addr.get_name() % lexical_cast<string>(addr.get_port()));
3007 }
3008
3009 // we either got a new connection
3010 else if (fd == server)
3011 handle_new_connection(opts, lua, project, keys,
3012 addr, server, timeout, role,
3013 sessions);
3014
3015 // or an existing session woke up
3016 else
3017 {
3018 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
3019 i = sessions.find(fd);
3020 if (i == sessions.end())
3021 {
3022 L(FL("got woken up for action on unknown fd %d") % fd);
3023 }
3024 else
3025 {
3026 probe.remove(*(i->second->str));
3027 shared_ptr<session> sess = i->second;
3028 bool live_p = true;
3029
3030 try
3031 {
3032 if (event & Netxx::Probe::ready_read)
3033 handle_read_available(fd, sess, sessions,
3034 armed_sessions, live_p);
3035
3036 if (live_p && (event & Netxx::Probe::ready_write))
3037 handle_write_available(fd, sess, sessions, live_p);
3038 }
3039 catch (Netxx::Exception &)
3040 {
3041 P(F("Network error on peer %s, disconnecting")
3042 % sess->peer_id);
3043 drop_session_associated_with_fd(sessions, fd);
3044 }
3045 if (live_p && (event & Netxx::Probe::ready_oobd))
3046 {
3047 P(F("got OOB from peer %s, disconnecting")
3048 % sess->peer_id);
3049 drop_session_associated_with_fd(sessions, fd);
3050 }
3051 }
3052 }
3053 }
3054 while (fd != -1);
3055 process_armed_sessions(sessions, armed_sessions, *guard);
3056 reap_dead_sessions(sessions, timeout_seconds);
3057
3058 if (sessions.empty())
3059 {
3060 // Let the guard die completely if everything's gone quiet.
3061 guard->commit();
3062 guard.reset();
3063 }
3064 }
3065 }
3066 // This exception is thrown when bind() fails somewhere in Netxx.
3067 catch (Netxx::NetworkException &)
3068 {
3069 // If we tried with IPv6 and failed, we want to try again using IPv4.
3070 if (try_again)
3071 {
3072 use_ipv6 = false;
3073 }
3074 // In all other cases, just rethrow the exception.
3075 else
3076 throw;
3077 }
3078 // This exception is thrown when there is no support for the type of
3079 // connection we want to do in the kernel, for example when a socket()
3080 // call fails somewhere in Netxx.
3081 catch (Netxx::Exception &)
3082 {
3083 // If we tried with IPv6 and failed, we want to try again using IPv4.
3084 if (try_again)
3085 {
3086 use_ipv6 = false;
3087 }
3088 // In all other cases, just rethrow the exception.
3089 else
3090 throw;
3091 }
3092 }
3093 while(try_again);
3094 }
3095
3096static void
3097serve_single_connection(shared_ptr<session> sess,
3098 unsigned long timeout_seconds)
3099{
3100 Netxx::PipeCompatibleProbe probe;
3101
3102 Netxx::Timeout
3103 forever,
3104 timeout(static_cast<long>(timeout_seconds)),
3105 instant(0,1);
3106
3107 P(F("beginning service on %s") % sess->peer_id);
3108
3109 sess->begin_service();
3110
3111 transaction_guard guard(sess->project.db);
3112
3113 map<Netxx::socket_type, shared_ptr<session> > sessions;
3114 set<Netxx::socket_type> armed_sessions;
3115
3116 if (sess->str->get_socketfd() == -1)
3117 {
3118 // Unix pipes are non-duplex, have two filedescriptors
3119 shared_ptr<Netxx::PipeStream> pipe =
3120 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
3121 I(pipe);
3122 sessions[pipe->get_writefd()]=sess;
3123 sessions[pipe->get_readfd()]=sess;
3124 }
3125 else
3126 sessions[sess->str->get_socketfd()]=sess;
3127
3128 while (!sessions.empty())
3129 {
3130 probe.clear();
3131 armed_sessions.clear();
3132
3133 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, guard);
3134
3135 L(FL("i/o probe with %d armed") % armed_sessions.size());
3136 Netxx::Probe::result_type res = probe.ready((armed_sessions.empty() ? timeout
3137 : instant));
3138 Netxx::Probe::ready_type event = res.second;
3139 Netxx::socket_type fd = res.first;
3140
3141 if (fd == -1)
3142 {
3143 if (armed_sessions.empty())
3144 L(FL("timed out waiting for I/O (listening on %s)")
3145 % sess->peer_id);
3146 }
3147
3148 // an existing session woke up
3149 else
3150 {
3151 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
3152 i = sessions.find(fd);
3153 if (i == sessions.end())
3154 {
3155 L(FL("got woken up for action on unknown fd %d") % fd);
3156 }
3157 else
3158 {
3159 shared_ptr<session> sess = i->second;
3160 bool live_p = true;
3161
3162 if (event & Netxx::Probe::ready_read)
3163 handle_read_available(fd, sess, sessions, armed_sessions, live_p);
3164
3165 if (live_p && (event & Netxx::Probe::ready_write))
3166 handle_write_available(fd, sess, sessions, live_p);
3167
3168 if (live_p && (event & Netxx::Probe::ready_oobd))
3169 {
3170 P(F("got some OOB data on fd %d (peer %s), disconnecting")
3171 % fd % sess->peer_id);
3172 drop_session_associated_with_fd(sessions, fd);
3173 }
3174 }
3175 }
3176 process_armed_sessions(sessions, armed_sessions, guard);
3177 reap_dead_sessions(sessions, timeout_seconds);
3178 }
3179}
3180
3181
3182void
3183insert_with_parents(revision_id rev,
3184 refiner & ref,
3185 revision_enumerator & rev_enumerator,
3186 set<revision_id> & revs,
3187 ticker & revisions_ticker)
3188{
3189 deque<revision_id> work;
3190 work.push_back(rev);
3191 while (!work.empty())
3192 {
3193 revision_id rid = work.front();
3194 work.pop_front();
3195
3196 if (!null_id(rid) && revs.find(rid) == revs.end())
3197 {
3198 revs.insert(rid);
3199 ++revisions_ticker;
3200 ref.note_local_item(rid.inner());
3201 vector<revision_id> parents;
3202 rev_enumerator.get_revision_parents(rid, parents);
3203 for (vector<revision_id>::const_iterator i = parents.begin();
3204 i != parents.end(); ++i)
3205 {
3206 work.push_back(*i);
3207 }
3208 }
3209 }
3210}
3211
3212void
3213session::rebuild_merkle_trees(set<branch_name> const & branchnames)
3214{
3215 P(F("finding items to synchronize:"));
3216 for (set<branch_name>::const_iterator i = branchnames.begin();
3217 i != branchnames.end(); ++i)
3218 L(FL("including branch %s") % *i);
3219
3220 // xgettext: please use short message and try to avoid multibytes chars
3221 ticker revisions_ticker(N_("revisions"), "r", 64);
3222 // xgettext: please use short message and try to avoid multibytes chars
3223 ticker certs_ticker(N_("certificates"), "c", 256);
3224 // xgettext: please use short message and try to avoid multibytes chars
3225 ticker keys_ticker(N_("keys"), "k", 1);
3226
3227 set<revision_id> revision_ids;
3228 set<rsa_keypair_id> inserted_keys;
3229
3230 {
3231 for (set<branch_name>::const_iterator i = branchnames.begin();
3232 i != branchnames.end(); ++i)
3233 {
3234 // Get branch certs.
3235 vector< revision<cert> > certs;
3236 project.get_branch_certs(*i, certs);
3237 for (vector< revision<cert> >::const_iterator j = certs.begin();
3238 j != certs.end(); j++)
3239 {
3240 revision_id rid(j->inner().ident);
3241 insert_with_parents(rid, rev_refiner, rev_enumerator,
3242 revision_ids, revisions_ticker);
3243 // Branch certs go in here, others later on.
3244 id item;
3245 cert_hash_code(j->inner(), item);
3246 cert_refiner.note_local_item(item);
3247 rev_enumerator.note_cert(rid, item);
3248 if (inserted_keys.find(j->inner().key) == inserted_keys.end())
3249 inserted_keys.insert(j->inner().key);
3250 }
3251 }
3252 }
3253
3254 {
3255 map<branch_name, epoch_data> epochs;
3256 project.db.get_epochs(epochs);
3257
3258 epoch_data epoch_zero(string(constants::epochlen_bytes, '\x00'));
3259 for (set<branch_name>::const_iterator i = branchnames.begin();
3260 i != branchnames.end(); ++i)
3261 {
3262 branch_name const & branch(*i);
3263 map<branch_name, epoch_data>::const_iterator j;
3264 j = epochs.find(branch);
3265
3266 // Set to zero any epoch which is not yet set.
3267 if (j == epochs.end())
3268 {
3269 L(FL("setting epoch on %s to zero") % branch);
3270 epochs.insert(make_pair(branch, epoch_zero));
3271 project.db.set_epoch(branch, epoch_zero);
3272 }
3273
3274 // Then insert all epochs into merkle tree.
3275 j = epochs.find(branch);
3276 I(j != epochs.end());
3277 epoch_id eid;
3278 epoch_hash_code(j->first, j->second, eid);
3279 epoch_refiner.note_local_item(eid.inner());
3280 }
3281 }
3282
3283 {
3284 typedef vector< pair<revision_id,
3285 pair<revision_id, rsa_keypair_id> > > cert_idx;
3286
3287 cert_idx idx;
3288 project.db.get_revision_cert_nobranch_index(idx);
3289
3290 // Insert all non-branch certs reachable via these revisions
3291 // (branch certs were inserted earlier).
3292
3293 for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i)
3294 {
3295 revision_id const & hash = i->first;
3296 revision_id const & ident = i->second.first;
3297 rsa_keypair_id const & key = i->second.second;
3298
3299 rev_enumerator.note_cert(ident, hash.inner());
3300
3301 if (revision_ids.find(ident) == revision_ids.end())
3302 continue;
3303
3304 cert_refiner.note_local_item(hash.inner());
3305 ++certs_ticker;
3306 if (inserted_keys.find(key) == inserted_keys.end())
3307 inserted_keys.insert(key);
3308 }
3309 }
3310
3311 // Add any keys specified on the command line.
3312 for (vector<rsa_keypair_id>::const_iterator key
3313 = keys_to_push.begin();
3314 key != keys_to_push.end(); ++key)
3315 {
3316 if (inserted_keys.find(*key) == inserted_keys.end())
3317 {
3318 if (!project.db.public_key_exists(*key))
3319 {
3320 keypair kp;
3321 if (keys.maybe_get_key_pair(*key, kp))
3322 project.db.put_key(*key, kp.pub);
3323 else
3324 W(F("Cannot find key '%s'") % *key);
3325 }
3326 inserted_keys.insert(*key);
3327 }
3328 }
3329
3330 // Insert all the keys.
3331 for (set<rsa_keypair_id>::const_iterator key = inserted_keys.begin();
3332 key != inserted_keys.end(); key++)
3333 {
3334 if (project.db.public_key_exists(*key))
3335 {
3336 rsa_pub_key pub;
3337 project.db.get_key(*key, pub);
3338 id keyhash;
3339 key_hash_code(*key, pub, keyhash);
3340
3341 if (global_sanity.debug_p())
3342 L(FL("noting key '%s' = '%s' to send")
3343 % *key
3344 % encode_hexenc(keyhash()));
3345
3346 key_refiner.note_local_item(keyhash);
3347 ++keys_ticker;
3348 }
3349 }
3350
3351 rev_refiner.reindex_local_items();
3352 cert_refiner.reindex_local_items();
3353 key_refiner.reindex_local_items();
3354 epoch_refiner.reindex_local_items();
3355}
3356
3357void
3358run_netsync_protocol(options & opts, lua_hooks & lua,
3359 project_t & project, key_store & keys,
3360 protocol_voice voice,
3361 protocol_role role,
3362 netsync_connection_info const & info)
3363{
3364 if (info.client.include_pattern().find_first_of("'\"") != string::npos)
3365 {
3366 W(F("include branch pattern contains a quote character:\n"
3367 "%s") % info.client.include_pattern());
3368 }
3369
3370 if (info.client.exclude_pattern().find_first_of("'\"") != string::npos)
3371 {
3372 W(F("exclude branch pattern contains a quote character:\n"
3373 "%s") % info.client.exclude_pattern());
3374 }
3375
3376 // We do not want to be killed by SIGPIPE from a network disconnect.
3377 ignore_sigpipe();
3378
3379 try
3380 {
3381 if (voice == server_voice)
3382 {
3383 if (opts.bind_stdio)
3384 {
3385 shared_ptr<Netxx::PipeStream> str(new Netxx::PipeStream(0,1));
3386 shared_ptr<session> sess(new session(opts, lua, project, keys,
3387 role, server_voice,
3388 globish("*"), globish(""),
3389 "stdio", str));
3390 serve_single_connection(sess,constants::netsync_timeout_seconds);
3391 }
3392 else
3393 serve_connections(opts, lua, project, keys,
3394 role,
3395 info.server.addrs,
3396 static_cast<Netxx::port_type>(constants::netsync_default_port),
3397 static_cast<unsigned long>(constants::netsync_timeout_seconds),
3398 static_cast<unsigned long>(constants::netsync_connection_limit));
3399 }
3400 else
3401 {
3402 I(voice == client_voice);
3403 call_server(opts, lua, project, keys,
3404 role, info,
3405 static_cast<Netxx::port_type>(constants::netsync_default_port),
3406 static_cast<unsigned long>(constants::netsync_timeout_seconds));
3407 }
3408 }
3409 catch (Netxx::NetworkException & e)
3410 {
3411 throw informative_failure((F("network error: %s") % e.what()).str());
3412 }
3413 catch (Netxx::Exception & e)
3414 {
3415 throw oops((F("network error: %s") % e.what()).str());;
3416 }
3417}
3418
3419// Local Variables:
3420// mode: C++
3421// fill-column: 76
3422// c-file-style: "gnu"
3423// indent-tabs-mode: nil
3424// End:
3425// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status