grimoire0.1.0-SNAPSHOTdependencies
dev dependencies
| (this space intentionally left almost blank) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.cluster (:refer-clojure :exclude [join]) (:require [grimoire [node :as node] [protocol :as protocol]] [zookeeper :as zk] [zookeeper.data :as data] [zookeeper.logger :as logger] [clojure.tools.logging :as log])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
publish node-state to zookeeper use zk to switch nodes on and off and control amount of traffic that goes to a distinct node | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def group-name "/cluster") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def ^:dynamic *client* nil) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-local-ip [] (.getHostAddress (java.net.InetAddress/getLocalHost))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn serialize [node-data] (data/to-bytes (pr-str node-data))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn deserialize [node-data] (read-string (data/to-string node-data))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-node-path [node-name] (str group-name "/" node-name)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-node-data [node-name] (let [response (zk/data *client* (get-node-path node-name))] (deserialize (:data response)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-remote-ip [node-name] (let [response (zk/exists *client* (get-node-path node-name))] (when-not (nil? response) (:host (deserialize (:data response)))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private create-group [] (when-not (zk/exists *client* group-name) (zk/create *client* group-name :persistent? true))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private node-watcher [node-name event] (log/info "node-watcher" node-name event) (if (= (:event-type event) :NodeDeleted) (node/disconnect node-name) (do (log/info "node-watcher" (:path event)) (zk/exists *client* (:path event) :watcher (partial node-watcher node-name))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-remote-nodes [] (filter #(not (= (node/get-node-name) %)) (zk/children *client* group-name))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private group-watcher [event] (when (= (:event-type event) :NodeChildrenChanged) (log/info "group-watcher" event) (let [all-nodes (zk/children *client* group-name :watcher group-watcher) connections (node/get-connections) remote-nodes (->> all-nodes (filter #(not (= (node/get-node-name) %))) (filter #(nil? (connections %))))] (log/info "group-watcher" all-nodes remote-nodes) (doseq [node-name remote-nodes] (let [node-data (get-node-data node-name)] (log/info "group-watcher" node-name node-data) (zk/exists *client* (get-node-path node-name) :watcher (partial node-watcher node-name)) (node/connect node-name node-data)))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn join [local-port] (let [ip (get-local-ip) ;; needs refactoring ;; inet-socket-address (str ip ":" local-port) node-name (node/get-node-name) node-data (serialize {:host ip :port local-port :node-name node-name})] (log/info "join-cluster" ip local-port node-name) (zk/create *client* (get-node-path node-name) :data node-data)) (zk/children *client* group-name :watcher group-watcher)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn start [{:keys [zk-host zk-port local-host local-port]}] (log/info "start" zk-host zk-port local-host local-port) (let [client (zk/connect (str zk-host ":" zk-port))] (alter-var-root #'*client* (fn [_] client))) (create-group) (join local-port)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.config) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defonce system {:save-interval 10000 :session-timeout 60000 :session-version 1}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defonce facebook {:app-id :app-secret :app-name :api-key :canvas-url }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defonce zookeeper [{:port 2181 :host "127.0.0.1"} {:port 2181 :host "127.0.0.1"} {:port 2181 :host "127.0.0.1"}]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defonce aws {:access-key "033G2TPHEBTWC360RRG2" :secret-key "mCfw/OxodR/YDyC6qINI1VSYuw0dn1+XoGMRMDg5" :bucket "grimoire-dev"}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
default options | (defonce db {:host "localhost" :port 6379}) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
default options | (defonce canvas-server {:host "localhost" :port 3000}) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
default options | (defonce api-server {:host "localhost" :port 4000}) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.core (:gen-class) (:use clojure.tools.cli) ;; (:use [clojure.tools.nrepl.server :only [start-server stop-server]]) (:require [grimoire [config :as config] [web :as web] [cluster :as cluster] [node :as node] [registry :as registry] [landing :as landing] [session :as session]] [clojure.tools.logging :as log])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(set! *warn-on-reflection* true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defonce nrepl-server (start-server :port 7888)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn run-test [users] (dotimes [user-id users] (session/setup user-id) (when (= (rem user-id 1000) 0) ;; (log/info (str "done" user-id "so far"))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn start [options] (println "Starting server...") (node/start-server (:cluster-port options) (:node-id options)) (cluster/start {:zk-host "127.0.0.1" :zk-port 2181 :local-port (:cluster-port options)}) (registry/start "127.0.0.1" 2181) (web/start (:api-port options)) ;; (landing/start (:canvas-port options)) (println (format "'%s' waiting for work, work!" (node/get-node-name)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn stop [] (println "Preparing exit...") ;; FIXME server states are missing (shutdown-agents) (web/stop) (println "All agents are dead.\nGoodbye.")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn at-exit [runnable] (.addShutdownHook (Runtime/getRuntime) (Thread. ^Runnable runnable))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn -main [& args] (let [[options args banner] (cli args ["-h" "--help" "Show help" :default false :flag true] ["-i" "--node-id" "Node ID for cross-node addressing" :default 0 :parse-fn #(Integer/parseInt %)] ["-n" "--cluster-port" "Local cluster listening port" :default 7000 :parse-fn #(Integer/parseInt %)] ["-a" "--api-port" "Client facing API listening port" :default (config/api-server :port) :parse-fn #(Integer/parseInt %)] ["-c" "--canvas-port" "Landing page listening port" :default (config/canvas-server :port) :parse-fn #(Integer/parseInt %)])] (when (:help options) (println banner) (System/exit 0)) (start options)) (at-exit stop)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.game (:require [clojure.tools.logging :as log] [grimoire [level :as level] [map :as map]])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-update-function [event] (case (:action event) :level (level/update event) :map (map/update event) (throw (Exception. "no_update_function_found, args=[%s]" event)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.landing (:use [ring.adapter.jetty :only [run-jetty]] [compojure.core] [clojure.tools.logging :only [info error]]) (:require [compojure.route :as route] [hiccup [page :refer [html5 include-js]] [element :refer [javascript-tag]]] [grimoire.config :as config])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def ^:dynamic *jetty-server* (atom nil)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- run-clojurescript [path init] (list (javascript-tag "var CLOSURE_NO_DEPS = true;") (include-js path) (javascript-tag init))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn index [] (html5 [:head [:title "grimoire"]] [:body [:div {:id "content"} "Hello World"] (run-clojurescript "/js/main.js" "grimoire_client.core.say_hello()")])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defroutes application (GET "/" [] (index)) (route/resources "/") (route/not-found "Page not found")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn start [port] (let [jetty-server (run-jetty #'application {:port (or port 8080) :join? false})] (reset! *jetty-server* jetty-server)) (info "Canvas server running on port " port)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn stop [] (.stop @*jetty-server*) (info "Stopped canvas server")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.level (:require [clojure.tools.logging :as log]) (:use grimoire.util [lamina core executor])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def example-levels {1 {:required-points 0 :reward 0} 2 {:required-points 10 :reward 5} 3 {:required-points 20 :reward 5} 4 {:required-points 30 :reward 5} 5 {:required-points 40 :reward 5}}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defmulti update :verb) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defupdate update :complete (fn [state {:keys [level]}] {:pre (number? level)} (let [level-config (example-levels level)] (if (>= (:points state) (:required-points level-config)) (-> state (update-in [:balance] + (:reward level-config)) to-result-map) (throw (Exception. (format "insufficient_points_for_level_complete, args=[%s, %s]" (:id state) level))))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.map (:require [clojure.tools.logging :as log]) (:use [lamina core executor] [grimoire.util] [cheshire.core :only [generate-string parse-string]])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def example-entities {1 {:name "entity 1" :width 1 :height 1 :cost 5} 2 {:name "entity 2" :width 1 :height 1 :cost 7} 3 {:name "entity 3" :width 1 :height 1 :cost 4}}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def example-contracts {1 {:name "contract 1" :duration 2 :reward 5} 2 {:name "contract 2" :duration 4 :reward 22} 3 {:name "contract 3" :duration 6 :reward 30}}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def default-width 10) (def default-height 10) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private m-keyword ([x y] (m-keyword [x y])) ([[x y]] (keyword (str "x" x "y" y)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn generate-empty-map ;; Creates an empty map of the structure {:x1y1 nil :x1y2 nil} [width height] (let [m (reduce #(assoc! %1 (m-keyword %2) nil) (transient {}) (mapcat (fn [x] (for [y (range width)] [x y])) (range height)))] (persistent! m))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def empty-map (memoize generate-empty-map)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defmulti update :verb) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defupdate update :add (fn [state {:keys [id x y]}] ;; (log/info "add" (Thread/currentThread)) ;; (Thread/sleep 10000) (let [entity-config (example-entities id) coord-key (m-keyword x y) entity (get-in state [:map coord-key])] (if (nil? entity) (-> state (assoc-in [:map coord-key] {:id id}) (update-in [:balance] - (:cost entity-config)) to-result-map) (throw (Exception. (format "map_position_not_empty, args=[%s,%s]" x y))))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defupdate update :start-contract (fn [state {:keys [id x y]}] (let [coord-key (m-keyword x y) entity (get-in state [:map coord-key])] (if (and (not (nil? entity)) (nil? (:contract-start entity))) (-> state (update-in [:map coord-key] merge {:contract-start (System/currentTimeMillis) :contract-id id}) to-result-map) (throw (Exception. (format "start_contract_failed, args=[%s, %s]" (:id state) entity))))))) (defn ^:private collectable? [entity] {:pre [(has-keys? entity :contract-start :contract-id)]} (let [contract-config (example-contracts (:contract-id entity)) duration (:duration contract-config)] (> (System/currentTimeMillis) (+ duration (:contract-start entity))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defupdate update :collect-contract (fn [state {:keys [x y]}] (let [coord-key (m-keyword x y) entity (get-in state [:map coord-key])] (if (collectable? entity) (-> state (assoc-in [:map coord-key] (dissoc entity :contract-start :contract-id)) (update-in [:balance] - (:reward (example-contracts (:contract-id entity)))) to-result-map) (throw (Exception. (format "collect_contract_failed, args=[%s, %s]" (:id state) entity))))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.node (:use lamina.stats [lamina core executor]) (:require [clojure.tools.logging :as log] [aleph.tcp :as aleph] [gloss.core :as gloss] [grimoire.protocol :as protocol]) (:import java.util.Arrays java.util.zip.ZipException)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def ^:dynamic *server-close-fn* nil) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defonce node-prefix "grim-") (def local-node-id) (def local-node-name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def inbound-channel (permanent-channel)) (def stats-channel (permanent-channel)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def connections (atom {})) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-node-id [] local-node-id) (defn get-node-name [] local-node-name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-connections [] @connections) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn register-stats-channel [event-channel] (log/info "register-stats-channel" event-channel) (siphon (fork stats-channel) event-channel)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn publish-event [node-name message] (when-not (nil? (@connections node-name)) (let [remote-channel (:channel (@connections node-name))] (when-not (nil? remote-channel) ;; TODO move encoding/decoding to separate namespace (enqueue remote-channel (protocol/encode message)))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private decode-frame [msg-bytes] (let [msg-buffer (gloss.io/contiguous msg-bytes) msg-byte-array (byte-array (.remaining msg-buffer))] (.get msg-buffer msg-byte-array) (protocol/decode msg-byte-array))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private client-success-callback [node-name client-channel] (log/info "on-success-callback" node-name client-channel) ;; TODO make this (:status) less explicit and actually get state from channel (swap! connections assoc node-name {:status :connected :channel client-channel}) (log/info "client-success-callback" (@connections node-name)) ;; FIXME use pipeline and handle errors ;; synchronously wait for reply and move forward from there on (let [msg {:type :node-connection-start :content {:node-name node-name}}] (enqueue client-channel (protocol/encode msg))) (siphon (map* #(decode-frame %) client-channel) inbound-channel)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private client-error-callback [node-name error] (log/info "on-error-callback" node-name error)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn start-client [host port node-name] (let [options {:host host :port port :frame (gloss/finite-block :int32)} client-result-channel (aleph/tcp-client options)] (on-realized client-result-channel (partial client-success-callback node-name) (partial client-error-callback node-name)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TODO actually implement reconnection logic don't forget back-off | (defn ^:private reconnect [] (log/info "reconnect-client")) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn disconnect [node-name] (log/info "disconnect") (let [remote-connection (@connections node-name)] (try (close (:channel remote-connection)) (catch Exception exception (log/error "disconnect" (.getMessage exception)))) (swap! connections dissoc node-name))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn connect [node-name node-data] (log/info "connect" node-name node-data) (when (nil? (@connections node-name)) (swap! connections assoc-in [node-name :status] :connecting) (start-client (:host node-data) (:port node-data) node-name))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn server-handler [connection-channel client-info] (log/info "server-handler" client-info (:address client-info)) ;; TODO map bare rate messages to more descriptive hashmaps ;; (siphon (rate connection-channel) stats-channel) ;; FIXME very brittle; fix (run-pipeline connection-channel read-channel (fn [message] (let [node-connection-start (decode-frame message) remote-node-name (get-in node-connection-start [:event :node-connection-start :node-name])] (swap! connections assoc remote-node-name {:status :connected :channel connection-channel}) (siphon (map* #(decode-frame %) connection-channel) inbound-channel))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private create-distributor [] (distributor :user-id (fn [facet facet-channel] (log/info "distributor" facet facet-channel) ;; here's hoping to named-channel's idempotency (let [user-channel (named-channel (keyword (str facet)) (fn [_]))] (log/info "distributor" user-channel) (close-on-idle 5000 facet-channel) (close-on-idle 5000 user-channel) (ground user-channel) (siphon facet-channel user-channel))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn start-server [^Integer port ^Integer node-id] (log/info "start-server" port node-id) (alter-var-root #'local-node-id (fn [_] node-id)) (alter-var-root #'local-node-name (fn [_] (str "grim-" node-id))) ;; (receive-all inbound-channel #(log/info "inbound-channel received" %)) (siphon inbound-channel (create-distributor)) ;; If a tree falls in a forest and no one is around to hear it, does it make a sound? (ground stats-channel) (let [options {:port port :frame (gloss/finite-block :int32)} server (aleph/start-tcp-server server-handler options)] (alter-var-root #'*server-close-fn* (fn [_] server)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn stop-server [] (*server-close-fn*) (log/info "stop-server")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.protocol (:refer clojure.string :only [upper-case]) (:require [clojure.tools.logging :as log]) (:import Nodemessages$Event Nodemessages$Foo Nodemessages$Bar Nodemessages$NodeConnectionStart) (:use protobuf.core)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def Event (protodef Nodemessages$Event)) (def Foo (protodef Nodemessages$Foo)) (def Bar (protodef Nodemessages$Bar)) (def NodeConnectionStart (protodef Nodemessages$NodeConnectionStart)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def protobuf-content-types {:foo Foo :bar Bar :node-connection-start NodeConnectionStart}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private get-protobuf-type [type] (if-let [the-type (type protobuf-content-types)] the-type (throw (Exception. (format "protobuf_type_does_not_exist, args=[%s]" type))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn decode [msg-bytes] (let [event (protobuf-load Event msg-bytes) content ((:type event) event)] {:user-id (:receiver content) :event event})) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn encode [message] (let [type (:type message) content (flatten (seq (:content message))) protobuf-content-type (get-protobuf-type type) protobuf-content (apply protobuf protobuf-content-type content) event-type (upper-case (name type)) protobuf-msg (protobuf Event :type event-type type protobuf-content)] (protobuf-dump protobuf-msg))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.registry (:require [clojure.tools.logging :as log] [grimoire [config :as config] [node :as node] [cluster :as cluster]] [zookeeper :as zk] [zookeeper.data :as zk-data] [zookeeper.util :as zk-util])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defonce registry-group-name "/registry") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def ^:dynamic *client* nil) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private to-zk-location [user-id & z-nodes] (str registry-group-name "/" user-id (clojure.string/join "/" z-nodes))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn register [user-id valid-until cluster-node] ;; (log/info "register" user-id valid-until cluster-node) (let [user-znode (to-zk-location user-id) lock-znode (str user-znode "/_lock-")] (let [create-response (zk/create-all *client* lock-znode :persistent? true :sequential? true) create-id (zk-util/extract-id create-response) user-node-response (zk/exists *client* user-znode)] (if (= 0 create-id) (zk/set-data *client* user-znode (cluster/serialize {:valid-until valid-until :node cluster-node}) (:version user-node-response)) (throw (Exception. (format "session_already_registered, args=[%s]" user-id))))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn deregister [user-id] (log/info "deregister" user-id (to-zk-location user-id)) (zk/delete-all *client* (to-zk-location user-id))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn extend-timeout [user-id valid-until] (let [zk-location (to-zk-location user-id) response (zk/exists *client* zk-location) data (cluster/deserialize (:data response)) new-data (assoc-in data :valid-until valid-until)] (zk/set-data *client* zk-location (cluster/serialize new-data) (:version response)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn deregister-all [] (log/info "delete all registered users") (zk/delete-all *client* registry-group-name)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-node-data [user-id] (let [response (zk/data *client* (to-zk-location user-id))] (cluster/deserialize (:data response)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-location [user-id] (:node (get-node-data user-id))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn local? [user-id] (let [user-node-name (:node (get-node-data user-id))] (= user-node-name (node/get-node-name)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn registered? [user-id] (not (nil? (get-node-data user-id)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private create-registry-group [] (when (nil? (zk/exists *client* registry-group-name)) (zk/create *client* registry-group-name :persistent? true))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TODO check for dev environment | (defn ^:private dev-clean [] (let [remote-nodes (cluster/get-remote-nodes)] (when (empty? remote-nodes) (zk/delete-all *client* registry-group-name)))) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn start [host port] (let [client (zk/connect (str host ":" port))] (alter-var-root #'*client* (fn [_] client))) ;; developer-sanity-protection-measures (dev-clean) (create-registry-group)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn b-fn [i] (zk/create *client* (to-zk-location i))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(future (do (log/info "begin sleep") (Thread/sleep 8000) (log/info "done sleeping") (log/info "begin zk node creation") (let [t (time (dotimes [i 5000] (b-fn i)))] (log/info t)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.session (:use [lamina core executor]) (:import (org.jboss.netty.util HashedWheelTimer Timeout TimerTask) (java.util.concurrent TimeUnit)) (:require [grimoire.config :as config] [grimoire.registry :as registry] [grimoire.node :as node] [grimoire.storage :as storage] [grimoire.user :as user] [grimoire.game :as game] [grimoire.session-store :as store] [grimoire.util :as util] [clojure.tools.logging :as log])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defonce ^:private ^HashedWheelTimer hashed-wheel-timer (HashedWheelTimer.)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-expire-time [] (+ (System/currentTimeMillis) (:session-timeout config/system))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- get-request-channel [user-id] (let [session (store/get user-id)] (:request-channel @session))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- enqueue-event [user-id event] (enqueue (get-request-channel user-id) event)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- close-request-channel [user-id] (let [session (store/get user-id) request-channel (:request-channel @session)] (close request-channel))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-event-channel [user-id] (let [session (store/get user-id) event-channel (:event-channel @session)] (if (or (nil? event-channel) (closed? event-channel)) (let [new-event-channel (channel)] (swap! session assoc :event-channel new-event-channel) new-event-channel) event-channel))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{:keys [async?] :or {:async? false} :as options} | (defmacro build-pipeline [& tasks] (let [channel (gensym 'channel)] `(fn [~channel value#] (run-pipeline value# {:error-handler #(error ~channel %)} ~@(map (fn [s] `(fn [event#] (task (~s ~channel event#)))) tasks))))) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(util/log-with-thread "generic-event-stage - start sleep") (Thread/sleep 5000) (util/log-with-thread "generic-event-stage - done sleeping") | (defn- handle-game-event [{:keys [user-id payload args] :as event}] (let [session (store/get user-id) result-map (apply (game/get-update-function payload) (:state @session) args)] (swap! session assoc :state (:state result-map)) (assoc event :response (:response result-map)))) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(declare periodic-save) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- handle-system-event [{:keys [user-id payload args] :as event}] (log/info "handle-system-event" event) (let [session (store/get user-id)] (case (:action payload) :periodic-save ((partial periodic-save session) args) nil))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- handle-event [event] (case (:type event) :game (handle-game-event event) :system (handle-system-event event) event)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- process-event [ch event] (try (handle-event event) (catch Exception e (close ch) (ground ch) (log/error "pipeline_error" e) {:type :error :receiver (:receiver event) :error {:error (.getMessage e)}}))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- reply [{:keys [type receiver response error] :as event}] (case type :game (if (:close? event) (enqueue-and-close receiver response) (enqueue receiver response)) :error (enqueue-and-close receiver error) event)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- receive-in-order-with-pipeline [ch] (consume ch :channel nil :initial-value nil :reduce (fn [_ event] (run-pipeline event {:error-handler #(error ch %)} ;; watch for async tags #(process-event ch %) reply)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn enqueue-game-event [user-id action response-channel] (if-let [session (store/get user-id)] (let [receiver (or response-channel (:event-channel @session)) request-channel (:request-channel @session)] (if (closed? request-channel) (enqueue-and-close receiver {:error "request_channel_is_closed"}) (enqueue request-channel {:type :game :user-id user-id :receiver receiver :close? true :reload-on-error? true :payload action :args nil}))) (throw (Exception. (format "session_not_running, args=[%s]" user-id))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- clean-timeouts [session] (doseq [timeout-type [:session-timeout :save-timeout]] (try (.cancel ^Timeout (timeout-type @session)) (catch Exception e (log/error "clean-timeouts" (.getMessage e))) (finally (swap! session assoc timeout-type nil))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- save-to-storage [user-id state] (log/info "save-to-storage" user-id) (let [user-json (user/to-json state)] (storage/put-data user-id user-json))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- safe-close-channel [ch] (when-not (nil? ch) (try (close ch) (catch Exception e (log/error "safe-close-channel" (.getMessage e)))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- clean [user-id] (log/info "clean" user-id) (let [session (store/get user-id) channel-keys [:remote-channel :event-channel]] (doseq [channel-key channel-keys] (safe-close-channel (channel-key @session))) (clean-timeouts session) (store/remove user-id) (registry/deregister user-id)) nil) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- reset [user-id] ;; instead of resetting the session on the setup call ;; why not do it instantly?) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- should-save? [state] (let [answer (> (:updated-at state) (:saved-at state))] (log/info "should-save?" answer) answer)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- stop [user-id] (log/info "stop" user-id) (let [session (store/get user-id) state (:state @session) request-channel (:request-channel @session)] (close request-channel) (clean-timeouts session) (when (should-save? state) (try (save-to-storage user-id (assoc state :saved-at (System/currentTimeMillis))) (catch Exception e (log/error "stop" (.getMessage e) user-id)))) (clean user-id) nil)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- get-timeout [delay timeout-fn] (let [timer-task (reify org.jboss.netty.util.TimerTask (^void run [this ^Timeout timeout] (timeout-fn timeout)))] (.newTimeout hashed-wheel-timer timer-task delay (TimeUnit/MILLISECONDS)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- renew-timeout [user-id delay timeout-type timeout-fn] (log/info "renew-timeout" user-id timeout-type timeout-fn) (let [session (store/get user-id)] (.cancel ^Timeout (timeout-type @session)) (swap! session assoc timeout-type (get-timeout delay (partial timeout-fn user-id))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- session-timeout-handler [user-id ^:Timeout timeout] (log/info "session-timeout-handler" user-id timeout) (future (stop user-id))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(declare save-timeout-handler) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- periodic-save [session user-id] (log/info "periodic-save" user-id) (let [new-state (assoc (:state @session) :saved-at (System/currentTimeMillis)) save-interval (config/system :save-interval)] (save-to-storage user-id new-state) (swap! session assoc :state new-state))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- save-timeout-handler [user-id ^:Timeout timeout] (log/info "save-timeout-handler") (let [state (:state @(store/get user-id)) save-interval (config/system :save-interval)] (renew-timeout user-id save-interval :save-timeout save-timeout-handler) (when (should-save? state) (enqueue-event user-id {:type :system :user-id user-id :reload-on-error false :payload {:action :periodic-save} :args [user-id]})))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn handle-remote-message [msg] (log/info "handle-remote-message" msg)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- start [user-id state] (log/info "start") (let [remote-channel (named-channel (keyword (str user-id)) nil) request-channel (channel) session-timeout (get-timeout (config/system :session-timeout) (partial session-timeout-handler user-id)) save-timeout (get-timeout (config/system :save-interval) (partial save-timeout-handler user-id)) session (atom {:state state :request-channel request-channel :event-channel nil :remote-channel remote-channel :session-timeout session-timeout :save-timeout save-timeout})] (receive-in-order-with-pipeline request-channel) (store/put user-id session) (receive-all remote-channel handle-remote-message)) nil) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- try-restart [user-id] (log/info "try-testart") (let [session (store/get user-id) request-channel (:request-channel @session)] (if (closed? request-channel) (let [new-request-channel (channel)] (receive-in-order-with-pipeline new-request-channel) (swap! session assoc :request-channel request-channel) (user/to-json (:state @session))) (throw (Exception. (format "session_still_running, args=[%s]" user-id)))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- load-user [user-id] (log/info "load-user") (let [result (storage/get-data user-id)] (if (nil? result) (user/new user-id) (user/from-json result)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- load-and-start [user-id] (log/info "load-and-start") (registry/register user-id (get-expire-time) (node/get-node-name)) (try (let [state (load-user user-id)] (start user-id state) (user/to-json state)) (catch Exception e (log/error "setup_failed" (.getMessage e) user-id) (clean user-id) (throw (Exception. (format "session_start_failed, args=[%s]" user-id)))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn setup [user-id] (log/info "setup") (if (and (store/exists? user-id) (registry/registered? user-id) (registry/local? user-id)) (try-restart user-id) (load-and-start user-id))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn run-bench [] (log/info "run-bench" (node/get-node-id)) ;; (let [runs 1000 (let [runs 10 ;; global-start (* (node/get-node-id) runs)] global-start (* 0 runs)] ;; (Thread/sleep 5000) (dotimes [j runs] (let [batch-size 100 start (+ global-start (* j batch-size)) end (+ start batch-size) the-range (range start end)] (time (doseq [i the-range] (setup i)))) (log/info "num-sessions" (store/num-sessions)) ;; (Thread/sleep 1000)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.session-store (:refer-clojure :exclude [get remove]) (:require [clojure.tools.logging :as log])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defonce ^:private store (atom {})) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn num-sessions [] (count @store)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn exists? [user-id] (not (nil? (@store user-id)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get [user-id] (@store user-id)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn put [user-id session] (if (nil? (@store user-id)) (swap! store assoc user-id session) (throw (Exception. (format "session_already_running, args=[%s]" user-id))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn remove [user-id] (swap! store dissoc user-id)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.storage (:use [slingshot.slingshot :only [try+ throw+]] [cheshire.core :only [generate-string]]) (:require [aws.sdk.s3 :as s3] [grimoire [config :as config]] [clojure.tools.logging :as log]) (:import com.amazonaws.services.s3.model.AmazonS3Exception)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def credentials {:access-key (config/aws :access-key), :secret-key (config/aws :secret-key)}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def bucket (config/aws :bucket)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private id-to-storage-key [user-id] (str user-id)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn ^:private storage-key-to-id [storage-key] (Integer/parseInt storage-key)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TODO implement | (defn ^:private clean-bucket []) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn get-data [^Integer user-id] ;; (log/info "get-data" user-id) ;; (try ;; (let [s3-object (s3/get-object credentials bucket (id-to-storage-key user-id))] ;; (slurp (:content s3-object))) ;; (catch AmazonS3Exception exception ;; (when-not (= (.getErrorCode exception) "NoSuchKey") ;; (log/error (.getMessage exception))))) nil) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn put-data [^Integer user-id ^String json] ;; (log/info "put-data" user-id) ;; (let [result (s3/put-object credentials bucket (id-to-storage-key user-id) json)] ;; (log/info "put-data" result)) nil) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.timer) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.user (:use [slingshot.slingshot :only [try+ throw+]] [cheshire.core :only [generate-string parse-string]]) (:require [grimoire [config :as config] [map :as game-map]] [clojure.tools.logging :as log])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defrecord User [^Integer id name points balance ;; inventory map updated-at created-at saved-at version]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
add session-token | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn new [user-id & name] (let [current-time (System/currentTimeMillis) current-version (:session-version config/system) default-map (game-map/empty-map 7 7)] {:id user-id :name (or name "John Doe") :points 0 :balance 0 :map default-map :updated-at current-time :created-at current-time :saved-at 0 :version current-version})) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn from-struct [{:keys [id name points balance map updated-at created-at saved-at version], :or {saved-at 0}}] ;; (log/info "from-struct" map) ;; (User. id name points balance {} updated-at created-at saved-at ;version) (apply hash-map (User. id name points balance {} updated-at created-at saved-at version))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn from-json [json] (let [struct (parse-string json true)] (from-struct struct))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn to-json [user] (generate-string user)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.util (:use [lamina core executor]) (:require [clojure.tools.logging :as log])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn current-time [] (System/currentTimeMillis)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defmacro log-with-thread [& msg] `(log/info ~@msg (Thread/currentThread))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn has-keys? [m & ks] (every? #(contains? m %) ks)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn touch [state] (assoc state :updated-at (current-time))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn to-result-map ([state] (to-result-map state {})) ([state response] {:state state :response response})) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defmacro defupdate [multi-fn dispatch-value & body-fn] `(. ~(with-meta multi-fn {:tag 'clojure.lang.MultiFn}) addMethod ~dispatch-value (fn [event#] (fn [state#] (let [result-map# (~@body-fn state# (:body event#))] (assoc-in result-map# [:state :updated-at] (current-time))))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(ns grimoire.web (:use [slingshot.slingshot :only [try+ throw+]] [lamina core executor] [aleph http formats] compojure.core [ring.middleware [json-params] [reload]] [cheshire.core :only [generate-string]]) (:require [compojure.route :as route] [clojure.tools.logging :as log] [grimoire [node :as node] [session :as session] [registry :as registry] [config :as config] [util :as util]]) (:import org.codehaus.jackson.JsonParseException) ;; index/stats html imports ;; TODO move to template ns (:require [hiccup [page :refer [html5 include-js]] [element :refer [javascript-tag]]])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def ^:dynamic *aleph-stop* (atom nil)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def error-codes {:invalid 400 :not-found 404}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn json-response [data & [status]] {:status (or status 200) :headers {"Content-Type" "application/json"} :body (generate-string (or data {}))}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- encode-event-data [data] (str "data:" (generate-string data) "\n\n")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn respond [response-channel data] (enqueue response-channel (json-response data))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn wrap-bounce-favicon [handler] (log/info "bouncing favicon request") (fn [req] (if (= [:get "/favicon.ico"] [(:request-method req) (:uri req)]) {:status 404 :headers {} :body } (handler req)))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn wrap-error-handling [handler] (log/info "entering error-handling" handler) (fn [req] (try+ (or (handler req) (json-response {"error" "resource not found"} 404)) (catch JsonParseException e (json-response {"error" "malformed json"} 400)) (catch [:type :grimoire] {:keys [message args]} (log/error message args) (json-response {"error" message})) (catch Exception e (let [message (.getMessage ^Exception e)] (log/error "wrap-error-handling" e message) (json-response {"error" message}))) (catch Object e (let [{:keys [type message]} (meta e)] (log/error "wrap-error-handling" type message e) (json-response {"error" message} (error-codes type))))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- run-clojurescript [path init] (list (javascript-tag "var CLOSURE_NO_DEPS = true;") (include-js path) (javascript-tag init))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def page-title "Grimoire") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn index [user-id] (html5 [:head [:title (page-title)]] [:body [:div {:id "content"} ] (run-clojurescript "/js/main.js" (format "grimoire_client.core.init(%s)" user-id))])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn stats-index [] (html5 [:head [:title (page-title)]] [:body ;; http://goo.gl/QrXDs (javascript-tag "var source = new EventSource('/stats-events');") (javascript-tag "source.addEventListener('message', function(event) { console.log(JSON.parse(event.data)) })")])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- pipeline-error-handler [ch error] (let [message (.getMessage ^Exception error)] (log/error "pipeline" message) (respond ch {"error" message}))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Use lamina.executor/task's for potentially long-running operations in the pipeline. The server handler will return immediatle to it's pool and aleph/lamina will take care of responding to the client at some later point. | (defmacro defpipeline [name & tasks] `(defn ~name [] (wrap-aleph-handler (fn [channel# request#] (run-pipeline request# {:error-handler (partial pipeline-error-handler channel#)} ~@tasks (fn [response#] (respond channel# response#))))))) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defpipeline setup-handler #(task (let [{{:keys [user-id]} :route-params} %] (session/setup (read-string user-id))))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defpipeline where-is-handler #(task (let [{{:keys [user-id]} :route-params} %] {:node (registry/get-location (read-string user-id))}))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FIXME should guard for exception (java.lang.NumberFormatException) | (defn action-handler [response-channel request] (let [chunk-channel (map* generate-string (channel)) {{:keys [user-id action verb]} :route-params} request game-event {:action (keyword action) :verb (keyword verb) :body (decode-json (:body request))}] (session/enqueue-game-event (read-string user-id) game-event chunk-channel) (enqueue response-channel {:status 200 :headers {"Content-Type" "application/json"} :body chunk-channel}))) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn- respond-chunked [response-channel event-channel] (enqueue response-channel {:status 200 :headers {"Content-Type" "text/event-stream"} :body (map* encode-event-data event-channel)})) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn session-events-handler [response-channel request] (let [{{:keys [user-id]} :route-params} request event-channel (session/get-event-channel (read-string user-id))] (respond-chunked response-channel event-channel))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn stats-events-handler [response-channel request] (let [event-channel (channel)] (node/register-stats-channel event-channel) (respond-chunked response-channel event-channel))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TODO move to bench/test namespace | (defn bench-handler [response-channel request] (let [event-channel (map* generate-string (channel)) user-id (Integer/parseInt "123")] (session/enqueue-game-event user-id {:action (keyword "map") :verb (keyword "add") :body {:id 1 :x (rand-int 100) :y (rand-int 100)}} :response-channel event-channel) (enqueue response-channel {:status 200 :headers {"Content-Type" "application/json"} :body event-channel}))) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TODO move to bench/test namespace | (defn noop-handler [] {:status 200 :headers {} :body nil}) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
:level (level/update action) :map (map/update action) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def handlers (routes (GET "/" [] (index 123)) (GET "/:user-id/where-is" [] (where-is-handler)) (GET "/:user-id/events" [] (wrap-aleph-handler session-events-handler)) (POST "/:user-id/:action/:verb" [] (wrap-aleph-handler action-handler)) (GET "/:user-id/setup" [] (setup-handler)) (GET "/stats-events" [] (wrap-aleph-handler stats-events-handler)) (GET "/stats" [] (stats-index)) (GET "/bench" [] (wrap-aleph-handler bench-handler)) (GET "/noop" [] (noop-handler)) (route/resources "/") (route/not-found "Page not found"))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(def application (-> handlers wrap-bounce-favicon wrap-json-params wrap-error-handling)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn start [port] (let [wrapped-handler (wrap-ring-handler application) aleph-stop-fn (start-http-server wrapped-handler {:port port})] (reset! *aleph-stop* aleph-stop-fn))) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(defn stop [] (@*aleph-stop*)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||