monotone

monotone Mtn Source Tree

Root/netsync.cc

1// Copyright (C) 2004 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include <map>
11#include <string>
12#include <cstdlib>
13#include <memory>
14#include <list>
15#include <deque>
16#include <stack>
17
18#include <time.h>
19
20#include <boost/lexical_cast.hpp>
21#include <boost/scoped_ptr.hpp>
22#include <boost/shared_ptr.hpp>
23#include <boost/bind.hpp>
24#include <boost/regex.hpp>
25
26#include "app_state.hh"
27#include "cert.hh"
28#include "constants.hh"
29#include "enumerator.hh"
30#include "keys.hh"
31#include "merkle_tree.hh"
32#include "netcmd.hh"
33#include "netio.hh"
34#include "netsync.hh"
35#include "numeric_vocab.hh"
36#include "packet.hh"
37#include "refiner.hh"
38#include "revision.hh"
39#include "sanity.hh"
40#include "transforms.hh"
41#include "ui.hh"
42#include "xdelta.hh"
43#include "epoch.hh"
44#include "platform.hh"
45#include "hmac.hh"
46#include "globish.hh"
47#include "uri.hh"
48
49#include "botan/botan.h"
50
51#include "netxx/address.h"
52#include "netxx/peer.h"
53#include "netxx/probe.h"
54#include "netxx/socket.h"
55#include "netxx/sockopt.h"
56#include "netxx/stream.h"
57#include "netxx/streamserver.h"
58#include "netxx/timeout.h"
59#include "netxx_pipe.hh"
60// TODO: things to do that will break protocol compatibility
61// -- need some way to upgrade anonymous to keyed pull, without user having
62// to explicitly specify which they want
63// just having a way to respond "access denied, try again" might work
64// but perhaps better to have the anonymous command include a note "I
65// _could_ use key <...> if you prefer", and if that would lead to more
66// access, could reply "I do prefer". (Does this lead to too much
67// information exposure? Allows anonymous people to probe what branches
68// a key has access to.)
69// -- "warning" packet type?
70// -- Richard Levitte wants, when you (e.g.) request '*' but don't access to
71// all of it, you just get the parts you have access to (maybe with
72// warnings about skipped branches). to do this right, should have a way
73// for the server to send back to the client "right, you're not getting
74// the following branches: ...", so the client will not include them in
75// its merkle trie.
76// -- add some sort of vhost field to the client's first packet, saying who
77// they expect to talk to
78
79//
80// This is the "new" network synchronization (netsync) system in
81// monotone. It is based on synchronizing pairs of merkle trees over an
82// interactive connection.
83//
84// A netsync process between peers treats each peer as either a source, a
85// sink, or both. When a peer is only a source, it will not write any new
86// items to its database. when a peer is only a sink, it will not send any
87// items from its database. When a peer is both a source and sink, it may
88// send and write items freely.
89//
90// The post-state of a netsync is that each sink contains a superset of the
91// items in its corresponding source; when peers are behaving as both
92// source and sink, this means that the post-state of the sync is for the
93// peers to have identical item sets.
94//
95//
96// Data structure
97// --------------
98//
99// Each node in a merkle tree contains a fixed number of slots. this number
100// is derived from a global parameter of the protocol -- the tree fanout --
101// such that the number of slots is 2^fanout. For now we will assume that
102// fanout is 4 thus there are 16 slots in a node, because this makes
103// illustration easier. The other parameter of the protocol is the size of
104// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
105//
106// Each slot in a merkle tree node is in one of 3 states:
107//
108// - empty
109// - leaf
110// - subtree
111//
112// In addition, each leaf contains a hash code which identifies an element
113// of the set being synchronized. Each subtree slot contains a hash code of
114// the node immediately beneath it in the merkle tree. Empty slots contain
115// no hash codes.
116//
117// Since empty slots have no hash code, they are represented implicitly by
118// a bitmap at the head of each merkle tree node. As an additional
119// integrity check, each merkle tree node contains a label indicating its
120// prefix in the tree, and a hash of its own contents.
121//
122// In total, then, the byte-level representation of a <160,4> merkle tree
123// node is as follows:
124//
125// 20 bytes - hash of the remaining bytes in the node
126// 1 byte - type of this node (manifest, file, key, mcert, fcert)
127// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
128// 0-20 bytes - the prefix of this node, 4 bits * level,
129// rounded up to a byte
130// 1-N bytes - number of leaves under this node (uleb128)
131// 4 bytes - slot-state bitmap of the node
132// 0-320 bytes - between 0 and 16 live slots in the node
133//
134// So, in the worst case such a node is 367 bytes, with these parameters.
135//
136//
137// Protocol
138// --------
139//
140// The protocol is a binary command-packet system over TCP; each packet
141// consists of a single byte which identifies the protocol version, a byte
142// which identifies the command name inside that version, a size_t sent as
143// a uleb128 indicating the length of the packet, that many bytes of
144// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
145// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
146// 20-byte random key chosen by the client after authentication (discussed
147// below). Decoding involves simply buffering until a sufficient number of
148// bytes are received, then advancing the buffer pointer. Any time an
149// integrity check (the HMAC) fails, the protocol is assumed to have lost
150// synchronization, and the connection is dropped. The parties are free to
151// drop the TCP stream at any point, if too much data is received or too
152// much idle time passes; no commitments or transactions are made.
153//
154//
155// Authentication and setup
156// ------------------------
157//
158// The exchange begins in a non-authenticated state. The server sends a
159// "hello <id> <nonce>" command, which identifies the server's RSA key and
160// issues a nonce which must be used for a subsequent authentication.
161//
162// The client then responds with either:
163//
164// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
165// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
166// role it wishes to play in the synchronization, identifies the pattern it
167// wishes to sync with, signs the previous nonce with its own key, and informs
168// the server of the HMAC key it wishes to use for this session (encrypted
169// with the server's public key); or
170//
171// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
172// <hmac key>" command, which identifies the role it wishes to play in the
173// synchronization, the pattern it ishes to sync with, and the HMAC key it
174// wishes to use for this session (also encrypted with the server's public
175// key).
176//
177// The server then replies with a "confirm" command, which contains no
178// other data but will only have the correct HMAC integrity code if the
179// server received and properly decrypted the HMAC key offered by the
180// client. This transitions the peers into an authenticated state and
181// begins epoch refinement. If epoch refinement and epoch transmission
182// succeed, the peers switch to data refinement and data transmission.
183//
184//
185// Refinement
186// ----------
187//
188// Refinement is executed by "refiners"; there is a refiner for each
189// set of 'items' being exchanged: epochs, keys, certs, and revisions.
190// When refinement starts, each party knows only their own set of
191// items; when refinement completes, each party has learned of the
192// complete set of items it needs to send, and a count of items it's
193// expecting to receive.
194//
195// For more details on the refinement process, see refiner.cc.
196//
197//
198// Transmission
199// ------------
200//
201// Once the set of items to send has been determined (for keys, certs, and
202// revisions) each peer switches into a transmission mode. This mode
203// involves walking the revision graph in ancestry-order and sending all
204// the items the local peer has which the remote one does not. Since the
205// remote and local peers both know all the items which need to be
206// transferred (they learned during refinement) they know what to wait for
207// and what to send. The mechanisms of the transmission phase (notably,
208// enumerator.cc) simply ensure that things are sent in the proper order,
209// and without over-filling the output buffer too much.
210//
211//
212// Shutdown
213// --------
214//
215// After transmission completes, one special command, "bye", is used to
216// shut down a connection gracefully. The shutdown sequence based on "bye"
217// commands is documented below in session::process_bye_cmd.
218//
219//
220// Note on epochs
221// --------------
222//
223// One refinement and transmission phase preceeds all the others: epochs.
224// Epochs are exchanged and compared in order to be sure that further
225// refinement and transmission (on certs and revisions) makes sense; they
226// are a sort of "immune system" to prevent incompatible databases (say
227// between rebuilds due to bugs in monotone) from cross-contaminating. The
228// later refinements are only kicked off *after* all epochs are received
229// and compare correctly.
230//
231//
232// Note on dense coding
233// --------------------
234//
235// This protocol is "raw binary" (non-text) because coding density is
236// actually important here, and each packet consists of very
237// information-dense material that you wouldn't have a hope of typing in,
238// interpreting manually anyways.
239//
240
241using std::auto_ptr;
242using std::deque;
243using std::make_pair;
244using std::map;
245using std::min;
246using std::pair;
247using std::set;
248using std::string;
249using std::vector;
250
251using boost::shared_ptr;
252using boost::lexical_cast;
253
254static inline void
255require(bool check, string const & context)
256{
257 if (!check)
258 throw bad_decode(F("check of '%s' failed") % context);
259}
260
261struct netsync_error
262{
263 string msg;
264 netsync_error(string const & s): msg(s) {}
265};
266
267struct
268session:
269 public refiner_callbacks,
270 public enumerator_callbacks
271{
272 protocol_role role;
273 protocol_voice const voice;
274 utf8 const & our_include_pattern;
275 utf8 const & our_exclude_pattern;
276 globish_matcher our_matcher;
277 app_state & app;
278
279 string peer_id;
280 shared_ptr<Netxx::StreamBase> str;
281
282 string_queue inbuf;
283 // deque of pair<string data, size_t cur_pos>
284 deque< pair<string,size_t> > outbuf;
285 // the total data stored in outbuf - this is
286 // used as a valve to stop too much data
287 // backing up
288 size_t outbuf_size;
289
290 netcmd cmd;
291 bool armed;
292 bool arm();
293
294 id remote_peer_key_hash;
295 rsa_keypair_id remote_peer_key_name;
296 netsync_session_key session_key;
297 chained_hmac read_hmac;
298 chained_hmac write_hmac;
299 bool authenticated;
300
301 time_t last_io_time;
302 auto_ptr<ticker> byte_in_ticker;
303 auto_ptr<ticker> byte_out_ticker;
304 auto_ptr<ticker> cert_in_ticker;
305 auto_ptr<ticker> cert_out_ticker;
306 auto_ptr<ticker> revision_in_ticker;
307 auto_ptr<ticker> revision_out_ticker;
308
309 vector<revision_id> written_revisions;
310 vector<rsa_keypair_id> written_keys;
311 vector<cert> written_certs;
312
313 id saved_nonce;
314 packet_db_writer dbw;
315
316 enum
317 {
318 working_state,
319 shutdown_state,
320 confirmed_state
321 }
322 protocol_state;
323 bool encountered_error;
324 bool set_totals;
325
326 // Interface to refinement.
327 refiner epoch_refiner;
328 refiner key_refiner;
329 refiner cert_refiner;
330 refiner rev_refiner;
331
332 // Interface to ancestry grovelling.
333 revision_enumerator rev_enumerator;
334
335 // Enumerator_callbacks methods.
336 set<file_id> file_items_sent;
337 bool process_this_rev(revision_id const & rev);
338 bool queue_this_cert(hexenc<id> const & c);
339 bool queue_this_file(hexenc<id> const & f);
340 void note_file_data(file_id const & f);
341 void note_file_delta(file_id const & src, file_id const & dst);
342 void note_rev(revision_id const & rev);
343 void note_cert(hexenc<id> const & c);
344
345 session(protocol_role role,
346 protocol_voice voice,
347 utf8 const & our_include_pattern,
348 utf8 const & our_exclude_pattern,
349 app_state & app,
350 string const & peer,
351 shared_ptr<Netxx::StreamBase> sock);
352
353 virtual ~session();
354
355 void rev_written_callback(revision_id rid);
356 void key_written_callback(rsa_keypair_id kid);
357 void cert_written_callback(cert const & c);
358
359 id mk_nonce();
360 void mark_recent_io();
361
362 void set_session_key(string const & key);
363 void set_session_key(rsa_oaep_sha_data const & key_encrypted);
364
365 void setup_client_tickers();
366 bool done_all_refinements();
367 bool queued_all_items();
368 bool received_all_items();
369 bool finished_working();
370 void maybe_step();
371 void maybe_say_goodbye(transaction_guard & guard);
372
373 void note_item_arrived(netcmd_item_type ty, id const & i);
374 void maybe_note_epochs_finished();
375 void note_item_sent(netcmd_item_type ty, id const & i);
376
377 Netxx::Probe::ready_type which_events() const;
378 bool read_some();
379 bool write_some();
380
381 void error(string const & errmsg);
382
383 void write_netcmd_and_try_flush(netcmd const & cmd);
384
385 // Outgoing queue-writers.
386 void queue_bye_cmd(u8 phase);
387 void queue_error_cmd(string const & errmsg);
388 void queue_done_cmd(netcmd_item_type type, size_t n_items);
389 void queue_hello_cmd(rsa_keypair_id const & key_name,
390 base64<rsa_pub_key> const & pub_encoded,
391 id const & nonce);
392 void queue_anonymous_cmd(protocol_role role,
393 utf8 const & include_pattern,
394 utf8 const & exclude_pattern,
395 id const & nonce2,
396 base64<rsa_pub_key> server_key_encoded);
397 void queue_auth_cmd(protocol_role role,
398 utf8 const & include_pattern,
399 utf8 const & exclude_pattern,
400 id const & client,
401 id const & nonce1,
402 id const & nonce2,
403 string const & signature,
404 base64<rsa_pub_key> server_key_encoded);
405 void queue_confirm_cmd();
406 void queue_refine_cmd(refinement_type ty, merkle_node const & node);
407 void queue_data_cmd(netcmd_item_type type,
408 id const & item,
409 string const & dat);
410 void queue_delta_cmd(netcmd_item_type type,
411 id const & base,
412 id const & ident,
413 delta const & del);
414
415 // Incoming dispatch-called methods.
416 bool process_error_cmd(string const & errmsg);
417 bool process_hello_cmd(rsa_keypair_id const & server_keyname,
418 rsa_pub_key const & server_key,
419 id const & nonce);
420 bool process_bye_cmd(u8 phase, transaction_guard & guard);
421 bool process_anonymous_cmd(protocol_role role,
422 utf8 const & their_include_pattern,
423 utf8 const & their_exclude_pattern);
424 bool process_auth_cmd(protocol_role role,
425 utf8 const & their_include_pattern,
426 utf8 const & their_exclude_pattern,
427 id const & client,
428 id const & nonce1,
429 string const & signature);
430 bool process_refine_cmd(refinement_type ty, merkle_node const & node);
431 bool process_done_cmd(netcmd_item_type type, size_t n_items);
432 bool process_data_cmd(netcmd_item_type type,
433 id const & item,
434 string const & dat);
435 bool process_delta_cmd(netcmd_item_type type,
436 id const & base,
437 id const & ident,
438 delta const & del);
439 bool process_usher_cmd(utf8 const & msg);
440
441 // The incoming dispatcher.
442 bool dispatch_payload(netcmd const & cmd,
443 transaction_guard & guard);
444
445 // Various helpers.
446 void assume_corresponding_role(protocol_role their_role);
447 void respond_to_confirm_cmd();
448 bool data_exists(netcmd_item_type type,
449 id const & item);
450 void load_data(netcmd_item_type type,
451 id const & item,
452 string & out);
453
454 void rebuild_merkle_trees(app_state & app,
455 set<utf8> const & branches);
456
457 void send_all_data(netcmd_item_type ty, set<id> const & items);
458 void begin_service();
459 bool process(transaction_guard & guard);
460};
461
462
463session::session(protocol_role role,
464 protocol_voice voice,
465 utf8 const & our_include_pattern,
466 utf8 const & our_exclude_pattern,
467 app_state & app,
468 string const & peer,
469 shared_ptr<Netxx::StreamBase> sock) :
470 role(role),
471 voice(voice),
472 our_include_pattern(our_include_pattern),
473 our_exclude_pattern(our_exclude_pattern),
474 our_matcher(our_include_pattern, our_exclude_pattern),
475 app(app),
476 peer_id(peer),
477 str(sock),
478 inbuf(),
479 outbuf_size(0),
480 armed(false),
481 remote_peer_key_hash(""),
482 remote_peer_key_name(""),
483 session_key(constants::netsync_key_initializer),
484 read_hmac(constants::netsync_key_initializer, app.use_transport_auth),
485 write_hmac(constants::netsync_key_initializer, app.use_transport_auth),
486 authenticated(false),
487 last_io_time(::time(NULL)),
488 byte_in_ticker(NULL),
489 byte_out_ticker(NULL),
490 cert_in_ticker(NULL),
491 cert_out_ticker(NULL),
492 revision_in_ticker(NULL),
493 revision_out_ticker(NULL),
494 saved_nonce(""),
495 dbw(app),
496 protocol_state(working_state),
497 encountered_error(false),
498 set_totals(false),
499 epoch_refiner(epoch_item, voice, *this),
500 key_refiner(key_item, voice, *this),
501 cert_refiner(cert_item, voice, *this),
502 rev_refiner(revision_item, voice, *this),
503 rev_enumerator(*this, app)
504{
505 dbw.set_on_revision_written(boost::bind(&session::rev_written_callback,
506 this, _1));
507 dbw.set_on_cert_written(boost::bind(&session::cert_written_callback,
508 this, _1));
509 dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback,
510 this, _1));
511}
512
513session::~session()
514{
515 static const char letters[] = "0123456789abcdef";
516 string nonce;
517 for (int i = 0; i < 16; i++)
518 nonce.append(1, letters[Botan::Global_RNG::random(Botan::Nonce)
519 % (sizeof(letters) - 1)]);
520
521 vector<cert> unattached_certs;
522 map<revision_id, vector<cert> > revcerts;
523 for (vector<revision_id>::iterator i = written_revisions.begin();
524 i != written_revisions.end(); ++i)
525 revcerts.insert(make_pair(*i, vector<cert>()));
526 for (vector<cert>::iterator i = written_certs.begin();
527 i != written_certs.end(); ++i)
528 {
529 map<revision_id, vector<cert> >::iterator j;
530 j = revcerts.find(i->ident);
531 if (j == revcerts.end())
532 unattached_certs.push_back(*i);
533 else
534 j->second.push_back(*i);
535 }
536
537 // if (role == sink_role || role == source_and_sink_role)
538 if (!written_keys.empty()
539 || !written_revisions.empty()
540 || !written_certs.empty())
541 {
542 //Start
543 app.lua.hook_note_netsync_start(nonce);
544
545 //Keys
546 for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
547 i != written_keys.end(); ++i)
548 {
549 app.lua.hook_note_netsync_pubkey_received(*i, nonce);
550 }
551
552 //Revisions
553 for (vector<revision_id>::iterator i = written_revisions.begin();
554 i != written_revisions.end(); ++i)
555 {
556 vector<cert> & ctmp(revcerts[*i]);
557 set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
558 for (vector<cert>::const_iterator j = ctmp.begin();
559 j != ctmp.end(); ++j)
560 {
561 cert_value vtmp;
562 decode_base64(j->value, vtmp);
563 certs.insert(make_pair(j->key, make_pair(j->name, vtmp)));
564 }
565 revision_data rdat;
566 app.db.get_revision(*i, rdat);
567 app.lua.hook_note_netsync_revision_received(*i, rdat, certs, nonce);
568 }
569
570 //Certs (not attached to a new revision)
571 for (vector<cert>::iterator i = unattached_certs.begin();
572 i != unattached_certs.end(); ++i)
573 {
574 cert_value tmp;
575 decode_base64(i->value, tmp);
576 app.lua.hook_note_netsync_cert_received(i->ident, i->key,
577 i->name, tmp, nonce);
578 }
579
580 //Start
581 app.lua.hook_note_netsync_end(nonce);
582 }
583}
584
585bool
586session::process_this_rev(revision_id const & rev)
587{
588 id item;
589 decode_hexenc(rev.inner(), item);
590 return (rev_refiner.items_to_send.find(item)
591 != rev_refiner.items_to_send.end());
592}
593
594bool
595session::queue_this_cert(hexenc<id> const & c)
596{
597 id item;
598 decode_hexenc(c, item);
599 return (cert_refiner.items_to_send.find(item)
600 != cert_refiner.items_to_send.end());
601}
602
603bool
604session::queue_this_file(hexenc<id> const & f)
605{
606 return file_items_sent.find(f) == file_items_sent.end();
607}
608
609void
610session::note_file_data(file_id const & f)
611{
612 if (role == sink_role)
613 return;
614 file_data fd;
615 id item;
616 decode_hexenc(f.inner(), item);
617 app.db.get_file_version(f, fd);
618 queue_data_cmd(file_item, item, fd.inner()());
619 file_items_sent.insert(f);
620}
621
622void
623session::note_file_delta(file_id const & src, file_id const & dst)
624{
625 if (role == sink_role)
626 return;
627 file_delta fdel;
628 id fid1, fid2;
629 decode_hexenc(src.inner(), fid1);
630 decode_hexenc(dst.inner(), fid2);
631 app.db.get_arbitrary_file_delta(src, dst, fdel);
632 queue_delta_cmd(file_item, fid1, fid2, fdel.inner());
633 file_items_sent.insert(dst);
634}
635
636void
637session::note_rev(revision_id const & rev)
638{
639 if (role == sink_role)
640 return;
641 revision_t rs;
642 id item;
643 decode_hexenc(rev.inner(), item);
644 app.db.get_revision(rev, rs);
645 data tmp;
646 write_revision(rs, tmp);
647 queue_data_cmd(revision_item, item, tmp());
648}
649
650void
651session::note_cert(hexenc<id> const & c)
652{
653 if (role == sink_role)
654 return;
655 id item;
656 decode_hexenc(c, item);
657 revision<cert> cert;
658 string str;
659 app.db.get_revision_cert(c, cert);
660 write_cert(cert.inner(), str);
661 queue_data_cmd(cert_item, item, str);
662}
663
664
665void session::rev_written_callback(revision_id rid)
666{
667 written_revisions.push_back(rid);
668}
669
670void session::key_written_callback(rsa_keypair_id kid)
671{
672 written_keys.push_back(kid);
673}
674
675void session::cert_written_callback(cert const & c)
676{
677 written_certs.push_back(c);
678}
679
680id
681session::mk_nonce()
682{
683 I(this->saved_nonce().size() == 0);
684 char buf[constants::merkle_hash_length_in_bytes];
685 Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf),
686 constants::merkle_hash_length_in_bytes);
687 this->saved_nonce = string(buf, buf + constants::merkle_hash_length_in_bytes);
688 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
689 return this->saved_nonce;
690}
691
692void
693session::mark_recent_io()
694{
695 last_io_time = ::time(NULL);
696}
697
698void
699session::set_session_key(string const & key)
700{
701 session_key = netsync_session_key(key);
702 read_hmac.set_key(session_key);
703 write_hmac.set_key(session_key);
704}
705
706void
707session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
708{
709 if (app.use_transport_auth)
710 {
711 keypair our_kp;
712 load_key_pair(app, app.signing_key, our_kp);
713 string hmac_key;
714 decrypt_rsa(app.lua, app.signing_key, our_kp.priv,
715 hmac_key_encrypted, hmac_key);
716 set_session_key(hmac_key);
717 }
718}
719
720void
721session::setup_client_tickers()
722{
723 // xgettext: please use short message and try to avoid multibytes chars
724 byte_in_ticker.reset(new ticker(N_("bytes in"), ">", 1024, true));
725 // xgettext: please use short message and try to avoid multibytes chars
726 byte_out_ticker.reset(new ticker(N_("bytes out"), "<", 1024, true));
727 if (role == sink_role)
728 {
729 // xgettext: please use short message and try to avoid multibytes chars
730 cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3));
731 // xgettext: please use short message and try to avoid multibytes chars
732 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
733 }
734 else if (role == source_role)
735 {
736 // xgettext: please use short message and try to avoid multibytes chars
737 cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3));
738 // xgettext: please use short message and try to avoid multibytes chars
739 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
740 }
741 else
742 {
743 I(role == source_and_sink_role);
744 // xgettext: please use short message and try to avoid multibytes chars
745 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
746 // xgettext: please use short message and try to avoid multibytes chars
747 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
748 }
749}
750
751bool
752session::done_all_refinements()
753{
754 bool all = rev_refiner.done
755 && cert_refiner.done
756 && key_refiner.done
757 && epoch_refiner.done;
758
759 if (all && !set_totals)
760 {
761 if (cert_out_ticker.get())
762 cert_out_ticker->set_total(cert_refiner.items_to_send.size());
763
764 if (revision_out_ticker.get())
765 revision_out_ticker->set_total(rev_refiner.items_to_send.size());
766
767 if (cert_in_ticker.get())
768 cert_in_ticker->set_total(cert_refiner.items_to_receive);
769
770 if (revision_in_ticker.get())
771 revision_in_ticker->set_total(rev_refiner.items_to_receive);
772
773 set_totals = true;
774 }
775 return all;
776}
777
778
779
780bool
781session::received_all_items()
782{
783 if (role == source_role)
784 return true;
785 bool all = rev_refiner.items_to_receive == 0
786 && cert_refiner.items_to_receive == 0
787 && key_refiner.items_to_receive == 0
788 && epoch_refiner.items_to_receive == 0;
789 return all;
790}
791
792bool
793session::finished_working()
794{
795 bool all = done_all_refinements()
796 && received_all_items()
797 && queued_all_items()
798 && rev_enumerator.done();
799 return all;
800}
801
802bool
803session::queued_all_items()
804{
805 if (role == sink_role)
806 return true;
807 bool all = rev_refiner.items_to_send.empty()
808 && cert_refiner.items_to_send.empty()
809 && key_refiner.items_to_send.empty()
810 && epoch_refiner.items_to_send.empty();
811 return all;
812}
813
814
815void
816session::maybe_note_epochs_finished()
817{
818 // Maybe there are outstanding epoch requests.
819 // These only matter if we're in sink or source-and-sink mode.
820 if (!(epoch_refiner.items_to_receive == 0) && !(role == source_role))
821 return;
822
823 // And maybe we haven't even finished the refinement.
824 if (!epoch_refiner.done)
825 return;
826
827 // If we ran into an error -- say a mismatched epoch -- don't do any
828 // further refinements.
829 if (encountered_error)
830 return;
831
832 // But otherwise, we're ready to go. Start the next
833 // set of refinements.
834 if (voice == client_voice)
835 {
836 L(FL("epoch refinement finished; beginning other refinements"));
837 key_refiner.begin_refinement();
838 cert_refiner.begin_refinement();
839 rev_refiner.begin_refinement();
840 }
841 else
842 L(FL("epoch refinement finished"));
843}
844
845static void
846decrement_if_nonzero(netcmd_item_type ty,
847 size_t & n)
848{
849 if (n == 0)
850 {
851 string typestr;
852 netcmd_item_type_to_string(ty, typestr);
853 E(false, F("underflow on count of %s items to receive") % typestr);
854 }
855 --n;
856}
857
858void
859session::note_item_arrived(netcmd_item_type ty, id const & ident)
860{
861 switch (ty)
862 {
863 case cert_item:
864 decrement_if_nonzero(ty, cert_refiner.items_to_receive);
865 if (cert_in_ticker.get() != NULL)
866 ++(*cert_in_ticker);
867 break;
868 case revision_item:
869 decrement_if_nonzero(ty, rev_refiner.items_to_receive);
870 if (revision_in_ticker.get() != NULL)
871 ++(*revision_in_ticker);
872 break;
873 case key_item:
874 decrement_if_nonzero(ty, key_refiner.items_to_receive);
875 break;
876 case epoch_item:
877 decrement_if_nonzero(ty, epoch_refiner.items_to_receive);
878 break;
879 default:
880 // No ticker for other things.
881 break;
882 }
883}
884
885
886
887void
888session::note_item_sent(netcmd_item_type ty, id const & ident)
889{
890 switch (ty)
891 {
892 case cert_item:
893 cert_refiner.items_to_send.erase(ident);
894 if (cert_out_ticker.get() != NULL)
895 ++(*cert_out_ticker);
896 break;
897 case revision_item:
898 rev_refiner.items_to_send.erase(ident);
899 if (revision_out_ticker.get() != NULL)
900 ++(*revision_out_ticker);
901 break;
902 case key_item:
903 key_refiner.items_to_send.erase(ident);
904 break;
905 case epoch_item:
906 epoch_refiner.items_to_send.erase(ident);
907 break;
908 default:
909 // No ticker for other things.
910 break;
911 }
912}
913
914void
915session::write_netcmd_and_try_flush(netcmd const & cmd)
916{
917 if (!encountered_error)
918 {
919 string buf;
920 cmd.write(buf, write_hmac);
921 outbuf.push_back(make_pair(buf, 0));
922 outbuf_size += buf.size();
923 }
924 else
925 L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
926 // FIXME: this helps keep the protocol pipeline full but it seems to
927 // interfere with initial and final sequences. careful with it.
928 // write_some();
929 // read_some();
930}
931
932// This method triggers a special "error unwind" mode to netsync. In this
933// mode, all received data is ignored, and no new data is queued. We simply
934// stay connected long enough for the current write buffer to be flushed, to
935// ensure that our peer receives the error message.
936// Affects read_some, write_some, and process .
937void
938session::error(string const & errmsg)
939{
940 throw netsync_error(errmsg);
941}
942
943Netxx::Probe::ready_type
944session::which_events() const
945{
946 // Only ask to read if we're not armed.
947 if (outbuf.empty())
948 {
949 if (inbuf.size() < constants::netcmd_maxsz && !armed)
950 return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
951 else
952 return Netxx::Probe::ready_oobd;
953 }
954 else
955 {
956 if (inbuf.size() < constants::netcmd_maxsz && !armed)
957 return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
958 else
959 return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
960 }
961}
962
963bool
964session::read_some()
965{
966 I(inbuf.size() < constants::netcmd_maxsz);
967 char tmp[constants::bufsz];
968 Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
969 if (count > 0)
970 {
971 L(FL("read %d bytes from fd %d (peer %s)") % count % str->get_socketfd() % peer_id);
972 if (encountered_error)
973 {
974 L(FL("in error unwind mode, so throwing them into the bit bucket"));
975 return true;
976 }
977 inbuf.append(tmp,count);
978 mark_recent_io();
979 if (byte_in_ticker.get() != NULL)
980 (*byte_in_ticker) += count;
981 return true;
982 }
983 else
984 return false;
985}
986
987bool
988session::write_some()
989{
990 I(!outbuf.empty());
991 size_t writelen = outbuf.front().first.size() - outbuf.front().second;
992 Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
993 min(writelen,
994 constants::bufsz));
995 if (count > 0)
996 {
997 if ((size_t)count == writelen)
998 {
999 outbuf_size -= outbuf.front().first.size();
1000 outbuf.pop_front();
1001 }
1002 else
1003 {
1004 outbuf.front().second += count;
1005 }
1006 L(FL("wrote %d bytes to fd %d (peer %s)")
1007 % count % str->get_socketfd() % peer_id);
1008 mark_recent_io();
1009 if (byte_out_ticker.get() != NULL)
1010 (*byte_out_ticker) += count;
1011 if (encountered_error && outbuf.empty())
1012 {
1013 // we've flushed our error message, so it's time to get out.
1014 L(FL("finished flushing output queue in error unwind mode, disconnecting"));
1015 return false;
1016 }
1017 return true;
1018 }
1019 else
1020 return false;
1021}
1022
1023// senders
1024
1025void
1026session::queue_error_cmd(string const & errmsg)
1027{
1028 L(FL("queueing 'error' command"));
1029 netcmd cmd;
1030 cmd.write_error_cmd(errmsg);
1031 write_netcmd_and_try_flush(cmd);
1032}
1033
1034void
1035session::queue_bye_cmd(u8 phase)
1036{
1037 L(FL("queueing 'bye' command, phase %d")
1038 % static_cast<size_t>(phase));
1039 netcmd cmd;
1040 cmd.write_bye_cmd(phase);
1041 write_netcmd_and_try_flush(cmd);
1042}
1043
1044void
1045session::queue_done_cmd(netcmd_item_type type,
1046 size_t n_items)
1047{
1048 string typestr;
1049 netcmd_item_type_to_string(type, typestr);
1050 L(FL("queueing 'done' command for %s (%d items)")
1051 % typestr % n_items);
1052 netcmd cmd;
1053 cmd.write_done_cmd(type, n_items);
1054 write_netcmd_and_try_flush(cmd);
1055}
1056
1057void
1058session::queue_hello_cmd(rsa_keypair_id const & key_name,
1059 base64<rsa_pub_key> const & pub_encoded,
1060 id const & nonce)
1061{
1062 rsa_pub_key pub;
1063 if (app.use_transport_auth)
1064 decode_base64(pub_encoded, pub);
1065 cmd.write_hello_cmd(key_name, pub, nonce);
1066 write_netcmd_and_try_flush(cmd);
1067}
1068
1069void
1070session::queue_anonymous_cmd(protocol_role role,
1071 utf8 const & include_pattern,
1072 utf8 const & exclude_pattern,
1073 id const & nonce2,
1074 base64<rsa_pub_key> server_key_encoded)
1075{
1076 netcmd cmd;
1077 rsa_oaep_sha_data hmac_key_encrypted;
1078 if (app.use_transport_auth)
1079 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1080 nonce2(), hmac_key_encrypted);
1081 cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
1082 hmac_key_encrypted);
1083 write_netcmd_and_try_flush(cmd);
1084 set_session_key(nonce2());
1085}
1086
1087void
1088session::queue_auth_cmd(protocol_role role,
1089 utf8 const & include_pattern,
1090 utf8 const & exclude_pattern,
1091 id const & client,
1092 id const & nonce1,
1093 id const & nonce2,
1094 string const & signature,
1095 base64<rsa_pub_key> server_key_encoded)
1096{
1097 netcmd cmd;
1098 rsa_oaep_sha_data hmac_key_encrypted;
1099 I(app.use_transport_auth);
1100 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1101 nonce2(), hmac_key_encrypted);
1102 cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
1103 nonce1, hmac_key_encrypted, signature);
1104 write_netcmd_and_try_flush(cmd);
1105 set_session_key(nonce2());
1106}
1107
1108void
1109session::queue_confirm_cmd()
1110{
1111 netcmd cmd;
1112 cmd.write_confirm_cmd();
1113 write_netcmd_and_try_flush(cmd);
1114}
1115
1116void
1117session::queue_refine_cmd(refinement_type ty, merkle_node const & node)
1118{
1119 string typestr;
1120 hexenc<prefix> hpref;
1121 node.get_hex_prefix(hpref);
1122 netcmd_item_type_to_string(node.type, typestr);
1123 L(FL("queueing refinement %s of %s node '%s', level %d")
1124 % (ty == refinement_query ? "query" : "response")
1125 % typestr % hpref % static_cast<int>(node.level));
1126 netcmd cmd;
1127 cmd.write_refine_cmd(ty, node);
1128 write_netcmd_and_try_flush(cmd);
1129}
1130
1131void
1132session::queue_data_cmd(netcmd_item_type type,
1133 id const & item,
1134 string const & dat)
1135{
1136 string typestr;
1137 netcmd_item_type_to_string(type, typestr);
1138 hexenc<id> hid;
1139 encode_hexenc(item, hid);
1140
1141 if (role == sink_role)
1142 {
1143 L(FL("not queueing %s data for '%s' as we are in pure sink role")
1144 % typestr % hid);
1145 return;
1146 }
1147
1148 L(FL("queueing %d bytes of data for %s item '%s'")
1149 % dat.size() % typestr % hid);
1150
1151 netcmd cmd;
1152 // TODO: This pair of functions will make two copies of a large
1153 // file, the first in cmd.write_data_cmd, and the second in
1154 // write_netcmd_and_try_flush when the data is copied from the
1155 // cmd.payload variable to the string buffer for output. This
1156 // double copy should be collapsed out, it may be better to use
1157 // a string_queue for output as well as input, as that will reduce
1158 // the amount of mallocs that happen when the string queue is large
1159 // enough to just store the data.
1160 cmd.write_data_cmd(type, item, dat);
1161 write_netcmd_and_try_flush(cmd);
1162 note_item_sent(type, item);
1163}
1164
1165void
1166session::queue_delta_cmd(netcmd_item_type type,
1167 id const & base,
1168 id const & ident,
1169 delta const & del)
1170{
1171 I(type == file_item);
1172 I(! del().empty() || ident == base);
1173 string typestr;
1174 netcmd_item_type_to_string(type, typestr);
1175 hexenc<id> base_hid;
1176 encode_hexenc(base, base_hid);
1177 hexenc<id> ident_hid;
1178 encode_hexenc(ident, ident_hid);
1179
1180 if (role == sink_role)
1181 {
1182 L(FL("not queueing %s delta '%s' -> '%s' as we are in pure sink role")
1183 % typestr % base_hid % ident_hid);
1184 return;
1185 }
1186
1187 L(FL("queueing %s delta '%s' -> '%s'")
1188 % typestr % base_hid % ident_hid);
1189 netcmd cmd;
1190 cmd.write_delta_cmd(type, base, ident, del);
1191 write_netcmd_and_try_flush(cmd);
1192 note_item_sent(type, ident);
1193}
1194
1195
1196// processors
1197
1198bool
1199session::process_error_cmd(string const & errmsg)
1200{
1201 throw bad_decode(F("received network error: %s") % errmsg);
1202}
1203
1204void
1205get_branches(app_state & app, vector<string> & names)
1206{
1207 app.db.get_branches(names);
1208 sort(names.begin(), names.end());
1209}
1210
1211static const var_domain known_servers_domain = var_domain("known-servers");
1212
1213bool
1214session::process_hello_cmd(rsa_keypair_id const & their_keyname,
1215 rsa_pub_key const & their_key,
1216 id const & nonce)
1217{
1218 I(this->remote_peer_key_hash().size() == 0);
1219 I(this->saved_nonce().size() == 0);
1220
1221 base64<rsa_pub_key> their_key_encoded;
1222
1223 if (app.use_transport_auth)
1224 {
1225 hexenc<id> their_key_hash;
1226 encode_base64(their_key, their_key_encoded);
1227 key_hash_code(their_keyname, their_key_encoded, their_key_hash);
1228 L(FL("server key has name %s, hash %s") % their_keyname % their_key_hash);
1229 var_key their_key_key(known_servers_domain, var_name(peer_id));
1230 if (app.db.var_exists(their_key_key))
1231 {
1232 var_value expected_key_hash;
1233 app.db.get_var(their_key_key, expected_key_hash);
1234 if (expected_key_hash() != their_key_hash())
1235 {
1236 P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1237 "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
1238 "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1239 "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
1240 "it is also possible that the server key has just been changed\n"
1241 "remote host sent key %s\n"
1242 "I expected %s\n"
1243 "'%s unset %s %s' overrides this check")
1244 % their_key_hash % expected_key_hash
1245 % app.prog_name % their_key_key.first % their_key_key.second);
1246 E(false, F("server key changed"));
1247 }
1248 }
1249 else
1250 {
1251 P(F("first time connecting to server %s\n"
1252 "I'll assume it's really them, but you might want to double-check\n"
1253 "their key's fingerprint: %s\n") % peer_id % their_key_hash);
1254 app.db.set_var(their_key_key, var_value(their_key_hash()));
1255 }
1256 if (!app.db.public_key_exists(their_key_hash))
1257 {
1258 W(F("saving public key for %s to database") % their_keyname);
1259 app.db.put_key(their_keyname, their_key_encoded);
1260 }
1261 {
1262 hexenc<id> hnonce;
1263 encode_hexenc(nonce, hnonce);
1264 L(FL("received 'hello' netcmd from server '%s' with nonce '%s'")
1265 % their_key_hash % hnonce);
1266 }
1267
1268 I(app.db.public_key_exists(their_key_hash));
1269
1270 // save their identity
1271 id their_key_hash_decoded;
1272 decode_hexenc(their_key_hash, their_key_hash_decoded);
1273 this->remote_peer_key_hash = their_key_hash_decoded;
1274 }
1275
1276 // clients always include in the synchronization set, every branch that the
1277 // user requested
1278 vector<string> branchnames;
1279 set<utf8> ok_branches;
1280 get_branches(app, branchnames);
1281 for (vector<string>::const_iterator i = branchnames.begin();
1282 i != branchnames.end(); i++)
1283 {
1284 if (our_matcher(*i))
1285 ok_branches.insert(utf8(*i));
1286 }
1287 rebuild_merkle_trees(app, ok_branches);
1288
1289 setup_client_tickers();
1290
1291 if (app.use_transport_auth &&
1292 app.signing_key() != "")
1293 {
1294 // get our key pair
1295 keypair our_kp;
1296 load_key_pair(app, app.signing_key, our_kp);
1297
1298 // get the hash identifier for our pubkey
1299 hexenc<id> our_key_hash;
1300 id our_key_hash_raw;
1301 key_hash_code(app.signing_key, our_kp.pub, our_key_hash);
1302 decode_hexenc(our_key_hash, our_key_hash_raw);
1303
1304 // make a signature
1305 base64<rsa_sha1_signature> sig;
1306 rsa_sha1_signature sig_raw;
1307 make_signature(app, app.signing_key, our_kp.priv, nonce(), sig);
1308 decode_base64(sig, sig_raw);
1309
1310 // make a new nonce of our own and send off the 'auth'
1311 queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
1312 our_key_hash_raw, nonce, mk_nonce(), sig_raw(),
1313 their_key_encoded);
1314 }
1315 else
1316 {
1317 queue_anonymous_cmd(this->role, our_include_pattern,
1318 our_exclude_pattern, mk_nonce(), their_key_encoded);
1319 }
1320
1321 return true;
1322}
1323
1324bool
1325session::process_anonymous_cmd(protocol_role their_role,
1326 utf8 const & their_include_pattern,
1327 utf8 const & their_exclude_pattern)
1328{
1329 // Internally netsync thinks in terms of sources and sinks. Users like
1330 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1331 //
1332 // We therefore use the read/write terminology when dealing with the UI:
1333 // if the user asks to run a "read only" service, this means they are
1334 // willing to be a source but not a sink.
1335 //
1336 // nb: The "role" here is the role the *client* wants to play
1337 // so we need to check that the opposite role is allowed for us,
1338 // in our this->role field.
1339 //
1340
1341 // Client must be a sink and server must be a source (anonymous
1342 // read-only), unless transport auth is disabled.
1343 //
1344 // If running in no-transport-auth mode, we operate anonymously and
1345 // permit adoption of any role.
1346
1347 if (app.use_transport_auth)
1348 {
1349 if (their_role != sink_role)
1350 {
1351 this->saved_nonce = id("");
1352 error(F("rejected attempt at anonymous connection for write").str());
1353 }
1354
1355 if (this->role == sink_role)
1356 {
1357 this->saved_nonce = id("");
1358 error(F("rejected attempt at anonymous connection while running as sink").str());
1359 }
1360 }
1361
1362 vector<string> branchnames;
1363 set<utf8> ok_branches;
1364 get_branches(app, branchnames);
1365 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1366 for (vector<string>::const_iterator i = branchnames.begin();
1367 i != branchnames.end(); i++)
1368 {
1369 if (their_matcher(*i))
1370 if (!our_matcher(*i))
1371 {
1372 error((F("not serving branch '%s'") % *i).str());
1373 }
1374 else if (app.use_transport_auth &&
1375 !app.lua.hook_get_netsync_read_permitted(*i))
1376 {
1377 error((F("anonymous access to branch '%s' denied by server") % *i).str());
1378 }
1379 else
1380 ok_branches.insert(utf8(*i));
1381 }
1382
1383 if (app.use_transport_auth)
1384 {
1385 P(F("allowed anonymous read permission for '%s' excluding '%s'")
1386 % their_include_pattern % their_exclude_pattern);
1387 this->role = source_role;
1388 }
1389 else
1390 {
1391 P(F("allowed anonymous read/write permission for '%s' excluding '%s'")
1392 % their_include_pattern % their_exclude_pattern);
1393 assume_corresponding_role(their_role);
1394 }
1395
1396 rebuild_merkle_trees(app, ok_branches);
1397
1398 this->remote_peer_key_name = rsa_keypair_id("");
1399 this->authenticated = true;
1400 return true;
1401}
1402
1403void
1404session::assume_corresponding_role(protocol_role their_role)
1405{
1406 // Assume the (possibly degraded) opposite role.
1407 switch (their_role)
1408 {
1409 case source_role:
1410 I(this->role != source_role);
1411 this->role = sink_role;
1412 break;
1413
1414 case source_and_sink_role:
1415 I(this->role == source_and_sink_role);
1416 break;
1417
1418 case sink_role:
1419 I(this->role != sink_role);
1420 this->role = source_role;
1421 break;
1422 }
1423}
1424
1425bool
1426session::process_auth_cmd(protocol_role their_role,
1427 utf8 const & their_include_pattern,
1428 utf8 const & their_exclude_pattern,
1429 id const & client,
1430 id const & nonce1,
1431 string const & signature)
1432{
1433 I(this->remote_peer_key_hash().size() == 0);
1434 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1435
1436 hexenc<id> their_key_hash;
1437 encode_hexenc(client, their_key_hash);
1438 set<utf8> ok_branches;
1439 vector<string> branchnames;
1440 get_branches(app, branchnames);
1441 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1442
1443 // Check that they replied with the nonce we asked for.
1444 if (!(nonce1 == this->saved_nonce))
1445 {
1446 this->saved_nonce = id("");
1447 error(F("detected replay attack in auth netcmd").str());
1448 }
1449
1450 // Internally netsync thinks in terms of sources and sinks. users like
1451 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1452 //
1453 // We therefore use the read/write terminology when dealing with the UI:
1454 // if the user asks to run a "read only" service, this means they are
1455 // willing to be a source but not a sink.
1456 //
1457 // nb: The "their_role" here is the role the *client* wants to play
1458 // so we need to check that the opposite role is allowed for us,
1459 // in our this->role field.
1460
1461 if (!app.db.public_key_exists(their_key_hash))
1462 {
1463 // If it's not in the db, it still could be in the keystore if we
1464 // have the private key that goes with it.
1465 if (!app.keys.try_ensure_in_db(their_key_hash))
1466 {
1467 this->saved_nonce = id("");
1468 error((F("remote public key hash '%s' is unknown") % their_key_hash).str());
1469 }
1470 }
1471
1472 // Get their public key.
1473 rsa_keypair_id their_id;
1474 base64<rsa_pub_key> their_key;
1475 app.db.get_pubkey(their_key_hash, their_id, their_key);
1476
1477 // Client as sink, server as source (reading).
1478
1479 if (their_role == sink_role || their_role == source_and_sink_role)
1480 {
1481 if (this->role != source_role && this->role != source_and_sink_role)
1482 {
1483 this->saved_nonce = id("");
1484 error((F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink")
1485 % their_id % their_include_pattern % their_exclude_pattern).str());
1486 }
1487 }
1488
1489 for (vector<string>::const_iterator i = branchnames.begin();
1490 i != branchnames.end(); i++)
1491 {
1492 if (their_matcher(*i))
1493 {
1494 if (!our_matcher(*i))
1495 {
1496 error((F("not serving branch '%s'") % *i).str());
1497
1498 }
1499 else if (!app.lua.hook_get_netsync_read_permitted(*i, their_id))
1500 {
1501 error((F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'")
1502 % their_id % their_include_pattern % their_exclude_pattern % *i).str());
1503 }
1504 else
1505 ok_branches.insert(utf8(*i));
1506 }
1507 }
1508
1509 // If we're source_and_sink_role, continue even with no branches readable
1510 // eg. serve --db=empty.db
1511 P(F("allowed '%s' read permission for '%s' excluding '%s'")
1512 % their_id % their_include_pattern % their_exclude_pattern);
1513
1514 // Client as source, server as sink (writing).
1515
1516 if (their_role == source_role || their_role == source_and_sink_role)
1517 {
1518 if (this->role != sink_role && this->role != source_and_sink_role)
1519 {
1520 this->saved_nonce = id("");
1521 error((F("denied '%s' write permission for '%s' excluding '%s' while running as pure source")
1522 % their_id % their_include_pattern % their_exclude_pattern).str());
1523 }
1524
1525 if (!app.lua.hook_get_netsync_write_permitted(their_id))
1526 {
1527 this->saved_nonce = id("");
1528 error((F("denied '%s' write permission for '%s' excluding '%s'")
1529 % their_id % their_include_pattern % their_exclude_pattern).str());
1530 }
1531
1532 P(F("allowed '%s' write permission for '%s' excluding '%s'")
1533 % their_id % their_include_pattern % their_exclude_pattern);
1534 }
1535
1536 rebuild_merkle_trees(app, ok_branches);
1537
1538 // Save their identity.
1539 this->remote_peer_key_hash = client;
1540
1541 // Check the signature.
1542 base64<rsa_sha1_signature> sig;
1543 encode_base64(rsa_sha1_signature(signature), sig);
1544 if (check_signature(app, their_id, their_key, nonce1(), sig))
1545 {
1546 // Get our private key and sign back.
1547 L(FL("client signature OK, accepting authentication"));
1548 this->authenticated = true;
1549 this->remote_peer_key_name = their_id;
1550
1551 assume_corresponding_role(their_role);
1552 return true;
1553 }
1554 else
1555 {
1556 error((F("bad client signature")).str());
1557 }
1558 return false;
1559}
1560
1561bool
1562session::process_refine_cmd(refinement_type ty, merkle_node const & node)
1563{
1564 string typestr;
1565 netcmd_item_type_to_string(node.type, typestr);
1566 L(FL("processing refine cmd for %s node at level %d")
1567 % typestr % node.level);
1568
1569 switch (node.type)
1570 {
1571 case file_item:
1572 W(F("Unexpected 'refine' command on non-refined item type"));
1573 break;
1574
1575 case key_item:
1576 key_refiner.process_refinement_command(ty, node);
1577 break;
1578
1579 case revision_item:
1580 rev_refiner.process_refinement_command(ty, node);
1581 break;
1582
1583 case cert_item:
1584 cert_refiner.process_refinement_command(ty, node);
1585 break;
1586
1587 case epoch_item:
1588 epoch_refiner.process_refinement_command(ty, node);
1589 break;
1590 }
1591 return true;
1592}
1593
1594bool
1595session::process_bye_cmd(u8 phase,
1596 transaction_guard & guard)
1597{
1598
1599// Ideal shutdown
1600// ~~~~~~~~~~~~~~~
1601//
1602// I/O events state transitions
1603// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
1604// client: C_WORKING
1605// server: S_WORKING
1606// 0. [refinement, data, deltas, etc.]
1607// client: C_SHUTDOWN
1608// (client checkpoints here)
1609// 1. client -> "bye 0"
1610// 2. "bye 0" -> server
1611// server: S_SHUTDOWN
1612// (server checkpoints here)
1613// 3. "bye 1" <- server
1614// 4. client <- "bye 1"
1615// client: C_CONFIRMED
1616// 5. client -> "bye 2"
1617// 6. "bye 2" -> server
1618// server: S_CONFIRMED
1619// 7. [server drops connection]
1620//
1621//
1622// Affects of I/O errors or disconnections
1623// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1624// C_WORKING: report error and fault
1625// S_WORKING: report error and recover
1626// C_SHUTDOWN: report error and fault
1627// S_SHUTDOWN: report success and recover
1628// (and warn that client might falsely see error)
1629// C_CONFIRMED: report success
1630// S_CONFIRMED: report success
1631
1632 switch (phase)
1633 {
1634 case 0:
1635 if (voice == server_voice &&
1636 protocol_state == working_state)
1637 {
1638 protocol_state = shutdown_state;
1639 guard.do_checkpoint();
1640 queue_bye_cmd(1);
1641 }
1642 else
1643 error("unexpected bye phase 0 received");
1644 break;
1645
1646 case 1:
1647 if (voice == client_voice &&
1648 protocol_state == shutdown_state)
1649 {
1650 protocol_state = confirmed_state;
1651 queue_bye_cmd(2);
1652 }
1653 else
1654 error("unexpected bye phase 1 received");
1655 break;
1656
1657 case 2:
1658 if (voice == server_voice &&
1659 protocol_state == shutdown_state)
1660 {
1661 protocol_state = confirmed_state;
1662 return false;
1663 }
1664 else
1665 error("unexpected bye phase 2 received");
1666 break;
1667
1668 default:
1669 error((F("unknown bye phase %d received") % phase).str());
1670 }
1671
1672 return true;
1673}
1674
1675bool
1676session::process_done_cmd(netcmd_item_type type, size_t n_items)
1677{
1678 string typestr;
1679 netcmd_item_type_to_string(type, typestr);
1680 L(FL("received 'done' command for %s (%s items)") % typestr % n_items);
1681 switch (type)
1682 {
1683 case file_item:
1684 W(F("Unexpected 'done' command on non-refined item type"));
1685 break;
1686
1687 case key_item:
1688 key_refiner.process_done_command(n_items);
1689 if (key_refiner.done && role != sink_role)
1690 send_all_data(key_item, key_refiner.items_to_send);
1691 break;
1692
1693 case revision_item:
1694 rev_refiner.process_done_command(n_items);
1695 break;
1696
1697 case cert_item:
1698 cert_refiner.process_done_command(n_items);
1699 break;
1700
1701 case epoch_item:
1702 epoch_refiner.process_done_command(n_items);
1703 if (epoch_refiner.done)
1704 {
1705 send_all_data(epoch_item, epoch_refiner.items_to_send);
1706 maybe_note_epochs_finished();
1707 }
1708 break;
1709 }
1710 return true;
1711}
1712
1713void
1714session::respond_to_confirm_cmd()
1715{
1716 epoch_refiner.begin_refinement();
1717}
1718
1719bool
1720session::data_exists(netcmd_item_type type,
1721 id const & item)
1722{
1723 hexenc<id> hitem;
1724 encode_hexenc(item, hitem);
1725 switch (type)
1726 {
1727 case key_item:
1728 return key_refiner.local_item_exists(item)
1729 || app.db.public_key_exists(hitem);
1730 case file_item:
1731 return app.db.file_version_exists(file_id(hitem));
1732 case revision_item:
1733 return rev_refiner.local_item_exists(item)
1734 || app.db.revision_exists(revision_id(hitem));
1735 case cert_item:
1736 return cert_refiner.local_item_exists(item)
1737 || app.db.revision_cert_exists(hitem);
1738 case epoch_item:
1739 return epoch_refiner.local_item_exists(item)
1740 || app.db.epoch_exists(epoch_id(hitem));
1741 }
1742 return false;
1743}
1744
1745void
1746session::load_data(netcmd_item_type type,
1747 id const & item,
1748 string & out)
1749{
1750 string typestr;
1751 netcmd_item_type_to_string(type, typestr);
1752 hexenc<id> hitem;
1753 encode_hexenc(item, hitem);
1754
1755 if (!data_exists(type, item))
1756 throw bad_decode(F("%s with hash '%s' does not exist in our database")
1757 % typestr % hitem);
1758
1759 switch (type)
1760 {
1761 case epoch_item:
1762 {
1763 cert_value branch;
1764 epoch_data epoch;
1765 app.db.get_epoch(epoch_id(hitem), branch, epoch);
1766 write_epoch(branch, epoch, out);
1767 }
1768 break;
1769 case key_item:
1770 {
1771 rsa_keypair_id keyid;
1772 base64<rsa_pub_key> pub_encoded;
1773 app.db.get_pubkey(hitem, keyid, pub_encoded);
1774 L(FL("public key '%s' is also called '%s'") % hitem % keyid);
1775 write_pubkey(keyid, pub_encoded, out);
1776 }
1777 break;
1778
1779 case revision_item:
1780 {
1781 revision_data mdat;
1782 data dat;
1783 app.db.get_revision(revision_id(hitem), mdat);
1784 out = mdat.inner()();
1785 }
1786 break;
1787
1788 case file_item:
1789 {
1790 file_data fdat;
1791 data dat;
1792 app.db.get_file_version(file_id(hitem), fdat);
1793 out = fdat.inner()();
1794 }
1795 break;
1796
1797 case cert_item:
1798 {
1799 revision<cert> c;
1800 app.db.get_revision_cert(hitem, c);
1801 string tmp;
1802 write_cert(c.inner(), out);
1803 }
1804 break;
1805 }
1806}
1807
1808bool
1809session::process_data_cmd(netcmd_item_type type,
1810 id const & item,
1811 string const & dat)
1812{
1813 hexenc<id> hitem;
1814 encode_hexenc(item, hitem);
1815
1816 string typestr;
1817 netcmd_item_type_to_string(type, typestr);
1818
1819 note_item_arrived(type, item);
1820 if (data_exists(type, item))
1821 {
1822 L(FL("%s '%s' already exists in our database") % typestr % hitem);
1823 return true;
1824 }
1825
1826 switch (type)
1827 {
1828 case epoch_item:
1829 {
1830 cert_value branch;
1831 epoch_data epoch;
1832 read_epoch(dat, branch, epoch);
1833 L(FL("received epoch %s for branch %s") % epoch % branch);
1834 map<cert_value, epoch_data> epochs;
1835 app.db.get_epochs(epochs);
1836 map<cert_value, epoch_data>::const_iterator i;
1837 i = epochs.find(branch);
1838 if (i == epochs.end())
1839 {
1840 L(FL("branch %s has no epoch; setting epoch to %s") % branch % epoch);
1841 app.db.set_epoch(branch, epoch);
1842 }
1843 else
1844 {
1845 L(FL("branch %s already has an epoch; checking") % branch);
1846 // If we get here, then we know that the epoch must be
1847 // different, because if it were the same then the
1848 // if (epoch_exists()) branch up above would have been taken.
1849 // if somehow this is wrong, then we have broken epoch
1850 // hashing or something, which is very dangerous, so play it
1851 // safe...
1852 I(!(i->second == epoch));
1853
1854 // It is safe to call 'error' here, because if we get here,
1855 // then the current netcmd packet cannot possibly have
1856 // written anything to the database.
1857 error((F("Mismatched epoch on branch %s."
1858 " Server has '%s', client has '%s'.")
1859 % branch
1860 % (voice == server_voice ? i->second : epoch)
1861 % (voice == server_voice ? epoch : i->second)).str());
1862 }
1863 }
1864 maybe_note_epochs_finished();
1865 break;
1866
1867 case key_item:
1868 {
1869 rsa_keypair_id keyid;
1870 base64<rsa_pub_key> pub;
1871 read_pubkey(dat, keyid, pub);
1872 hexenc<id> tmp;
1873 key_hash_code(keyid, pub, tmp);
1874 if (! (tmp == hitem))
1875 throw bad_decode(F("hash check failed for public key '%s' (%s);"
1876 " wanted '%s' got '%s'")
1877 % hitem % keyid % hitem % tmp);
1878 this->dbw.consume_public_key(keyid, pub);
1879 }
1880 break;
1881
1882 case cert_item:
1883 {
1884 cert c;
1885 read_cert(dat, c);
1886 hexenc<id> tmp;
1887 cert_hash_code(c, tmp);
1888 if (! (tmp == hitem))
1889 throw bad_decode(F("hash check failed for revision cert '%s'") % hitem);
1890 this->dbw.consume_revision_cert(revision<cert>(c));
1891 }
1892 break;
1893
1894 case revision_item:
1895 {
1896 L(FL("received revision '%s'") % hitem);
1897 this->dbw.consume_revision_data(revision_id(hitem), revision_data(dat));
1898 }
1899 break;
1900
1901 case file_item:
1902 {
1903 L(FL("received file '%s'") % hitem);
1904 this->dbw.consume_file_data(file_id(hitem), file_data(dat));
1905 }
1906 break;
1907 }
1908 return true;
1909}
1910
1911bool
1912session::process_delta_cmd(netcmd_item_type type,
1913 id const & base,
1914 id const & ident,
1915 delta const & del)
1916{
1917 string typestr;
1918 netcmd_item_type_to_string(type, typestr);
1919 hexenc<id> hbase, hident;
1920 encode_hexenc(base, hbase);
1921 encode_hexenc(ident, hident);
1922
1923 pair<id,id> id_pair = make_pair(base, ident);
1924
1925 note_item_arrived(type, ident);
1926
1927 switch (type)
1928 {
1929 case file_item:
1930 {
1931 file_id src_file(hbase), dst_file(hident);
1932 this->dbw.consume_file_delta(src_file,
1933 dst_file,
1934 file_delta(del));
1935 }
1936 break;
1937
1938 default:
1939 L(FL("ignoring delta received for item type %s") % typestr);
1940 break;
1941 }
1942 return true;
1943}
1944
1945bool
1946session::process_usher_cmd(utf8 const & msg)
1947{
1948 if (msg().size())
1949 {
1950 if (msg()[0] == '!')
1951 P(F("Received warning from usher: %s") % msg().substr(1));
1952 else
1953 L(FL("Received greeting from usher: %s") % msg().substr(1));
1954 }
1955 netcmd cmdout;
1956 cmdout.write_usher_reply_cmd(peer_id, our_include_pattern);
1957 write_netcmd_and_try_flush(cmdout);
1958 L(FL("Sent reply."));
1959 return true;
1960}
1961
1962
1963void
1964session::send_all_data(netcmd_item_type ty, set<id> const & items)
1965{
1966 string typestr;
1967 netcmd_item_type_to_string(ty, typestr);
1968
1969 // Use temporary; passed arg will be invalidated during iteration.
1970 set<id> tmp = items;
1971
1972 for (set<id>::const_iterator i = tmp.begin();
1973 i != tmp.end(); ++i)
1974 {
1975 hexenc<id> hitem;
1976 encode_hexenc(*i, hitem);
1977
1978 if (data_exists(ty, *i))
1979 {
1980 string out;
1981 load_data(ty, *i, out);
1982 queue_data_cmd(ty, *i, out);
1983 }
1984 }
1985}
1986
1987bool
1988session::dispatch_payload(netcmd const & cmd,
1989 transaction_guard & guard)
1990{
1991
1992 switch (cmd.get_cmd_code())
1993 {
1994
1995 case error_cmd:
1996 {
1997 string errmsg;
1998 cmd.read_error_cmd(errmsg);
1999 return process_error_cmd(errmsg);
2000 }
2001 break;
2002
2003 case hello_cmd:
2004 require(! authenticated, "hello netcmd received when not authenticated");
2005 require(voice == client_voice, "hello netcmd received in client voice");
2006 {
2007 rsa_keypair_id server_keyname;
2008 rsa_pub_key server_key;
2009 id nonce;
2010 cmd.read_hello_cmd(server_keyname, server_key, nonce);
2011 return process_hello_cmd(server_keyname, server_key, nonce);
2012 }
2013 break;
2014
2015 case bye_cmd:
2016 require(authenticated, "bye netcmd received when not authenticated");
2017 {
2018 u8 phase;
2019 cmd.read_bye_cmd(phase);
2020 return process_bye_cmd(phase, guard);
2021 }
2022 break;
2023
2024 case anonymous_cmd:
2025 require(! authenticated, "anonymous netcmd received when not authenticated");
2026 require(voice == server_voice, "anonymous netcmd received in server voice");
2027 require(role == source_role ||
2028 role == source_and_sink_role,
2029 "anonymous netcmd received in source or source/sink role");
2030 {
2031 protocol_role role;
2032 utf8 their_include_pattern, their_exclude_pattern;
2033 rsa_oaep_sha_data hmac_key_encrypted;
2034 cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted);
2035 L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
2036 "in %s mode\n")
2037 % their_include_pattern % their_exclude_pattern
2038 % (role == source_and_sink_role ? _("source and sink") :
2039 (role == source_role ? _("source") : _("sink"))));
2040
2041 set_session_key(hmac_key_encrypted);
2042 if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern))
2043 return false;
2044 queue_confirm_cmd();
2045 return true;
2046 }
2047 break;
2048
2049 case auth_cmd:
2050 require(! authenticated, "auth netcmd received when not authenticated");
2051 require(voice == server_voice, "auth netcmd received in server voice");
2052 {
2053 protocol_role role;
2054 string signature;
2055 utf8 their_include_pattern, their_exclude_pattern;
2056 id client, nonce1, nonce2;
2057 rsa_oaep_sha_data hmac_key_encrypted;
2058 cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2059 client, nonce1, hmac_key_encrypted, signature);
2060
2061 hexenc<id> their_key_hash;
2062 encode_hexenc(client, their_key_hash);
2063 hexenc<id> hnonce1;
2064 encode_hexenc(nonce1, hnonce1);
2065
2066 L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
2067 "exclude '%s' in %s mode with nonce1 '%s'\n")
2068 % their_key_hash % their_include_pattern % their_exclude_pattern
2069 % (role == source_and_sink_role ? _("source and sink") :
2070 (role == source_role ? _("source") : _("sink")))
2071 % hnonce1);
2072
2073 set_session_key(hmac_key_encrypted);
2074
2075 if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2076 client, nonce1, signature))
2077 return false;
2078 queue_confirm_cmd();
2079 return true;
2080 }
2081 break;
2082
2083 case confirm_cmd:
2084 require(! authenticated, "confirm netcmd received when not authenticated");
2085 require(voice == client_voice, "confirm netcmd received in client voice");
2086 {
2087 string signature;
2088 cmd.read_confirm_cmd();
2089 this->authenticated = true;
2090 respond_to_confirm_cmd();
2091 return true;
2092 }
2093 break;
2094
2095 case refine_cmd:
2096 require(authenticated, "refine netcmd received when authenticated");
2097 {
2098 merkle_node node;
2099 refinement_type ty;
2100 cmd.read_refine_cmd(ty, node);
2101 return process_refine_cmd(ty, node);
2102 }
2103 break;
2104
2105 case done_cmd:
2106 require(authenticated, "done netcmd received when not authenticated");
2107 {
2108 size_t n_items;
2109 netcmd_item_type type;
2110 cmd.read_done_cmd(type, n_items);
2111 return process_done_cmd(type, n_items);
2112 }
2113 break;
2114
2115 case data_cmd:
2116 require(authenticated, "data netcmd received when not authenticated");
2117 require(role == sink_role ||
2118 role == source_and_sink_role,
2119 "data netcmd received in source or source/sink role");
2120 {
2121 netcmd_item_type type;
2122 id item;
2123 string dat;
2124 cmd.read_data_cmd(type, item, dat);
2125 return process_data_cmd(type, item, dat);
2126 }
2127 break;
2128
2129 case delta_cmd:
2130 require(authenticated, "delta netcmd received when not authenticated");
2131 require(role == sink_role ||
2132 role == source_and_sink_role,
2133 "delta netcmd received in source or source/sink role");
2134 {
2135 netcmd_item_type type;
2136 id base, ident;
2137 delta del;
2138 cmd.read_delta_cmd(type, base, ident, del);
2139 return process_delta_cmd(type, base, ident, del);
2140 }
2141 break;
2142
2143 case usher_cmd:
2144 {
2145 utf8 greeting;
2146 cmd.read_usher_cmd(greeting);
2147 return process_usher_cmd(greeting);
2148 }
2149 break;
2150
2151 case usher_reply_cmd:
2152 return false; // Should not happen.
2153 break;
2154 }
2155 return false;
2156}
2157
2158// This kicks off the whole cascade starting from "hello".
2159void
2160session::begin_service()
2161{
2162 keypair kp;
2163 if (app.use_transport_auth)
2164 app.keys.get_key_pair(app.signing_key, kp);
2165 queue_hello_cmd(app.signing_key, kp.pub, mk_nonce());
2166}
2167
2168void
2169session::maybe_step()
2170{
2171 while (done_all_refinements()
2172 && !rev_enumerator.done()
2173 && outbuf_size < constants::bufsz * 10)
2174 {
2175 rev_enumerator.step();
2176 }
2177}
2178
2179void
2180session::maybe_say_goodbye(transaction_guard & guard)
2181{
2182 if (voice == client_voice
2183 && protocol_state == working_state
2184 && finished_working())
2185 {
2186 protocol_state = shutdown_state;
2187 guard.do_checkpoint();
2188 queue_bye_cmd(0);
2189 }
2190}
2191
2192bool
2193session::arm()
2194{
2195 if (!armed)
2196 {
2197 // Don't pack the buffer unnecessarily.
2198 if (outbuf_size > constants::bufsz * 10)
2199 return false;
2200
2201 if (cmd.read(inbuf, read_hmac))
2202 {
2203 armed = true;
2204 }
2205 }
2206 return armed;
2207}
2208
2209bool session::process(transaction_guard & guard)
2210{
2211 if (encountered_error)
2212 return true;
2213 try
2214 {
2215 if (!arm())
2216 return true;
2217
2218 armed = false;
2219 L(FL("processing %d byte input buffer from peer %s")
2220 % inbuf.size() % peer_id);
2221
2222 size_t sz = cmd.encoded_size();
2223 bool ret = dispatch_payload(cmd, guard);
2224
2225 if (inbuf.size() >= constants::netcmd_maxsz)
2226 W(F("input buffer for peer %s is overfull "
2227 "after netcmd dispatch\n") % peer_id);
2228
2229 guard.maybe_checkpoint(sz);
2230
2231 if (!ret)
2232 L(FL("finishing processing with '%d' packet")
2233 % cmd.get_cmd_code());
2234 return ret;
2235 }
2236 catch (bad_decode & bd)
2237 {
2238 W(F("protocol error while processing peer %s: '%s'")
2239 % peer_id % bd.what);
2240 return false;
2241 }
2242 catch (netsync_error & err)
2243 {
2244 W(F("error: %s") % err.msg);
2245 queue_error_cmd(err.msg);
2246 encountered_error = true;
2247 return true; // Don't terminate until we've send the error_cmd.
2248 }
2249}
2250
2251
2252static shared_ptr<Netxx::StreamBase>
2253build_stream_to_server(app_state & app,
2254 utf8 const & include_pattern,
2255 utf8 const & exclude_pattern,
2256 utf8 const & address,
2257 Netxx::port_type default_port,
2258 Netxx::Timeout timeout)
2259{
2260 shared_ptr<Netxx::StreamBase> server;
2261 uri u;
2262 vector<string> argv;
2263 if (parse_uri(address(), u)
2264 && app.lua.hook_get_netsync_connect_command(u,
2265 include_pattern(),
2266 exclude_pattern(),
2267 global_sanity.debug,
2268 argv))
2269 {
2270 I(argv.size() > 0);
2271 string cmd = argv[0];
2272 argv.erase(argv.begin());
2273 app.use_transport_auth = app.lua.hook_use_transport_auth(u);
2274 return shared_ptr<Netxx::StreamBase>
2275 (new Netxx::PipeStream(cmd, argv));
2276
2277 }
2278 else
2279 {
2280#ifdef USE_IPV6
2281 bool use_ipv6=true;
2282#else
2283 bool use_ipv6=false;
2284#endif
2285 Netxx::Address addr(address().c_str(),
2286 default_port, use_ipv6);
2287 return shared_ptr<Netxx::StreamBase>
2288 (new Netxx::Stream(addr, timeout));
2289 }
2290}
2291
2292static void
2293call_server(protocol_role role,
2294 utf8 const & include_pattern,
2295 utf8 const & exclude_pattern,
2296 app_state & app,
2297 utf8 const & address,
2298 Netxx::port_type default_port,
2299 unsigned long timeout_seconds)
2300{
2301 Netxx::PipeCompatibleProbe probe;
2302 transaction_guard guard(app.db);
2303
2304 Netxx::Timeout timeout(static_cast<long>(timeout_seconds)), instant(0,1);
2305
2306 // FIXME: split into labels and convert to ace here.
2307
2308 P(F("connecting to %s") % address());
2309
2310 shared_ptr<Netxx::StreamBase> server
2311 = build_stream_to_server(app,
2312 include_pattern,
2313 exclude_pattern,
2314 address, default_port,
2315 timeout);
2316
2317
2318 // 'false' here means not to revert changes when the SockOpt
2319 // goes out of scope.
2320 Netxx::SockOpt socket_options(server->get_socketfd(), false);
2321 socket_options.set_non_blocking();
2322
2323 session sess(role, client_voice,
2324 include_pattern,
2325 exclude_pattern,
2326 app, address(), server);
2327
2328 while (true)
2329 {
2330 bool armed = false;
2331 try
2332 {
2333 armed = sess.arm();
2334 }
2335 catch (bad_decode & bd)
2336 {
2337 E(false, F("protocol error while processing peer %s: '%s'")
2338 % sess.peer_id % bd.what);
2339 }
2340
2341 sess.maybe_step();
2342 sess.maybe_say_goodbye(guard);
2343
2344 if (armed)
2345 {
2346 if (!sess.process(guard))
2347 {
2348 // Commit whatever work we managed to accomplish anyways.
2349 guard.commit();
2350
2351 // We failed during processing. This should only happen in
2352 // client voice when we have a decode exception, or received an
2353 // error from our server (which is translated to a decode
2354 // exception). We call these cases E() errors.
2355 E(false, F("processing failure while talking to "
2356 "peer %s, disconnecting\n")
2357 % sess.peer_id);
2358 return;
2359 }
2360 // Don't bother to waste time checking to see if there's more stuff
2361 // if we were going to ignore it anyway
2362 continue;
2363 }
2364 probe.clear();
2365 probe.add(*(sess.str), sess.which_events());
2366 Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout);
2367 Netxx::Probe::ready_type event = res.second;
2368 Netxx::socket_type fd = res.first;
2369
2370 if (fd == -1 && !armed)
2371 {
2372 E(false, (F("timed out waiting for I/O with "
2373 "peer %s, disconnecting\n")
2374 % sess.peer_id));
2375 }
2376
2377 bool all_io_clean = (event != Netxx::Probe::ready_oobd);
2378
2379 if (event & Netxx::Probe::ready_read)
2380 all_io_clean = all_io_clean && sess.read_some();
2381
2382 if (event & Netxx::Probe::ready_write)
2383 all_io_clean = all_io_clean && sess.write_some();
2384
2385 if (!all_io_clean)
2386 {
2387 // Commit whatever work we managed to accomplish anyways.
2388 guard.commit();
2389
2390 // We had an I/O error. We must decide if this represents a
2391 // user-reported error or a clean disconnect. See protocol
2392 // state diagram in session::process_bye_cmd.
2393
2394 if (sess.protocol_state == session::confirmed_state)
2395 {
2396 P(F("successful exchange with %s")
2397 % sess.peer_id);
2398 return;
2399 }
2400 else if (sess.encountered_error)
2401 {
2402 P(F("peer %s disconnected after we informed them of error")
2403 % sess.peer_id);
2404 return;
2405 }
2406 else
2407 E(false, (F("I/O failure while talking to "
2408 "peer %s, disconnecting\n")
2409 % sess.peer_id));
2410 }
2411 }
2412}
2413
2414static void
2415drop_session_associated_with_fd(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2416 Netxx::socket_type fd)
2417{
2418 // This is a bit of a hack. Initially all "file descriptors" in
2419 // netsync were full duplex, so we could get away with indexing
2420 // sessions by their file descriptor.
2421 //
2422 // When using pipes in unix, it's no longer true: a session gets
2423 // entered in the session map under its read pipe fd *and* its write
2424 // pipe fd. When we're in such a situation the socket fd is "-1" and
2425 // we downcast to a PipeStream and use its read+write fds.
2426 //
2427 // When using pipes in windows, we use a full duplex pipe (named
2428 // pipe) so the socket-like abstraction holds.
2429
2430 I(fd != -1);
2431 map<Netxx::socket_type, shared_ptr<session> >::const_iterator i = sessions.find(fd);
2432 I(i != sessions.end());
2433 shared_ptr<session> sess = i->second;
2434 fd = sess->str->get_socketfd();
2435 if (fd != -1)
2436 {
2437 sessions.erase(fd);
2438 }
2439 else
2440 {
2441 shared_ptr<Netxx::PipeStream> pipe =
2442 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
2443 I(static_cast<bool>(pipe));
2444 I(pipe->get_writefd() != -1);
2445 I(pipe->get_readfd() != -1);
2446 sessions.erase(pipe->get_readfd());
2447 sessions.erase(pipe->get_writefd());
2448 }
2449}
2450
2451static void
2452arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe,
2453 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2454 set<Netxx::socket_type> & armed_sessions)
2455{
2456 set<Netxx::socket_type> arm_failed;
2457 for (map<Netxx::socket_type,
2458 shared_ptr<session> >::const_iterator i = sessions.begin();
2459 i != sessions.end(); ++i)
2460 {
2461 i->second->maybe_step();
2462 try
2463 {
2464 if (i->second->arm())
2465 {
2466 L(FL("fd %d is armed") % i->first);
2467 armed_sessions.insert(i->first);
2468 }
2469 probe.add(*i->second->str, i->second->which_events());
2470 }
2471 catch (bad_decode & bd)
2472 {
2473 W(F("protocol error while processing peer %s: '%s', marking as bad")
2474 % i->second->peer_id % bd.what);
2475 arm_failed.insert(i->first);
2476 }
2477 }
2478 for (set<Netxx::socket_type>::const_iterator i = arm_failed.begin();
2479 i != arm_failed.end(); ++i)
2480 {
2481 drop_session_associated_with_fd(sessions, *i);
2482 }
2483}
2484
2485static void
2486handle_new_connection(Netxx::Address & addr,
2487 Netxx::StreamServer & server,
2488 Netxx::Timeout & timeout,
2489 protocol_role role,
2490 utf8 const & include_pattern,
2491 utf8 const & exclude_pattern,
2492 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2493 app_state & app)
2494{
2495 L(FL("accepting new connection on %s : %s")
2496 % (addr.get_name()?addr.get_name():"") % lexical_cast<string>(addr.get_port()));
2497 Netxx::Peer client = server.accept_connection();
2498
2499 if (!client)
2500 {
2501 L(FL("accept() returned a dead client"));
2502 }
2503 else
2504 {
2505 P(F("accepted new client connection from %s : %s")
2506 % client.get_address() % lexical_cast<string>(client.get_port()));
2507
2508 // 'false' here means not to revert changes when the SockOpt
2509 // goes out of scope.
2510 Netxx::SockOpt socket_options(client.get_socketfd(), false);
2511 socket_options.set_non_blocking();
2512
2513 shared_ptr<Netxx::Stream> str =
2514 shared_ptr<Netxx::Stream>
2515 (new Netxx::Stream(client.get_socketfd(), timeout));
2516
2517 shared_ptr<session> sess(new session(role, server_voice,
2518 include_pattern, exclude_pattern,
2519 app,
2520 lexical_cast<string>(client), str));
2521 sess->begin_service();
2522 sessions.insert(make_pair(client.get_socketfd(), sess));
2523 }
2524}
2525
2526static void
2527handle_read_available(Netxx::socket_type fd,
2528 shared_ptr<session> sess,
2529 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2530 set<Netxx::socket_type> & armed_sessions,
2531 bool & live_p)
2532{
2533 if (sess->read_some())
2534 {
2535 try
2536 {
2537 if (sess->arm())
2538 armed_sessions.insert(fd);
2539 }
2540 catch (bad_decode & bd)
2541 {
2542 W(F("protocol error while processing peer %s: '%s', disconnecting")
2543 % sess->peer_id % bd.what);
2544 drop_session_associated_with_fd(sessions, fd);
2545 live_p = false;
2546 }
2547 }
2548 else
2549 {
2550 switch (sess->protocol_state)
2551 {
2552 case session::working_state:
2553 P(F("peer %s read failed in working state (error)")
2554 % sess->peer_id);
2555 break;
2556
2557 case session::shutdown_state:
2558 P(F("peer %s read failed in shutdown state "
2559 "(possibly client misreported error)\n")
2560 % sess->peer_id);
2561 break;
2562
2563 case session::confirmed_state:
2564 P(F("peer %s read failed in confirmed state (success)")
2565 % sess->peer_id);
2566 break;
2567 }
2568 drop_session_associated_with_fd(sessions, fd);
2569 live_p = false;
2570 }
2571}
2572
2573
2574static void
2575handle_write_available(Netxx::socket_type fd,
2576 shared_ptr<session> sess,
2577 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2578 bool & live_p)
2579{
2580 if (!sess->write_some())
2581 {
2582 switch (sess->protocol_state)
2583 {
2584 case session::working_state:
2585 P(F("peer %s write failed in working state (error)")
2586 % sess->peer_id);
2587 break;
2588
2589 case session::shutdown_state:
2590 P(F("peer %s write failed in shutdown state "
2591 "(possibly client misreported error)\n")
2592 % sess->peer_id);
2593 break;
2594
2595 case session::confirmed_state:
2596 P(F("peer %s write failed in confirmed state (success)")
2597 % sess->peer_id);
2598 break;
2599 }
2600
2601 drop_session_associated_with_fd(sessions, fd);
2602 live_p = false;
2603 }
2604}
2605
2606static void
2607process_armed_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2608 set<Netxx::socket_type> & armed_sessions,
2609 transaction_guard & guard)
2610{
2611 for (set<Netxx::socket_type>::const_iterator i = armed_sessions.begin();
2612 i != armed_sessions.end(); ++i)
2613 {
2614 map<Netxx::socket_type, shared_ptr<session> >::iterator j;
2615 j = sessions.find(*i);
2616 if (j == sessions.end())
2617 continue;
2618 else
2619 {
2620 shared_ptr<session> sess = j->second;
2621 if (!sess->process(guard))
2622 {
2623 P(F("peer %s processing finished, disconnecting")
2624 % sess->peer_id);
2625 drop_session_associated_with_fd(sessions, *i);
2626 }
2627 }
2628 }
2629}
2630
2631static void
2632reap_dead_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2633 unsigned long timeout_seconds)
2634{
2635 // Kill any clients which haven't done any i/o inside the timeout period
2636 // or who have exchanged all items and flushed their output buffers.
2637 set<Netxx::socket_type> dead_clients;
2638 time_t now = ::time(NULL);
2639 for (map<Netxx::socket_type, shared_ptr<session> >::const_iterator
2640 i = sessions.begin(); i != sessions.end(); ++i)
2641 {
2642 if (static_cast<unsigned long>(i->second->last_io_time + timeout_seconds)
2643 < static_cast<unsigned long>(now))
2644 {
2645 P(F("fd %d (peer %s) has been idle too long, disconnecting")
2646 % i->first % i->second->peer_id);
2647 dead_clients.insert(i->first);
2648 }
2649 }
2650 for (set<Netxx::socket_type>::const_iterator i = dead_clients.begin();
2651 i != dead_clients.end(); ++i)
2652 {
2653 drop_session_associated_with_fd(sessions, *i);
2654 }
2655}
2656
2657static void
2658serve_connections(protocol_role role,
2659 utf8 const & include_pattern,
2660 utf8 const & exclude_pattern,
2661 app_state & app,
2662 utf8 const & address,
2663 Netxx::port_type default_port,
2664 unsigned long timeout_seconds,
2665 unsigned long session_limit)
2666{
2667 Netxx::PipeCompatibleProbe probe;
2668
2669 Netxx::Timeout
2670 forever,
2671 timeout(static_cast<long>(timeout_seconds)),
2672 instant(0,1);
2673
2674 if (!app.bind_port().empty())
2675 default_port = std::atoi(app.bind_port().c_str());
2676#ifdef USE_IPV6
2677 bool use_ipv6=true;
2678#else
2679 bool use_ipv6=false;
2680#endif
2681 // This will be true when we try to bind while using IPv6. See comments
2682 // further down.
2683 bool try_again=false;
2684
2685 do
2686 {
2687 try
2688 {
2689 try_again = false;
2690
2691 Netxx::Address addr(use_ipv6);
2692
2693 if (!app.bind_address().empty())
2694 addr.add_address(app.bind_address().c_str(), default_port);
2695 else
2696 addr.add_all_addresses (default_port);
2697
2698 // If se use IPv6 and the initialisation of server fails, we want
2699 // to try again with IPv4. The reason is that someone may have
2700 // downloaded a IPv6-enabled monotone on a system that doesn't
2701 // have IPv6, and which might fail therefore.
2702 // On failure, Netxx::NetworkException is thrown, and we catch
2703 // it further down.
2704 try_again=use_ipv6;
2705
2706 Netxx::StreamServer server(addr, timeout);
2707
2708 // If we came this far, whatever we used (IPv6 or IPv4) was
2709 // accepted, so we don't need to try again any more.
2710 try_again=false;
2711
2712 const char *name = addr.get_name();
2713 P(F("beginning service on %s : %s")
2714 % (name != NULL ? name : _("<all interfaces>"))
2715 % lexical_cast<string>(addr.get_port()));
2716
2717 map<Netxx::socket_type, shared_ptr<session> > sessions;
2718 set<Netxx::socket_type> armed_sessions;
2719
2720 shared_ptr<transaction_guard> guard;
2721
2722 while (true)
2723 {
2724 probe.clear();
2725 armed_sessions.clear();
2726
2727 if (sessions.size() >= session_limit)
2728 W(F("session limit %d reached, some connections "
2729 "will be refused\n") % session_limit);
2730 else
2731 probe.add(server);
2732
2733 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions);
2734
2735 L(FL("i/o probe with %d armed") % armed_sessions.size());
2736 Netxx::socket_type fd;
2737 Netxx::Timeout how_long;
2738 if (sessions.empty())
2739 how_long = forever;
2740 else if (armed_sessions.empty())
2741 how_long = timeout;
2742 else
2743 how_long = instant;
2744 do
2745 {
2746 Netxx::Probe::result_type res = probe.ready(how_long);
2747 how_long = instant;
2748 Netxx::Probe::ready_type event = res.second;
2749 fd = res.first;
2750
2751 if (!guard)
2752 guard = shared_ptr<transaction_guard>(new transaction_guard(app.db));
2753
2754 I(guard);
2755
2756 if (fd == -1)
2757 {
2758 if (armed_sessions.empty())
2759 L(FL("timed out waiting for I/O (listening on %s : %s)")
2760 % addr.get_name() % lexical_cast<string>(addr.get_port()));
2761 }
2762
2763 // we either got a new connection
2764 else if (fd == server)
2765 handle_new_connection(addr, server, timeout, role,
2766 include_pattern, exclude_pattern,
2767 sessions, app);
2768
2769 // or an existing session woke up
2770 else
2771 {
2772 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
2773 i = sessions.find(fd);
2774 if (i == sessions.end())
2775 {
2776 L(FL("got woken up for action on unknown fd %d") % fd);
2777 }
2778 else
2779 {
2780 probe.remove(*(i->second->str));
2781 shared_ptr<session> sess = i->second;
2782 bool live_p = true;
2783
2784 if (event & Netxx::Probe::ready_read)
2785 handle_read_available(fd, sess, sessions,
2786 armed_sessions, live_p);
2787
2788 if (live_p && (event & Netxx::Probe::ready_write))
2789 handle_write_available(fd, sess, sessions, live_p);
2790
2791 if (live_p && (event & Netxx::Probe::ready_oobd))
2792 {
2793 P(F("got OOB from peer %s, disconnecting")
2794 % sess->peer_id);
2795 drop_session_associated_with_fd(sessions, fd);
2796 }
2797 }
2798 }
2799 }
2800 while (fd != -1);
2801 process_armed_sessions(sessions, armed_sessions, *guard);
2802 reap_dead_sessions(sessions, timeout_seconds);
2803
2804 if (sessions.empty())
2805 {
2806 // Let the guard die completely if everything's gone quiet.
2807 guard->commit();
2808 guard.reset();
2809 }
2810 }
2811 }
2812 // This exception is thrown when bind() fails somewhere in Netxx.
2813 catch (Netxx::NetworkException &)
2814 {
2815 // If we tried with IPv6 and failed, we want to try again using IPv4.
2816 if (try_again)
2817 {
2818 use_ipv6 = false;
2819 }
2820 // In all other cases, just rethrow the exception.
2821 else
2822 throw;
2823 }
2824 // This exception is thrown when there is no support for the type of
2825 // connection we want to do in the kernel, for example when a socket()
2826 // call fails somewhere in Netxx.
2827 catch (Netxx::Exception &)
2828 {
2829 // If we tried with IPv6 and failed, we want to try again using IPv4.
2830 if (try_again)
2831 {
2832 use_ipv6 = false;
2833 }
2834 // In all other cases, just rethrow the exception.
2835 else
2836 throw;
2837 }
2838 }
2839 while(try_again);
2840 }
2841
2842static void
2843serve_single_connection(shared_ptr<session> sess,
2844 unsigned long timeout_seconds)
2845{
2846 Netxx::PipeCompatibleProbe probe;
2847
2848 Netxx::Timeout
2849 forever,
2850 timeout(static_cast<long>(timeout_seconds)),
2851 instant(0,1);
2852
2853 P(F("beginning service on %s") % sess->peer_id);
2854
2855 sess->begin_service();
2856
2857 transaction_guard guard(sess->app.db);
2858
2859 map<Netxx::socket_type, shared_ptr<session> > sessions;
2860 set<Netxx::socket_type> armed_sessions;
2861
2862 if (sess->str->get_socketfd() == -1)
2863 {
2864 // Unix pipes are non-duplex, have two filedescriptors
2865 shared_ptr<Netxx::PipeStream> pipe =
2866 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
2867 I(pipe);
2868 sessions[pipe->get_writefd()]=sess;
2869 sessions[pipe->get_readfd()]=sess;
2870 }
2871 else
2872 sessions[sess->str->get_socketfd()]=sess;
2873
2874 while (!sessions.empty())
2875 {
2876 probe.clear();
2877 armed_sessions.clear();
2878
2879 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions);
2880
2881 L(FL("i/o probe with %d armed") % armed_sessions.size());
2882 Netxx::Probe::result_type res = probe.ready((armed_sessions.empty() ? timeout
2883 : instant));
2884 Netxx::Probe::ready_type event = res.second;
2885 Netxx::socket_type fd = res.first;
2886
2887 if (fd == -1)
2888 {
2889 if (armed_sessions.empty())
2890 L(FL("timed out waiting for I/O (listening on %s)")
2891 % sess->peer_id);
2892 }
2893
2894 // an existing session woke up
2895 else
2896 {
2897 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
2898 i = sessions.find(fd);
2899 if (i == sessions.end())
2900 {
2901 L(FL("got woken up for action on unknown fd %d") % fd);
2902 }
2903 else
2904 {
2905 shared_ptr<session> sess = i->second;
2906 bool live_p = true;
2907
2908 if (event & Netxx::Probe::ready_read)
2909 handle_read_available(fd, sess, sessions, armed_sessions, live_p);
2910
2911 if (live_p && (event & Netxx::Probe::ready_write))
2912 handle_write_available(fd, sess, sessions, live_p);
2913
2914 if (live_p && (event & Netxx::Probe::ready_oobd))
2915 {
2916 P(F("got some OOB data on fd %d (peer %s), disconnecting")
2917 % fd % sess->peer_id);
2918 drop_session_associated_with_fd(sessions, fd);
2919 }
2920 }
2921 }
2922 process_armed_sessions(sessions, armed_sessions, guard);
2923 reap_dead_sessions(sessions, timeout_seconds);
2924 }
2925}
2926
2927
2928void
2929insert_with_parents(revision_id rev,
2930 refiner & ref,
2931 revision_enumerator & rev_enumerator,
2932 set<revision_id> & revs,
2933 app_state & app,
2934 ticker & revisions_ticker)
2935{
2936 deque<revision_id> work;
2937 work.push_back(rev);
2938 while (!work.empty())
2939 {
2940 revision_id rid = work.front();
2941 work.pop_front();
2942
2943 if (!null_id(rid) && revs.find(rid) == revs.end())
2944 {
2945 revs.insert(rid);
2946 ++revisions_ticker;
2947 id rev_item;
2948 decode_hexenc(rid.inner(), rev_item);
2949 ref.note_local_item(rev_item);
2950 vector<revision_id> parents;
2951 rev_enumerator.get_revision_parents(rid, parents);
2952 for (vector<revision_id>::const_iterator i = parents.begin();
2953 i != parents.end(); ++i)
2954 {
2955 work.push_back(*i);
2956 }
2957 }
2958 }
2959}
2960
2961void
2962session::rebuild_merkle_trees(app_state & app,
2963 set<utf8> const & branchnames)
2964{
2965 P(F("finding items to synchronize:"));
2966 for (set<utf8>::const_iterator i = branchnames.begin();
2967 i != branchnames.end(); ++i)
2968 L(FL("including branch %s") % *i);
2969
2970 // xgettext: please use short message and try to avoid multibytes chars
2971 ticker revisions_ticker(N_("revisions"), "r", 64);
2972 // xgettext: please use short message and try to avoid multibytes chars
2973 ticker certs_ticker(N_("certificates"), "c", 256);
2974 // xgettext: please use short message and try to avoid multibytes chars
2975 ticker keys_ticker(N_("keys"), "k", 1);
2976
2977 set<revision_id> revision_ids;
2978 set<rsa_keypair_id> inserted_keys;
2979
2980 {
2981 // Get our branches
2982 vector<string> names;
2983 get_branches(app, names);
2984 for (size_t i = 0; i < names.size(); ++i)
2985 {
2986 if(branchnames.find(names[i]) != branchnames.end())
2987 {
2988 // Branch matches, get its certs.
2989 vector< revision<cert> > certs;
2990 base64<cert_value> encoded_name;
2991 encode_base64(cert_value(names[i]),encoded_name);
2992 app.db.get_revision_certs(branch_cert_name, encoded_name, certs);
2993 for (vector< revision<cert> >::const_iterator j = certs.begin();
2994 j != certs.end(); j++)
2995 {
2996 revision_id rid(j->inner().ident);
2997 insert_with_parents(rid, rev_refiner, rev_enumerator,
2998 revision_ids, app, revisions_ticker);
2999 // Granch certs go in here, others later on.
3000 hexenc<id> tmp;
3001 id item;
3002 cert_hash_code(j->inner(), tmp);
3003 decode_hexenc(tmp, item);
3004 cert_refiner.note_local_item(item);
3005 rev_enumerator.note_cert(rid, tmp);
3006 if (inserted_keys.find(j->inner().key) == inserted_keys.end())
3007 inserted_keys.insert(j->inner().key);
3008 }
3009 }
3010 }
3011 }
3012
3013 {
3014 map<cert_value, epoch_data> epochs;
3015 app.db.get_epochs(epochs);
3016
3017 epoch_data epoch_zero(string(constants::epochlen, '0'));
3018 for (set<utf8>::const_iterator i = branchnames.begin();
3019 i != branchnames.end(); ++i)
3020 {
3021 cert_value branch((*i)());
3022 map<cert_value, epoch_data>::const_iterator j;
3023 j = epochs.find(branch);
3024
3025 // Set to zero any epoch which is not yet set.
3026 if (j == epochs.end())
3027 {
3028 L(FL("setting epoch on %s to zero") % branch);
3029 epochs.insert(make_pair(branch, epoch_zero));
3030 app.db.set_epoch(branch, epoch_zero);
3031 }
3032
3033 // Then insert all epochs into merkle tree.
3034 j = epochs.find(branch);
3035 I(j != epochs.end());
3036 epoch_id eid;
3037 id epoch_item;
3038 epoch_hash_code(j->first, j->second, eid);
3039 decode_hexenc(eid.inner(), epoch_item);
3040 epoch_refiner.note_local_item(epoch_item);
3041 }
3042 }
3043
3044 {
3045 typedef vector< pair<hexenc<id>,
3046 pair<revision_id, rsa_keypair_id> > > cert_idx;
3047
3048 cert_idx idx;
3049 app.db.get_revision_cert_nobranch_index(idx);
3050
3051 // Insert all non-branch certs reachable via these revisions
3052 // (branch certs were inserted earlier).
3053
3054 for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i)
3055 {
3056 hexenc<id> const & hash = i->first;
3057 revision_id const & ident = i->second.first;
3058 rsa_keypair_id const & key = i->second.second;
3059
3060 rev_enumerator.note_cert(ident, hash);
3061
3062 if (revision_ids.find(ident) == revision_ids.end())
3063 continue;
3064
3065 id item;
3066 decode_hexenc(hash, item);
3067 cert_refiner.note_local_item(item);
3068 ++certs_ticker;
3069 if (inserted_keys.find(key) == inserted_keys.end())
3070 inserted_keys.insert(key);
3071 }
3072 }
3073
3074 // Add any keys specified on the command line.
3075 for (vector<rsa_keypair_id>::const_iterator key
3076 = app.keys_to_push.begin();
3077 key != app.keys_to_push.end(); ++key)
3078 {
3079 if (inserted_keys.find(*key) == inserted_keys.end())
3080 {
3081 if (!app.db.public_key_exists(*key))
3082 {
3083 if (app.keys.key_pair_exists(*key))
3084 app.keys.ensure_in_database(*key);
3085 else
3086 W(F("Cannot find key '%s'") % *key);
3087 }
3088 inserted_keys.insert(*key);
3089 }
3090 }
3091
3092 // Insert all the keys.
3093 for (set<rsa_keypair_id>::const_iterator key = inserted_keys.begin();
3094 key != inserted_keys.end(); key++)
3095 {
3096 if (app.db.public_key_exists(*key))
3097 {
3098 base64<rsa_pub_key> pub_encoded;
3099 app.db.get_key(*key, pub_encoded);
3100 hexenc<id> keyhash;
3101 key_hash_code(*key, pub_encoded, keyhash);
3102 L(FL("noting key '%s' = '%s' to send") % *key % keyhash);
3103 id key_item;
3104 decode_hexenc(keyhash, key_item);
3105 key_refiner.note_local_item(key_item);
3106 ++keys_ticker;
3107 }
3108 }
3109
3110 rev_refiner.reindex_local_items();
3111 cert_refiner.reindex_local_items();
3112 key_refiner.reindex_local_items();
3113 epoch_refiner.reindex_local_items();
3114}
3115
3116void
3117run_netsync_protocol(protocol_voice voice,
3118 protocol_role role,
3119 utf8 const & addr,
3120 utf8 const & include_pattern,
3121 utf8 const & exclude_pattern,
3122 app_state & app)
3123{
3124 if (include_pattern().find_first_of("'\"") != string::npos)
3125 {
3126 W(F("include branch pattern contains a quote character:\n"
3127 "%s\n") % include_pattern());
3128 }
3129
3130 if (exclude_pattern().find_first_of("'\"") != string::npos)
3131 {
3132 W(F("exclude branch pattern contains a quote character:\n"
3133 "%s\n") % exclude_pattern());
3134 }
3135
3136 try
3137 {
3138 if (voice == server_voice)
3139 {
3140 if (app.bind_stdio)
3141 {
3142 shared_ptr<Netxx::PipeStream> str(new Netxx::PipeStream(0,1));
3143 shared_ptr<session> sess(new session(role, server_voice,
3144 include_pattern, exclude_pattern,
3145 app, "stdio", str));
3146 serve_single_connection(sess,constants::netsync_timeout_seconds);
3147 }
3148 else
3149 serve_connections(role, include_pattern, exclude_pattern, app,
3150 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3151 static_cast<unsigned long>(constants::netsync_timeout_seconds),
3152 static_cast<unsigned long>(constants::netsync_connection_limit));
3153 }
3154 else
3155 {
3156 I(voice == client_voice);
3157 call_server(role, include_pattern, exclude_pattern, app,
3158 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3159 static_cast<unsigned long>(constants::netsync_timeout_seconds));
3160 }
3161 }
3162 catch (Netxx::NetworkException & e)
3163 {
3164 throw informative_failure((F("network error: %s") % e.what()).str());
3165 }
3166 catch (Netxx::Exception & e)
3167 {
3168 throw oops((F("network error: %s") % e.what()).str());;
3169 }
3170}
3171
3172// Local Variables:
3173// mode: C++
3174// fill-column: 76
3175// c-file-style: "gnu"
3176// indent-tabs-mode: nil
3177// End:
3178// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status