grimoire

0.1.0-SNAPSHOT


dependencies

org.clojure/clojure
1.4.0
org.slf4j/slf4j-api
1.6.6
org.slf4j/slf4j-log4j12
1.6.6
org.clojure/tools.logging
0.2.3
lamina
0.5.0-SNAPSHOT
aleph
0.3.0-beta7
zookeeper-clj
0.9.2
ring
1.1.1
compojure
1.1.1
hiccup
1.0.0
ring/ring-jetty-adapter
1.1.0
ring-json-params
0.1.3
compojure
1.1.0
slingshot
0.10.3
cheshire
4.0.3
timewarrior/clj-aws-s3
0.3.3
org.clojure/tools.cli
0.2.1
protobuf
0.6.1
clj-oauth2
0.2.0
org.clojure/tools.nrepl
0.2.0-beta9

dev dependencies

criterium
0.2.1



(this space intentionally left almost blank)
 
(ns grimoire.cluster
  (:refer-clojure :exclude [join])
  (:require  [grimoire
              [node :as node]
              [protocol :as protocol]]
             [zookeeper :as zk]
             [zookeeper.data :as data]
             [zookeeper.logger :as logger]
             [clojure.tools.logging :as log]))

publish node-state to zookeeper use zk to switch nodes on and off and control amount of traffic that goes to a distinct node

(def group-name "/cluster")
(def ^:dynamic *client* nil)
(defn get-local-ip
  []
  (.getHostAddress (java.net.InetAddress/getLocalHost)))
(defn serialize
  [node-data]
  (data/to-bytes (pr-str node-data)))
(defn deserialize
  [node-data]
  (read-string (data/to-string node-data)))
(defn get-node-path
  [node-name]
  (str group-name "/" node-name))
(defn get-node-data
  [node-name]
  (let [response (zk/data *client* (get-node-path node-name))]
    (deserialize (:data response))))
(defn get-remote-ip
  [node-name]
  (let [response (zk/exists *client* (get-node-path node-name))]
    (when-not (nil? response)
      (:host (deserialize (:data response))))))
(defn ^:private create-group
  []
  (when-not (zk/exists *client* group-name)
    (zk/create *client* group-name :persistent? true)))
(defn ^:private node-watcher
  [node-name event]
  (log/info "node-watcher" node-name event)
  (if (= (:event-type event) :NodeDeleted)
    (node/disconnect node-name)
    (do
      (log/info "node-watcher" (:path event))
      (zk/exists *client* (:path event) :watcher (partial node-watcher node-name)))))
