Source code for fuckery.vm

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
pyFuckery - vm.py
Created on 2/12/17.

VM Definition to execute brainfuck programs which have been parsed into lark.Tree objects.
"""
# Stdlib
import argparse
import hashlib
import logging
import sys

# Custom Code
from fuckery.constants import DEFAULT_MEMORY_SIZE
from fuckery.constants import SYM_ACTIONS
from fuckery.constants import SYM_DATA_DEC
from fuckery.constants import SYM_DATA_INC
from fuckery.constants import SYM_EXPRESSIONS
from fuckery.constants import SYM_IO_INPUT
from fuckery.constants import SYM_IO_OUTPUT
from fuckery.constants import SYM_LOOP
from fuckery.constants import SYM_PROGRAM
from fuckery.constants import SYM_PTR_DEC
from fuckery.constants import SYM_PTR_INC
from fuckery.exc import ExitCondition
from fuckery.exc import VMError
from fuckery.memory import Storage
from fuckery.parser import parse_program
# Third Party Code
from lark import Tree

log = logging.getLogger(__name__)


[docs]class VirtualMachine(object): """ This is the brainfuck VM. You can drop this into programs that need a brainfuck VM, such as a module you don't want a coworker to ever easily maintain, or a really cruel programming based game. """ def __init__(self, memory_size: int = DEFAULT_MEMORY_SIZE, loop_detection: bool = False): """ Init function for the VM. :param memory_size: Number of memory cells to instantiate. :param loop_detection: Enables loop detection if this evaluates to True. This is very costly from a computation perspective, so use it wisely. """ self.data_pointer = 0 self._memory_size = memory_size self.memory = Storage(n=memory_size) self.loop_detection = loop_detection self.stream_in = sys.stdin self.stream_out = sys.stdout self.sym2func = { SYM_DATA_DEC: self.dec_data_value, SYM_DATA_INC: self.inc_data_value, SYM_IO_INPUT: self.io_input, SYM_IO_OUTPUT: self.io_output, SYM_PTR_DEC: self.dec_data_ptr, SYM_PTR_INC: self.inc_data_ptr, } @property def current_value(self) -> int: """ Property which represents the data value the current memory address points too. :return: """ return self.memory.get(self.data_pointer) @property def state_hash(self) -> str: """ MD5 representing the state of the system. It is a hash of the memory and the current data pointer. Note - Computing this frequently can be expensive to do with a large memory section, as the memory section is serialized via msgpack.dumps() prior to hashing. :return: """ s = self.memory.mem_hash + str(self.data_pointer) ret = hashlib.md5(s.encode()).hexdigest() return ret
[docs] def inc_data_ptr(self) -> None: """ Increments the data pointer by 1. :return: None """ temp = self.data_pointer + 1 if temp not in self.memory: raise VMError(f'Invalid memory address: {temp}') self.data_pointer = temp
[docs] def dec_data_ptr(self) -> None: """ Decrements the data pointer by 1. :return: None """ temp = self.data_pointer - 1 if temp not in self.memory: raise VMError(f'Invalid memory address: {temp}') self.data_pointer = temp
[docs] def inc_data_value(self) -> None: """ Increments the value pointed to by the data pointer by 1. This wraps at 255, back to zero. :return: None """ v = self.memory.get(self.data_pointer) v = (v + 1) % 256 self.memory.set(addr=self.data_pointer, value=v)
[docs] def dec_data_value(self) -> None: """ Decrements the value pointed to by the data pointer by 1. This wraps at zero, back to 255. :return: None """ v = self.memory.get(self.data_pointer) v = (v - 1) % 256 self.memory.set(addr=self.data_pointer, value=v)
[docs] def io_output(self) -> None: """ Writes the current value, after casting it via chr(), to self.stream_out. :return: None """ v = self.current_value s = chr(v) # Ducktyping stream_out self.stream_out.write(s)
[docs] def io_input(self) -> None: """ Reads a single character from self.stream_in. If self.stream_in is sys.stdin (default value), it will prompt the user for a string and record the FIRST byte of that string. Otherwise, it will attempt to read 1 byte from the stream_in buffer. Empty inputs have no effect on the state of the system. :return: None """ self.memory.set(addr=self.data_pointer, value=0) try: if self.stream_in is sys.stdin: # pragma: no cover v = input('>') v = v[0] else: v = self.stream_in.read(1) v = ord(v) except (TypeError, IndexError): return except KeyboardInterrupt: raise ExitCondition('Keyboard error - exiting.') else: self.memory.set(addr=self.data_pointer, value=v)
[docs] def run(self, tree: Tree) -> None: """ Walk a Brainfuck AST and execute the program contained in the AST. This function is recursive, so its possible for a deeply nested program to hit the Python interpreter recursion limit, but if your brainfuck does that, kudos to you. :param tree: Parsed brainfuck program. :return: """ # XXX This is a manual tree parsing. Can we do better with a lark Transformer? if tree.data == SYM_PROGRAM: log.debug('Program entry point.') for t in tree.children: self.run(t) return if tree.data == SYM_ACTIONS: token = tree.children[0] func = self.sym2func.get(token) func() return if tree.data == SYM_EXPRESSIONS: for i, t in enumerate(tree.children): # log.debug(f'Executing command #{i} - {repr(t)}') self.run(tree=t) return if tree.data == SYM_LOOP: sh = '' if self.loop_detection: sh = self.state_hash while self.current_value != 0: for t in tree.children: if isinstance(t, Tree): self.run(tree=t) if self.loop_detection: if self.state_hash == sh: raise VMError('Infinite loop detected - no change in memory during loop execution!') sh = self.state_hash return raise NotImplementedError(f'Unknown tree type seen: {tree.data}') # pragma: no cover
[docs] def parse_and_run(self, program: str) -> None: """ Parse and run a brainfuck program. :param program: String representing a brainfuck program. :return: None """ t = parse_program(s=program) self.run(tree=t)
# noinspection PyMissingOrEmptyDocstring
[docs]def main(options): # pragma: no cover if not options.verbose: logging.disable(logging.DEBUG) log.info('Executing "Hello World!"') s = '++++++++[>++++[>++>+++>+++>+<<<<-]>+>+>->>+[<]<-]>>.>---.+++++++..+++.>>.<-.<.+++.------.--------.>>+.>++.' t = parse_program(s=s) vm = VirtualMachine() vm.run(tree=t) sys.exit(0)
# noinspection PyMissingOrEmptyDocstring
[docs]def makeargpaser(): # pragma: no cover parser = argparse.ArgumentParser(description="Run a simple hello world program.") parser.add_argument('-v', '--verbose', dest='verbose', default=False, action='store_true', help='Enable verbose output') return parser
def _main(): # pragma: no cover logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(funcName)s]') p = makeargpaser() opts = p.parse_args() main(opts) if __name__ == '__main__': # pragma: no cover _main()