SQL_db = struct
  type numeric_option = NO_None | NO_Unsigned | NO_UnsignedZeroFill
  type char_option    = CO_None | CO_Binary | CO_Ascii
  type varchar_option = VCO_None | VCO_Binary

  type ty =
    | TinyInt   of int option * numeric_option
    | MediumInt of int option * numeric_option
    | Int       of int option * numeric_option
    | BigInt    of int option * numeric_option

    | Double    of (int * int) option * numeric_option
    | Float     of (int * int) option * numeric_option
    | Decimal   of (int * int) option * numeric_option

    | Char      of int
    | VarChar   of int

    | TinyBlob
    | Blob
    | MediumBlob
    | LongBlob

    | TinyText
    | Text
    | MediumText
    | LongText

  type table = {
    mutable ta_name    : string;
    mutable ta_comment : string;
    mutable ta_pkey    : column list;
    mutable ta_db      : db;
    mutable ta_columns : column list;
    mutable ta_logged  : bool ;
  }

  and vtable = {
    mutable vt_name   : string;
    mutable vt_db     : db;
    mutable vt_ftable : table;
    mutable vt_join   : (table * (column * column) list) list
  }

  and column = {
    mutable col_name         : string;
    mutable col_comment      : string;
    mutable col_table        : table;
    mutable col_type         : ty;
    mutable col_nullable     : bool;
    mutable col_spec_options : (string list) Dbf_misc.StringMap.t;
    mutable col_spec_ty      : string Dbf_misc.StringMap.t;
    mutable col_ocaml_ty     : string;
    mutable col_sql2ml       : string;
    mutable col_ml2sql       : string;
  }

  and index = {
    mutable idx_name     : string;
    mutable idx_columns  : column list;
    mutable idx_unique   : bool;
    mutable idx_db       : db;
  }

  and query = {
      mutable qry_name    : string ;
      mutable qry_query   : string ;
      mutable qry_comment : string ;
      mutable qry_db      : db;
    }

  and db = {
    mutable db_tables  : table list;
    mutable db_vtables : vtable list;
    mutable db_indexes : index list;
    mutable db_queries : query list;
  }

  exception Invalid_name    of string
  exception Duplicated_name of string
  exception Invalid_args    of string

  let validate_name = fun s ->
    Str.string_match (Str.regexp "^[a-zA-Z0-9_]+$") s 0

  let validate_name_exn = fun s ->
    if not (validate_name s) then
      raise (Invalid_name s)

  let create_empty = fun () ->
    { db_tables = []; db_vtables = [];
      db_indexes = []; db_queries = []; }

  (*=======\
  | Tables |
  \=======*)

  let table_by_name = fun db name ->
    List.find (fun t -> t.ta_name = name) db.db_tables

  let table_by_name_opt = fun db name ->
    try
      Some (table_by_name db name)
    with
      | Not_found -> None

  let create_table_name = fun db ?(prefix = "table_") ?(from = 0) () ->
    let rec create = fun idx ->
      let name = prefix ^ (string_of_int idx) in
        match table_by_name_opt db name with
          | None   -> name
          | Some _ -> create (idx + 1)
    in
      create from

  let insert_table = fun db ~name ~comment ~logged ->
    validate_name_exn name;
    match table_by_name_opt db name with
      | None -> let table = { ta_name    = name;
                              ta_comment = comment;
                              ta_db      = db;
                              ta_columns = [];
                              ta_logged  = logged ;
                              ta_pkey    = []; }
        in
          db.db_tables <- db.db_tables @ [table];
          table
      | Some _ ->
          raise (Duplicated_name name)

  let rename_table = fun table ~name ->
    if table.ta_name <> name then begin
      validate_name_exn name;
      match table_by_name_opt table.ta_db name with
        | None   -> table.ta_name <- name
        | Some _ -> raise (Duplicated_name name)
    end

  let set_primary_key = fun table columns ->
    assert (List.for_all                (* Phys. eq *)
              (fun c -> c.col_table == table)
              columns);
    table.ta_pkey <- columns

  let unset_primary_key = fun table -> table.ta_pkey <- []

  (*========\
  | Columns |
  \========*)

  let column_fullname = fun c ->
    Printf.sprintf "%s.%s" c.col_table.ta_name c.col_name

  let column_by_name = fun table ~name ->
    List.find (fun c -> c.col_name = name) table.ta_columns

  let column_by_name_opt = fun table ~name ->
    try
      Some (column_by_name table name)
    with
      | Not_found -> None

  let create_column_name = fun table ?(prefix = "column_") ?(from = 0) () ->
    let rec create = fun idx ->
      let name = prefix ^ (string_of_int idx) in
        match column_by_name_opt table name with
          | None   -> name
          | Some _ -> create (idx + 1)
    in
      create from

  let insert_column = fun table ~name ~comment ~ty ?(nullable = true) () ->
    validate_name_exn name;
    match column_by_name_opt table name with
      | None -> let column = { col_name         = name;
                               col_comment      = comment;
                               col_table        = table;
                               col_type         = ty;
                               col_nullable     = nullable;
                               col_spec_options = Dbf_misc.StringMap.empty;
                               col_spec_ty      = Dbf_misc.StringMap.empty;
                               col_ocaml_ty     = "";
                               col_sql2ml       = "";
                               col_ml2sql       = ""; }
        in
          table.ta_columns <- table.ta_columns @ [column];
          column
      | Some _ ->
          raise (Duplicated_name name)

  let rename_column = fun column ~name ->
    if column.col_name <> name then begin
      validate_name_exn name;
      match column_by_name_opt column.col_table name with
        | None   -> column.col_name <- name
        | Some _ -> raise (Duplicated_name name)
    end

  let string_of_spec_options = fun opts ->
    let string_of_spec_options_db = fun db options ->
      Printf.sprintf "%s = [%s]"  db
        (Dbf_misc.join ~sep:", " ~to_string:(fun x -> x) options)
    in
    let strings =
      Dbf_misc.StringMap.fold
        (fun db options strings ->
           (string_of_spec_options_db db options) :: strings)
        opts
        []
    in
      Dbf_misc.join ~sep:", " ~to_string:(fun x -> x)
        (List.rev strings)

  (*===============\
  | Virtual tables |
  \===============*)

  let create_vtable = fun ~name ~table ->
    validate_name_exn name;
    { vt_name   = name;
      vt_ftable = table;
      vt_join   = [];
      vt_db     = table.ta_db; }

  let do_join = fun vtable table constraints ->
    assert (vtable.vt_db == table.ta_db); (* Physical eq *)
    (* FIXME: We should check the constraints *)
    vtable.vt_join <- vtable.vt_join @ [table, constraints]

  let table_in_join = fun vtable table ->
    assert (vtable.vt_db == table.ta_db); (* Physical eq *)
    if vtable.vt_ftable == table then
      true
    else
      List.exists (fun (t, _) -> t == table) vtable.vt_join

  let vtable_by_name = fun db name ->
    List.find (fun vt -> vt.vt_name = name) db.db_vtables

  let vtable_by_name_opt = fun db name ->
    try
      Some (List.find (fun vt -> vt.vt_name = name) db.db_vtables)
    with
      | Not_found -> None

  let create_vtable_name = fun db ?(prefix = "vtable_") ?(from = 0) () ->
    let rec create = fun idx ->
      let name = prefix ^ (string_of_int idx) in
        match vtable_by_name_opt db name with
          | None   -> name
          | Some _ -> create (idx + 1)
    in
      create from

  let link_vtable_to_db = fun vtable ->
    let db = vtable.vt_db in
      List.iter
        (fun vt -> assert (vt != vtable); (* Physical eq *)
           if vtable.vt_name = vt.vt_name then
             raise (Duplicated_name vtable.vt_name))
        db.db_vtables;
      db.db_vtables <- db.db_vtables @ [vtable]

  let unlink_vtable = fun vtable ->
    let (vtables, to_be_removed) =
      List.partition (fun vt -> vt.vt_name <> vtable.vt_name)
        vtable.vt_db.db_vtables
    in
      vtable.vt_db.db_vtables <- vtables;
      match to_be_removed with
        | [vt] -> vt.vt_db <- (Obj.magic 0)
        | _    -> Dbf_misc.ie ()

  let rename_vtable = fun vtable ~name ->
    if name <> vtable.vt_name then
      match vtable_by_name_opt vtable.vt_db name with
        | None   -> vtable.vt_name <- name
        | Some _ -> raise (Duplicated_name name)

  (* FIXME: beurk, beurk *)
  let string_of_vtable = fun vtable ->
    let current = ref vtable.vt_ftable.ta_name
    and first   = ref true
    and string_of_constraint = fun (c1, c2) ->
      Printf.sprintf "%s = %s"
      (column_fullname c1)
      (column_fullname c2) in
    let do_join = fun (table, columns) ->
      let parent_current =
        if   !first
        then !current
        else Printf.sprintf "(%s)" !current
      in
        first := false;
        match columns with
          | [] ->
              current := Printf.sprintf "%s, %s" parent_current table.ta_name
          | _ ->
              let constraints =
                Dbf_misc.join
                  ~sep:" AND "
                  ~to_string:string_of_constraint
                  columns
              in
                current :=
                Printf.sprintf "%s INNER JOIN %s ON %s"
                  parent_current table.ta_name constraints
    in
      List.iter do_join vtable.vt_join;
      !current

  (*========\
  | Indexes |
  \========*)

  let table_of_index = fun index ->
    match index.idx_columns with
      | []      -> Dbf_misc.ie ()
      | hd :: _ -> hd.col_table

  let string_of_index = fun index ->
    Dbf_misc.join ~sep:", " ~to_string:(fun c -> c.col_name)
      index.idx_columns

  let index_by_name = fun db name ->
    List.find (fun i -> i.idx_name = name) db.db_indexes

  let index_by_name_opt = fun db name ->
    try
      Some (List.find (fun i -> i.idx_name = name) db.db_indexes)
    with
      | Not_found -> None

  let column_in_index = fun index column ->
    (* Physical eq *)
    List.memq column index.idx_columns

  let unlink_index = fun index ->
    let (indexes, to_be_removed) =
      List.partition (fun idx -> idx.idx_name <> index.idx_name)
        index.idx_db.db_indexes
    in
      index.idx_db.db_indexes <- indexes;
      match to_be_removed with
        | [idx] -> idx.idx_db <- (Obj.magic 0)
        | _     -> Dbf_misc.ie ()

  let create_index_name = fun db ?(prefix = "index_") ?(from = 0) () ->
    let rec create = fun idx ->
      let name = prefix ^ (string_of_int idx) in
        match index_by_name_opt db name with
          | None   -> name
          | Some _ -> create (idx + 1)
    in
      create from

  let insert_index = fun ~name ~columns ~unique ->
    validate_name_exn name;
    match columns with
      | []       ->
          raise (Invalid_args "An index must contain at least one column")
      | hd :: _  ->
          let db = hd.col_table.ta_db in
            assert
              (not (List.exists
                      (fun c -> c.col_table.ta_db != db) (* Phys. eq *)
                      columns));
            if index_by_name_opt db name <> None then
              raise (Duplicated_name name);
            let index = { idx_name    = name;
                          idx_db      = db;
                          idx_columns = columns;
                          idx_unique  = unique; }
            in
              db.db_indexes <- db.db_indexes @ [index];
              index

  let rename_index = fun index ~name ->
    if index.idx_name <> name then begin
      validate_name_exn name;
      match index_by_name_opt index.idx_db name with
        | None   -> index.idx_name <- name
        | Some _ -> raise (Duplicated_name name)
    end

  let update_index = fun index ~name ~columns ~unique ->
    if index.idx_name <> name then begin
      validate_name_exn name;
      begin
        match index_by_name_opt index.idx_db name with
          | None   -> index.idx_name <- name
          | Some _ -> raise (Duplicated_name name)
      end;
    end;
    if columns = [] then
      raise (Invalid_args "An index must contain at least one column");
    index.idx_name <- name;
    index.idx_columns <- columns;
    index.idx_unique <- unique

  (*========\
  | Queries |
  \========*)


  type query_state =
    | Query_ok of (column option) list (* return values (column option) *)
          * (string * column option) list (* ok, parameters (name,column option) *)
    | Query_parse_error of int * int * string (* line, char, message *)
    | Query_invalid_against_schema of string
    | Query_incorrect of string

  exception Bad_query of query_state

  let query_by_name = fun db name ->
    List.find (fun q -> q.qry_name = name) db.db_queries

  let query_by_name_opt = fun db name ->
    try Some (query_by_name db name)
    with Not_found -> None

  let unlink_query = fun query ->
    let (queries, to_be_removed) =
      List.partition (fun q -> q.qry_name <> q.qry_name)
        query.qry_db.db_queries
    in
    query.qry_db.db_queries <- queries;
    match to_be_removed with
    | [q] -> q.qry_db <- (Obj.magic 0)
    | _   -> Dbf_misc.ie ()

  let create_query_name = fun db ?(prefix = "query_") ?(from = 0) () ->
    let rec create = fun idx ->
      let name = prefix ^ (string_of_int idx) in
        match query_by_name_opt db name with
          | None   -> name
          | Some _ -> create (idx + 1)
    in
    create from

  let insert_query = fun db ~name ~query ~comment ->
    validate_name_exn name;
    if query_by_name_opt db name <> None then
      raise (Duplicated_name name);
    let query = { qry_name    = name;
                  qry_db      = db;
                  qry_query   = query;
                  qry_comment = comment; }
    in
    db.db_queries <- db.db_queries @ [query];
    query

  let rename_query = fun query ~name ->
    if query.qry_name <> name then begin
      validate_name_exn name;
      match query_by_name_opt query.qry_db name with
        | None   -> query.qry_name <- name
        | Some _ -> raise (Duplicated_name name)
    end

  let update_query = fun q ~name ~query ~comment ->
    if q.qry_name <> name then begin
      validate_name_exn name;
      begin
        match query_by_name_opt q.qry_db name with
          | None   -> q.qry_name <- name
          | Some _ -> raise (Duplicated_name name)
      end;
    end;
    q.qry_query <- query;
    q.qry_comment <- comment

  let query_parameters db query =
    let params = Hashtbl.create 13 in
    let bad state = raise (Bad_query state) in
    let add_param name colopt =
      try
        ignore (Hashtbl.find params name)
        (* FIXME: check that types/colums are identical ? *)
      with
        Not_found ->
          Hashtbl.add params name colopt
    in
    let table_by_name s =
      try table_by_name db s
      with Not_found ->
        let msg = Printf.sprintf "No table %s in schema" s in
        bad (Query_invalid_against_schema msg)
    in

    let env_from_from =
      let f acc t =
        let (name,t) = match t with
          `table s -> (s, table_by_name s)
        | `tableas (s1,s2) ->
            let t = table_by_name s1 in
            (s2, t)
        in
        if List.exists (fun (s,_) -> s = name) acc then
          let msg = Printf.sprintf
              "No unique table name %s in from clause" name
          in
          bad (Query_incorrect msg)
        else
          (name,t) :: acc
      in
      List.fold_left f
    in
    let get_column env = function
        `ref s ->
          let l = List.fold_left
              (fun acc (table,t) ->
                match column_by_name_opt t s with
                  None -> acc
                | Some c -> (table,c)::acc)
              []
              env
          in
          (
           match l with
             [] ->
               let msg = Printf.sprintf "Unbound column name %s" s in
               bad (Query_invalid_against_schema msg)
           | [(_,c)] -> c
           | _ :: _ :: _ ->
               let msg = Printf.sprintf
                   "Ambiguous column name %s; could belong to %s"
                   s (String.concat ", " (List.map fst l))
               in
               bad (Query_invalid_against_schema msg)
          )
      |        `refdotref (s1,s2) ->
          try
            let t = List.assoc s1 env in
            column_by_name t s2
          with
            Not_found ->
              let msg = Printf.sprintf "Unbound column name %s.%s" s1 s2 in
              bad (Query_invalid_against_schema msg)
    in
    let check_column env c = ignore (get_column env c) in
    let rec in_query_exp env = function
        `select s -> in_select env s
      |        `union (q1,q2) ->
          (in_query_exp env q1) @ (in_query_exp env q2)
      |        `unionall (q1,q2) ->
          (in_query_exp env q1) @ (in_query_exp env q2)
    and in_select env (_mod,selection,from,where_opt,group_by_opt,having_opt) =
      let env = env_from_from env from in
      let return_type =
        match selection with
          `star -> bad (Query_incorrect "'*' not supported in selection")
        | `list l ->
            let f = function
                `column c -> Some (get_column env c)
              |        _ -> None
            in
            List.map f l
      in
      (match where_opt with None -> () | Some w -> in_condition env w);
      (match group_by_opt with None -> () | Some g -> in_group_by env g);
      (match having_opt with None -> () | Some h -> in_condition env h);
      return_type
    and in_group_by env l = List.iter (check_column env) l
    and in_condition env = function
        `cand (c1, c2)
      |        `cor (c1, c2) -> in_condition env c1; in_condition env c2
      |        `cnot c -> in_condition env c
      |        `p p -> in_predicate env p
    (* FIXME: improve analysis of expressions to guess types of parameters *)
    and in_predicate env = function
        `comparisonexp (e1,_,e2) -> in_exp env e1; in_exp env e2
      |        `comparisonselect (e1,_,sel) ->
          in_exp env e1; ignore (in_select env sel)
      |        `between (_,e1,e2,e3) ->
          in_exp env e1; in_exp env e2; in_exp env e3
      |        `like (_,e1,a1,a2opt) ->
          in_exp env e1;
          in_atom env a1;
          (match a2opt with None -> () | Some a -> in_atom env a)
      |        `iscolnull (_,col) -> check_column env col
      |        `in_select (_,e1,sel) ->
          in_exp env e1; ignore (in_select env sel)
      |        `in_atom_list (_,e1,l) ->
          in_exp env e1; List.iter (in_atom env) l
      |        `allorany (e1,_,_,sel) ->
          in_exp env e1; ignore (in_select env sel)
      |        `exists sel -> ignore (in_select env sel)
    and in_exp env = function
        `binop (_,e1,e2) ->
          in_exp env e1; in_exp env e2
      |        `uminus e -> in_exp env e
      |        `atom a -> in_atom env a
      |        `column c -> check_column env c
      |        `functioncall _ ->
          (* FIXME: implement *)
          ()
    and in_atom env = function
        `parameter p -> in_parameter env p
      |        _ -> ()
    and in_parameter env = function
        `single s -> add_param s None
      |        `single_annotated (s,s2,s3) ->
          (
           try
             let t = table_by_name s2 in
             let col = column_by_name t s3 in
             add_param s (Some col)
           with
             Not_found ->
               let msg = Printf.sprintf "Unknown column %s.%s" s2 s3 in
               bad (Query_invalid_against_schema msg)
          )
      |        `couple _ ->
          bad (Query_incorrect "Don't know what to do with couple parameters")
      |        `indicator _ ->
          bad (Query_incorrect "Don't know what to do with indicator parameters")
    in
    let (qexp,ordlist) = query in
    let return_type = in_query_exp [] qexp in
    (* FIXME: handle ordering list *)
    (* FIXME: prevent parsing of capitalized parameters ? *)
    let params =
      Hashtbl.fold
        (fun name colopt acc -> (name,colopt)::acc)
        params
        []
    in
    let params = List.sort
        (fun (name1,_) (name2,_) -> Pervasives.compare name1 name2)
        params
    in
    (return_type, params)


  let query_state = fun query ->
    try
      let q = Sqml.query_of_string query.qry_query in
      let (t,l) = query_parameters query.qry_db q in
      Query_ok (t,l)
    with
      Sqml.Syntax_error (l,c,s) -> Query_parse_error (l,c,s)
    | Bad_query s -> s

  let string_of_query_state = function
    | Query_invalid_against_schema s -> s
    | Query_incorrect s -> s
    | Query_parse_error (l,c,s) ->
        Printf.sprintf "%s line %d character %d" s l c
    | Query_ok (t,params) ->
        let type_of_col_opt = function
            None -> "string option"
          | Some col ->
              Printf.sprintf "(%s%s)"
                col.col_ocaml_ty
                (if col.col_nullable then " option" else "")
        in
        let params = List.map
            (fun (name,copt) ->
              Printf.sprintf "%s: %s -> " name
                (type_of_col_opt copt)
            )
            params
        in
        let t = Printf.sprintf "(%s) list"
            (String.concat "*" (List.map type_of_col_opt t))
        in
        Printf.sprintf "%s %s" (String.concat "" params) t

  (*======================================\
  | Table/Virtual table/Index interaction |
  \======================================*)

  let vtables_using_table_part = fun table ->
    let vtable_use_table = fun vtable ->
      vtable.vt_ftable == table ||      (* Phys. eq *)
      List.exists (fun (t, _) -> t == table) vtable.vt_join
    in
      List.partition vtable_use_table table.ta_db.db_vtables

  let vtables_using_column_part = fun c ->
    let db = c.col_table.ta_db
    and vtable_use_column = fun vtable ->
      List.exists
        (fun (_, columns) ->
           List.exists (fun (c1, c2) -> c1 == c || c2 == c) columns)
        vtable.vt_join
    in
      List.partition vtable_use_column db.db_vtables

  let indexes_using_table_part = fun table ->
    let index_use_table = fun index ->
      match index.idx_columns with
        | c :: _ -> c.col_table == table (* Phys. eq *)
        | _      -> Dbf_misc.ie ()
    in
      List.partition index_use_table table.ta_db.db_indexes

  let indexes_using_column_part = fun column ->
    let index_use_column = fun index ->
      List.exists (fun c -> c == column) index.idx_columns (* Phys. eq *)
    in
      List.partition index_use_column
        column.col_table.ta_db.db_indexes

  let vtables_using_table = fun table ->
    fst (vtables_using_table_part table)

  let vtables_using_column = fun c ->
    fst (vtables_using_column_part c)

  let indexes_using_table = fun table ->
    fst (indexes_using_table_part table)

  let indexes_using_column = fun column ->
    fst (indexes_using_column_part column)

  (*=======\
  | tables |
  \=======*)

  let unlink_table = fun table ->
    let (tables, to_be_removed) =
      List.partition (fun t -> t.ta_name <> table.ta_name)
        table.ta_db.db_tables
    in
    let (t_use, t_dont_use) = vtables_using_table_part table
    and (i_use, i_dont_use) = indexes_using_table_part table in
      table.ta_db.db_tables  <- tables;
      table.ta_db.db_vtables <- t_dont_use;
      table.ta_db.db_indexes <- i_dont_use;
      match to_be_removed with
        | [t] -> t.ta_db <- create_empty (); (t_use, i_use)
        | _   -> Dbf_misc.ie ()

  let unlink_column = fun column ->
    let (columns, to_be_removed) =
      List.partition (fun c -> c.col_name <> column.col_name)
        column.col_table.ta_columns
    in
    let (t_use, t_dont_use)   = vtables_using_column_part column
    and (i_use, i_dont_use)   = indexes_using_column_part column
    and (pk_use, pk_dont_use) =
      (* Phys. eq *)
      List.partition (fun c -> c == column) column.col_table.ta_pkey
    in
      column.col_table.ta_columns       <- columns;
      column.col_table.ta_pkey          <- pk_dont_use;
      column.col_table.ta_db.db_vtables <- t_dont_use;
      column.col_table.ta_db.db_indexes <- i_dont_use;
      match to_be_removed with
        | [c] -> c.col_table <- (Obj.magic 0); (t_use, i_use, pk_use <> [])
        | _   -> Dbf_misc.ie ()
end