(defn get-remote-nodes
  []
  (filter #(not (= (node/get-node-name) %)) (zk/children *client* group-name)))
(defn ^:private group-watcher
  [event]
  (when (= (:event-type event) :NodeChildrenChanged)
    (log/info "group-watcher" event)
    (let [all-nodes (zk/children *client* group-name :watcher group-watcher)
          connections (node/get-connections)
          remote-nodes (->> all-nodes
                            (filter #(not (= (node/get-node-name) %)))
                            (filter #(nil? (connections %))))]
      (log/info "group-watcher" all-nodes remote-nodes)
      (doseq [node-name remote-nodes]
        (let [node-data (get-node-data node-name)]
          (log/info "group-watcher" node-name node-data)
          (zk/exists *client* (get-node-path node-name)
                     :watcher (partial node-watcher node-name))
          (node/connect node-name node-data))))))
(defn join
  [local-port]
  (let [ip (get-local-ip)
        ;; needs refactoring
        ;; inet-socket-address (str ip ":" local-port)
        node-name (node/get-node-name)
        node-data (serialize {:host ip :port local-port :node-name node-name})]
    (log/info "join-cluster" ip local-port node-name)
    (zk/create *client* (get-node-path node-name) :data node-data))
  (zk/children *client* group-name :watcher group-watcher))
(defn start
  [{:keys [zk-host zk-port local-host local-port]}]
  (log/info "start" zk-host zk-port local-host local-port)
  (let [client (zk/connect (str zk-host ":" zk-port))]
    (alter-var-root #'*client* (fn [_] client)))
  (create-group)
  (join local-port))
 
(ns grimoire.config)
(defonce system
  {:save-interval 10000
   :session-timeout 60000
   :session-version 1})
(defonce facebook
  {:app-id 
   :app-secret 
   :app-name 
   :api-key 
   :canvas-url })
(defonce zookeeper
  [{:port 2181 :host "127.0.0.1"}
   {:port 2181 :host "127.0.0.1"}
   {:port 2181 :host "127.0.0.1"}])
(defonce aws
  {:access-key "033G2TPHEBTWC360RRG2"
   :secret-key "mCfw/OxodR/YDyC6qINI1VSYuw0dn1+XoGMRMDg5"
   :bucket "grimoire-dev"})

default options

(defonce db
  {:host "localhost"
   :port 6379})

default options

(defonce canvas-server
  {:host "localhost"
   :port 3000})

default options

(defonce api-server
  {:host "localhost"
   :port 4000})
 
(ns grimoire.core
  (:gen-class)
  (:use clojure.tools.cli)
  ;; (:use [clojure.tools.nrepl.server :only [start-server stop-server]])
  (:require [grimoire
             [config :as config]
             [web :as web]
             [cluster :as cluster]
             [node :as node]
             [registry :as registry]
             [landing :as landing]
             [session :as session]]
            [clojure.tools.logging :as log]))
(set! *warn-on-reflection* true)

(defonce nrepl-server (start-server :port 7888))

(defn run-test
  [users]
  (dotimes [user-id users]
    (session/setup user-id)
    (when (= (rem user-id 1000) 0)
      ;; (log/info (str "done" user-id "so far")))))
(defn start
  [options]
  (println "Starting server...")
  (node/start-server (:cluster-port options) (:node-id options))
  (cluster/start {:zk-host "127.0.0.1" :zk-port 2181
                  :local-port (:cluster-port options)})
  (registry/start "127.0.0.1" 2181)
  (web/start (:api-port options))
  ;; (landing/start (:canvas-port options))
  (println (format "'%s' waiting for work, work!" (node/get-node-name))))
(defn stop
  []
  (println "Preparing exit...")
  ;; FIXME server states are missing
  (shutdown-agents)
  (web/stop)
  (println "All agents are dead.\nGoodbye."))
(defn at-exit
  [runnable]
  (.addShutdownHook (Runtime/getRuntime) (Thread. ^Runnable runnable)))
(defn -main
  [& args]
  (let [[options args banner]
        (cli args
             ["-h" "--help" "Show help" :default false :flag true]
             ["-i" "--node-id" "Node ID for cross-node addressing"
              :default 0 :parse-fn #(Integer/parseInt %)]
             ["-n" "--cluster-port" "Local cluster listening port"
              :default 7000 :parse-fn #(Integer/parseInt %)]
             ["-a" "--api-port" "Client facing API listening port"
              :default (config/api-server :port) :parse-fn #(Integer/parseInt %)]
             ["-c" "--canvas-port" "Landing page listening port"
              :default (config/canvas-server :port) :parse-fn #(Integer/parseInt %)])]
    (when (:help options)
      (println banner)
      (System/exit 0))
    (start options))
  (at-exit stop))
 
(ns grimoire.game
  (:require [clojure.tools.logging :as log]
            [grimoire
             [level :as level]
             [map :as map]]))
(defn get-update-function
  [event]
  (case (:action event)
    :level (level/update event)
    :map (map/update event)
    (throw (Exception. "no_update_function_found, args=[%s]" event))))
 
(ns grimoire.landing
  (:use [ring.adapter.jetty :only [run-jetty]]
        [compojure.core]
        [clojure.tools.logging :only [info error]])
  (:require [compojure.route :as route]
            [hiccup
             [page :refer [html5 include-js]]
             [element :refer [javascript-tag]]]
            [grimoire.config :as config]))
(def ^:dynamic *jetty-server* (atom nil))
(defn- run-clojurescript [path init]
  (list
   (javascript-tag "var CLOSURE_NO_DEPS = true;")
   (include-js path)
   (javascript-tag init)))
(defn index
  []
  (html5
   [:head
    [:title "grimoire"]]
   [:body
    [:div {:id "content"} "Hello World"]
    (run-clojurescript
     "/js/main.js"
     "grimoire_client.core.say_hello()")]))
(defroutes application
  (GET "/" [] (index))
  (route/resources "/")
  (route/not-found "Page not found"))
(defn start
  [port]
  (let [jetty-server (run-jetty #'application {:port (or port 8080) :join? false})]
    (reset! *jetty-server* jetty-server))
  (info "Canvas server running on port " port))
(defn stop
  []
  (.stop @*jetty-server*)
  (info "Stopped canvas server"))
 
(ns grimoire.level
  (:require [clojure.tools.logging :as log])
  (:use grimoire.util
        [lamina core executor]))
(def example-levels
  {1 {:required-points 0 :reward 0}
   2 {:required-points 10 :reward 5}
   3 {:required-points 20 :reward 5}
   4 {:required-points 30 :reward 5}
   5 {:required-points 40 :reward 5}})
(defmulti update :verb)
(defupdate update :complete
  (fn [state {:keys [level]}]
    {:pre (number? level)}
    (let [level-config (example-levels level)]
      (if (>= (:points state) (:required-points level-config))
        (-> state
            (update-in [:balance] + (:reward level-config))
            to-result-map)
        (throw (Exception.
                (format "insufficient_points_for_level_complete, args=[%s, %s]"
                        (:id state) level)))))))
 
(ns grimoire.map
  (:require [clojure.tools.logging :as log])
  (:use [lamina core executor]
        [grimoire.util]
        [cheshire.core :only [generate-string parse-string]]))
(def example-entities
  {1 {:name "entity 1" :width 1 :height 1 :cost 5}
   2 {:name "entity 2" :width 1 :height 1 :cost 7}
   3 {:name "entity 3" :width 1 :height 1 :cost 4}})
(def example-contracts
  {1 {:name "contract 1" :duration 2 :reward 5}
   2 {:name "contract 2" :duration 4 :reward 22}
   3 {:name "contract 3" :duration 6 :reward 30}})
(def default-width 10)
(def default-height 10)
(defn ^:private m-keyword
  ([x y]
     (m-keyword [x y]))
  ([[x y]]
     (keyword (str "x" x "y" y))))
(defn generate-empty-map
  ;; Creates an empty map of the structure {:x1y1 nil :x1y2 nil}
  [width height]
  (let [m (reduce
           #(assoc! %1 (m-keyword %2) nil)
           (transient {})
           (mapcat (fn [x] (for [y (range width)] [x y])) (range height)))]
    (persistent! m)))
(def empty-map (memoize generate-empty-map))
(defmulti update :verb)
(defupdate update :add
  (fn [state {:keys [id x y]}]
    ;; (log/info "add" (Thread/currentThread))
    ;; (Thread/sleep 10000)
    (let [entity-config (example-entities id)
          coord-key (m-keyword x y)
          entity (get-in state [:map coord-key])]
      (if (nil? entity)
        (-> state
            (assoc-in [:map coord-key] {:id id})
            (update-in [:balance] - (:cost entity-config))
            to-result-map)
        (throw (Exception. (format "map_position_not_empty, args=[%s,%s]" x y)))))))
(defupdate update :start-contract
  (fn [state {:keys [id x y]}]
    (let [coord-key (m-keyword x y)
          entity (get-in state [:map coord-key])]
      (if (and (not (nil? entity))
               (nil? (:contract-start entity)))
        (-> state
            (update-in [:map coord-key] merge
                       {:contract-start (System/currentTimeMillis) :contract-id id})
            to-result-map)
        (throw (Exception. (format "start_contract_failed, args=[%s, %s]"
                                   (:id state) entity)))))))
(defn ^:private collectable?
  [entity]
  {:pre [(has-keys? entity :contract-start :contract-id)]}
  (let [contract-config (example-contracts (:contract-id entity))
        duration (:duration contract-config)]
    (> (System/currentTimeMillis) (+ duration (:contract-start entity)))))
(defupdate update :collect-contract
  (fn [state {:keys [x y]}]
    (let [coord-key (m-keyword x y)
          entity (get-in state [:map coord-key])]
      (if (collectable? entity)
        (-> state
            (assoc-in [:map coord-key] (dissoc entity :contract-start :contract-id))
            (update-in [:balance] - (:reward (example-contracts (:contract-id entity))))
            to-result-map)
        (throw (Exception. (format "collect_contract_failed, args=[%s, %s]"
                                   (:id state) entity)))))))
 
(ns grimoire.node
  (:use lamina.stats
        [lamina core executor])
  (:require [clojure.tools.logging :as log]
            [aleph.tcp :as aleph]
            [gloss.core :as gloss]
            [grimoire.protocol :as protocol])
  (:import java.util.Arrays
           java.util.zip.ZipException))
(def ^:dynamic *server-close-fn* nil)
(defonce node-prefix "grim-")
(def local-node-id)
(def local-node-name)
(def inbound-channel (permanent-channel))
(def stats-channel (permanent-channel))
(def connections (atom {}))
(defn get-node-id [] local-node-id)
(defn get-node-name [] local-node-name)
(defn get-connections [] @connections)
(defn register-stats-channel
  [event-channel]
  (log/info "register-stats-channel" event-channel)
  (siphon (fork stats-channel) event-channel))
(defn publish-event
  [node-name message]
  (when-not (nil? (@connections node-name))
    (let [remote-channel (:channel (@connections node-name))]
      (when-not (nil? remote-channel)
        ;; TODO move encoding/decoding to separate namespace
        (enqueue remote-channel (protocol/encode message))))))
(defn ^:private decode-frame
  [msg-bytes]
  (let [msg-buffer (gloss.io/contiguous msg-bytes)
        msg-byte-array (byte-array (.remaining msg-buffer))]
    (.get msg-buffer msg-byte-array)
    (protocol/decode msg-byte-array)))
(defn ^:private client-success-callback
  [node-name client-channel]
  (log/info "on-success-callback" node-name client-channel)
  ;; TODO make this (:status) less explicit and actually get state from channel
  (swap! connections assoc node-name {:status :connected :channel client-channel})
  (log/info "client-success-callback" (@connections node-name))
  ;; FIXME use pipeline and handle errors
  ;; synchronously wait for reply and move forward from there on
  (let [msg {:type :node-connection-start :content {:node-name node-name}}]
    (enqueue client-channel (protocol/encode msg)))
  (siphon (map* #(decode-frame %) client-channel) inbound-channel))
(defn ^:private client-error-callback
  [node-name error]
  (log/info "on-error-callback" node-name error))
(defn start-client
  [host port node-name]
  (let [options {:host host :port port :frame (gloss/finite-block :int32)}
        client-result-channel (aleph/tcp-client options)]
    (on-realized client-result-channel
                        (partial client-success-callback node-name)
                        (partial client-error-callback node-name))))

TODO actually implement reconnection logic don't forget back-off

(defn ^:private reconnect
  []
  (log/info "reconnect-client"))
(defn disconnect
  [node-name]
  (log/info "disconnect")
  (let [remote-connection (@connections node-name)]
    (try
      (close (:channel remote-connection))
      (catch Exception exception
        (log/error "disconnect" (.getMessage exception))))
    (swap! connections dissoc node-name)))
(defn connect
  [node-name node-data]
  (log/info "connect" node-name node-data)
  (when (nil? (@connections node-name))
    (swap! connections assoc-in [node-name :status] :connecting)
    (start-client (:host node-data) (:port node-data) node-name)))
(defn server-handler
  [connection-channel client-info]
  (log/info "server-handler" client-info (:address client-info))
  ;; TODO map bare rate messages to more descriptive hashmaps
  ;; (siphon (rate connection-channel) stats-channel)
  ;; FIXME very brittle; fix
  (run-pipeline
   connection-channel
   read-channel
   (fn [message]
     (let [node-connection-start (decode-frame message)
           remote-node-name (get-in node-connection-start [:event
                                                           :node-connection-start
                                                           :node-name])]
       (swap! connections assoc remote-node-name
              {:status :connected :channel connection-channel})
       (siphon (map* #(decode-frame %) connection-channel) inbound-channel)))))
(defn ^:private create-distributor
  []
  (distributor :user-id
               (fn [facet facet-channel]
                 (log/info "distributor" facet facet-channel)
                 ;; here's hoping to named-channel's idempotency
                 (let [user-channel (named-channel (keyword (str facet)) (fn [_]))]
                   (log/info "distributor" user-channel)
                   (close-on-idle 5000 facet-channel)
                   (close-on-idle 5000 user-channel)
                   (ground user-channel)
                   (siphon facet-channel user-channel)))))
(defn start-server
  [^Integer port ^Integer node-id]
  (log/info "start-server" port node-id)
  (alter-var-root #'local-node-id (fn [_] node-id))
  (alter-var-root #'local-node-name (fn [_] (str "grim-" node-id)))
  ;; (receive-all inbound-channel #(log/info "inbound-channel received" %))
  (siphon inbound-channel (create-distributor))
  ;; If a tree falls in a forest and no one is around to hear it, does it make a sound?
  (ground stats-channel)
  (let [options {:port port :frame (gloss/finite-block :int32)}
        server (aleph/start-tcp-server server-handler options)]
    (alter-var-root #'*server-close-fn* (fn [_] server))))
(defn stop-server
  []
  (*server-close-fn*)
  (log/info "stop-server"))
 
(ns grimoire.protocol
  (:refer clojure.string :only [upper-case])
  (:require [clojure.tools.logging :as log])
  (:import Nodemessages$Event
           Nodemessages$Foo
           Nodemessages$Bar
           Nodemessages$NodeConnectionStart)
  (:use protobuf.core))
(def Event (protodef Nodemessages$Event))
(def Foo (protodef Nodemessages$Foo))
(def Bar (protodef Nodemessages$Bar))
(def NodeConnectionStart (protodef Nodemessages$NodeConnectionStart))
(def protobuf-content-types {:foo Foo
                             :bar Bar
                             :node-connection-start NodeConnectionStart})
(defn ^:private get-protobuf-type
  [type]
  (if-let [the-type (type protobuf-content-types)]
    the-type
    (throw (Exception. (format "protobuf_type_does_not_exist, args=[%s]" type)))))
(defn decode
  [msg-bytes]
  (let [event (protobuf-load Event msg-bytes)
        content ((:type event) event)]
    {:user-id (:receiver content) :event event}))
(defn encode [message]
  (let [type (:type message)
        content (flatten (seq (:content message)))
        protobuf-content-type (get-protobuf-type type)
        protobuf-content (apply protobuf protobuf-content-type content)
        event-type (upper-case (name type))
        protobuf-msg (protobuf Event :type event-type type protobuf-content)]
    (protobuf-dump protobuf-msg)))
 
(ns grimoire.registry
  (:require [clojure.tools.logging :as log]
            [grimoire
             [config :as config]
             [node :as node]
             [cluster :as cluster]]
            [zookeeper :as zk]
            [zookeeper.data :as zk-data]
            [zookeeper.util :as zk-util]))
(defonce registry-group-name "/registry")
(def ^:dynamic *client* nil)
(defn ^:private to-zk-location
  [user-id & z-nodes]
  (str registry-group-name "/" user-id (clojure.string/join "/" z-nodes)))
(defn register
  [user-id valid-until cluster-node]
  ;; (log/info "register" user-id valid-until cluster-node)
  (let [user-znode (to-zk-location user-id)
        lock-znode (str user-znode "/_lock-")]
    (let [create-response (zk/create-all *client* lock-znode
                                         :persistent? true :sequential? true)
          create-id (zk-util/extract-id create-response)
          user-node-response (zk/exists *client* user-znode)]
      (if (= 0 create-id)
        (zk/set-data *client* user-znode
                     (cluster/serialize {:valid-until valid-until :node cluster-node})
                     (:version user-node-response))
        (throw
         (Exception. (format "session_already_registered, args=[%s]" user-id)))))))
(defn deregister
  [user-id]
  (log/info "deregister" user-id (to-zk-location user-id))
  (zk/delete-all *client* (to-zk-location user-id)))
(defn extend-timeout
  [user-id valid-until]
  (let [zk-location (to-zk-location user-id)
        response (zk/exists *client* zk-location)
        data (cluster/deserialize (:data response))
        new-data (assoc-in data :valid-until valid-until)]
    (zk/set-data *client* zk-location (cluster/serialize new-data) (:version response))))
(defn deregister-all
  []
  (log/info "delete all registered users")
  (zk/delete-all *client* registry-group-name))
(defn get-node-data
  [user-id]
  (let [response (zk/data *client* (to-zk-location user-id))]
    (cluster/deserialize (:data response))))
(defn get-location
  [user-id]
  (:node (get-node-data user-id)))
(defn local?
  [user-id]
  (let [user-node-name (:node (get-node-data user-id))]
    (= user-node-name (node/get-node-name))))
(defn registered?
  [user-id]
  (not (nil? (get-node-data user-id))))
(defn ^:private create-registry-group
  []
  (when (nil? (zk/exists *client* registry-group-name))
    (zk/create *client* registry-group-name :persistent? true)))

TODO check for dev environment

(defn ^:private dev-clean
  []
  (let [remote-nodes (cluster/get-remote-nodes)]
    (when (empty? remote-nodes)
      (zk/delete-all *client* registry-group-name))))
(defn start
  [host port]
  (let [client (zk/connect (str host ":" port))]
    (alter-var-root #'*client* (fn [_] client)))
  ;; developer-sanity-protection-measures
  (dev-clean)
  (create-registry-group))
(defn b-fn
  [i]
  (zk/create *client* (to-zk-location i)))

(future (do (log/info "begin sleep") (Thread/sleep 8000) (log/info "done sleeping") (log/info "begin zk node creation") (let [t (time (dotimes [i 5000] (b-fn i)))] (log/info t))))

 
(ns grimoire.session
  (:use [lamina core executor])
  (:import (org.jboss.netty.util HashedWheelTimer Timeout TimerTask)
           (java.util.concurrent TimeUnit))
  (:require [grimoire.config :as config]
            [grimoire.registry :as registry]
            [grimoire.node :as node]
            [grimoire.storage :as storage]
            [grimoire.user :as user]
            [grimoire.game :as game]
            [grimoire.session-store :as store]
            [grimoire.util :as util]
            [clojure.tools.logging :as log]))
(defonce ^:private ^HashedWheelTimer hashed-wheel-timer (HashedWheelTimer.))
(defn get-expire-time
  []
  (+ (System/currentTimeMillis)
     (:session-timeout config/system)))
(defn- get-request-channel
  [user-id]
  (let [session (store/get user-id)]
    (:request-channel @session)))
(defn- enqueue-event
  [user-id event]
  (enqueue (get-request-channel user-id) event))
(defn- close-request-channel
  [user-id]
  (let [session (store/get user-id)
        request-channel (:request-channel @session)]
    (close request-channel)))
(defn get-event-channel
  [user-id]
  (let [session (store/get user-id)
        event-channel (:event-channel @session)]
    (if (or (nil? event-channel)
            (closed? event-channel))
      (let [new-event-channel (channel)]
        (swap! session assoc :event-channel new-event-channel)
        new-event-channel)
      event-channel)))

{:keys [async?] :or {:async? false} :as options}

(defmacro build-pipeline
  [& tasks]
  (let [channel (gensym 'channel)]
    `(fn [~channel value#]
       (run-pipeline value#
                     {:error-handler #(error ~channel %)}
                     ~@(map
                        (fn [s]
                          `(fn [event#]
                             (task (~s ~channel event#))))
                        tasks)))))

(util/log-with-thread "generic-event-stage - start sleep") (Thread/sleep 5000) (util/log-with-thread "generic-event-stage - done sleeping")

(defn- handle-game-event
  [{:keys [user-id payload args] :as event}]
  (let [session (store/get user-id)
        result-map (apply (game/get-update-function payload) (:state @session) args)]
    (swap! session assoc :state (:state result-map))
    (assoc event :response (:response result-map))))
(declare periodic-save)
(defn- handle-system-event
  [{:keys [user-id payload args] :as event}]
  (log/info "handle-system-event" event)
  (let [session (store/get user-id)]
    (case (:action payload)
      :periodic-save ((partial periodic-save session) args)
      nil)))
(defn- handle-event
  [event]
  (case (:type event)
    :game (handle-game-event event)
    :system (handle-system-event event)
    event))
(defn- process-event
  [ch event]
  (try
    (handle-event event)
    (catch Exception e
      (close ch)
      (ground ch)
      (log/error "pipeline_error" e)
      {:type :error
       :receiver (:receiver event)
       :error {:error (.getMessage e)}})))
(defn- reply
  [{:keys [type receiver response error] :as event}]
  (case type
    :game (if (:close? event)
            (enqueue-and-close receiver response)
            (enqueue receiver response))
    :error (enqueue-and-close receiver error)
    event))
(defn- receive-in-order-with-pipeline
  [ch]
  (consume ch
           :channel nil
           :initial-value nil
           :reduce (fn [_ event]
                     (run-pipeline event
                                   {:error-handler #(error ch %)}
                                   ;; watch for async tags
                                   #(process-event ch %)
                                   reply))))
(defn enqueue-game-event
  [user-id action response-channel]
  (if-let [session (store/get user-id)]
    (let [receiver (or response-channel (:event-channel @session))
          request-channel (:request-channel @session)]
      (if (closed? request-channel)
        (enqueue-and-close receiver {:error "request_channel_is_closed"})
        (enqueue request-channel {:type :game
                                  :user-id user-id
                                  :receiver receiver
                                  :close? true
                                  :reload-on-error? true
                                  :payload action
                                  :args nil})))
    (throw (Exception. (format "session_not_running, args=[%s]" user-id)))))
(defn- clean-timeouts
  [session]
  (doseq [timeout-type [:session-timeout :save-timeout]]
    (try
      (.cancel ^Timeout (timeout-type @session))
      (catch Exception e
        (log/error "clean-timeouts" (.getMessage e)))
      (finally
       (swap! session assoc timeout-type nil)))))
(defn- save-to-storage
  [user-id state]
  (log/info "save-to-storage" user-id)
  (let [user-json (user/to-json state)]
    (storage/put-data user-id user-json)))
(defn- safe-close-channel
  [ch]
  (when-not (nil? ch)
    (try
      (close ch)
      (catch Exception e
        (log/error "safe-close-channel" (.getMessage e))))))
(defn- clean
  [user-id]
  (log/info "clean" user-id)
  (let [session (store/get user-id)
        channel-keys [:remote-channel :event-channel]]
    (doseq [channel-key channel-keys]
      (safe-close-channel (channel-key @session)))
    (clean-timeouts session)
    (store/remove user-id)
    (registry/deregister user-id))
  nil)
(defn- reset
  [user-id]
  ;; instead of resetting the session on the setup call
  ;; why not do it instantly?)
(defn- should-save?
  [state]
  (let [answer (> (:updated-at state) (:saved-at state))]
    (log/info "should-save?" answer)
    answer))
(defn- stop
  [user-id]
  (log/info "stop" user-id)
  (let [session (store/get user-id)
        state (:state @session)
        request-channel (:request-channel @session)]
    (close request-channel)
    (clean-timeouts session)
    (when (should-save? state)
      (try
        (save-to-storage user-id (assoc state :saved-at (System/currentTimeMillis)))
        (catch Exception e
          (log/error "stop" (.getMessage e) user-id))))
    (clean user-id)
    nil))
(defn- get-timeout
  [delay timeout-fn]
  (let [timer-task (reify org.jboss.netty.util.TimerTask
                     (^void run [this ^Timeout timeout]
                       (timeout-fn timeout)))]
    (.newTimeout hashed-wheel-timer timer-task delay (TimeUnit/MILLISECONDS))))
(defn- renew-timeout
  [user-id delay timeout-type timeout-fn]
  (log/info "renew-timeout" user-id timeout-type timeout-fn)
  (let [session (store/get user-id)]
    (.cancel ^Timeout (timeout-type @session))
    (swap! session assoc timeout-type
           (get-timeout delay (partial timeout-fn user-id)))))
(defn- session-timeout-handler
  [user-id ^:Timeout timeout]
  (log/info "session-timeout-handler" user-id timeout)
  (future (stop user-id)))
(declare save-timeout-handler)
(defn- periodic-save
  [session user-id]
  (log/info "periodic-save" user-id)
  (let [new-state (assoc (:state @session) :saved-at (System/currentTimeMillis))
        save-interval (config/system :save-interval)]
    (save-to-storage user-id new-state)
    (swap! session assoc :state new-state)))
(defn- save-timeout-handler
  [user-id ^:Timeout timeout]
  (log/info "save-timeout-handler")
  (let [state (:state @(store/get user-id))
        save-interval (config/system :save-interval)]
    (renew-timeout user-id save-interval :save-timeout save-timeout-handler)
    (when (should-save? state)
      (enqueue-event user-id {:type :system
                              :user-id user-id
                              :reload-on-error false
                              :payload {:action :periodic-save}
                              :args [user-id]}))))
(defn handle-remote-message
  [msg]
  (log/info "handle-remote-message" msg))
(defn- start
  [user-id state]
  (log/info "start")
  (let [remote-channel (named-channel (keyword (str user-id)) nil)
        request-channel (channel)
        session-timeout (get-timeout (config/system :session-timeout)
                                     (partial session-timeout-handler user-id))
        save-timeout (get-timeout (config/system :save-interval)
                                  (partial save-timeout-handler user-id))
        session (atom {:state state
                       :request-channel request-channel
                       :event-channel nil
                       :remote-channel remote-channel
                       :session-timeout session-timeout
                       :save-timeout save-timeout})]
    (receive-in-order-with-pipeline request-channel)
    (store/put user-id session)
    (receive-all remote-channel handle-remote-message))
  nil)
(defn- try-restart
  [user-id]
  (log/info "try-testart")
  (let [session (store/get user-id)
        request-channel (:request-channel @session)]
    (if (closed? request-channel)
      (let [new-request-channel (channel)]
        (receive-in-order-with-pipeline new-request-channel)
        (swap! session assoc :request-channel request-channel)
        (user/to-json (:state @session)))
      (throw (Exception. (format "session_still_running, args=[%s]" user-id))))))
(defn- load-user
  [user-id]
  (log/info "load-user")
  (let [result (storage/get-data user-id)]
    (if (nil? result)
      (user/new user-id)
      (user/from-json result))))
(defn- load-and-start
  [user-id]
  (log/info "load-and-start")
  (registry/register user-id (get-expire-time) (node/get-node-name))
  (try
    (let [state (load-user user-id)]
      (start user-id state)
      (user/to-json state))
    (catch Exception e
      (log/error "setup_failed" (.getMessage e) user-id)
      (clean user-id)
      (throw (Exception. (format "session_start_failed, args=[%s]" user-id))))))
(defn setup
  [user-id]
  (log/info "setup")
  (if (and (store/exists? user-id)
           (registry/registered? user-id)
           (registry/local? user-id))
    (try-restart user-id)
    (load-and-start user-id)))
(defn run-bench
  []
  (log/info "run-bench" (node/get-node-id))
  ;; (let [runs 1000
  (let [runs 10
        ;; global-start (* (node/get-node-id) runs)]
        global-start (* 0 runs)]
    ;; (Thread/sleep 5000)
    (dotimes [j runs]
      (let [batch-size 100
            start (+ global-start (* j batch-size))
            end (+ start batch-size)
            the-range (range start end)]
        (time (doseq [i the-range] (setup i))))
      (log/info "num-sessions" (store/num-sessions))
      ;; (Thread/sleep 1000))))
 
(ns grimoire.session-store
  (:refer-clojure :exclude [get remove])
  (:require [clojure.tools.logging :as log]))
(defonce ^:private store (atom {}))
(defn num-sessions [] (count @store))
(defn exists?
  [user-id]
  (not (nil? (@store user-id))))
(defn get
  [user-id]
  (@store user-id))
(defn put
  [user-id session]
  (if (nil? (@store user-id))
    (swap! store assoc user-id session)
    (throw (Exception. (format "session_already_running, args=[%s]" user-id)))))
(defn remove
  [user-id]
  (swap! store dissoc user-id))
 
(ns grimoire.storage
  (:use [slingshot.slingshot :only [try+ throw+]]
        [cheshire.core :only [generate-string]])
  (:require [aws.sdk.s3 :as s3]
            [grimoire
             [config :as config]]
            [clojure.tools.logging :as log])
  (:import com.amazonaws.services.s3.model.AmazonS3Exception))
(def credentials {:access-key (config/aws :access-key),
                  :secret-key (config/aws :secret-key)})
(def bucket (config/aws :bucket))
(defn ^:private id-to-storage-key
  [user-id]
  (str user-id))
(defn ^:private storage-key-to-id
  [storage-key]
  (Integer/parseInt storage-key))

TODO implement

(defn ^:private clean-bucket
  [])
(defn get-data
  [^Integer user-id]
  ;; (log/info "get-data" user-id)
  ;; (try
  ;;   (let [s3-object (s3/get-object credentials bucket (id-to-storage-key user-id))]
  ;;     (slurp (:content s3-object)))
  ;;   (catch AmazonS3Exception exception
  ;;     (when-not (= (.getErrorCode exception) "NoSuchKey")
  ;;       (log/error (.getMessage exception)))))
  nil)
(defn put-data
  [^Integer user-id ^String json]
  ;; (log/info "put-data" user-id)
  ;; (let [result (s3/put-object credentials bucket (id-to-storage-key user-id) json)]
  ;;   (log/info "put-data" result))
  nil)
 
(ns grimoire.timer)
 
(ns grimoire.user
  (:use [slingshot.slingshot :only [try+ throw+]]
        [cheshire.core :only [generate-string parse-string]])
  (:require [grimoire
             [config :as config]
             [map :as game-map]]
            [clojure.tools.logging :as log]))
(defrecord User [^Integer id
                 name
                 points
                 balance
                 ;; inventory
                 map
                 updated-at
                 created-at
                 saved-at
                 version])

add session-token

(defn new
  [user-id & name]
  (let [current-time (System/currentTimeMillis)
        current-version (:session-version config/system)
        default-map (game-map/empty-map 7 7)]
    {:id user-id
     :name (or name "John Doe")
     :points 0
     :balance 0
     :map default-map
     :updated-at current-time
     :created-at current-time
     :saved-at 0
     :version current-version}))
(defn from-struct
  [{:keys [id name points balance map
           updated-at created-at saved-at version], :or {saved-at 0}}]
  ;; (log/info "from-struct" map)
  ;; (User. id name points balance {} updated-at created-at saved-at
                                        ;version)
  (apply hash-map
         (User. id name points balance {}
                updated-at created-at saved-at version)))
(defn from-json
  [json]
  (let [struct (parse-string json true)]
    (from-struct struct)))
(defn to-json
  [user]
  (generate-string user))
 
(ns grimoire.util
  (:use [lamina core executor])
  (:require [clojure.tools.logging :as log]))
(defn current-time
  []
  (System/currentTimeMillis))
(defmacro log-with-thread
  [& msg]
  `(log/info ~@msg (Thread/currentThread)))
(defn has-keys?
  [m & ks] (every? #(contains? m %) ks))
(defn touch
  [state]
  (assoc state :updated-at (current-time)))
(defn to-result-map
  ([state]
     (to-result-map state {}))
  ([state response]
     {:state state :response response}))
(defmacro defupdate
  [multi-fn dispatch-value & body-fn]
  `(. ~(with-meta multi-fn  {:tag 'clojure.lang.MultiFn})
      addMethod
      ~dispatch-value
      (fn [event#]
        (fn [state#]
          (let [result-map# (~@body-fn state# (:body event#))]
            (assoc-in result-map# [:state :updated-at] (current-time)))))))
 
(ns grimoire.web
  (:use [slingshot.slingshot :only [try+ throw+]]
        [lamina core executor]
        [aleph http formats]
        compojure.core
        [ring.middleware
         [json-params]
         [reload]]
        [cheshire.core :only [generate-string]])
  (:require [compojure.route :as route]
            [clojure.tools.logging :as log]
            [grimoire
             [node :as node]
             [session :as session]
             [registry :as registry]
             [config :as config]
             [util :as util]])
  (:import org.codehaus.jackson.JsonParseException)
  ;; index/stats html imports
  ;; TODO move to template ns
  (:require [hiccup
             [page :refer [html5 include-js]]
             [element :refer [javascript-tag]]]))
(def ^:dynamic *aleph-stop* (atom nil))
(def error-codes
  {:invalid 400
   :not-found 404})
(defn json-response [data & [status]]
  {:status (or status 200)
   :headers {"Content-Type" "application/json"}
   :body (generate-string (or data {}))})
(defn- encode-event-data
  [data]
  (str "data:" (generate-string data) "\n\n"))
(defn respond
  [response-channel data]
  (enqueue response-channel (json-response data)))
(defn wrap-bounce-favicon [handler]
  (log/info "bouncing favicon request")
  (fn [req]
    (if (= [:get "/favicon.ico"] [(:request-method req) (:uri req)])
      {:status 404
       :headers {}
       :body }
      (handler req))))
(defn wrap-error-handling [handler]
  (log/info "entering error-handling" handler)
  (fn [req]
    (try+
     (or (handler req)
         (json-response {"error" "resource not found"} 404))
     (catch JsonParseException e
       (json-response {"error" "malformed json"} 400))
     (catch [:type :grimoire] {:keys [message args]}
       (log/error message args)
       (json-response {"error" message}))
     (catch Exception e
       (let [message (.getMessage ^Exception e)]
         (log/error "wrap-error-handling" e message)
         (json-response {"error" message})))
     (catch Object e
       (let [{:keys [type message]} (meta e)]
         (log/error "wrap-error-handling" type message e)
         (json-response {"error" message} (error-codes type)))))))
(defn- run-clojurescript [path init]
  (list
   (javascript-tag "var CLOSURE_NO_DEPS = true;")
   (include-js path)
   (javascript-tag init)))
(def page-title "Grimoire")
(defn index
  [user-id]
  (html5
   [:head
    [:title (page-title)]]
   [:body
    [:div {:id "content"} ]
    (run-clojurescript
     "/js/main.js"
     (format "grimoire_client.core.init(%s)" user-id))]))
(defn stats-index
  []
  (html5
   [:head
    [:title (page-title)]]
   [:body
    ;; http://goo.gl/QrXDs
    (javascript-tag "var source = new EventSource('/stats-events');")
    (javascript-tag "source.addEventListener('message', function(event) { console.log(JSON.parse(event.data)) })")]))
(defn- pipeline-error-handler
  [ch error]
  (let [message (.getMessage ^Exception error)]
    (log/error "pipeline" message)
    (respond ch {"error" message})))

Use lamina.executor/task's for potentially long-running operations in the pipeline. The server handler will return immediatle to it's pool and aleph/lamina will take care of responding to the client at some later point.

(defmacro defpipeline
  [name & tasks]
  `(defn ~name
     []
     (wrap-aleph-handler
      (fn [channel# request#]
        (run-pipeline request#
                      {:error-handler (partial pipeline-error-handler channel#)}
                      ~@tasks
                      (fn [response#]
                        (respond channel# response#)))))))
(defpipeline setup-handler
  #(task
    (let [{{:keys [user-id]} :route-params} %]
      (session/setup (read-string user-id)))))
(defpipeline where-is-handler
  #(task
    (let [{{:keys [user-id]} :route-params} %]
      {:node (registry/get-location (read-string user-id))})))

FIXME should guard for exception (java.lang.NumberFormatException)

(defn action-handler
  [response-channel request]
  (let [chunk-channel (map* generate-string (channel))
        {{:keys [user-id action verb]} :route-params} request
        game-event {:action (keyword action)
                    :verb (keyword verb)
                    :body (decode-json (:body request))}]
    (session/enqueue-game-event (read-string user-id) game-event chunk-channel)
    (enqueue response-channel
             {:status 200
              :headers {"Content-Type" "application/json"}
              :body chunk-channel})))
(defn- respond-chunked
  [response-channel event-channel]
  (enqueue response-channel
           {:status 200
            :headers {"Content-Type" "text/event-stream"}
            :body (map* encode-event-data event-channel)}))
(defn session-events-handler
  [response-channel request]
  (let [{{:keys [user-id]} :route-params} request
        event-channel (session/get-event-channel (read-string user-id))]
    (respond-chunked response-channel event-channel)))
(defn stats-events-handler
  [response-channel request]
  (let [event-channel (channel)]
    (node/register-stats-channel event-channel)
    (respond-chunked response-channel event-channel)))

TODO move to bench/test namespace

(defn bench-handler
  [response-channel request]
  (let [event-channel (map* generate-string (channel))
        user-id (Integer/parseInt "123")]
    (session/enqueue-game-event user-id
                                {:action (keyword "map")
                                 :verb (keyword "add")
                                 :body {:id 1 :x (rand-int 100) :y (rand-int 100)}}
                                :response-channel event-channel)
    (enqueue response-channel
             {:status 200
              :headers {"Content-Type" "application/json"}
              :body event-channel})))

TODO move to bench/test namespace

(defn noop-handler
  []
  {:status 200
   :headers {}
   :body nil})

:level (level/update action) :map (map/update action)

(def handlers
  (routes
   (GET "/" [] (index 123))
   (GET "/:user-id/where-is" [] (where-is-handler))
   (GET "/:user-id/events" [] (wrap-aleph-handler session-events-handler))
   (POST "/:user-id/:action/:verb" [] (wrap-aleph-handler action-handler))
   (GET "/:user-id/setup" [] (setup-handler))
   (GET "/stats-events" [] (wrap-aleph-handler stats-events-handler))
   (GET "/stats" [] (stats-index))
   (GET "/bench" [] (wrap-aleph-handler bench-handler))
   (GET "/noop" [] (noop-handler))
   (route/resources "/")
   (route/not-found "Page not found")))
(def application
  (-> handlers
      wrap-bounce-favicon
      wrap-json-params
      wrap-error-handling))
(defn start
  [port]
  (let [wrapped-handler (wrap-ring-handler application)
        aleph-stop-fn (start-http-server wrapped-handler  {:port port})]
    (reset! *aleph-stop* aleph-stop-fn)))
(defn stop
  []
  (@*aleph-stop*))