Source code for otree_redwood.models

from channels import Group as ChannelGroup
from contextlib import contextmanager
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.utils import timezone
from django.db import models
import json
from jsonfield import JSONField
import logging
import otree.common_internal
from otree import matching
from otree.models import BaseGroup
from otree.views.abstract import get_redis_lock
from otree_redwood.stats import track 
from otree_redwood.utils import DiscreteEventEmitter
import redis_lock
import threading
import time


logger = logging.getLogger(__name__)


class Event(models.Model):
    """Event stores a single message going in or out across a WebSocket connection."""

    class Meta:
        # Default to queries returning Events in chronological order
        # (ascending timestamp, i.e. oldest Event first).
        ordering = ['timestamp']

    timestamp = models.DateTimeField(null=False)
    """Time the event was received or sent by the server."""
    content_type = models.ForeignKey(ContentType, related_name='content_type_events')
    """Used to relate this Event to an arbitrary Group."""
    group_pk = models.PositiveIntegerField()
    """Primary key of the Event's related Group."""
    group = GenericForeignKey('content_type', 'group_pk')
    """The Group the event was sent to/from."""
    channel = models.CharField(max_length=100, null=False)
    """Channels act as tags to route Events."""
    participant = models.ForeignKey(
        'otree.Participant', related_name='+', null=True)
    """The Participant who sent the event - null for server-sent events."""
    value = JSONField()
    """Arbitrary Event payload."""

    @property
    def message(self):
        """Dictionary representation of the Event appropriate for JSON-encoding.

        ``timestamp`` is expressed as (fractional) milliseconds since the Unix
        epoch; ``participant`` is the participant's code, or ``None`` when the
        Event has no participant.
        """
        # datetime.timestamp() honours the datetime's tzinfo. The previous
        # time.mktime(timetuple()) form re-interpreted an aware (UTC) value as
        # server-local time, skewing the result by the server's UTC offset.
        epoch_millis = self.timestamp.timestamp() * 1e3
        return {
            'timestamp': epoch_millis,
            'group': self.group_pk,
            'participant': self.participant.code if self.participant else None,
            'channel': self.channel,
            'value': self.value,
        }

    def save(self, *args, **kwargs):
        """Saving an Event automatically sets the timestamp if not already set."""
        if self.timestamp is None:
            self.timestamp = timezone.now()
        super().save(*args, **kwargs)
class Connection(models.Model):
    """Record of a Participant's WebSocket connection.

    Connections are created and deleted as Participants connect to a
    WebSocket.
    """

    participant = models.ForeignKey(
        'otree.Participant',
        null=True,
        related_name='+',
    )
    """Each Participant should have only one connection."""
class Group(BaseGroup):
    """Group is designed to be used instead of the oTree BaseGroup to provide
    Redwood-specific functions for coordinating inter-page communication for
    players in the Group.
    """

    class Meta:
        abstract = True

    ran_ready_function = models.DateTimeField(null=True)
    """Set when the :meth:`when_all_players_ready` function has been run.
    Ensures run-only-once semantics.
    """

    def period_length(self):
        """Implement this to set a timeout on the page.

        Return the length of the period in seconds, or ``None`` (the default)
        for no timeout. Once all players in the group have connected their
        websockets, a ``period_start`` message is sent on the ``state``
        channel. If a period length is set, a ``period_end`` message is sent
        on the same channel ``period_length`` seconds later.
        """
        return None

    def when_all_players_ready(self):
        """Implement this to perform an action for the group once all players are ready."""

    def when_player_disconnects(self, player):
        """Implement this to perform an action when a player disconnects.

        ``player`` is ``None`` when the disconnecting participant does not
        match any player in this group.
        """

    def _on_connect(self, participant):
        """Called from the WebSocket consumer. Checks if all players in the group
        have connected; runs :meth:`when_all_players_ready` once all connections
        are established.
        """
        # Fall back to a logging-only stand-in when no redis lock is available.
        lock = get_redis_lock() or fake_lock()

        with lock:
            # Re-read state under the lock so that only one caller ever sees
            # ran_ready_function unset once everybody is connected.
            self.refresh_from_db()
            if self.ran_ready_function:
                return
            for player in self.get_players():
                is_connected = Connection.objects.filter(
                    participant__code=player.participant.code).exists()
                if not is_connected:
                    return

            self.when_all_players_ready()
            self.ran_ready_function = timezone.now()
            self.save()

            self.send('state', 'period_start')

            # Evaluate the hook once so the check and the timer interval agree.
            period_length = self.period_length()
            if period_length:
                # TODO: Should replace this with something like Huey/Celery so it'll survive a server restart.
                self._timer = threading.Timer(
                    period_length,
                    lambda: self.send('state', 'period_end'))
                self._timer.start()

    def _on_disconnect(self, participant):
        """Trigger the :meth:`when_player_disconnects` callback."""
        player = next(
            (p for p in self.get_players() if p.participant == participant),
            None)
        self.when_player_disconnects(player)

    def send(self, channel, payload):
        """Send a message with the given payload on the given channel.
        Messages are broadcast to all players in the group.

        The message is also recorded as an :class:`Event` for this group.
        """
        with track('send_channel=' + channel):
            with track('create event'):
                Event.objects.create(
                    group=self,
                    channel=channel,
                    value=payload)
            ChannelGroup(str(self.pk)).send(
                {'text': json.dumps({
                    'channel': channel,
                    'payload': payload
                })})

    def save(self, *args, **kwargs):
        """
        BUG: Django save-the-change, which all oTree models inherit from,
        doesn't recognize changes to JSONField properties. So saving the model
        won't trigger a database save. This is a hack, but fixes it so any
        JSONFields get updated every save. oTree uses a forked version of
        save-the-change so a good alternative might be to fix that to recognize
        JSONFields (diff them at save time, maybe?).
        """
        super().save(*args, **kwargs)
        if self.pk is None:
            return
        update_fields = kwargs.get('update_fields')
        # Collect every JSONField that this save is supposed to persist.
        json_fields = {
            field.attname: getattr(self, field.attname)
            for field in self._meta.get_fields()
            if isinstance(field, JSONField)
            and (update_fields is None or field.attname in update_fields)
        }
        # Nothing to force-write (e.g. the model has no JSONFields, or none
        # were requested): skip the extra UPDATE round-trip entirely.
        if json_fields:
            type(self)._default_manager.filter(pk=self.pk).update(**json_fields)
