//! # StellarSQL
//! A minimal SQL DBMS written in Rust
//!
#[macro_use]
extern crate clap;
#[macro_use]
extern crate dotenv_codegen;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate log;

mod component;
mod connection;
mod index;
mod manager;
mod sql;
mod storage;

use clap::App;
use std::io::BufReader;
use tokio::io::write_all;

use crate::connection::message;
use crate::connection::request::Request;
use crate::connection::response::Response;
use crate::manager::pool::Pool;
use env_logger;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;

use crate::storage::diskinterface::DiskInterface;
use std::path::Path;
use std::sync::{Arc, Mutex};

/// The entry of the program
///
/// Use `Tokio` to handle each TCP connection and spawn a thread to handle the request.
fn main() {
    info!("Hello, StellarSQL!");

    // start logger
    env_logger::init();

    env_init();

    // Parse arguments
    let yml = load_yaml!("../cli.yml");
    let m = App::from_yaml(yml).get_matches();

    let port = if let Some(port_) = m.value_of("port") {
        port_
    } else {
        dotenv!("PORT")
    };

    let addr = format!("127.0.0.1:{}", port).parse().unwrap();

    lazy_static! {
        static ref mutex: Arc<Mutex<Pool>> = Arc::new(Mutex::new(Pool::new(dotenv!("POOL_SIZE").parse().unwrap())));
    }
    // Bind a TCP listener to the socket address.
    // Note that this is the Tokio TcpListener, which is fully async.
    let listener = TcpListener::bind(&addr).unwrap();

    // The server task asynchronously iterates over and processes each
    // incoming connection.
    let server = listener
        .incoming()
        .for_each(move |socket| {
            let addr = socket.peer_addr().unwrap();
            info!("New Connection: {}", addr);

            // Spawn a task to process the connection
            process(socket, &mutex, addr);

            Ok(())
        })
        .map_err(|err| {
            error!("accept error = {:?}", err);
        });

    info!("StellarSQL running on {} port", port);
    tokio::run(server);
}

/// Initialize the environment.
///
/// Checks whether the path given by `FILE_BASE_PATH` (read through `dotenv!`)
/// exists; when it does not, `DiskInterface::create_file_base` is asked to
/// create it.
///
/// # Panics
///
/// Errors are not allowed in this step, so a failure to create the file base
/// panics with the underlying error message.
fn env_init() {
    // check `../.env`: FILE_BASE_PATH, create usernames.json
    let path = dotenv!("FILE_BASE_PATH");
    if Path::new(path).exists() {
        return;
    }

    if let Err(e) = DiskInterface::create_file_base(Some(path)) {
        // Format the error explicitly: `panic!(e)` would carry an opaque
        // payload and is rejected by newer Rust editions.
        panic!("failed to create file base `{}`: {}", path, e);
    }
}

/// Process the TCP socket connection
///
/// The request message pass to [`Response`](connection/request/index.html) and get [`Response`](connection/response/index.html)
fn process(socket: TcpStream, mutex: &'static Arc<Mutex<Pool>>, addr: std::net::SocketAddr) {
    let (reader, writer) = socket.split();

    let messages = message::new(BufReader::new(reader));

    let mut requests = Request::new(addr.to_string());

    // note the `move` keyword on the closure here which moves ownership
    // of the reference into the closure, which we'll need for spawning the
    // client below.
    //
    // The `map` function here means that we'll run some code for all
    // requests (lines) we receive from the client. The actual handling here
    // is pretty simple, first we parse the request and if it's valid we
    // generate a response.
    let responses = messages.map(move |message| match Request::parse(&message, &mutex, &mut requests) {
        Ok(req) => req,
        Err(e) => return Response::Error { msg: format!("{}", e) },
    });

    // At this point `responses` is a stream of `Response` types which we
    // now want to write back out to the client. To do that we use
    // `Stream::fold` to perform a loop here, serializing each response and
    // then writing it out to the client.
    let writes = responses.fold(writer, |writer, response| {
        let response = response.serialize().into_bytes();
        write_all(writer, response).map(|(w, _)| w)
    });

    // `spawn` this client to ensure it
    // runs concurrently with all other clients, for now ignoring any errors
    // that we see.
    let connection = writes.then(move |_| {
        // write back
        let mut pool = mutex.lock().unwrap();

        // TODO: retry if failed once
        match pool.write_back(addr.to_string()) {
            Ok(_) => {}
            // if failed to write back to client, just log error.
            Err(e) => error!("{}", e),
        }
        Ok(())
    });

    // Spawn the task. Internally, this submits the task to a thread pool.
    tokio::spawn(connection);
}