r"""
This module is designed for use in co-processing Python scripts. It provides a
class, Pipeline, which is designed to be used as the base-class for Python
pipeline. Additionally, this module has several other utility functions that are
approriate for co-processing.
"""
import paraview
from paraview import simple, servermanager
from vtk.vtkPVVTKExtensionsCore import *
import math
# -----------------------------------------------------------------------------
def IsInModulo(timestep, frequencyArray):
"""
Return True if the given timestep is in one of the provided frequency.
This can be interpreted as follow::
isFM = IsInModulo(timestep, [2,3,7])
is similar to::
isFM = (timestep % 2 == 0) or (timestep % 3 == 0) or (timestep % 7 == 0)
"""
for frequency in frequencyArray:
if frequency > 0 and (timestep % frequency == 0):
return True
return False
class CoProcessor(object):
"""Base class for co-processing Pipelines. paraview.cpstate Module can
be used to dump out ParaView states as co-processing pipelines. Those are
typically subclasses of this. The subclasses must provide an
implementation for the CreatePipeline() method.
Cinema Tracks
=============
CoProcessor maintains user-defined information for the Cinema generation in
__CinemaTracks. This information includes track parameter values, data array
names, etc. __CinemaTracks holds this information in the following structure:
{ proxy_reference : { 'ControlName' : [value_1, value_2, ..., value_n],
'arraySelection' : ['ArrayName_1', ..., 'ArrayName_n'] } }
__CinemaTracks is populated when defining the co-processing pipeline through
paraview.cpstate. paraview.cpstate uses accessor instances to set values and
array names through the RegisterCinemaTrack and AddArraysToCinemaTrack methods
of this class.
"""
def __init__(self):
self.__PipelineCreated = False
self.__ProducersMap = {}
self.__WritersList = []
self.__ViewsList = []
self.__EnableLiveVisualization = False
self.__LiveVisualizationFrequency = 1
self.__LiveVisualizationLink = None
# __CinemaTracksList is just for Spec-A compatibility (it will be deprecated
# when porting Spec-A to pv_introspect). Use __CinemaTracks instead.
self.__CinemaTracksList = []
self.__CinemaTracks = {}
self.__InitialFrequencies = {}
def SetUpdateFrequencies(self, frequencies):
"""Set the frequencies at which the pipeline needs to be updated.
Typically, this is called by the subclass once it has determined what
timesteps co-processing will be needed to be done.
frequencies is a map, with key->string name of for the simulation
input, and value is a list of frequencies.
"""
if type(frequencies) != dict:
raise RuntimeError (
"Incorrect argument type: %s, must be a dict" % type(frequencies))
self.__InitialFrequencies = frequencies
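# Example (illustrative only -- the simulation input name depends on the
# adaptor): a generated script might request output for 'input' on every
# 10th and every 100th timestep with
#
#   coprocessor.SetUpdateFrequencies({'input': [10, 100]})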
def EnableLiveVisualization(self, enable, frequency = 1):
"""Call this method to enable live-visualization. When enabled,
DoLiveVisualization() will communicate with ParaView server if possible
for live visualization. Frequency specifies how often the
communication happens (default is every second)."""
self.__EnableLiveVisualization = enable
self.__LiveVisualizationFrequency = frequency
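# Example (illustrative only): connect to ParaView Live on every 5th timestep.
#
#   coprocessor.EnableLiveVisualization(True, frequency=5)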
def CreatePipeline(self, datadescription):
"""This methods must be overridden by subclasses to create the
visualization pipeline."""
raise RuntimeError ("Subclasses must override this method.")
def LoadRequestedData(self, datadescription):
"""Call this method in RequestDataDescription co-processing pass to mark
the datadescription with information about what fields and grids are
required for this pipeline for the given timestep, if any.
Default implementation uses the update-frequencies set using
SetUpdateFrequencies() to determine if the current timestep needs to
be processed and then requests all fields. Subclasses can override
this method to provide addtional customizations."""
timestep = datadescription.GetTimeStep()
# if live visualization is to be done at this timestep then all of the
# inputs must be made available. note that we want the pipeline built
# before we make the actual first live connection.
if self.__EnableLiveVisualization and timestep % self.__LiveVisualizationFrequency == 0 \
and self.__LiveVisualizationLink:
if self.__LiveVisualizationLink.Initialize(servermanager.ActiveConnection.Session.GetSessionProxyManager()):
num_inputs = datadescription.GetNumberOfInputDescriptions()
for cc in range(num_inputs):
input_name = datadescription.GetInputDescriptionName(cc)
datadescription.GetInputDescription(cc).AllFieldsOn()
datadescription.GetInputDescription(cc).GenerateMeshOn()
return
# if we haven't processed the pipeline yet in DoCoProcessing() we
# must use the initial frequencies to figure out if there's
# work to do this time/timestep. If Live is enabled we mark
# all inputs as needed (this is only done if the Live connection
# hasn't been set up yet). If we don't have live enabled
# we know that the output frequencies aren't changed and can
# just use the initial frequencies.
if self.__InitialFrequencies or not self.__EnableLiveVisualization:
num_inputs = datadescription.GetNumberOfInputDescriptions()
for cc in range(num_inputs):
input_name = datadescription.GetInputDescriptionName(cc)
freqs = self.__InitialFrequencies.get(input_name, [])
if self.__EnableLiveVisualization or IsInModulo(timestep, freqs):
datadescription.GetInputDescription(cc).AllFieldsOn()
datadescription.GetInputDescription(cc).GenerateMeshOn()
else:
# the catalyst pipeline may have been changed by a live connection
# so we need to regenerate the frequencies
import cpstate
frequencies = {}
for writer in self.__WritersList:
frequency = writer.parameters.GetProperty(
"WriteFrequency").GetElement(0)
if (timestep % frequency) == 0 or \
datadescription.GetForceOutput() == True:
writerinputs = cpstate.locate_simulation_inputs(writer)
for writerinput in writerinputs:
datadescription.GetInputDescriptionByName(writerinput).AllFieldsOn()
datadescription.GetInputDescriptionByName(writerinput).GenerateMeshOn()
for view in self.__ViewsList:
if (view.cpFrequency and timestep % view.cpFrequency == 0) or \
datadescription.GetForceOutput() == True:
viewinputs = cpstate.locate_simulation_inputs_for_view(view)
for viewinput in viewinputs:
datadescription.GetInputDescriptionByName(viewinput).AllFieldsOn()
datadescription.GetInputDescriptionByName(viewinput).GenerateMeshOn()
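# In a generated Catalyst script this is typically invoked from the
# module-level RequestDataDescription() callback, e.g.:
#
#   def RequestDataDescription(datadescription):
#       global coprocessor
#       coprocessor.LoadRequestedData(datadescription)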
def UpdateProducers(self, datadescription):
"""This method will update the producers in the pipeline. If the
pipeline is not created, it will be created using
self.CreatePipeline().
"""
if not self.__PipelineCreated:
self.CreatePipeline(datadescription)
self.__PipelineCreated = True
if self.__EnableLiveVisualization:
# we don't want to use __InitialFrequencies any more with live viz
self.__InitialFrequencies = None
else:
simtime = datadescription.GetTime()
for name, producer in self.__ProducersMap.items():
producer.GetClientSideObject().SetOutput(
datadescription.GetInputDescriptionByName(name).GetGrid(),
simtime)
def WriteData(self, datadescription):
"""This method will update all writes present in the pipeline, as
needed, to generate the output data files, respecting the
write-frequencies set on the writers."""
timestep = datadescription.GetTimeStep()
for writer in self.__WritersList:
frequency = writer.parameters.GetProperty(
"WriteFrequency").GetElement(0)
if (timestep % frequency) == 0 or \
datadescription.GetForceOutput() == True:
fileName = writer.parameters.GetProperty("FileName").GetElement(0)
writer.FileName = fileName.replace("%t", str(timestep))
writer.UpdatePipeline(datadescription.GetTime())
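# Example: a writer registered with filename 'slice_%t.pvtp' and frequency 10
# produces 'slice_0.pvtp', 'slice_10.pvtp', 'slice_20.pvtp', ... since the
# "%t" placeholder is replaced with the current timestep.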
def WriteImages(self, datadescription, rescale_lookuptable=False):
"""This method will update all views, if present and write output
images, as needed."""
timestep = datadescription.GetTimeStep()
cinema_dirs = []
for view in self.__ViewsList:
if (view.cpFrequency and timestep % view.cpFrequency == 0) or \
datadescription.GetForceOutput() == True:
fname = view.cpFileName
fname = fname.replace("%t", str(timestep))
if view.cpFitToScreen != 0:
if view.IsA("vtkSMRenderViewProxy") == True:
view.ResetCamera()
elif view.IsA("vtkSMContextViewProxy") == True:
view.ResetDisplay()
else:
print (' do not know what to do with a ', view.GetClassName())
view.ViewTime = datadescription.GetTime()
if rescale_lookuptable:
self.RescaleDataRange(view, datadescription.GetTime())
cinemaOptions = view.cpCinemaOptions
if cinemaOptions and 'camera' in cinemaOptions:
if 'composite' in view.cpCinemaOptions and view.cpCinemaOptions['composite'] == True:
dirname = self.UpdateCinema(view, datadescription,
specLevel="B")
else:
dirname = self.UpdateCinema(view, datadescription,
specLevel="A")
if dirname:
cinema_dirs.append(dirname)
else:
# for png, quality = 0 means no compression. compression can be a
# potentially very costly serial operation on process 0
if fname.endswith('png'):
simple.SaveScreenshot(fname, view, magnification=view.cpMagnification, quality=0)
else:
simple.SaveScreenshot(fname, view, magnification=view.cpMagnification)
if len(cinema_dirs) > 1:
import paraview.cinemaIO.pv_introspect as pv_introspect
pv_introspect.make_workspace_file("cinema", cinema_dirs)
def DoLiveVisualization(self, datadescription, hostname, port):
"""This method execute the code-stub needed to communicate with ParaView
for live-visualization. Call this method only if you want to support
live-visualization with your co-processing module."""
if not self.__EnableLiveVisualization:
return
if not self.__LiveVisualizationLink and self.__EnableLiveVisualization:
# Create the vtkLiveInsituLink i.e. the "link" to the visualization processes.
self.__LiveVisualizationLink = servermanager.vtkLiveInsituLink()
# Tell vtkLiveInsituLink what host/port must it connect to
# for the visualization process.
self.__LiveVisualizationLink.SetHostname(hostname)
self.__LiveVisualizationLink.SetInsituPort(int(port))
# Initialize the "link"
self.__LiveVisualizationLink.Initialize(servermanager.ActiveConnection.Session.GetSessionProxyManager())
timeStep = datadescription.GetTimeStep()
if self.__EnableLiveVisualization and timeStep % self.__LiveVisualizationFrequency == 0:
if not self.__LiveVisualizationLink.Initialize(servermanager.ActiveConnection.Session.GetSessionProxyManager()):
return
time = datadescription.GetTime()
# stay in the loop while the simulation is paused
while True:
# Update the simulation state, extracts and simulationPaused
# from ParaView Live
self.__LiveVisualizationLink.InsituUpdate(time, timeStep)
# sources need to be updated by insitu
# code. vtkLiveInsituLink never updates the pipeline, it
# simply uses the data available at the end of the
# pipeline, if any.
from paraview import simple
for source in simple.GetSources().values():
source.UpdatePipeline(time)
# push extracts to the visualization process.
self.__LiveVisualizationLink.InsituPostProcess(time, timeStep)
if (self.__LiveVisualizationLink.GetSimulationPaused()):
# This blocks until something changes on ParaView Live
# and then it continues the loop. Returns != 0 if LIVE side
# disconnects
if (self.__LiveVisualizationLink.WaitForLiveChange()):
break
else:
break
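# In a generated Catalyst script the per-timestep driver typically chains the
# methods above; the hostname and port values are illustrative:
#
#   def DoCoProcessing(datadescription):
#       global coprocessor
#       coprocessor.UpdateProducers(datadescription)
#       coprocessor.WriteData(datadescription)
#       coprocessor.WriteImages(datadescription, rescale_lookuptable=False)
#       coprocessor.DoLiveVisualization(datadescription, "localhost", 22222)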
def CreateProducer(self, datadescription, inputname):
"""Creates a producer proxy for the grid. This method is generally used in
CreatePipeline() call to create producers."""
# Check that the producer name for the input given is valid for the
# current setup.
if not datadescription.GetInputDescriptionByName(inputname):
raise RuntimeError ("Simulation input name '%s' does not exist" % inputname)
grid = datadescription.GetInputDescriptionByName(inputname).GetGrid()
producer = simple.PVTrivialProducer(guiName=inputname)
producer.add_attribute("cpSimulationInput", inputname)
# mark this as an input proxy so we can use cpstate.locate_simulation_inputs()
# to find it
producer.SMProxy.cpSimulationInput = inputname
# we purposefully don't set the time for the PVTrivialProducer here.
# when we update the pipeline we will do it then.
producer.GetClientSideObject().SetOutput(grid, datadescription.GetTime())
if grid.IsA("vtkImageData") == True or \
grid.IsA("vtkStructuredGrid") == True or \
grid.IsA("vtkRectilinearGrid") == True:
extent = datadescription.GetInputDescriptionByName(inputname).GetWholeExtent()
producer.WholeExtent= [ extent[0], extent[1], extent[2], extent[3], extent[4], extent[5] ]
# Save the producer for easy access in UpdateProducers() call.
self.__ProducersMap[inputname] = producer
producer.UpdatePipeline(datadescription.GetTime())
return producer
def RegisterWriter(self, writer, filename, freq):
"""Registers a writer proxy. This method is generally used in
CreatePipeline() to register writers. All writes created as such will
write the output files appropriately in WriteData() is called."""
writerParametersProxy = self.WriterParametersProxy(
writer, filename, freq)
writer.FileName = filename
writer.add_attribute("parameters", writerParametersProxy)
self.__WritersList.append(writer)
return writer
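# Example (writer type and names are illustrative), typically called inside
# CreatePipeline() of a subclass; "%t" is expanded to the timestep when
# WriteData() runs:
#
#   writer = servermanager.writers.XMLPPolyDataWriter(Input=self.Slice)
#   self.RegisterWriter(writer, filename='slice_%t.pvtp', freq=10)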
def WriterParametersProxy(self, writer, filename, freq):
"""Creates a client only proxy that will be synchronized with ParaView
Live, allowing a user to set the filename and frequency.
"""
controller = servermanager.ParaViewPipelineController()
# assume that a client only proxy with the same name as the writer
# is available in "insitu_writer_parameters".
# Since the coprocessor sometimes passes the writer as a custom object and
# not a proxy, we need to handle that. Just creating an arbitrary writer
# proxy to store the parameters is acceptable, so let's just do that
# when the writer is not a proxy.
writerIsProxy = isinstance(writer, servermanager.Proxy)
helperName = writer.GetXMLName() if writerIsProxy else "XMLPImageDataWriter"
proxy = servermanager.ProxyManager().NewProxy(
"insitu_writer_parameters", helperName)
controller.PreInitializeProxy(proxy)
if writerIsProxy:
# it's possible that the writer can take in multiple input connections
# so we need to go through all of them. the try/except block seems
to be the best way to figure out if there are multiple input connections
try:
length = len(writer.Input)
for i in range(length):
proxy.GetProperty("Input").AddInputConnection(
writer.Input[i].SMProxy, 0)
except:
proxy.GetProperty("Input").SetInputConnection(
0, writer.Input.SMProxy, 0)
proxy.GetProperty("FileName").SetElement(0, filename)
proxy.GetProperty("WriteFrequency").SetElement(0, freq)
controller.PostInitializeProxy(proxy)
controller.RegisterPipelineProxy(proxy)
return proxy
def RegisterCinemaTrack(self, name, proxy, smproperty, valrange):
"""
Register a point of control (filter's property) that will be varied over in a cinema export.
"""
if not isinstance(proxy, servermanager.Proxy):
raise RuntimeError ("Invalid 'proxy' argument passed to RegisterCinemaTrack.")
self.__CinemaTracksList.append({"name":name, "proxy":proxy, "smproperty":smproperty, "valrange":valrange})
proxyDefinitions = self.__CinemaTracks[proxy] if (proxy in self.__CinemaTracks) else {}
proxyDefinitions[smproperty] = valrange
self.__CinemaTracks[proxy] = proxyDefinitions
return proxy
def AddArraysToCinemaTrack(self, proxy, propertyName, arrayNames):
''' Register user-defined target arrays by name. '''
if not isinstance(proxy, servermanager.Proxy):
raise RuntimeError ("Invalid 'proxy' argument passed to AddArraysToCinemaTrack.")
proxyDefinitions = self.__CinemaTracks[proxy] if (proxy in self.__CinemaTracks) else {}
proxyDefinitions[propertyName] = arrayNames
self.__CinemaTracks[proxy] = proxyDefinitions
return proxy
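# Example (proxy, property and array names are illustrative): vary a slice
# offset in the Cinema export and restrict the export to two arrays; the
# 'arraySelection' key matches the structure described in the class docstring.
#
#   coprocessor.RegisterCinemaTrack('slice', sliceFilter,
#                                   'SliceOffsetValues', [0.0, 0.5, 1.0])
#   coprocessor.AddArraysToCinemaTrack(sliceFilter, 'arraySelection',
#                                      ['temperature', 'pressure'])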
def RegisterView(self, view, filename, freq, fittoscreen, magnification, width, height, cinema=None):
"""Register a view for image capture with extra meta-data such
as magnification, size and frequency."""
if not isinstance(view, servermanager.Proxy):
raise RuntimeError ("Invalid 'view' argument passed to RegisterView.")
view.add_attribute("cpFileName", filename)
view.add_attribute("cpFrequency", freq)
view.add_attribute("cpFileName", filename)
view.add_attribute("cpFitToScreen", fittoscreen)
view.add_attribute("cpMagnification", magnification)
view.add_attribute("cpCinemaOptions", cinema)
view.ViewSize = [ width, height ]
self.__ViewsList.append(view)
return view
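# Example (filename and cinema options are illustrative): capture a render
# view every 5th timestep; the cinema dictionary keys shown here match those
# consumed by WriteImages()/UpdateCinema() ('camera', 'phi', 'theta', ...).
#
#   renderView = simple.CreateView('RenderView')
#   coprocessor.RegisterView(renderView, filename='image_%t.png', freq=5,
#                            fittoscreen=1, magnification=1,
#                            width=800, height=800,
#                            cinema={'camera': 'phi-theta',
#                                    'phi': range(0, 360, 45),
#                                    'theta': range(-90, 91, 45)})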
def CreateWriter(self, proxy_ctor, filename, freq):
""" **** DEPRECATED!!! Use RegisterWriter instead ****
Creates a writer proxy. This method is generally used in
CreatePipeline() to create writers. All writes created as such will
write the output files appropriately in WriteData() is called."""
writer = proxy_ctor()
return self.RegisterWriter(writer, filename, freq)
def CreateView(self, proxy_ctor, filename, freq, fittoscreen, magnification, width, height):
""" **** DEPRECATED!!! Use RegisterView instead ****
Create a CoProcessing view for image capture with extra meta-data
such as magnification, size and frequency."""
view = proxy_ctor()
return self.RegisterView(view, filename, freq, fittoscreen, magnification, width, height, None)
def Finalize(self):
for writer in self.__WritersList:
if hasattr(writer, 'Finalize'):
writer.Finalize()
for view in self.__ViewsList:
if hasattr(view, 'Finalize'):
view.Finalize()
def RescaleDataRange(self, view, time):
"""DataRange can change across time, sometime we want to rescale the
color map to match to the closer actual data range."""
reps = view.Representations
for rep in reps:
if not hasattr(rep, 'Visibility') or \
not rep.Visibility or \
not hasattr(rep, 'MapScalars') or \
not rep.MapScalars or \
not rep.LookupTable:
# rep is either not visible or not mapping scalars using a LUT.
continue
input = rep.Input
input.UpdatePipeline(time) #make sure range is up-to-date
lut = rep.LookupTable
colorArrayInfo = rep.GetArrayInformationForColorArray()
if not colorArrayInfo:
continue
if lut.VectorMode != 'Magnitude' or \
colorArrayInfo.GetNumberOfComponents() == 1:
datarange = colorArrayInfo.GetComponentRange(lut.VectorComponent)
else:
# -1 corresponds to the magnitude.
datarange = colorArrayInfo.GetComponentRange(-1)
import vtkParallelCorePython
import paraview.vtk as vtk
import paraview.servermanager
pm = paraview.servermanager.vtkProcessModule.GetProcessModule()
globalController = pm.GetGlobalController()
localarray = vtk.vtkDoubleArray()
localarray.SetNumberOfTuples(2)
localarray.SetValue(0, -datarange[0]) # negate so that MPI_MAX gets min instead of doing a MPI_MIN and MPI_MAX
localarray.SetValue(1, datarange[1])
globalarray = vtk.vtkDoubleArray()
globalarray.SetNumberOfTuples(2)
globalController.AllReduce(localarray, globalarray, 0)
globaldatarange = [-globalarray.GetValue(0), globalarray.GetValue(1)]
rgbpoints = lut.RGBPoints.GetData()
numpts = len(rgbpoints)//4
if globaldatarange[0] != rgbpoints[0] or globaldatarange[1] != rgbpoints[(numpts-1)*4]:
# rescale all of the points
oldrange = rgbpoints[(numpts-1)*4] - rgbpoints[0]
newrange = globaldatarange[1] - globaldatarange[0]
# only readjust if the new range isn't zero.
if newrange != 0:
newrgbpoints = list(rgbpoints)
# if the old range isn't 0 then we use that ranges distribution
if oldrange != 0:
for v in range(numpts-1):
newrgbpoints[v*4] = globaldatarange[0]+(rgbpoints[v*4] - rgbpoints[0])*newrange/oldrange
# avoid numerical round-off, at least with the last point
newrgbpoints[(numpts-1)*4] = globaldatarange[1]
else: # the old range is 0 so the best we can do is to space the new points evenly
for v in range(numpts):
newrgbpoints[v*4] = globaldatarange[0]+v*newrange/(numpts-1.0)
lut.RGBPoints.SetData(newrgbpoints)
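# This is not usually called directly; it is triggered by passing
# rescale_lookuptable=True to WriteImages(), e.g.:
#
#   coprocessor.WriteImages(datadescription, rescale_lookuptable=True)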
def UpdateCinema(self, view, datadescription, specLevel):
""" called from catalyst at each timestep to add to the cinema database """
if not view.IsA("vtkSMRenderViewProxy") == True:
return
try:
import paraview.cinemaIO.cinema_store as CS
import paraview.cinemaIO.explorers as explorers
import paraview.cinemaIO.pv_explorers as pv_explorers
import paraview.cinemaIO.pv_introspect as pv_introspect
import paraview.simple as simple
except ImportError as e:
paraview.print_error("Cannot import cinema")
paraview.print_error(e)
return
#figure out where to put this store
import os.path
vfname = view.cpFileName
vfname = vfname[0:vfname.rfind("_")] #strip _num.ext
fname = os.path.join(os.path.dirname(vfname),
"cinema",
os.path.basename(vfname),
"info.json")
def float_limiter(x):
#a shame, but needed to make sure python, javascript and (directory/file)name agree
if isinstance(x, (float)):
return '%.6e' % x #arbitrarily chose 6 significant digits
else:
return x
#what time?
timestep = datadescription.GetTimeStep()
time = datadescription.GetTime()
view.ViewTime = time
formatted_time = float_limiter(time)
# Include camera information in the user defined parameters.
# pv_introspect uses __CinemaTracks to customize the exploration.
co = view.cpCinemaOptions
camType = co["camera"]
if "phi" in co:
self.__CinemaTracks["phi"] = co["phi"]
if "theta" in co:
self.__CinemaTracks["theta"] = co["theta"]
if "roll" in co:
self.__CinemaTracks["roll"] = co["roll"]
tracking_def = {}
if "tracking" in co:
tracking_def = co['tracking']
#figure out what we show now
pxystate= pv_introspect.record_visibility()
# a conservative global bounds for consistent z scaling
minbds, maxbds = pv_introspect.max_bounds()
#make sure depth rasters are consistent
view.MaxClipBounds = [minbds, maxbds, minbds, maxbds, minbds, maxbds]
view.LockBounds = 1
if specLevel=="B":
p = pv_introspect.inspect(skip_invisible=True)
else:
p = pv_introspect.inspect(skip_invisible=False)
fs = pv_introspect.make_cinema_store(p, fname, view,
forcetime = formatted_time,
userDefined = self.__CinemaTracks,
specLevel = specLevel,
camType = camType)
#all nodes participate, but only root writes out the files
pm = servermanager.vtkProcessModule.GetProcessModule()
pid = pm.GetPartitionId()
enableFloatVal = False if 'floatValues' not in co else co['floatValues']
pv_introspect.explore(fs, p, iSave = (pid == 0), currentTime = {'time':formatted_time},
userDefined = self.__CinemaTracks,
specLevel = specLevel,
camType = camType,
tracking = tracking_def, floatValues = enableFloatVal)
if pid == 0:
fs.save()
view.LockBounds = 0
#restore what we showed
pv_introspect.restore_visibility(pxystate)
return os.path.basename(vfname)