@contextmanager
def fake_lock():
    """No-op stand-in for a redis lock.

    Provides no mutual exclusion; it only logs a warning on entry and on exit
    so that a missing redis installation is visible in the logs.
    """
    logger.warning('using fake lock - install redis in production')
    try:
        yield
    finally:
        # Log the exit even when the guarded body raises.
        logger.warning('exiting fake lock - install redis in production')
class DecisionGroup(Group):
    """DecisionGroup receives Events on the ``decisions`` channel, then
    broadcasts them back to all members of the group on the
    ``group_decisions`` channel.
    """

    class Meta:
        abstract = True

    group_decisions = JSONField(null=True)
    """:attr:`group_decisions` holds a map from participant code to their current decision."""
    subperiod_group_decisions = JSONField(null=True)
    """:attr:`subperiod_group_decisions` is a copy of the state of :attr:`group_decisions` at the end of each subperiod."""
    _group_decisions_updated = models.BooleanField(default=False)
    """:attr:`_group_decisions_updated` is a private field used with rate limiting to determine whether group decisions need to be resent."""

    def num_subperiods(self):
        """Override to turn on sub-period behavior. None by default."""
        return None

    def rate_limit(self):
        """Override to turn on rate-limiting behavior. If used, the return
        value of rate_limit determines the minimum time between broadcasted
        :attr:`group_decisions` updates. None by default.
        """
        return None

    def when_all_players_ready(self):
        """Initializes decisions based on ``player.initial_decision()``.

        If :meth:`num_subperiods` is set, starts a timed task to run the
        sub-periods; otherwise, if :meth:`rate_limit` is set, starts a timed
        task that rebroadcasts updated decisions at that interval.
        """
        self.group_decisions = {}
        self.subperiod_group_decisions = {}
        for player in self.get_players():
            code = player.participant.code
            self.group_decisions[code] = player.initial_decision()
            self.subperiod_group_decisions[code] = player.initial_decision()

        if self.num_subperiods():
            subperiod_length = self.period_length() / self.num_subperiods()
            DiscreteEventEmitter(
                subperiod_length,
                self.period_length(),
                self,
                self._subperiod_tick,
            ).start()
        elif self.rate_limit():
            update_period = self.rate_limit()
            DiscreteEventEmitter(
                update_period,
                self.period_length(),
                self,
                self._rate_limit_tick,
            ).start()

        self.save()

    def _rate_limit_tick(self, current_interval, intervals):
        """Tick each rate-limit interval, rebroadcasting group_decisions only
        if a decision arrived since the last broadcast.
        """
        self.refresh_from_db()
        if not self._group_decisions_updated:
            return
        self.send('group_decisions', self.group_decisions)
        self._group_decisions_updated = False
        self.save(update_fields=['_group_decisions_updated'])

    def _subperiod_tick(self, current_interval, intervals):
        """Tick each sub-period, copying group_decisions to subperiod_group_decisions."""
        self.refresh_from_db()
        self.subperiod_group_decisions.update(self.group_decisions)
        self.send('group_decisions', self.subperiod_group_decisions)
        self.save(update_fields=['subperiod_group_decisions'])

    def _on_decisions_event(self, event=None, **kwargs):
        """Called when an Event is received on the decisions channel. Saves
        the value in group_decisions. If neither num_subperiods nor rate_limit
        is set, immediately broadcasts the updated decisions back out on the
        group_decisions channel.
        """
        if not self.ran_ready_function:
            logger.warning('ignoring decision from {} before when_all_players_ready: {}'.format(event.participant.code, event.value))
            return

        with track('_on_decisions_event'):
            sender_code = event.participant.code
            self.group_decisions[sender_code] = event.value
            self._group_decisions_updated = True
            self.save(update_fields=['group_decisions', '_group_decisions_updated'])
            # Sub-period and rate-limited groups broadcast from their timers.
            if not (self.num_subperiods() or self.rate_limit()):
                self.send('group_decisions', self.group_decisions)