OpenShot Audio Library | OpenShotAudio  0.3.0
juce_ConnectedChildProcess.cpp
1 /*
2  ==============================================================================
3 
4  This file is part of the JUCE library.
5  Copyright (c) 2017 - ROLI Ltd.
6 
7  JUCE is an open source library subject to commercial or open-source
8  licensing.
9 
10  The code included in this file is provided under the terms of the ISC license
11  http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12  To use, copy, modify, and/or distribute this software for any purpose with or
13  without fee is hereby granted provided that the above copyright notice and
14  this permission notice appear in all copies.
15 
16  JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17  EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18  DISCLAIMED.
19 
20  ==============================================================================
21 */
22 
23 namespace juce
24 {
25 
/** Magic header number handed to the InterprocessConnection constructor by both
    the master-side and slave-side Connection objects in this file.
*/
enum { magicMastSlaveConnectionHeader = 0x712baf04 };

// Reserved control messages exchanged between the two processes. Each one is
// sent as a block of exactly specialMessageSize bytes.
// The pointers themselves are const so that these file-level globals can't be
// accidentally re-pointed at some other string.
static const char* const startMessage = "__ipc_st";
static const char* const killMessage  = "__ipc_k_";
static const char* const pingMessage  = "__ipc_p_";

enum
{
    specialMessageSize = 8,     // number of bytes in each of the control messages above
    defaultTimeoutMs   = 8000   // timeout substituted when a caller passes timeoutMs <= 0
};
32 
33 static inline bool isMessageType (const MemoryBlock& mb, const char* messageType) noexcept
34 {
35  return mb.matches (messageType, (size_t) specialMessageSize);
36 }
37 
38 static String getCommandLinePrefix (const String& commandLineUniqueID)
39 {
40  return "--" + commandLineUniqueID + ":";
41 }
42 
43 //==============================================================================
44 // This thread sends and receives ping messages every second, so that it
45 // can find out if the other process has stopped running.
46 struct ChildProcessPingThread : public Thread,
47  private AsyncUpdater
48 {
49  ChildProcessPingThread (int timeout) : Thread ("IPC ping"), timeoutMs (timeout)
50  {
51  pingReceived();
52  }
53 
54  void pingReceived() noexcept { countdown = timeoutMs / 1000 + 1; }
55  void triggerConnectionLostMessage() { triggerAsyncUpdate(); }
56 
57  virtual bool sendPingMessage (const MemoryBlock&) = 0;
58  virtual void pingFailed() = 0;
59 
60  int timeoutMs;
61 
62 private:
63  Atomic<int> countdown;
64 
65  void handleAsyncUpdate() override { pingFailed(); }
66 
67  void run() override
68  {
69  while (! threadShouldExit())
70  {
71  if (--countdown <= 0 || ! sendPingMessage ({ pingMessage, specialMessageSize }))
72  {
73  triggerConnectionLostMessage();
74  break;
75  }
76 
77  wait (1000);
78  }
79  }
80 
81  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ChildProcessPingThread)
82 };
83 
84 //==============================================================================
85 struct ChildProcessMaster::Connection : public InterprocessConnection,
86  private ChildProcessPingThread
87 {
88  Connection (ChildProcessMaster& m, const String& pipeName, int timeout)
89  : InterprocessConnection (false, magicMastSlaveConnectionHeader),
90  ChildProcessPingThread (timeout),
91  owner (m)
92  {
93  if (createPipe (pipeName, timeoutMs))
94  startThread (4);
95  }
96 
97  ~Connection() override
98  {
99  stopThread (10000);
100  }
101 
102 private:
103  void connectionMade() override {}
104  void connectionLost() override { owner.handleConnectionLost(); }
105 
106  bool sendPingMessage (const MemoryBlock& m) override { return owner.sendMessageToSlave (m); }
107  void pingFailed() override { connectionLost(); }
108 
109  void messageReceived (const MemoryBlock& m) override
110  {
111  pingReceived();
112 
113  if (m.getSize() != specialMessageSize || ! isMessageType (m, pingMessage))
114  owner.handleMessageFromSlave (m);
115  }
116 
117  ChildProcessMaster& owner;
118 
119  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Connection)
120 };
121 
122 //==============================================================================
124 
126 {
128 }
129 
131 
133 {
134  if (connection != nullptr)
135  return connection->sendMessage (mb);
136 
137  jassertfalse; // this can only be used when the connection is active!
138  return false;
139 }
140 
141 bool ChildProcessMaster::launchSlaveProcess (const File& executable, const String& commandLineUniqueID,
142  int timeoutMs, int streamFlags)
143 {
145 
146  auto pipeName = "p" + String::toHexString (Random().nextInt64());
147 
148  StringArray args;
149  args.add (executable.getFullPathName());
150  args.add (getCommandLinePrefix (commandLineUniqueID) + pipeName);
151 
152  childProcess.reset (new ChildProcess());
153 
154  if (childProcess->start (args, streamFlags))
155  {
156  connection.reset (new Connection (*this, pipeName, timeoutMs <= 0 ? defaultTimeoutMs : timeoutMs));
157 
158  if (connection->isConnected())
159  {
160  sendMessageToSlave ({ startMessage, specialMessageSize });
161  return true;
162  }
163 
164  connection.reset();
165  }
166 
167  return false;
168 }
169 
171 {
172  if (connection != nullptr)
173  {
174  sendMessageToSlave ({ killMessage, specialMessageSize });
175  connection->disconnect();
176  connection.reset();
177  }
178 
179  childProcess.reset();
180 }
181 
182 //==============================================================================
183 struct ChildProcessSlave::Connection : public InterprocessConnection,
184  private ChildProcessPingThread
185 {
186  Connection (ChildProcessSlave& p, const String& pipeName, int timeout)
187  : InterprocessConnection (false, magicMastSlaveConnectionHeader),
188  ChildProcessPingThread (timeout),
189  owner (p)
190  {
191  connectToPipe (pipeName, timeoutMs);
192  startThread (4);
193  }
194 
195  ~Connection() override
196  {
197  stopThread (10000);
198  }
199 
200 private:
201  ChildProcessSlave& owner;
202 
203  void connectionMade() override {}
204  void connectionLost() override { owner.handleConnectionLost(); }
205 
206  bool sendPingMessage (const MemoryBlock& m) override { return owner.sendMessageToMaster (m); }
207  void pingFailed() override { connectionLost(); }
208 
209  void messageReceived (const MemoryBlock& m) override
210  {
211  pingReceived();
212 
213  if (isMessageType (m, pingMessage))
214  return;
215 
216  if (isMessageType (m, killMessage))
217  return triggerConnectionLostMessage();
218 
219  if (isMessageType (m, startMessage))
220  return owner.handleConnectionMade();
221 
222  owner.handleMessageFromMaster (m);
223  }
224 
225  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Connection)
226 };
227 
228 //==============================================================================
231 
234 
236 {
237  if (connection != nullptr)
238  return connection->sendMessage (mb);
239 
240  jassertfalse; // this can only be used when the connection is active!
241  return false;
242 }
243 
245  const String& commandLineUniqueID,
246  int timeoutMs)
247 {
248  auto prefix = getCommandLinePrefix (commandLineUniqueID);
249 
250  if (commandLine.trim().startsWith (prefix))
251  {
252  auto pipeName = commandLine.fromFirstOccurrenceOf (prefix, false, false)
253  .upToFirstOccurrenceOf (" ", false, false).trim();
254 
255  if (pipeName.isNotEmpty())
256  {
257  connection.reset (new Connection (*this, pipeName, timeoutMs <= 0 ? defaultTimeoutMs : timeoutMs));
258 
259  if (! connection->isConnected())
260  connection.reset();
261  }
262  }
263 
264  return connection != nullptr;
265 }
266 
267 } // namespace juce
bool sendMessageToSlave(const MemoryBlock &)
bool launchSlaveProcess(const File &executableToLaunch, const String &commandLineUniqueID, int timeoutMs=0, int streamFlags=ChildProcess::wantStdOut|ChildProcess::wantStdErr)
bool initialiseFromCommandLine(const String &commandLine, const String &commandLineUniqueID, int timeoutMs=0)
bool sendMessageToMaster(const MemoryBlock &)
const String & getFullPathName() const noexcept
Definition: juce_File.h:149
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
void add(String stringToAdd)
String upToFirstOccurrenceOf(StringRef substringToEndWith, bool includeSubStringInResult, bool ignoreCase) const
String trim() const
bool startsWith(StringRef text) const noexcept
static String toHexString(IntegerType number)
Definition: juce_String.h:1053
String fromFirstOccurrenceOf(StringRef substringToStartFrom, bool includeSubStringInResult, bool ignoreCase) const
bool wait(int timeOutMilliseconds) const
Thread(const String &threadName, size_t threadStackSize=0)
Definition: juce_Thread.cpp:26
bool threadShouldExit() const